source: trunk/oarutils/oar-parexec @ 49

Last change on this file since 49 was 49, checked in by g7moreau, 12 years ago
  • General makefile with install and update
  • Cut long line
File size: 14.4 KB
RevLine 
[13]1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
[32]16use Cwd qw( getcwd );
[13]17
# Command-line parameters (see POD below for full descriptions).
my $file     = '';                          # job list file (--file)
my $dir      = '';                          # folder to iterate over (--dir)
my $cmd      = '';                          # command run in each sub-folder (--cmd)
my $logtrace = '';                          # start/end trace file (--logtrace)
my $verbose;                                # chatty output (--verbose)
my $job_np   = 1;                           # cores per small job (--jobnp)
my $nodefile = $ENV{OAR_NODE_FILE} || '';   # ressource list (--nodefile)
my $masterio;                               # base name for small job output (--masterio)
my $switchio;                               # per-job output files (--switchio)
my $help;
my $oarsh    = 'oarsh -q -T';               # remote shell command (--oarsh)

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   ) or pod2usage(-verbose => 0);

# Show the full manual on --help, or when neither a job file
# nor a folder+command pair has been supplied.
pod2usage(-verbose => 2) if $help;
my $has_work = (-e $file) || (-d $dir && $cmd ne '');
pod2usage(-verbose => 2) unless $has_work;
[13]48
# Re-run support: keep trace of jobs already done.
# The --logtrace file records "start job NAME ..." / "end job NAME ..."
# lines; on restart, jobs marked 'end' are skipped and jobs only marked
# 'start' (crashed mid-run) are launched again.
my %state;
my $log_h = IO::File->new();
if (-e $logtrace) {
   # 3-arg open method: mode kept separate from the file name,
   # so a logtrace name starting with '>' or '|' cannot change the mode.
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   # Re-open for appending: this run's start/end records are added
   # after the ones kept from previous runs.
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;   # Coro-aware handle, safe to use from coroutines
   }
67
# Build the list of jobs to run.
# Two sources: a command file (--file, one job per line) or a folder
# (--dir) whose sub-folders each get the same command (--cmd) run inside.
my @job = ();
if (-e $file) {
   # One job per non-empty, non-comment line; JOB_NAME is the line rank
   # (first valid line is job 1, and so on).
   my $job_num = 0;
   open my $job_list, '<', $file or die "error: can't open job file $file: $!";
   while (<$job_list>) {
      chomp;
      next if m/^#/;
      next if m/^\s*$/;
      $job_num++;
      push @job, { name => $job_num, cmd => $_ };
      }
   close $job_list;
   }
else {
   # One job per sub-folder; JOB_NAME is the sub-folder name.
   # Hidden entries and common backup suffixes are skipped.
   opendir my $dh, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dh) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless -d "$dir/$item";
      # The command is wrapped in a subshell so the cd does not leak.
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dh;
   }
[13]96
# Ressources available: one line per slot in the node file
# (MPI style -- a host listed N times provides N slots).
my @ressources = ();
open my $node_fh, '<', $nodefile
   or die "can't open $nodefile: $!";
while (<$node_fh>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_fh;

# A single small job cannot need more slots than the whole pool.
my $ressource_size = scalar(@ressources);
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

# Small jobs inherit the master job's working directory.
my $current_dir = getcwd();
114
# Base names for the small job output files (see --switchio / --masterio):
# derived from the master OAR output names, or forced by --masterio.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

# $finished is signaled when everything is done;
# $job_todo counts the jobs that still have to terminate.
my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;

# Longest job name, used to align the start/end log columns.
# Initialized to 0 so the first comparison is against a defined value.
my $job_name_maxlen = 0;
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }
[13]129
# Slice the ressource pool into groups of $job_np slots; each group is a
# comma-joined node list that one parallel small job consumes.  The groups
# are distributed through a channel: get() blocks until a group is free.
my $ressources = new Coro::Channel;
my $slot_count = int($ressource_size / $job_np);
for my $slot (1 .. $slot_count) {
   my $first = ($slot - 1) * $job_np;
   my $last  = $slot * $job_np - 1;
   $ressources->put(join ',', @ressources[ $first .. $last ]);
   }

# Currently running small jobs, indexed by the shell's PID.
my %scheduled = ();
140
# OAR checkpoint support: OAR sends SIGUSR2 (its default checkpoint
# signal) before the walltime expires.  Raising the semaphore tells the
# launcher to start no new job and let the running ones finish.
my $oar_checkpoint = new Coro::Semaphore 0;
$SIG{USR2} = sub {
   if ($verbose) {
      my $now = time;
      print "warning: receive checkpoint at $now, no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   };
[39]150
# Asynchronous launcher coroutine: starts one small job per free
# ressource group until the job list is exhausted or a checkpoint hits.
async {
        JOB:
   for my $job (@job) {
      my $job_name = $job->{name};
      my $job_cmd  = $job->{cmd};

      # job has been already run ?
      # (%state was loaded from the --logtrace file at startup)
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            # started but never finished: probably a crash, run it again
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take job ressource (blocks until a slot group is recycled)
      my $job_ressource = $ressources->get;

      # no more launch job when OAR checkpointing
      # NOTE(review): the slot taken just above is not put back on this
      # path; harmless since the program exits once running jobs finish.
      last JOB if $oar_checkpoint->count() > 0;

      # open a shell on the first node of the group; the pipe carries the
      # commands, the shell's own output is discarded
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;   # Coro-aware handle: writes yield instead of blocking

      # remember the running job so the reaper coroutine can collect it
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name
         };

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # per-job output redirections, only when --switchio is active
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      # per-job node file created on the remote node (parallel jobs only)
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_name";

     # set job environment, run it and clean
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job_cmd $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;   # give the reaper coroutine a chance to run
      }
   }
223
# Asynchronous reaper coroutine: collects finished small jobs,
# recycles their ressource group and decides when everything is over.
async {
   while () {
      for my $job_pid (keys %scheduled) {
                        # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};
            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing ! just finishing running job and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # normal termination: every job has been launched and reaped
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
251
# hand control to the two coroutines above
cede;

# all job have been done
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
[38]259
[13]260__END__
261
262=head1 NAME
263
oar-parexec - parallel execution of many small jobs
[13]265
266=head1 SYNOPSIS
267
[47]268 oar-parexec --file filecommand \
269    [--logtrace tracefile] [--verbose] \
270    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
271    [--switchio] [--masterio basefileio]
[46]272
[47]273 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
274    [--logtrace tracefile] [--verbose] \
275    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
276    [--switchio] [--masterio basefileio]
[46]277
[13]278 oar-parexec --help
279
[32]280=head1 DESCRIPTION
281
C<oar-parexec> can execute a lot of small jobs in parallel inside a cluster.
The number of parallel jobs running at one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...
However, it can be used outside OAR.
[32]287
[47]288Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
[32]289
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
and only in case of parallel small jobs (option C<--jobnp> > 1).
[32]293
[34]294 OAR_NODE_FILE - file that list node for parallel computing
295 OAR_NP        - number of processor affected
[32]296
The file defined by OAR_NODE_FILE is created in /tmp
on the node before launching the small job
and this file will be deleted after the job completes.
C<oar-parexec> is a simple script, so
OAR_NODE_FILE will not be deleted in case of a crash of the master job.
302
OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can use the original OAR ressource file in your script
by using these variables if you need it.
307 
[34]308
[13]309=head1 OPTIONS
310
[32]311=over 12
[13]312
[47]313=item B<-f|--file filecommand>
[13]314
[32]315File name which content job list.
[45]316For the JOB_NAME definition,
317the first valid job in the list will have the number 1 and so on...
[13]318
[47]319=item B<-d|--dir foldertoiterate>
[45]320
321Command C<--cmd> will be launch in all sub-folder of this master folder.
322Files in this folder will be ignored.
[47]323Sub-folder name which begin with F<.>
324or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
[45]325
326The JOB_NAME is simply the Sub-folder name.
327
328=item B<-c|--cmd commandtolaunch>
329
Command (and arguments to it) that will be launched in all sub-folders
of the folder given by parameter C<--dir>.
332
[43]333=item B<-l|--logtrace tracefile>
334
File which logs and traces running jobs.
In case of running the same master command again (after a crash for example),
only jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.
Tracing is based on the JOB_NAME between multiple runs.
[43]340
This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
343
[32]344=item B<-v|--verbose>
[13]345
[34]346=item B<-j|--jobnp integer>
[13]347
[34]348Number of processor to allocated for each small job.
3491 by default.
350
351=item B<-n|--nodefile filenode>
352
File name that lists all the nodes where jobs could be launched.
[32]354By defaut, it's define automatically by OAR via
355environment variable C<OAR_NODE_FILE>.
[13]356
[32]357For example, if you want to use 6 core on your cluster node,
358you need to put 6 times the hostname node in this file,
359one per line...
360It's a very common file in MPI process !
[13]361
=item B<-o|--oarsh command>
[13]363
[46]364Command use to launch a shell on a node.
365By default
[13]366
[46]367 oarsh -q -T
368
369Change it to C<ssh> if you are not using an OAR cluster...
370
[32]371=item B<-s|--switchio>
[21]372
[32]373Each small job will have it's own output STDOUT and STDERR
[45]374base on master OAR job with C<JOB_NAME> inside
[32]375(or base on C<basefileio> if option C<masterio>).
376Example :
[21]377
[45]378 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
[21]379
[32]380where 151524 here is the master C<OAR_JOB_ID>
[45]381and C<JOB_NAME> is the small job name.
[21]382
[46]383=item B<-m|--masterio basefileio>
[32]384
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name
of the small job standard output
(only used when option C<switchio> is activated).
[32]388
389=item B<-h|--help>
390
391=back
392
393
394=head1 EXAMPLE
395
[44]396=head2 Simple list of sequential job
397
[47]398Content for the job file command (option C<--file>) could have:
[21]399
[13]400 - empty line
401 - comment line begin with #
402 - valid shell command
403
404Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
405
406 $HOME/test/subjob1.sh
407 $HOME/test/subjob2.sh
408 $HOME/test/subjob3.sh
409 $HOME/test/subjob4.sh
[32]410 ...
[13]411 $HOME/test/subjob38.sh
412 $HOME/test/subjob39.sh
413 $HOME/test/subjob40.sh
414
[44]415These jobs could be launch by:
[13]416
[49]417 oarsub -n test -l /core=6,walltime=04:00:00 \
418   "oar-parexec -f ./subjob.list.txt"
[13]419
[47]420=head2 Folder job
421
In a folder F<subjob.d>, create sub-folders with your data inside: F<test1>, F<test2>...
423The same command will be executed in every sub-folder.
424C<oar-parexec> change the current directory to the sub-folder before launching it.
425
426A very simple job could be:
427
[49]428 oarsub -n test -l /core=6,walltime=04:00:00 \
429   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
[47]430
The command C<env> will be executed in every folder F<test1>, F<test2>... after a 10s pause.
432
433Sometime, it's simpler to use file list command,
434sometime, jobs by folder with the same command run is more relevant.
435
[44]436=head2 Parallel job
[28]437
[44]438You need to put the number of core each small job need with option C<--jobnp>.
439If your job is build on OpenMP or MPI,
440you can use OAR_NP and OAR_NODE_FILE variables to configure them.
441On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
442for connexion between node instead of C<ssh>.
443
444Example with parallel small job on 2 core:
445
[49]446 oarsub -n test -l /core=6,walltime=04:00:00 \
447   "oar-parexec -j 2 -f ./subjob.list.txt"
[44]448
449=head2 Tracing and master crash
450
451If the master node crash after hours of calculus, everything is lost ?
452No, with option C<--logtrace>,
453it's possible to remember older result
454and not re-run these job the second and next time.
455
[49]456 oarsub -n test -l /core=6,walltime=04:00:00 \
457   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]458
459After a crash or an C<oardel> command,
460you can then re-run the same command that will end to execute the jobs in the list
461
[49]462 oarsub -n test -l /core=6,walltime=04:00:00 \
463   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
[44]464
465C<logtrace> file are just plain file.
466We use the extension '.log' because these files are automatically
467eliminate from our backup system!
468
469=head2 Checkpointing and Idempotent
470
471C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
473you can cut this in small parts.
474
475For this example, we suppose that each small job need about 10min...
476So, we send a checkpoint 12min before the end of the process
477to let C<oar-parexec> finish the jobs started.
478After being checkpointed, C<oar-parexec> do not start any new small job.
479
[49]480 oarsub -t idempotent -n test \
481   -l /core=6,walltime=04:00:00 \
482   --checkpoint 720 \
[44]483   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
484
After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
487But as the OAR job is type C<idempotent>,
488OAR will re-submit it as long as all small job are not executed...
489
490This way, we let other users a chance to use the cluster!
491
In this last example, we use a moldable OAR job with idempotent
493to reserve many core for a small time or a few cores for a long time:
494
495 oarsub -t idempotent -n test \
496   -l /core=50,walltime=01:05:00 \
497   -l /core=6,walltime=04:00:00 \
498   --checkpoint 720 \
499   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
500
501
[21]502=head1 SEE ALSO
503
[44]504oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh
[21]506
507
[13]508=head1 AUTHORS
509
[21]510Written by Gabriel Moreau, Grenoble - France
[13]511
[21]512
513=head1 LICENSE AND COPYRIGHT
514
515GPL version 2 or later and Perl equivalent
516
[28]517Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
[21]518
Note: See TracBrowser for help on using the repository browser.