#!/usr/bin/perl
#
# 2011/11/03 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Signal;
use Coro::Semaphore;
use Coro::AnyEvent;
use Coro::Handle;
use IO::File;

# Command line parameters and their default values.
my $task           = 0;      # parallel tasks; 0 means "count the lines of OAR_NODE_FILE"
my $overload       = 1;      # factor applied to $task to get the active job limit
my $file           = '';     # file holding the job list
my $dir;                     # folder whose sub-folders are the jobs (used without job file)
my $cmd;                     # command launched in every sub-folder of $dir
my $logtrace;                # trace file, read at startup to skip the jobs already ended
my $verbose;
my $help;
my $sig_transmit;            # when set, the checkpoint is forwarded to the scheduled jobs
my $sig_checkpoint = 'USR2'; # name of the signal handled as a checkpoint request

# Parse the command line: short usage on error, full manual on --help.
my $options_ok = Getopt::Long::GetOptions(
   'help'         => \$help,
   'verbose'      => \$verbose,
   'task=i'       => \$task,
   'overload=f'   => \$overload,
   'file=s'       => \$file,
   'dir=s'        => \$dir,
   'cmd=s'        => \$cmd,
   'logtrace=s'   => \$logtrace,
   'transmit'     => \$sig_transmit,
   'kill=s'       => \$sig_checkpoint,
   );
pod2usage(-verbose => 0) if not $options_ok;
pod2usage(-verbose => 2) if $help;

# When --task is not given (or is 0), use one task per line of the
# file named by the OAR_NODE_FILE environment variable.
if ($task == 0) {
   my $node_file = $ENV{OAR_NODE_FILE};
   # Fail with an explicit message instead of trying to open an empty path.
   die "error: --task is not set and the environment variable OAR_NODE_FILE is not defined\n"
      if not defined $node_file or $node_file eq '';
   open(my $node_fh, '<', $node_file)
      or die "can't open OAR_NODE_FILE $node_file: $!";
   $task++ while <$node_fh>;
   close $node_fh;
   }

# Re-run support: %state maps a job name to the last event ('start' or
# 'end') found for it in a previous log trace file.
my %state;
my $log_h = IO::File->new();
if (defined $logtrace and -e $logtrace) {
   # The mode is passed separately so that the file name is never
   # interpreted as part of the open mode.
   $log_h->open($logtrace, '<')
      or die "error: can't read log file: $!";
   while (my $line = <$log_h>) {
      $state{$1} = 'start' if $line =~ m/^start\s+job\s+(\S+)\s/;
      $state{$1} = 'end'   if $line =~ m/^end\s+job\s+(\S+)\s/;
      }
   $log_h->close();
   }

# The same handle is then reopened in append mode to trace this run;
# it is wrapped by unblock so it can be used from the coroutines.
if ($logtrace) {
   $log_h->open($logtrace, '>>')
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }

# Build the list of jobs to run. Every job is a hash with the keys
#   name : identifier used in the log trace,
#   cmd  : shell command line which submits the job,
#   num  : sequence number (starting at 1).
my @job = ();
if (-e $file) {
   # One job per line of the job file; comment and blank lines are skipped.
   my $job_num = 0;
   open(my $job_list_fh, '<', $file) or die "error: can't open job file $file: $!";
   while (my $job_cmd = <$job_list_fh>) {
      chomp $job_cmd;
      next if $job_cmd =~ m/^#/;
      next if $job_cmd =~ m/^\s*$/;
      # Add oarsub -S if not
      $job_cmd = "oarsub -S $job_cmd" if $job_cmd !~ m/^\s*oarsub/;
      $job_num++;
      # The name comes from a "name=" tag placed after a '#' in the line,
      # otherwise the job number is used.
      my ($job_name) = $job_cmd =~ m/#.*?\bname=(\S+?)\b/i;
      $job_name ||= $job_num;
      push @job, {
         name   => $job_name,
         cmd    => "$job_cmd",
         num    => $job_num,
         };
      }
   close $job_list_fh;
   }
else {
   # No usable job file: one job per sub-folder of $dir, all running $cmd.
   die "error: job file '$file' not found and no job folder given (--dir)\n"
      if not defined $dir or $dir eq '';
   die "error: a command (--cmd) is required with --dir\n"
      if not defined $cmd or $cmd =~ m/^\s*$/;

   # Add oarsub -S if not
   $cmd = "oarsub -S $cmd" if $cmd !~ m/^\s*oarsub/;
   my $job_num = 0;
   opendir(my $dir_h, $dir) or die "error: can't open folder $dir: $!";
   while (my $item = readdir($dir_h)) {
      # Skip hidden entries, names with a colon and disabled/backup folders.
      next if $item =~ m/^\./;
      next if $item =~ m/:/;
      next if $item =~ m/\.(?:old|sav|bak|no)$/;
      next unless (-d "$dir/$item");
      $job_num++;
      push @job, {
         name   => $item,
         cmd    => "cd $dir/$item/; $cmd",
         num    => $job_num,
         };
      }
   closedir $dir_h;
   }

# Identifier of the enclosing OAR job, read from the environment.
my $container_id      = $ENV{OAR_JOB_ID};
my $insert_oar_option = '';

if ($container_id > 1) {
   # Jobs are submitted as inner jobs of this container.
   $insert_oar_option = "-t inner=$container_id";
   }
else {
   # interactive job: no inner option and no overload
   $overload = 1;
   }

# Signal sent to the main program when everything is done.
my $finished = Coro::Signal->new;

# Counter of the jobs still to be processed: one unit per job of the list.
my $job_todo = Coro::Semaphore->new(scalar @job);

# Length of the longest job name, used as a field width in the log trace.
my $job_name_maxlen;
for my $job (@job) {
   my $name_len = length($job->{name});
   $job_name_maxlen = $name_len if $name_len > $job_name_maxlen;
   }

# Counter of the jobs currently submitted and not yet finished.
my $job_active = Coro::Semaphore->new(0);

# Submitted jobs, indexed by OAR job id.
my %scheduled;

# OAR checkpoint handling; the checkpoint signal is SIGUSR2 by default.
# $oar_checkpoint counts the received checkpoints, $notify wakes up the
# coroutine which forwards the checkpoint to the scheduled jobs.
my $oar_checkpoint = Coro::Semaphore->new(0);
my $notify         = Coro::Signal->new;
$SIG{$sig_checkpoint} = sub {
   if ($verbose) {
      print "warning: receive checkpoint at "
         . time
         . ", no new job, just finishing running job\n";
      }
   $oar_checkpoint->up();
   if ($sig_transmit) {
      $notify->send;
      }
   };

# Asynchronous notifier: every time $notify is sent, forward the
# checkpoint signal to all the jobs currently scheduled.
async {
   while () {
      $notify->wait;

      for my $job_pid (keys %scheduled) {
         # List form: the arguments are passed to oardel without going
         # through a shell.
         system 'oardel', '--checkpoint', '--signal', $sig_checkpoint, $job_pid;
         cede;
         }
      }
   };

# Asynchronous job launcher: submit the jobs of @job one after the other,
# keeping at most $task*$overload jobs active, and start for every
# submitted job a guard coroutine which waits for its termination.
async {
   JOB:
   for my $job (@job) {
      my $job_name   = $job->{name};
      my $job_cmd    = $job->{cmd};

      # job has been already run ?
      if (exists $state{$job_name}) {
         if ($state{$job_name} eq 'start') {
            print "warning: job $job_name was not clearly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_name} eq 'end') {
            delete $state{$job_name}; # free memory
            $job_todo->down;
            print "warning: job $job_name already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # wait for a free slot, letting the other coroutines run
      while ($job_active->count >= $task*$overload) {
         cede;
         }

      # no more launch job when OAR checkpointing
      last JOB if $oar_checkpoint->count() > 0;

      # Insert the container option just after the oarsub command word,
      # whether the command line starts with it (possibly after blanks)
      # or follows a "cd folder;" prefix.
      $job_cmd =~ s/((?:^|;)\s*oarsub)\b/$1 $insert_oar_option/;
      print "$job_cmd\n" if $verbose;
      my $job_id = `$job_cmd|grep ^OAR_JOB_ID|cut -f 2 -d '='`;
      chomp $job_id;

      # Submission failed: there is nothing to wait for. Account for the
      # job so the program can still terminate; it is not written in the
      # log trace, so a later re-run will try it again.
      unless ($job_id > 1) {
         warn "error: can't submit job $job_name, skipping it\n";
         $job_todo->down;
         cede;
         next JOB;
         }

      $scheduled{$job_id} = {
         name         => $job_name,
         };
      $job_active->up;

      my $start_msg = sprintf "start job %${job_name_maxlen}s / %i at %s\n",
         $job_name, $job_id, time;
      $log_h->print($start_msg) if $logtrace;
      print($start_msg) if $verbose;
      cede;

      # asynchronous guard for job end
      async {
         my $timer;
         GUARD:
         while () {
            cede;
            # Active wait of about 5 seconds between two polls, yielding
            # to the other coroutines at every turn.
            # NOTE(review): the other coroutines of this file also spin on
            # cede, so a blocking timer may never fire here - confirm
            # before replacing this loop by a sleep.
            $timer = AE::now + 5;
            while ( AE::now < $timer ) { AE::now_update; cede; }
            my $is_finish = `oarstat -s -j $job_id`;
            chomp $is_finish;
            # NOTE(review): only the Terminated state ends the guard;
            # check whether other final states should be handled too.
            last GUARD if $is_finish =~ m/Terminated/;
            }

         my $msg = sprintf "end   job %${job_name_maxlen}s / %i at %s\n",
            $scheduled{$job_id}->{name}, $job_id, time;

         # Job non finish, just suspend if received checkpoint signal
         $msg =~ s/^end\s+job/suspend job/
            if $sig_transmit and $oar_checkpoint->count() > 0;

         $log_h->print($msg) if $logtrace;
         print($msg) if $verbose;

         $job_active->down;
         $job_todo->down;
         delete $scheduled{$job_id};
         };
      }
   };

# Asynchronous termination watcher: send $finished as soon as the work
# is over. The end of each job is detected by its own guard coroutine,
# this one only looks at the counters.
async {
   while () {
      # checkpointing ! just finishing running job and quit
      if ($oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0) {
         $finished->send;
         }

      # normal end: no job left to do
      if ($job_todo->count == 0) {
         $finished->send;
         }

      cede;
      }
   };

# Give the hand to the coroutines created above.
cede;

# Block until all jobs have been done (or a checkpoint ended the run).
$finished->wait;

# close log trace file
if ($logtrace) {
   $log_h->close();
   }


__END__

=head1 NAME

oar-dispatch - dispatch lots of small OAR jobs

=head1 SYNOPSIS

 oar-dispatch [--task integer] [--overload real] --file filecommand [--verbose]
 oar-dispatch --help

=head1 OPTIONS

=over 12

=item B<[-t|--task integer]>

Number of tasks to run in parallel.
Defaults to the number of lines of the file named by the OAR_NODE_FILE environment variable.
 
=item B<[-o|--overload real]>

Ratio between the number of OAR jobs submitted at the same time and the number of tasks.
Some jobs are created in advance so that they can start as soon as possible.
1 by default.

=item B<[-f|--file filecommand]>

Name of the file which contains the OAR job list.

=item B<[-v|--verbose]>
 
=item B<[-h|--help]>

=back

The input job file can contain

 - empty line
 - comment line begin with #
 - oarsub command without -t option
 
C<oar-dispatch> will add C<-t inner=container_id> to this command line,
just after C<oarsub>.

=head1 EXAMPLE

Example where the file F<$HOME/test/subjob.txt> is a list of OAR script jobs (it can be executable, but this is not needed here).

 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob1.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob2.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob3.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob4.oar
 ...
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob38.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob39.oar
 oarsub -n test -l /core=1,walltime=00:05:00 $HOME/test/subjob40.oar

These jobs could be launched with

 oarsub -t container -n test-container -l /core=6,walltime=00:35:00 "oar-dispatch -f ./subjob.list.txt"

Total C<walltime> is defined by the formula:

 total_walltime = subjob_walltime * total_subjob / core + global_delay

In practice, C<oar-dispatch> takes a few seconds and each subjob runs in less than its walltime, so

 total_walltime < subjob_walltime * total_subjob / core

If launched interactively, the C<overload> parameter is forced to 1,
C<task> must be defined
and no inner container option is added to the C<oarsub> command line.


=head1 SEE ALSO

oar-parexec, mpilauncher


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France

