#!/usr/bin/perl
#
# 2011/11/27 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

# Command line options and their default values.
my $file = '';                               # job list file (mandatory)
my $verbose;                                 # trace start/end of each job
my $nodefile = $ENV{OAR_NODE_FILE} || '';    # node list, one line per slot
my $masterio;                                # base name of per-job output files
my $switchio;                                # give each job its own output files
my $help;
my $oarsh = 'oarsh -q -T';                   # command opening a shell on a node

Getopt::Long::GetOptions(
   'file=s'     => \$file,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   ) || pod2usage( -verbose => 0 );

# Asking for help is not an error: show the full manual and exit with 0
# (pod2usage would otherwise exit with 1 at this verbosity level).
pod2usage( -verbose => 2, -exitval => 0 ) if $help;

# The job list is mandatory: say what is wrong on STDERR before showing
# the manual, so the cause is not lost in the documentation.
if ( not -e $file ) {
   my $reason = $file eq ''
      ? 'option --file is mandatory'
      : "job file '$file' does not exist";
   print STDERR "oar-parexec: $reason\n";
   pod2usage( -verbose => 2 );
   }

# Load the job list: one shell command per line. Lines starting with '#'
# and lines holding only whitespace are ignored.
my @job = ();
open( my $job_list_fh, '<', $file ) or die "can't open $file: $!";
while ( my $line = <$job_list_fh> ) {
   chomp $line;
   next if $line =~ m/^#/ or $line =~ m/^\s*$/;
   push @job, $line;
   }
close $job_list_fh;

# Base names used to build the per-job output files. Option --masterio wins
# when set; otherwise the OAR file names are used with their extension
# stripped (empty string when the environment variable is not set).
my $stderr = $masterio;
if ( not $stderr ) {
   $stderr = $ENV{OAR_STDERR} || '';
   $stderr =~ s/\.stderr$//;
   }

my $stdout = $masterio;
if ( not $stdout ) {
   $stdout = $ENV{OAR_STDOUT} || '';
   $stdout =~ s/\.stdout$//;
   }

# Sub-jobs are started in the directory the master job is running in.
my $current_dir = getcwd();

# Signalled once every job of the list has terminated.
my $finished = Coro::Signal->new;

# Counter of the jobs that have not terminated yet (one unit per job).
my $job_todo = Coro::Semaphore->new( scalar @job );

# Pool of free execution slots: one entry per usable line of the node file
# (comment and blank lines are skipped). A node listed N times gives N slots.
my $ressources = Coro::Channel->new;
my $node_count = 0;
open( my $node_fh, '<', $nodefile )
   or die "can't open $nodefile: $!";
while ( my $node = <$node_fh> ) {
   chomp $node;
   next if $node =~ m/^#/ or $node =~ m/^\s*$/;
   $ressources->put($node);
   $node_count++;
   }
close $node_fh;

# With jobs to run but no slot at all, the launcher would wait forever for
# a free node: fail early instead of hanging.
die "no node found in $nodefile" if @job and $node_count == 0;

# Number of the last job started, and table of the running jobs keyed by the
# pid of the local process opened for the job: { fh, node, num }.
my $job_num   = 0;
my %scheduled = ();

# Launcher thread: for each job, take a free node from the pool (waiting
# until one is available), open a shell on it through $oarsh and feed it the
# commands to run. The statement is explicitly terminated with ';' because
# async takes a trailing argument list: without it the following statements
# would be parsed as arguments of this call.
async {
   # Single-quote the working directory for the remote shell so that paths
   # holding spaces or shell metacharacters are handled correctly.
   ( my $quoted_dir = $current_dir ) =~ s/'/'\\''/g;

   for my $job (@job) {
      my $node = $ressources->get;

      $job_num++;

      # The remote shell reads its commands from this pipe; its own output
      # is discarded.
      my $fh      = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node >/dev/null 2>&1")
         or die "don't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = { fh => $fh, node => $node, num => $job_num };

      printf "start job %5i / %5i on node %s at %s\n",
         $job_num, $job_pid, $node, time
         if $verbose;

      # Per-job redirections, only with --switchio and a known base name;
      # empty otherwise so the command line is always well defined.
      my $job_stdout = '';
      my $job_stderr = '';
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      $fh->print("cd '$quoted_dir'\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("exit\n");

      # Let the other threads run between two launches.
      cede;
      }
   };

# Reaper thread: poll the running jobs without blocking; when one has
# terminated, close its pipe, give its node back to the pool and decrement
# the count of remaining jobs. Once no job remains, signal the main thread
# and stop.
async {
   while () {
      my $reaped = 0;

      # keys returns a snapshot, so entries can be deleted while looping.
      for my $job_pid ( keys %scheduled ) {
         if ( waitpid( $job_pid, WNOHANG ) ) {
            printf "end   job %5i / %5i on node %s at %s\n",
               $scheduled{$job_pid}->{num},
               $job_pid, $scheduled{$job_pid}->{node}, time
               if $verbose;
            close $scheduled{$job_pid}->{fh};
            $ressources->put( $scheduled{$job_pid}->{node} );
            $job_todo->down;
            delete $scheduled{$job_pid};
            $reaped++;
            }
         cede;
         }

      # Every job has terminated: wake up the main thread and leave.
      if ( $job_todo->count == 0 ) {
         $finished->send;
         last;
         }

      # Nothing ended during this scan: take a short nap instead of
      # spinning at full CPU speed on the node running the master job.
      select( undef, undef, undef, 0.05 ) if not $reaped;
      cede;
      }
   };

# Give both threads a first chance to run, then wait for the end of all jobs.
cede;

$finished->wait;

__END__

=head1 NAME

oar-parexec - execute lots of small jobs in parallel

=head1 SYNOPSIS

 oar-parexec --file filecommand [--verbose]  [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh sssh]
 oar-parexec --help

=head1 DESCRIPTION

C<oar-parexec> executes lots of small jobs in parallel inside a cluster.
The number of jobs running at the same time cannot exceed the number of cores listed in the node file.
C<oar-parexec> is easier to use inside an OAR job environment,
which automatically defines these strategic parameters...

Option C<--file> is the only mandatory one.

Small jobs are launched in the same folder as the master job.


=head1 OPTIONS

=over 12

=item B<-f|--file filecommand>

Name of the file that contains the job list.

=item B<-v|--verbose>

Print a line at the start and at the end of each job
(job number, process id, node and time).

=item B<-n|--nodefile filenode>

Name of the file that lists all the nodes on which to launch jobs.
By default, it is defined automatically by OAR via the
environment variable C<OAR_NODE_FILE>.

For example, if you want to use 6 cores on your cluster node,
you need to put the hostname of the node 6 times in this file,
one per line...
This is a very common file format for MPI processes!

=item B<-m|--masterio basefileio>

The C<basefileio> will be used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension) to build the base name of the small jobs' standard output and error files
(only when option C<switchio> is activated).

=item B<-s|--switchio>

Each small job will have its own STDOUT and STDERR output files,
named after those of the master OAR job with C<JOB_NUM> inserted
(or after C<basefileio> if option C<masterio> is used).
Example:

 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout

where 151524 here is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.

=item B<-o|--oarsh command>

Command used to launch a shell on a node.
By default

        oarsh -q -T

=item B<-h|--help>

=back


=head1 EXAMPLE

The job file (option C<--file>) can contain:

 - empty lines
 - comment lines beginning with #
 - valid shell commands

Example where F<$HOME/test/subjob1.sh> is a shell script (executable).

 $HOME/test/subjob1.sh
 $HOME/test/subjob2.sh
 $HOME/test/subjob3.sh
 $HOME/test/subjob4.sh
 ...
 $HOME/test/subjob38.sh
 $HOME/test/subjob39.sh
 $HOME/test/subjob40.sh

These jobs could be launched by

 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"


=head1 SEE ALSO

oar-dispatch, mpilauncher


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France

