source: trunk/oarutils/oar-parexec @ 119

#!/usr/bin/perl
#
# 2011/11/27 Gabriel Moreau

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use AnyEvent; # make the AnyEvent dependency explicit: AE::now / AE::now_update are used below
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

my $file;
my $dir;
my $cmd;
my $logtrace;
my $verbose;
my $job_np         = 1;
my $nodefile       = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh          = 'oarsh -q -T';
my $sig_transmit;
my $sig_checkpoint = 'USR2';
my $job_launch_brake = 1; # one second brake between job launches

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
pod2usage(-verbose => 2) if not (
 (-e "$file")
 or (-d "$dir" and $cmd ne '')
 );

my $oar_version = `oarsub -V | awk '{print \$4}'`;
chomp $oar_version;

# on re-run, keep track of jobs already done
my %state;
my $log_h = IO::File->new();
if (-e "$logtrace") {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }

# jobs to run
my @job = ();
if (-e "$file") {
   my $job_num = 0;
   open(JOB_LIST, '<', "$file") or die "error: can't open job file $file: $!";
   while (my $job_cmd = <JOB_LIST>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      $job_num++;
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name   => $job_name,
         cmd    => "$job_cmd",
         num    => $job_num,
         };
      }
   close JOB_LIST;
   }
else {
   my $job_num = 0;
   opendir(DIR, $dir) or die "error: can't open folder $dir: $!";
   while (my $item = readdir(DIR)) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");
      $job_num++;
      push @job, {
         name   => $item,
         cmd    => "cd $dir/$item/; $cmd",
         num    => $job_num,
         };
      }
   closedir DIR;
   }

# make job names unique (fall back to the job number if any name is duplicated)
{
   my %seen = ();
   my $count_unique_name = grep { ! $seen{ $_->{name} }++ } @job;
   if ($count_unique_name != scalar(@job)) {
      $_->{name} = $_->{num} for @job;
      }
   }

# ressources available
my @ressources = ();
open(NODE_FILE, '<', "$nodefile")
   or die "can't open $nodefile: $!";
while (<NODE_FILE>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close NODE_FILE;

my $ressource_size = scalar(@ressources);
die "error: not enough ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

my $current_dir = getcwd();

my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;
my $job_name_maxlen;
for (@job) {
   $job_todo->up;
   $job_name_maxlen = length($_->{name}) if length($_->{name}) > $job_name_maxlen;
   }

# slice of ressources for parallel jobs
my $ressources = new Coro::Channel;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }

my %scheduled = ();

# OAR checkpoint and default signal SIGUSR2
my $oar_checkpoint = new Coro::Semaphore 0;
my $notify         = new Coro::Signal;
$SIG{$sig_checkpoint} = sub {
   print "warning: receive checkpoint at "
      . time
      . ", no new job, just finishing running job\n"
      if $verbose;
   $oar_checkpoint->up();
   $notify->send if $sig_transmit;
   };

# asynchronous block: transmit the checkpoint signal to running sub-jobs
async {
   while () {
      $notify->wait;

      for my $job_pid (keys %scheduled) {
         my $job_name     = $scheduled{$job_pid}->{name};
         my $job_pidfile  = $scheduled{$job_pid}->{pidfile};
         my $node_connect = $scheduled{$job_pid}->{node_connect};

         my $fh = IO::File->new();
         $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
            or die "error: can't notify subjob: $!";

         $fh->autoflush;
         $fh = unblock $fh;

         $fh->print("kill -$sig_checkpoint \$(cat $job_pidfile)\n");
         $fh->print("exit\n");

         print "warning: transmit signal $sig_checkpoint"
            . " to job $job_name on node $node_connect.\n"
            if $verbose;

         close $fh;
         cede;
         }
      }
   }

# asynchronous block: start jobs
async {
   my $timer;
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # has the job already been run?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # wait so as not to launch jobs too fast
      # equivalent to sleep $job_launch_brake
      $timer = AE::now + $job_launch_brake;
      while ( AE::now < $timer ) {
         # force update of AE time
         AE::now_update;
         cede;
         }

      # take a job ressource
      my $job_ressource = $ressources->get;

      # do not launch new jobs when OAR is checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      my $msg = sprintf "start job %${job_name_maxlen}s / %5i at %s oar job %i on node %s\n",
         $job_name, $job_pid, time, $ENV{OAR_JOB_ID}, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_name.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name";
      my $job_pidfile  = "/tmp/oar-parexec-$ENV{LOGNAME}-$ENV{OAR_JOB_ID}-$job_name.pid";

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         name         => $job_name,
         pidfile      => $job_pidfile,
         };

      # set the job environment, run it and clean up
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }

      $fh->print("cd $current_dir\n");

      if ($sig_transmit) {
         $fh->print("trap 'jobs -p|xargs -r ps -o pid --no-headers --ppid|xargs -r kill -$sig_checkpoint' $sig_checkpoint\n");
         $fh->print("echo \$\$ > $job_pidfile\n");
         }

      $fh->print("(\n");
      $fh->print("$job_cmd\n");
      $fh->print(") $job_stdout $job_stderr \&\n");
      $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
      $fh->print("do\n");
      $fh->print("   wait\n");
      $fh->print("done\n");

      $fh->print("rm -f $job_pidfile\n")  if $sig_transmit;
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }

# asynchronous block: watch for job ends
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # non blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %${job_name_maxlen}s / %5i at %s oar job %i on node %s\n",
               $scheduled{$job_pid}->{name},
               $job_pid, time, $ENV{OAR_JOB_ID}, $scheduled{$job_pid}->{ressource};

            # the job is not finished, just suspended, if a checkpoint signal was received
            $msg =~ s/^end\s+job/suspend job/
               if $sig_transmit and $oar_checkpoint->count() > 0;

            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # release the ressources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing! just finish the running jobs and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }

cede;

# all jobs have been done
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;

exit 99 if (($oar_checkpoint->count() > 0) and ($oar_version !~ m/^2\.4/));


__END__

=head1 NAME

oar-parexec - parallel execution of many small jobs, short or long

=head1 SYNOPSIS

 oar-parexec --file filecommand \
    [--logtrace tracefile] [--verbose] \
    [--jobnp integer] [--nodefile filenode] [--oarsh command] \
    [--switchio] [--masterio basefileio] \
    [--kill signal] [--transmit]

 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
    [--logtrace tracefile] [--verbose] \
    [--jobnp integer] [--nodefile filenode] [--oarsh command] \
    [--switchio] [--masterio basefileio] \
    [--kill signal] [--transmit]

 oar-parexec --help

=head1 DESCRIPTION

C<oar-parexec> can execute a lot of small jobs, short or long, in parallel inside a cluster.
The number of jobs running at any one time cannot exceed the number of cores defined in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which defines these strategic parameters automatically...
However, it can also be used outside OAR.

Option C<--file>, or options C<--dir> and C<--cmd> together, are the only mandatory parameters.

Small jobs are launched in the same folder as the master job.
Two environment variables are defined for each small job,
but only in the case of parallel small jobs (option C<--jobnp> > 1).

 OAR_NODE_FILE - file that lists the nodes for parallel computing
 OAR_NP        - number of processors allocated

The file defined by OAR_NODE_FILE is created in /tmp
on the node before the small job is launched
and is deleted after the job completes.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
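
A parallel small job can pass these two variables straight to its launcher.
A minimal sketch with MPI (F<./mycode> is only a placeholder for your own program;
the exact host-file option may differ between MPI implementations):

 mpirun -np $OAR_NP -machinefile $OAR_NODE_FILE ./mycode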

OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
If you need the original OAR ressource file,
you can still use these variables in your script.

When used with long jobs,
activate option C<--transmit> to forward the OAR checkpoint signal
and suspend the small jobs before the walltime cut!

=head1 OPTIONS

=over 12

=item B<-f|--file filecommand>

File name that contains the job list.
For the JOB_NAME definition,
the first valid job in the list gets the number 1 and so on...

It is possible to set the name inside a comment on the job line.
For example:

 $HOME/test/subjob1.sh # name=subjob1

The key C<name> is case insensitive;
the associated value cannot contain a space...

The command can be any shell command.
It is possible to change folder,
or to launch an asynchronous job in parallel,
but one command must block and must not be launched asynchronously (with & or coproc).
Example:

 cd ./test; ./subjob1.sh
 cd ./test; nice -18 du -sk ./ & ./subjob1.sh

The commands C<du -sk ./> and C<./subjob1.sh> will run in parallel on the same ressource...
It is better if C<du -sk ./> is faster than C<./subjob1.sh>!
Do not abuse this!

=item B<-d|--dir foldertoiterate>

The command given by C<--cmd> will be launched in every sub-folder of this master folder.
Files in this folder are ignored.
Sub-folders whose name begins with F<.>
or ends with F<.old>, F<.sav>, F<.bak> or F<.no> are also ignored...

The JOB_NAME is simply the sub-folder name.

=item B<-c|--cmd commandtolaunch>

Command (and its arguments) that will be launched in every sub-folder
of the folder given by C<--dir>.
As with option C<--file>, the command can be any valid shell command,
but one command must block.

=item B<-l|--logtrace tracefile>

File that logs and traces the running jobs.
When the same master command is run again (after a crash for example),
only jobs that are not marked as done will be run again.
Be careful: jobs marked as running (started but not finished) will be run again.
Tracing is based on the JOB_NAME between multiple runs.

This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
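
Each line of the trace file records the start, end or suspend of one small job.
The values below are only illustrative:

 start job subjob02 /  4325 at 1323159330 oar job 151524 on node cl7n001
 end   job subjob02 /  4325 at 1323159931 oar job 151524 on node cl7n001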

=item B<-v|--verbose>

=item B<-j|--jobnp integer>

Number of processors to allocate to each small job.
1 by default.

=item B<-n|--nodefile filenode>

File that lists all the nodes where jobs can be launched.
By default, it is defined automatically by OAR via
the environment variable C<OAR_NODE_FILE>.

For example, if you want to use 6 cores on your cluster node,
you need to put the node hostname 6 times in this file,
one per line...
It is a very common file format in the MPI world!
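
A minimal node file for that 6-core case could look like this
(F<node7> is just an example hostname):

 node7
 node7
 node7
 node7
 node7
 node7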

=item B<-o|--oarsh command>

Command used to launch a shell on a node.
By default

 oarsh -q -T

Change it to C<ssh> if you are not using an OAR cluster...
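
For example, outside OAR you could launch something like
(a sketch; F<./mynodes.txt> and F<./subjob.list.txt> are hypothetical file names):

 oar-parexec --oarsh 'ssh -q' --nodefile ./mynodes.txt --file ./subjob.list.txt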

=item B<-s|--switchio>

Each small job will have its own STDOUT and STDERR output files,
based on the master OAR job output names with C<JOB_NAME> inserted
(or based on C<basefileio> if option C<masterio> is given).
Example:

 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout

where 151524 is the master C<OAR_JOB_ID>
and C<JOB_NAME> is the small job name.

=item B<-m|--masterio basefileio>

C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).

=item B<-k|--kill signal>

Signal to listen for in order to make a clean stop of the current C<oar-parexec> process.
By default, the USR2 signal is used (see C<kill -l> for a list of possible signals).

=item B<-t|--transmit>

Re-send the caught signal to the sub-jobs when it is received.
By default, no signal is transmitted to the child processes.

It is only valuable for long sub-jobs that can,
in return, perform a clean restart themselves.
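
A long sub-job script can, for example, trap the transmitted signal and stop at the
next safe point. A sketch, where F<./one-more-step> and F<state.sav> are hypothetical:

 #!/bin/bash
 # hypothetical sketch of a restartable sub-job
 stop=0
 trap 'stop=1' USR2
 while [ "$stop" -eq 0 ]
 do
    # one computation step, assumed to save its own restart state in state.sav;
    # the trap runs after the current step finishes, so the job stops cleanly
    ./one-more-step
 done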


=item B<-h|--help>

=back


=head1 EXAMPLE

=head2 Simple list of sequential jobs

The job file (option C<--file>) can contain:

 - empty lines
 - comment lines beginning with #
 - valid shell commands (which may contain a comment)

Example where F<$HOME/test/subjob1.sh> is an executable shell script.

 $HOME/test/subjob01.sh  # name=subjob01
 $HOME/test/subjob02.sh  # name=subjob02
 $HOME/test/subjob03.sh  # name=subjob03
 $HOME/test/subjob04.sh  # name=subjob04
 ...
 $HOME/test/subjob38.sh  # name=subjob38
 $HOME/test/subjob39.sh  # name=subjob39
 $HOME/test/subjob40.sh  # name=subjob40

These jobs can be launched with:

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -f ./subjob.list.txt"

=head2 Folder jobs

In a folder F<subjob.d>, create sub-folders with your data inside: F<test1>, F<test2>...
The same command will be executed in every sub-folder.
C<oar-parexec> changes the current directory to the sub-folder before launching the command.
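
The expected layout is something like this (F<test1>, F<test2>... are just example names):

 subjob.d/
    test1/
    test2/
    test3/
    ...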

A very simple job could be:

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -d ./subjob.d -c 'sleep 10; env'"

The command C<env> will be executed in every folder F<test1>, F<test2>... after a 10s pause.

Sometimes it is simpler to use a file with a list of commands,
sometimes jobs by folder with the same command are more relevant.

=head2 Parallel jobs

You need to give the number of cores each small job needs with option C<--jobnp>.
If your job is built on OpenMP or MPI,
you can use the OAR_NP and OAR_NODE_FILE variables to configure it.
On an OAR cluster, you need to use C<oarsh> or a wrapper like C<oar-envsh>
for connections between nodes instead of C<ssh>.

Example with parallel small jobs on 2 cores:

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -j 2 -f ./subjob.list.txt"
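
A line of the job list for such a 2-core MPI small job could then look like this
(a sketch; F<./mycode> is a placeholder for your own program):

 mpirun -np $OAR_NP -machinefile $OAR_NODE_FILE ./mycode  # name=mpi01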

=head2 Tracing and master crash

If the master node crashes after hours of computation, is everything lost?
No: with option C<--logtrace>,
it is possible to remember the previous results
and not re-run those jobs the second and subsequent times.

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

After a crash or an C<oardel> command,
you can re-run the same command; it will finish executing the jobs in the list.

 oarsub -n test -l /core=6,walltime=04:00:00 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

C<logtrace> files are just plain text files.
We use the extension '.log' because these files are automatically
excluded from our backup system!

=head2 Checkpointing and idempotent jobs

C<oar-parexec> is compatible with OAR checkpointing.
If you have 2000 small jobs that need 55h to be done on 6 cores,
you can cut this into smaller parts.

For this example, we suppose that each small job needs about 10min...
So we send a checkpoint 12min before the end of the process
to let C<oar-parexec> finish the jobs already started.
After being checkpointed, C<oar-parexec> does not start any new small job.

 oarsub -t idempotent -n test \
   -l /core=6,walltime=04:00:00 \
   --checkpoint 720 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

After 3h48min, the OAR job will stop launching new small jobs.
When all running small jobs are finished, it exits.
But as the OAR job is of type C<idempotent>,
OAR will re-submit it as long as not all small jobs have been executed...

This way, we give other users a chance to use the cluster!

In this last example, we use a moldable OAR job with idempotent
to reserve many cores for a short time or a few cores for a long time:

 oarsub -t idempotent -n test \
   -l /core=50,walltime=01:05:00 \
   -l /core=6,walltime=04:00:00 \
   --checkpoint 720 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

=head2 Signals, resubmission and long jobs

By default, OAR uses signal USR2 for checkpointing.
It is possible to change this with option C<--kill>.

When used with long small jobs, checkpointing could take too long...
More than the walltime!
The option C<--transmit> can be used to checkpoint the small jobs themselves!
These long small jobs will then stop cleanly and be restarted next time.

In the C<logtrace> file, such small jobs will have the status suspend.
They will be launched with the same command line at the next OAR run.

Example: if you have 50 small jobs that each need 72h to be done on 1 core,
you can cut this into 24h parts.

For this example, we suppose that each long job loop needs about 20min...
So we send a checkpoint 30min before the end of the process
to let C<oar-parexec> suspend the jobs already started.
After being checkpointed, C<oar-parexec> does not start any new small job.

 oarsub -t idempotent -n test \
   -l /core=6,walltime=24:00:00 \
   --checkpoint 1800 \
   --transmit \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log"

After 23h30min, the OAR job will stop launching new small jobs.
When all running small jobs are suspended, it exits.
But as the OAR job is of type C<idempotent>,
OAR will re-submit it as long as not all small jobs are finished...

=head1 SEE ALSO

oar-dispatch, mpilauncher,
oarsh, oar-envsh, ssh


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011-2015 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
