source: trunk/oarutils/oar-parexec @ 78

Last change on this file since 78 was 78, checked in by g7moreau, 12 years ago
  • Documentation for options kill and transmit
File size: 16.1 KB
RevLine 
[13]1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
[32]16use Cwd qw( getcwd );
[13]17
# Command-line options. Scalars tested later with -e/-d or string
# comparison are initialized to '' so they are never undef.
my $file     = '';   # job list file, one command per line (--file)
my $dir      = '';   # master folder whose sub-folders are iterated (--dir)
my $cmd      = '';   # command run in each sub-folder (--cmd, with --dir)
my $logtrace = '';   # trace file used to resume after crash/checkpoint
my $verbose;
my $job_np         = 1;                           # cores allocated per small job
my $nodefile       = $ENV{OAR_NODE_FILE} || '';   # node list, one core per line
my $masterio;                                     # base name for small job stdout/stderr
my $switchio;                                     # give each small job its own output files
my $help;
my $oarsh          = 'oarsh -q -T';               # remote shell command
my $sig_transmit;                                 # forward checkpoint signal to sub-jobs
my $sig_checkpoint = 'USR2';                      # signal OAR sends on checkpoint

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;

# Either --file, or the --dir/--cmd pair, is mandatory.
pod2usage(-verbose => 2)
   if not ((-e $file) or (-d $dir and $cmd ne ''));
[13]52
[43]53# re-run, keep trace of job already done
[38]54my %state;
55my $log_h = IO::File->new();
[45]56if (-e "$logtrace") {
[43]57   $log_h->open("< $logtrace")
58      or die "error: can't read log file: $!";
[38]59   while (<$log_h>) {
[45]60      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
61      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
[41]62      }
[38]63   $log_h->close();
64   }
[43]65if ($logtrace) {
66   $log_h->open(">> $logtrace")
67      or die "error: can't append log file $logtrace: $!";
[40]68   $log_h->autoflush;
[38]69   $log_h = unblock $log_h;
70   }
71
# Build the list of jobs to run, either from a command file (--file)
# or from the sub-folders of a master folder (--dir + --cmd).
my @job = ();
if (-e "$file") {
   my $job_num = 0;
   open my $job_list_fh, '<', $file
      or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_fh>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;      # comment line
      next if $job_cmd =~ m/^\s*$/;   # blank line
      $job_num++;
      # An optional "# name=..." comment on the line overrides the
      # default numeric job name (key is case insensitive, no spaces).
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, { name => $job_name, cmd => "$job_cmd" };
      }
   close $job_list_fh;
   }
else {
   opendir my $dir_dh, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir($dir_dh)) {
      next if $item =~ m/^\./;        # hidden entries
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;     # backup/disabled suffixes are skipped
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");  # sub-folders only, plain files ignored
      # NOTE(review): $dir/$item is interpolated unquoted into a shell
      # command later — paths with spaces/metacharacters will break.
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dir_dh;
   }
[13]102
# Available resources: one line per core in the node file (comments
# and blank lines ignored).
my @ressources = ();
open my $node_fh, '<', $nodefile
   or die "can't open $nodefile: $!";
while (<$node_fh>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_fh;

my $ressource_size = scalar(@ressources);
# Each small job needs $job_np cores, so at least that many must exist.
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

# Small jobs are launched from the master job's current folder.
my $current_dir = getcwd();
120
# Base names for small job output files: the master OAR stdout/stderr
# names with their extension stripped, overridden by --masterio.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

my $finished = Coro::Signal->new;        # fired when all work is over
my $job_todo = Coro::Semaphore->new(0);  # one 'up' per job still to finish
my $job_name_maxlen = 0;                 # longest job name, for aligned log lines
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# Slice the resource lines into slots of $job_np cores; each slot is a
# comma-joined node list handed to one small job at a time.
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }

my %scheduled = ();   # job_pid => { fh, node_connect, ressource, name }
146
# OAR checkpoint handling: when the checkpoint signal arrives (USR2 by
# default, see --kill), stop launching new jobs and let the running
# ones finish; with --transmit, also forward the signal to every
# running sub-job so it can checkpoint itself.
my $oar_checkpoint = Coro::Semaphore->new(0);
$SIG{$sig_checkpoint} = sub {
   print "warning: receive checkpoint at "
      . time
      . ", no new job, just finishing running job\n"
      if $verbose;
   $oar_checkpoint->up();
   # %scheduled keys are the local PIDs of the oarsh pipe processes
   kill $sig_checkpoint => keys %scheduled if $sig_transmit;
   };
[39]157
# Asynchronous launcher block: start one small job per free resource
# slot until the job list is exhausted or a checkpoint is received.
async {
 JOB:
   for my $job (@job) {
      my $job_name = $job->{name};
      my $job_cmd  = $job->{cmd};

      # Job already traced in a previous run ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            # started but never finished: run it again
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # Take a resource slot (blocks until one is free).
      my $job_ressource = $ressources->get;

      # No new job is launched once OAR checkpointing began.
      # NOTE(review): the slot taken just above is not put back, but at
      # this point the process only waits for running jobs and exits.
      last JOB if $oar_checkpoint->count() > 0;

      # Open a shell on the first node of the slot; the small job's
      # commands are piped into it.
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;   # coroutine-friendly non-blocking handle

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name
         };

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # Redirections for the small job; empty strings (not undef) so
      # the interpolation below never warns when --switchio is off.
      my ($job_stdout, $job_stderr) = ('', '');
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_name";

      # Set the job environment, run it and clean up.
      if ($job_np > 1) {
         # '\n' is single-quoted on purpose: the remote shell printf
         # expands it, producing one node per line in the node file.
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job_cmd $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   };
230
# Asynchronous reaper block: collect finished sub-jobs, log them and
# recycle their resource slot.
async {
   while () {
      # keys() snapshots the PID list, so deleting entries inside the
      # loop is safe.
      for my $job_pid (keys %scheduled) {
         # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};

            # Job not finished, just suspended, if a checkpoint signal
            # was received and forwarded (--transmit): trace it as such
            # so the next run relaunches it.
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing ! just finish the running jobs and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # normal termination: every job has been done
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   };
263
# Hand control to the two async blocks above.
cede;

# Block the main coroutine until all job have been done (or a
# checkpoint emptied the set of running jobs).
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
[13]272__END__
273
274=head1 NAME
275
oar-parexec - parallel execution of many small jobs
[13]277
278=head1 SYNOPSIS
279
[47]280 oar-parexec --file filecommand \
281    [--logtrace tracefile] [--verbose] \
282    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
283    [--switchio] [--masterio basefileio]
[46]284
[47]285 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
286    [--logtrace tracefile] [--verbose] \
287    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
288    [--switchio] [--masterio basefileio]
[46]289
[13]290 oar-parexec --help
291
[32]292=head1 DESCRIPTION
293
C<oar-parexec> can execute a lot of small jobs in parallel inside a cluster.
The number of parallel jobs at one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...
However, it can be used outside OAR.
[32]299
[47]300Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
[32]301
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
and only in case of parallel small jobs (option C<--jobnp> > 1).
[32]305
[34]306 OAR_NODE_FILE - file that list node for parallel computing
307 OAR_NP        - number of processor affected
[32]308
The file defined by OAR_NODE_FILE is created in /tmp
on the node before launching the small job,
and this file will be deleted after the job completes.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted in case of a crash of the master job.
314
OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can use the original OAR resource files in your script
by using these variables if you need them.
319 
[34]320
[13]321=head1 OPTIONS
322
[32]323=over 12
[13]324
[47]325=item B<-f|--file filecommand>
[13]326
File name which contains the job list.
For the JOB_NAME definition,
the first valid job in the list will have the number 1 and so on...
[13]330
[77]331It's possible to fix the name inside a comment on the job line.
332For example:
333
334 $HOME/test/subjob1.sh # name=subjob1
335
336The key C<name> is case insensitive,
337the associated value cannot have a space...
338
[47]339=item B<-d|--dir foldertoiterate>
[45]340
Command C<--cmd> will be launched in all sub-folders of this master folder.
Files in this folder will be ignored.
Sub-folder names which begin with F<.>
or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will also be ignored...
[45]345
346The JOB_NAME is simply the Sub-folder name.
347
348=item B<-c|--cmd commandtolaunch>
349
Command (and arguments to it) that will be launched in all sub-folders
of the folder given by C<--dir>.
352
[43]353=item B<-l|--logtrace tracefile>
354
File which logs and traces running jobs.
In case of running the same master command again (after a crash for example),
only jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.
Tracing is based on the JOB_NAME between multiple runs.
[43]360
This option is very useful in case of a crash
but also for checkpointing and idempotent OAR jobs.
363
[32]364=item B<-v|--verbose>
[13]365
[34]366=item B<-j|--jobnp integer>
[13]367
Number of processors to allocate for each small job.
1 by default.
370
371=item B<-n|--nodefile filenode>
372
File name that lists all the nodes where jobs could be launched.
By default, it's defined automatically by OAR via
the environment variable C<OAR_NODE_FILE>.
[13]376
[32]377For example, if you want to use 6 core on your cluster node,
378you need to put 6 times the hostname node in this file,
379one per line...
380It's a very common file in MPI process !
[13]381
[46]382=item B<-o|-oarsh command>
[13]383
Command used to launch a shell on a node.
By default
[13]386
[46]387 oarsh -q -T
388
389Change it to C<ssh> if you are not using an OAR cluster...
390
[32]391=item B<-s|--switchio>
[21]392
Each small job will have its own output STDOUT and STDERR
based on the master OAR job with C<JOB_NAME> inside
(or based on C<basefileio> if option C<masterio> is used).
396Example :
[21]397
[45]398 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
[21]399
[32]400where 151524 here is the master C<OAR_JOB_ID>
[45]401and C<JOB_NAME> is the small job name.
[21]402
[46]403=item B<-m|--masterio basefileio>
[32]404
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
[32]408
[78]409=item B<-k|--kill signal>
410
Signal to listen for in order to make a clean stop of the current C<oar-parexec> process.
By default, the USR2 signal is used (see C<kill -l> for a list of possible signals).
413
414=item B<-t|--transmit>
415
Resend the caught signal to the sub-jobs when receiving it.
By default, no signal is transmitted to child processes.

It's only valuable if used for long sub-jobs that can,
in return, make themselves a clean restart.
421
422
[32]423=item B<-h|--help>
424
425=back
426
427
428=head1 EXAMPLE
429
[44]430=head2 Simple list of sequential job
431
[47]432Content for the job file command (option C<--file>) could have:
[21]433
[13]434 - empty line
435 - comment line begin with #
436 - valid shell command
437
438Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
439
[77]440 $HOME/test/subjob01.sh  # name=subjob01
441 $HOME/test/subjob02.sh  # name=subjob02
442 $HOME/test/subjob03.sh  # name=subjob03
443 $HOME/test/subjob04.sh  # name=subjob04
[32]444 ...
[77]445 $HOME/test/subjob38.sh  # name=subjob38
446 $HOME/test/subjob39.sh  # name=subjob39
447 $HOME/test/subjob40.sh  # name=subjob40
[13]448
These jobs could be launched by:
[13]450
[49]451 oarsub -n test -l /core=6,walltime=04:00:00 \
452   "oar-parexec -f ./subjob.list.txt"
[13]453
[47]454=head2 Folder job
455
In a folder F<subjob.d>, create sub-folders with your data inside: F<test1>, F<test2>...
457The same command will be executed in every sub-folder.
458C<oar-parexec> change the current directory to the sub-folder before launching it.
459
460A very simple job could be:
461
[49]462 oarsub -n test -l /core=6,walltime=04:00:00 \
463   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
[47]464
The command C<env> will be executed in all folders F<test1>, F<test2>... after a 10s pause.
466
467Sometime, it's simpler to use file list command,
468sometime, jobs by folder with the same command run is more relevant.
469
[44]470=head2 Parallel job
[28]471
You need to give the number of cores each small job needs with option C<--jobnp>.
If your job is built on OpenMP or MPI,
you can use the OAR_NP and OAR_NODE_FILE variables to configure them.
475On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
476for connexion between node instead of C<ssh>.
477
478Example with parallel small job on 2 core:
479
[49]480 oarsub -n test -l /core=6,walltime=04:00:00 \
481   "oar-parexec -j 2 -f ./subjob.list.txt"
[44]482
483=head2 Tracing and master crash
484
485If the master node crash after hours of calculus, everything is lost ?
486No, with option C<--logtrace>,
487it's possible to remember older result
488and not re-run these job the second and next time.
489
[49]490 oarsub -n test -l /core=6,walltime=04:00:00 \
491   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]492
493After a crash or an C<oardel> command,
494you can then re-run the same command that will end to execute the jobs in the list
495
[49]496 oarsub -n test -l /core=6,walltime=04:00:00 \
497   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]498
499C<logtrace> file are just plain file.
500We use the extension '.log' because these files are automatically
501eliminate from our backup system!
502
503=head2 Checkpointing and Idempotent
504
505C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
507you can cut this in small parts.
508
509For this example, we suppose that each small job need about 10min...
510So, we send a checkpoint 12min before the end of the process
511to let C<oar-parexec> finish the jobs started.
512After being checkpointed, C<oar-parexec> do not start any new small job.
513
[49]514 oarsub -t idempotent -n test \
515   -l /core=6,walltime=04:00:00 \
516   --checkpoint 720 \
[44]517   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
518
After 3h48min, the OAR job will begin to stop launching new small jobs.
When all running small jobs are finished, it exits.
521But as the OAR job is type C<idempotent>,
522OAR will re-submit it as long as all small job are not executed...
523
524This way, we let other users a chance to use the cluster!
525
In this last example, we use a moldable OAR job with idempotent
to reserve many cores for a small time or a few cores for a long time:
528
529 oarsub -t idempotent -n test \
530   -l /core=50,walltime=01:05:00 \
531   -l /core=6,walltime=04:00:00 \
532   --checkpoint 720 \
533   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
534
[78]535=head2 Signal, recurse and long job
[44]536
By default, OAR uses the signal USR2 for checkpointing.
It's possible to change this with the option C<--kill>.
539
When used with long small jobs, checkpointing could be too long...
Option C<--transmit> could be used to checkpoint the small jobs themselves!
These will then stop cleanly and will be restarted next time.
543
In the C<logtrace> file, such small jobs will have the status suspend.
They will be launched with the same command on the next OAR run.
546
[21]547=head1 SEE ALSO
548
oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh
[21]551
552
[13]553=head1 AUTHORS
554
[21]555Written by Gabriel Moreau, Grenoble - France
[13]556
[21]557
558=head1 LICENSE AND COPYRIGHT
559
560GPL version 2 or later and Perl equivalent
561
[28]562Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
[21]563
Note: See TracBrowser for help on using the repository browser.