source: trunk/oarutils/oar-parexec @ 86

Last change on this file since 86 was 86, checked in by g7moreau, 14 years ago
  • job with comment -> solution: inside {}
File size: 17.7 KB
Line 
#!/usr/bin/perl
#
# 2011/11/27 gabriel
#
# oar-parexec - run many small jobs in parallel on the cores of an OAR job.
# Cooperative multitasking via Coro: one coroutine launches jobs, one reaps
# them, one forwards checkpoint signals (see the async blocks below).

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;                # async / cede cooperative threads
use Coro::Semaphore;     # counters: jobs still to do, checkpoint received
use Coro::Signal;        # wake-up events: all-done, notify sub-jobs
use Coro::Channel;       # queue of free resource slices
use Coro::Handle;        # unblock(): Coro-aware non-blocking filehandles
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

# Command line options (see the POD at the end for the full description).
my $file;                # --file: file listing one sub-job command per line
my $dir;                 # --dir: master folder whose sub-folders are iterated
my $cmd;                 # --cmd: command run inside each sub-folder (with --dir)
my $logtrace;            # --logtrace: trace file recording start/end of sub-jobs
my $verbose;             # --verbose: chat on stdout
my $job_np         = 1;  # --jobnp: number of cores allocated to each sub-job
my $nodefile       = $ENV{OAR_NODE_FILE} || '';   # --nodefile: one node name per core
my $masterio;            # --masterio: base name for per-job stdout/stderr files
my $switchio;            # --switchio: give each sub-job its own stdout/stderr
my $help;                # --help: print the manual and exit
my $oarsh          = 'oarsh -q -T';   # --oarsh: remote shell used to reach nodes
my $sig_transmit;        # --transmit: forward the checkpoint signal to sub-jobs
my $sig_checkpoint = 'USR2';          # --kill: signal OAR sends for checkpointing
31
# Parse the command line; on bad usage print the synopsis, on --help the
# full manual page (both generated from the POD below).
Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
# One of the two run modes is mandatory: --file with an existing command
# file, or --dir on an existing folder together with a non-empty --cmd.
# defined() guards keep the file tests from interpolating undef values.
pod2usage(-verbose => 2) if not (
   (defined $file and -e $file)
   or (defined $dir and -d $dir and defined $cmd and $cmd ne '')
   );
52
# Re-run support: read the trace of a previous run so that sub-jobs already
# marked 'end' are skipped; a bare 'start' means the job did not finish and
# must be relaunched.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # filename+mode open: the file name can no longer inject an open mode
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # Coro-aware handle: print() will not block the other coroutines
   $log_h = unblock $log_h;
   }
71
# Build the list of sub-jobs: either one shell command per line of --file,
# or one sub-folder of --dir per job (running --cmd inside that folder).
my @job = ();
if (defined $file and -e $file) {
   my $job_num = 0;
   # lexical filehandle instead of the global bareword JOB_LIST
   open my $job_list_h, '<', $file
      or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;      # comment line
      next if $job_cmd =~ m/^\s*$/;   # empty line
      $job_num++;
      # an optional "# name=xxx" comment overrides the default numeric name
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, { name => $job_name, cmd => $job_cmd };
      }
   close $job_list_h;
   }
else {
   opendir my $dir_h, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dir_h) {
      next if $item =~ m/^\./;        # hidden entries
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;     # backup / disabled folders are skipped
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless -d "$dir/$item";    # folders only, plain files are ignored
      push @job, { name => $item, cmd => "( cd $dir/$item/; $cmd )" };
      }
   closedir $dir_h;
   }
102
# Available resources: one line per core in the node file (MPI style,
# a node name repeated once per usable core).
my @ressources = ();
# lexical filehandle instead of the global bareword NODE_FILE
open my $node_h, '<', $nodefile
   or die "can't open $nodefile: $!";
while (<$node_h>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_h;

# each sub-job needs $job_np cores: refuse to start if even one cannot fit
my $ressource_size = scalar(@ressources);
die "error: not enought ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;
118
my $current_dir = getcwd();

# Base name of the sub-job output files: the master OAR job output names
# without their .stdout/.stderr extension, or --masterio when given.
my $stdout = $ENV{OAR_STDOUT} || '';
my $stderr = $ENV{OAR_STDERR} || '';
$stdout =~ s/\.stdout$//;
$stderr =~ s/\.stderr$//;
if ($masterio) {
   $stdout = $masterio;
   $stderr = $masterio;
   }
127
# Coordination primitives: $finished is signaled when everything is done,
# $job_todo counts the sub-jobs that still have to finish.
my $finished = Coro::Signal->new;
my $job_todo = Coro::Semaphore->new(0);
# initialized to 0: was undef, which triggered an uninitialized-value
# warning in the first numeric comparison (and in the sprintf width if
# @job were empty)
my $job_name_maxlen = 0;
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# Cut the resource list into slices of $job_np cores; the channel holds one
# comma-joined slice per sub-job that can run simultaneously.
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }
144
# pid => {fh, node_connect, ressource, name, pidfile} of running sub-jobs
my %scheduled = ();

# OAR checkpoint handling: when the checkpoint signal (default SIGUSR2)
# arrives, stop launching new sub-jobs; with --transmit, also wake the
# notify coroutine so it forwards the signal to the running sub-jobs.
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   $notify->send if $sig_transmit;
   };
158
# Notify coroutine: each time the checkpoint handler sends $notify, open a
# remote shell on every node running a sub-job and kill the pid recorded in
# the sub-job's pid file with the checkpoint signal.
async {
   while () {
      $notify->wait;

      for my $pid (keys %scheduled) {
         my $subjob = $scheduled{$pid};

         my $shell = IO::File->new();
         $shell->open("| $oarsh $subjob->{node_connect} >/dev/null 2>&1")
            or die "error: can't notify subjob: $!";

         $shell->autoflush;
         $shell = unblock $shell;

         $shell->print("kill -$sig_checkpoint \$(cat $subjob->{pidfile})\n");
         $shell->print("exit\n");

         print "warning: transmit signal $sig_checkpoint"
            . " to job $subjob->{name} on node $subjob->{node_connect}.\n"
            if $verbose;

         close $shell;
         cede;
         }
      }
   };
188
# asynchrone start job block
# Launcher coroutine: for each sub-job, wait for a free resource slice,
# open a remote shell (pipe) on the slice's first node, and feed it the
# shell commands that set the environment, run the job and clean up.
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # job has been already run ?
      # (state comes from a previous --logtrace file: 'end' = done, skip;
      #  'start' = crashed before finishing, run it again)
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            cede;
            next JOB;
            }
         }

      # take job ressource
      # blocks here until the reaper coroutine puts a slice back
      my $job_ressource = $ressources->get;

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # connect to the first node of the slice; the pipe-open returns the
      # local pid of the oarsh process, which the reaper waitpid()s on
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s on node %s\n",
         $job_name, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # per-job output redirections, only with --switchio
      # (NOTE(review): left undef otherwise and interpolated as empty below)
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      # scratch files on the remote node: private node file and pid file
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name";
      my $job_pidfile  = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name.pid";

      # register the sub-job so the reaper and notify coroutines can see it
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name,
         pidfile      => $job_pidfile,
         };

      #$job_cmd =~ s/(\s+##.*)$//; # suppress comment

      # set job environment, run it and clean
      # ('\n' is single-quoted on purpose: printf on the remote shell
      #  expands it into real newlines, one node name per line)
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      # the command runs inside { } so a trailing comment on the job line
      # cannot swallow the redirections appended after the closing brace
      if ($sig_transmit) {
         # background run + trap: record the shell pid, forward the
         # checkpoint signal to the children, and wait until none is left
         $fh->print("trap 'kill -$sig_checkpoint \$(jobs -p)' $sig_checkpoint\n");
         $fh->print("echo \$\$ > $job_pidfile\n");
         $fh->print("{\n");
         $fh->print("$job_cmd\n");
         $fh->print("} $job_stdout $job_stderr \&\n");
         $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
         $fh->print("do\n");
         $fh->print("   wait\n");
         $fh->print("done\n");
         $fh->print("rm -f $job_pidfile\n");
         }
      else {
         $fh->print("{\n");
         $fh->print("$job_cmd\n");
         $fh->print("} $job_stdout $job_stderr\n");
         }
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
281
# asynchrone end job block
# Reaper coroutine: poll the running sub-jobs with non-blocking waitpid,
# log their termination, recycle their resource slice and signal global
# completion (all jobs done, or checkpoint received and nothing running).
async {
   while () {
      # keys() returns a copy, so deleting entries inside the loop is safe
      for my $job_pid (keys %scheduled) {
         # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $scheduled{$job_pid}->{ressource};

            # Job non finish, just suspend if received checkpoint signal
            # (a suspended job is relaunched by the next idempotent OAR run)
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing ! just finishing running job and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # normal termination: every sub-job has been reaped
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
314
# Hand control to the coroutines, then sleep until one of them signals
# that every sub-job has been dealt with.
cede;
$finished->wait;

# close the log trace file, if one was requested
if ($logtrace) {
   $log_h->close();
   }
322
323__END__
324
325=head1 NAME
326
327oar-parexec - parallel execution of many small job
328
329=head1 SYNOPSIS
330
331 oar-parexec --file filecommand \
332    [--logtrace tracefile] [--verbose] \
333    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
334    [--switchio] [--masterio basefileio]
335
336 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
337    [--logtrace tracefile] [--verbose] \
338    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
339    [--switchio] [--masterio basefileio]
340
341 oar-parexec --help
342
343=head1 DESCRIPTION
344
C<oar-parexec> can execute lots of small jobs in parallel inside a cluster.
The number of jobs running in parallel at one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...
However, it can be used outside OAR.
350
351Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
352
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
and only in the case of parallel small jobs (option C<--jobnp> > 1).
356
357 OAR_NODE_FILE - file that list node for parallel computing
358 OAR_NP        - number of processor affected
359
The file defined by OAR_NODE_FILE is created in /tmp
on the node before launching the small job,
and this file will be deleted after the job completes.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
365
366OAR define other variable that are equivalent to OAR_NODE_FILE:
367OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
368You can use in your script the OAR original file ressources
369by using these variable if you need it.
370
371
372=head1 OPTIONS
373
374=over 12
375
376=item B<-f|--file filecommand>
377
378File name which content job list.
379For the JOB_NAME definition,
380the first valid job in the list will have the number 1 and so on...
381
382It's possible to fix the name inside a comment on the job line.
383For example:
384
385 $HOME/test/subjob1.sh # name=subjob1
386
387The key C<name> is case insensitive,
388the associated value cannot have a space...
389
390=item B<-d|--dir foldertoiterate>
391
392Command C<--cmd> will be launch in all sub-folder of this master folder.
393Files in this folder will be ignored.
394Sub-folder name which begin with F<.>
395or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
396
397The JOB_NAME is simply the Sub-folder name.
398
399=item B<-c|--cmd commandtolaunch>
400
Command (and arguments to it) that will be launched in all sub-folders
of the master folder given by parameter C<--dir>.
403
404=item B<-l|--logtrace tracefile>
405
406File which log and trace running job.
407In case of running the same master command (after crash for example),
408only job that are not mark as done will be run again.
409Be careful, job mark as running (start but not finish) will be run again.
410Tracing is base on the JOB_NAME between multiple run.
411
412This option is very usefull in case of crash
413but also for checkpointing and idempotent OAR job.
414
415=item B<-v|--verbose>
416
417=item B<-j|--jobnp integer>
418
419Number of processor to allocated for each small job.
4201 by default.
421
422=item B<-n|--nodefile filenode>
423
424File name that list all the node where job could be launch.
425By defaut, it's define automatically by OAR via
426environment variable C<OAR_NODE_FILE>.
427
428For example, if you want to use 6 core on your cluster node,
429you need to put 6 times the hostname node in this file,
430one per line...
431It's a very common file in MPI process !
432
433=item B<-o|-oarsh command>
434
435Command use to launch a shell on a node.
436By default
437
438 oarsh -q -T
439
440Change it to C<ssh> if you are not using an OAR cluster...
441
442=item B<-s|--switchio>
443
444Each small job will have it's own output STDOUT and STDERR
445base on master OAR job with C<JOB_NAME> inside
446(or base on C<basefileio> if option C<masterio>).
447Example :
448
449 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
450
451where 151524 here is the master C<OAR_JOB_ID>
452and C<JOB_NAME> is the small job name.
453
454=item B<-m|--masterio basefileio>
455
456The C<basefileio> will be use in place of environment variable
457C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standart output
458(only use when option C<swithio> is activated).
459
460=item B<-k|--kill signal>
461
Signal to listen for, to make a clean stop of the current C<oar-parexec> process.
By default, the USR2 signal is used (see C<kill -l> for a list of possible signals).
464
465=item B<-t|--transmit>
466
Resend the caught signal to sub-jobs when receiving it.
By default, no signal is transmitted to child processes.

It's only valuable if used for long sub-jobs that can
in return make themselves a clean restart.
472
473
474=item B<-h|--help>
475
476=back
477
478
479=head1 EXAMPLE
480
481=head2 Simple list of sequential job
482
483Content for the job file command (option C<--file>) could have:
484
485 - empty line
486 - comment line begin with #
487 - valid shell command (can containt comment)
488
489Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
490
491 $HOME/test/subjob01.sh  # name=subjob01
492 $HOME/test/subjob02.sh  # name=subjob02
493 $HOME/test/subjob03.sh  # name=subjob03
494 $HOME/test/subjob04.sh  # name=subjob04
495 ...
496 $HOME/test/subjob38.sh  # name=subjob38
497 $HOME/test/subjob39.sh  # name=subjob39
498 $HOME/test/subjob40.sh  # name=subjob40
499
500These jobs could be launch by:
501
502 oarsub -n test -l /core=6,walltime=04:00:00 \
503   "oar-parexec -f ./subjob.list.txt"
504
505=head2 Folder job
506
In a folder F<subjob.d>, create sub-folders with your data inside: F<test1>, F<test2>...
508The same command will be executed in every sub-folder.
509C<oar-parexec> change the current directory to the sub-folder before launching it.
510
511A very simple job could be:
512
513 oarsub -n test -l /core=6,walltime=04:00:00 \
514   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
515
The command C<env> will be executed in all folders F<test1>, F<test2>... after a 10s pause.
517
518Sometime, it's simpler to use file list command,
519sometime, jobs by folder with the same command run is more relevant.
520
521=head2 Parallel job
522
523You need to put the number of core each small job need with option C<--jobnp>.
524If your job is build on OpenMP or MPI,
525you can use OAR_NP and OAR_NODE_FILE variables to configure them.
526On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
527for connexion between node instead of C<ssh>.
528
529Example with parallel small job on 2 core:
530
531 oarsub -n test -l /core=6,walltime=04:00:00 \
532   "oar-parexec -j 2 -f ./subjob.list.txt"
533
534=head2 Tracing and master crash
535
536If the master node crash after hours of calculus, everything is lost ?
537No, with option C<--logtrace>,
538it's possible to remember older result
539and not re-run these job the second and next time.
540
541 oarsub -n test -l /core=6,walltime=04:00:00 \
542   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
543
544After a crash or an C<oardel> command,
545you can then re-run the same command that will end to execute the jobs in the list
546
547 oarsub -n test -l /core=6,walltime=04:00:00 \
548   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
549
550C<logtrace> file are just plain file.
551We use the extension '.log' because these files are automatically
552eliminate from our backup system!
553
554=head2 Checkpointing and Idempotent
555
556C<oar-parexec> is compatible with the OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
you can cut this work into small parts.
559
560For this example, we suppose that each small job need about 10min...
561So, we send a checkpoint 12min before the end of the process
562to let C<oar-parexec> finish the jobs started.
563After being checkpointed, C<oar-parexec> do not start any new small job.
564
565 oarsub -t idempotent -n test \
566   -l /core=6,walltime=04:00:00 \
567   --checkpoint 720 \
568   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
569
After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
572But as the OAR job is type C<idempotent>,
573OAR will re-submit it as long as all small job are not executed...
574
575This way, we let other users a chance to use the cluster!
576
577In this last exemple, we use moldable OAR job with idempotent
578to reserve many core for a small time or a few cores for a long time:
579
580 oarsub -t idempotent -n test \
581   -l /core=50,walltime=01:05:00 \
582   -l /core=6,walltime=04:00:00 \
583   --checkpoint 720 \
584   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
585
586=head2 Signal, recurse and long job
587
588By default, OAR use signal USR2 for checkpointing.
589It's possible to change this with option C<--kill>.
590
591When use with long small job, checkpointing could be too long...
592More than walltime!
593The option C<--transmit> could be use to checkpoint small job!
594These long small job will then stop cleanly and will be restarted next time.
595
596In the C<logtrace> file, small job will have the status suspend.
597They will be launch with the same command line at the next OAR run.
598
599=head1 SEE ALSO
600
oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh
603
604
605=head1 AUTHORS
606
607Written by Gabriel Moreau, Grenoble - France
608
609
610=head1 LICENSE AND COPYRIGHT
611
612GPL version 2 or later and Perl equivalent
613
614Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
615
Note: See TracBrowser for help on using the repository browser.