source: trunk/oarutils/oar-parexec @ 85

Last change on this file since 85 was 85, checked in by g7moreau, 13 years ago
  • Delete meta comment folder can be replace by a simple 'cd'
File size: 17.6 KB
RevLine 
[13]1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
use strict;
use warnings;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );
[13]17
# Command line options. The string options are initialised to '' so the
# mandatory-argument tests below never operate on undef values.
my $file     = '';   # job list file (--file)
my $dir      = '';   # master folder to iterate over (--dir)
my $cmd      = '';   # command to run in each sub-folder (--cmd)
my $logtrace = '';   # start/end trace file for re-run support (--logtrace)
my $verbose;
my $job_np         = 1;                          # cores allocated per small job
my $nodefile       = $ENV{OAR_NODE_FILE} || '';  # node list, from OAR by default
my $masterio;
my $switchio;
my $help;
my $oarsh          = 'oarsh -q -T';              # remote shell command
my $sig_transmit;
my $sig_checkpoint = 'USR2';                     # default OAR checkpoint signal

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;

# Either --file, or the pair --dir / --cmd, is mandatory.
pod2usage(-verbose => 2)
   if not (-e $file or (-d $dir and $cmd ne ''));
[13]52
# Re-run support: keep trace of jobs already done. A previous trace file is
# parsed to rebuild %state (job name -> 'start' or 'end'); jobs marked 'end'
# will be skipped by the start block, jobs marked 'start' re-launched.
my %state;
my $log_h = IO::File->new();
if ($logtrace and -e $logtrace) {
   # Two-argument filename+mode open: immune to whitespace/mode injection
   # from a crafted file name (unlike the "< $logtrace" string form).
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   # Re-open the same trace in append mode to record this run.
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # Wrap the handle for Coro so prints from coroutines do not block.
   $log_h = unblock $log_h;
   }
71
# Build the list of jobs to run: either one job per (non-comment, non-empty)
# line of the --file job list, or one job per sub-folder of --dir.
my @job = ();
if (defined $file and -e $file) {
   my $job_num = 0;
   open my $job_list_fh, '<', $file
      or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_fh>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;      # skip comment line
      next if $job_cmd =~ m/^\s*$/;   # skip empty line
      $job_num++;
      # Optional explicit job name in a trailing comment: "... # name=xyz"
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      # Otherwise fall back on the job's ordinal position in the file.
      $job_name ||= $job_num;
      push @job, { name => $job_name, cmd => $job_cmd };
      }
   close $job_list_fh;
   }
else {
   opendir my $dir_h, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dir_h) {
      next if $item =~ m/^\./;        # hidden entries
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;     # backup/disabled folders are skipped
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless -d "$dir/$item";    # files are ignored, only sub-folders
      # The job runs --cmd inside the sub-folder, in a subshell.
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dir_h;
   }
[13]102
# Ressources available: one line per core/slot in the node file
# (MPI-style: a host name repeated once per usable core).
my @ressources = ();
open my $node_fh, '<', $nodefile
   or die "error: can't open $nodefile: $!";
while (<$node_fh>) {
   chomp;
   next if m/^#/;       # skip comment line
   next if m/^\s*$/;    # skip empty line
   push @ressources, $_;
   }
close $node_fh;

my $ressource_size = scalar(@ressources);
# A single small job cannot need more slots than the whole reservation.
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

# Master current folder, propagated to every small job before it runs.
my $current_dir = getcwd();
120
# Base names for per-job output redirection (--switchio): derived from the
# master OAR job output files, or forced with --masterio.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

# $finished is signalled when everything is done; $job_todo counts the jobs
# that still have to complete (one up() per job, one down() per completion).
my $finished = Coro::Signal->new;
my $job_todo = Coro::Semaphore->new(0);
my $job_name_maxlen = 0;   # longest job name, for aligned log messages
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# Slice of ressources for parallel jobs: the channel holds one entry per
# slot of $job_np cores (comma-joined node names). Getting from the channel
# blocks until a slot is free again.
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }

# Running jobs, indexed by the pid of their local oarsh pipe.
my %scheduled = ();
146
# OAR checkpoint handling, default signal SIGUSR2 (see --kill). The handler
# only records the event; the coroutines below react to it cooperatively.
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   print "warning: receive checkpoint at "
      . time
      . ", no new job, just finishing running job\n"
      if $verbose;
   # Remember the checkpoint request (tested via count() in the async blocks).
   $oar_checkpoint->up();
   # Wake up the notify coroutine when signal forwarding is enabled.
   $notify->send if $sig_transmit;
   };
[39]158
# Asynchronous signal forwarder: when --transmit is set and the checkpoint
# signal arrives, the handler above sends $notify and this coroutine relays
# the signal to every sub-job currently running on a remote node.
async {
   while () {
      # Block until the signal handler fires $notify.
      $notify->wait;

      for my $job_pid (keys %scheduled) {
         my $job_name     = $scheduled{$job_pid}->{name};
         my $job_pidfile  = $scheduled{$job_pid}->{pidfile};
         my $node_connect = $scheduled{$job_pid}->{node_connect};

         # Open a fresh remote shell on the job's first node...
         my $fh = IO::File->new();
         $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
            or die "error: can't notify subjob: $!";

         $fh->autoflush;
         $fh = unblock $fh;   # Coro-friendly handle: print won't block other coroutines

         # ...and signal the pid recorded by the wrapper script written in the
         # start block (the remote shell traps it and forwards to its children).
         $fh->print("kill -$sig_checkpoint \$(cat $job_pidfile)\n");
         $fh->print("exit\n");

         print "warning: transmit signal $sig_checkpoint"
            . " to job $job_name on node $node_connect.\n"
            if $verbose;

         close $fh;
         cede;   # yield so other coroutines can progress between notifications
         }
      }
   }
188
# Asynchronous start-job block: launches every job in @job, blocking on the
# ressources channel when all slots are busy, and stops launching new jobs
# once an OAR checkpoint has been received.
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # Job already seen in a previous run (see --logtrace) ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            # Started but never finished: run it again.
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;          # count it as done
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # Take a job ressource slot; blocks until one is free.
      my $job_ressource = $ressources->get;

      # Launch no more jobs once OAR checkpointing has been requested.
      last JOB if $oar_checkpoint->count() > 0;

      # First node of the slot is the one we connect to.
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      # Piped open returns the pid of the local oarsh process; that pid is
      # what the end block later reaps with waitpid.
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;   # Coro-friendly handle

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # Optional per-job output redirection (--switchio).
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      # Per-job scratch files on the remote node (cleaned up after the run;
      # NOTE: they survive if the master job crashes).
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name";
      my $job_pidfile  = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name.pid";

      # Register the running job so the end block and the notify block see it.
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name,
         pidfile      => $job_pidfile,
         };

      $job_cmd =~ s/(\s+##.*)$//; # suppress meta comment (e.g. "## name=...")

      # Write a small shell script into the remote shell: set up the job
      # environment, run the command and clean up.
      if ($job_np > 1) {
         # Parallel small job: build its private OAR_NODE_FILE on the node.
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      if ($sig_transmit) {
         # Forwardable mode: record the remote shell pid, trap the checkpoint
         # signal and forward it to the background job, then wait for it.
         $fh->print("trap 'kill -$sig_checkpoint \$(jobs -p)' $sig_checkpoint\n");
         $fh->print("echo \$\$ > $job_pidfile\n");
         $fh->print("$job_cmd $job_stdout $job_stderr \&\n");
         $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
         $fh->print("do\n");
         $fh->print("   wait\n");
         $fh->print("done\n");
         $fh->print("rm -f $job_pidfile\n");
         }
      else {
         # Simple mode: run the command in the foreground.
         $fh->print("$job_cmd $job_stdout $job_stderr\n");
         }
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;   # yield to the other coroutines
      }
   }
277
# Asynchronous end-job block: polls the running jobs, reaps finished ones,
# recycles their ressource slot and decides when the whole run is over.
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # Non blocking PID test: waitpid returns the pid once the local
         # oarsh process has exited (i.e. the remote job is done).
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};

            # Job not really finished, just suspended, if it was stopped by a
            # transmitted checkpoint signal: log it as such so a later re-run
            # with --logtrace launches it again.
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # Give the ressource slot back for another job.
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # Checkpointing: no new job is launched, so once all running jobs have
      # finished we can quit.
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # Normal termination: every job has completed.
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
310
# Hand control to the coroutines started above.
cede;

# Block the main thread until one of the async blocks signals completion
# (all jobs done, or checkpoint received and running jobs finished).
$finished->wait;

# Close the log trace file (only opened when --logtrace was given).
$log_h->close() if $logtrace;
[38]318
[13]319__END__
320
321=head1 NAME
322
[44]323oar-parexec - parallel execution of many small job
[13]324
325=head1 SYNOPSIS
326
[47]327 oar-parexec --file filecommand \
328    [--logtrace tracefile] [--verbose] \
329    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
330    [--switchio] [--masterio basefileio]
[46]331
[47]332 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
333    [--logtrace tracefile] [--verbose] \
334    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
335    [--switchio] [--masterio basefileio]
[46]336
[13]337 oar-parexec --help
338
[32]339=head1 DESCRIPTION
340
[44]341C<oar-parexec> can execute lot of small job in parallel inside a cluster.
342Number of parallel job at one time cannot exceed the number of core define in the node file
[32]343C<oar-parexec> is easier to use inside an OAR job environment
[44]344which define automatically these strategics parameters...
345However, it can be used outside OAR.
[32]346
[47]347Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
[32]348
349Small job will be launch in the same folder as the master job.
[44]350Two environment variable are defined for each small job
[37]351and only in case of parallel small job (option C<--jobnp> > 1).
[32]352
[34]353 OAR_NODE_FILE - file that list node for parallel computing
354 OAR_NP        - number of processor affected
[32]355
[44]356The file define by OAR_NODE_FILE is created  in /tmp
357on the node before launching the small job
358and this file will be delete after job complete.
[34]359C<oar-parexec> is a simple script,
360OAR_NODE_FILE will not be deleted in case of crash of the master job.
361
[37]362OAR define other variable that are equivalent to OAR_NODE_FILE:
363OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
364You can use in your script the OAR original file ressources
365by using these variable if you need it.
[34]366
[82]367
[13]368=head1 OPTIONS
369
[32]370=over 12
[13]371
[47]372=item B<-f|--file filecommand>
[13]373
[32]374File name which content job list.
[45]375For the JOB_NAME definition,
376the first valid job in the list will have the number 1 and so on...
[13]377
[77]378It's possible to fix the name inside a comment on the job line.
379For example:
380
381 $HOME/test/subjob1.sh # name=subjob1
382
383The key C<name> is case insensitive,
384the associated value cannot have a space...
385
[47]386=item B<-d|--dir foldertoiterate>
[45]387
388Command C<--cmd> will be launch in all sub-folder of this master folder.
389Files in this folder will be ignored.
[47]390Sub-folder name which begin with F<.>
391or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
[45]392
393The JOB_NAME is simply the Sub-folder name.
394
395=item B<-c|--cmd commandtolaunch>
396
Command (and arguments to it) that will be launched in every sub-folder
of the master folder given by C<--dir>.
399
[43]400=item B<-l|--logtrace tracefile>
401
402File which log and trace running job.
[44]403In case of running the same master command (after crash for example),
404only job that are not mark as done will be run again.
405Be careful, job mark as running (start but not finish) will be run again.
[45]406Tracing is base on the JOB_NAME between multiple run.
[43]407
408This option is very usefull in case of crash
409but also for checkpointing and idempotent OAR job.
410
[32]411=item B<-v|--verbose>
[13]412
[34]413=item B<-j|--jobnp integer>
[13]414
[34]415Number of processor to allocated for each small job.
4161 by default.
417
418=item B<-n|--nodefile filenode>
419
[44]420File name that list all the node where job could be launch.
[32]421By defaut, it's define automatically by OAR via
422environment variable C<OAR_NODE_FILE>.
[13]423
[32]424For example, if you want to use 6 core on your cluster node,
425you need to put 6 times the hostname node in this file,
426one per line...
427It's a very common file in MPI process !
[13]428
[46]429=item B<-o|-oarsh command>
[13]430
[46]431Command use to launch a shell on a node.
432By default
[13]433
[46]434 oarsh -q -T
435
436Change it to C<ssh> if you are not using an OAR cluster...
437
[32]438=item B<-s|--switchio>
[21]439
[32]440Each small job will have it's own output STDOUT and STDERR
[45]441base on master OAR job with C<JOB_NAME> inside
[32]442(or base on C<basefileio> if option C<masterio>).
443Example :
[21]444
[45]445 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
[21]446
[32]447where 151524 here is the master C<OAR_JOB_ID>
[45]448and C<JOB_NAME> is the small job name.
[21]449
[46]450=item B<-m|--masterio basefileio>
[32]451
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
[32]455
[78]456=item B<-k|--kill signal>
457
Signal to listen for in order to make a clean stop of the current C<oar-parexec> process.
By default, use the USR2 signal (see C<kill -l> for a list of possible signals).
460
461=item B<-t|--transmit>
462
Resend the caught signal to sub-jobs when receiving it.
By default, no signal is transmitted to child processes.
465
466It's only valuable if use for long sub-job than can
467in return make themselves a clean restart.
468
469
[32]470=item B<-h|--help>
471
472=back
473
474
475=head1 EXAMPLE
476
[44]477=head2 Simple list of sequential job
478
[47]479Content for the job file command (option C<--file>) could have:
[21]480
[13]481 - empty line
482 - comment line begin with #
[85]483 - valid shell command with double # for meta comment
[13]484
485Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
486
[85]487 $HOME/test/subjob01.sh  ## name=subjob01
488 $HOME/test/subjob02.sh  ## name=subjob02
489 $HOME/test/subjob03.sh  ## name=subjob03
490 $HOME/test/subjob04.sh  ## name=subjob04
[32]491 ...
[85]492 $HOME/test/subjob38.sh  ## name=subjob38
493 $HOME/test/subjob39.sh  ## name=subjob39
494 $HOME/test/subjob40.sh  ## name=subjob40
[13]495
[44]496These jobs could be launch by:
[13]497
[49]498 oarsub -n test -l /core=6,walltime=04:00:00 \
499   "oar-parexec -f ./subjob.list.txt"
[13]500
[47]501=head2 Folder job
502
503In a folder F<subjob.d>, create sub-folder with your data inside : F<test1>, <test2>...
504The same command will be executed in every sub-folder.
505C<oar-parexec> change the current directory to the sub-folder before launching it.
506
507A very simple job could be:
508
[49]509 oarsub -n test -l /core=6,walltime=04:00:00 \
510   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
[47]511
The command C<env> will be executed in all folders F<test1>, F<test2>... after a 10s pause.
513
514Sometime, it's simpler to use file list command,
515sometime, jobs by folder with the same command run is more relevant.
516
[44]517=head2 Parallel job
[28]518
[44]519You need to put the number of core each small job need with option C<--jobnp>.
520If your job is build on OpenMP or MPI,
521you can use OAR_NP and OAR_NODE_FILE variables to configure them.
522On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
523for connexion between node instead of C<ssh>.
524
525Example with parallel small job on 2 core:
526
[49]527 oarsub -n test -l /core=6,walltime=04:00:00 \
528   "oar-parexec -j 2 -f ./subjob.list.txt"
[44]529
530=head2 Tracing and master crash
531
532If the master node crash after hours of calculus, everything is lost ?
533No, with option C<--logtrace>,
534it's possible to remember older result
535and not re-run these job the second and next time.
536
[49]537 oarsub -n test -l /core=6,walltime=04:00:00 \
538   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]539
540After a crash or an C<oardel> command,
541you can then re-run the same command that will end to execute the jobs in the list
542
[49]543 oarsub -n test -l /core=6,walltime=04:00:00 \
544   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]545
546C<logtrace> file are just plain file.
547We use the extension '.log' because these files are automatically
548eliminate from our backup system!
549
550=head2 Checkpointing and Idempotent
551
552C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
554you can cut this in small parts.
555
556For this example, we suppose that each small job need about 10min...
557So, we send a checkpoint 12min before the end of the process
558to let C<oar-parexec> finish the jobs started.
559After being checkpointed, C<oar-parexec> do not start any new small job.
560
[49]561 oarsub -t idempotent -n test \
562   -l /core=6,walltime=04:00:00 \
563   --checkpoint 720 \
[44]564   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
565
566After 3h48min, the OAR job will begin to stop launching new small job.
567When all running small job are finished, it's exit.
568But as the OAR job is type C<idempotent>,
569OAR will re-submit it as long as all small job are not executed...
570
571This way, we let other users a chance to use the cluster!
572
573In this last exemple, we use moldable OAR job with idempotent
574to reserve many core for a small time or a few cores for a long time:
575
576 oarsub -t idempotent -n test \
577   -l /core=50,walltime=01:05:00 \
578   -l /core=6,walltime=04:00:00 \
579   --checkpoint 720 \
580   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
581
[78]582=head2 Signal, recurse and long job
[44]583
[78]584By default, OAR use signal USR2 for checkpointing.
[79]585It's possible to change this with option C<--kill>.
[78]586
587When use with long small job, checkpointing could be too long...
[79]588More than walltime!
589The option C<--transmit> could be use to checkpoint small job!
590These long small job will then stop cleanly and will be restarted next time.
[78]591
592In the C<logtrace> file, small job will have the status suspend.
[79]593They will be launch with the same command line at the next OAR run.
[78]594
[21]595=head1 SEE ALSO
596
[44]597oar-dispatch, mpilauncher,
598orsh, oar-envsh, ssh
[21]599
600
[13]601=head1 AUTHORS
602
[21]603Written by Gabriel Moreau, Grenoble - France
[13]604
[21]605
606=head1 LICENSE AND COPYRIGHT
607
608GPL version 2 or later and Perl equivalent
609
[28]610Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
[21]611
Note: See TracBrowser for help on using the repository browser.