source: trunk/oarutils/oar-parexec @ 40

Last change on this file since 40 was 40, checked in by g7moreau, 12 years ago
  • Small bug in logfile implementation
File size: 8.4 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command-line options. The last four stay undefined unless given.
my ( $verbose, $masterio, $switchio, $help );
my $file     = '';                           # job list file (--file)
my $logfile  = '';                           # job start/end log (--logfile)
my $job_np   = 1;                            # resource lines per job (--jobnp)
my $nodefile = $ENV{OAR_NODE_FILE} || '';    # node list file (--nodefile)
my $oarsh    = 'oarsh -q -T';                # remote shell command (--oarsh)

# A parsing error prints the short usage.
Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'logfile=s'  => \$logfile,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   ) or pod2usage( -verbose => 0 );

# The full manual is shown on --help, and also when the job file does not
# exist (which includes the case where --file was omitted).
pod2usage( -verbose => 2 ) if $help or !-e $file;
41
# Resume support.
# %state maps a job number to the last event found for it in an existing
# log file: 'start' (launched, no end recorded) or 'end' (completed).
my %state;
my $log_h = IO::File->new();
if (-e $logfile) {
   # The mode is passed separately from the file name, so the name is
   # never parsed for mode characters or stripped of surrounding blanks.
   $log_h->open($logfile, '<')
      or die "can't read log file $logfile: $!";
   while (my $line = <$log_h>) {
      # Lines are written as "start job N at T" / "end   job N at T";
      # a later line for the same job overrides an earlier one.
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+(\d+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+(\d+)\s/;
      }
   $log_h->close();
   }
if ($logfile) {
   # Re-open in append mode: new events are added after the old ones.
   $log_h->open($logfile, '>>')
      or die "can't append log file $logfile: $!";
   $log_h->autoflush;
   # Wrap the handle with Coro::Handle's unblock before the coroutines use it.
   $log_h = unblock $log_h;
   }
59
# Load the job list: one shell command per line.
# Lines starting with '#' and blank lines are ignored.
my @job = ();
open( my $job_list_h, '<', "$file" ) or die "can't open job file $file: $!";
while ( my $line = <$job_list_h> ) {
   chomp $line;
   next if $line =~ m/^#/ or $line =~ m/^\s*$/;
   push @job, $line;
   }
close $job_list_h;
69
# Load the node file: one resource per line.
# Lines starting with '#' and blank lines are ignored.
my @ressources = ();
die "no node file: use option --nodefile or set OAR_NODE_FILE"
   if $nodefile eq '';
open( my $node_file_h, '<', $nodefile )
   or die "can't open $nodefile: $!";
while ( my $line = <$node_file_h> ) {
   chomp $line;
   next if $line =~ m/^#/;
   next if $line =~ m/^\s*$/;
   push @ressources, $line;
   }
close $node_file_h;

my $ressource_size = scalar(@ressources);

# jobnp is used later as a divisor and as a slice width: zero would die
# with a division by zero, a negative value would build no slot at all
# and leave the scheduler waiting forever.
die "option jobnp must be a positive integer (got $job_np)" if $job_np < 1;
die "not enought ressources jobnp $job_np > ressources $ressource_size" if $job_np > $ressource_size;
83
# Directory in which every small job will be started.
my $current_dir = getcwd();

# Base names for the per-job output files: the OAR master job file names
# without their extension, unless --masterio overrides both.
( my $stderr = $ENV{OAR_STDERR} || '' ) =~ s/\.stderr$//;
( my $stdout = $ENV{OAR_STDOUT} || '' ) =~ s/\.stdout$//;
if ($masterio) {
   $stderr = $masterio;
   $stdout = $masterio;
   }
92
93
# Signalled by the reaper coroutine when the main program may exit.
my $finished = Coro::Signal->new;

# Counts the jobs that are not finished yet (one unit per job in the list).
my $job_todo = Coro::Semaphore->new( scalar @job );

# Channel of free slots. A slot is a comma separated group of $job_np
# consecutive lines of the node file; leftover lines are not used.
my $ressources = Coro::Channel->new;
my $slot_count = int( $ressource_size / $job_np );
for my $slot_index ( 0 .. $slot_count - 1 ) {
   my $first = $slot_index * $job_np;
   my $last  = $first + $job_np - 1;
   $ressources->put( join( ',', @ressources[ $first .. $last ] ) );
   }


# Number of the job being scheduled (1-based position in the job list).
my $job_num   = 0;

# Running jobs, indexed by the pid of their remote shell.
my %scheduled = ();

# Raised on USR2 (presumably the OAR checkpoint signal - to be confirmed):
# once its count is positive, no new job is launched.
my $oar_checkpoint = Coro::Semaphore->new(0);
$SIG{USR2} = sub { $oar_checkpoint->up };
109
# Scheduler coroutine: walks the job list in order, takes a free slot for
# each job and feeds the job's commands to a shell opened on the first
# node of that slot.
async {
   for my $job (@job) {
      $job_num++;

      # Resume handling, based on what was read from the log file.
      if (exists $state{$job_num}) {
         if ($state{$job_num} eq 'start') {
            print "warning: job $job_num was not finished, relaunching...\n" if $verbose;
            }
         elsif ($state{$job_num} eq 'end') {
            # Already done in a previous run: count it as finished and skip.
            delete $state{$job_num};
            $job_todo->down;
            print "warning: job $job_num already done\n" if $verbose;
            cede;
            next;
            }
         }

      # Waits here until the reaper coroutine gives a slot back.
      my $job_ressource = $ressources->get;

      # Stop launching new jobs once USR2 has been received.
      last if $oar_checkpoint->count() > 0;

      # The shell is opened on the first node of the slot; the job's
      # commands are written to its standard input.
      my ($node_connect) = split ',', $job_ressource;
      my $fh      = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "don't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = { fh => $fh, node_connect => $node_connect, ressource => $job_ressource, num => $job_num };

      $log_h->printf("start job %5i at %s\n", $job_num, time) if $logfile;
      printf "start job %5i / %5i at %s on node %s\n",
         $job_num, $job_pid, time, $job_ressource
         if $verbose;

      # Per-job redirections, only with --switchio; empty strings
      # otherwise so that nothing undefined is interpolated below.
      my $job_stdout = '';
      my $job_stderr = '';
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      # NOTE(review): this name depends only on LOGNAME and the job number,
      # so two masters run by the same user on the same node would share it.
      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";

      if ($job_np > 1) {
         # Write the slot's nodes, one per line, in a private node file
         # (the literal \n is expanded by the remote printf).
         $fh->print("printf \""
            . join('\n',split(',',$job_ressource,))
            . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }

      # Single-quote the directory for the remote shell, so that a path
      # holding spaces or shell metacharacters is passed unchanged.
      ( my $quoted_dir = $current_dir ) =~ s/'/'\\''/g;
      $fh->print("cd '$quoted_dir'\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }
169
# Reaper coroutine: polls the running remote shells without blocking.
# For each one that has terminated it records the end of the job, gives
# the slot back to the scheduler and counts the job as finished.
async {
   while (1) {
      for my $pid ( keys %scheduled ) {
         if ( waitpid( $pid, WNOHANG ) ) {
            my $done = $scheduled{$pid};

            $log_h->printf( "end   job %5i at %s\n", $done->{num}, time )
               if $logfile;
            printf "end   job %5i / %5i at %s on node %s\n",
               $done->{num}, $pid, time, $done->{ressource}
               if $verbose;

            close( $done->{fh} );
            $ressources->put( $done->{ressource} );
            $job_todo->down;
            delete $scheduled{$pid};
            }
         # Let the scheduler run between two polls.
         cede;
         }

      # After USR2: finished as soon as nothing is running any more.
      $finished->send if $oar_checkpoint->count > 0 && keys(%scheduled) == 0;

      # Normal end: every job of the list has been accounted for.
      $finished->send if $job_todo->count == 0;
      cede;
      }
   }

# Give the coroutines a first run, then sleep until the reaper signals.
cede;

$finished->wait;

$log_h->close() if $logfile;
202
203
204__END__
205
206=head1 NAME
207
oar-parexec - execute a lot of small jobs in parallel
209
210=head1 SYNOPSIS
211
 oar-parexec --file filecommand [--logfile filelog] [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh sssh]
213 oar-parexec --help
214
215=head1 DESCRIPTION
216
C<oar-parexec> executes a lot of small jobs in parallel inside a cluster.
The number of jobs running at one time cannot exceed the number of cores listed in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...
221
222Option C<--file> is the only mandatory one.
223
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
but only in the case of parallel small jobs (option C<--jobnp> > 1).
227
228 OAR_NODE_FILE - file that list node for parallel computing
 OAR_NP        - number of processors allocated
230
The file defined by OAR_NODE_FILE is created in /tmp on the node before
launching the small job, and is deleted afterwards...
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
235
OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can still use the original OAR resource file in your script
through these variables if you need it.
240 
241
242=head1 OPTIONS
243
244=over 12
245
246=item B<-f|--file       filecommand>
247
Name of the file which contains the job list.
249
250=item B<-v|--verbose>
251
252=item B<-j|--jobnp integer>
253
Number of processors to allocate for each small job.
2551 by default.
256
257=item B<-n|--nodefile filenode>
258
Name of the file that lists all the nodes on which jobs can be launched.
By default, it is defined automatically by OAR via
environment variable C<OAR_NODE_FILE>.
262
For example, if you want to use 6 cores on your cluster node,
you need to put the node hostname 6 times in this file,
one per line...
It's a very common file format for MPI processes!
267
268=item B<-m|--masterio basefileio>
269
The C<basefileio> will be used in place of environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
273
274=item B<-s|--switchio>
275
Each small job will have its own STDOUT and STDERR output files,
based on the master OAR job name with C<JOB_NUM> inside
(or based on C<basefileio> if option C<masterio> is given).
279Example :
280
281 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout
282
where 151524 here is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.
285
=item B<-o|--oarsh command>
287
Command used to launch a shell on a node.
289By default
290
291        oarsh -q -T
292
293=item B<-h|--help>
294
295=back
296
297
298=head1 EXAMPLE
299
The job file (option C<--file>) can contain:
301
 - empty lines
 - comment lines beginning with #
 - valid shell commands
305
306Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
307
308 $HOME/test/subjob1.sh
309 $HOME/test/subjob2.sh
310 $HOME/test/subjob3.sh
311 $HOME/test/subjob4.sh
312 ...
313 $HOME/test/subjob38.sh
314 $HOME/test/subjob39.sh
315 $HOME/test/subjob40.sh
316
These jobs could be launched by
318
319 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"
320
321
322=head1 SEE ALSO
323
324oar-dispatch, mpilauncher
325
326
327=head1 AUTHORS
328
329Written by Gabriel Moreau, Grenoble - France
330
331
332=head1 LICENSE AND COPYRIGHT
333
334GPL version 2 or later and Perl equivalent
335
336Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
337
Note: See TracBrowser for help on using the repository browser.