source: trunk/oarutils/oar-dispatch @ 94

Last change on this file since 94 was 94, checked in by g7moreau, 11 years ago
  • Add checkpointing and signal API. No test done...
File size: 6.3 KB
Line 
1#!/usr/bin/perl
2#
3# 2011/11/03 gabriel
4
5use strict;
6
7use Getopt::Long();
8use Pod::Usage;
9use Coro;
10use Coro::Signal;
11use Coro::Semaphore;
12use Coro::Timer qw(sleep);
13use Coro::Handle;
14use IO::File;
15
# Command line options, with their default values.
my $task           = 0;      # 0: computed from OAR_NODE_FILE below
my $overload       = 1.1;
my $file           = '';
my $logtrace;
my $verbose;
my $help;
my $sig_transmit;
my $sig_checkpoint = 'USR2';

# Single-letter aliases are declared explicitly: with Getopt::Long's default
# abbreviation matching, "-t" is ambiguous between --task and --transmit and
# would be rejected, although the POD documents it as --task.
Getopt::Long::GetOptions(
   'task|t=i'     => \$task,
   'overload|o=f' => \$overload,
   'file|f=s'     => \$file,
   'logtrace|l=s' => \$logtrace,
   'verbose|v'    => \$verbose,
   'help|h'       => \$help,
   'transmit'     => \$sig_transmit,
   'kill|k=s'     => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
36
# Without --task, run as many tasks in parallel as there are lines in the
# OAR node file (presumably one line per reserved core - confirm with OAR).
if ($task == 0) {
   my $node_file = $ENV{OAR_NODE_FILE};
   die "error: no --task given and OAR_NODE_FILE is not set\n"
      if not defined $node_file or $node_file eq '';

   open my $node_fh, '<', $node_file
      or die "error: can't open OAR_NODE_FILE $node_file: $!";
   $task++ while <$node_fh>;
   close $node_fh;

   # zero tasks would make the submission loop wait for a free slot forever
   die "error: OAR_NODE_FILE $node_file is empty, use --task\n"
      if $task == 0;
   }
42
# re-run, keep trace of job already done:
# an existing log trace file is read back into %state (job key => 'start' or
# 'end'), then the file is re-opened in append mode for this run.
# NOTE(review): nothing in this file reads %state yet, and no "start job" /
# "end job" line is ever written to the log, so re-run skipping looks
# unfinished.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # separate MODE argument: IO::File then uses a 3-arg open, so the file
   # name is never parsed for a mode
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+([^\s]+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # Coro::Handle wrapper, so that I/O on it does not block the other threads
   $log_h = unblock $log_h;
   }
61
# job to run: keep only the "oarsub ..." lines of the job file; comment
# lines and every other line are ignored
pod2usage(-message => 'error: --file is required', -verbose => 0)
   if not defined $file or $file eq '';

my @job = ();
open my $job_fh, '<', $file or die "can't open $file: $!";
while (my $line = <$job_fh>) {
   chomp $line;
   next if $line =~ m/^#/;
   push @job, $line if $line =~ m/^\s*oarsub/;
   }
close $job_fh;
71
# Sub-jobs are submitted as inner jobs of the current OAR job (the container).
# Without an OAR_JOB_ID greater than 1 (the interactive case of the POD), no
# inner option is added and no job is submitted in advance (overload 1).
my $container_id      = $ENV{OAR_JOB_ID};
my $insert_oar_option = "-t inner=$container_id";

unless ($container_id > 1) {
   # interactive job
   $overload          = 1;
   $insert_oar_option = '';
   }
80
81
# Coro synchronisation objects shared by the threads below:
#   $finished   - sent when the dispatcher may stop waiting and exit
#   $job_active - number of sub-jobs submitted and not seen as ended yet
#   $job_todo   - number of jobs of the list not accounted as done yet
#   %scheduled  - OAR job ids of the sub-jobs still being watched
my $finished   = Coro::Signal->new;
my $job_active = Coro::Semaphore->new(0);
my $job_todo   = Coro::Semaphore->new(0);
foreach my $pending (@job) {
   $job_todo->up;
   }

my %scheduled;
88
# OAR checkpoint, signal SIGUSR2 unless --kill names another one.
# Once it is caught, no new job is submitted and only the running ones are
# waited for; with --transmit the notify thread also forwards it.
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;

$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      my $message = "warning: receive checkpoint at " . time()
         . ", no new job, just finishing running job\n";
      print $message;
      }
   $oar_checkpoint->up();
   if ($sig_transmit) {
      $notify->send;
      }
   };
100
# asynchrone notify job: each time the checkpoint handler sends $notify
# (only with --transmit), forward the checkpoint signal to every sub-job
# still in %scheduled.
async {
   while () {
      $notify->wait;

      for my $job_id (keys %scheduled) {
         # list form of system: arguments go to oardel without a shell
         system 'oardel', '--checkpoint', '--signal', $sig_checkpoint, $job_id;
         cede;
         }
      }
   };
# The ";" above terminates the statement: without it, the following
# "async { ... }" block was parsed as an extra argument of this async call
# (async has a "&@" prototype).
112
# asynchrone start job block: submit the jobs of the list one after the
# other, with at most $task * $overload of them submitted at the same time,
# and start one guard thread per submitted job that waits for its end.
async {
   JOB:
   for my $job_line (@job) {
      # Wait for a free slot. Sleep rather than busy-cede: a thread that is
      # always ready keeps Coro from ever going idle, so the event loop that
      # drives the guards' Coro::Timer sleep may never get to run.
      while ($job_active->count >= $task * $overload) {
         sleep 1;
         }

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # work on a copy: $job_line aliases the element of @job
      (my $job = $job_line) =~ s/^\s*oarsub//;
      print "oarsub $insert_oar_option $job\n" if $verbose;
      my $job_id = `oarsub $insert_oar_option $job|grep ^OAR_JOB_ID|cut -f 2 -d '='`;
      chomp $job_id;

      # Submission failed (no numeric OAR_JOB_ID printed): there is no job to
      # guard, and the job must still be counted as done, otherwise
      # $job_todo never reaches zero and the dispatcher never ends.
      if (not ($job_id =~ m/\A\d+\z/ and $job_id > 0)) {
         warn "warning: oarsub failed, job not submitted: $job\n";
         $job_todo->down;
         next JOB;
         }

      $scheduled{$job_id}++;
      $job_active->up;
      cede;

      # asynchrone guard for job end
      async {
         my $id = shift;
         GUARD:
         while () {
            sleep 15; # async, do not re-launch oarstat too fast
            my $status = `oarstat -s -j $id`;
            chomp $status;
            # Error is a final OAR state too: waiting for Terminated only
            # would keep this guard, and so the whole dispatcher, alive forever
            last GUARD if $status =~ m/\b(?:Terminated|Error)\b/;
            }
         delete $scheduled{$id};
         $job_active->down;
         $job_todo->down;
         } $job_id;
      }
   };
150
# watcher: send $finished (waking up the main thread) when every job of the
# list is accounted as done, or when a checkpoint was received and no
# sub-job is left running.
async {
   WATCH:
   while () {
      # checkpointing ! just finishing running job and quit
      if ($oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0) {
         $finished->send;
         last WATCH;
         }

      # all the jobs of the list are done
      if ($job_todo->count == 0) {
         $finished->send;
         last WATCH;
         }

      # Poll once a second. A bare cede would keep this thread always ready,
      # so Coro would never go idle and the guards' timers might never fire.
      sleep 1;
      }
   };
171
# Give the threads created above a first chance to run.
cede;

# Block until the watcher thread reports that all job have been done
# (or that a checkpoint let the running ones finish).
$finished->wait;

# close log trace file
if ($logtrace) {
   $log_h->close();
   }
179
180
181__END__
182
183=head1 NAME
184
oar-dispatch - dispatch a lot of small OAR jobs
186
187=head1 SYNOPSIS
188
 oar-dispatch [--task integer] [--overload real] --file filecommand [--logtrace file] [--transmit] [--kill signal] [--verbose]
 oar-dispatch --help
191
192=head1 OPTIONS
193
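=over 12

=item B<[-t|--task integer]>

Number of tasks to run in parallel.
Defaults to the number of lines of the file OAR_NODE_FILE.

=item B<[-o|--overload real]>

Ratio between the number of OAR jobs submitted at the same time and the number of tasks.
Some jobs are created in advance so that they start as soon as possible.
1.1 by default.

=item B<[-f|--file filecommand]>

Name of the file containing the list of OAR jobs.

=item B<[--logtrace file]>

Log trace file, meant to keep track of the jobs already done when re-running.
An existing file is read back, then opened in append mode
(this feature is not finished yet).

=item B<[--kill signal]>

Signal notifying a checkpoint, C<USR2> by default.
Once it is received, no new job is submitted
and C<oar-dispatch> only waits for the running jobs to finish.

=item B<[--transmit]>

On checkpoint, also forward the signal to the running jobs
with C<oardel --checkpoint --signal>.

=item B<[-v|--verbose]>

Print each C<oarsub> command line before running it,
and a warning when a checkpoint is received.

=item B<[-h|--help]>

Print this help.

=back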
216
The input job file can contain

 - empty lines
 - comment lines beginning with #
 - oarsub command lines without the -t option

Any other line is ignored.
C<oar-dispatch> adds C<-t inner=container_id> to each of these command lines,
just after C<oarsub>.
225
226=head1 EXAMPLE
227
In this example, the file F<$HOME/test/subjob.list.txt> is a list of OAR job scripts
(they may be executable, but this is not needed here).

 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob1.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob2.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob3.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob4.oar
 ...
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob38.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob39.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob40.oar

These jobs can be launched from the F<$HOME/test> directory with

 oarsub -t container -n test-container -l /core=6,walltime=00:35:00 "oar-dispatch -f ./subjob.list.txt"
242
The C<walltime> needed by the container job is given by the formula:

 total_walltime = subjob_walltime * total_subjob / core + global_delay

In practice, C<oar-dispatch> itself takes a few seconds and each subjob runs in less than its walltime, so

 total_walltime < subjob_walltime * total_subjob / core
250
If launched interactively (without an OAR_JOB_ID environment variable), the C<overload> parameter is forced to 1,
C<task> should be given
and no inner container option is added to the C<oarsub> command line.
254
255
256=head1 SEE ALSO
257
258oar-parexec, mpilauncher
259
260
261=head1 AUTHORS
262
263Written by Gabriel Moreau, Grenoble - France
264
265
266=head1 LICENSE AND COPYRIGHT
267
268GPL version 2 or later and Perl equivalent
269
270Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France
271
Note: See TracBrowser for help on using the repository browser.