Changeset 43
- Timestamp:
- Dec 5, 2011, 7:05:00 PM (13 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/oarutils/oar-parexec
r42 r43 16 16 use Cwd qw( getcwd ); 17 17 18 my $file 19 my $log file = '';18 my $filecmd = ''; 19 my $logtrace = ''; 20 20 my $verbose; 21 21 my $job_np = 1; … … 27 27 28 28 Getopt::Long::GetOptions( 29 'file =s' => \$file,30 'log file=s' => \$logfile,29 'filecmd=s' => \$filecmd, 30 'logtrace=s' => \$logtrace, 31 31 'verbose' => \$verbose, 32 32 'help' => \$help, … … 38 38 ) || pod2usage(-verbose => 0); 39 39 pod2usage(-verbose => 2) if $help; 40 pod2usage(-verbose => 2) if not -e $file; 41 40 pod2usage(-verbose => 2) if not -e $filecmd; 41 42 # re-run, keep trace of job already done 42 43 my %state; 43 44 my $log_h = IO::File->new(); 44 if (-e $log file) {45 $log_h->open("< $log file")46 or die " can't read log file: $!";45 if (-e $logtrace) { 46 $log_h->open("< $logtrace") 47 or die "error: can't read log file: $!"; 47 48 while (<$log_h>) { 48 49 $state{$1} = 'start' if m/^start\s+job\s+(\d+)\s/; … … 51 52 $log_h->close(); 52 53 } 53 if ($log file) {54 $log_h->open(">> $log file")55 or die " can't append log file $logfile: $!";54 if ($logtrace) { 55 $log_h->open(">> $logtrace") 56 or die "error: can't append log file $logtrace: $!"; 56 57 $log_h->autoflush; 57 58 $log_h = unblock $log_h; 58 59 } 59 60 61 # job to run 60 62 my @job = (); 61 open(JOB_LIST, '<', "$file ") or die "can't open job file $file: $!";63 open(JOB_LIST, '<', "$filecmd") or die "error: can't open job file $filecmd: $!"; 62 64 while (<JOB_LIST>) { 63 65 chomp; … … 68 70 close JOB_LIST; 69 71 72 # ressources available 70 73 my @ressources = (); 71 74 open(NODE_FILE, '<', "$nodefile") … … 80 83 81 84 my $ressource_size = scalar(@ressources); 82 die " not enought ressources jobnp $job_np > ressources $ressource_size"85 die "error: not enought ressources jobnp $job_np > ressources $ressource_size" 83 86 if $job_np > $ressource_size; 84 87 … … 96 99 $job_todo->up for (@job); 97 100 101 # slice of ressources for parallel job 98 102 my $ressources = new Coro::Channel; 99 103 for my $slot (1 .. 
int($ressource_size / $job_np)) { … … 107 111 my %scheduled = (); 108 112 113 # OAR checkpoint and default signal SIGUSR2 109 114 my $oar_checkpoint = new Coro::Semaphore 0; 110 115 $SIG{USR2} = sub { … … 116 121 }; 117 122 123 # asynchrone start job block 118 124 async { 125 JOB: 119 126 for my $job (@job) { 120 127 $job_num++; 121 128 129 # job has been already run ? 122 130 if (exists $state{$job_num}) { 123 131 if ($state{$job_num} eq 'start') { 124 print "warning: job $job_num was not finished, relaunching...\n"132 print "warning: job $job_num was not clearly finished, relaunching...\n" 125 133 if $verbose; 126 134 } 127 135 elsif ($state{$job_num} eq 'end') { 128 delete $state{$job_num}; 136 delete $state{$job_num}; # free memory 129 137 $job_todo->down; 130 print "warning: job $job_num already done\n" if $verbose;138 print "warning: job $job_num already run\n" if $verbose; 131 139 cede; 132 next ;140 next JOB; 133 141 } 134 142 } 135 143 144 # take job ressource 136 145 my $job_ressource = $ressources->get; 137 146 138 last if $oar_checkpoint->count() > 0; 147 # no more launch job when OAR checkpointing 148 last JOB if $oar_checkpoint->count() > 0; 139 149 140 150 my ($node_connect) = split ',', $job_ressource; 141 151 my $fh = IO::File->new(); 142 152 my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1") 143 or die " don't start subjob: $!";153 or die "error: can't start subjob: $!"; 144 154 145 155 $fh->autoflush; … … 155 165 my $msg = sprintf "start job %5i / %5i at %s on node %s\n", 156 166 $job_num, $job_pid, time, $job_ressource; 157 $log_h->print($msg) if $log file;167 $log_h->print($msg) if $logtrace; 158 168 print($msg) if $verbose; 159 169 … … 164 174 my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num"; 165 175 176 # set job environment, run it and clean 166 177 if ($job_np > 1) { 167 178 $fh->print("printf \"" … … 182 193 } 183 194 195 # asynchrone end job block 184 196 async { 185 197 while () { 186 198 for my $job_pid (keys 
%scheduled) { 199 # non blocking PID test 187 200 if (waitpid($job_pid, WNOHANG)) { 188 201 my $msg = sprintf "end job %5i / %5i at %s on node %s\n", 189 202 $scheduled{$job_pid}->{num}, 190 203 $job_pid, time, $scheduled{$job_pid}->{ressource}; 191 $log_h->print($msg) if $log file;204 $log_h->print($msg) if $logtrace; 192 205 print($msg) if $verbose; 193 206 close $scheduled{$job_pid}->{fh}; 207 # leave ressources for another job 194 208 $ressources->put($scheduled{$job_pid}->{ressource}); 195 209 $job_todo->down; … … 199 213 } 200 214 215 # checkpointing ! just finishing running job and quit 201 216 $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0; 202 217 … … 208 223 cede; 209 224 225 # all job have been done 210 226 $finished->wait; 211 227 212 $log_h->close() if $logfile; 228 # close log trace file 229 $log_h->close() if $logtrace; 213 230 214 231 __END__ … … 220 237 =head1 SYNOPSIS 221 238 222 oar-parexec --file filecommand[--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh sssh]239 oar-parexec --filecmd filecommand [--logtrace tracefile] [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh sssh] 223 240 oar-parexec --help 224 241 … … 230 247 which define automatically theses strategics parameters... 231 248 232 Option C<--file > is the only mandatory one.249 Option C<--filecmd> is the only mandatory one. 233 250 234 251 Small job will be launch in the same folder as the master job. … … 254 271 =over 12 255 272 256 =item B<-f|--file filecommand>273 =item B<-f|--filecmd filecommand> 257 274 258 275 File name which content job list. 276 277 =item B<-l|--logtrace tracefile> 278 279 File which log and trace running job. 280 In case of running the same command (after crash for example), 281 only job that ar not mark as done will be run again. 282 Be carefful, job mark as running (start but for finish) will be run again. 
283 284 This option is very useful in case of crash 285 but also for checkpointing and idempotent OAR jobs. 259 286 260 287 =item B<-v|--verbose> … … 308 335 =head1 EXAMPLE 309 336 310 Content for the job file (option C<--file>) could have:337 Content for the job file command (option C<--filecmd>) could have: 311 338 312 339 - empty line
Note: See TracChangeset
for help on using the changeset viewer.