source: trunk/oarutils/oar-parexec @ 76

Last change on this file since 76 was 76, checked in by g7moreau, 12 years ago
  • Change message in logtrace. Suspend process must be relaunch
File size: 14.8 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command-line state.  These variables are shared by the whole script:
# the option table below fills them in and every later block reads them.
my $file;                          # --file : job list file (one command per line)
my $dir;                           # --dir  : master folder to iterate
my $cmd;                           # --cmd  : command run in each sub-folder
my $logtrace;                      # --logtrace : start/end trace file for re-run
my $verbose;
my $job_np         = 1;            # --jobnp : cores per small job
my $nodefile       = $ENV{OAR_NODE_FILE} || '';  # one line per available core
my $masterio;                      # --masterio : base name for per-job output files
my $switchio;                      # --switchio : per-job stdout/stderr redirection
my $help;
my $oarsh          = 'oarsh -q -T';   # remote shell used to reach a node
my $sig_transmit;                  # --transmit : forward checkpoint signal to jobs
my $sig_checkpoint = 'USR2';       # --kill : signal OAR sends on checkpoint

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
# Usage checks: --help shows the full manual; otherwise either --file must
# name an existing job list, or --dir (an existing folder) together with a
# non-empty --cmd must be given.  The defined() guards avoid file tests and
# string comparison on undef when an option was not supplied.
pod2usage(-verbose => 2) if $help;
pod2usage(-verbose => 2) if not (
   (defined $file and -e $file)
   or (defined $dir and -d $dir and defined $cmd and $cmd ne '')
   );
52
# Re-run support: keep trace of jobs already done.  The trace file holds
# "start job NAME ..." and "end job NAME ..." lines; a job whose recorded
# state is 'end' will not be launched again.
my %state;
my $log_h = IO::File->new();
if ($logtrace and -e $logtrace) {
   # ->open($file, $mode) keeps the mode separate from the file name, so a
   # trace file whose name begins with '>', '<' or '|' cannot change the
   # open mode (unlike the one-argument "< $logtrace" form).
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # Coro-aware handle: prints from the coroutines below will not block
   # the whole process.
   $log_h = unblock $log_h;
   }
71
# Build the list of jobs to run.  Each entry is { name => JOB_NAME, cmd => shell }.
my @job = ();
if (defined $file and -e $file) {
   # --file mode: one shell command per non-blank, non-comment line.
   # Jobs are numbered from 1 in file order; the number is the JOB_NAME.
   my $job_num = 0;
   open my $job_list_fh, '<', $file
      or die "error: can't open job file $file: $!";
   while (<$job_list_fh>) {
      chomp;
      next if m/^#/;      # comment line
      next if m/^\s*$/;   # blank line
      $job_num++;
      push @job, { name => $job_num, cmd => $_ };
      }
   close $job_list_fh;
   }
else {
   # --dir mode: one job per sub-folder; the sub-folder name is the
   # JOB_NAME.  Hidden folders and common backup suffixes are skipped.
   opendir my $dir_fh, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dir_fh) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dir_fh;
   }
100
# Ressources available: the node file lists one line per core
# (MPI-style: a host name repeated once per usable core).
my @ressources = ();
open my $node_fh, '<', $nodefile
   or die "error: can't open $nodefile: $!";
while (<$node_fh>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_fh;

# A single small job cannot need more cores than the whole allocation.
my $ressource_size = scalar(@ressources);
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;
116
# Remember where the master job runs; every small job starts there.
my $current_dir = getcwd();

# Base names for per-job output files: the master OAR output files
# stripped of their extension, or --masterio when given.
my $stderr = $ENV{OAR_STDERR} || '';
my $stdout = $ENV{OAR_STDOUT} || '';
$stderr =~ s/\.stderr$//;
$stdout =~ s/\.stdout$//;
if ($masterio) {
   $stderr = $masterio;
   $stdout = $masterio;
   }
125
# Coroutine plumbing:
#  - $finished : signal raised when everything is done;
#  - $job_todo : semaphore counting the jobs still to be finished;
#  - $job_name_maxlen : widest job name, for aligned log messages.
# Direct method calls instead of indirect-object "new Class" syntax.
my $finished = Coro::Signal->new;
my $job_todo = Coro::Semaphore->new(0);
my $job_name_maxlen = 0;   # start at 0 so the first comparison is defined
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# Slice the core list into groups of $job_np cores; each group is one
# slot in the channel and hosts one (possibly parallel) small job.
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }
142
# Table of running jobs, keyed by the pid of their oarsh pipe.
my %scheduled = ();

# OAR checkpoint support, default signal SIGUSR2.  The semaphore count
# acts as the "checkpoint received" flag tested by both coroutines below.
my $oar_checkpoint = Coro::Semaphore->new(0);
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   # With --transmit, forward the signal to every running small job.
   if ($sig_transmit) {
      kill $sig_checkpoint => keys %scheduled;
      }
   };
155
# Asynchronous producer: launch the jobs.  This coroutine walks the job
# list, waits for a free ressource slot, and starts each small job by
# piping a small shell script into "oarsh node".
async {
        JOB:
   for my $job (@job) {
      my $job_name = $job->{name};
      my $job_cmd  = $job->{cmd};

      # job has been already run ?  (%state comes from the --logtrace file)
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            # started but never marked 'end': assume it crashed, relaunch it
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take job ressource (blocks here until a slot is free again)
      my $job_ressource = $ressources->get;

      # no more launch job when OAR checkpointing
      # (tested after the blocking get on purpose: we only give up once a
      # slot became idle, i.e. a running job just finished)
      last JOB if $oar_checkpoint->count() > 0;

      # open a shell on the first node of the slot; for a pipe open the
      # return value is the child pid, which keys %scheduled for the reaper
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;   # Coro-aware handle: prints below may yield

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name
         };

      # trace line, width-aligned on the longest job name
      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # per-job output redirection fragments (only with --switchio)
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      # node-local file listing the cores given to this small job
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_name";

     # set job environment, run it and clean
      if ($job_np > 1) {
         # parallel small job: give it its own OAR_NODE_FILE and OAR_NP
         # (the '\n' below is a literal backslash-n, expanded by printf
         # on the remote shell, one core per line)
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job_cmd $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
228
# Asynchronous reaper: wait for jobs to end.  Polls the running pids
# without blocking, logs each finished job, recycles its ressource slot,
# and raises $finished when everything is accounted for.
async {
   while () {
      for my $job_pid (keys %scheduled) {
                        # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};

            # Job non finish, just suspend if received checkpoint signal
            # (logged as "suspend" so the next run relaunches it)
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;          # yield between two pid tests
         }

      # checkpointing ! just finishing running job and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # normal termination: every job has been run (or skipped as done)
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
261
# Hand control over to the two coroutines above.
cede;

# all job have been done (or checkpoint received and running jobs finished)
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
269
270__END__
271
272=head1 NAME
273
274oar-parexec - parallel execution of many small job
275
276=head1 SYNOPSIS
277
278 oar-parexec --file filecommand \
279    [--logtrace tracefile] [--verbose] \
280    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
281    [--switchio] [--masterio basefileio]
282
283 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
284    [--logtrace tracefile] [--verbose] \
285    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
286    [--switchio] [--masterio basefileio]
287
288 oar-parexec --help
289
290=head1 DESCRIPTION
291
C<oar-parexec> can execute a lot of small jobs in parallel inside a cluster.
The number of parallel jobs running at one time cannot exceed the number of cores
defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...
296However, it can be used outside OAR.
297
298Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
299
300Small job will be launch in the same folder as the master job.
301Two environment variable are defined for each small job
302and only in case of parallel small job (option C<--jobnp> > 1).
303
304 OAR_NODE_FILE - file that list node for parallel computing
305 OAR_NP        - number of processor affected
306
307The file define by OAR_NODE_FILE is created  in /tmp
308on the node before launching the small job
309and this file will be delete after job complete.
310C<oar-parexec> is a simple script,
311OAR_NODE_FILE will not be deleted in case of crash of the master job.
312
313OAR define other variable that are equivalent to OAR_NODE_FILE:
314OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
315You can use in your script the OAR original file ressources
316by using these variable if you need it.
317 
318
319=head1 OPTIONS
320
321=over 12
322
323=item B<-f|--file filecommand>
324
325File name which content job list.
326For the JOB_NAME definition,
327the first valid job in the list will have the number 1 and so on...
328
329=item B<-d|--dir foldertoiterate>
330
331Command C<--cmd> will be launch in all sub-folder of this master folder.
332Files in this folder will be ignored.
333Sub-folder name which begin with F<.>
334or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
335
336The JOB_NAME is simply the Sub-folder name.
337
338=item B<-c|--cmd commandtolaunch>
339
Command (and its arguments) that will be launched in every sub-folder
of the folder given by C<--dir>.
342
343=item B<-l|--logtrace tracefile>
344
345File which log and trace running job.
346In case of running the same master command (after crash for example),
347only job that are not mark as done will be run again.
348Be careful, job mark as running (start but not finish) will be run again.
349Tracing is base on the JOB_NAME between multiple run.
350
This option is very useful in case of crash
352but also for checkpointing and idempotent OAR job.
353
354=item B<-v|--verbose>
355
356=item B<-j|--jobnp integer>
357
358Number of processor to allocated for each small job.
3591 by default.
360
361=item B<-n|--nodefile filenode>
362
363File name that list all the node where job could be launch.
364By defaut, it's define automatically by OAR via
365environment variable C<OAR_NODE_FILE>.
366
367For example, if you want to use 6 core on your cluster node,
368you need to put 6 times the hostname node in this file,
369one per line...
370It's a very common file in MPI process !
371
=item B<-o|--oarsh command>
373
374Command use to launch a shell on a node.
375By default
376
377 oarsh -q -T
378
379Change it to C<ssh> if you are not using an OAR cluster...
380
381=item B<-s|--switchio>
382
383Each small job will have it's own output STDOUT and STDERR
384base on master OAR job with C<JOB_NAME> inside
385(or base on C<basefileio> if option C<masterio>).
386Example :
387
388 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
389
390where 151524 here is the master C<OAR_JOB_ID>
391and C<JOB_NAME> is the small job name.
392
393=item B<-m|--masterio basefileio>
394
395The C<basefileio> will be use in place of environment variable
396C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standart output
(only used when option C<switchio> is activated).
398
399=item B<-h|--help>
400
401=back
402
403
404=head1 EXAMPLE
405
406=head2 Simple list of sequential job
407
408Content for the job file command (option C<--file>) could have:
409
410 - empty line
411 - comment line begin with #
412 - valid shell command
413
414Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
415
416 $HOME/test/subjob1.sh
417 $HOME/test/subjob2.sh
418 $HOME/test/subjob3.sh
419 $HOME/test/subjob4.sh
420 ...
421 $HOME/test/subjob38.sh
422 $HOME/test/subjob39.sh
423 $HOME/test/subjob40.sh
424
425These jobs could be launch by:
426
427 oarsub -n test -l /core=6,walltime=04:00:00 \
428   "oar-parexec -f ./subjob.list.txt"
429
430=head2 Folder job
431
In a folder F<subjob.d>, create sub-folders with your data inside: F<test1>, F<test2>...
433The same command will be executed in every sub-folder.
434C<oar-parexec> change the current directory to the sub-folder before launching it.
435
436A very simple job could be:
437
438 oarsub -n test -l /core=6,walltime=04:00:00 \
439   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
440
The command C<env> will be executed in all folders F<test1>, F<test2>... after a 10s pause.
442
443Sometime, it's simpler to use file list command,
444sometime, jobs by folder with the same command run is more relevant.
445
446=head2 Parallel job
447
448You need to put the number of core each small job need with option C<--jobnp>.
449If your job is build on OpenMP or MPI,
450you can use OAR_NP and OAR_NODE_FILE variables to configure them.
451On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
452for connexion between node instead of C<ssh>.
453
454Example with parallel small job on 2 core:
455
456 oarsub -n test -l /core=6,walltime=04:00:00 \
457   "oar-parexec -j 2 -f ./subjob.list.txt"
458
459=head2 Tracing and master crash
460
461If the master node crash after hours of calculus, everything is lost ?
462No, with option C<--logtrace>,
463it's possible to remember older result
464and not re-run these job the second and next time.
465
466 oarsub -n test -l /core=6,walltime=04:00:00 \
467   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
468
469After a crash or an C<oardel> command,
470you can then re-run the same command that will end to execute the jobs in the list
471
472 oarsub -n test -l /core=6,walltime=04:00:00 \
473   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
474
475C<logtrace> file are just plain file.
476We use the extension '.log' because these files are automatically
477eliminate from our backup system!
478
479=head2 Checkpointing and Idempotent
480
481C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
483you can cut this in small parts.
484
485For this example, we suppose that each small job need about 10min...
486So, we send a checkpoint 12min before the end of the process
487to let C<oar-parexec> finish the jobs started.
488After being checkpointed, C<oar-parexec> do not start any new small job.
489
490 oarsub -t idempotent -n test \
491   -l /core=6,walltime=04:00:00 \
492   --checkpoint 720 \
493   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
494
495After 3h48min, the OAR job will begin to stop launching new small job.
When all running small jobs are finished, it exits.
497But as the OAR job is type C<idempotent>,
498OAR will re-submit it as long as all small job are not executed...
499
500This way, we let other users a chance to use the cluster!
501
In this last example, we use moldable OAR job with idempotent
503to reserve many core for a small time or a few cores for a long time:
504
505 oarsub -t idempotent -n test \
506   -l /core=50,walltime=01:05:00 \
507   -l /core=6,walltime=04:00:00 \
508   --checkpoint 720 \
509   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
510
511
512=head1 SEE ALSO
513
514oar-dispatch, mpilauncher,
515orsh, oar-envsh, ssh
516
517
518=head1 AUTHORS
519
520Written by Gabriel Moreau, Grenoble - France
521
522
523=head1 LICENSE AND COPYRIGHT
524
525GPL version 2 or later and Perl equivalent
526
527Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
528
Note: See TracBrowser for help on using the repository browser.