source: trunk/oarutils/oar-parexec @ 41

Last change on this file since 41 was 41, checked in by g7moreau, 9 years ago
  • Reformat oar-parexec with perltidy
File size: 8.6 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
18my $file    = '';
19my $logfile = '';
20my $verbose;
21my $job_np = 1;
22my $nodefile = $ENV{OAR_NODE_FILE} || '';
23my $masterio;
24my $switchio;
25my $help;
26my $oarsh = 'oarsh -q -T';
27
28Getopt::Long::GetOptions(
29   'file=s'     => \$file,
30   'logfile=s'  => \$logfile,
31   'verbose'    => \$verbose,
32   'help'       => \$help,
33   'oarsh=s'    => \$oarsh,
34   'jobnp=i'    => \$job_np,
35   'nodefile=s' => \$nodefile,
36   'masterio=s' => \$masterio,
37   'switchio'   => \$switchio,
38   ) || pod2usage(-verbose => 0);
39pod2usage(-verbose => 2) if $help;
40pod2usage(-verbose => 2) if not -e $file;
41
42my %state;
43my $log_h = IO::File->new();
44if (-e $logfile) {
45   $log_h->open("< $logfile")
46      or die "can't read log file: $!";
47   while (<$log_h>) {
48      $state{$1} = 'start' if m/^start\s+job\s+(\d+)\s/;
49      $state{$1} = 'end'   if m/^end\s+job\s+(\d+)\s/;
50      }
51   $log_h->close();
52   }
53if ($logfile) {
54   $log_h->open(">> $logfile")
55      or die "can't append log file $logfile: $!";
56   $log_h->autoflush;
57   $log_h = unblock $log_h;
58   }
59
60my @job = ();
61open(JOB_LIST, '<', "$file") or die "can't open job file $file: $!";
62while (<JOB_LIST>) {
63   chomp;
64   next if m/^#/;
65   next if m/^\s*$/;
66   push @job, $_;
67   }
68close JOB_LIST;
69
70my @ressources = ();
71open(NODE_FILE, '<', "$nodefile")
72   or die "can't open $nodefile: $!";
73while (<NODE_FILE>) {
74   chomp;
75   next if m/^#/;
76   next if m/^\s*$/;
77   push @ressources, $_;
78   }
79close NODE_FILE;
80
81my $ressource_size = scalar(@ressources);
82die "not enought ressources jobnp $job_np > ressources $ressource_size"
83   if $job_np > $ressource_size;
84
85my $current_dir = getcwd();
86
87my $stderr = $ENV{OAR_STDERR} || '';
88$stderr =~ s/\.stderr$//;
89$stderr = $masterio if $masterio;
90my $stdout = $ENV{OAR_STDOUT} || '';
91$stdout =~ s/\.stdout$//;
92$stdout = $masterio if $masterio;
93
94my $finished = new Coro::Signal;
95my $job_todo = new Coro::Semaphore 0;
96$job_todo->up for (@job);
97
98my $ressources = new Coro::Channel;
99for my $slot (1 .. int($ressource_size / $job_np)) {
100   $ressources->put(
101      join(',',
102         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
103         );
104   }
105
106my $job_num   = 0;
107my %scheduled = ();
108
109my $oar_checkpoint = new Coro::Semaphore 0;
110$SIG{USR2} = sub {$oar_checkpoint->up};
111
112async {
113   for my $job (@job) {
114      $job_num++;
115
116      if (exists $state{$job_num}) {
117         if ($state{$job_num} eq 'start') {
118            print "warning: job $job_num was not finished, relaunching...\n"
119               if $verbose;
120            }
121         elsif ($state{$job_num} eq 'end') {
122            delete $state{$job_num};
123            $job_todo->down;
124            print "warning: job $job_num already done\n" if $verbose;
125            cede;
126            next;
127            }
128         }
129
130      my $job_ressource = $ressources->get;
131
132      last if $oar_checkpoint->count() > 0;
133
134      my ($node_connect) = split ',', $job_ressource;
135      my $fh = IO::File->new();
136      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
137         or die "don't start subjob: $!";
138
139      $fh->autoflush;
140      $fh = unblock $fh;
141
142      $scheduled{$job_pid} = {
143         fh           => $fh,
144         node_connect => $node_connect,
145         ressource    => $job_ressource,
146         num          => $job_num
147         };
148
149      $log_h->printf("start job %5i at %s\n", $job_num, time) if $logfile;
150      printf "start job %5i / %5i at %s on node %s\n",
151         $job_num, $job_pid, time, $job_ressource
152         if $verbose;
153
154      my ($job_stdout, $job_stderr);
155      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
156      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;
157
158      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";
159
160      if ($job_np > 1) {
161         $fh->print("printf \""
162               . join('\n', split(',', $job_ressource,))
163               . "\" > $job_nodefile\n");
164         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
165         $fh->print("OAR_NP=$job_np\n");
166         $fh->print("export OAR_NODE_FILE\n");
167         $fh->print("export OAR_NP\n");
168         $fh->print("unset OAR_MSG_NODEFILE\n");
169         }
170      $fh->print("cd $current_dir\n");
171      $fh->print("$job $job_stdout $job_stderr\n");
172      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
173      $fh->print("exit\n");
174      cede;
175      }
176   }
177
178async {
179   while () {
180      for my $job_pid (keys %scheduled) {
181         if (waitpid($job_pid, WNOHANG)) {
182            $log_h->printf("end   job %5i at %s\n",
183               $scheduled{$job_pid}->{num}, time)
184               if $logfile;
185            printf "end   job %5i / %5i at %s on node %s\n",
186               $scheduled{$job_pid}->{num},
187               $job_pid, time, $scheduled{$job_pid}->{ressource}
188               if $verbose;
189            close $scheduled{$job_pid}->{fh};
190            $ressources->put($scheduled{$job_pid}->{ressource});
191            $job_todo->down;
192            delete $scheduled{$job_pid};
193            }
194         cede;
195         }
196
197      $finished->send if $oar_checkpoint->count > 0 and keys(%scheduled) == 0;
198
199      $finished->send if $job_todo->count == 0;
200      cede;
201      }
202   }
203
204cede;
205
206$finished->wait;
207
208$log_h->close() if $logfile;
209
210__END__
211
212=head1 NAME
213
214oar-parexec - parallel execute lot of small job
215
216=head1 SYNOPSIS
217
218 oar-parexec --file filecommand [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh sssh]
219 oar-parexec --help
220
221=head1 DESCRIPTION
222
223C<oar-parexec> execute lot of small job.in parallel inside a cluster.
224Number of parallel job at one time cannot excede core number in the node file.
225C<oar-parexec> is easier to use inside an OAR job environment
226which define automatically theses strategics parameters...
227
228Option C<--file> is the only mandatory one.
229
230Small job will be launch in the same folder as the master job.
231Two environment variable are define for each small job
232and only in case of parallel small job (option C<--jobnp> > 1).
233
234 OAR_NODE_FILE - file that list node for parallel computing
235 OAR_NP        - number of processor affected
236
237The file define by OAR_NODE_FILE is created on the node before launching
238the small job in /tmp and will be delete after...
239C<oar-parexec> is a simple script,
240OAR_NODE_FILE will not be deleted in case of crash of the master job.
241
242OAR define other variable that are equivalent to OAR_NODE_FILE:
243OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
244You can use in your script the OAR original file ressources
245by using these variable if you need it.
246 
247
248=head1 OPTIONS
249
250=over 12
251
252=item B<-f|--file       filecommand>
253
254File name which content job list.
255
256=item B<-v|--verbose>
257
258=item B<-j|--jobnp integer>
259
260Number of processor to allocated for each small job.
2611 by default.
262
263=item B<-n|--nodefile filenode>
264
265File name that list all the node to launch job.
266By defaut, it's define automatically by OAR via
267environment variable C<OAR_NODE_FILE>.
268
269For example, if you want to use 6 core on your cluster node,
270you need to put 6 times the hostname node in this file,
271one per line...
272It's a very common file in MPI process !
273
274=item B<-m|--masterio basefileio>
275
276The C<basefileio> will be use in place of environment variable
277C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standart output
278(only use when option C<swithio> is activated).
279
280=item B<-s|--switchio>
281
282Each small job will have it's own output STDOUT and STDERR
283base on master OAR job with C<JOB_NUM> inside
284(or base on C<basefileio> if option C<masterio>).
285Example :
286
287 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout
288
289where 151524 here is the master C<OAR_JOB_ID>
290and C<JOB_NUM> is the small job nnumber.
291
292=item B<-o|-oarsh command>
293
294Command use to launch a shell on a node.
295By default
296
297        oarsh -q -T
298
299=item B<-h|--help>
300
301=back
302
303
304=head1 EXAMPLE
305
306Content for the job file (option C<--file>) could have:
307
308 - empty line
309 - comment line begin with #
310 - valid shell command
311
312Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
313
314 $HOME/test/subjob1.sh
315 $HOME/test/subjob2.sh
316 $HOME/test/subjob3.sh
317 $HOME/test/subjob4.sh
318 ...
319 $HOME/test/subjob38.sh
320 $HOME/test/subjob39.sh
321 $HOME/test/subjob40.sh
322
323These jobs could be launch by
324
325 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"
326
327
328=head1 SEE ALSO
329
330oar-dispatch, mpilauncher
331
332
333=head1 AUTHORS
334
335Written by Gabriel Moreau, Grenoble - France
336
337
338=head1 LICENSE AND COPYRIGHT
339
340GPL version 2 or later and Perl equivalent
341
342Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
343
Note: See TracBrowser for help on using the repository browser.