source: trunk/oarutils/oar-parexec @ 75

Last change on this file since 75 was 75, checked in by g7moreau, 12 years ago
  • Add options --kill and --transmit for signal handling
File size: 14.6 KB
RevLine 
[13]1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
[32]16use Cwd qw( getcwd );
[13]17
# Command-line options (file-level lexicals shared by the whole script).
my $file;                         # job list file: one shell command per line
my $dir;                          # master folder whose sub-folders are iterated
my $cmd;                          # command to run in each sub-folder of $dir
my $logtrace;                     # trace file recording start/end of each job
my $verbose;
my $job_np         = 1;           # number of cores allocated to each small job
my $nodefile       = $ENV{OAR_NODE_FILE} || '';
my $masterio;                     # base name overriding OAR_STDOUT/OAR_STDERR
my $switchio;                     # give each small job its own stdout/stderr
my $help;
my $oarsh          = 'oarsh -q -T';   # remote shell used to reach the nodes
my $sig_transmit;                 # forward checkpoint signal to running jobs
my $sig_checkpoint = 'USR2';      # signal OAR sends on checkpoint

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;

# Either --file, or --dir together with --cmd, is mandatory.
# Check definedness first so the file tests and 'ne' never operate
# on undefined values.
pod2usage(-verbose => 2)
   if not ((defined $file and -e $file)
      or (defined $dir and -d $dir and defined $cmd and $cmd ne ''));
[13]52
# Re-run support: parse the previous trace file (if any) so jobs already
# marked 'end' are skipped and jobs marked only 'start' are re-launched.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # Two-arg open mode: pass the mode separately so a leading '>' or '|'
   # in the trace file name can never change how it is opened.
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # Coro-aware handle: writes will not block the other coroutines.
   $log_h = unblock $log_h;
   }
# Build the list of jobs to run; each entry is { name => ..., cmd => ... }.
my @job = ();
if (defined $file and -e $file) {
   # --file mode: jobs are numbered in order of appearance (JOB_NAME).
   my $job_num = 0;
   open my $job_list_h, '<', $file
      or die "error: can't open job file $file: $!";
   while (<$job_list_h>) {
      chomp;
      next if m/^#/;       # comment line
      next if m/^\s*$/;    # empty line
      $job_num++;
      push @job, { name => $job_num, cmd => $_ };
      }
   close $job_list_h;
   }
else {
   # --dir mode: one job per sub-folder; skip hidden entries, names
   # containing ':' and backup-like suffixes.
   opendir my $dir_h, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dir_h) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.(?:old|sav|bak|no)$/;
      next unless -d "$dir/$item";
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dir_h;
   }
[13]100
# Ressources available: one line per core in the node file.
my @ressources = ();
open my $node_h, '<', $nodefile
   or die "can't open $nodefile: $!";
while (<$node_h>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_h;

my $ressource_size = scalar(@ressources);
# A single small job must fit inside the available cores.
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

# Small jobs are launched from the master job's current directory.
my $current_dir = getcwd();
118
# Base names for the master job output files, derived from the OAR
# environment; --masterio overrides both.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
if ($masterio) {
   $stderr = $masterio;
   $stdout = $masterio;
   }
[13]125
# Coordination primitives: $finished is signalled when everything is done,
# $job_todo counts the jobs that still have to complete.
my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;

# Longest job name, used to align start/end log messages.
# Initialized to 0 so the first comparison below is never against undef.
my $job_name_maxlen = 0;
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }
[13]133
# Split the ressource list into slices of $job_np cores; each slice is a
# comma-separated node list that one small job consumes at a time.
# Leftover cores (when the total is not a multiple of $job_np) stay unused.
my $ressources = new Coro::Channel;
my $slot_count = int($ressource_size / $job_np);
for my $slot (1 .. $slot_count) {
   my $first = ($slot - 1) * $job_np;
   my $last  = $slot * $job_np - 1;
   $ressources->put(join(',', @ressources[ $first .. $last ]));
   }

# Currently running jobs, indexed by the pid of their oarsh pipe.
my %scheduled = ();
144
# OAR checkpoint handling (default signal SIGUSR2, see --kill): on receipt,
# stop launching new jobs and, with --transmit, forward the signal to the
# small jobs still running.
my $oar_checkpoint = new Coro::Semaphore 0;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   kill $sig_checkpoint => keys %scheduled if $sig_transmit;
   };
[39]155
# Launcher coroutine: start every job, one per free ressource slice.
async {
   JOB:
   for my $job (@job) {
      my $job_name = $job->{name};
      my $job_cmd  = $job->{cmd};

      # Skip or relaunch jobs already known from a previous run (--logtrace).
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # Block here until a slice of ressources is free.
      my $job_ressource = $ressources->get;

      # After an OAR checkpoint, no new job is started.
      last JOB if $oar_checkpoint->count() > 0;

      # Open a remote shell on the first node of the slice; the subjob's
      # own output is discarded (it is redirected below with --switchio).
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      # Coro-aware handle: printing the script below will not block
      # the reaper coroutine.
      $fh = unblock $fh;

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name
         };

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # Per-job output redirection (--switchio); empty strings otherwise.
      my ($job_stdout, $job_stderr) = ('', '');
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_name";

      # For parallel small jobs, publish a private OAR_NODE_FILE on the
      # node, run the command, then clean the temporary file.
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job_cmd $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   };
228
# Reaper coroutine: poll running jobs, recycle their ressources and
# detect global termination.
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # Non blocking PID test.
         if (waitpid($job_pid, WNOHANG)) {
            # Subjob is gone: take its record out of the schedule.
            my $subjob = delete $scheduled{$job_pid};
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $subjob->{name},
               $job_pid, time, $subjob->{ressource};
            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $subjob->{fh};
            # Give the ressource slice back for another job.
            $ressources->put($subjob->{ressource});
            $job_todo->down;
            }
         cede;
         }

      # Checkpointed: quit as soon as the running jobs are done.
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # Normal termination: every job has completed.
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   };
256
# Hand control to the two coroutines above.
cede;

# Block until one of the coroutines signals that all jobs have been done
# (or that a checkpoint drained the running ones).
$finished->wait;

# Close the log trace file.
$log_h->close() if $logtrace;
[38]264
[13]265__END__
266
267=head1 NAME
268
[44]269oar-parexec - parallel execution of many small job
[13]270
271=head1 SYNOPSIS
272
[47]273 oar-parexec --file filecommand \
274    [--logtrace tracefile] [--verbose] \
275    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
276    [--switchio] [--masterio basefileio]
[46]277
[47]278 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
279    [--logtrace tracefile] [--verbose] \
280    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
281    [--switchio] [--masterio basefileio]
[46]282
[13]283 oar-parexec --help
284
[32]285=head1 DESCRIPTION
286
C<oar-parexec> can execute lots of small jobs in parallel inside a cluster.
The number of parallel jobs running at one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...
However, it can be used outside OAR.
[32]292
[47]293Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
[32]294
295Small job will be launch in the same folder as the master job.
[44]296Two environment variable are defined for each small job
[37]297and only in case of parallel small job (option C<--jobnp> > 1).
[32]298
[34]299 OAR_NODE_FILE - file that list node for parallel computing
300 OAR_NP        - number of processor affected
[32]301
The file defined by OAR_NODE_FILE is created in /tmp
on the node before the small job is launched,
and this file is deleted after the job completes.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
307
[37]308OAR define other variable that are equivalent to OAR_NODE_FILE:
309OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
310You can use in your script the OAR original file ressources
311by using these variable if you need it.
312 
[34]313
[13]314=head1 OPTIONS
315
[32]316=over 12
[13]317
[47]318=item B<-f|--file filecommand>
[13]319
[32]320File name which content job list.
[45]321For the JOB_NAME definition,
322the first valid job in the list will have the number 1 and so on...
[13]323
[47]324=item B<-d|--dir foldertoiterate>
[45]325
326Command C<--cmd> will be launch in all sub-folder of this master folder.
327Files in this folder will be ignored.
[47]328Sub-folder name which begin with F<.>
329or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
[45]330
331The JOB_NAME is simply the Sub-folder name.
332
333=item B<-c|--cmd commandtolaunch>
334
Command (and its arguments) that will be launched in every sub-folder
of the folder given by the C<--dir> parameter.
337
[43]338=item B<-l|--logtrace tracefile>
339
340File which log and trace running job.
[44]341In case of running the same master command (after crash for example),
342only job that are not mark as done will be run again.
343Be careful, job mark as running (start but not finish) will be run again.
[45]344Tracing is base on the JOB_NAME between multiple run.
[43]345
This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
348
[32]349=item B<-v|--verbose>
[13]350
[34]351=item B<-j|--jobnp integer>
[13]352
[34]353Number of processor to allocated for each small job.
3541 by default.
355
356=item B<-n|--nodefile filenode>
357
[44]358File name that list all the node where job could be launch.
[32]359By defaut, it's define automatically by OAR via
360environment variable C<OAR_NODE_FILE>.
[13]361
[32]362For example, if you want to use 6 core on your cluster node,
363you need to put 6 times the hostname node in this file,
364one per line...
365It's a very common file in MPI process !
[13]366
[46]367=item B<-o|-oarsh command>
[13]368
[46]369Command use to launch a shell on a node.
370By default
[13]371
[46]372 oarsh -q -T
373
374Change it to C<ssh> if you are not using an OAR cluster...
375
[32]376=item B<-s|--switchio>
[21]377
[32]378Each small job will have it's own output STDOUT and STDERR
[45]379base on master OAR job with C<JOB_NAME> inside
[32]380(or base on C<basefileio> if option C<masterio>).
381Example :
[21]382
[45]383 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
[21]384
[32]385where 151524 here is the master C<OAR_JOB_ID>
[45]386and C<JOB_NAME> is the small job name.
[21]387
[46]388=item B<-m|--masterio basefileio>
[32]389
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<--switchio> is activated).
[32]393
394=item B<-h|--help>
395
396=back
397
398
399=head1 EXAMPLE
400
[44]401=head2 Simple list of sequential job
402
[47]403Content for the job file command (option C<--file>) could have:
[21]404
[13]405 - empty line
406 - comment line begin with #
407 - valid shell command
408
409Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
410
411 $HOME/test/subjob1.sh
412 $HOME/test/subjob2.sh
413 $HOME/test/subjob3.sh
414 $HOME/test/subjob4.sh
[32]415 ...
[13]416 $HOME/test/subjob38.sh
417 $HOME/test/subjob39.sh
418 $HOME/test/subjob40.sh
419
[44]420These jobs could be launch by:
[13]421
[49]422 oarsub -n test -l /core=6,walltime=04:00:00 \
423   "oar-parexec -f ./subjob.list.txt"
[13]424
[47]425=head2 Folder job
426
427In a folder F<subjob.d>, create sub-folder with your data inside : F<test1>, <test2>...
428The same command will be executed in every sub-folder.
429C<oar-parexec> change the current directory to the sub-folder before launching it.
430
431A very simple job could be:
432
[49]433 oarsub -n test -l /core=6,walltime=04:00:00 \
434   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
[47]435
The command C<env> will be executed in every folder F<test1>, F<test2>... after a 10s pause.
437
438Sometime, it's simpler to use file list command,
439sometime, jobs by folder with the same command run is more relevant.
440
[44]441=head2 Parallel job
[28]442
[44]443You need to put the number of core each small job need with option C<--jobnp>.
444If your job is build on OpenMP or MPI,
445you can use OAR_NP and OAR_NODE_FILE variables to configure them.
446On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
447for connexion between node instead of C<ssh>.
448
449Example with parallel small job on 2 core:
450
[49]451 oarsub -n test -l /core=6,walltime=04:00:00 \
452   "oar-parexec -j 2 -f ./subjob.list.txt"
[44]453
454=head2 Tracing and master crash
455
456If the master node crash after hours of calculus, everything is lost ?
457No, with option C<--logtrace>,
458it's possible to remember older result
459and not re-run these job the second and next time.
460
[49]461 oarsub -n test -l /core=6,walltime=04:00:00 \
462   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]463
464After a crash or an C<oardel> command,
465you can then re-run the same command that will end to execute the jobs in the list
466
[49]467 oarsub -n test -l /core=6,walltime=04:00:00 \
468   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]469
470C<logtrace> file are just plain file.
471We use the extension '.log' because these files are automatically
472eliminate from our backup system!
473
474=head2 Checkpointing and Idempotent
475
476C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to complete on 6 cores,
you can cut this work into small parts.
479
480For this example, we suppose that each small job need about 10min...
481So, we send a checkpoint 12min before the end of the process
482to let C<oar-parexec> finish the jobs started.
483After being checkpointed, C<oar-parexec> do not start any new small job.
484
[49]485 oarsub -t idempotent -n test \
486   -l /core=6,walltime=04:00:00 \
487   --checkpoint 720 \
[44]488   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
489
After 3h48min, the OAR job stops launching new small jobs.
When all running small jobs are finished, it exits.
But as the OAR job is of type C<idempotent>,
OAR will re-submit it as long as all the small jobs have not been executed...
494
495This way, we let other users a chance to use the cluster!
496
In this last example, we use a moldable OAR job with idempotent
to reserve either many cores for a short time or a few cores for a long time:
499
500 oarsub -t idempotent -n test \
501   -l /core=50,walltime=01:05:00 \
502   -l /core=6,walltime=04:00:00 \
503   --checkpoint 720 \
504   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
505
506
[21]507=head1 SEE ALSO
508
oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh
[21]511
512
[13]513=head1 AUTHORS
514
[21]515Written by Gabriel Moreau, Grenoble - France
[13]516
[21]517
518=head1 LICENSE AND COPYRIGHT
519
520GPL version 2 or later and Perl equivalent
521
[28]522Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
[21]523
Note: See TracBrowser for help on using the repository browser.