source: trunk/oarutils/oar-parexec @ 43

Last change on this file since 43 was 43, checked in by g7moreau, 12 years ago
  • --file -> --filecmd
  • --logfile -> --logtrace
  • Comment code
  • Begin man for option --logtrace
#!/usr/bin/perl
#
# 2011/11/27 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

my $filecmd  = '';
my $logtrace = '';
my $verbose;
my $job_np = 1;
my $nodefile = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh = 'oarsh -q -T';

Getopt::Long::GetOptions(
   'filecmd=s'  => \$filecmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
pod2usage(-verbose => 2) if not -e $filecmd;

# on re-run, keep track of jobs already done
my %state;
my $log_h = IO::File->new();
if (-e $logtrace) {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+(\d+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+(\d+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }

# jobs to run
my @job = ();
open(JOB_LIST, '<', "$filecmd") or die "error: can't open job file $filecmd: $!";
while (<JOB_LIST>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @job, $_;
   }
close JOB_LIST;

# available resources
my @ressources = ();
open(NODE_FILE, '<', "$nodefile")
   or die "can't open $nodefile: $!";
while (<NODE_FILE>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close NODE_FILE;

my $ressource_size = scalar(@ressources);
die "error: not enough resources: jobnp $job_np > resources $ressource_size"
   if $job_np > $ressource_size;

my $current_dir = getcwd();

my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;
$job_todo->up for (@job);

# slices of resources for parallel jobs
my $ressources = new Coro::Channel;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }

my $job_num   = 0;
my %scheduled = ();

# OAR checkpoint handler, default signal SIGUSR2
my $oar_checkpoint = new Coro::Semaphore 0;
$SIG{USR2} = sub {
   print "warning: received checkpoint at "
      . time
      . ", no new job, just finishing running jobs\n"
      if $verbose;
   $oar_checkpoint->up();
   };

# asynchronous job-start block
async {
   JOB:
   for my $job (@job) {
      $job_num++;

      # has the job already been run?
      if (exists $state{$job_num}) {
         if ($state{$job_num} eq 'start') {
            print "warning: job $job_num was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_num} eq 'end') {
            delete $state{$job_num}; # free memory
            $job_todo->down;
            print "warning: job $job_num already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take a resource slice for this job
      my $job_ressource = $ressources->get;

      # do not launch new jobs once OAR checkpointing has begun
      last JOB if $oar_checkpoint->count() > 0;

      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         num          => $job_num
         };

      my $msg = sprintf "start job %5i / %5i at %s on node %s\n",
         $job_num, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";

      # set up the job environment, run it and clean up
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }

# asynchronous job-end block
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # non-blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %5i / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{num},
               $job_pid, time, $scheduled{$job_pid}->{ressource};
            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # release the resource slice for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing! just finish the running jobs and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }

cede;

# all jobs have been done
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;

__END__

=head1 NAME

oar-parexec - execute many small jobs in parallel

=head1 SYNOPSIS

 oar-parexec --filecmd filecommand [--logtrace tracefile] [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh command]
 oar-parexec --help

=head1 DESCRIPTION

C<oar-parexec> executes a large number of small jobs in parallel inside a cluster.
The number of jobs running at the same time cannot exceed the number of cores listed in the node file;
for example, with 12 lines in the node file and C<--jobnp 3>, at most 4 small jobs run at a time.
C<oar-parexec> is easiest to use inside an OAR job environment,
which defines these parameters automatically.

Option C<--filecmd> is the only mandatory one.

Small jobs are launched in the same directory as the master job.
Two environment variables are defined for each small job,
but only for parallel small jobs (option C<--jobnp> > 1):

 OAR_NODE_FILE - file listing the nodes available for parallel computing
 OAR_NP        - number of processors allocated

The file named by OAR_NODE_FILE is created in /tmp on the node before
the small job is launched and is deleted afterwards.
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
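
A parallel small job can read these variables like any other OAR variable.
For example, a small job could be a wrapper around an MPI run
(a minimal sketch only; F<my_mpi_program> is a hypothetical binary and the
exact C<mpirun> options depend on your MPI implementation):

 #!/bin/sh
 mpirun -np $OAR_NP -machinefile $OAR_NODE_FILE $HOME/test/my_mpi_program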

OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
If you need the original OAR resource file in your script,
you can still access it through these variables.

=head1 OPTIONS

=over 12

=item B<-f|--filecmd filecommand>

Name of the file that contains the list of jobs to run.

=item B<-l|--logtrace tracefile>

File in which running jobs are logged and traced.
When the same command is run again (after a crash, for example),
only jobs that are not marked as done are launched again.
Be careful: jobs marked as running (started but not finished) are run again.

This option is very useful in case of a crash,
but also for checkpointing and idempotent OAR jobs.
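
The trace file simply contains one line per event, in the format written by
C<oar-parexec> (the job numbers, PIDs, timestamps and node names below are
only illustrative):

 start job     1 / 22672 at 1322429182 on node node001
 end   job     1 / 22672 at 1322429306 on node node001
 start job     2 / 22701 at 1322429307 on node node001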

=item B<-v|--verbose>

=item B<-j|--jobnp integer>

Number of processors to allocate to each small job.
1 by default.

=item B<-n|--nodefile filenode>

Name of the file that lists all the nodes on which to launch jobs.
By default, it is defined automatically by OAR via the
environment variable C<OAR_NODE_FILE>.

For example, if you want to use 6 cores on your cluster node,
you need to put the node's hostname 6 times in this file,
one per line...
It is a very common file format in the MPI world!
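
With a hypothetical node called C<node001>, such a file would simply be:

 node001
 node001
 node001
 node001
 node001
 node001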

=item B<-m|--masterio basefileio>

C<basefileio> is used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name
of the small jobs' standard output files
(only used when option C<--switchio> is activated).
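
For example (a sketch, with a hypothetical base name F<./myjob>), the command

 oar-parexec -f ./subjob.list.txt --switchio --masterio ./myjob

makes small job number 3 write to F<./myjob-3.stdout> and F<./myjob-3.stderr>.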

=item B<-s|--switchio>

Each small job will have its own STDOUT and STDERR output files,
based on the master OAR job files with C<JOB_NUM> inserted
(or based on C<basefileio> if option C<--masterio> is given).
Example:

 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout

where 151524 is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.

=item B<-o|--oarsh command>

Command used to open a shell on a node.
By default:

        oarsh -q -T
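
For example, outside of an OAR job you could use plain C<ssh> instead
(a sketch; F<./mynodefile> is a hypothetical node list):

 oar-parexec -f ./subjob.list.txt -n ./mynodefile -o "ssh -q"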

=item B<-h|--help>

=back


=head1 EXAMPLE

The job file (option C<--filecmd>) can contain:

 - empty lines
 - comment lines beginning with #
 - valid shell commands

Example where F<$HOME/test/subjob1.sh> is an executable shell script:

 $HOME/test/subjob1.sh
 $HOME/test/subjob2.sh
 $HOME/test/subjob3.sh
 $HOME/test/subjob4.sh
 ...
 $HOME/test/subjob38.sh
 $HOME/test/subjob39.sh
 $HOME/test/subjob40.sh

These jobs can be launched with:

 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"
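
To survive a crash or an OAR checkpoint, the same list can be submitted as an
idempotent job with a trace file, so that a resubmission only runs the
remaining jobs (a sketch; check the C<oarsub> checkpoint options available on
your cluster):

 oarsub -t idempotent --checkpoint 300 -n test -l /core=6,walltime=00:35:00 \
   "oar-parexec -f ./subjob.list.txt -l ./subjob.logtrace"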


=head1 SEE ALSO

oar-dispatch, mpilauncher


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France