source: trunk/oarutils/oar-parexec @ 114

Last change on this file since 114 was 113, checked in by g7moreau, 9 years ago
  • Delay launch of each subjob by one second
File size: 19.7 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command-line options and their defaults. These lexicals are file-wide
# state read by all the blocks below.
my $file;            # --file: file listing one job command per line
my $dir;             # --dir: master folder whose sub-folders are iterated
my $cmd;             # --cmd: command launched in each sub-folder (with --dir)
my $logtrace;        # --logtrace: journal of started/finished jobs for re-run
my $verbose;
my $job_np         = 1;                           # --jobnp: cores per subjob
my $nodefile       = $ENV{OAR_NODE_FILE} || '';   # --nodefile: resource list
my $masterio;        # --masterio: base name for per-subjob stdout/stderr
my $switchio;        # --switchio: one stdout/stderr file pair per subjob
my $help;
my $oarsh          = 'oarsh -q -T';               # --oarsh: remote shell command
my $sig_transmit;    # --transmit: forward the checkpoint signal to subjobs
my $sig_checkpoint = 'USR2';                      # --kill: signal to listen for
my $job_launch_brake = 1; # one second time brake

# Parse options; show short usage on a bad invocation, full POD on --help.
Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
# Mandatory input: either an existing --file, or a --dir plus a --cmd.
# NOTE(review): when --dir/--cmd are absent, $dir and $cmd are undef here;
# the test only works quietly because `use warnings` is not enabled — confirm.
pod2usage(-verbose => 2) if not (
 (-e "$file")
 or (-d "$dir" and $cmd ne '')
 );
53
# re-run, keep trace of job already done
# First pass: read an existing logtrace and remember each job's last
# recorded state ('start' or 'end', keyed by job name).  Jobs marked
# 'end' will be skipped by the launcher; jobs left at 'start' re-run.
my %state;
my $log_h = IO::File->new();
if (-e "$logtrace") {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
# Second pass: reopen the same handle in append mode for this run's
# records, and wrap it with Coro's unblock so prints from coroutines
# do not block the whole process.
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }
72
# job to run
# Build the list of jobs, either from a command file (--file) or from the
# sub-folders of a master folder (--dir + --cmd).  Each entry is a hash:
#   name - label used in the logtrace (explicit "# name=..." tag, folder
#          name, or the job's rank),
#   cmd  - shell command passed verbatim to the remote shell,
#   num  - 1-based rank of the job in the list.
my @job = ();
if (-e "$file") {
   my $job_num = 0;
   # lexical filehandle instead of the former bareword JOB_LIST
   open my $job_list_h, '<', $file or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;       # comment line
      next if $job_cmd =~ m/^\s*$/;    # empty line
      $job_num++;
      # optional explicit name given as a trailing "# name=label" comment
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name   => $job_name,
         cmd    => "$job_cmd",
         num    => $job_num,
         };
      }
   close $job_list_h;
   }
else {
   my $job_num = 0;
   # lexical dirhandle instead of the former bareword DIR
   opendir my $dir_h, $dir or die "error: can't open folder $dir: $!";
   while (my $item = readdir $dir_h) {
      next if $item =~ m/^\./;         # hidden entries
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;      # backup/disabled folder suffixes
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");   # folders only, files are ignored
      $job_num++;
      push @job, {
         name   => $item,
         # NOTE(review): $dir/$item is interpolated straight into a shell
         # command line - folder names with spaces or shell metacharacters
         # would break or be misinterpreted; confirm naming constraints.
         cmd    => "cd $dir/$item/; $cmd",
         num    => $job_num,
         };
      }
   closedir $dir_h;
   }
113
# assume unique job name
# Fall back to purely numeric names when the job names are not all unique
# (duplicate names would make the logtrace bookkeeping ambiguous).
{
   my %seen = ();
   my $count_unique_name = grep { ! $seen{ $_->{name} }++ } @job;
   # BUG FIX: the previous comparison was against $#job, which is the LAST
   # INDEX (count - 1), not the element count.  With all names unique the
   # counts never matched, so custom names were always thrown away.  Compare
   # against the real number of jobs instead.
   if ($count_unique_name != scalar @job) {
      $_->{name} = $_->{num} for @job;
      }
   }
122
# ressources available
# Read the node file: one line per allocatable slot (a node name repeated
# once per usable core, as OAR writes OAR_NODE_FILE).  Comment and blank
# lines are skipped.
my @ressources = ();
# lexical filehandle instead of the former bareword NODE_FILE
open my $node_h, '<', $nodefile
   or die "can't open $nodefile: $!";
while (<$node_h>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close $node_h;

my $ressource_size = scalar(@ressources);
# A subjob cannot need more cores than the whole allocation provides.
# (typo fix in the message: "enought" -> "enough")
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;
138
# Working directory of the master job; every subjob shell cd's back here.
my $current_dir = getcwd();

# Base names for per-subjob output files: take the master OAR stdout/stderr
# paths without their extension, unless --masterio supplies an explicit base.
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
($stdout, $stderr) = ($masterio, $masterio) if $masterio;
147
# $finished is signalled by the reaper coroutine when everything is done;
# $job_todo counts the jobs that still have to finish.
my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;
# Widest job name, used to align the start/end log columns.
# FIX: initialize to 0 - it was previously compared while still undef,
# which triggers an "uninitialized value" warning under warnings.
my $job_name_maxlen = 0;
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }
155
# slice of ressources for parallel job
# Cut the flat resource list into groups of $job_np consecutive lines;
# each group becomes one channel entry, i.e. one schedulable slot.
# Leftover lines (when the total is not a multiple of $job_np) are unused.
my $ressources = new Coro::Channel;
my $slot_count = int($ressource_size / $job_np);
for my $slot (0 .. $slot_count - 1) {
   my $first = $slot * $job_np;
   my $last  = $first + $job_np - 1;
   $ressources->put(join ',', @ressources[$first .. $last]);
   }
164
# Running subjobs, keyed by the PID of the local "| oarsh ..." shell.
# Values are hashes with fh / node_connect / ressource / name / pidfile.
my %scheduled = ();

# OAR checkpoint and default signal SIGUSR2
# Receiving the checkpoint signal bumps $oar_checkpoint; the launcher
# coroutine stops starting new jobs once the count is non-zero, and the
# notify coroutine (with --transmit) is woken to forward the signal.
my $oar_checkpoint = new Coro::Semaphore 0;
my $notify         = new Coro::Signal;
$SIG{$sig_checkpoint} = sub {
   print "warning: receive checkpoint at "
      . time
      . ", no new job, just finishing running job\n"
      if $verbose;
   $oar_checkpoint->up();
   $notify->send if $sig_transmit;
   };
178
# asynchrone notify job
# Coroutine that forwards the checkpoint signal to every running subjob.
# It sleeps on $notify (sent by the signal handler when --transmit is on),
# then opens a short-lived shell on each subjob's node and kills the PID
# recorded in that subjob's pidfile.
async {
   while () {
      $notify->wait;

      for my $job_pid (keys %scheduled) {
         my $job_name     = $scheduled{$job_pid}->{name};
         my $job_pidfile  = $scheduled{$job_pid}->{pidfile};
         my $node_connect = $scheduled{$job_pid}->{node_connect};

         # one-shot remote shell, used only to deliver the kill
         my $fh = IO::File->new();
         $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
            or die "error: can't notify subjob: $!";

         $fh->autoflush;
         $fh = unblock $fh;   # Coro::Handle: avoid blocking other coroutines

         $fh->print("kill -$sig_checkpoint \$(cat $job_pidfile)\n");
         $fh->print("exit\n");

         print "warning: transmit signal $sig_checkpoint"
            . " to job $job_name on node $node_connect.\n"
            if $verbose;

         close $fh;
         cede;   # yield between subjobs
         }
      }
   }
208
# asynchrone start job block
# Launcher coroutine: walks the job list, waits for a free resource slot
# from the $ressources channel, then pipes a small shell script to oarsh
# on the slot's first node.  Stops launching once a checkpoint is seen.
async {
   my $timer;
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # job has been already run ?
      # 'end' in the logtrace => skip it; 'start' => it crashed mid-run
      # last time, so relaunch it.
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # wait to not re-launch oarstat to fast
      # equivalent to sleep $job_launch_brake
      # Cooperative busy-wait: cede to other coroutines until the AE clock
      # passes the deadline, instead of blocking the whole process.
      $timer = AE::now + $job_launch_brake;
      while ( AE::now < $timer ) {
         # force update of AE time
         AE::now_update;
         cede;
         }

      # take job ressource
      # Blocks (coroutine-wise) until the reaper puts a slot back.
      my $job_ressource = $ressources->get;

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # Connect to the slot's first node; for a pipe open, open() returns
      # the PID of the spawned local shell, which we use as the job key.
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;   # Coro::Handle so prints do not block other coroutines

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s oar job %i on node %s\n",
         $job_name, $job_pid, time, $ENV{OAR_JOB_ID}, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      # Per-subjob output redirections (only with --switchio).
      # NOTE(review): without --switchio these stay undef and interpolate
      # as empty strings below - silent only because warnings are off.
      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      # Scratch files on the remote node: a private node file for parallel
      # subjobs, and a pidfile so the notify coroutine can signal the job.
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name";
      my $job_pidfile  = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name.pid";

      # Register the subjob so the reaper and notify coroutines can see it.
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name,
         pidfile      => $job_pidfile,
         };

      # set job environment, run it and clean
      # For parallel subjobs, write a private OAR_NODE_FILE on the node
      # ('\n' is single-quoted here: printf interprets it remotely).
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }

      $fh->print("cd $current_dir\n");

      # With --transmit: trap the checkpoint signal in the remote shell and
      # relay it to the subjob's process tree; record the shell PID.
      if ($sig_transmit) {
         $fh->print("trap 'jobs -p|xargs -r ps -o pid --no-headers --ppid|xargs -r kill -$sig_checkpoint' $sig_checkpoint\n");
         $fh->print("echo \$\$ > $job_pidfile\n");
         }

      # Run the job in a background subshell, then wait in a loop (a plain
      # wait would return early when the trap above fires).
      $fh->print("(\n");
      $fh->print("$job_cmd\n");
      $fh->print(") $job_stdout $job_stderr \&\n");
      $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
      $fh->print("do\n");
      $fh->print("   wait\n");
      $fh->print("done\n");

      # Clean up the scratch files and end the remote shell.
      $fh->print("rm -f $job_pidfile\n")  if $sig_transmit;
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
308
# asynchrone end job block
# Reaper coroutine: polls every scheduled subjob shell with a non-blocking
# waitpid, logs its completion, recycles its resource slot, and signals
# $finished when everything is done (or when a checkpoint drained the queue).
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s oar job %i on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $ENV{OAR_JOB_ID}, $scheduled{$job_pid}->{ressource};

            # Job non finish, just suspend if received checkpoint signal
            # (a suspended job keeps its 'start' state and re-runs next time)
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # leave ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing ! just finishing running job and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      # normal termination: every job accounted for
      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }
341
# give the coroutines above their first chance to run
cede;

# all job have been done
# (the reaper coroutine sends $finished, either on completion or after a
# checkpoint once the running jobs have drained)
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
349
350__END__
351
352=head1 NAME
353
354oar-parexec - parallel execution of many small short or long job
355
356=head1 SYNOPSIS
357
358 oar-parexec --file filecommand \
359    [--logtrace tracefile] [--verbose] \
360    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
361    [--switchio] [--masterio basefileio] \
362    [--kill signal] [--transmit]
363
364 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
365    [--logtrace tracefile] [--verbose] \
366    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
367    [--switchio] [--masterio basefileio] \
368    [--kill signal] [--transmit]
369
370 oar-parexec --help
371
372=head1 DESCRIPTION
373
374C<oar-parexec> can execute lot of small short or long job in parallel inside a cluster.
Number of parallel jobs at one time cannot exceed the number of cores defined in the node file.
376C<oar-parexec> is easier to use inside an OAR job environment
377which define automatically these strategics parameters...
378However, it can be used outside OAR.
379
380Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.
381
382Small job will be launch in the same folder as the master job.
383Two environment variable are defined for each small job
384and only in case of parallel small job (option C<--jobnp> > 1).
385
386 OAR_NODE_FILE - file that list node for parallel computing
387 OAR_NP        - number of processor affected
388
389The file define by OAR_NODE_FILE is created  in /tmp
390on the node before launching the small job
391and this file will be delete after job complete.
392C<oar-parexec> is a simple script,
393OAR_NODE_FILE will not be deleted in case of crash of the master job.
394
395OAR define other variable that are equivalent to OAR_NODE_FILE:
396OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
397You can use in your script the OAR original file ressources
398by using these variable if you need it.
399
400When use with long job,
activate option C<--transmit> to send the OAR checkpoint signal
402and suspend small job before the walltime cut!
403
404=head1 OPTIONS
405
406=over 12
407
408=item B<-f|--file filecommand>
409
410File name which content job list.
411For the JOB_NAME definition,
412the first valid job in the list will have the number 1 and so on...
413
414It's possible to fix the name inside a comment on the job line.
415For example:
416
417 $HOME/test/subjob1.sh # name=subjob1
418
419The key C<name> is case insensitive,
420the associated value cannot have a space...
421
422The command can be any shell command.
423It's possible to change folder,
424or launch an asynchrone job in parallel,
425but one command must block and not be launch in asynchrone (with & or coproc).
426Example :
427
428 cd ./test; ./subjob1.sh
429 cd ./test; nice -18 du -sk ./ & ./test/subjob1.sh
430
431Command C<du -sk ./> will be done in parallel on the same ressource...
432
433=item B<-d|--dir foldertoiterate>
434
435Command C<--cmd> will be launch in all sub-folder of this master folder.
436Files in this folder will be ignored.
437Sub-folder name which begin with F<.>
438or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...
439
440The JOB_NAME is simply the Sub-folder name.
441
442=item B<-c|--cmd commandtolaunch>
443
444Command (and argument to it) that will be launch in all sub-folder
parameter folder C<--dir>.
446Like for option C<--file>, command can be any valid shell command
447but one must block.
448
449=item B<-l|--logtrace tracefile>
450
451File which log and trace running job.
452In case of running the same master command (after crash for example),
453only job that are not mark as done will be run again.
454Be careful, job mark as running (start but not finish) will be run again.
455Tracing is base on the JOB_NAME between multiple run.
456
This option is very useful in case of crash
458but also for checkpointing and idempotent OAR job.
459
460=item B<-v|--verbose>
461
462=item B<-j|--jobnp integer>
463
464Number of processor to allocated for each small job.
4651 by default.
466
467=item B<-n|--nodefile filenode>
468
469File name that list all the node where job could be launch.
470By defaut, it's define automatically by OAR via
471environment variable C<OAR_NODE_FILE>.
472
473For example, if you want to use 6 core on your cluster node,
474you need to put 6 times the hostname node in this file,
475one per line...
476It's a very common file in MPI process !
477
478=item B<-o|-oarsh command>
479
480Command use to launch a shell on a node.
481By default
482
483 oarsh -q -T
484
485Change it to C<ssh> if you are not using an OAR cluster...
486
487=item B<-s|--switchio>
488
489Each small job will have it's own output STDOUT and STDERR
490base on master OAR job with C<JOB_NAME> inside
491(or base on C<basefileio> if option C<masterio>).
492Example :
493
494 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout
495
496where 151524 here is the master C<OAR_JOB_ID>
497and C<JOB_NAME> is the small job name.
498
499=item B<-m|--masterio basefileio>
500
501The C<basefileio> will be use in place of environment variable
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
504
505=item B<-k|--kill signal>
506
507Signal to listen and make a clean stop of the current C<oar-parexec> process.
By default, use the USR2 signal (see C<kill -l> for a list of possible signals).
509
510=item B<-t|--transmit>
511
512Resend catch signal to sub-job when receiving it.
By default, no signal is transmitted to the child process.
514
515It's only valuable if use for long sub-job than can
516in return make themselves a clean restart.
517
518
519=item B<-h|--help>
520
521=back
522
523
524=head1 EXAMPLE
525
526=head2 Simple list of sequential job
527
528Content for the job file command (option C<--file>) could have:
529
530 - empty line
531 - comment line begin with #
 - valid shell command (can contain a comment)
533
534Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
535
536 $HOME/test/subjob01.sh  # name=subjob01
537 $HOME/test/subjob02.sh  # name=subjob02
538 $HOME/test/subjob03.sh  # name=subjob03
539 $HOME/test/subjob04.sh  # name=subjob04
540 ...
541 $HOME/test/subjob38.sh  # name=subjob38
542 $HOME/test/subjob39.sh  # name=subjob39
543 $HOME/test/subjob40.sh  # name=subjob40
544
545These jobs could be launch by:
546
547 oarsub -n test -l /core=6,walltime=04:00:00 \
548   "oar-parexec -f ./subjob.list.txt"
549
550=head2 Folder job
551
In a folder F<subjob.d>, create sub-folders with your data inside: F<test1>, F<test2>...
553The same command will be executed in every sub-folder.
554C<oar-parexec> change the current directory to the sub-folder before launching it.
555
556A very simple job could be:
557
558 oarsub -n test -l /core=6,walltime=04:00:00 \
559   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"
560
The command C<env> will be executed in all folders F<test1>, F<test2>... after a 10s pause.
562
563Sometime, it's simpler to use file list command,
564sometime, jobs by folder with the same command run is more relevant.
565
566=head2 Parallel job
567
568You need to put the number of core each small job need with option C<--jobnp>.
569If your job is build on OpenMP or MPI,
570you can use OAR_NP and OAR_NODE_FILE variables to configure them.
571On OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
572for connexion between node instead of C<ssh>.
573
574Example with parallel small job on 2 core:
575
576 oarsub -n test -l /core=6,walltime=04:00:00 \
577   "oar-parexec -j 2 -f ./subjob.list.txt"
578
579=head2 Tracing and master crash
580
581If the master node crash after hours of calculus, everything is lost ?
582No, with option C<--logtrace>,
583it's possible to remember older result
584and not re-run these job the second and next time.
585
586 oarsub -n test -l /core=6,walltime=04:00:00 \
587   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
588
589After a crash or an C<oardel> command,
590you can then re-run the same command that will end to execute the jobs in the list
591
592 oarsub -n test -l /core=6,walltime=04:00:00 \
593   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
594
595C<logtrace> file are just plain file.
596We use the extension '.log' because these files are automatically
597eliminate from our backup system!
598
599=head2 Checkpointing and Idempotent
600
601C<oar-parexec> is compatible with the OAR checkpointing.
602If you have 2000 small jobs that need 55h to be done on 6 cores,
603you can cut this in small parts.
604
605For this example, we suppose that each small job need about 10min...
606So, we send a checkpoint 12min before the end of the process
607to let C<oar-parexec> finish the jobs started.
608After being checkpointed, C<oar-parexec> do not start any new small job.
609
610 oarsub -t idempotent -n test \
611   -l /core=6,walltime=04:00:00 \
612   --checkpoint 720 \
613   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
614
615After 3h48min, the OAR job will begin to stop launching new small job.
616When all running small job are finished, it's exit.
617But as the OAR job is type C<idempotent>,
618OAR will re-submit it as long as all small job are not executed...
619
620This way, we let other users a chance to use the cluster!
621
In this last example, we use a moldable OAR job with idempotent
623to reserve many core for a small time or a few cores for a long time:
624
625 oarsub -t idempotent -n test \
626   -l /core=50,walltime=01:05:00 \
627   -l /core=6,walltime=04:00:00 \
628   --checkpoint 720 \
629   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
630
631=head2 Signal, recurse and long job
632
633By default, OAR use signal USR2 for checkpointing.
634It's possible to change this with option C<--kill>.
635
636When use with long small job, checkpointing could be too long...
637More than walltime!
638The option C<--transmit> could be use to checkpoint small job!
639These long small job will then stop cleanly and will be restarted next time.
640
641In the C<logtrace> file, small job will have the status suspend.
642They will be launch with the same command line at the next OAR run.
643
644Example: if you have 50 small jobs that each need 72h to be done on 1 cores,
645you can cut this in 24h parts.
646
647For this example, we suppose that each long job loop need about 20min...
648So, we send a checkpoint 30min before the end of the process
649to let C<oar-parexec> suspend the jobs started.
650After being checkpointed, C<oar-parexec> do not start any new small job.
651
652 oarsub -t idempotent -n test \
653   -l /core=6,walltime=24:00:00 \
654   --checkpoint 1800 \
655   --transmit \
656   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"
657
658After 23h30min, the OAR job will begin to stop launching new small job.
659When all running small job are suspend, it's exit.
660But as the OAR job is type C<idempotent>,
661OAR will re-submit it as long as all small job are not finished...
662
663=head1 SEE ALSO
664
665oar-dispatch, mpilauncher,
666orsh, oar-envsh, ssh
667
668
669=head1 AUTHORS
670
671Written by Gabriel Moreau, Grenoble - France
672
673
674=head1 LICENSE AND COPYRIGHT
675
676GPL version 2 or later and Perl equivalent
677
678Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
679
Note: See TracBrowser for help on using the repository browser.