Changeset 41 for trunk/oarutils/oar-parexec
- Timestamp: Dec 5, 2011, 11:56:31 AM (13 years ago)
- File: 1 edited
Legend:
- Unmodified
- Added
- Removed
trunk/oarutils/oar-parexec
r40 r41 16 16 use Cwd qw( getcwd ); 17 17 18 my $file = '';18 my $file = ''; 19 19 my $logfile = ''; 20 20 my $verbose; … … 36 36 'masterio=s' => \$masterio, 37 37 'switchio' => \$switchio, 38 ) || pod2usage( -verbose => 0);39 pod2usage( -verbose => 2) if $help;40 pod2usage( -verbose => 2) if not -e $file;38 ) || pod2usage(-verbose => 0); 39 pod2usage(-verbose => 2) if $help; 40 pod2usage(-verbose => 2) if not -e $file; 41 41 42 42 my %state; … … 46 46 or die "can't read log file: $!"; 47 47 while (<$log_h>) { 48 49 50 48 $state{$1} = 'start' if m/^start\s+job\s+(\d+)\s/; 49 $state{$1} = 'end' if m/^end\s+job\s+(\d+)\s/; 50 } 51 51 $log_h->close(); 52 52 } 53 53 if ($logfile) { 54 54 $log_h->open(">> $logfile") 55 55 or die "can't append log file $logfile: $!"; 56 56 $log_h->autoflush; 57 57 $log_h = unblock $log_h; … … 59 59 60 60 my @job = (); 61 open( JOB_LIST, '<', "$file") or die "can't open job file $file: $!";61 open(JOB_LIST, '<', "$file") or die "can't open job file $file: $!"; 62 62 while (<JOB_LIST>) { 63 63 chomp; 64 64 next if m/^#/; 65 65 next if m/^\s*$/; 66 push @job, $_ 66 push @job, $_; 67 67 } 68 68 close JOB_LIST; 69 69 70 70 my @ressources = (); 71 open( NODE_FILE, '<', "$nodefile")71 open(NODE_FILE, '<', "$nodefile") 72 72 or die "can't open $nodefile: $!"; 73 73 while (<NODE_FILE>) { … … 75 75 next if m/^#/; 76 76 next if m/^\s*$/; 77 push @ressources, $_ 77 push @ressources, $_; 78 78 } 79 79 close NODE_FILE; 80 80 81 81 my $ressource_size = scalar(@ressources); 82 die "not enought ressources jobnp $job_np > ressources $ressource_size" if $job_np > $ressource_size; 82 die "not enought ressources jobnp $job_np > ressources $ressource_size" 83 if $job_np > $ressource_size; 83 84 84 85 my $current_dir = getcwd(); … … 91 92 $stdout = $masterio if $masterio; 92 93 93 94 94 my $finished = new Coro::Signal; 95 95 my $job_todo = new Coro::Semaphore 0; … … 98 98 my $ressources = new Coro::Channel; 99 99 for my $slot (1 .. 
int($ressource_size / $job_np)) { 100 $ressources->put( join(',', @ressources[(($slot - 1) * $job_np) .. (($slot * $job_np) - 1)] ) ); 101 } 102 100 $ressources->put( 101 join(',', 102 @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ]) 103 ); 104 } 103 105 104 106 my $job_num = 0; … … 106 108 107 109 my $oar_checkpoint = new Coro::Semaphore 0; 108 $SIG{USR2} = sub { $oar_checkpoint->up};110 $SIG{USR2} = sub {$oar_checkpoint->up}; 109 111 110 112 async { … … 113 115 114 116 if (exists $state{$job_num}) { 115 if ($state{$job_num} eq 'start') { 116 print "warning: job $job_num was not finished, relaunching...\n" if $verbose; 117 } 117 if ($state{$job_num} eq 'start') { 118 print "warning: job $job_num was not finished, relaunching...\n" 119 if $verbose; 120 } 118 121 elsif ($state{$job_num} eq 'end') { 119 120 121 122 123 124 125 } 122 delete $state{$job_num}; 123 $job_todo->down; 124 print "warning: job $job_num already done\n" if $verbose; 125 cede; 126 next; 127 } 128 } 126 129 127 130 my $job_ressource = $ressources->get; … … 130 133 131 134 my ($node_connect) = split ',', $job_ressource; 132 my $fh 135 my $fh = IO::File->new(); 133 136 my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1") 134 137 or die "don't start subjob: $!"; … … 137 140 $fh = unblock $fh; 138 141 139 $scheduled{$job_pid} = { fh => $fh, node_connect => $node_connect, ressource => $job_ressource, num => $job_num }; 142 $scheduled{$job_pid} = { 143 fh => $fh, 144 node_connect => $node_connect, 145 ressource => $job_ressource, 146 num => $job_num 147 }; 140 148 141 149 $log_h->printf("start job %5i at %s\n", $job_num, time) if $logfile; … … 144 152 if $verbose; 145 153 146 my ( $job_stdout, $job_stderr);154 my ($job_stdout, $job_stderr); 147 155 $job_stdout = "> $stdout-$job_num.stdout" if $stdout ne '' and $switchio; 148 156 $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio; … … 152 160 if ($job_np > 1) { 153 161 $fh->print("printf \"" 154 . 
join('\n',split(',',$job_ressource,))155 . "\" > $job_nodefile\n");162 . join('\n', split(',', $job_ressource,)) 163 . "\" > $job_nodefile\n"); 156 164 $fh->print("OAR_NODE_FILE=$job_nodefile\n"); 157 165 $fh->print("OAR_NP=$job_np\n"); … … 170 178 async { 171 179 while () { 172 for my $job_pid ( keys %scheduled) {173 if ( waitpid( $job_pid, WNOHANG )) {180 for my $job_pid (keys %scheduled) { 181 if (waitpid($job_pid, WNOHANG)) { 174 182 $log_h->printf("end job %5i at %s\n", 175 $scheduled{$job_pid}->{num}, 176 time)if $logfile;183 $scheduled{$job_pid}->{num}, time) 184 if $logfile; 177 185 printf "end job %5i / %5i at %s on node %s\n", 178 186 $scheduled{$job_pid}->{num}, 179 $job_pid, time, 180 $scheduled{$job_pid}->{ressource} 187 $job_pid, time, $scheduled{$job_pid}->{ressource} 181 188 if $verbose; 182 189 close $scheduled{$job_pid}->{fh}; 183 $ressources->put( $scheduled{$job_pid}->{ressource});190 $ressources->put($scheduled{$job_pid}->{ressource}); 184 191 $job_todo->down; 185 192 delete $scheduled{$job_pid}; … … 200 207 201 208 $log_h->close() if $logfile; 202 203 209 204 210 __END__
Note: See TracChangeset for help on using the changeset viewer.