#!/usr/bin/perl
#
# 2011/11/27 gabriel

use strict;

use Getopt::Long();
use Pod::Usage;
use Coro;
use Coro::Semaphore;
use Coro::Signal;
use Coro::Channel;
use Coro::Handle;
use IO::File;
use POSIX qw( WNOHANG WEXITSTATUS );
use Cwd qw( getcwd );

my $filecmd  = '';
my $logtrace = '';
my $verbose;
my $job_np = 1;
my $nodefile = $ENV{OAR_NODE_FILE} || '';
my $masterio;
my $switchio;
my $help;
my $oarsh = 'oarsh -q -T';

Getopt::Long::GetOptions(
   'filecmd=s'  => \$filecmd,
   'logtrace=s' => \$logtrace,
   'verbose'    => \$verbose,
   'help'       => \$help,
   'oarsh=s'    => \$oarsh,
   'jobnp=i'    => \$job_np,
   'nodefile=s' => \$nodefile,
   'masterio=s' => \$masterio,
   'switchio'   => \$switchio,
   ) || pod2usage(-verbose => 0);
pod2usage(-verbose => 2) if $help;
pod2usage(-verbose => 2) if not -e $filecmd;

# on re-run, keep track of jobs already done
my %state;
my $log_h = IO::File->new();
if (-e $logtrace) {
   $log_h->open("< $logtrace")
      or die "error: can't read log file: $!";
   while (<$log_h>) {
      $state{$1} = 'start' if m/^start\s+job\s+(\d+)\s/;
      $state{$1} = 'end'   if m/^end\s+job\s+(\d+)\s/;
      }
   $log_h->close();
   }
if ($logtrace) {
   $log_h->open(">> $logtrace")
      or die "error: can't append log file $logtrace: $!";
   $log_h->autoflush;
   $log_h = unblock $log_h;
   }

# jobs to run
my @job = ();
open(JOB_LIST, '<', "$filecmd") or die "error: can't open job file $filecmd: $!";
while (<JOB_LIST>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @job, $_;
   }
close JOB_LIST;

# available resources
my @ressources = ();
open(NODE_FILE, '<', "$nodefile")
   or die "can't open $nodefile: $!";
while (<NODE_FILE>) {
   chomp;
   next if m/^#/;
   next if m/^\s*$/;
   push @ressources, $_;
   }
close NODE_FILE;

my $ressource_size = scalar(@ressources);
die "error: not enought ressources jobnp $job_np > ressources $ressource_size"
   if $job_np > $ressource_size;

my $current_dir = getcwd();

my $stderr = $ENV{OAR_STDERR} || '';
$stderr =~ s/\.stderr$//;
$stderr = $masterio if $masterio;
my $stdout = $ENV{OAR_STDOUT} || '';
$stdout =~ s/\.stdout$//;
$stdout = $masterio if $masterio;

my $finished = new Coro::Signal;
my $job_todo = new Coro::Semaphore 0;
$job_todo->up for (@job);

# slice resources into slots of $job_np entries for parallel jobs
my $ressources = new Coro::Channel;
for my $slot (1 .. int($ressource_size / $job_np)) {
   $ressources->put(
      join(',',
         @ressources[ (($slot - 1) * $job_np) .. (($slot * $job_np) - 1) ])
         );
   }

my $job_num   = 0;
my %scheduled = ();

# OAR checkpoint and default signal SIGUSR2
my $oar_checkpoint = new Coro::Semaphore 0;
$SIG{USR2} = sub {
   print "warning: receive checkpoint at "
      . time
      . ", no new job, just finishing running job\n"
      if $verbose;
   $oar_checkpoint->up();
   };

# asynchronous job-start block
async {
   JOB:
   for my $job (@job) {
      $job_num++;

      # has this job already been run?
      if (exists $state{$job_num}) {
         if ($state{$job_num} eq 'start') {
            print "warning: job $job_num was not cleanly finished, relaunching...\n"
               if $verbose;
            }
         elsif ($state{$job_num} eq 'end') {
            delete $state{$job_num}; # free memory
            $job_todo->down;
            print "warning: job $job_num already run\n" if $verbose;
            cede;
            next JOB;
            }
         }

      # take a resource slot for this job
      my $job_ressource = $ressources->get;

      # do not launch new jobs once OAR checkpointing has been requested
      last JOB if $oar_checkpoint->count() > 0;

      my ($node_connect) = split ',', $job_ressource;
      my $fh = IO::File->new();
      my $job_pid = $fh->open("| $oarsh $node_connect >/dev/null 2>&1")
         or die "error: can't start subjob: $!";

      $fh->autoflush;
      $fh = unblock $fh;

      $scheduled{$job_pid} = {
         fh           => $fh,
         node_connect => $node_connect,
         ressource    => $job_ressource,
         num          => $job_num
         };

      my $msg = sprintf "start job %5i / %5i at %s on node %s\n",
         $job_num, $job_pid, time, $job_ressource;
      $log_h->print($msg) if $logtrace;
      print($msg) if $verbose;

      my ($job_stdout, $job_stderr);
      $job_stdout = ">  $stdout-$job_num.stdout" if $stdout ne '' and $switchio;
      $job_stderr = "2> $stderr-$job_num.stderr" if $stderr ne '' and $switchio;

      my $job_nodefile = "/tmp/oar-parexec-$ENV{LOGNAME}-$job_num";

      # set up the job environment, run it and clean up
      if ($job_np > 1) {
         $fh->print("printf \""
               . join('\n', split(',', $job_ressource,))
               . "\" > $job_nodefile\n");
         $fh->print("OAR_NODE_FILE=$job_nodefile\n");
         $fh->print("OAR_NP=$job_np\n");
         $fh->print("export OAR_NODE_FILE\n");
         $fh->print("export OAR_NP\n");
         $fh->print("unset OAR_MSG_NODEFILE\n");
         }
      $fh->print("cd $current_dir\n");
      $fh->print("$job $job_stdout $job_stderr\n");
      $fh->print("rm -f $job_nodefile\n") if $job_np > 1;
      $fh->print("exit\n");
      cede;
      }
   }

# asynchronous job-end block
async {
   while () {
      for my $job_pid (keys %scheduled) {
         # non-blocking PID test
         if (waitpid($job_pid, WNOHANG)) {
            my $msg = sprintf "end   job %5i / %5i at %s on node %s\n",
               $scheduled{$job_pid}->{num},
               $job_pid, time, $scheduled{$job_pid}->{ressource};
            $log_h->print($msg) if $logtrace;
            print($msg) if $verbose;
            close $scheduled{$job_pid}->{fh};
            # release resources for another job
            $ressources->put($scheduled{$job_pid}->{ressource});
            $job_todo->down;
            delete $scheduled{$job_pid};
            }
         cede;
         }

      # checkpointing requested: just finish running jobs and quit
      $finished->send if $oar_checkpoint->count() > 0 and scalar(keys(%scheduled)) == 0;

      $finished->send if $job_todo->count() == 0;
      cede;
      }
   }

cede;

# wait until all jobs are done
$finished->wait;

# close log trace file
$log_h->close() if $logtrace;

__END__

=head1 NAME

oar-parexec - execute many small jobs in parallel

=head1 SYNOPSIS

 oar-parexec --filecmd filecommand [--logtrace tracefile] [--verbose] [--jobnp integer] [--nodefile filenode] [--masterio basefileio] [--switchio] [--oarsh command]
 oar-parexec --help

=head1 DESCRIPTION

C<oar-parexec> executes many small jobs in parallel inside a cluster.
The number of jobs running at one time cannot exceed the number of cores
listed in the node file.
C<oar-parexec> is easiest to use inside an OAR job environment,
which automatically defines these strategic parameters...

Option C<--filecmd> is the only mandatory one.

Small jobs are launched in the same folder as the master job.
Two environment variables are defined for each small job,
but only for parallel small jobs (option C<--jobnp> > 1).

 OAR_NODE_FILE - file that lists the nodes for parallel computing
 OAR_NP        - number of processors allocated
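
For example, a parallel small job could use these variables to drive an MPI run.
This is a minimal sketch: the C<mpirun> options and the program name
F<./my_mpi_program> are only illustrative and may differ on your cluster.

 #!/bin/sh
 # hypothetical parallel small job launched by oar-parexec with --jobnp > 1
 # OAR_NODE_FILE and OAR_NP are exported by oar-parexec on the execution node
 mpirun -np $OAR_NP -machinefile $OAR_NODE_FILE ./my_mpi_program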

The file referenced by OAR_NODE_FILE is created in F</tmp> on the node
before the small job is launched and is deleted afterwards...
C<oar-parexec> is a simple script:
OAR_NODE_FILE is not deleted if the master job crashes.

OAR defines other variables that are equivalent to OAR_NODE_FILE:
OAR_NODEFILE, OAR_FILE_NODES, OAR_RESOURCE_FILE...
If you need the original OAR resource file,
you can use these variables in your script.

=head1 OPTIONS

=over 12

=item B<-f|--filecmd filecommand>

Name of the file that contains the job list.

=item B<-l|--logtrace tracefile>

File in which running jobs are logged and traced.
When re-running the same command (after a crash, for example),
only jobs that are not marked as done are run again.
Be careful: jobs marked as running (started but not finished) are run again.

This option is very useful after a crash,
but also for checkpointing and idempotent OAR jobs.
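
Each small job produces one C<start> line and one C<end> line in the trace file.
The job numbers, process IDs, epoch times and node names below are only
illustrative:

 start job     1 /  5678 at 1322429840 on node node1,node1
 end   job     1 /  5678 at 1322429862 on node node1,node1
 start job     2 /  5691 at 1322429863 on node node2,node2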

=item B<-v|--verbose>

Print progress messages (job start, job end and warnings) on standard output.

=item B<-j|--jobnp integer>

Number of processors to allocate to each small job.
1 by default.
For example, with C<--jobnp> 2 and a 6-core node file,
at most 3 small jobs run at the same time
and each one receives 2 entries in its own OAR_NODE_FILE.

=item B<-n|--nodefile filenode>

Name of the file that lists the nodes on which jobs can be launched.
By default, it is defined automatically by OAR via the
environment variable C<OAR_NODE_FILE>.

For example, if you want to use 6 cores on your cluster node,
you need to put the node hostname 6 times in this file,
one per line...
It is a very common file format for MPI jobs!
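
A node file allowing 6 small jobs at a time on a single node could,
for example, contain (the hostname is only illustrative):

 node1.mycluster.org
 node1.mycluster.org
 node1.mycluster.org
 node1.mycluster.org
 node1.mycluster.org
 node1.mycluster.org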

=item B<-m|--masterio basefileio> 

C<basefileio> is used in place of the environment variables
C<OAR_STDOUT> and C<OAR_STDERR> (without extension)
to build the base name of the small job standard output
(only used when option C<--switchio> is activated).
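
For example, with C<--switchio> and C<--masterio result>,
small job 3 writes its output to F<result-3.stdout> and F<result-3.stderr>
(the base name C<result> is only illustrative):

 oar-parexec -f ./subjob.list.txt --switchio --masterio result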

=item B<-s|--switchio> 

Each small job has its own STDOUT and STDERR output files,
based on the master OAR job name with C<JOB_NUM> inserted
(or based on C<basefileio> if option C<--masterio> is set).
Example:

 OAR.151524.stdout -> OAR.151524-JOB_NUM.stdout

where 151524 is the master C<OAR_JOB_ID>
and C<JOB_NUM> is the small job number.

=item B<-o|--oarsh command>

Command used to launch a shell on a node.
By default:

        oarsh -q -T
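
Outside of an OAR environment you could, for instance, combine this option
with C<--nodefile> and use plain C<ssh>.
The command below is only a sketch and must be adapted to your setup:

 oar-parexec -f ./subjob.list.txt -n ./nodes.txt -o "ssh -q"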

=item B<-h|--help>

=back


=head1 EXAMPLE

The job command file (option C<--filecmd>) can contain:

 - empty lines
 - comment lines beginning with #
 - valid shell commands

Example where F<$HOME/test/subjob1.sh> is an executable shell script:

 $HOME/test/subjob1.sh
 $HOME/test/subjob2.sh
 $HOME/test/subjob3.sh
 $HOME/test/subjob4.sh
 ...
 $HOME/test/subjob38.sh
 $HOME/test/subjob39.sh
 $HOME/test/subjob40.sh

These jobs can be launched with:

 oarsub -n test -l /core=6,walltime=00:35:00 "oar-parexec -f ./subjob.list.txt"
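
If your OAR installation supports checkpointing and idempotent jobs,
the same run can be made resumable by adding the C<--logtrace> option.
The C<oarsub> flags below (checkpoint delay, signal number, idempotent type)
are only a sketch and should be checked against your OAR version:

 oarsub -n test -t idempotent --checkpoint 60 --signal 12 \
    -l /core=6,walltime=00:35:00 \
    "oar-parexec -f ./subjob.list.txt -l ./subjob.logtrace"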


=head1 SEE ALSO

oar-dispatch, mpilauncher


=head1 AUTHORS

Written by Gabriel Moreau, Grenoble - France


=head1 LICENSE AND COPYRIGHT

GPL version 2 or later and Perl equivalent

Copyright (C) 2011 Gabriel Moreau / LEGI - CNRS UMR 5519 - France

