source: trunk/oarutils/oar-dispatch @ 95

Last change on this file since 95 was 95, checked in by g7moreau, 12 years ago
  • Add folder list command
  • Add state manager
File size: 8.8 KB
RevLine 
[9]1#!/usr/bin/perl
2#
3# 2011/11/03 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
[12]10use Coro::Signal;
[9]11use Coro::Semaphore;
[93]12use Coro::Timer qw(sleep);
[94]13use Coro::Handle;
14use IO::File;
[9]15
# Settings filled from the command line, with their defaults.
my $task           = 0;       # 0 means "not given on the command line"
my $overload       = 1;       # multiplied by $task to bound the active jobs
my $file           = '';      # job list file
my $dir;                      # folder whose sub-folders each become one job
my $cmd;                      # command submitted from each sub-folder of $dir
my $logtrace;                 # log file recording "start job" / "end job" lines
my $verbose;
my $help;
my $sig_transmit;             # when set, checkpoint is forwarded to scheduled jobs
my $sig_checkpoint = 'USR2';  # name of the signal handled as a checkpoint request

# Option specification: Getopt::Long pattern => destination variable.
my @option_spec = (
   'task=i'     => \$task,
   'overload=f' => \$overload,
   'file=s'     => \$file,
   'dir=s'      => \$dir,
   'cmd=s'      => \$cmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'transmit'   => \$sig_transmit,
   'kill=s'     => \$sig_checkpoint,
   );

# A parse failure prints the short usage; --help prints the full manual.
Getopt::Long::GetOptions(@option_spec)
   or pod2usage(-verbose => 0);
if ($help) {
   pod2usage(-verbose => 2);
   }
40
# No --task given: use the number of lines of the file named by the
# OAR_NODE_FILE environment variable as the task count.
if ($task == 0) {
   my $node_file = $ENV{OAR_NODE_FILE};
   die "error: --task not given and OAR_NODE_FILE is not set\n"
      if not defined $node_file or $node_file eq '';

   # lexical handle: closed automatically and invisible to the rest of the file
   open(my $node_h, '<', $node_file)
      or die "error: can't open OAR_NODE_FILE $node_file: $!";
   $task++ while <$node_h>;
   close $node_h;
   }
[9]46
# re-run, keep trace of job already done
#
# %state maps a job name to the last event found for it in an existing
# log trace file: 'start' (launched, no end line seen) or 'end'.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # file name and mode are passed separately so that the name is never
   # interpreted as part of the open mode
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   # reopen the same handle in append mode for this run's events
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # wrap the handle with Coro::Handle's unblock before coroutines write to it
   $log_h = unblock $log_h;
   }
65
# job to run
#
# Each entry of @job is a hash reference with the keys
#   name - identifier used in the log trace and for re-run detection
#   cmd  - shell command line that submits the job
#   num  - 1-based rank of the job in its source (file or folder)
my @job = ();
if (-e $file) {
   # --file mode: one job per line; comment and blank lines are skipped
   my $job_num = 0;
   open(my $job_list_h, '<', $file)
      or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      # Add oarsub -S if not
      $job_cmd = "oarsub -S $job_cmd" if $job_cmd !~ m/^\s*oarsub/;
      $job_num++;
      # a "name=xxx" tag after a '#' on the line gives the job its name,
      # otherwise the job is named by its rank
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name   => $job_name,
         cmd    => "$job_cmd",
         num    => $job_num,
         };
      }
   close $job_list_h;
   }
else {
   # --dir mode: one job per sub-folder, running $cmd from inside it
   die "error: need an existing job file (--file) or a folder (--dir)\n"
      if not defined $dir or $dir eq '';

   # Add oarsub -S if not
   $cmd = "oarsub -S $cmd" if $cmd !~ m/^\s*oarsub/;
   my $job_num = 0;
   opendir(my $dir_h, $dir) or die "error: can't open folder $dir: $!";
   while (my $item = readdir($dir_h)) {
      next if $item =~ m/^\./;                     # hidden entries, '.' and '..'
      next if $item =~ m/:/;
      next if $item =~ m/\.(?:old|sav|bak|no)$/;   # folders set aside by suffix
      next unless (-d "$dir/$item");
      $job_num++;
      push @job, {
         name   => $item,
         cmd    => "cd $dir/$item/; $cmd",
         num    => $job_num,
         };
      }
   closedir $dir_h;
   }
[9]117
# Extra oarsub option that attaches each sub-job to the enclosing job,
# whose id is read from the OAR_JOB_ID environment variable.
my $container_id      = $ENV{OAR_JOB_ID};
my $insert_oar_option = '';
if ($container_id > 1) {
   $insert_oar_option = "-t inner=$container_id";
   }
else {
   # interactive job: no inner option and no overload
   $overload = 1;
   }
[9]126
# Signal sent when the main program may stop waiting.
my $finished = Coro::Signal->new;

# One unit per job still to be accounted for, and the width of the
# longest job name (used to align the log messages).
my $job_todo = Coro::Semaphore->new(0);
my $job_name_maxlen;
for my $job (@job) {
   $job_todo->up;
   my $name_len = length($job->{name});
   $job_name_maxlen = $name_len if $name_len > $job_name_maxlen;
   }

# One unit per job submitted and not yet seen as terminated.
my $job_active = Coro::Semaphore->new(0);

# Submitted jobs, keyed by OAR job id.
my %scheduled;
138
# OAR checkpoint and default signal SIGUSR2
#
# $oar_checkpoint counts the checkpoint signals received so far;
# $notify wakes the coroutine that forwards the checkpoint to the jobs.
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time()
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   if ($sig_transmit) {
      $notify->send;
      }
   };
150
# Asynchronous checkpoint forwarder: each time $notify is signalled,
# ask OAR to checkpoint every job currently recorded in %scheduled.
async {
   while () {
      $notify->wait;

      for my $job_id (keys %scheduled) {
         # list form: the arguments reach oardel directly, without a shell
         system 'oardel', '--checkpoint', '--signal', $sig_checkpoint, $job_id;
         cede;
         }
      }
   };  # the ';' matters: without it the next statement is parsed as this call's argument list
162
# Asynchronous job launcher.
#
# For every entry of @job, in order:
#   - skip it when the log trace of a previous run marks it as ended,
#   - wait until fewer than $task*$overload jobs are active,
#   - stop launching once a checkpoint signal has been received,
#   - submit it and start a guard coroutine that polls oarstat until
#     the job is reported as Terminated.
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # job has been already run ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # let the other coroutines run until a slot is free
      while ($job_active->count >= $task*$overload) {
         cede;
         }

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # Insert the container option right after the oarsub command word.
      # The command may start with blanks or, in folder mode, with
      # "cd <folder>/; ", so oarsub is also accepted right after a ';'.
      $job_cmd =~ s/(\A\s*|;\s*)oarsub\b/${1}oarsub $insert_oar_option/;
      print "$job_cmd\n" if $verbose;
      my $job_id = `$job_cmd|grep ^OAR_JOB_ID|cut -f 2 -d '='`;
      chomp $job_id;

      # Submission failed: there is no job to watch. Account for it now,
      # otherwise $job_todo would never reach zero. Nothing is written to
      # the log trace, so a later re-run submits this job again.
      if (not $job_id > 1) {
         warn "error: can't submit job $job_name\n";
         $job_todo->down;
         cede;
         next JOB;
         }

      $scheduled{$job_id} = {
         name         => $job_name,
         };
      $job_active->up;

      my $start_msg = sprintf "start job %${job_name_maxlen}s / %i at %s\n",
         $job_name, $job_id, time;
      $log_h->print($start_msg) if $logtrace;
      print($start_msg) if $verbose;
      cede;

      # asynchrone guard for job end
      async {
         my $job_id = shift;
         GUARD:
         while () {
            sleep 15; # async, do not re-launch oarstat to fast
            my $is_finish = `oarstat -s -j $job_id`;
            chomp $is_finish;
            last GUARD if $is_finish =~ m/Terminated/;
            }

         my $msg = sprintf "end   job %${job_name_maxlen}s / %i at %s\n",
            $scheduled{$job_id}->{name}, $job_id, time;

         # Job non finish, just suspend if received checkpoint signal
         $msg =~ s/^end\s+job/suspend job/
            if $sig_transmit and $oar_checkpoint->count() > 0;

         $log_h->print($msg) if $logtrace;
         print($msg) if $verbose;

         $job_active->down;
         $job_todo->down;
         delete $scheduled{$job_id};
         } $job_id;
      }
   };
[9]236
# Asynchronous completion watcher: signals $finished when the run is over,
# yielding to the other coroutines between two checks.
async {
   while () {
      # checkpointing ! just finishing running job and quit
      my $checkpointed = $oar_checkpoint->count() > 0;
      my $none_running = scalar(keys(%scheduled)) == 0;
      if ($checkpointed and $none_running) {
         $finished->send;
         }

      # normal end: every job has been accounted for
      if ($job_todo->count == 0) {
         $finished->send;
         }
      cede;
      }
   };
[9]257
# Yield once so that the coroutines created above get to run.
cede;

# all job have been done
$finished->wait;

# close log trace file
if ($logtrace) {
   $log_h->close();
   }
[9]265
[94]266
[9]267__END__
268
269=head1 NAME
270
oar-dispatch - dispatch lots of small OAR jobs
272
273=head1 SYNOPSIS
274
[33]275 oar-dispatch [--task integer] [--overload real] --file filecommand [--verbose]
[9]276 oar-dispatch --help
277
278=head1 OPTIONS
279
[30]280=over 12
281
282=item B<[-t|--task integer]>
283
Number of tasks to run in parallel.
Defaults to the number of lines in the file OAR_NODE_FILE.
[9]286 
[30]287=item B<[-o|--overload real]>
[9]288
Ratio between the number of OAR jobs to create and the number of tasks.
Some jobs are created in advance so that they can start as soon as possible.
1 by default.
[9]292
[33]293=item B<[-f|--file filecommand]>
[30]294
Name of the file that contains the OAR job list.
296
297=item B<[-v|--verbose]>
[9]298 
[30]299=item B<[-h|--help]>
[9]300
[30]301=back
302
The input job file can contain

 - empty lines
 - comment lines beginning with #
 - oarsub commands without the -t option

C<oar-dispatch> adds C<-t inner=container_id> to each command line,
just after C<oarsub>.
311
[30]312=head1 EXAMPLE
[9]313
Example where the file F<$HOME/test/subjob.txt> is a list of OAR job scripts (it can be executable, but that is not needed here).
315
[9]316 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob1.oar
317 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob2.oar
318 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob3.oar
319 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob4.oar
320 ...
321 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob38.oar
322 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob39.oar
323 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob40.oar
324
These jobs can be launched with
[9]326
327 oarsub -t container -n test-container -l /core=6,walltime=00:35:00 "oar-dispatch -f ./subjob.list.txt"
328
[11]329Total C<walltime> is defined by the formula:
[9]330
331 total_walltime = subjob_walltime * total_subjob / core + global_delay
332
In practice, C<oar-dispatch> takes a few seconds and each subjob runs in less than its walltime, so
[9]334
335 total_walltime < subjob_walltime * total_subjob / core
336
If launched interactively, the C<overload> parameter is set to 1,
C<task> must be defined
and no inner container option is added to the C<oarsub> command line.
340
341
[28]342=head1 SEE ALSO
343
344oar-parexec, mpilauncher
345
346
[9]347=head1 AUTHORS
348
[28]349Written by Gabriel Moreau, Grenoble - France
350
351
352=head1 LICENSE AND COPYRIGHT
353
354GPL version 2 or later and Perl equivalent
355
356Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
357
Note: See TracBrowser for help on using the repository browser.