#!/usr/bin/perl
#
# 2011/11/03 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Signal;
use Coro::Semaphore;
use Coro::Timer qw(sleep);
use Coro::Handle;
use IO::File;

# Command-line settings and their default values.
my $task           = 0;       # parallel task count; 0 means "count the lines of OAR_NODE_FILE"
my $overload       = 1.1;     # at most $task * $overload sub-jobs are kept submitted at once
my $file           = '';      # file holding the list of oarsub command lines
my $logtrace;                 # optional trace file name
my $verbose;                  # print progress messages when set
my $help;                     # print the manual and exit when set
my $sig_transmit;             # when set, a received checkpoint is forwarded to the sub-jobs
my $sig_checkpoint = 'USR2';  # name of the signal handled as the checkpoint request

# The short forms listed in the manual (-t, -o, -f, -v, -h) are declared as
# explicit aliases. Auto-abbreviation alone is not enough: "-t" is a prefix
# of both --task and --transmit and would be rejected as ambiguous.
Getopt::Long::GetOptions(
   'task|t=i'     => \$task,
   'overload|o=f' => \$overload,
   'file|f=s'     => \$file,
   'logtrace=s'   => \$logtrace,
   'verbose|v'    => \$verbose,
   'help|h'       => \$help,
   'transmit'     => \$sig_transmit,
   'kill=s'       => \$sig_checkpoint,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;

# No task count given on the command line: use one task per line of the
# file named by the OAR_NODE_FILE environment variable.
if ($task == 0) {
   my $node_file = $ENV{OAR_NODE_FILE};
   if (not defined $node_file or $node_file eq '') {
      die "error: option --task not given and OAR_NODE_FILE is not set\n";
      }
   open(my $node_h, '<', $node_file)
      or die "can't open OAR_NODE_FILE $node_file: $!";
   $task++ while <$node_h>;
   close $node_h;
   }

# Re-run support: read back the trace of a previous run, then reopen the
# trace file for appending.
# %state maps a job name to 'start' or 'end', according to the last
# matching line found in the existing trace file.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # The mode is given as a separate argument so that the file name is
   # never parsed for mode characters or surrounding white space.
   $log_h->open($logtrace, '<')
      or die "error: can't read log file $logtrace: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+(\S+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+(\S+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   # wrap the handle with Coro::Handle's unblock for use from coroutines
   $log_h = unblock $log_h;
   }

# Jobs to run: every line of the job file that starts (after optional
# white space) with "oarsub" is kept, in file order. Lines starting with
# "#" are comments; any other line is ignored.
my @job = ();
die "error: no job file given, option --file is mandatory\n" if $file eq '';
open(my $job_list_h, '<', $file) or die "can't open $file: $!";
while (my $line = <$job_list_h>) {
   chomp $line;
   next if $line =~ m/^#/;
   push @job, $line if $line =~ m/^\s*oarsub/;
   }
close $job_list_h;

# Id of the enclosing OAR job, read from the environment. When it is
# greater than 1 the sub-jobs are submitted as inner jobs of that
# container; otherwise the run is treated as interactive: no extra
# oarsub option is inserted and no over-submission is done.
my $container_id      = $ENV{OAR_JOB_ID};
my $insert_oar_option = '';

if ($container_id > 1) {
   $insert_oar_option = "-t inner=$container_id";
   }
else {
   # interactive job
   $overload = 1;
   }


# Synchronisation objects shared by the coroutines of this script.
my $finished   = Coro::Signal->new;         # the main program waits on it before exiting
my $job_active = Coro::Semaphore->new(0);   # count of submitted jobs not yet seen as ended
my $job_todo   = Coro::Semaphore->new(0);   # count of listed jobs not yet seen as ended
for my $listed_job (@job) {
   $job_todo->up;
   }

# ids of the submitted jobs that are still being watched
my %scheduled = ();

# OAR checkpoint and default signal SIGUSR2
my $oar_checkpoint = Coro::Semaphore->new(0);   # count > 0 once a checkpoint signal was received
my $notify         = Coro::Signal->new;         # wakes the coroutine that forwards the checkpoint

# Handler of the checkpoint signal: record the request and, when
# --transmit was given, ask for it to be forwarded to the sub-jobs.
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   if ($sig_transmit) {
      $notify->send;
      }
   };

# Asynchronous checkpoint forwarding.
# Coroutine that waits on $notify and, each time it is woken up, runs
# oardel with the checkpoint signal on every job id recorded in %scheduled.
async {
   while () {
      $notify->wait;

      for my $job_pid (keys %scheduled) {
         # other coroutines run at each cede below and may have removed
         # this job from %scheduled in the meantime
         next if not exists $scheduled{$job_pid};

         # list form: the arguments are handed to oardel without a shell
         system 'oardel', '--checkpoint', '--signal', $sig_checkpoint, $job_pid;
         cede;
         }
      }
   };   # the ';' ends the statement, so that whatever follows is not parsed as arguments of this async call

# Asynchronous job starter.
# Coroutine that submits the listed jobs one after the other, keeping fewer
# than $task * $overload of them active, and that spawns one guard
# coroutine per submitted job to detect the end of that job.
# Submission stops as soon as a checkpoint has been received.
async {
   JOB:
   for my $job (@job) {
      # wait for a free slot; sleeping (Coro::Timer) yields to the other
      # coroutines without spinning on the CPU
      while ($job_active->count >= $task*$overload) {
         last JOB if $oar_checkpoint->count() > 0;
         sleep 1;
         }

      # no more job is launched once OAR asked for a checkpoint
      last JOB if $oar_checkpoint->count() > 0;

      # the command is rebuilt with the extra option right after "oarsub"
      $job =~ s/^\s*oarsub//;
      print "oarsub $insert_oar_option $job\n" if $verbose;
      my $job_id = `oarsub $insert_oar_option $job|grep ^OAR_JOB_ID|cut -f 2 -d '='`;
      chomp $job_id;

      # Submission failed (no usable job id): there is nothing to watch.
      # Count the job as done so that the dispatcher can still terminate,
      # and do not start a guard that would poll an empty id forever.
      if (not $job_id > 1) {
         warn "warning: can't submit job: oarsub $insert_oar_option $job\n";
         $job_todo->down;
         next JOB;
         }

      $scheduled{$job_id}++;
      $job_active->up;
      cede;

      # asynchronous guard watching for the end of this job
      async {
         my $job_id = shift;
         GUARD:
         while () {
            sleep 15; # async, do not re-launch oarstat too fast
            my $is_finish = `oarstat -s -j $job_id`;
            chomp $is_finish;
            # NOTE(review): "Error" is assumed to be the final state that
            # oarstat reports for a failed job - confirm against the
            # installed OAR version. Without it a failed job is polled forever.
            last GUARD if $is_finish =~ m/\b(?:Terminated|Error)\b/;
            }
         delete $scheduled{$job_id};
         $job_active->down;
         $job_todo->down;
         } $job_id;
      }
   };

# Completion monitor.
# Coroutine that wakes up the main program (through $finished) when either
#  - a checkpoint was received and no submitted job is left in %scheduled, or
#  - every listed job has been accounted for ($job_todo is back to 0).
# The state of each job is polled by the per-job guard coroutines, not here.
async {
   while () {
      # checkpointing ! just finishing running job and quit
      my $checkpoint_done = ($oar_checkpoint->count() > 0 && scalar(keys(%scheduled)) == 0);
      my $all_done        = ($job_todo->count == 0);

      if ($checkpoint_done or $all_done) {
         # one send is enough: the signal is remembered if nobody waits yet
         $finished->send;
         last;
         }

      # sleep (Coro::Timer) instead of a bare cede, to avoid a busy loop
      sleep 1;
      }
   };

# give the other ready coroutines a chance to run
cede;

# block here until the completion signal is sent: all jobs have been done
$finished->wait;

# close log trace file
if ($logtrace) {
   $log_h->close();
   }


__END__

=head1 NAME

oar-dispatch - dispatch lots of small OAR jobs

=head1 SYNOPSIS

 oar-dispatch [--task integer] [--overload real] --file filecommand [--verbose]
 oar-dispatch --help

=head1 OPTIONS

=over 12

=item B<[-t|--task integer]>

Number of tasks to run in parallel.
Defaults to the number of lines in the file OAR_NODE_FILE.
 
=item B<[-o|--overload real]>

Ratio between the number of OAR jobs to create and the number of tasks.
Some jobs are created in advance so that they can start as soon as possible.
1.1 by default.

=item B<[-f|--file filecommand]>

Name of the file which contains the list of OAR jobs.

=item B<[-v|--verbose]>
 
=item B<[-h|--help]>

=back

The input job file can contain

 - empty lines
 - comment lines beginning with #
 - oarsub commands without the -t option
 
C<oar-dispatch> will add C<-t inner=container_id> to each of these command lines,
just after C<oarsub>.

=head1 EXAMPLE

Example where the file F<$HOME/test/subjob.txt> is a list of OAR job scripts (they can be executable, but this is not needed here).

 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob1.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob2.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob3.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob4.oar
 ...
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob38.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob39.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob40.oar

These jobs could be launched with

 oarsub -t container -n test-container -l /core=6,walltime=00:35:00 "oar-dispatch -f ./subjob.list.txt"

Total C<walltime> is defined by the formula:

 total_walltime = subjob_walltime * total_subjob / core + global_delay

In practice, C<oar-dispatch> takes a few seconds and each subjob runs in less than its walltime, so

 total_walltime < subjob_walltime * total_subjob / core

If launched interactively, the C<overload> parameter is set to 1,
C<task> must be defined
and no inner container is added to the C<oarsub> command line.


=head1 SEE ALSO

oar-parexec, mpilauncher


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France

