#!/usr/bin/perl
#
# 2011/11/27 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );

# Command-line options and their default values.
my $file = '';                # --file     : path of the job list (required)
my $verbose;                  # --verbose  : trace job start / end on STDOUT
my $switchio;                 # --switchio : one stdout / stderr file per job
my $help;                     # --help     : print the full manual and exit
my $oarsh = 'oarsh -q -T';    # --oarsh    : command used to reach a node

# --oarsh is declared with "=s" so that it takes the replacement command as
# its value.  A bare 'oarsh' spec is a boolean flag: it would overwrite the
# default with 1 and leave the command itself in @ARGV.
Getopt::Long::GetOptions(
   'file=s'   => \$file,
   'verbose'  => \$verbose,
   'help'     => \$help,
   'oarsh=s'  => \$oarsh,
   'switchio' => \$switchio,
   ) || pod2usage( -verbose => 0 );
pod2usage( -verbose => 2 ) if $help;

# Without an existing job file there is nothing to run: show the manual.
pod2usage( -verbose => 2 ) if not -e $file;

# Load the job list: one shell command per line of $file.
# Lines starting with '#' and lines holding only whitespace are skipped.
my @job = ();
open( my $job_list, '<', $file ) or die "can't open $file: $!";
while ( defined( my $line = <$job_list> ) ) {
   chomp $line;
   next if $line =~ m/^#/ or $line =~ m/^\s*$/;
   push @job, $line;
   }
close $job_list;

# Base names for the per-job output files: the OAR_STDERR and OAR_STDOUT
# paths from the environment with their trailing ".stderr" / ".stdout"
# suffix removed.
( my $stderr = $ENV{OAR_STDERR} ) =~ s/\.stderr$//;
( my $stdout = $ENV{OAR_STDOUT} ) =~ s/\.stdout$//;

# Signal sent once the count of remaining jobs has dropped to zero.
my $finished = Coro::Signal->new;

# Number of jobs not yet completed; starts at the size of the job list.
my $job_todo = Coro::Semaphore->new( scalar @job );

# Pool of free nodes, filled with one entry per non-blank line of the file
# named by the OAR_NODE_FILE environment variable.
my $ressources = Coro::Channel->new;
my $node_file  = defined $ENV{OAR_NODE_FILE} ? $ENV{OAR_NODE_FILE} : '';
open( my $node_list, '<', $node_file )
   or die "can't open OAR_NODE_FILE '$node_file': $!";
while ( defined( my $node = <$node_list> ) ) {
   chomp $node;
   next if $node =~ m/^\s*$/;    # a blank line is not a usable node name
   $ressources->put($node);
   }
close $node_list;

my $job_num   = 0;     # number of jobs started so far; also the job's index
my %scheduled = ();    # running jobs, keyed by pid: { fh, node, num }

# Scheduler coroutine: for every job, take a free node from the pool, open
# a remote shell on it and feed the job's command line to that shell.
#
# The statement is terminated with ';' on purpose: async takes an optional
# argument list after its block, so without the terminator whatever
# expression follows would be parsed as arguments of this call.
async {
   for my $job (@job) {

      # Take the next free node; Coro::Channel's get waits while the pool
      # is empty, i.e. until a finished job gives its node back.
      my $node = $ressources->get;

      $job_num++;

      # Pipe open: the remote shell reads its commands from $fh and the
      # call returns the pid of the local child process.
      my $fh      = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node >/dev/null 2>&1")
         or die "don't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = { fh => $fh, node => $node, num => $job_num };

      printf "start job %5i / %5i on node %s at %s\n",
         $job_num, $job_pid, $node, time
         if $verbose;

      # Per-job redirections, only with --switchio and a known base name;
      # otherwise they stay empty and add nothing to the command line.
      my $job_stdout = '';
      my $job_stderr = '';
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      # NOTE(review): OAR_WORKDIR is passed to the remote shell unquoted,
      # so a path containing spaces would break the cd -- confirm whether
      # that can happen on the target cluster.
      $fh->print("cd $ENV{OAR_WORKDIR}\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("exit\n");

      # Let the other coroutines run between two job launches.
      cede;
      }
   };

# Reaper coroutine: polls every running job, and for each one that has
# ended closes its pipe, gives its node back to the pool and decrements
# the count of remaining jobs.  Sends $finished when that count is zero.
#
# The statement is terminated with ';' on purpose: async takes an optional
# argument list after its block, so without the terminator the following
# "cede" would be parsed as an argument of this call instead of being a
# statement of its own.
async {
   while () {

      # keys() builds its list once, so deleting entries (or the scheduler
      # adding new ones while we cede) does not disturb this loop.
      for my $job_pid ( keys %scheduled ) {

         # Non-blocking wait: a non-zero result means this child is no
         # longer running.
         if ( waitpid( $job_pid, WNOHANG ) ) {
            printf "end   job %5i / %5i on node %s at %s\n",
               $scheduled{$job_pid}->{num},
               $job_pid, $scheduled{$job_pid}->{node}, time
               if $verbose;
            close $scheduled{$job_pid}->{fh};
            $ressources->put( $scheduled{$job_pid}->{node} );
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      $finished->send if $job_todo->count == 0;

      # NOTE(review): this loop never sleeps, it only yields; it polls
      # continuously for as long as the script runs.
      cede;
      }
   };

# Give the coroutines a first chance to run ...
cede;

# ... then block the main program until every job has completed.
$finished->wait;

__END__

=head1 NAME

oar-parexec - execute many small jobs in parallel

=head1 SYNOPSIS

 oar-parexec --file filepath [--verbose] [--switchio] [--oarsh ssh]
 oar-parexec --help

=head1 OPTIONS

 --file     name of the file which contains the job list

 --verbose

 --switchio each small job will have its own output STDOUT and STDERR,
            based on the master OAR job with JOB_NUM inside. Example:

            OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout

            where 151524 here is the master OAR_JOB_ID

 --oarsh command used to connect a shell on a node,
        by default

        oarsh -q -T

 --help


=head1 DESCRIPTION

C<oar-parexec> needs to be executed inside an OAR job environment,
because it needs the two environment variables that OAR defines by
default:

 OAR_NODE_FILE path to a file which contains one node per line

 OAR_WORKDIR   directory to chdir into before launching each job

The job file (option C<--file>) may contain:

 - empty lines
 - comment lines beginning with #
 - valid shell commands

Example where F<$HOME/test/subjob1.sh> is a shell script (executable).

 $HOME/test/subjob1.sh
 $HOME/test/subjob2.sh
 $HOME/test/subjob3.sh
 $HOME/test/subjob4.sh

 $HOME/test/subjob38.sh
 $HOME/test/subjob39.sh
 $HOME/test/subjob40.sh

These jobs could be launched by

 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"

=head1 SEE ALSO

oar-dispatch, mpilauncher


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau.

