source: trunk/oarutils/oar-parexec @ 42

Last change on this file since 42 was 42, checked in by g7moreau, 9 years ago
  • Small polish
File size: 8.7 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command-line settings with their default values.
my $file     = '';                         # job list file (mandatory)
my $logfile  = '';                         # start/end journal (optional)
my $verbose;
my $job_np   = 1;                          # resources given to each job
my $nodefile = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh    = 'oarsh -q -T';

# Option specification => destination variable.
my @option_spec = (
   'file=s'     => \$file,
   'logfile=s'  => \$logfile,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   );

# A parsing error shows the short usage; --help or a missing job file
# shows the full manual.
Getopt::Long::GetOptions(@option_spec) or pod2usage(-verbose => 0);
if ($help or not -e $file) {
   pod2usage(-verbose => 2);
   }
41
# State of the jobs recorded by a previous run, keyed by job number:
# 'start' when only a start line was logged, 'end' once an end line exists.
my %state;
my $log_h = IO::File->new();
if (-e $logfile) {
   # The mode is passed as a separate argument so that no character of the
   # file name (blanks, '>', '|', ...) can be parsed as part of the mode.
   $log_h->open($logfile, '<')
      or die "can't read log file: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+(\d+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+(\d+)\s/;
      }
   $log_h->close();
   }
if ($logfile) {
   # Reopen the same file in append mode to journal this run.
   $log_h->open($logfile, '>>')
      or die "can't append log file $logfile: $!";
   $log_h->autoflush;
   # Replace the handle by its Coro::Handle wrapper (unblock).
   $log_h = unblock $log_h;
   }
59
# Load the job list: one command per line; lines starting with '#' and
# blank lines are ignored.
my @job = ();
open(my $job_list_h, '<', $file) or die "can't open job file $file: $!";
while (my $line = <$job_list_h>) {
   chomp $line;
   next if $line =~ m/^#/;
   next if $line =~ m/^\s*$/;
   push @job, $line;
   }
close $job_list_h;
69
# Load the node file: one resource per line; lines starting with '#' and
# blank lines are ignored.
my @ressources = ();
open(my $node_file_h, '<', $nodefile)
   or die "can't open $nodefile: $!";
while (my $line = <$node_file_h>) {
   chomp $line;
   next if $line =~ m/^#/;
   next if $line =~ m/^\s*$/;
   push @ressources, $line;
   }
close $node_file_h;

my $ressource_size = scalar(@ressources);

# A job needs at least one resource: zero would later divide by zero when
# the resource slots are computed, and a negative value would create none.
die "jobnp must be at least 1, got $job_np"
   if $job_np < 1;
die "not enought ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;
84
# Directory of the master job; every small job does a cd into it.
my $current_dir = getcwd();

# Base names used to build the per-job output file names: the --masterio
# value when given, otherwise the OAR_STDOUT / OAR_STDERR value without its
# extension (empty string when the variable is not set).
my ($stdout, $stderr);
if ($masterio) {
   $stdout = $masterio;
   $stderr = $masterio;
   }
else {
   ($stdout = $ENV{OAR_STDOUT} || '') =~ s/\.stdout$//;
   ($stderr = $ENV{OAR_STDERR} || '') =~ s/\.stderr$//;
   }
93
# Signal sent to the main program when it may stop waiting.
my $finished = Coro::Signal->new;

# Number of jobs still to be accounted for, one unit per job of the list.
my $job_todo = Coro::Semaphore->new(scalar @job);

# Channel of free resource slots; each slot is a comma-separated group of
# $job_np consecutive entries of the node file. Entries left over after
# the last complete group are not used.
my $ressources = Coro::Channel->new;
my $slot_count = int($ressource_size / $job_np);
for my $slot_index (0 .. $slot_count - 1) {
   my $first = $slot_index * $job_np;
   my $last  = $first + $job_np - 1;
   $ressources->put(join(',', @ressources[ $first .. $last ]));
   }

my $job_num   = 0;     # number of the job currently handled by the launcher
my %scheduled = ();    # running jobs, keyed by the pid of their shell pipe
108
# Raised above zero once a USR2 signal has been received; the count is
# tested by the launcher and the reaper threads.
my $oar_checkpoint = Coro::Semaphore->new(0);
$SIG{USR2} = sub {
   if ($verbose) {
      my $now = time;
      print "warning: receive checkpoint at $now, no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   };
117
# Launcher thread: walks the job list in order and starts every job that
# still has to run on the next free resource slot, by piping a small shell
# script into "$oarsh <node>".
async {
   # Single-quote the working directory for the remote shell so that a path
   # containing blanks or shell metacharacters still reaches cd as one word.
   (my $quoted_dir = $current_dir) =~ s/'/'\\''/g;

   for my $job (@job) {
      $job_num++;

      if (exists $state{$job_num}) {
         if ($state{$job_num} eq 'start') {
            print "warning: job $job_num was not finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_num} eq 'end') {
            # Logged as finished by a previous run: count it as done and
            # move on to the next job.
            delete $state{$job_num};
            $job_todo->down;
            print "warning: job $job_num already done\n" if $verbose;
            cede;
            next;
            }
         }

      # Take a slot from the channel (one is put back each time a job ends).
      my $job_ressource = $ressources->get;

      # After a checkpoint request (USR2) no new job is started.
      last if $oar_checkpoint->count() > 0;

      # The shell is opened on the first node of the slot.
      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "don't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      # Remembered until the reaper thread sees this pid terminate.
      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         num          => $job_num
         };

      my $msg = sprintf "start job %5i / %5i at %s on node %s\n",
         $job_num, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logfile;
      print($msg) if $verbose;

      # Per-job redirections, only with --switchio; they default to the
      # empty string so that the command line below never interpolates an
      # undefined value.
      my $job_stdout = '';
      my $job_stderr = '';
      if ($switchio) {
         $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '';
         $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '';
         }

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";

      if ($job_np > 1) {
         # Every node name is followed by a literal \n for the remote
         # printf, so the last line of the node file is newline-terminated
         # and line-based tools (wc -l, while read) see every node.
         my $node_lines = join '', map { $_ . '\n' } split(',', $job_ressource);
         $fh->print("printf \"$node_lines\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd '$quoted_dir'\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
183
# Reaper thread: polls every scheduled pid with a non-blocking waitpid,
# logs the end of the jobs that are over, gives their slot back and tells
# the main program when it may stop.
async {
   while (1) {
      for my $pid (keys %scheduled) {
         # Falls through to the continue block while the job is running.
         next unless waitpid($pid, WNOHANG);

         my $done = $scheduled{$pid};
         my $msg = sprintf "end   job %5i / %5i at %s on node %s\n",
            $done->{num}, $pid, time, $done->{ressource};
         $log_h->print($msg) if $logfile;
         print($msg) if $verbose;

         close $done->{fh};
         $ressources->put($done->{ressource});
         $job_todo->down;
         delete $scheduled{$pid};
         }
      continue {
         # Yield after each pid, whether or not its job was over.
         cede;
         }

      # Checkpoint requested and nothing left running.
      if ($oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0) {
         $finished->send;
         }

      # Every job of the list has been accounted for.
      if ($job_todo->count() == 0) {
         $finished->send;
         }
      cede;
      }
   }
207
# Yield once to the async threads, then wait for the $finished signal
# before closing the log file (opened only when --logfile was given).
cede;

$finished->wait;

if ($logfile) {
   $log_h->close();
   }
213
214__END__
215
216=head1 NAME
217
oar-parexec - execute a lot of small jobs in parallel
219
220=head1 SYNOPSIS
221
 oar-parexec --file filecommand [--logfile filelog] [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh command]
223 oar-parexec --help
224
225=head1 DESCRIPTION
226
C<oar-parexec> executes a lot of small jobs in parallel inside a cluster.
The number of jobs running at one time cannot exceed the number of cores in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...
231
232Option C<--file> is the only mandatory one.
233
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
and only in the case of a parallel small job (option C<--jobnp> > 1).
237
238 OAR_NODE_FILE - file that list node for parallel computing
 OAR_NP        - number of processors allocated
240
The file defined by OAR_NODE_FILE is created in /tmp on the node before
launching the small job, and is deleted afterwards...
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
245
OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can use the original OAR resource file in your script
through these variables if you need it.
250 
251
252=head1 OPTIONS
253
254=over 12
255
256=item B<-f|--file       filecommand>
257
Name of the file which contains the job list.
259
260=item B<-v|--verbose>
261
262=item B<-j|--jobnp integer>
263
Number of processors to allocate for each small job.
1 by default.
266
267=item B<-n|--nodefile filenode>
268
Name of the file that lists all the nodes on which to launch jobs.
By default, it is defined automatically by OAR via the
environment variable C<OAR_NODE_FILE>.
272
For example, if you want to use 6 cores on your cluster node,
you need to put the node hostname 6 times in this file,
one per line...
It is a very common file format for MPI processes!
277
278=item B<-m|--masterio basefileio>
279
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
283
284=item B<-s|--switchio>
285
Each small job will have its own STDOUT and STDERR output files,
based on the master OAR job name with C<JOB_NUM> inside
(or based on C<basefileio> if option C<masterio> is used).
289Example :
290
291 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout
292
where 151524 here is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.
295
=item B<-o|--oarsh command>
297
Command used to launch a shell on a node.
By default
300
301        oarsh -q -T
302
303=item B<-h|--help>
304
305=back
306
307
308=head1 EXAMPLE
309
The job file (option C<--file>) can contain:
311
 - empty lines
 - comment lines beginning with #
 - valid shell commands
315
316Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
317
318 $HOME/test/subjob1.sh
319 $HOME/test/subjob2.sh
320 $HOME/test/subjob3.sh
321 $HOME/test/subjob4.sh
322 ...
323 $HOME/test/subjob38.sh
324 $HOME/test/subjob39.sh
325 $HOME/test/subjob40.sh
326
These jobs could be launched by
328
329 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"
330
331
332=head1 SEE ALSO
333
334oar-dispatch, mpilauncher
335
336
337=head1 AUTHORS
338
339Written by Gabriel Moreau, Grenoble - France
340
341
342=head1 LICENSE AND COPYRIGHT
343
344GPL version 2 or later and Perl equivalent
345
346Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
347
Note: See TracBrowser for help on using the repository browser.