#!/usr/bin/perl
#
# 2011/11/27 Gabriel Moreau
#
# oar-parexec: launch a list of small jobs in parallel, one remote shell
# (oarsh by default) per job, on slices of the nodes listed in a node file.
# A log trace file allows a later run to skip jobs already marked as ended,
# and a checkpoint signal stops the launch of new jobs.

use strict;
use warnings;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

# Command line parameters. String options default to '' so that the file
# tests and comparisons below never see an undefined value.
my $file     = '';
my $dir      = '';
my $cmd      = '';
my $logtrace = '';
my $verbose;
my $job_np   = 1;
my $nodefile = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh            = 'oarsh -q -T';
my $sig_transmit;
my $sig_checkpoint   = 'USR2';
my $job_launch_brake = 1;    # one second time brake

Getopt::Long::GetOptions(
    'file=s'     => \$file,
    'dir=s'      => \$dir,
    'cmd=s'      => \$cmd,
    'logtrace=s' => \$logtrace,
    'verbose'    => \$verbose,
    'help'       => \$help,
    'oarsh=s'    => \$oarsh,
    'jobnp=i'    => \$job_np,
    'nodefile=s' => \$nodefile,
    'masterio=s' => \$masterio,
    'switchio'   => \$switchio,
    'transmit'   => \$sig_transmit,
    'kill=s'     => \$sig_checkpoint,
) || pod2usage(-verbose => 0);

pod2usage(-verbose => 2) if $help;

# Either an existing job file, or a folder plus a command, is mandatory.
pod2usage(-verbose => 2)
    if not ((-e $file) or (-d $dir and $cmd ne ''));

# A slice of less than one resource per job makes no sense and would
# divide by zero when the resource slices are computed.
die "error: jobnp must be at least 1\n" if $job_np < 1;

my $oar_version = `oarsub -V | awk '{print \$4}'`;
$oar_version = '' if not defined $oar_version;
chomp $oar_version;

# Environment values used in log messages and temporary file names.
my $oar_job_id = defined $ENV{OAR_JOB_ID} ? $ENV{OAR_JOB_ID} : '';
my $logname    = defined $ENV{LOGNAME}    ? $ENV{LOGNAME}    : '';

# global time
my $global_time_atstart    = time;
my $global_time_total      = 0;
my $global_time_cumulative = 0;

# re-run, keep trace of job already done
my %state;
my $log_h = IO::File->new();
if (-e $logtrace) {
    $log_h->open($logtrace, '<')
        or die "error: can't read log file: $!";
    while (my $line = <$log_h>) {

        # log version 1
        $state{$1} = 'start' if $line =~ m/^start\s+job\s+([^\s]+)\s/;
        $state{$1} = 'end'   if $line =~ m/^end\s+job\s+([^\s]+)\s/;

        # log version 2
        $state{$1} = 'start' if $line =~ m/^start\s+subjob\s+([^\s]+)\s/;
        $state{$1} = 'end'   if $line =~ m/^end\s+subjob\s+([^\s]+)\s/;

        # time counters written at the end of a previous run
        ($global_time_total, $global_time_cumulative) = ($1, $2)
            if $line =~ m/^global-time\s.*total\s+(\d+)\s+cumulative\s+(\d+)/;
    }
    $log_h->close();
}
if ($logtrace) {
    $log_h->open($logtrace, '>>')
        or die "error: can't append log file $logtrace: $!";
    $log_h->autoflush;
    $log_h = unblock $log_h;
}

# write log format version
$log_h->print("log version 2\n") if $logtrace;
print("log version 2\n") if $verbose;

# job to run
my @job = ();
if (-e $file) {

    # one job per non-empty, non-comment line of the job file
    my $job_num = 0;
    open(my $job_list_h, '<', $file)
        or die "error: can't open job file $file: $!";
    while (my $job_cmd = <$job_list_h>) {
        chomp $job_cmd;
        next if $job_cmd =~ m/^#/;
        next if $job_cmd =~ m/^\s*$/;
        $job_num++;

        # optional job name given in a trailing comment: "# name=xxx"
        my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
        $job_name ||= $job_num;
        push @job, {
            name => $job_name,
            cmd  => "$job_cmd",
            num  => $job_num,
        };
    }
    close $job_list_h;
}
else {

    # one job per sub-folder of $dir, the same command in each of them
    my $job_num = 0;
    opendir(my $dir_h, $dir) or die "error: can't open folder $dir: $!";
    while (my $item = readdir($dir_h)) {
        next if $item =~ m/^\./;
        next if $item =~ m/:/;
        next if $item =~ m/\.old$/;
        next if $item =~ m/\.sav$/;
        next if $item =~ m/\.bak$/;
        next if $item =~ m/\.no$/;
        next unless (-d "$dir/$item");
        $job_num++;
        push @job, {
            name => $item,
            cmd  => "cd $dir/$item/; $cmd",
            num  => $job_num,
        };
    }
    closedir $dir_h;
}

# assume unique job name: if at least one name is duplicated, fall back
# to the job number as name for every job
{
    my %seen              = ();
    my $count_unique_name = grep { !$seen{ $_->{name} }++ } @job;
    if ($count_unique_name != scalar @job) {
        $_->{name} = $_->{num} for @job;
    }
}

# ressources available
my @ressources = ();
open(my $node_h, '<', $nodefile) or die "can't open $nodefile: $!";
while (my $node = <$node_h>) {
    chomp $node;
    next if $node =~ m/^#/;
    next if $node =~ m/^\s*$/;
    push @ressources, $node;
}
close $node_h;

my $ressource_size = scalar(@ressources);
die "error: not enought ressources jobnp $job_np > ressources $ressource_size"
    if $job_np > $ressource_size;

my $current_dir = getcwd();

# base names for the per-job output files (only used with --switchio)
my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

my $finished = Coro::Signal->new;
my $job_todo = Coro::Semaphore->new(0);

# one semaphore unit per job to do, and longest job name for log alignment
my $job_name_maxlen = 0;
for my $job (@job) {
    $job_todo->up;
    $job_name_maxlen = length($job->{name})
        if length($job->{name}) > $job_name_maxlen;
}

# slice of ressources for parallel job
my $ressources = Coro::Channel->new;
for my $slot (1 .. int($ressource_size / $job_np)) {
    my $first = ($slot - 1) * $job_np;
    my $last  = ($slot * $job_np) - 1;
    $ressources->put(join(',', @ressources[ $first .. $last ]));
}

# running jobs, indexed by the pid of their remote shell pipe
my %scheduled = ();

# OAR checkpoint and default signal SIGUSR2
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
    print "warning: receive checkpoint at "
        . time
        . ", no new job, just finishing running job\n"
        if $verbose;
    $oar_checkpoint->up();
    $notify->send if $sig_transmit;
};

# asynchrone notify job: on each notification, send the checkpoint signal
# to the pid stored in the pid file of every running job
async {
    while (1) {
        $notify->wait;

        for my $job_pid (keys %scheduled) {
            my $job_name     = $scheduled{$job_pid}->{name};
            my $job_pidfile  = $scheduled{$job_pid}->{pidfile};
            my $node_connect = $scheduled{$job_pid}->{node_connect};

            my $fh = IO::File->new();
            $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
                or die "error: can't notify subjob: $!";

            $fh->autoflush;
            $fh = unblock $fh;

            $fh->print("kill -$sig_checkpoint \$(cat $job_pidfile)\n");
            $fh->print("exit\n");

            print "warning: transmit signal $sig_checkpoint"
                . " to job $job_name on node $node_connect.\n"
                if $verbose;

            close $fh;
            cede;
        }
    }
};

# asynchrone start job block
async {
    my $timer;
    JOB:
    for my $job (@job) {
        my $job_name = $job->{name};
        my $job_cmd  = $job->{cmd};

        # job has been already run ?
        if (exists $state{$job_name}) {
            if ($state{$job_name} eq 'start') {
                print "warning: job $job_name was not clearly finished, relaunching...\n"
                    if $verbose;
            }
            elsif ($state{$job_name} eq 'end') {
                delete $state{$job_name};    # free memory
                $job_todo->down;
                print "warning: job $job_name already run\n" if $verbose;
                cede;
                next JOB;
            }
        }

        # wait to not re-launch oarstat to fast
        # equivalent to sleep $job_launch_brake
        $timer = AE::now() + $job_launch_brake;
        while (AE::now() < $timer) {

            # force update of AE time
            AE::now_update();
            cede;
        }

        # take job ressource
        my $job_ressource = $ressources->get;

        # no more launch job when OAR checkpointing
        last JOB if $oar_checkpoint->count() > 0;

        # the remote shell is opened on the first node of the slice
        my ($node_connect) = split ',', $job_ressource;
        my $fh = IO::File->new();
        my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
            or die "error: can't start subjob: $!";

        $fh->autoflush;
        $fh = unblock $fh;

        my $begin_at = time;
        my $msg = sprintf "start subjob %${job_name_maxlen}s pid %5i at %s oarjob %i onnode %s\n",
            $job_name, $job_pid, $begin_at, ($oar_job_id || 0), $job_ressource;
        $log_h->print($msg) if $logtrace;
        print($msg) if $verbose;

        # shell redirections for the job output, empty without --switchio
        my $job_stdout = '';
        my $job_stderr = '';
        $job_stdout = "> $stdout-$job_name.stdout"  if $stdout ne '' and $switchio;
        $job_stderr = "2> $stderr-$job_name.stderr" if $stderr ne '' and $switchio;

        my $job_nodefile   = "/tmp/oar-parexec-$logname-$oar_job_id-$job_name";
        my $job_pidfile    = "/tmp/oar-parexec-$logname-$oar_job_id-$job_name.pid";
        my $job_statusfile = "/tmp/oar-parexec-$logname-$oar_job_id-$job_name.status";

        $scheduled{$job_pid} = {
            fh           => $fh,
            node_connect => $node_connect,
            ressource    => $job_ressource,
            name         => $job_name,
            pidfile      => $job_pidfile,
            begin_at     => $begin_at,
        };

        # set job environment, run it and clean
        if ($job_np > 1) {
            $fh->print("printf \""
                    . join('\n', split(',', $job_ressource,))
                    . "\" > $job_nodefile\n");
            $fh->print("OAR_NODE_FILE=$job_nodefile\n");
            $fh->print("OAR_NP=$job_np\n");
            $fh->print("export OAR_NODE_FILE\n");
            $fh->print("export OAR_NP\n");
            $fh->print("unset OAR_MSG_NODEFILE\n");
        }

        $fh->print("cd $current_dir\n");

        if ($sig_transmit) {
            $fh->print("trap 'jobs -p|xargs -r ps -o pid --no-headers --ppid|xargs -r kill -$sig_checkpoint' $sig_checkpoint\n");
            $fh->print("echo \$\$ > $job_pidfile\n");
        }

        # the status file holds 0 unless the job command fails
        $fh->print("echo 0 > $job_statusfile\n");
        $fh->print("(\n");
        $fh->print("$job_cmd\n");
        $fh->print(") $job_stdout $job_stderr || echo \$? > $job_statusfile \&\n");
        $fh->print("while [ \$(jobs -p | wc -l) -gt 0 ]\n");
        $fh->print("do\n");
        $fh->print("   wait\n");
        $fh->print("done\n");

        $fh->print("OAR_SUBJOB_RETCODE=\$(cat $job_statusfile)\n");
        $fh->print("rm -f $job_statusfile\n");
        $fh->print("rm -f $job_pidfile\n")  if $sig_transmit;
        $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
        $fh->print("exit \$OAR_SUBJOB_RETCODE\n");
        cede;
    }
};

# asynchrone end job block
async {
    while (1) {
        for my $job_pid (keys %scheduled) {

            # non blocking PID test
            if (waitpid($job_pid, WNOHANG)) {

                # get return status code
                my $job_retcode0 = $? >> 8;

                # job time
                my $end_at   = time;
                my $duration = $end_at - $scheduled{$job_pid}->{begin_at};
                $global_time_cumulative += $duration;

                my $msg = sprintf "end   subjob %${job_name_maxlen}s pid %5i at %s oarjob %i onnode %s duration %i status %i\n",
                    $scheduled{$job_pid}->{name},
                    $job_pid, $end_at, ($oar_job_id || 0),
                    $scheduled{$job_pid}->{ressource},
                    $duration, $job_retcode0;

                # Job error
                $msg =~ s/^end\s+subjob/error subjob/
                    if $job_retcode0 > 0 and $job_retcode0 != 99;

                # Job non finish, just suspend if received checkpoint signal
                $msg =~ s/^end\s+subjob/suspend subjob/
                    if $sig_transmit and $oar_checkpoint->count() > 0;

                $log_h->print($msg) if $logtrace;
                print($msg) if $verbose;
                close $scheduled{$job_pid}->{fh};

                # leave ressources for another job
                $ressources->put($scheduled{$job_pid}->{ressource});

                $job_todo->down;
                delete $scheduled{$job_pid};
            }

            cede;
        }

        # checkpointing ! just finishing running job and quit
        $finished->send
            if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

        $finished->send if $job_todo->count() == 0;
        cede;
    }
};

cede;

# all job have been done
$finished->wait;

# global time
$global_time_total += (time - $global_time_atstart);
$log_h->print("global-time total $global_time_total cumulative $global_time_cumulative\n") if $logtrace;
print("global-time total $global_time_total cumulative $global_time_cumulative\n") if $verbose;

# close log trace file
$log_h->close() if $logtrace;

# exit code 99 after a checkpoint, except when the OAR version begins by 2.4
exit 99 if (($oar_checkpoint->count() > 0) and ($oar_version !~ m/^2\.4/));


__END__

=head1 NAME

oar-parexec - parallel execution of many small short or long job

=head1 SYNOPSIS

 oar-parexec --file filecommand \
    [--logtrace tracefile] [--verbose] \
    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
    [--switchio] [--masterio basefileio] \
    [--kill signal] [--transmit]

 oar-parexec --dir foldertoiterate --cmd commandtolaunch \
    [--logtrace tracefile] [--verbose] \
    [--jobnp integer] [--nodefile filenode] [--oarsh sssh] \
    [--switchio] [--masterio basefileio] \
    [--kill signal] [--transmit]

 oar-parexec --help

=head1 DESCRIPTION

C<oar-parexec> can execute lot of small short or long job in parallel inside a cluster.
Number of parallel job at one time cannot exceed the number of core define in the node file.
C<oar-parexec> is easier to use inside an OAR job environment
which define automatically these strategics parameters...
However, it can be used outside OAR.

Option C<--file> or C<--dir> and C<--cmd> are the only mandatory parameters.

Small job will be launch in the same folder as the master job.
Two environment variable are defined for each small job
and only in case of parallel small job (option C<--jobnp> > 1).

 OAR_NODE_FILE - file that list node for parallel computing
 OAR_NP        - number of processor affected

The file define by OAR_NODE_FILE is created in /tmp
on the node before launching the small job
and this file will be delete after job complete.
C<oar-parexec> is a simple script,
OAR_NODE_FILE will not be deleted in case of crash of the master job.

OAR define other variable that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
You can use in your script the OAR original file ressources
by using these variable if you need it.

When use with long job,
activate option C<--transmit> to send OAR checkpoint signal
and suspend small job before the walltime cut!

=head1 OPTIONS

=over 12

=item B<-f|--file filecommand>

File name which content job list.
For the JOB_NAME definition,
the first valid job in the list will have the number 1 and so on...

It's possible to fix the name inside a comment on the job line.
For example:

 $HOME/test/subjob1.sh # name=subjob1

The key C<name> is case insensitive,
the associated value cannot have a space...

The command can be any shell command.
It's possible to change folder,
or launch an asynchrone job in parallel,
but one command must block and not be launch in asynchrone (with & or coproc).
Example :

 cd ./test; ./subjob1.sh
 cd ./test; nice -18 du -sk ./ & ./subjob1.sh

Commands C<du -sk ./> and C<./subjob1.sh> will be done in parallel on the same ressource...
It's better if C<du -sk ./> is faster than C<./subjob1.sh> !
Do not abuse of that!

=item B<-d|--dir foldertoiterate>

Command C<--cmd> will be launch in all sub-folder of this master folder.
Files in this folder will be ignored.
Sub-folder name which begin with F<.>
or finish with F<.old>, F<.sav>, F<.bak>, F<.no> will either be ignored...

The JOB_NAME is simply the Sub-folder name.

=item B<-c|--cmd commandtolaunch>

Command (and argument to it) that will be launch in all sub-folder
parameter folder C<--dir>.
Like for option C<--file>, command can be any valid shell command
but one must block.

=item B<-l|--logtrace tracefile>

File which log and trace running job.
In case of running the same master command (after crash for example),
only job that are not mark as done will be run again.
Be careful, job mark as running (start but not finish) will be run again.
Tracing is base on the JOB_NAME between multiple run.

This option is very useful in case of crash
but also for checkpointing and idempotent OAR job.

=item B<-v|--verbose>

=item B<-j|--jobnp integer>

Number of processor to allocated for each small job.
1 by default.

=item B<-n|--nodefile filenode>

File name that list all the node where job could be launch.
By default, it's define automatically by OAR via
environment variable C<OAR_NODE_FILE>.

For example, if you want to use 6 core on your cluster node,
you need to put 6 times the hostname node in this file,
one per line...
It's a very common file in MPI process !

=item B<-o|--oarsh command>

Command use to launch a shell on a node.
By default

 oarsh -q -T

Change it to C<ssh> if you are not using an OAR cluster...

=item B<-s|--switchio>

Each small job will have it's own output STDOUT and STDERR
base on master OAR job with C<JOB_NAME> inside
(or base on C<basefileio> if option C<masterio>).
Example :

 OAR.151524.stdout -> OAR.151524-JOB_NAME.stdout

where 151524 here is the master C<OAR_JOB_ID>
and C<JOB_NAME> is the small job name.

=item B<-m|--masterio basefileio>

The C<basefileio> will be use in place of environment variable
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small job standard output
(only use when option C<switchio> is activated).

=item B<-k|--kill signal>

Signal to listen and make a clean stop of the current C<oar-parexec> process.
By default, use USR2 signal (see C<kill -l> for a list of possible signal).

=item B<-t|--transmit>

Resend catch signal to sub-job when receiving it.
By default, no signal is transmitted to child process.

It's only valuable if use for long sub-job than can
in return make themselves a clean restart.

=item B<-h|--help>

=back


=head1 EXAMPLE

=head2 Simple list of sequential job

Content for the job file command (option C<--file>) could have:

 - empty line
 - comment line begin with #
 - valid shell command (can contain comment)

Example where F<$HOME/test/subjob1.sh> is a shell script (executable).
$HOME/test/subjob01.sh # name=subjob01 $HOME/test/subjob02.sh # name=subjob02 $HOME/test/subjob03.sh # name=subjob03 $HOME/test/subjob04.sh # name=subjob04 ... $HOME/test/subjob38.sh # name=subjob38 $HOME/test/subjob39.sh # name=subjob39 $HOME/test/subjob40.sh # name=subjob40 These jobs could be launch by: oarsub -n test -l /core=6,walltime=04:00:00 \ "oar-parexec -f ./subjob.list.txt" =head2 Folder job In a folder F, create sub-folder with your data inside : F, ... The same command will be executed in every sub-folder. C change the current directory to the sub-folder before launching it. A very simple job could be: oarsub -n test -l /core=6,walltime=04:00:00 \ "oar-parexec -d ./subjob.d -c 'sleep 10; env'" The command C will be excuted in all folder F, F... after a 10s pause. Sometime, it's simpler to use file list command, sometime, jobs by folder with the same command run is more relevant. =head2 Parallel job You need to put the number of core each small job need with option C<--jobnp>. If your job is build on OpenMP or MPI, you can use OAR_NP and OAR_NODE_FILE variables to configure them. On OAR cluster, you need to use C or a wrapper like C for connexion between node instead of C. Example with parallel small job on 2 core: oarsub -n test -l /core=6,walltime=04:00:00 \ "oar-parexec -j 2 -f ./subjob.list.txt" =head2 Tracing and master crash If the master node crash after hours of calculus, everything is lost ? No, with option C<--logtrace>, it's possible to remember older result and not re-run these job the second and next time. oarsub -n test -l /core=6,walltime=04:00:00 \ "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log" After a crash or an C command, you can then re-run the same command that will end to execute the jobs in the list oarsub -n test -l /core=6,walltime=04:00:00 \ "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log" C file are just plain file. 
We use the extension '.log' because these files are automatically eliminate from our backup system! =head2 Checkpointing and Idempotent C is compatible with the OAR checkpointing. If you have 2000 small jobs that need 55h to be done on 6 cores, you can cut this in small parts. For this example, we suppose that each small job need about 10min... So, we send a checkpoint 12min before the end of the process to let C finish the jobs started. After being checkpointed, C do not start any new small job. oarsub -t idempotent -n test \ -l /core=6,walltime=04:00:00 \ --checkpoint 720 \ "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log" After 3h48min, the OAR job will begin to stop launching new small job. When all running small job are finished, it's exit. But as the OAR job is type C, OAR will re-submit it as long as all small job are not executed... This way, we let other users a chance to use the cluster! In this last exemple, we use moldable OAR job with idempotent to reserve many core for a small time or a few cores for a long time: oarsub -t idempotent -n test \ -l /core=50,walltime=01:05:00 \ -l /core=6,walltime=04:00:00 \ --checkpoint 720 \ "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log" =head2 Signal, recurse and long job By default, OAR use signal USR2 for checkpointing. It's possible to change this with option C<--kill>. When use with long small job, checkpointing could be too long... More than walltime! The option C<--transmit> could be use to checkpoint small job! These long small job will then stop cleanly and will be restarted next time. In the C file, small job will have the status suspend. They will be launch with the same command line at the next OAR run. Example: if you have 50 small jobs that each need 72h to be done on 1 cores, you can cut this in 24h parts. For this example, we suppose that each long job loop need about 20min... So, we send a checkpoint 30min before the end of the process to let C suspend the jobs started. 
After being checkpointed, C do not start any new small job. oarsub -t idempotent -n test \ -l /core=6,walltime=24:00:00 \ --checkpoint 1800 \ --transmit \ "oar-parexec -f ./subjob.list.txt -l ./subjob.list.log" After 23h30min, the OAR job will begin to stop launching new small job. When all running small job are suspend, it's exit. But as the OAR job is type C, OAR will re-submit it as long as all small job are not finished... =head2 Log format =over =item B log version 2 start subjob 1 pid 101468 at 1450482228 oarjob 71725 onnode cl7n001 end subjob 1 pid 101468 at 1450482556 oarjob 71725 onnode cl7n001 duration 657 status 0 error subjob 1 pid 101468 at 1450482556 oarjob 71725 onnode cl7n001 duration 657 status 0 suspend subjob 1 pid 101468 at 1450482556 oarjob 71725 onnode cl7n001 duration 657 status 0 global-time total 555 cumulative 44444 =item B log version 1 start job 1 / 101468 at 1450482228 oar job 71725 on node cl7n001 end job 1 / 101468 at 1450482556 oar job 71725 on node cl7n001 end job 1 / 101468 at 1450482556 oar job 71725 on node cl7n001 error:retcode job 1 / 101468 at 1450482556 oar job 71725 on node cl7n00 =back =head1 SEE ALSO oar-dispatch, mpilauncher, orsh, oar-envsh, ssh =head1 AUTHORS Written by Gabriel Moreau, Grenoble - France =head1 LICENSE AND COPYRIGHT GPL version 2 or later and Perl equivalent Copyright (C) 2011-2017 Gabriel Moreau / LEGI - CNRS UMR 5519 - France