source: trunk/oarutils/oar-parexec @ 39

Last change on this file since 39 was 39, checked in by g7moreau, 12 years ago
  • Add OAR checkpointing!
File size: 8.2 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/27 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Semaphore;
11use Coro::Signal;
12use Coro::Channel;
13use Coro::Handle;
14use IO::File;
15use POSIX qw( WNOHANG WEXITSTATUS );
16use Cwd qw( getcwd );
17
# Command-line settings. Flags without a default stay undefined until set.
my ($verbose, $masterio, $switchio, $help);
my $file     = '';                            # job list file (--file)
my $logfile  = '';                            # start/end log (--logfile)
my $job_np   = 1;                             # node-file entries per job (--jobnp)
my $oarsh    = 'oarsh -q -T';                 # remote shell command (--oarsh)
my $nodefile = $ENV{OAR_NODE_FILE} || '';     # node list (--nodefile)

# Parse the command line; an unknown or malformed option prints the short usage.
Getopt::Long::GetOptions(
   'help'       => \$help,
   'verbose'    => \$verbose,
   'file=s'     => \$file,
   'logfile=s'  => \$logfile,
   'nodefile=s' => \$nodefile,
   'jobnp=i'    => \$job_np,
   'oarsh=s'    => \$oarsh,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   )
   or pod2usage(-verbose => 0);

# Full manual on explicit request, or when the job list file does not exist.
pod2usage(-verbose => 2) if $help or not -e $file;
41
# State recovered from the log of a previous run: job number => 'start' or 'end'.
# The last matching line for a job wins, so a job that logged both ends as 'end'.
my %state;
my $log_h = IO::File->new();
if (-e $logfile) {
   # Pass the mode separately so it is never parsed out of the file name.
   $log_h->open($logfile, '<')
      or die "can't read log file: $!";
   while (my $line = <$log_h>) {
      # The log lines are written with "%5i", which left-pads the job number
      # with spaces, so any amount of whitespace must be accepted before it.
      if ($line =~ m/^(start|end)\s+job\s+(\d+)/) {
         $state{$2} = $1;
         }
      }
   $log_h->close();
   }

# Reopen the log in append mode and wrap it so writes cooperate with Coro.
if ($logfile) {
   $log_h->open($logfile, '>>')
      or die "can't open log file: $!";
   $log_h = unblock $log_h;
   }
58
# Read a list file and return its useful lines, in file order.
# Lines starting with '#' and lines that are empty or whitespace-only are
# skipped; the trailing newline is removed from every line that is kept.
# Dies with "can't open PATH: REASON" when the file cannot be opened.
sub read_list_file {
   my ($path) = @_;

   open(my $list_h, '<', $path) or die "can't open $path: $!";
   my @entries;
   while (my $line = <$list_h>) {
      chomp $line;
      next if $line =~ m/^#/;
      next if $line =~ m/^\s*$/;
      push @entries, $line;
      }
   close $list_h;

   return @entries;
   }

# Shell commands to run, one per job, taken from the --file job list.
my @job = read_list_file($file);

# Node names, one entry per line of the node file.
my @ressources = read_list_file($nodefile);
79
# A job needs $job_np node entries, so there must be at least that many.
my $ressource_size = @ressources;
if ($job_np > $ressource_size) {
   die "not enought ressources jobnp $job_np > ressources $ressource_size";
   }

# Every sub job is started from the directory the master was launched in.
my $current_dir = getcwd();

# Base names for the per-job output files: --masterio when given, otherwise
# the OAR output file names with their .stdout / .stderr extension removed.
my ($stderr, $stdout);
if ($masterio) {
   $stderr = $masterio;
   $stdout = $masterio;
   }
else {
   $stderr = $ENV{OAR_STDERR} || '';
   $stderr =~ s/\.stderr$//;
   $stdout = $ENV{OAR_STDOUT} || '';
   $stdout =~ s/\.stdout$//;
   }
91
92
# Raised by the monitor coroutine when the main program may exit.
my $finished = Coro::Signal->new;

# Counts the jobs that are not finished yet; starts at the number of jobs.
my $job_todo = Coro::Semaphore->new(scalar @job);

# Pool of free slots. Each slot is a comma-separated group of $job_np
# consecutive node-file entries; leftover entries that cannot fill a whole
# slot are not used.
my $ressources = Coro::Channel->new;
for my $first (map { $_ * $job_np } 0 .. int($ressource_size / $job_np) - 1) {
   my $last = $first + $job_np - 1;
   $ressources->put(join ',', @ressources[$first .. $last]);
   }

# Number of the job currently being considered, and the running jobs by pid.
my $job_num   = 0;
my %scheduled = ();

# Each USR2 signal received bumps this counter; a non-zero count is read
# elsewhere as "a checkpoint was requested".
my $oar_checkpoint = Coro::Semaphore->new(0);
$SIG{USR2} = sub { $oar_checkpoint->up };
108
# Scheduler coroutine: walk the job list in order and start every job that
# still has to run on the next free slot of nodes.
#
# For each job it takes a slot from $ressources, opens a remote shell on the
# first node of the slot through $oarsh, records the job in %scheduled under
# the shell's pid, and feeds the shell the commands to run. Jobs recorded as
# 'end' in %state are skipped. Launching stops once a USR2 checkpoint request
# has been seen.
async {
   for my $job (@job) {
      $job_num++;

      if (exists $state{$job_num}) {
         if ($state{$job_num} eq 'end') {
            # Already completed in a previous run: it still has to be taken
            # off the to-do counter, otherwise the count never reaches zero
            # and the program never terminates.
            delete $state{$job_num};
            $job_todo->down;
            cede;
            next;
            }
         if ($state{$job_num} eq 'start') {
            print "warning: job $job_num was not finished, relaunching...\n" if $verbose;
            }
         }

      # Waits here until a slot is available.
      my $job_ressource = $ressources->get;

      # A checkpoint was requested (USR2): do not launch anything new.
      last if $oar_checkpoint->count > 0;

      # The remote shell is opened on the first node of the slot only.
      my ($node_connect) = split ',', $job_ressource;
      my $fh      = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "don't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         num          => $job_num,
         };

      $log_h->printf("start job %5i at %s\n", $job_num, time) if $logfile;
      printf "start job %5i / %5i at %s on node %s\n",
         $job_num, $job_pid, time, $job_ressource
         if $verbose;

      # Output redirections, left empty unless --switchio asks for per-job files.
      my $job_stdout = '';
      my $job_stderr = '';
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";

      if ($job_np > 1) {
         # Parallel sub job: write its private node file on the remote side
         # (the shell's printf expands the literal \n separators) and export
         # the matching OAR variables.
         $fh->print("printf \""
            . join('\n', split(',', $job_ressource))
            . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   };    # the semicolon ends the statement, so nothing after it is parsed as arguments to async
163
# Monitor coroutine: poll the running sub jobs without blocking and, for
# each one that has terminated, log it, close its shell handle, give its
# slot back to $ressources and take it off the to-do counter.
#
# $finished is signalled when no job is left to do, or when a checkpoint was
# requested (USR2) and every job that had been launched has terminated.
async {
   while () {
      for my $job_pid ( keys %scheduled ) {
         if ( waitpid( $job_pid, WNOHANG ) ) {
            my $done = $scheduled{$job_pid};

            # Log the number of the job that actually ended. The shared
            # $job_num counter holds the last job launched, which is usually
            # a different job and would corrupt the state read back on resume.
            $log_h->printf("end job %5i at %s\n", $done->{num}, time) if $logfile;
            printf "end   job %5i / %5i at %s on node %s\n",
               $done->{num},
               $job_pid, time,
               $done->{ressource}
               if $verbose;
            close $done->{fh};
            $ressources->put( $done->{ressource} );
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      $finished->send if $oar_checkpoint->count > 0 and keys(%scheduled) == 0;

      $finished->send if $job_todo->count == 0;
      cede;
      }
   };    # the semicolon ends the statement, so nothing after it is parsed as arguments to async
188
# Yield to any coroutine that is ready to run, then sleep until the
# end-of-run signal is raised.
cede;
$finished->wait;

# The log handle is only open when a log file was requested.
if ($logfile) {
   $log_h->close;
   }
194
195__END__
196
197=head1 NAME
198
oar-parexec - execute lots of small jobs in parallel
200
201=head1 SYNOPSIS
202
 oar-parexec --file filecommand [--logfile filelog] [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh sssh]
204 oar-parexec --help
205
206=head1 DESCRIPTION
207
C<oar-parexec> executes lots of small jobs in parallel inside a cluster.
The number of jobs running at one time cannot exceed the number of cores listed in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...
212
213Option C<--file> is the only mandatory one.
214
Small jobs will be launched in the same folder as the master job.
Two environment variables are defined for each small job,
but only in the case of parallel small jobs (option C<--jobnp> > 1).
218
219 OAR_NODE_FILE - file that list node for parallel computing
 OAR_NP        - number of processors allocated
221
The file defined by OAR_NODE_FILE is created in /tmp on the node before launching
the small job, and is deleted afterwards...
C<oar-parexec> is a simple script:
OAR_NODE_FILE will not be deleted if the master job crashes.
226
OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can use the original OAR resource file in your script
through these variables if you need it.
231 
232
233=head1 OPTIONS
234
235=over 12
236
237=item B<-f|--file       filecommand>
238
Name of the file that contains the job list.
240
241=item B<-v|--verbose>
242
243=item B<-j|--jobnp integer>
244
Number of processors to allocate to each small job.
1 by default.
247
248=item B<-n|--nodefile filenode>
249
Name of the file that lists all the nodes on which to launch jobs.
By default, it is defined automatically by OAR via the
environment variable C<OAR_NODE_FILE>.

For example, if you want to use 6 cores on your cluster node,
you need to put the node hostname 6 times in this file,
one per line...
This is a very common file format for MPI processes!
258
259=item B<-m|--masterio basefileio>
260
The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only used when option C<switchio> is activated).
264
265=item B<-s|--switchio>
266
Each small job will have its own STDOUT and STDERR output files,
named after those of the master OAR job with C<JOB_NUM> inserted
(or after C<basefileio> if option C<masterio> is used).
270Example :
271
272 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout
273
where 151524 here is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.
276
=item B<-o|--oarsh command>
278
Command used to launch a shell on a node.
By default
281
282        oarsh -q -T
283
284=item B<-h|--help>
285
286=back
287
288
289=head1 EXAMPLE
290
The job file (option C<--file>) can contain:
292
 - empty lines
 - comment lines beginning with #
 - valid shell commands
296
297Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
298
299 $HOME/test/subjob1.sh
300 $HOME/test/subjob2.sh
301 $HOME/test/subjob3.sh
302 $HOME/test/subjob4.sh
303 ...
304 $HOME/test/subjob38.sh
305 $HOME/test/subjob39.sh
306 $HOME/test/subjob40.sh
307
These jobs could be launched by
309
310 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"
311
312
313=head1 SEE ALSO
314
315oar-dispatch, mpilauncher
316
317
318=head1 AUTHORS
319
320Written by Gabriel Moreau, Grenoble - France
321
322
323=head1 LICENSE AND COPYRIGHT
324
325GPL version 2 or later and Perl equivalent
326
327Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
328
Note: See TracBrowser for help on using the repository browser.