source: trunk/oarutils/oar-dispatch @ 96

Last change on this file since 96 was 96, checked in by g7moreau, 11 years ago
  • Change sleep function (do not work) with manual timer
File size: 8.9 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/03 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Signal;
11use Coro::Semaphore;
12use Coro::AnyEvent;
13use Coro::Handle;
14use IO::File;
15
# Command-line options and their default values.
my $task = 0;                  # parallel task count; 0 means "count the lines of OAR_NODE_FILE"
my $overload = 1;              # factor applied to $task to get the max number of active jobs
my $file = '';                 # file holding one job command per line
my $dir;                       # folder whose sub-folders each give one job
my $cmd;                       # command submitted from each sub-folder of $dir
my $logtrace;                  # log file read on re-run to skip jobs already done
my $verbose;
my $help;
my $sig_transmit;              # when set, the checkpoint is forwarded to the scheduled jobs
my $sig_checkpoint = 'USR2';   # name of the checkpoint signal

Getopt::Long::GetOptions(
   'task=i'       => \$task,
   'overload=f'   => \$overload,
   'file=s'       => \$file,
   'dir=s'        => \$dir,
   'cmd=s'        => \$cmd,
   'logtrace=s'   => \$logtrace,
   'verbose'      => \$verbose,
   'help'         => \$help,
   'transmit'     => \$sig_transmit,
   'kill=s'       => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;

# Without --task, use one task per line of the OAR node file.
if ($task == 0) {
   my $node_file = $ENV{OAR_NODE_FILE};
   die "error: no --task option and OAR_NODE_FILE is not set\n"
      if not defined $node_file or $node_file eq '';
   open(my $node_h, '<', $node_file)
      or die "error: can't open OAR_NODE_FILE $node_file: $!";
   $task++ while <$node_h>;
   close $node_h;
   }

# With no task at all, the launcher would wait for a free slot forever.
die "error: the number of task must be positive (got $task)\n" if $task < 1;
46
# re-run, keep trace of job already done
# %state maps a job name to 'start' (only its start line was logged) or
# to 'end' (its end line was logged as well).
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # The mode is a separate argument, so the file name is never parsed
   # for mode characters or surrounding spaces.
   $log_h->open($logtrace, '<')
      or die "error: can't read log file $logtrace: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   # Append the new events to the same log, flushed after each write
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # Coro::Handle wrapper around the log handle
   $log_h = unblock $log_h;
   }
65
# job to run
# Each job is a hash with the keys
#   name - identifier of the job, used in the log trace
#   cmd  - shell command line which submits the job
#   num  - rank of the job, starting at 1
my @job = ();
if (-e "$file") {
   # File mode: one job command per line, comment and empty lines skipped
   my $job_num = 0;
   open(my $job_list_h, '<', $file) or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      # Add oarsub -S if not
      $job_cmd = "oarsub -S $job_cmd" if $job_cmd !~ m/^\s*oarsub/;
      $job_num++;
      # Name taken from a name=... tag after a '#', else the job rank
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name   => $job_name,
         cmd    => "$job_cmd",
         num    => $job_num,
         };
      }
   close $job_list_h;
   }
else {
   # Folder mode: one job per sub-folder of $dir, all running $cmd
   die "error: job file '$file' not found and option --dir or --cmd is missing\n"
      if not defined $dir or not defined $cmd;

   # Add oarsub -S if not
   $cmd = "oarsub -S $cmd" if $cmd !~ m/^\s*oarsub/;
   my $job_num = 0;
   opendir(my $dir_h, $dir) or die "error: can't open folder $dir: $!";
   while (my $item = readdir($dir_h)) {
      # Skip hidden entries, names with ':' and disabled or backup folders
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.old$/;
      next if $item =~ m/\.sav$/;
      next if $item =~ m/\.bak$/;
      next if $item =~ m/\.no$/;
      next unless (-d "$dir/$item");
      $job_num++;
      push @job, {
         name   => $item,
         cmd    => "cd $dir/$item/; $cmd",
         num    => $job_num,
         };
      }
   closedir $dir_h;
   }
117
# Option inserted after oarsub, built from the id found in OAR_JOB_ID
my $container_id = $ENV{OAR_JOB_ID};
my $insert_oar_option = "-t inner=$container_id";

# interactive job: no usable container id, so no inner option and no overload
unless ($container_id > 1) {
   $insert_oar_option = '';
   $overload = 1;
   }
126
# Signal sent to the main program when the dispatch is over
my $finished = Coro::Signal->new;

# One unit per job of the list still to handle; also find the longest
# job name, used as column width in the log messages
my $job_todo = Coro::Semaphore->new(0);
my $job_name_maxlen;
foreach my $one_job (@job) {
   $job_todo->up;
   my $name_len = length($one_job->{name});
   $job_name_maxlen = $name_len if $name_len > $job_name_maxlen;
   }

# One unit per job submitted and not yet seen as finished
my $job_active = Coro::Semaphore->new(0);

# Submitted jobs, indexed by OAR job id
my %scheduled = ();

# OAR checkpoint and default signal SIGUSR2
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   if ($sig_transmit) {
      $notify->send;
      }
   };
150
# asynchronous notify job: every time the checkpoint handler sends $notify,
# forward the checkpoint signal to each job currently scheduled
async {
   while () {
      $notify->wait;

      for my $job_pid (keys %scheduled) {
         # List form: no shell parses the signal name or the job id
         system 'oardel', '--checkpoint', '--signal', $sig_checkpoint, $job_pid;
         cede;
         }
      }
   };   # the semicolon keeps the statement that follows out of this async call
162
# asynchronous start job block: submit the jobs one after the other, with at
# most $task * $overload of them active, and attach to every submitted job
# a guard coroutine which waits for its end
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # job has been already run ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # wait for a free slot, the other coroutines run meanwhile
      while ($job_active->count >= $task*$overload) {
         cede;
         }

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # Insert the container option just after oarsub, also when the command
      # begins with spaces or with "cd folder/; " (job built from --dir)
      $job_cmd =~ s/((?:^|;)\s*oarsub)\b/$1 $insert_oar_option/;
      print "$job_cmd" if $verbose;
      my $job_id = `$job_cmd|grep ^OAR_JOB_ID|cut -f 2 -d '='`;
      chomp $job_id;

      # Submission failed: there is no job to guard. Count the job as handled,
      # otherwise $job_todo never reaches 0 and the dispatcher never ends.
      if (not $job_id > 1) {
         warn "warning: job $job_name was not submitted, skipping...\n";
         $job_todo->down;
         cede;
         next JOB;
         }

      $scheduled{$job_id} = {
         name         => $job_name,
         };
      $job_active->up;

      my $start_msg = sprintf "start job %${job_name_maxlen}s / %i at %s\n",
         $job_name, $job_id, time;
      $log_h->print($start_msg) if $logtrace;
      print($start_msg) if $verbose;
      cede;

      # asynchronous guard for job end
      async {
         my $timer;
         GUARD:
         while () {
            cede;
            # manual 5 second timer which keeps the other coroutines running
            $timer = AE::now + 5;
            while ( AE::now < $timer ) { AE::now_update; cede; }
            my $is_finish = `oarstat -s -j $job_id`;
            chomp $is_finish;
            # NOTE(review): only "Terminated" ends the guard; a job ending in
            # another final state (e.g. "Error") looks polled forever - confirm
            last GUARD if $is_finish =~ m/Terminated/;
            }

         my $msg = sprintf "end   job %${job_name_maxlen}s / %i at %s\n",
            $scheduled{$job_id}->{name}, $job_id, time;

         # Job non finish, just suspend if received checkpoint signal
         $msg =~ s/^end\s+job/suspend job/
            if $sig_transmit and $oar_checkpoint->count() > 0;

         $log_h->print($msg) if $logtrace;
         print($msg) if $verbose;

         # free the slot and forget the job
         $job_active->down;
         $job_todo->down;
         delete $scheduled{$job_id};
         };
      }
   };
238
# asynchronous watcher: tell the main program when the dispatch is over
async {
   while () {
      # checkpointing ! just finishing running job and quit
      if ($oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0) {
         $finished->send;
         }

      # no job left to handle
      if ($job_todo->count == 0) {
         $finished->send;
         }
      cede;
      }
   };
259
# let the coroutines run
cede;

# all job have been done
$finished->wait;

# close log trace file
if ($logtrace) {
   $log_h->close();
   }
267
268
269__END__
270
271=head1 NAME
272
oar-dispatch - dispatch a lot of small OAR jobs
274
275=head1 SYNOPSIS
276
277 oar-dispatch [--task integer] [--overload real] --file filecommand [--verbose]
278 oar-dispatch --help
279
280=head1 OPTIONS
281
282=over 12
283
284=item B<[-t|--task integer]>
285
Number of tasks to run in parallel.
Defaults to the number of lines in the file OAR_NODE_FILE.
288 
289=item B<[-o|--overload real]>
290
Number of OAR jobs to create / number of tasks.
Some jobs are created in advance so that they can start as soon as possible.
1 by default.
294
295=item B<[-f|--file filecommand]>
296
Name of the file which contains the OAR job list.
298
299=item B<[-v|--verbose]>
300 
301=item B<[-h|--help]>
302
303=back
304
The input job file can contain
306
307 - empty line
308 - comment line begin with #
309 - oarsub command without -t option
310 
311C<oar-dispatch> will add C<-t inner=container_id> in this command line,
312just after C<oarsub>.
313
314=head1 EXAMPLE
315
Example where the file F<$HOME/test/subjob.txt> is a list of OAR job scripts (the file can be executable, but this is not needed here).
317
318 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob1.oar
319 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob2.oar
320 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob3.oar
321 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob4.oar
322 ...
323 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob38.oar
324 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob39.oar
325 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob40.oar
326
These jobs can be launched with
328
329 oarsub -t container -n test-container -l /core=6,walltime=00:35:00 "oar-dispatch -f ./subjob.list.txt"
330
331Total C<walltime> is defined by the formula:
332
333 total_walltime = subjob_walltime * total_subjob / core + global_delay
334
In practice, C<oar-dispatch> takes a few seconds and each subjob runs in less than its walltime, so
336
337 total_walltime < subjob_walltime * total_subjob / core
338
If launched interactively, the C<overload> parameter is set to 1,
C<task> must be defined,
and no inner container is added to the C<oarsub> command line.
342
343
344=head1 SEE ALSO
345
346oar-parexec, mpilauncher
347
348
349=head1 AUTHORS
350
351Written by Gabriel Moreau, Grenoble - France
352
353
354=head1 LICENSE AND COPYRIGHT
355
356GPL version 2 or later and Perl equivalent
357
358Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
359
Note: See TracBrowser for help on using the repository browser.