source: trunk/oarutils/oar-parexec @ 43

Last change on this file since 43 was 43, checked in by g7moreau, 12 years ago
  • --file -> --filecmd
  • --logfile -> --logtrace
  • Comment code
  • Begin man for option --logtrace
#!/usr/bin/perl
#
# 2011/11/27 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

my $filecmd  = '';
my $logtrace = '';
my $verbose;
my $job_np = 1;
my $nodefile = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh = 'oarsh -q -T';

Getopt::Long::GetOptions(
   'filecmd=s'  => \$filecmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
pod2usage(-verbose => 2) if not -e $filecmd;

# on re-run, keep track of jobs already done
my %state;
my $log_h = IO::File->new();
if (-e $logtrace) {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+(\d+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+(\d+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }

# jobs to run
my @job = ();
open(JOB_LIST, '<', "$filecmd") or die "error: can't open job file $filecmd: $!";
while (<JOB_LIST>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @job, $_;
   }
close JOB_LIST;

# ressources available
my @ressources = ();
open(NODE_FILE, '<', "$nodefile")
   or die "error: can't open node file $nodefile: $!";
while (<NODE_FILE>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close NODE_FILE;

my $ressource_size = scalar(@ressources);
die "error: not enough ressources: jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

my $current_dir = getcwd();

my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;
$job_todo->up for (@job);

# slice of ressources for parallel jobs
my $ressources = new Coro::Channel;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }

my $job_num   = 0;
my %scheduled = ();

# OAR checkpoint, default signal SIGUSR2
my $oar_checkpoint = new Coro::Semaphore 0;
$SIG{USR2} = sub {
   print "warning: received checkpoint at "
      . time
      . ", no new job, just finishing running jobs\n"
      if $verbose;
   $oar_checkpoint->up();
   };

# asynchronous start-job block
async {
   JOB:
   for my $job (@job) {
      $job_num++;

      # has this job already been run?
      if (exists $state{$job_num}) {
         if ($state{$job_num} eq 'start') {
            print "warning: job $job_num was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_num} eq 'end') {
            delete $state{$job_num}; # free memory
            $job_todo->down;
            print "warning: job $job_num already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take a ressource slot for this job
      my $job_ressource = $ressources->get;

      # do not launch new jobs once an OAR checkpoint has been received
      last JOB if $oar_checkpoint->count() > 0;

      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         num          => $job_num
         };

      my $msg = sprintf "start job %5i / %5i at %s on node %s\n",
         $job_num, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      my ($job_stdout, $job_stderr) = ('', '');
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";

      # set the job environment, run it and clean up
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }

# asynchronous end-job block
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # non-blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %5i / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{num},
               $job_pid, time, $scheduled{$job_pid}->{ressource};
            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # release the ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing! just finish the running jobs and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }

cede;

# all jobs have been done
$finished->wait;

# close the log trace file
$log_h->close() if $logtrace;

__END__

=head1 NAME

oar-parexec - execute lots of small jobs in parallel

=head1 SYNOPSIS

 oar-parexec --filecmd filecommand [--logtrace tracefile] [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh command]
 oar-parexec --help

=head1 DESCRIPTION

C<oar-parexec> executes lots of small jobs in parallel inside a cluster.
The number of jobs running in parallel at one time cannot exceed the
number of cores listed in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...

Option C<--filecmd> is the only mandatory one.

Small jobs are launched in the same folder as the master job.
Two environment variables are defined for each small job,
but only for parallel small jobs (option C<--jobnp> > 1):

 OAR_NODE_FILE - file that lists the nodes for parallel computing
 OAR_NP        - number of processors allocated

The file named by OAR_NODE_FILE is created in /tmp on the node
before launching the small job and is deleted afterwards...
C<oar-parexec> is a simple script, so
OAR_NODE_FILE will not be deleted if the master job crashes.

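As an illustration only (not something C<oar-parexec> provides), a parallel
small job could use these variables to drive an MPI launcher; the C<mpirun>
flags and the F<./my_mpi_program> binary below are assumptions, adapt them
to your MPI stack:

 #!/bin/sh
 # hypothetical subjob script, run by oar-parexec with --jobnp > 1
 mpirun -np "$OAR_NP" -machinefile "$OAR_NODE_FILE" ./my_mpi_program
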
OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can use the original OAR resource file in your script
through these variables if you need it.

=head1 OPTIONS

=over 12

=item B<-f|--filecmd filecommand>

Name of the file that contains the job list.

=item B<-l|--logtrace tracefile>

File in which running jobs are logged and traced.
When the same command is run again (after a crash, for example),
only jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.

This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.

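For reference, each small job produces one C<start> line and one C<end> line
in the trace file; the PID and timestamp values below are only illustrative:

 start job     1 / 18015 at 1322422592 on node node1.example.org
 end   job     1 / 18015 at 1322422613 on node node1.example.org
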
=item B<-v|--verbose>

=item B<-j|--jobnp integer>

Number of processors to allocate to each small job.
1 by default.

=item B<-n|--nodefile filenode>

Name of the file that lists all the nodes on which to launch jobs.
By default, it is defined automatically by OAR via the
environment variable C<OAR_NODE_FILE>.

For example, if you want to use 6 cores on your cluster node,
you need to put the node's hostname 6 times in this file,
one per line...
It is a very common file format in MPI workflows!

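A minimal sketch of such a node file for 6 cores on a single node
(the hostname is only an example):

 node1.example.org
 node1.example.org
 node1.example.org
 node1.example.org
 node1.example.org
 node1.example.org
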
=item B<-m|--masterio basefileio>

The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name
of the small job standard output
(only used when option C<--switchio> is activated).

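For example, with C<--switchio --masterio result>, small job number 12
writes its output to:

 result-12.stdout
 result-12.stderr
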
=item B<-s|--switchio>

Each small job will have its own STDOUT and STDERR output files,
based on the master OAR job output with C<JOB_NUM> inside
(or based on C<basefileio> if option C<--masterio> is given).
Example:

 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout

where 151524 is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.

=item B<-o|--oarsh command>

Command used to launch a shell on a node.
By default:

 oarsh -q -T

=item B<-h|--help>

=back

=head1 EXAMPLE

The job command file (option C<--filecmd>) may contain:

 - empty lines
 - comment lines beginning with #
 - valid shell commands

Example where F<$HOME/test/subjob1.sh> is an executable shell script:

 $HOME/test/subjob1.sh
 $HOME/test/subjob2.sh
 $HOME/test/subjob3.sh
 $HOME/test/subjob4.sh
 ...
 $HOME/test/subjob38.sh
 $HOME/test/subjob39.sh
 $HOME/test/subjob40.sh

These jobs can be launched with:

 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"

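Assuming your OAR installation supports idempotent jobs and the C<--checkpoint>
option of C<oarsub> (an assumption about your setup; check your local OAR
documentation), the same list can be submitted as a resumable job; combined
with C<--logtrace>, jobs already marked as done are skipped on restart:

 oarsub -t idempotent --checkpoint 300 -n test -l /core=6,walltime=00:35:00 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.trace.txt"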
[28]358
=head1 SEE ALSO

oar-dispatch, mpilauncher


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
