source: trunk/oarutils/oar-dispatch @ 425

Last change on this file since 425 was 97, checked in by g7moreau, 12 years ago
  • Clean code and add comments
File size: 8.8 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/03 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Signal;
11use Coro::Semaphore;
12use Coro::AnyEvent;
13use Coro::Handle;
14use IO::File;
15
# Command-line settings and their default values.
my $help;
my $verbose;
my $task           = 0;       # 0 means: count the lines of OAR_NODE_FILE
my $overload       = 1;       # multiplies $task to bound the active jobs
my $file           = '';      # job list file
my $dir;                      # folder whose sub-folders are the jobs
my $cmd;                      # command launched in each sub-folder
my $logtrace;                 # log file used to resume a previous run
my $sig_transmit;             # forward the checkpoint signal to the jobs
my $sig_checkpoint = 'USR2';  # name of the checkpoint signal
my $brake          = 5;       # five second time brake

my $options_ok = Getopt::Long::GetOptions(
   'help'         => \$help,
   'verbose'      => \$verbose,
   'task=i'       => \$task,
   'overload=f'   => \$overload,
   'file=s'       => \$file,
   'dir=s'        => \$dir,
   'cmd=s'        => \$cmd,
   'logtrace=s'   => \$logtrace,
   'transmit'     => \$sig_transmit,
   'kill=s'       => \$sig_checkpoint,
   'brake=i'      => \$brake,
   );

# short usage on a bad command line, full manual on --help
pod2usage(-verbose => 0) if not $options_ok;
pod2usage(-verbose => 2) if $help;
42
# No task number on the command line: use one task per line of the
# OAR node file.
if ($task == 0) {
   my $node_file = $ENV{OAR_NODE_FILE};
   die "error: no --task given and OAR_NODE_FILE is not defined\n"
      if not defined $node_file or $node_file eq '';

   open(my $node_h, '<', $node_file)
      or die "can't open OAR_NODE_FILE $node_file: $!";
   $task++ while <$node_h>;
   close $node_h;

   # a zero task count would make the launcher wait forever for a free slot
   die "error: OAR_NODE_FILE $node_file is empty, use --task\n"
      if $task == 0;
   }
48
# re-run, keep trace of job already done
# %state maps a job name to 'start' or 'end', the last of these two
# events found for that job in a previous log trace file.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # the mode is given apart from the name, so that no character of
   # the file name can be taken for a part of the open mode
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # wrap the handle with Coro::Handle's unblock
   $log_h = unblock $log_h;
   }
67
# job to run
# Each job is a hash { name, cmd, num }. Jobs come from the job file
# when it exists (one command per line), otherwise from the sub-folders
# of $dir, the same command $cmd being run in each of them.
my @job = ();
if (-e $file) {
   my $job_num = 0;
   open(my $job_list_h, '<', $file)
      or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      # Add oarsub -S if not
      $job_cmd = "oarsub -S $job_cmd" if $job_cmd !~ m/^\s*oarsub/;
      $job_num++;
      # optional job name taken from a trailing "# ... name=xxx" comment,
      # the capture stops at the first word boundary inside xxx
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name   => $job_name,
         cmd    => "$job_cmd",
         num    => $job_num,
         };
      }
   close $job_list_h;
   }
else {
   # folder mode needs a folder to scan and a command to run inside
   die "error: no job to run, give an existing --file or both --dir and --cmd\n"
      unless defined $dir and defined $cmd and $cmd =~ m/\S/;

   # Add oarsub -S if not
   $cmd = "oarsub -S $cmd" if $cmd !~ m/^\s*oarsub/;
   my $job_num = 0;
   opendir(my $dir_h, $dir) or die "error: can't open folder $dir: $!";
   # sorted, so that the launch order does not depend on the file system
   for my $item (sort readdir $dir_h) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");
      $job_num++;
      push @job, {
         name   => $item,
         cmd    => "cd $dir/$item/; $cmd",
         num    => $job_num,
         };
      }
   closedir $dir_h;
   }
119
# Option inserted in each oarsub command to run it inside this OAR job.
my $container_id      = $ENV{OAR_JOB_ID};
my $insert_oar_option = "-t inner=$container_id";

# interactive job
unless ($container_id > 1) {
   $insert_oar_option = '';
   $overload = 1;
   }

# $finished is sent when the program can stop,
# $job_todo counts the jobs which are not yet over.
my $finished = Coro::Signal->new;
my $job_todo = Coro::Semaphore->new(0);
my $job_name_maxlen;
for my $job (@job) {
   $job_todo->up;
   my $name_len = length($job->{name});
   $job_name_maxlen = $name_len if $name_len > $job_name_maxlen;
   }

# number of jobs submitted and not yet over
my $job_active = Coro::Semaphore->new(0);

# submitted jobs, by OAR job id
my %scheduled = ();

# OAR checkpoint and default signal SIGUSR2
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   $notify->send if $sig_transmit;
   };
152
# asynchronous notify job
# Each time $notify is sent, ask OAR to checkpoint every scheduled job
# with the checkpoint signal.
async {
   while () {
      $notify->wait;

      for my $job_pid (keys %scheduled) {
         # list form: the arguments are given to oardel as they are,
         # without any shell interpretation
         system 'oardel', '--checkpoint', '--signal', $sig_checkpoint, $job_pid;
         cede;
         }
      }
   };   # without this semicolon, the next statement is parsed as an argument of async
164
# asynchronous start job block
# Submit the jobs one after the other, with never more than
# $task*$overload active jobs, and start for each submitted job a guard
# coroutine which polls oarstat until the job is over.
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # job has been already run ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # wait for a free slot, other coroutines run meanwhile
      while ($job_active->count >= $task*$overload) {
         cede;
         }

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # Insert the container option just after the oarsub word. The
      # command can begin with blanks or, for a job made from a folder,
      # with "cd folder/; " : both cases must be accepted here.
      $job_cmd =~ s/^((?:cd\s.*?;)?\s*oarsub)\b/$1 $insert_oar_option/;
      print "$job_cmd\n" if $verbose;

      # keep only the numeric job id given back by oarsub
      my $submit_out = `$job_cmd|grep ^OAR_JOB_ID|cut -f 2 -d '='`;
      my ($job_id) = $submit_out =~ m/^\s*(\d+)/;

      # Submission failed: there is nothing to guard. The job is no
      # more counted as to do, otherwise the program would never end.
      # Nothing is written in the log trace for it.
      if (not defined $job_id or not $job_id > 1) {
         warn "error: can't submit job $job_name, skipping\n";
         $job_todo->down;
         cede;
         next JOB;
         }

      $scheduled{$job_id} = {
         name         => $job_name,
         };
      $job_active->up;

      my $msg = sprintf "start job %${job_name_maxlen}s / %i at %s\n",
         $job_name, $job_id, time;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;
      cede;

      # asynchronous guard for job end
      async {
         my $final_state = '';
         GUARD:
         while () {
            cede;

            # wait to not re-launch oarstat too fast
            # equivalent to sleep $brake
            my $timer = AE::now + $brake;
            while ( AE::now < $timer ) {
               # force update of AE time
               AE::now_update;
               cede;
               }

            my $is_finish = `oarstat -s -j $job_id`;
            chomp $is_finish;
            # NOTE(review): Error is taken as a final OAR state too, so
            # that a failed job does not keep the guard polling forever
            # - confirm against the oarstat output of the target OAR
            if ($is_finish =~ m/\b(Terminated|Error)\b/) {
               $final_state = $1;
               last GUARD;
               }
            }

         my $msg = sprintf "end   job %${job_name_maxlen}s / %i at %s\n",
            $scheduled{$job_id}->{name}, $job_id, time;

         # Job non finish, just suspend if received checkpoint signal
         $msg =~ s/^end\s+job/suspend job/
            if $sig_transmit and $oar_checkpoint->count() > 0;

         # A job in error is not logged as "end": when the log trace is
         # read again, only an "end" line marks a job as already run.
         $msg =~ s/^end\s+job/error job/
            if $final_state eq 'Error';

         $log_h->print($msg) if $logtrace;
         print($msg) if $verbose;

         $job_active->down;
         $job_todo->down;
         delete $scheduled{$job_id};
         };
      }
   };   # without this semicolon, the next statement is parsed as an argument of async
248
# asynchronous watcher: send $finished as soon as the work is over
async {
   while () {
      # checkpointing ! just finishing running job and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      $finished->send if $job_todo->count == 0;
      cede;
      }
   };   # without this semicolon, the cede below is parsed as an argument of async

# give the hand to the coroutines
cede;

# all job have been done
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;
266
267
268__END__
269
270=head1 NAME
271
oar-dispatch - dispatch a lot of small OAR jobs
273
274=head1 SYNOPSIS
275
276 oar-dispatch [--task integer] [--overload real] --file filecommand [--verbose]
277 oar-dispatch --help
278
279=head1 OPTIONS
280
281=over 12
282
283=item B<[-t|--task integer]>
284
Number of tasks to run in parallel.
Defaults to the number of lines in the file OAR_NODE_FILE.
287 
288=item B<[-o|--overload real]>
289
Number of OAR jobs to create / number of tasks.
Some jobs are created in advance so that they can start as soon as possible.
1 by default.
293
294=item B<[-f|--file filecommand]>
295
Name of the file which contains the OAR job list.
297
298=item B<[-v|--verbose]>
299 
300=item B<[-h|--help]>
301
302=back
303
The input job file can contain
305
306 - empty line
307 - comment line begin with #
308 - oarsub command without -t option
309 
C<oar-dispatch> will add C<-t inner=container_id> to each command line,
just after C<oarsub>.
312
313=head1 EXAMPLE
314
Example where the file F<$HOME/test/subjob.txt> is a list of OAR job scripts (which may be executable, although this is not needed here).
316
317 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob1.oar
318 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob2.oar
319 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob3.oar
320 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob4.oar
321 ...
322 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob38.oar
323 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob39.oar
324 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob40.oar
325
These jobs can be launched with
327
328 oarsub -t container -n test-container -l /core=6,walltime=00:35:00 "oar-dispatch -f ./subjob.list.txt"
329
330Total C<walltime> is defined by the formula:
331
332 total_walltime = subjob_walltime * total_subjob / core + global_delay
333
In practice, C<oar-dispatch> takes a few seconds and each subjob runs in less than its walltime, so
335
336 total_walltime < subjob_walltime * total_subjob / core
337
If launched interactively, the C<overload> parameter is set to 1,
C<task> must be defined
and no inner container option is added to the C<oarsub> command line.
341
342
343=head1 SEE ALSO
344
345oar-parexec, mpilauncher
346
347
348=head1 AUTHORS
349
350Written by Gabriel Moreau, Grenoble - France
351
352
353=head1 LICENSE AND COPYRIGHT
354
355GPL version 2 or later and Perl equivalent
356
357Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
358
Note: See TracBrowser for help on using the repository browser.