source: trunk/oarutils/oar-parexec @ 46

Last change on this file since 46 was 46, checked in by g7moreau, 9 years ago
  • Line too long in synopsis
  • Change option order
File size: 13.9 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command-line configuration.  Defaults match an OAR environment:
# node list taken from OAR_NODE_FILE, one processor per small job,
# remote shell through oarsh.
my $filecmd  = '';
my $dir      = '';
my $cmd      = '';
my $logtrace = '';
my $verbose;
my $job_np = 1;
my $nodefile = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh = 'oarsh -q -T';

Getopt::Long::GetOptions(
   'cmd=s'      => \$cmd,
   'dir=s'      => \$dir,
   'filecmd=s'  => \$filecmd,
   'help'       => \$help,
   'jobnp=i'    => \$job_np,
   'logtrace=s' => \$logtrace,
   'masterio=s' => \$masterio,
   'nodefile=s' => \$nodefile,
   'oarsh=s'    => \$oarsh,
   'switchio'   => \$switchio,
   'verbose'    => \$verbose,
   ) || pod2usage(-verbose => 0);

# Full manual on --help, or when neither a command file (--filecmd)
# nor a folder plus command pair (--dir/--cmd) has been supplied.
pod2usage(-verbose => 2) if $help;
pod2usage(-verbose => 2)
   unless -e "$filecmd"
   or (-d "$dir" and $cmd ne '');
48
# On re-run, recover the trace of jobs already processed so that
# finished jobs are not launched a second time (%state maps a job
# name to its last known state, 'start' or 'end').
my %state;
my $log_h = IO::File->new();
if (-e "$logtrace") {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+(\S+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+(\S+)\s/;
      }
   $log_h->close();
   }
# Keep the trace file open in append mode for the whole run.  The
# handle is made autoflushing and Coro-aware (unblock) so that the
# async threads can log without blocking each other.
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }
67
# Build the list of jobs to run.  Each entry is { name => ..., cmd => ... }.
# With --filecmd, every non-empty, non-comment line is one job, numbered
# from 1 in file order (the number is the JOB_NAME used for tracing).
# With --dir/--cmd, every sub-folder is one job and the command is run
# inside that folder.
# Fix: use lexical filehandles instead of bareword globals JOB_LIST/DIR.
my @job = ();
if (-e "$filecmd") {
   my $job_num = 0;
   open my $job_list_h, '<', $filecmd
      or die "error: can't open job file $filecmd: $!";
   while (my $line = <$job_list_h>) {
      chomp $line;
      next if $line =~ m/^#/;      # comment line
      next if $line =~ m/^\s*$/;   # empty line
      $job_num++;
      push @job, { name => $job_num, cmd => $line };
      }
   close $job_list_h;
   }
else {
   opendir my $dir_h, $dir
      or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dir_h) {
      next if $item =~ m/^\./;        # hidden entry
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;     # backup / disabled folders
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless -d "$dir/$item";
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dir_h;
   }
96
# Ressources available: one line per allocated processor slot
# (MPI-style node file, a host name may appear several times).
# Fix: lexical filehandle instead of bareword NODE_FILE, and typo
# "enought" corrected in the error message.
my @ressources = ();
open my $node_h, '<', $nodefile
   or die "can't open $nodefile: $!";
while (my $line = <$node_h>) {
   chomp $line;
   next if $line =~ m/^#/;
   next if $line =~ m/^\s*$/;
   push @ressources, $line;
   }
close $node_h;

my $ressource_size = scalar(@ressources);
# A single job cannot need more slots than the whole allocation.
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;
112
# Remember where the master job runs: every small job is started in
# this folder on its node.
my $current_dir = getcwd();

# Base names for per-job output files (--switchio): the master OAR
# stdout/stderr file names stripped of their extension, unless
# --masterio overrides both of them.
my $stderr = $ENV{OAR_STDERR} || '';
my $stdout = $ENV{OAR_STDOUT} || '';
$stderr =~ s/\.stderr$//;
$stdout =~ s/\.stdout$//;
if ($masterio) {
   $stderr = $masterio;
   $stdout = $masterio;
   }
121
# Global coordination objects:
#  - $finished signals the main thread that everything is done,
#  - $job_todo counts jobs that still have to finish,
#  - $job_name_maxlen is used to align the start/end log messages.
# Fix: initialize $job_name_maxlen to 0 (it was used uninitialized in
# the numeric comparison below) and use direct Class->new syntax
# instead of indirect object notation.
my $finished = Coro::Signal->new;
my $job_todo = Coro::Semaphore->new(0);
my $job_name_maxlen = 0;
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# Slice of ressources for parallel job: the node list is cut in
# chunks of $job_np lines; each chunk becomes one slot in the channel,
# stored as a comma-separated host list.
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }
138
# Jobs currently running, indexed by subjob pid.
my %scheduled = ();

# OAR sends SIGUSR2 (its default checkpoint signal) some time before
# the walltime ends: remember it in a semaphore so the launcher stops
# starting new jobs while the running ones are allowed to finish.
my $oar_checkpoint = Coro::Semaphore->new(0);
$SIG{USR2} = sub {
   if ($verbose) {
      print 'warning: receive checkpoint at '
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   };
150
# Asynchronous producer: launch jobs as ressource slots become free.
# Runs as a Coro thread; every blocking point (channel get, handle
# print, cede) yields control to the reaper thread below.
async {
        JOB:
   for my $job (@job) {
      my $job_name = $job->{name};
      my $job_cmd  = $job->{cmd};

      # Skip or relaunch jobs found in the logtrace file of a previous
      # run (see %state built at startup).
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            # 'start' without a matching 'end': job was interrupted.
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # Take a ressource slot; blocks (and yields) until one is free.
      my $job_ressource = $ressources->get;

      # No new job is started once an OAR checkpoint has been received.
      # NOTE(review): the slot taken just above is not put back, which
      # is harmless since nothing will be launched anymore.
      last JOB if $oar_checkpoint->count() > 0;

      # Open a shell on the first node of the slot; the subjob script
      # is then written line by line on the pipe.
      # NOTE(review): for a pipe open, Perl's open() returns the child
      # pid and IO::File->open appears to pass it through — confirm
      # against the IO::File / perlipc documentation.
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh; # Coro-aware handle: prints no longer block the thread

      # Register the running job so the reaper thread can find it.
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name
         };

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # Optional per-job output redirections (--switchio); left undef
      # otherwise and interpolated as empty strings below.
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_name";

      # Set job environment, run it and clean.  For parallel jobs
      # (--jobnp > 1) a private node file is written on the remote node
      # and exported as OAR_NODE_FILE / OAR_NP before the command runs.
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job_cmd $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n"); # terminate the remote shell; pid is reaped below
      cede;
      }
   }
223
# Asynchronous reaper: poll the running jobs, log their completion,
# recycle their ressource slot and decide when everything is done.
async {
   while () {
      # keys() returns a fresh list, so deleting entries inside the
      # loop is safe.
      for my $job_pid (keys %scheduled) {
         # Non blocking pid test: waitpid returns 0 while the subjob
         # shell is still alive.
         # NOTE(review): a -1 return (no such child) is also truthy and
         # treated as "finished" — confirm this is intended.
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};
            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # Checkpointing: a checkpoint was received and every running job
      # has finished — just stop here and quit.
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # Normal termination: every job in the list has been done.
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
251
# Give the two async threads above a first chance to run.
cede;

# Block the main thread until one of the async blocks signals that
# all jobs have been done (or a checkpoint drained the running ones).
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
259
260__END__
261
262=head1 NAME
263
oar-parexec - parallel execution of many small jobs
265
266=head1 SYNOPSIS
267
268 oar-parexec --filecmd filecommand [--logtrace tracefile] [--verbose] \
269            [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
270            [--switchio] [--masterio basefileio]
271
272 oar-parexec --dir foldertoitemize --cmd commandtolaunch [--logtrace tracefile] [--verbose] \
273            [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
274            [--switchio] [--masterio basefileio]
275
276 oar-parexec --help
277
278=head1 DESCRIPTION
279
C<oar-parexec> can execute lots of small jobs in parallel inside a cluster.
The number of jobs running at one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...
However, it can be used outside OAR.
285
286Option C<--filecmd> or C<--dir> and C<--cmd> are the only mandatory parameters.
287
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
but only in the case of parallel small jobs (option C<--jobnp> > 1).
291
292 OAR_NODE_FILE - file that list node for parallel computing
293 OAR_NP        - number of processor affected
294
The file defined by OAR_NODE_FILE is created in /tmp
on the node before the small job is launched,
and this file is deleted after the job completes.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
300
301OAR define other variable that are equivalent to OAR_NODE_FILE:
302OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
303You can use in your script the OAR original file ressources
304by using these variable if you need it.
305 
306
307=head1 OPTIONS
308
309=over 12
310
311=item B<-f|--filecmd filecommand>
312
313File name which content job list.
314For the JOB_NAME definition,
315the first valid job in the list will have the number 1 and so on...
316
317=item B<-d|--dir foldertoitemize>
318
Command C<--cmd> will be launched in every sub-folder of this master folder.
Files in this folder will be ignored.
Sub-folders whose name begins with '.'
or ends with '.old', '.sav', '.bak' or '.no' will also be ignored...
323
324The JOB_NAME is simply the Sub-folder name.
325
326=item B<-c|--cmd commandtolaunch>
327
Command (and its arguments) that will be launched in every sub-folder
of the master folder given by C<--dir>.
330
331=item B<-l|--logtrace tracefile>
332
File which logs and traces running jobs.
When the same master command is run again (after a crash for example),
only jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.
Tracing is based on the JOB_NAME between multiple runs.

This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
341
342=item B<-v|--verbose>
343
344=item B<-j|--jobnp integer>
345
346Number of processor to allocated for each small job.
3471 by default.
348
349=item B<-n|--nodefile filenode>
350
File name that lists all the nodes where jobs can be launched.
By default, it is defined automatically by OAR via the
environment variable C<OAR_NODE_FILE>.
354
355For example, if you want to use 6 core on your cluster node,
356you need to put 6 times the hostname node in this file,
357one per line...
358It's a very common file in MPI process !
359
=item B<-o|--oarsh command>
361
362Command use to launch a shell on a node.
363By default
364
365 oarsh -q -T
366
367Change it to C<ssh> if you are not using an OAR cluster...
368
369=item B<-s|--switchio>
370
371Each small job will have it's own output STDOUT and STDERR
372base on master OAR job with C<JOB_NAME> inside
373(or base on C<basefileio> if option C<masterio>).
374Example :
375
376 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
377
378where 151524 here is the master C<OAR_JOB_ID>
379and C<JOB_NAME> is the small job name.
380
381=item B<-m|--masterio basefileio>
382
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<--switchio> is activated).
386
387=item B<-h|--help>
388
389=back
390
391
392=head1 EXAMPLE
393
394=head2 Simple list of sequential job
395
396Content for the job file command (option C<--filecmd>) could have:
397
398 - empty line
399 - comment line begin with #
400 - valid shell command
401
402Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
403
404 $HOME/test/subjob1.sh
405 $HOME/test/subjob2.sh
406 $HOME/test/subjob3.sh
407 $HOME/test/subjob4.sh
408 ...
409 $HOME/test/subjob38.sh
410 $HOME/test/subjob39.sh
411 $HOME/test/subjob40.sh
412
413These jobs could be launch by:
414
415 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -f ./subjob.list.txt"
416
417=head2 Parallel job
418
419You need to put the number of core each small job need with option C<--jobnp>.
420If your job is build on OpenMP or MPI,
421you can use OAR_NP and OAR_NODE_FILE variables to configure them.
422On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
423for connexion between node instead of C<ssh>.
424
425Example with parallel small job on 2 core:
426
427 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -j 2 -f ./subjob.list.txt"
428
429=head2 Tracing and master crash
430
431If the master node crash after hours of calculus, everything is lost ?
432No, with option C<--logtrace>,
433it's possible to remember older result
434and not re-run these job the second and next time.
435
436 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
437
438After a crash or an C<oardel> command,
439you can then re-run the same command that will end to execute the jobs in the list
440
441 oarsub -n test -l /core=6,walltime=04:00:00 "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
442
443C<logtrace> file are just plain file.
444We use the extension '.log' because these files are automatically
445eliminate from our backup system!
446
447=head2 Checkpointing and Idempotent
448
449C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
451you can cut this in small parts.
452
453For this example, we suppose that each small job need about 10min...
454So, we send a checkpoint 12min before the end of the process
455to let C<oar-parexec> finish the jobs started.
456After being checkpointed, C<oar-parexec> do not start any new small job.
457
458 oarsub -t idempotent -n test -l /core=6,walltime=04:00:00 --checkpoint 720 \
459   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
460
After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
463But as the OAR job is type C<idempotent>,
464OAR will re-submit it as long as all small job are not executed...
465
466This way, we let other users a chance to use the cluster!
467
In this last example, we use a moldable OAR job with idempotent
469to reserve many core for a small time or a few cores for a long time:
470
471 oarsub -t idempotent -n test \
472   -l /core=50,walltime=01:05:00 \
473   -l /core=6,walltime=04:00:00 \
474   --checkpoint 720 \
475   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
476
477
478=head1 SEE ALSO
479
480oar-dispatch, mpilauncher,
481orsh, oar-envsh, ssh
482
483
484=head1 AUTHORS
485
486Written by Gabriel Moreau, Grenoble - France
487
488
489=head1 LICENSE AND COPYRIGHT
490
491GPL version 2 or later and Perl equivalent
492
493Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
494
Note: See TracBrowser for help on using the repository browser.