source: trunk/oarutils/oar-parexec @ 44

Last change on this file since 44 was 44, checked in by g7moreau, 12 years ago
  • doc doc and doc!
File size: 12.2 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command-line options and their defaults.
my $filecmd  = '';                         # file listing the job commands (mandatory)
my $logtrace = '';                         # trace file used for re-run / checkpointing
my $verbose;
my $job_np = 1;                            # number of cores needed by each small job
my $nodefile = $ENV{OAR_NODE_FILE} || '';  # resource list, defaults to OAR's node file
my $masterio;                              # base name overriding OAR_STDOUT/OAR_STDERR
my $switchio;                              # give each small job its own stdout/stderr
my $help;
my $oarsh = 'oarsh -q -T';                 # remote shell command used to reach a node

Getopt::Long::GetOptions(
   'filecmd=s'  => \$filecmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
# --filecmd is mandatory: show the full help if the file does not exist.
pod2usage(-verbose => 2) if not -e $filecmd;
41
# Re-run support: scan a previous trace file (if any) and remember, for each
# job number, whether it was started and whether it finished.  A job marked
# 'end' is skipped on this run; a job marked only 'start' is relaunched.
my %state;
my $log_h = IO::File->new();
if (-e $logtrace) {
   # Filename and mode passed separately (three-arg style): the trace file
   # name can never be misread as an open mode.
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+(\d+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+(\d+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   # Reopen for appending: this run adds its own start/end records.
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # unblock (Coro::Handle) makes prints cooperative with the coroutines.
   $log_h = unblock $log_h;
   }
60
# Read the list of job commands: one shell command per line, skipping
# comment lines (#) and blank lines.
my @job = ();
# Lexical filehandle instead of the global bareword JOB_LIST; it is closed
# automatically if anything dies, and cannot clash with other code.
open(my $job_list_h, '<', $filecmd)
   or die "error: can't open job file $filecmd: $!";
while (<$job_list_h>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @job, $_;
   }
close $job_list_h;
71
# Read the available resources: one node name per line (repeated once per
# core, MPI style), skipping comment lines (#) and blank lines.
my @ressources = ();
# Lexical filehandle instead of the global bareword NODE_FILE.
open(my $node_h, '<', $nodefile)
   or die "error: can't open $nodefile: $!";
while (<$node_h>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_h;

my $ressource_size = scalar(@ressources);
# A single small job cannot need more cores than the whole pool provides.
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;
87
# Small jobs are launched from the same directory as the master job.
my $current_dir = getcwd();

# Base names for the small jobs' output files: the master OAR output files
# with their extension stripped, or --masterio when given.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

# $finished is signaled when everything is done; $job_todo counts the jobs
# that still have to terminate (one 'up' per job in the list).
my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;
$job_todo->up for (@job);

# slice of ressources for parallel job: group the resource lines in chunks
# of $job_np and queue them in a channel.  Taking a chunk out of the
# channel reserves it; putting it back frees it for the next job.
my $ressources = new Coro::Channel;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }
109
my $job_num   = 0;  # sequence number of the current job (1-based after ++)
my %scheduled = (); # pid -> {fh, node_connect, ressource, num} of running jobs

# OAR checkpoint and default signal SIGUSR2: when OAR checkpoints the
# master job, stop launching new small jobs but let running ones finish
# (the launcher loop tests $oar_checkpoint->count()).
my $oar_checkpoint = new Coro::Semaphore 0;
$SIG{USR2} = sub {
   print "warning: receive checkpoint at "
      . time
      . ", no new job, just finishing running job\n"
      if $verbose;
   $oar_checkpoint->up();
   };
122
# asynchrone start job block: one coroutine that walks the job list,
# reserves a resource chunk for each job and starts it through $oarsh.
async {
        JOB:
   for my $job (@job) {
      $job_num++;

      # job has been already run ?  (state recovered from --logtrace)
      if (exists $state{$job_num}) {
         if ($state{$job_num} eq 'start') {
            print "warning: job $job_num was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_num} eq 'end') {
            delete $state{$job_num}; # free memory
            $job_todo->down;
            print "warning: job $job_num already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take job ressource (blocks until the reaper returns a free chunk)
      my $job_ressource = $ressources->get;

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # connect to the first node of the chunk; $job_pid is the local pid
      # of the piped $oarsh process, later reaped by waitpid
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         num          => $job_num
         };

      my $msg = sprintf "start job %5i / %5i at %s on node %s\n",
         $job_num, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # per-job output redirection fragments (only with --switchio)
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      # NOTE(review): this temp file is removed by the job shell below but
      # is left behind if the master job crashes (documented in the POD)
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";

     # set job environment, run it and clean
      if ($job_np > 1) {
         # for parallel jobs, publish the chunk as a private OAR_NODE_FILE
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
194
# asynchrone end job block: one coroutine that polls the running jobs,
# reaps the finished ones and recycles their resource chunk.
async {
   while () {
      for my $job_pid (keys %scheduled) {
                        # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %5i / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{num},
               $job_pid, time, $scheduled{$job_pid}->{ressource};
            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing ! just finishing running job and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # every job has been reaped: wake up the main program
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
222
# hand control to the two coroutines above
cede;

# all job have been done (or a checkpoint was received): block until then
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
230
231__END__
232
233=head1 NAME
234
oar-parexec - parallel execution of many small jobs
236
237=head1 SYNOPSIS
238
239 oar-parexec --filecmd filecommand [--logtrace tracefile] [--verbose] [--jobnp integer] \
240            [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh sssh]
241 oar-parexec --help
242
243=head1 DESCRIPTION
244
C<oar-parexec> can execute lots of small jobs in parallel inside a cluster.
The number of parallel jobs running at one time cannot exceed the number of cores defined in the node file.
247C<oar-parexec> is easier to use inside an OAR job environment
248which define automatically these strategics parameters...
249However, it can be used outside OAR.
250
251Option C<--filecmd> is the only mandatory one.
252
253Small job will be launch in the same folder as the master job.
254Two environment variable are defined for each small job
255and only in case of parallel small job (option C<--jobnp> > 1).
256
257 OAR_NODE_FILE - file that list node for parallel computing
258 OAR_NP        - number of processor affected
259
The file defined by OAR_NODE_FILE is created in /tmp
on the node before launching the small job,
and this file will be deleted after the job completes.
263C<oar-parexec> is a simple script,
264OAR_NODE_FILE will not be deleted in case of crash of the master job.
265
266OAR define other variable that are equivalent to OAR_NODE_FILE:
267OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
268You can use in your script the OAR original file ressources
269by using these variable if you need it.
270 
271
272=head1 OPTIONS
273
274=over 12
275
276=item B<-f|--filecmd    filecommand>
277
278File name which content job list.
279
280=item B<-l|--logtrace tracefile>
281
282File which log and trace running job.
283In case of running the same master command (after crash for example),
284only job that are not mark as done will be run again.
285Be careful, job mark as running (start but not finish) will be run again.
286
This option is very useful in case of crash
but also for checkpointing and idempotent OAR jobs.
289
290=item B<-v|--verbose>
291
292=item B<-j|--jobnp integer>
293
294Number of processor to allocated for each small job.
2951 by default.
296
297=item B<-n|--nodefile filenode>
298
299File name that list all the node where job could be launch.
300By defaut, it's define automatically by OAR via
301environment variable C<OAR_NODE_FILE>.
302
303For example, if you want to use 6 core on your cluster node,
304you need to put 6 times the hostname node in this file,
305one per line...
306It's a very common file in MPI process !
307
308=item B<-m|--masterio basefileio>
309
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
313
314=item B<-s|--switchio>
315
316Each small job will have it's own output STDOUT and STDERR
317base on master OAR job with C<JOB_NUM> inside
318(or base on C<basefileio> if option C<masterio>).
319Example :
320
321 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout
322
323where 151524 here is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.
325
=item B<-o|--oarsh command>
327
328Command use to launch a shell on a node.
329By default
330
331 oarsh -q -T
332
333Change it to C<ssh> if you are not using an OAR cluster...
334
335=item B<-h|--help>
336
337=back
338
339
340=head1 EXAMPLE
341
342=head2 Simple list of sequential job
343
344Content for the job file command (option C<--filecmd>) could have:
345
346 - empty line
347 - comment line begin with #
348 - valid shell command
349
350Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
351
352 $HOME/test/subjob1.sh
353 $HOME/test/subjob2.sh
354 $HOME/test/subjob3.sh
355 $HOME/test/subjob4.sh
356 ...
357 $HOME/test/subjob38.sh
358 $HOME/test/subjob39.sh
359 $HOME/test/subjob40.sh
360
361These jobs could be launch by:
362
363 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -f ./subjob.list.txt"
364
365=head2 Parallel job
366
367You need to put the number of core each small job need with option C<--jobnp>.
368If your job is build on OpenMP or MPI,
369you can use OAR_NP and OAR_NODE_FILE variables to configure them.
370On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
371for connexion between node instead of C<ssh>.
372
373Example with parallel small job on 2 core:
374
375 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -j 2 -f ./subjob.list.txt"
376
377=head2 Tracing and master crash
378
379If the master node crash after hours of calculus, everything is lost ?
380No, with option C<--logtrace>,
381it's possible to remember older result
382and not re-run these job the second and next time.
383
384 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
385
386After a crash or an C<oardel> command,
387you can then re-run the same command that will end to execute the jobs in the list
388
389 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
390
391C<logtrace> file are just plain file.
392We use the extension '.log' because these files are automatically
393eliminate from our backup system!
394
395=head2 Checkpointing and Idempotent
396
397C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
399you can cut this in small parts.
400
401For this example, we suppose that each small job need about 10min...
402So, we send a checkpoint 12min before the end of the process
403to let C<oar-parexec> finish the jobs started.
404After being checkpointed, C<oar-parexec> do not start any new small job.
405
406 oarsub -t idempotent -n test -l /core=6,walltime=04:00:00 --checkpoint 720 \
407   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
408
After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
411But as the OAR job is type C<idempotent>,
412OAR will re-submit it as long as all small job are not executed...
413
414This way, we let other users a chance to use the cluster!
415
In this last example, we use a moldable OAR job with idempotent
417to reserve many core for a small time or a few cores for a long time:
418
419 oarsub -t idempotent -n test \
420   -l /core=50,walltime=01:05:00 \
421   -l /core=6,walltime=04:00:00 \
422   --checkpoint 720 \
423   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
424
425
426=head1 SEE ALSO
427
428oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh
430
431
432=head1 AUTHORS
433
434Written by Gabriel Moreau, Grenoble - France
435
436
437=head1 LICENSE AND COPYRIGHT
438
439GPL version 2 or later and Perl equivalent
440
441Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
442
Note: See TracBrowser for help on using the repository browser.