source: trunk/oarutils/oar-parexec @ 80

Last change on this file since 80 was 80, checked in by g7moreau, 13 years ago
  • Add a verbose message for signal transmit
File size: 16.2 KB
RevLine 
[13]1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
[32]16use Cwd qw( getcwd );
[13]17
# Command-line state.  Scalars later used in file tests or string
# comparisons ($file, $dir, $cmd) are initialized to '' so the usage
# checks below never operate on undef values.
my $file     = '';   # job list file (--file)
my $dir      = '';   # master folder to iterate over (--dir)
my $cmd      = '';   # command to run in each sub-folder (--cmd)
my $logtrace;        # start/end trace file (--logtrace)
my $verbose;
my $job_np         = 1;                          # cores per small job
my $nodefile       = $ENV{OAR_NODE_FILE} || '';  # node list, one core per line
my $masterio;                                    # base name for small job output files
my $switchio;                                    # one stdout/stderr pair per small job
my $help;
my $oarsh          = 'oarsh -q -T';              # remote shell command
my $sig_transmit;                                # forward checkpoint signal to sub-jobs
my $sig_checkpoint = 'USR2';                     # signal to listen for

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
# mandatory input: either an existing job file, or a folder plus a command
pod2usage(-verbose => 2) if not (
 (-e $file)
 or (-d $dir and $cmd ne '')
 );
[13]52
# re-run, keep trace of job already done
# Replay a previous logtrace file (if any): map each job name to its
# last recorded state ('start' = launched but never finished, 'end' =
# completed).  This drives the skip/relaunch decision in the start block.
my %state;
my $log_h = IO::File->new();
if (-e "$logtrace") {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      # trace lines look like: "start job NAME / PID at TIME on node ..."
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   # re-open the same file for appending new records; unblock wraps the
   # handle in a Coro::Handle so printing never blocks the coroutines
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }
# job to run
# Build the list of small jobs, either from a command file (--file) or
# by iterating over the sub-folders of --dir.  Each entry is a hash
# { name => JOB_NAME, cmd => shell command }.
my @job = ();
if (-e "$file") {
   my $job_num = 0;
   # one shell command per line; blank lines and #-comment lines are skipped
   open(my $job_list_h, '<', "$file") or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      $job_num++;
      # optional explicit name in a trailing comment: "... # name=foo"
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;    # fall back on the line rank
      push @job, { name => $job_name, cmd => "$job_cmd" };
      }
   close $job_list_h;
   }
else {
   # iterate over sub-folders, skipping hidden and backup-like names
   opendir(my $dir_h, $dir) or die "error: can't open folder $dir: $!";
   while (my $item = readdir($dir_h)) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");
      # the command is executed from inside the sub-folder
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dir_h;
   }
[13]102
# ressources available: one line per core in the node file
my @ressources = ();
open(my $node_h, '<', "$nodefile")
   or die "can't open $nodefile: $!";
while (my $node = <$node_h>) {
   chomp $node;
   next if $node =~ m/^#/;
   next if $node =~ m/^\s*$/;
   push @ressources, $node;
   }
close $node_h;

my $ressource_size = scalar(@ressources);
# each small job consumes $job_np cores, so at least that many must exist
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

# master working directory, re-used as working directory on the nodes
my $current_dir = getcwd();
# Base names for small job output files: derived from the master OAR
# stdout/stderr paths unless --masterio overrides them.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

my $finished = Coro::Signal->new;        # raised when all work is done
my $job_todo = Coro::Semaphore->new(0);  # counts jobs still to finish
my $job_name_maxlen = 0;                 # widest name, for aligned log lines
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# slice of ressources for parallel job: each channel entry is a
# comma-separated group of $job_np cores taken from @ressources
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }

# currently running small jobs, keyed by local oarsh PID
my %scheduled = ();
# OAR checkpoint and default signal SIGUSR2
# On the checkpoint signal: stop launching new jobs and, with
# --transmit, forward the signal to every running sub-job so each one
# can checkpoint itself.
my $oar_checkpoint = Coro::Semaphore->new(0);
$SIG{$sig_checkpoint} = sub {
   print "warning: receive checkpoint at "
      . time
      . ", no new job, just finishing running job\n"
      if $verbose;
   $oar_checkpoint->up();
   kill $sig_checkpoint => keys %scheduled if $sig_transmit;
   # note: message now ends with a newline so it does not run into the
   # next line of output
   print "warning: transmit signal $sig_checkpoint to: "
      . join(' ', keys %scheduled) . "\n"
      if $sig_transmit and $verbose;
   };
[39]158
# asynchrone start job block
# One coroutine launches the jobs: it waits for a free ressource slot,
# then pipes a small shell script to oarsh on the slot's first node.
async {
   JOB:
   for my $job (@job) {
      my $job_name = $job->{name};
      my $job_cmd  = $job->{cmd};

      # job has been already run ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take job ressource (blocks until a slot is free)
      my $job_ressource = $ressources->get;

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # open a shell on the first node of the slot; the job script is
      # written line by line to this pipe below
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;   # Coro-friendly (non-blocking) handle

      # register the running job, keyed by the oarsh PID
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name
         };

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # per-job output redirection (only with --switchio)
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      # private node file for parallel small job (exported as OAR_NODE_FILE)
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_name";

     # set job environment, run it and clean
      if ($job_np > 1) {
         # write the slot's node list on the remote side and export the
         # OAR-like variables the small job may rely on
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job_cmd $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
231
# asynchrone end job block
# A second coroutine reaps finished jobs: it logs the end (or suspend)
# record, recycles the ressource slot and signals global completion.
async {
   while () {
      for my $job_pid (keys %scheduled) {
                        # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};

            # Job non finish, just suspend if received checkpoint signal
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing ! just finishing running job and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # normal termination: every job has been reaped
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
264
cede;

# all job have been done (or a checkpoint was received and the running
# jobs finished): block the main thread until a coroutine sends $finished
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
[38]272
[13]273__END__
274
275=head1 NAME
276
oar-parexec - parallel execution of many small jobs
[13]278
279=head1 SYNOPSIS
280
[47]281 oar-parexec --file filecommand \
282    [--logtrace tracefile] [--verbose] \
283    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
284    [--switchio] [--masterio basefileio]
[46]285
[47]286 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
287    [--logtrace tracefile] [--verbose] \
288    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
289    [--switchio] [--masterio basefileio]
[46]290
[13]291 oar-parexec --help
292
[32]293=head1 DESCRIPTION
294
C<oar-parexec> can execute a lot of small jobs in parallel inside a cluster.
The number of jobs running in parallel at one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...
However, it can be used outside OAR.
[32]300
[47]301Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
[32]302
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
and only in case of parallel small jobs (option C<--jobnp> > 1).
[32]306
[34]307 OAR_NODE_FILE - file that list node for parallel computing
308 OAR_NP        - number of processor affected
[32]309
The file defined by OAR_NODE_FILE is created in /tmp
on the node before launching the small job,
and this file will be deleted after the job completes.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted in case of a crash of the master job.
315
[37]316OAR define other variable that are equivalent to OAR_NODE_FILE:
317OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
318You can use in your script the OAR original file ressources
319by using these variable if you need it.
320 
[34]321
[13]322=head1 OPTIONS
323
[32]324=over 12
[13]325
[47]326=item B<-f|--file filecommand>
[13]327
[32]328File name which content job list.
[45]329For the JOB_NAME definition,
330the first valid job in the list will have the number 1 and so on...
[13]331
[77]332It's possible to fix the name inside a comment on the job line.
333For example:
334
335 $HOME/test/subjob1.sh # name=subjob1
336
337The key C<name> is case insensitive,
338the associated value cannot have a space...
339
[47]340=item B<-d|--dir foldertoiterate>
[45]341
342Command C<--cmd> will be launch in all sub-folder of this master folder.
343Files in this folder will be ignored.
[47]344Sub-folder name which begin with F<.>
345or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
[45]346
347The JOB_NAME is simply the Sub-folder name.
348
349=item B<-c|--cmd commandtolaunch>
350
Command (and its arguments) that will be launched in every sub-folder
of the folder given by C<--dir>.
353
[43]354=item B<-l|--logtrace tracefile>
355
File which logs and traces running jobs.
In case of running the same master command again (after a crash for example),
only jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.
Tracing is based on the JOB_NAME between multiple runs.
[43]361
This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
364
[32]365=item B<-v|--verbose>
[13]366
[34]367=item B<-j|--jobnp integer>
[13]368
[34]369Number of processor to allocated for each small job.
3701 by default.
371
372=item B<-n|--nodefile filenode>
373
[44]374File name that list all the node where job could be launch.
[32]375By defaut, it's define automatically by OAR via
376environment variable C<OAR_NODE_FILE>.
[13]377
[32]378For example, if you want to use 6 core on your cluster node,
379you need to put 6 times the hostname node in this file,
380one per line...
381It's a very common file in MPI process !
[13]382
[46]383=item B<-o|-oarsh command>
[13]384
[46]385Command use to launch a shell on a node.
386By default
[13]387
[46]388 oarsh -q -T
389
390Change it to C<ssh> if you are not using an OAR cluster...
391
[32]392=item B<-s|--switchio>
[21]393
[32]394Each small job will have it's own output STDOUT and STDERR
[45]395base on master OAR job with C<JOB_NAME> inside
[32]396(or base on C<basefileio> if option C<masterio>).
397Example :
[21]398
[45]399 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
[21]400
[32]401where 151524 here is the master C<OAR_JOB_ID>
[45]402and C<JOB_NAME> is the small job name.
[21]403
[46]404=item B<-m|--masterio basefileio>
[32]405
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
[32]409
[78]410=item B<-k|--kill signal>
411
Signal to listen for in order to make a clean stop of the current C<oar-parexec> process.
By default, the USR2 signal is used (see C<kill -l> for a list of possible signals).
414
415=item B<-t|--transmit>
416
Resend the caught signal to the sub-jobs when receiving it.
By default, no signal is transmitted to child processes.

It's only valuable if used for long sub-jobs that can
in return make themselves a clean restart.
422
423
[32]424=item B<-h|--help>
425
426=back
427
428
429=head1 EXAMPLE
430
[44]431=head2 Simple list of sequential job
432
[47]433Content for the job file command (option C<--file>) could have:
[21]434
[13]435 - empty line
436 - comment line begin with #
437 - valid shell command
438
439Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
440
[77]441 $HOME/test/subjob01.sh  # name=subjob01
442 $HOME/test/subjob02.sh  # name=subjob02
443 $HOME/test/subjob03.sh  # name=subjob03
444 $HOME/test/subjob04.sh  # name=subjob04
[32]445 ...
[77]446 $HOME/test/subjob38.sh  # name=subjob38
447 $HOME/test/subjob39.sh  # name=subjob39
448 $HOME/test/subjob40.sh  # name=subjob40
[13]449
[44]450These jobs could be launch by:
[13]451
[49]452 oarsub -n test -l /core=6,walltime=04:00:00 \
453   "oar-parexec -f ./subjob.list.txt"
[13]454
[47]455=head2 Folder job
456
457In a folder F<subjob.d>, create sub-folder with your data inside : F<test1>, <test2>...
458The same command will be executed in every sub-folder.
459C<oar-parexec> change the current directory to the sub-folder before launching it.
460
461A very simple job could be:
462
[49]463 oarsub -n test -l /core=6,walltime=04:00:00 \
464   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
[47]465
The command C<env> will be executed in every folder F<test1>, F<test2>... after a 10s pause.
467
468Sometime, it's simpler to use file list command,
469sometime, jobs by folder with the same command run is more relevant.
470
[44]471=head2 Parallel job
[28]472
[44]473You need to put the number of core each small job need with option C<--jobnp>.
474If your job is build on OpenMP or MPI,
475you can use OAR_NP and OAR_NODE_FILE variables to configure them.
476On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
477for connexion between node instead of C<ssh>.
478
479Example with parallel small job on 2 core:
480
[49]481 oarsub -n test -l /core=6,walltime=04:00:00 \
482   "oar-parexec -j 2 -f ./subjob.list.txt"
[44]483
484=head2 Tracing and master crash
485
486If the master node crash after hours of calculus, everything is lost ?
487No, with option C<--logtrace>,
488it's possible to remember older result
489and not re-run these job the second and next time.
490
[49]491 oarsub -n test -l /core=6,walltime=04:00:00 \
492   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]493
494After a crash or an C<oardel> command,
495you can then re-run the same command that will end to execute the jobs in the list
496
[49]497 oarsub -n test -l /core=6,walltime=04:00:00 \
498   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]499
500C<logtrace> file are just plain file.
501We use the extension '.log' because these files are automatically
502eliminate from our backup system!
503
504=head2 Checkpointing and Idempotent
505
506C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
508you can cut this in small parts.
509
510For this example, we suppose that each small job need about 10min...
511So, we send a checkpoint 12min before the end of the process
512to let C<oar-parexec> finish the jobs started.
513After being checkpointed, C<oar-parexec> do not start any new small job.
514
[49]515 oarsub -t idempotent -n test \
516   -l /core=6,walltime=04:00:00 \
517   --checkpoint 720 \
[44]518   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
519
After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
But as the OAR job is of type C<idempotent>,
OAR will re-submit it as long as all small jobs are not executed...
524
525This way, we let other users a chance to use the cluster!
526
In this last example, we use a moldable OAR job with idempotent
to reserve many cores for a short time or a few cores for a long time:
529
530 oarsub -t idempotent -n test \
531   -l /core=50,walltime=01:05:00 \
532   -l /core=6,walltime=04:00:00 \
533   --checkpoint 720 \
534   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
535
[78]536=head2 Signal, recurse and long job
[44]537
By default, OAR uses the signal USR2 for checkpointing.
It's possible to change this with option C<--kill>.

When used with long small jobs, checkpointing could take too long...
More than the walltime!
The option C<--transmit> could be used to checkpoint small jobs!
These long small jobs will then stop cleanly and will be restarted next time.

In the C<logtrace> file, small jobs will have the status suspend.
They will be launched with the same command line at the next OAR run.
[78]548
[21]549=head1 SEE ALSO
550
[44]551oar-dispatch, mpilauncher,
552orsh, oar-envsh, ssh
[21]553
554
[13]555=head1 AUTHORS
556
[21]557Written by Gabriel Moreau, Grenoble - France
[13]558
[21]559
560=head1 LICENSE AND COPYRIGHT
561
562GPL version 2 or later and Perl equivalent
563
[28]564Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
[21]565
Note: See TracBrowser for help on using the repository browser.