source: trunk/oarutils/oar-parexec @ 93

Last change on this file since 93 was 89, checked in by g7moreau, 12 years ago
  • Example of transmit use
File size: 19.3 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command line options and their defaults.
my $file;                              # file listing job commands (--file)
my $dir;                               # folder to iterate over (--dir)
my $cmd;                               # command to run in each sub-folder (--cmd)
my $logtrace;                          # trace file for crash/checkpoint re-run (--logtrace)
my $verbose;
my $job_np         = 1;                # cores allocated to each small job (--jobnp)
my $nodefile       = $ENV{OAR_NODE_FILE} || '';
my $masterio;                          # base name for small job stdout/stderr (--masterio)
my $switchio;                          # one output file per small job (--switchio)
my $help;
my $oarsh          = 'oarsh -q -T';    # remote shell command (--oarsh)
my $sig_transmit;                      # forward checkpoint signal to sub-jobs (--transmit)
my $sig_checkpoint = 'USR2';           # signal to listen for (--kill)

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;

# Either --file, or both --dir and --cmd, are mandatory.  Guard with
# defined() so unset options do not raise "uninitialized value" warnings
# when interpolated into the file tests.
pod2usage(-verbose => 2) if not (
   (defined $file and -e $file)
   or (defined $dir and -d $dir and defined $cmd and $cmd ne '')
   );
52
# Re-run support: read the trace of a previous run so that jobs already
# marked as done are not launched a second time.
my %state;                  # job name -> 'start' (crashed mid-run) or 'end' (done)
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # Explicit-mode open instead of "< $logtrace": safe for file names
   # containing leading/trailing spaces or mode-like characters.
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   # Append new trace lines; unblock() wraps the handle for Coro so a
   # print from one coroutine does not block the whole process.
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }
71
# Build the list of small jobs to run, either from a command file
# (--file) or by iterating over the sub-folders of --dir (with --cmd).
my @job = ();
if (defined $file and -e $file) {
   my $job_num = 0;
   # Lexical filehandle instead of the bareword JOB_LIST (idiomatic,
   # auto-closed on scope exit).
   open my $job_list_h, '<', $file or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;       # comment line
      next if $job_cmd =~ m/^\s*$/;    # empty line
      $job_num++;
      # Optional explicit name in a trailing comment: ... # name=foo
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name => $job_name,
         cmd  => $job_cmd,
         num  => $job_num,
         };
      }
   close $job_list_h;
   }
else {
   my $job_num = 0;
   opendir my $dir_h, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dir_h) {
      next if $item =~ m/^\./;         # hidden entries
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless -d "$dir/$item";     # only sub-folders become jobs
      $job_num++;
      push @job, {
         name => $item,
         cmd  => "cd $dir/$item/; $cmd",
         num  => $job_num,
         };
      }
   closedir $dir_h;
   }
112
# Job names must be unique (they key the logtrace state).  If any name
# appears twice, fall back to using the job number as the name for all.
{
   my %seen = ();
   my $count_unique_name = grep { !$seen{ $_->{name} }++ } @job;
   # BUG FIX: compare against the number of jobs (scalar @job), not the
   # last index ($#job == @job - 1).  The old test was always true, so
   # every user-supplied name was silently replaced by the job number.
   if ($count_unique_name != scalar @job) {
      $_->{name} = $_->{num} for @job;
      }
   }
121
# Resources (cores) available: one line per core in the node file.
my @ressources = ();
open my $node_h, '<', $nodefile
   or die "can't open $nodefile: $!";
while (<$node_h>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_h;

# A single job must fit in the available resources.
my $ressource_size = scalar(@ressources);
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;
137
my $current_dir = getcwd();

# Base names for the small job output files (--switchio): strip the OAR
# extension, or use the --masterio base name when given.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

# Direct method-call syntax (Coro::X->new) instead of the deprecated
# indirect object notation.
my $finished = Coro::Signal->new;        # raised when everything is done
my $job_todo = Coro::Semaphore->new(0);  # counts jobs still to finish
my $job_name_maxlen = 0;                 # init to 0: avoids an uninitialized-value
                                         # warning on the first comparison below
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# Split the resources into slices of $job_np cores; each slice is a
# token in the channel that a job must take before it can run.
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }
163
my %scheduled = ();   # running sub-jobs, keyed by remote-shell pid

# OAR checkpoint handling: when the checkpoint signal arrives (USR2 by
# default), stop launching new jobs and, with --transmit, wake up the
# notifier coroutine so running jobs get the signal too.
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   $notify->send if $sig_transmit;
   };
177
# Notifier coroutine: forwards the checkpoint signal to every running
# sub-job.  For each scheduled job it opens a shell on the job's node
# and kills the pid recorded in the job's pid file.
async {
   while () {
      $notify->wait;

      for my $pid (keys %scheduled) {
         my $name    = $scheduled{$pid}->{name};
         my $pidfile = $scheduled{$pid}->{pidfile};
         my $node    = $scheduled{$pid}->{node_connect};

         my $remote = IO::File->new();
         $remote->open("| $oarsh $node >/dev/null 2>&1")
            or die "error: can't notify subjob: $!";

         $remote->autoflush;
         $remote = unblock $remote;

         $remote->print("kill -$sig_checkpoint \$(cat $pidfile)\n");
         $remote->print("exit\n");

         print "warning: transmit signal $sig_checkpoint"
            . " to job $name on node $node.\n"
            if $verbose;

         close $remote;
         cede;
         }
      }
   };
207
# asynchrone start job block
#
# Launcher coroutine: walks the job list; for each job it waits for a
# free resource slice, opens a shell on the slice's first node via
# $oarsh and pipes in the commands that set up, run and clean the job.
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # job has been already run ?
      # (%state comes from a previous --logtrace file: 'end' means done
      # and can be skipped, 'start' means it crashed mid-run and must be
      # relaunched)
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take job ressource (blocks until a slice of $job_np cores is free)
      my $job_ressource = $ressources->get;

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      # NOTE(review): for a pipe open, open() returns the child pid, so
      # $job_pid is the pid of the local $oarsh wrapper, not of $job_cmd
      # itself — the reaper coroutine waitpid()s on this pid.
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      # trace line; the "start job NAME" prefix is what the re-run
      # parser above matches
      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # per-job output redirections (--switchio)
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      # scratch files on the remote node (removed at end of job)
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name";
      my $job_pidfile  = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name.pid";

      # register the running job so the reaper and notifier coroutines
      # can find its handle, resource slice and pid file
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name,
         pidfile      => $job_pidfile,
         };

      # set job environment, run it and clean
      # (for parallel jobs, publish a private node file and core count)
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }

      $fh->print("cd $current_dir\n");

      # with --transmit: record the remote shell pid and install a trap
      # that forwards the checkpoint signal to the job's children
      if ($sig_transmit) {
         $fh->print("trap 'jobs -p|xargs -r ps -o pid --no-headers --ppid|xargs -r kill -$sig_checkpoint' $sig_checkpoint\n");
         $fh->print("echo \$\$ > $job_pidfile\n");
         }

      # run the job in background, then wait in a loop: an interrupted
      # wait returns, so the trap above can fire while the job runs
      $fh->print("(\n");
      $fh->print("$job_cmd\n");
      $fh->print(") $job_stdout $job_stderr \&\n");
      $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
      $fh->print("do\n");
      $fh->print("   wait\n");
      $fh->print("done\n");

      $fh->print("rm -f $job_pidfile\n")  if $sig_transmit;
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
297
# Reaper coroutine: collects finished sub-jobs, logs them, recycles
# their resource slice and signals global completion.
async {
   while () {
      for my $pid (keys %scheduled) {
         # Non-blocking test: has this sub-job's shell exited?
         if (waitpid($pid, WNOHANG)) {
            my $entry = delete $scheduled{$pid};

            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $entry->{name},
               $pid, time, $entry->{ressource};

            # When checkpointing with --transmit the job was only
            # suspended, not finished: trace it as such so it is
            # relaunched on the next run.
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $entry->{fh};
            # Hand the resource slice back for the next waiting job.
            $ressources->put($entry->{ressource});
            $job_todo->down;
            }
         cede;
         }

      # Quit when checkpointed and fully drained, or when all jobs done.
      $finished->send if $oar_checkpoint->count() > 0 and not %scheduled;

      $finished->send if $job_todo->count() == 0;
      cede;
      }
   };
330
# Give the coroutines a first chance to run, then block until one of
# them reports that everything is finished.
cede;
$finished->wait;

# Close the trace file if one was opened.
$log_h->close() if $logtrace;
338
339__END__
340
341=head1 NAME
342
343oar-parexec - parallel execution of many small short or long job
344
345=head1 SYNOPSIS
346
347 oar-parexec --file filecommand \
348    [--logtrace tracefile] [--verbose] \
349    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
350    [--switchio] [--masterio basefileio] \
351    [--kill signal] [--transmit]
352
353 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
354    [--logtrace tracefile] [--verbose] \
355    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
356    [--switchio] [--masterio basefileio] \
357    [--kill signal] [--transmit]
358
359 oar-parexec --help
360
361=head1 DESCRIPTION
362
C<oar-parexec> can execute lots of small, short or long jobs in parallel inside a cluster.
The number of jobs run in parallel at one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...
However, it can be used outside OAR.
368
369Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
370
371Small job will be launch in the same folder as the master job.
372Two environment variable are defined for each small job
373and only in case of parallel small job (option C<--jobnp> > 1).
374
375 OAR_NODE_FILE - file that list node for parallel computing
376 OAR_NP        - number of processor affected
377
378The file define by OAR_NODE_FILE is created  in /tmp
379on the node before launching the small job
380and this file will be delete after job complete.
381C<oar-parexec> is a simple script,
382OAR_NODE_FILE will not be deleted in case of crash of the master job.
383
384OAR define other variable that are equivalent to OAR_NODE_FILE:
385OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
386You can use in your script the OAR original file ressources
387by using these variable if you need it.
388
When used with long jobs,
activate option C<--transmit> to send the OAR checkpoint signal
and suspend small jobs before the walltime cut!
392
393=head1 OPTIONS
394
395=over 12
396
397=item B<-f|--file filecommand>
398
399File name which content job list.
400For the JOB_NAME definition,
401the first valid job in the list will have the number 1 and so on...
402
403It's possible to fix the name inside a comment on the job line.
404For example:
405
406 $HOME/test/subjob1.sh # name=subjob1
407
408The key C<name> is case insensitive,
409the associated value cannot have a space...
410
411The command can be any shell command.
412It's possible to change folder,
413or launch an asynchrone job in parallel,
414but one command must block and not be launch in asynchrone (with & or coproc).
415Example :
416
417 cd ./test; ./subjob1.sh
418 cd ./test; nice -18 du -sk ./ & ./test/subjob1.sh
419
420Command C<du -sk ./> will be done in parallel on the same ressource...
421
422=item B<-d|--dir foldertoiterate>
423
424Command C<--cmd> will be launch in all sub-folder of this master folder.
425Files in this folder will be ignored.
426Sub-folder name which begin with F<.>
427or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
428
429The JOB_NAME is simply the Sub-folder name.
430
431=item B<-c|--cmd commandtolaunch>
432
Command (and its arguments) that will be launched in every sub-folder
of the folder given by parameter C<--dir>.
As for option C<--file>, the command can be any valid shell command,
but one command must block.
437
438=item B<-l|--logtrace tracefile>
439
440File which log and trace running job.
441In case of running the same master command (after crash for example),
442only job that are not mark as done will be run again.
443Be careful, job mark as running (start but not finish) will be run again.
444Tracing is base on the JOB_NAME between multiple run.
445
This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
448
449=item B<-v|--verbose>
450
451=item B<-j|--jobnp integer>
452
453Number of processor to allocated for each small job.
4541 by default.
455
456=item B<-n|--nodefile filenode>
457
458File name that list all the node where job could be launch.
459By defaut, it's define automatically by OAR via
460environment variable C<OAR_NODE_FILE>.
461
462For example, if you want to use 6 core on your cluster node,
463you need to put 6 times the hostname node in this file,
464one per line...
465It's a very common file in MPI process !
466
467=item B<-o|-oarsh command>
468
469Command use to launch a shell on a node.
470By default
471
472 oarsh -q -T
473
474Change it to C<ssh> if you are not using an OAR cluster...
475
476=item B<-s|--switchio>
477
478Each small job will have it's own output STDOUT and STDERR
479base on master OAR job with C<JOB_NAME> inside
480(or base on C<basefileio> if option C<masterio>).
481Example :
482
483 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
484
485where 151524 here is the master C<OAR_JOB_ID>
486and C<JOB_NAME> is the small job name.
487
488=item B<-m|--masterio basefileio>
489
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
493
494=item B<-k|--kill signal>
495
Signal to listen for, in order to make a clean stop of the current C<oar-parexec> process.
By default, the USR2 signal is used (see C<kill -l> for a list of possible signals).
498
499=item B<-t|--transmit>
500
Resend the caught signal to sub-jobs when receiving it.
By default, no signal is transmitted to child processes.

It is only valuable when used for long sub-jobs that can,
in return, perform a clean restart themselves.
506
507
508=item B<-h|--help>
509
510=back
511
512
513=head1 EXAMPLE
514
515=head2 Simple list of sequential job
516
517Content for the job file command (option C<--file>) could have:
518
519 - empty line
520 - comment line begin with #
521 - valid shell command (can containt comment)
522
523Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
524
525 $HOME/test/subjob01.sh  # name=subjob01
526 $HOME/test/subjob02.sh  # name=subjob02
527 $HOME/test/subjob03.sh  # name=subjob03
528 $HOME/test/subjob04.sh  # name=subjob04
529 ...
530 $HOME/test/subjob38.sh  # name=subjob38
531 $HOME/test/subjob39.sh  # name=subjob39
532 $HOME/test/subjob40.sh  # name=subjob40
533
534These jobs could be launch by:
535
536 oarsub -n test -l /core=6,walltime=04:00:00 \
537   "oar-parexec -f ./subjob.list.txt"
538
539=head2 Folder job
540
In a folder F<subjob.d>, create sub-folders with your data inside: F<test1>, F<test2>...
542The same command will be executed in every sub-folder.
543C<oar-parexec> change the current directory to the sub-folder before launching it.
544
545A very simple job could be:
546
547 oarsub -n test -l /core=6,walltime=04:00:00 \
548   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
549
The command C<env> will be executed in all folders F<test1>, F<test2>... after a 10s pause.
551
552Sometime, it's simpler to use file list command,
553sometime, jobs by folder with the same command run is more relevant.
554
555=head2 Parallel job
556
557You need to put the number of core each small job need with option C<--jobnp>.
558If your job is build on OpenMP or MPI,
559you can use OAR_NP and OAR_NODE_FILE variables to configure them.
560On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
561for connexion between node instead of C<ssh>.
562
563Example with parallel small job on 2 core:
564
565 oarsub -n test -l /core=6,walltime=04:00:00 \
566   "oar-parexec -j 2 -f ./subjob.list.txt"
567
568=head2 Tracing and master crash
569
570If the master node crash after hours of calculus, everything is lost ?
571No, with option C<--logtrace>,
572it's possible to remember older result
573and not re-run these job the second and next time.
574
575 oarsub -n test -l /core=6,walltime=04:00:00 \
576   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
577
578After a crash or an C<oardel> command,
579you can then re-run the same command that will end to execute the jobs in the list
580
581 oarsub -n test -l /core=6,walltime=04:00:00 \
582   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
583
584C<logtrace> file are just plain file.
585We use the extension '.log' because these files are automatically
586eliminate from our backup system!
587
588=head2 Checkpointing and Idempotent
589
590C<oar-parexec> is compatible with the OAR checkpointing.
591If you have 2000 small jobs that need 55h to be done on 6 cores,
592you can cut this in small parts.
593
594For this example, we suppose that each small job need about 10min...
595So, we send a checkpoint 12min before the end of the process
596to let C<oar-parexec> finish the jobs started.
597After being checkpointed, C<oar-parexec> do not start any new small job.
598
599 oarsub -t idempotent -n test \
600   -l /core=6,walltime=04:00:00 \
601   --checkpoint 720 \
602   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
603
604After 3h48min, the OAR job will begin to stop launching new small job.
605When all running small job are finished, it's exit.
606But as the OAR job is type C<idempotent>,
607OAR will re-submit it as long as all small job are not executed...
608
609This way, we let other users a chance to use the cluster!
610
In this last example, we use a moldable OAR job with idempotent
612to reserve many core for a small time or a few cores for a long time:
613
614 oarsub -t idempotent -n test \
615   -l /core=50,walltime=01:05:00 \
616   -l /core=6,walltime=04:00:00 \
617   --checkpoint 720 \
618   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
619
620=head2 Signal, recurse and long job
621
622By default, OAR use signal USR2 for checkpointing.
623It's possible to change this with option C<--kill>.
624
625When use with long small job, checkpointing could be too long...
626More than walltime!
627The option C<--transmit> could be use to checkpoint small job!
628These long small job will then stop cleanly and will be restarted next time.
629
630In the C<logtrace> file, small job will have the status suspend.
631They will be launch with the same command line at the next OAR run.
632
633Example: if you have 50 small jobs that each need 72h to be done on 1 cores,
634you can cut this in 24h parts.
635
636For this example, we suppose that each long job loop need about 20min...
637So, we send a checkpoint 30min before the end of the process
638to let C<oar-parexec> suspend the jobs started.
639After being checkpointed, C<oar-parexec> do not start any new small job.
640
641 oarsub -t idempotent -n test \
642   -l /core=6,walltime=24:00:00 \
643   --checkpoint 1800 \
644   --transmit \
645   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
646
647After 23h30min, the OAR job will begin to stop launching new small job.
648When all running small job are suspend, it's exit.
649But as the OAR job is type C<idempotent>,
650OAR will re-submit it as long as all small job are not finished...
651
652=head1 SEE ALSO
653
654oar-dispatch, mpilauncher,
655orsh, oar-envsh, ssh
656
657
658=head1 AUTHORS
659
660Written by Gabriel Moreau, Grenoble - France
661
662
663=head1 LICENSE AND COPYRIGHT
664
665GPL version 2 or later and Perl equivalent
666
667Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
668
Note: See TracBrowser for help on using the repository browser.