#!/usr/bin/perl
#
# 2011/11/03 gabriel
#
# oar-dispatch: submit a list of small OAR jobs from inside an OAR container
# job, keeping at most (task * overload) of them submitted at the same time.
# User documentation is in the POD after __END__.

use strict;
use warnings;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Signal;
use Coro::Semaphore;
use Coro::AnyEvent;
use Coro::Handle;
use IO::File;

my $task = 0;
my $overload = 1;
my $file = '';
my $dir;
my $cmd;
my $logtrace;
my $verbose;
my $help;
my $sig_transmit;
my $sig_checkpoint = 'USR2';
my $brake = 5; # five second time brake

# The short aliases documented in the POD are declared explicitly: without
# them "-t" is an ambiguous abbreviation of both --task and --transmit.
Getopt::Long::GetOptions(
   'task|t=i'     => \$task,
   'overload|o=f' => \$overload,
   'file|f=s'     => \$file,
   'dir=s'        => \$dir,
   'cmd=s'        => \$cmd,
   'logtrace=s'   => \$logtrace,
   'verbose|v'    => \$verbose,
   'help|h'       => \$help,
   'transmit'     => \$sig_transmit,
   'kill=s'       => \$sig_checkpoint,
   'brake=i'      => \$brake,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;

# Default number of parallel tasks: one per line of the OAR node file.
if ($task == 0) {
   my $node_file = $ENV{OAR_NODE_FILE};
   pod2usage(-verbose => 0, -message => "error: --task is required when OAR_NODE_FILE is not set\n")
      unless defined $node_file;
   open my $node_h, '<', $node_file
      or die "error: can't open OAR_NODE_FILE $node_file: $!";
   $task++ while <$node_h>;
   close $node_h;
}
# A zero task count would make the launcher wait forever for a free slot.
pod2usage(-verbose => 0, -message => "error: number of task must be a positive integer\n")
   if $task < 1;

# Re-run support: remember which jobs a previous run logged as started/ended.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   $log_h->open($logtrace, '<') or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+([^\s]+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+([^\s]+)\s/;
   }
   $log_h->close();
}
if ($logtrace) {
   $log_h->open($logtrace, '>>') or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
}

# Job list. Each entry holds the oarsub command ("cmd"), an optional working
# folder ("dir", --dir mode only), a display name and a sequence number.
my @job = ();
if ($file ne '' and -e $file) {
   my $job_num = 0;
   open my $job_list_h, '<', $file or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_h>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      # Add oarsub -S if not
      $job_cmd = "oarsub -S $job_cmd" if $job_cmd !~ m/^\s*oarsub/;
      $job_num++;
      # optional job name given in a trailing comment: "... # name=foo"
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name => $job_name,
         cmd  => $job_cmd,
         num  => $job_num,
         };
   }
   close $job_list_h;
}
elsif (defined $dir and defined $cmd) {
   # Add oarsub -S if not
   $cmd = "oarsub -S $cmd" if $cmd !~ m/^\s*oarsub/;
   my $job_num = 0;
   opendir my $dir_h, $dir or die "error: can't open folder $dir: $!";
   while (defined(my $item = readdir $dir_h)) {
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.(?:old|sav|bak|no)$/;
      next unless -d "$dir/$item";
      $job_num++;
      push @job, {
         name => $item,
         cmd  => $cmd,
         dir  => "$dir/$item",
         num  => $job_num,
         };
   }
   closedir $dir_h;
}
else {
   pod2usage(-verbose => 0, -message => "error: give an existing --file, or both --dir and --cmd\n");
}

# Inside an OAR container job, subjobs are submitted as inner jobs of it.
my $container_id = $ENV{OAR_JOB_ID};
my $insert_oar_option = '';
if (defined $container_id and $container_id =~ m/^\d+$/ and $container_id > 1) {
   $insert_oar_option = "-t inner=$container_id";
}
else {
   # interactive job
   $overload = 1;
}
my $max_active = $task * $overload;

my $finished = Coro::Signal->new;
my $job_todo = Coro::Semaphore->new(scalar @job);  # jobs not yet done
my $job_name_maxlen = 0;
for my $job (@job) {
   my $len = length $job->{name};
   $job_name_maxlen = $len if $len > $job_name_maxlen;
}
my $job_active = Coro::Semaphore->new(0);          # jobs currently submitted
my %scheduled = ();                                # job id => { name }

# OAR checkpoint and default signal SIGUSR2
my $oar_checkpoint = Coro::Semaphore->new(0);      # > 0 once checkpointed
my $notify = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   print "warning: receive checkpoint at " . time . ", no new job, just finishing running job\n" if $verbose;
   $oar_checkpoint->up();
   $notify->send if $sig_transmit;
};

# asynchrone notify job: forward the checkpoint to every running subjob
async {
   while (1) {
      $notify->wait;
      for my $job_pid (keys %scheduled) {
         # list form: no shell parsing of the user-supplied signal name
         system 'oardel', '--checkpoint', '--signal', $sig_checkpoint, $job_pid;
         cede;
      }
   }
};

# asynchrone start job block
async {
   JOB:
   for my $job (@job) {
      my $job_name = $job->{name};

      # job has been already run ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n" if $verbose;
         }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
         }
      }

      # Wait for a free slot. A blocking sleep (not a bare cede loop) lets
      # the event loop run, so the guards' timers can fire meanwhile.
      Coro::AnyEvent::sleep(1) while $job_active->count >= $max_active;

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # insert the container option right after "oarsub", even when the
      # command has leading blanks, then move into the job folder if any
      my $job_cmd = $job->{cmd};
      $job_cmd =~ s/^(\s*oarsub)\b/$1 $insert_oar_option/;
      $job_cmd = "cd $job->{dir}/; $job_cmd" if defined $job->{dir};
      print "$job_cmd\n" if $verbose;

      my $submit_out = `$job_cmd|grep ^OAR_JOB_ID|cut -f 2 -d '='`;
      my ($job_id) = defined $submit_out ? $submit_out =~ m/^\s*(\d+)\s*$/ : ();
      unless (defined $job_id) {
         # Without an id there is nothing to watch: count the job as done,
         # otherwise job_todo never reaches zero and we never finish.
         warn "error: job $job_name could not be submitted\n";
         $job_todo->down;
         cede;
         next JOB;
      }

      $scheduled{$job_id} = {
         name => $job_name,
         };
      $job_active->up;
      my $msg = sprintf "start job %${job_name_maxlen}s / %i at %s\n",
         $job_name, $job_id, time;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      cede;

      # asynchrone guard for job end
      async {
         my $end_state;
         GUARD:
         while (1) {
            # wait to not re-launch oarstat to fast
            Coro::AnyEvent::sleep($brake);
            my $status = `oarstat -s -j $job_id`;
            # Terminated and Error are both final OAR states
            if (defined $status and $status =~ m/\b(Terminated|Error)\b/) {
               $end_state = $1;
               last GUARD;
            }
         }

         my $msg = sprintf "end job %${job_name_maxlen}s / %i at %s\n",
            $job_name, $job_id, time;
         # Job non finish, just suspend if received checkpoint signal
         $msg =~ s/^end\s+job/suspend job/ if $sig_transmit and $oar_checkpoint->count() > 0;
         # Not logged as "end": a re-run will relaunch it
         $msg =~ s/^end\s+job/error job/ if $end_state eq 'Error';
         $log_h->print($msg) if $logtrace;
         print($msg) if $verbose;

         $job_active->down;
         $job_todo->down;
         delete $scheduled{$job_id};
      };
   }
};

# Termination watcher: all jobs done, or checkpointing and nothing left running.
async {
   while (1) {
      if ($job_todo->count == 0
         or ($oar_checkpoint->count() > 0 and not %scheduled)) {
         $finished->send;
         last;
      }
      Coro::AnyEvent::sleep(1);
   }
};

cede;

# all job have been done
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;

__END__

=head1 NAME

oar-dispatch - dispatch a lot of small OAR jobs

=head1 SYNOPSIS

 oar-dispatch [--task integer] [--overload real] --file filecommand [--logtrace tracefile] [--verbose] [--transmit] [--kill signal] [--brake second]
 oar-dispatch [--task integer] [--overload real] --dir foldertoiterate --cmd commandtolaunch [--logtrace tracefile] [--verbose] [--transmit] [--kill signal] [--brake second]
 oar-dispatch --help

=head1 OPTIONS

=over 12

=item B<[-t|--task integer]>

Number of tasks to run in parallel.
Defaults to the number of lines in the file OAR_NODE_FILE.

=item B<[-o|--overload real]>

Number of OAR jobs to create / number of tasks.
Some jobs are created in advance so that they start as soon as possible.
1 by default; forced to 1 when not running inside an OAR container job.

=item B<[-f|--file filecommand]>

Name of the file containing the OAR job list.

=item B<[--dir foldertoiterate]>

Used when B<--file> is not given or does not exist.
Each sub-folder of this folder becomes one job, launched from inside that sub-folder.
Names beginning with a dot, containing a colon, or ending with C<.old>, C<.sav>, C<.bak> or C<.no> are skipped.

=item B<[--cmd commandtolaunch]>

Command launched in each sub-folder in B<--dir> mode.
C<oarsub -S> is prepended when the command does not already begin with C<oarsub>.

=item B<[--logtrace tracefile]>

Append a C<start job> line and an end line for each job to this file.
If the file already exists at startup, jobs logged as ended are not launched again.
Jobs logged only as started are relaunched.

=item B<[--transmit]>

On the checkpoint signal, forward it to every running subjob with C<oardel --checkpoint --signal>.

=item B<[--kill signal]>

Name of the checkpoint signal to catch (and to forward with B<--transmit>).
C<USR2> by default.
After this signal no new job is launched; running jobs are left to finish.

=item B<[--brake second]>

Delay in seconds between two C<oarstat> polls of a running job.
5 by default.

=item B<[-v|--verbose]>

Print submitted commands, job start/end events and warnings.

=item B<[-h|--help]>

Print this help.

=back

The input job file can contain:

=over

=item * empty lines

=item * comment lines beginning with #

=item * oarsub commands without the -t option

=back

Lines that do not begin with C<oarsub> are prefixed with C<oarsub -S>.
A job name can be given in a trailing comment C<# name=jobname>; otherwise the job number is used.
C<oar-dispatch> will add C<-t inner=container_id> to each command line, just after C<oarsub>.

=head1 EXAMPLE

Example where the file F<$HOME/test/subjob.txt> is a list of OAR script jobs (they may be executable, but this is not needed here).

    oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob1.oar
    oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob2.oar
    oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob3.oar
    oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob4.oar
    ...
    oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob38.oar
    oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob39.oar
    oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob40.oar

These jobs can be launched (from F<$HOME/test>) with

    oarsub -t container -n test-container -l /core=6,walltime=00:35:00 "oar-dispatch -f ./subjob.txt"

The total C<walltime> is defined by the formula:

    total_walltime = subjob_walltime * total_subjob / core + global_delay

In practice, C<oar-dispatch> itself takes a few seconds and each subjob runs in less than its walltime, so

    total_walltime < subjob_walltime * total_subjob / core

If launched interactively (outside an OAR container job), the C<overload> parameter is forced to 1, C<task> must be defined, and no inner container option is added to the C<oarsub> command line.

=head1 SEE ALSO

oar-parexec, mpilauncher

=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France

=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France