source: trunk/oarutils/oar-parexec @ 82

#!/usr/bin/perl
#
# 2011/11/27 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

my $file;
my $dir;
my $cmd;
my $logtrace;
my $verbose;
my $job_np         = 1;
my $nodefile       = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh          = 'oarsh -q -T';
my $sig_transmit;
my $sig_checkpoint = 'USR2';

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
pod2usage(-verbose => 2) if not (
 (-e "$file")
 or (-d "$dir" and $cmd ne '')
 );

# on re-run, keep track of jobs already done
my %state;
my $log_h = IO::File->new();
if (-e "$logtrace") {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }

# jobs to run
my @job = ();
if (-e "$file") {
   my $job_num = 0;
   open(JOB_LIST, '<', "$file") or die "error: can't open job file $file: $!";
   while (my $job_cmd = <JOB_LIST>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      $job_num++;
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, { name => $job_name, cmd => "$job_cmd" };
      }
   close JOB_LIST;
   }
else {
   opendir(DIR, $dir) or die "error: can't open folder $dir: $!";
   while (my $item = readdir(DIR)) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir DIR;
   }
[13]102
[43]103# ressources available
[34]104my @ressources = ();
[41]105open(NODE_FILE, '<', "$nodefile")
[34]106   or die "can't open $nodefile: $!";
107while (<NODE_FILE>) {
108   chomp;
109   next if m/^#/;
110   next if m/^\s*$/;
[41]111   push @ressources, $_;
[34]112   }
113close NODE_FILE;
114
115my $ressource_size = scalar(@ressources);
[43]116die "error: not enought ressources jobnp $job_np > ressources $ressource_size"
[41]117   if $job_np > $ressource_size;
[34]118
119my $current_dir = getcwd();
120
[32]121my $stderr = $ENV{OAR_STDERR} || '';
[13]122$stderr =~ s/\.stderr$//;
[32]123$stderr = $masterio if $masterio;
124my $stdout = $ENV{OAR_STDOUT} || '';
[13]125$stdout =~ s/\.stdout$//;
[32]126$stdout = $masterio if $masterio;
[13]127
128my $finished = new Coro::Signal;
129my $job_todo = new Coro::Semaphore 0;
[45]130my $job_name_maxlen;
131for (@job) {
132   $job_todo->up;
133   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
134   }
[13]135
[43]136# slice of ressources for parallel job
[13]137my $ressources = new Coro::Channel;
[34]138for my $slot (1 .. int($ressource_size / $job_np)) {
[41]139   $ressources->put(
140      join(',',
141         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
142         );
[13]143   }
144
145my %scheduled = ();
146
[43]147# OAR checkpoint and default signal SIGUSR2
[39]148my $oar_checkpoint = new Coro::Semaphore 0;
[75]149$SIG{$sig_checkpoint} = sub {
[42]150   print "warning: receive checkpoint at "
151      . time
152      . ", no new job, just finishing running job\n"
153      if $verbose;
154   $oar_checkpoint->up();
155   };
[39]156
# asynchronously notify running jobs when a checkpoint is requested
async {
   while () {
      # yield to the other coroutines while waiting for a checkpoint
      cede;
      next if $oar_checkpoint->count() == 0;

      # only notify with transmit flag
      if (not $sig_transmit) {
         cede;
         next;
         }

      for my $job_pid (keys %scheduled) {
         my $job_name     = $scheduled{$job_pid}->{name};
         my $job_pidfile  = $scheduled{$job_pid}->{pidfile};
         my $node_connect = $scheduled{$job_pid}->{node_connect};

         my $fh = IO::File->new();
         $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
            or die "error: can't notify subjob: $!";

         $fh->autoflush;
         $fh = unblock $fh;

         $fh->print("kill -$sig_checkpoint \$(cat $job_pidfile)\n");
         $fh->print("exit\n");

         print "warning: transmit signal $sig_checkpoint"
            . " to $job_name on $node_connect.\n"
            if $verbose;

         close $fh;
         cede;
         }
      }
   }

# asynchronous job-start block
async {
   JOB:
   for my $job (@job) {
      my $job_name = $job->{name};
      my $job_cmd  = $job->{cmd};

      # has this job already been run?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take a resource slot for this job
      my $job_ressource = $ressources->get;

      # do not launch new jobs once OAR checkpointing has begun
      last JOB if $oar_checkpoint->count() > 0;

      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name";
      my $job_pidfile  = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name.pid";

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name,
         pidfile      => $job_pidfile,
         };

      # set up the job environment, run it and clean up
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      if ($sig_transmit) {
         $fh->print("trap 'kill -$sig_checkpoint \$(jobs -p)' $sig_checkpoint\n");
         $fh->print("echo \$\$ > $job_pidfile\n");
         $fh->print("$job_cmd $job_stdout $job_stderr &\n");
         $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
         $fh->print("do\n");
         $fh->print("   wait\n");
         $fh->print("done\n");
         $fh->print("rm -f $job_pidfile\n");
         }
      else {
         $fh->print("$job_cmd $job_stdout $job_stderr\n");
         }
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }

# asynchronous job-end block
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # non-blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};

            # job is not finished, only suspended, if a checkpoint signal was received
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # release the resources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing: just finish the running jobs and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }

cede;

# wait until all jobs have been done
$finished->wait;

# close the log trace file
$log_h->close() if $logtrace;

__END__

=head1 NAME

oar-parexec - parallel execution of many small jobs

=head1 SYNOPSIS

 oar-parexec --file filecommand \
    [--logtrace tracefile] [--verbose] \
    [--jobnp integer] [--nodefile filenode] [--oarsh command] \
    [--switchio] [--masterio basefileio] \
    [--kill signal] [--transmit]

 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
    [--logtrace tracefile] [--verbose] \
    [--jobnp integer] [--nodefile filenode] [--oarsh command] \
    [--switchio] [--masterio basefileio] \
    [--kill signal] [--transmit]

 oar-parexec --help

=head1 DESCRIPTION

C<oar-parexec> can execute a lot of small jobs in parallel inside a cluster.
The number of jobs running at the same time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...
However, it can also be used outside OAR.

The options C<--file>, or C<--dir> together with C<--cmd>, are the only mandatory parameters.

Small jobs are launched in the same folder as the master job.
Two environment variables are defined for each small job,
but only in the case of parallel small jobs (option C<--jobnp> > 1):

 OAR_NODE_FILE - file that lists the nodes for parallel computing
 OAR_NP        - number of processors allocated

The file defined by OAR_NODE_FILE is created in /tmp
on the node before the small job is launched
and is deleted after the job completes.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.

OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can still use the original OAR resource file in your script
through these variables if you need it.
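
For example, a parallel small job (C<--jobnp> greater than 1) could use these
two variables to drive an MPI run. This is only a sketch: the MPI launcher
options and the F<./my_parallel_code> program are placeholders for your own setup.

 #!/bin/bash
 # hypothetical parallel small job launched by oar-parexec with --jobnp > 1
 # OAR_NODE_FILE and OAR_NP are set by oar-parexec for this job only
 mpirun -np $OAR_NP -machinefile $OAR_NODE_FILE ./my_parallel_code
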


=head1 OPTIONS

=over 12

=item B<-f|--file filecommand>

File name whose content is the job list.
For the JOB_NAME definition,
the first valid job in the list gets the number 1 and so on...

It is possible to set the name inside a comment on the job line.
For example:

 $HOME/test/subjob1.sh # name=subjob1

The key C<name> is case insensitive;
the associated value cannot contain a space...

=item B<-d|--dir foldertoiterate>

The command given by C<--cmd> will be launched in every sub-folder of this master folder.
Plain files in this folder are ignored.
Sub-folders whose name begins with F<.>
or ends with F<.old>, F<.sav>, F<.bak> or F<.no> are ignored as well...

The JOB_NAME is simply the sub-folder name.

=item B<-c|--cmd commandtolaunch>

Command (and its arguments) that will be launched in every sub-folder
of the folder given by C<--dir>.

=item B<-l|--logtrace tracefile>

File which logs and traces the running jobs.
When the same master command is run again (after a crash, for example),
only the jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.
Tracing is based on the JOB_NAME across multiple runs.

This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
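
The trace file is plain text, one line per event.
The exact column padding depends on the job names and PIDs, but the lines
look roughly like the following (names, PIDs, epoch times and nodes below
are only illustrative); a C<suspend> status appears when option
C<--transmit> is used and the job was stopped by a checkpoint:

 start job subjob01 /  4242 at 1322400000 on node node1
 end   job subjob01 /  4242 at 1322400612 on node node1
 start job subjob02 /  4243 at 1322400001 on node node2
 suspend job subjob02 /  4243 at 1322413200 on node node2
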
412
[32]413=item B<-v|--verbose>
[13]414
[34]415=item B<-j|--jobnp integer>
[13]416
[34]417Number of processor to allocated for each small job.
4181 by default.
419
420=item B<-n|--nodefile filenode>
421
[44]422File name that list all the node where job could be launch.
[32]423By defaut, it's define automatically by OAR via
424environment variable C<OAR_NODE_FILE>.
[13]425
[32]426For example, if you want to use 6 core on your cluster node,
427you need to put 6 times the hostname node in this file,
428one per line...
429It's a very common file in MPI process !
[13]430
=item B<-o|--oarsh command>

Command used to launch a shell on a node.
By default

 oarsh -q -T

Change it to C<ssh> if you are not using an OAR cluster...
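
For example, on a cluster without OAR, something like the following could be
used (the node file and list file names are only illustrative):

 oar-parexec --oarsh 'ssh -q' --nodefile ./nodes.txt --file ./subjob.list.txt
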
439
[32]440=item B<-s|--switchio>
[21]441
[32]442Each small job will have it's own output STDOUT and STDERR
[45]443base on master OAR job with C<JOB_NAME> inside
[32]444(or base on C<basefileio> if option C<masterio>).
445Example :
[21]446
[45]447 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
[21]448
[32]449where 151524 here is the master C<OAR_JOB_ID>
[45]450and C<JOB_NAME> is the small job name.
[21]451
[46]452=item B<-m|--masterio basefileio>
[32]453
[46]454The C<basefileio> will be use in place of environment variable
455C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standart output
456(only use when option C<swithio> is activated).
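
For example, with the hypothetical options C<--switchio --masterio /data/run/result>,
a small job named C<subjob01> would write to:

 /data/run/result-subjob01.stdout
 /data/run/result-subjob01.stderr
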

=item B<-k|--kill signal>

Signal to listen for in order to make a clean stop of the current
C<oar-parexec> process.
By default, the USR2 signal is used (see C<kill -l> for a list of possible signals).

=item B<-t|--transmit>

Resend the caught signal to the sub-jobs when it is received.
By default, no signal is transmitted to the child processes.

It is only valuable for long sub-jobs that can,
in return, make a clean stop and restart by themselves.
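
A long sub-job can then handle the forwarded signal itself.
A minimal sketch in shell, assuming the default USR2 signal;
C<save_state> and C<compute_next_step> are placeholders for your own code:

 #!/bin/bash
 # hypothetical long sub-job: on USR2, save the current state and stop
 # cleanly so that the next OAR run can resume the work
 trap 'save_state; exit 0' USR2
 while true; do
    compute_next_step
 done
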


=item B<-h|--help>

=back


=head1 EXAMPLE

=head2 Simple list of sequential jobs

The job file (option C<--file>) can contain:

 - empty lines
 - comment lines beginning with #
 - valid shell commands

Example where F<$HOME/test/subjob1.sh> is an executable shell script:

 $HOME/test/subjob01.sh  # name=subjob01
 $HOME/test/subjob02.sh  # name=subjob02
 $HOME/test/subjob03.sh  # name=subjob03
 $HOME/test/subjob04.sh  # name=subjob04
 ...
 $HOME/test/subjob38.sh  # name=subjob38
 $HOME/test/subjob39.sh  # name=subjob39
 $HOME/test/subjob40.sh  # name=subjob40

These jobs could be launched by:

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -f ./subjob.list.txt"

=head2 Folder jobs

In a folder F<subjob.d>, create one sub-folder per data set: F<test1>, F<test2>...
The same command will be executed in every sub-folder.
C<oar-parexec> changes the current directory to the sub-folder before launching it.

A very simple job could be:

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"

The command C<env> will be executed in every folder F<test1>, F<test2>... after a 10s pause.

Sometimes it is simpler to use a file with the list of commands,
sometimes jobs by folder with the same command are more relevant.

=head2 Parallel jobs

You need to give the number of cores each small job needs with option C<--jobnp>.
If your job is built on OpenMP or MPI,
you can use the OAR_NP and OAR_NODE_FILE variables to configure it.
On an OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
for the connection between nodes instead of C<ssh>.

Example with parallel small jobs on 2 cores:

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -j 2 -f ./subjob.list.txt"

=head2 Tracing and master crash

If the master node crashes after hours of computation, is everything lost?
No: with option C<--logtrace>,
it is possible to remember the older results
and not re-run those jobs on the second and subsequent runs.

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

After a crash or an C<oardel> command,
you can then re-run the same command, which will finish executing the jobs in the list:

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

C<logtrace> files are just plain text files.
We use the extension '.log' because these files are automatically
excluded from our backup system!

=head2 Checkpointing and Idempotent

C<oar-parexec> is compatible with OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
you can cut this into small parts.

For this example, we suppose that each small job needs about 10min...
So we send a checkpoint 12min before the end of the process
to let C<oar-parexec> finish the jobs already started.
After being checkpointed, C<oar-parexec> does not start any new small job.

 oarsub -t idempotent -n test \
   -l /core=6,walltime=04:00:00 \
   --checkpoint 720 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
But as the OAR job is of type C<idempotent>,
OAR will re-submit it as long as not all the small jobs have been executed...

This way, we give other users a chance to use the cluster!

In this last example, we use a moldable idempotent OAR job
to reserve either many cores for a short time or a few cores for a long time:

 oarsub -t idempotent -n test \
   -l /core=50,walltime=01:05:00 \
   -l /core=6,walltime=04:00:00 \
   --checkpoint 720 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

=head2 Signal, recurse and long jobs

By default, OAR uses the signal USR2 for checkpointing.
It is possible to change this with option C<--kill>.

When used with long small jobs, checkpointing could take too long...
More than the walltime!
The option C<--transmit> can be used to checkpoint the small jobs themselves!
These long small jobs will then stop cleanly and will be restarted next time.

In the C<logtrace> file, such small jobs will have the status suspend.
They will be launched with the same command line at the next OAR run.

=head1 SEE ALSO

oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France