source: trunk/oarutils/oar-parexec @ 83

Last change on this file since 83 was 83, checked in by g7moreau, 12 years ago
  • Add folder parameter
  • Rewrite notify async section
File size: 17.7 KB
RevLine 
[13]1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
[32]16use Cwd qw( getcwd );
[13]17
# Command-line options (file-scope; read throughout the script).
# String options are initialized to '' so the file tests below and
# later blocks (-e "$file", -e "$logtrace", $cmd ne '') never operate
# on undef values.
my $file     = '';   # job command list file (--file)
my $dir      = '';   # master folder to iterate over (--dir)
my $cmd      = '';   # command to run in every sub-folder (--cmd)
my $logtrace = '';   # trace file for crash/checkpoint re-run (--logtrace)
my $verbose;
my $job_np         = 1;                          # cores allocated per small job
my $nodefile       = $ENV{OAR_NODE_FILE} || '';  # node list, OAR default
my $masterio;
my $switchio;
my $help;
my $oarsh          = 'oarsh -q -T';              # remote shell command
my $sig_transmit;                                # forward checkpoint signal to sub-jobs
my $sig_checkpoint = 'USR2';                     # signal OAR sends on checkpoint

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
# Mandatory parameters: either --file, or both --dir and --cmd.
pod2usage(-verbose => 2) if not (
   (-e $file)
   or (-d $dir and $cmd ne '')
   );
[13]52
# Re-run support: parse a previous trace file and remember, per job
# name, whether the job only started or fully ended last time.
my %state;
my $log_h = IO::File->new();
if (-e "$logtrace") {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (my $line = <$log_h>) {
      if ($line =~ m/^start\s+job\s+([^\s]+)\s/) {
         $state{$1} = 'start';
         }
      if ($line =~ m/^end\s+job\s+([^\s]+)\s/) {
         $state{$1} = 'end';
         }
      }
   $log_h->close();
   }
# Re-open the trace file in append mode, autoflushed and wrapped for
# Coro, so jobs can be logged as they start and finish.
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }
71
# Build the list of jobs to run, either from a job list file (--file)
# or by iterating over the sub-folders of a master folder (--dir).
my @job = ();
if (-e "$file") {
   my $job_num = 0;
   open my $job_list, '<', $file or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;       # comment line
      next if $job_cmd =~ m/^\s*$/;    # blank line
      $job_num++;
      # A job line may carry "# name=..." and "# folder=..." annotations.
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;          # default name: position in the list
      my ($job_folder) = $job_cmd =~ m/#.*?\bfolder=(\S+?)\b/i;
      $job_folder ||= './';
      push @job, { name => $job_name, cmd => "$job_cmd", folder => $job_folder };
      }
   close $job_list;
   }
else {
   opendir my $dh, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir($dh)) {
      next if $item =~ m/^\./;         # hidden entries
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");   # sub-folders only
      # Bug fix: always provide a folder entry.  The launcher later runs
      # "cd $job->{folder}"; an undef folder expanded to a bare "cd",
      # which changes to $HOME and breaks jobs using a relative --dir.
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )", folder => './' };
      }
   closedir $dh;
   }
[13]104
# Available resources: one line per core slot in the node file.
my @ressources = ();
open my $node_fh, '<', $nodefile
   or die "can't open $nodefile: $!";
while (<$node_fh>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_fh;

my $ressource_size = scalar(@ressources);
# A parallel job cannot require more cores than the node file offers.
die "error: not enough resources jobnp $job_np > resources $ressource_size"
   if $job_np > $ressource_size;
[34]120
# Remember where the master job was launched; every small job starts
# from this directory too.
my $current_dir = getcwd();

# Base names for per-job output files: derived from the master OAR
# stdout/stderr paths, or overridden entirely by --masterio.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
if ($masterio) {
   $stderr = $masterio;
   $stdout = $masterio;
   }
[13]129
# Coordination primitives: $finished fires when everything is done,
# $job_todo counts jobs still to complete.
my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;
# Widest job name, used to align the start/end log columns.
# Bug fix: initialized to 0 so the numeric comparison below never
# reads an uninitialized value on the first iteration.
my $job_name_maxlen = 0;
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# Split the resource lines into slots of $job_np cores; each slot is a
# comma-joined node list handed to one small job at a time.
my $ressources = new Coro::Channel;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }
146
my %scheduled = ();

# OAR checkpoint support: on the checkpoint signal (USR2 by default),
# bump the semaphore so the other coroutines stop launching new jobs
# and let the running ones finish.
my $oar_checkpoint = new Coro::Semaphore 0;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   };
[39]158
# Asynchronous notifier: once a checkpoint has been received and
# --transmit is set, forward the checkpoint signal to every running
# sub-job on its node, via the pid file the launcher wrote there.
async {
   NOTIFY:
   while () {
      # Only notify when --transmit is set and a checkpoint arrived.
      if ($sig_transmit and $oar_checkpoint->count() > 0) {
         for my $job_pid (keys %scheduled) {
            my $job_name     = $scheduled{$job_pid}->{name};
            my $job_pidfile  = $scheduled{$job_pid}->{pidfile};
            my $node_connect = $scheduled{$job_pid}->{node_connect};

            my $fh = IO::File->new();
            $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
               or die "error: can't notify subjob: $!";

            $fh->autoflush;
            $fh = unblock $fh;

            # Bug fix: the kill command needs its own newline; without
            # it the remote shell received "kill ... $(cat file)exit"
            # fused on one line and neither command was executed.
            $fh->print("kill -$sig_checkpoint \$(cat $job_pidfile)\n");
            $fh->print("exit\n");

            print "warning: transmit signal $sig_checkpoint"
               . " to $job_name on $node_connect.\n"
               if $verbose;

            close $fh;
            cede;
            }
         }

      cede;
      }
   }
193
# Asynchronous launcher: start every job in the list, one resource
# slot each, and stop launching as soon as a checkpoint is received.
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};
      my $job_folder = $job->{folder};

      # Was this job already run in a previous (crashed) session?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # Take a resource slot (blocks until one is free).
      my $job_ressource = $ressources->get;

      # Do not launch any new job once OAR is checkpointing.
      last JOB if $oar_checkpoint->count() > 0;

      # Open a remote shell on the slot's first node; $job_pid is the
      # pid of the local oarsh pipe, reaped by the end-job coroutine.
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # Per-job output redirection (only with --switchio).  Bug fix:
      # initialized to '' so the remote command line below never
      # interpolates undef when --switchio is not used.
      my ($job_stdout, $job_stderr) = ('', '');
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name";
      my $job_pidfile  = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name.pid";

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name,
         pidfile      => $job_pidfile,
         };

      # Set the job environment on the node, run the job, then clean up.
      if ($job_np > 1) {
         # Parallel job: write its private node file and export OAR vars.
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("cd $job_folder\n");
      if ($sig_transmit) {
         # Run the job in the background so the remote shell can trap
         # and forward the checkpoint signal, then wait for completion.
         $fh->print("trap 'kill -$sig_checkpoint \$(jobs -p)' $sig_checkpoint\n");
         $fh->print("echo \$\$ > $job_pidfile\n");
         $fh->print("$job_cmd $job_stdout $job_stderr &\n");
         $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
         $fh->print("do\n");
         $fh->print("   wait\n");
         $fh->print("done\n");
         $fh->print("rm -f $job_pidfile\n");
         }
      else {
         $fh->print("$job_cmd $job_stdout $job_stderr\n");
         }
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
282
# Asynchronous reaper: poll for finished sub-jobs, log them, release
# their resource slot and count them as done.
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # Non-blocking child test: waitpid returns the pid once the
         # child is reaped, 0 while it is still running, and -1 when
         # there is no such child.  Bug fix: only a positive return
         # means "finished" (the plain truth test also fired on -1).
         if (waitpid($job_pid, WNOHANG) > 0) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};

            # Job is not finished, just suspended, if a checkpoint
            # signal was received and transmitted to it.
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # Leave the resource slot for another job.
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # Checkpointing: exit once every running job has finished.
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
315
# Hand control to the asynchronous coroutines, then block until every
# job has been processed (or checkpointing has drained the running set).
cede;
$finished->wait;

# Flush and close the trace file, if tracing was requested.
if ($logtrace) {
   $log_h->close();
   }
[38]323
[13]324__END__
325
326=head1 NAME
327
[44]328oar-parexec - parallel execution of many small job
[13]329
330=head1 SYNOPSIS
331
[47]332 oar-parexec --file filecommand \
333    [--logtrace tracefile] [--verbose] \
334    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
335    [--switchio] [--masterio basefileio]
[46]336
[47]337 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
338    [--logtrace tracefile] [--verbose] \
339    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
340    [--switchio] [--masterio basefileio]
[46]341
[13]342 oar-parexec --help
343
[32]344=head1 DESCRIPTION
345
C<oar-parexec> can execute lots of small jobs in parallel inside a cluster.
The number of jobs running in parallel at any one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...
However, it can be used outside OAR.
[32]351
[47]352Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
[32]353
354Small job will be launch in the same folder as the master job.
[44]355Two environment variable are defined for each small job
[37]356and only in case of parallel small job (option C<--jobnp> > 1).
[32]357
[34]358 OAR_NODE_FILE - file that list node for parallel computing
359 OAR_NP        - number of processor affected
[32]360
The file defined by OAR_NODE_FILE is created in /tmp
on the node before launching the small job,
and this file will be deleted after the job completes.
[34]364C<oar-parexec> is a simple script,
365OAR_NODE_FILE will not be deleted in case of crash of the master job.
366
[37]367OAR define other variable that are equivalent to OAR_NODE_FILE:
368OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
369You can use in your script the OAR original file ressources
370by using these variable if you need it.
[34]371
[82]372
[13]373=head1 OPTIONS
374
[32]375=over 12
[13]376
[47]377=item B<-f|--file filecommand>
[13]378
[32]379File name which content job list.
[45]380For the JOB_NAME definition,
381the first valid job in the list will have the number 1 and so on...
[13]382
[77]383It's possible to fix the name inside a comment on the job line.
384For example:
385
386 $HOME/test/subjob1.sh # name=subjob1
387
388The key C<name> is case insensitive,
389the associated value cannot have a space...
390
[47]391=item B<-d|--dir foldertoiterate>
[45]392
393Command C<--cmd> will be launch in all sub-folder of this master folder.
394Files in this folder will be ignored.
[47]395Sub-folder name which begin with F<.>
396or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
[45]397
398The JOB_NAME is simply the Sub-folder name.
399
400=item B<-c|--cmd commandtolaunch>
401
Command (and its arguments) that will be launched in every sub-folder
of the master folder given by C<--dir>.
404
[43]405=item B<-l|--logtrace tracefile>
406
File which logs and traces running jobs.
In case of running the same master command again (after a crash for example),
only jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.
Tracing is based on the JOB_NAME between multiple runs.
[43]412
This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
415
[32]416=item B<-v|--verbose>
[13]417
[34]418=item B<-j|--jobnp integer>
[13]419
[34]420Number of processor to allocated for each small job.
4211 by default.
422
423=item B<-n|--nodefile filenode>
424
File name that lists all the nodes where jobs can be launched.
[32]426By defaut, it's define automatically by OAR via
427environment variable C<OAR_NODE_FILE>.
[13]428
[32]429For example, if you want to use 6 core on your cluster node,
430you need to put 6 times the hostname node in this file,
431one per line...
432It's a very common file in MPI process !
[13]433
[46]434=item B<-o|-oarsh command>
[13]435
[46]436Command use to launch a shell on a node.
437By default
[13]438
[46]439 oarsh -q -T
440
441Change it to C<ssh> if you are not using an OAR cluster...
442
[32]443=item B<-s|--switchio>
[21]444
[32]445Each small job will have it's own output STDOUT and STDERR
[45]446base on master OAR job with C<JOB_NAME> inside
[32]447(or base on C<basefileio> if option C<masterio>).
448Example :
[21]449
[45]450 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
[21]451
[32]452where 151524 here is the master C<OAR_JOB_ID>
[45]453and C<JOB_NAME> is the small job name.
[21]454
[46]455=item B<-m|--masterio basefileio>
[32]456
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
[32]460
[78]461=item B<-k|--kill signal>
462
Signal to listen for in order to make a clean stop of the current C<oar-parexec> process.
By default, the USR2 signal is used (see C<kill -l> for a list of possible signals).
465
466=item B<-t|--transmit>
467
Resend the caught signal to sub-jobs when receiving it.
By default, no signal is transmitted to child processes.

It is only valuable when used for long sub-jobs that can,
in return, perform a clean restart themselves.
473
474
[32]475=item B<-h|--help>
476
477=back
478
479
480=head1 EXAMPLE
481
[44]482=head2 Simple list of sequential job
483
[47]484Content for the job file command (option C<--file>) could have:
[21]485
[13]486 - empty line
487 - comment line begin with #
488 - valid shell command
489
490Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
491
[77]492 $HOME/test/subjob01.sh  # name=subjob01
493 $HOME/test/subjob02.sh  # name=subjob02
494 $HOME/test/subjob03.sh  # name=subjob03
495 $HOME/test/subjob04.sh  # name=subjob04
[32]496 ...
[77]497 $HOME/test/subjob38.sh  # name=subjob38
498 $HOME/test/subjob39.sh  # name=subjob39
499 $HOME/test/subjob40.sh  # name=subjob40
[13]500
[44]501These jobs could be launch by:
[13]502
[49]503 oarsub -n test -l /core=6,walltime=04:00:00 \
504   "oar-parexec -f ./subjob.list.txt"
[13]505
[47]506=head2 Folder job
507
508In a folder F<subjob.d>, create sub-folder with your data inside : F<test1>, <test2>...
509The same command will be executed in every sub-folder.
510C<oar-parexec> change the current directory to the sub-folder before launching it.
511
512A very simple job could be:
513
[49]514 oarsub -n test -l /core=6,walltime=04:00:00 \
515   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
[47]516
The command C<env> will be executed in every folder F<test1>, F<test2>... after a 10s pause.
518
519Sometime, it's simpler to use file list command,
520sometime, jobs by folder with the same command run is more relevant.
521
[44]522=head2 Parallel job
[28]523
[44]524You need to put the number of core each small job need with option C<--jobnp>.
525If your job is build on OpenMP or MPI,
526you can use OAR_NP and OAR_NODE_FILE variables to configure them.
527On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
528for connexion between node instead of C<ssh>.
529
530Example with parallel small job on 2 core:
531
[49]532 oarsub -n test -l /core=6,walltime=04:00:00 \
533   "oar-parexec -j 2 -f ./subjob.list.txt"
[44]534
535=head2 Tracing and master crash
536
537If the master node crash after hours of calculus, everything is lost ?
538No, with option C<--logtrace>,
539it's possible to remember older result
540and not re-run these job the second and next time.
541
[49]542 oarsub -n test -l /core=6,walltime=04:00:00 \
543   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]544
545After a crash or an C<oardel> command,
546you can then re-run the same command that will end to execute the jobs in the list
547
[49]548 oarsub -n test -l /core=6,walltime=04:00:00 \
549   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]550
551C<logtrace> file are just plain file.
552We use the extension '.log' because these files are automatically
553eliminate from our backup system!
554
555=head2 Checkpointing and Idempotent
556
557C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
559you can cut this in small parts.
560
561For this example, we suppose that each small job need about 10min...
562So, we send a checkpoint 12min before the end of the process
563to let C<oar-parexec> finish the jobs started.
564After being checkpointed, C<oar-parexec> do not start any new small job.
565
[49]566 oarsub -t idempotent -n test \
567   -l /core=6,walltime=04:00:00 \
568   --checkpoint 720 \
[44]569   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
570
After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
573But as the OAR job is type C<idempotent>,
574OAR will re-submit it as long as all small job are not executed...
575
576This way, we let other users a chance to use the cluster!
577
In this last example, we use a moldable OAR job with idempotent
579to reserve many core for a small time or a few cores for a long time:
580
581 oarsub -t idempotent -n test \
582   -l /core=50,walltime=01:05:00 \
583   -l /core=6,walltime=04:00:00 \
584   --checkpoint 720 \
585   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
586
[78]587=head2 Signal, recurse and long job
[44]588
[78]589By default, OAR use signal USR2 for checkpointing.
[79]590It's possible to change this with option C<--kill>.
[78]591
592When use with long small job, checkpointing could be too long...
[79]593More than walltime!
594The option C<--transmit> could be use to checkpoint small job!
595These long small job will then stop cleanly and will be restarted next time.
[78]596
597In the C<logtrace> file, small job will have the status suspend.
[79]598They will be launch with the same command line at the next OAR run.
[78]599
[21]600=head1 SEE ALSO
601
[44]602oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh
[21]604
605
[13]606=head1 AUTHORS
607
[21]608Written by Gabriel Moreau, Grenoble - France
[13]609
[21]610
611=head1 LICENSE AND COPYRIGHT
612
613GPL version 2 or later and Perl equivalent
614
[28]615Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
[21]616
Note: See TracBrowser for help on using the repository browser.