#!/usr/bin/perl
# $Id: oarsub,v 1.39 2005/06/15 11:54:52 capitn Exp $

#Submit a job for execution

use strict;
use IO::Socket::INET;
use DBI();
use Data::Dumper;
use Sys::Hostname;
use Getopt::Long;
use oar_iolib;
use oar_conflib qw(init_conf dump_conf get_conf is_conf);
use File::Temp qw/ tempfile /;

# ---- Interactive / reservation callback ----------------------------------
# Local listening socket and its port (filled by init_TCP_server), and the
# OAR server endpoint read from oar.conf.
my ($server, $serverport);
my ($remote_host, $remote_port);
my $deploy_hostname;
my $host = hostname;

# ---- Database handle and job bookkeeping ---------------------------------
my $base;
my $err;
my $FILE;
my $idJob;

# ---- Submission parameters and their defaults ----------------------------
my $interactive = 0;
my $reservation = "0";
my $connectResa;
my (@resource_list, @resource);
my ($value, $index);
my ($res_type, $res_value);
my $nbnodes = 1;
my $weight = 1;
my $walltime;
my $queueName;
my $jobSQLproperties;
my $label;
my $exec = "";
my $cmd_executor;
my $mode;
my ($w_h, $w_mn, $w_sec);
my $verbose;
my ($stagein, $idFile, $md5sum);
my $stageindir;
my $sos;
my $checkpoint = 0;

# Route ^C to qdel(), which deletes the job once one has been registered.
$SIG{INT} = 'qdel';

# Delete the current job. Installed as the SIGINT handler (to handle ^C,
# notably during interactive submission) and also called directly.
#
# Parameter: either a signal name (Perl passes e.g. "INT" to a handler) or
# a number given by a direct caller.
#   - signal name or 0: delete the job, notify the OAR server ("Almighty")
#     with "Qdel" and exit with status 1;
#   - non-zero number: only delete the job in the database, then return.
# When no job has been registered yet, a signal aborts the submission
# (exit 1) instead of being ignored; a direct call just returns.
sub qdel($) {
    my $Al_dead = shift;
    # Handlers receive a signal name; direct callers pass a number.
    my $from_signal = ($Al_dead !~ /^\d+$/);
    my $notify_server = $from_signal || ($Al_dead == 0);

    if (!defined($idJob)) {
        if ($from_signal) {
            print("\n\nCaught Interrupt (^C), no job to delete\n");
            exit(1);
        }
        return;
    }

    if ($Al_dead eq "INT") {
        print("\n\nCaught Interrupt (^C), Deleting the job $idJob\n");
    }
    $base = iolib::connect();
    $err = iolib::frag_job($base, $idJob);
    iolib::disconnect($base);
    print("Job deleted\n");

    # Signal Almighty so that the deletion is taken into account.
    if ($notify_server) {
        my $socket = IO::Socket::INET->new(PeerAddr => $remote_host,
                                           PeerPort => $remote_port,
                                           Proto    => "tcp",
                                           Type     => SOCK_STREAM)
            or die("Couldn't connect executor $remote_host:$remote_port\n");
        print($socket "Qdel\n");
        my $answer = <$socket>;
        close($socket);
        print("Almigthy answers : $answer\n");
        exit(1);
    }
}

# Open the local listening TCP socket on which the runner calls back
# (reservation validation, interactive job start). No LocalPort is given,
# so the port is chosen by the system; it is stored in $serverport.
# Dies with the socket error if the socket cannot be created.
sub init_TCP_server(){
    $server = IO::Socket::INET->new(Type   => SOCK_STREAM,
                                    Reuse  => 1,
                                    Listen => 1)
        or die("Cannot create the oarsub TCP server: $@\n");
    $serverport = $server->sockport();
    if (defined($verbose)) {
        print("init_TCP_server Port : $serverport\n");
    }
}

sub scan_script($){
    # Read the submitted script and apply its embedded "#OAR" directives:
    #   #OAR -l name=value[,name=value...]   nodes, walltime or weight
    #   #OAR -q <queue>                      only if no queue is set yet
    #   #OAR -p "<properties>"               only if no properties are set yet
    # Directives are only looked for when the first line starts with '#'
    # (that first line itself is not examined). If the script cannot be
    # opened, the sub returns without changing anything.
    $FILE = shift;

    # We run as the oar user, so the script is read through sudo as the
    # submitting user. The path is single-quoted for the shell (embedded
    # quotes escaped) so a crafted file name cannot inject commands, and
    # "--" stops cat from parsing a leading '-' as an option.
    my $lusr = getpwuid($ENV{SUDO_UID});
    (my $quoted_file = $FILE) =~ s/'/'\\''/g;
    open(my $script_fh, '-|', "sudo -u $lusr cat -- '$quoted_file' 2> /dev/null")
        or return;

    my $first_line = <$script_fh>;
    if (defined($first_line) && ($first_line =~ /^#/)) {
        while (my $line = <$script_fh>) {
            next unless ($line =~ /^#OAR\s+/);
            # Lazy captures followed by \s*$ keep trailing blanks and a CR
            # (CRLF scripts) out of the captured values.
            if ($line =~ m/^#OAR\s+-l\s*(.+?)\s*$/m) {
                my $requested = $1;
                @resource_list = split(',', $requested);
                foreach my $item (@resource_list) {
                    my ($resourceTmp, $value) = split("=", $item);
                    print("Resource = $resourceTmp Value = $value\n");
                    if ($resourceTmp eq 'nodes') {
                        $nbnodes = $value;
                    } elsif ($resourceTmp eq 'walltime') {
                        $walltime = $value;
                        # A walltime without a seconds field gets ":00" appended.
                        ($w_h, $w_mn, $w_sec) = split(':', $value);
                        if (not defined($w_sec)) {
                            $walltime = $walltime.":00";
                        }
                    } elsif ($resourceTmp eq 'weight') {
                        $weight = $value;
                    } else {
                        print("/!\\I don t recognize the resource $resourceTmp\n");
                    }
                }
            } elsif ((not defined($queueName)) && ($line =~ m/^#OAR\s+-q\s*(.+?)\s*$/m)) {
                print $line;
                $queueName = $1;
            } elsif ((not defined($jobSQLproperties)) && ($line =~ m/^#OAR\s+-p\s*"(.+)"\s*$/m)) {
                $jobSQLproperties = $1;
            }
        }
    }
    close($script_fh);
}

sub print_asked_resources(){
    # Verbose-mode dump of the request as parsed so far: node count, queue,
    # walltime, submission mode, executor command and job properties.
    print "\n",
          "Resources asked\n",
          "Nb_nodes = $nbnodes \tQueueName = $queueName \tWalltime = $walltime\n",
          "Mode = $mode Command = $cmd_executor\n",
          "Job_properties : $jobSQLproperties\n";
}

# Validate the requested node count and weight against the cluster; dies
# with an "[ERROR]" message when the request cannot be satisfied.
#
# The symbolic node counts "all" and "max" are first replaced by the number
# of nodes that have room for a job of weight $weight and match the
# requested properties: "all" counts Alive nodes only, "max" also counts
# Absent and Suspected nodes.
sub check_params(){
    if (($nbnodes eq "all") || ($nbnodes eq "max")){
        my $criteria = $nbnodes;
        my $properties = iolib::form_job_properties($base, $jobSQLproperties);
        if ($properties ne ""){
            $properties =~ s/\\//g ;
            $properties = "AND ($properties)";
        }
        my $whereState = "n.state = \"Alive\"";
        if ($criteria eq "max"){
            $whereState = "($whereState OR n.state = \"Absent\" OR n.state = \"Suspected\")";
        }
        # $weight comes from the command line or the script: quote it
        # rather than splicing it raw into the SQL.
        my $minFreeWeight = $base->quote($weight);
        # Count with COUNT(*): DBI's do() is not meant for SELECT and
        # returns "0E0" for zero rows and undef on error.
        ($nbnodes) = $base->selectrow_array("SELECT COUNT(*) FROM nodeProperties p, nodes n
                              WHERE n.hostname = p.hostname
                                AND (n.maxWeight - n.weight) >= $minFreeWeight
                                AND $whereState
                                   $properties");
        defined($nbnodes)
            or die("[ERROR] unable to count the <$criteria> nodes\n");
        print(" <$criteria> = $nbnodes\n");
    }

    # The node count must fit in the cluster.
    my @listNodes = iolib::list_nodes($base);
    my $maxMaxNbNodes = scalar(@listNodes);
    if($nbnodes > $maxMaxNbNodes){
        die("[ERROR] $nbnodes is too big. the maximum of nodes is $maxMaxNbNodes\n");
    }
    if($nbnodes <= 0){
        die("[ERROR] $nbnodes : not enough nodes\n");
    }

    # The weight must fit on the heaviest node.
    my $maxmaxweight = iolib::get_maxweight_node($base);
    if ($weight > $maxmaxweight){
       die("[ERROR] MAX weight = $maxmaxweight, ... $weight is too heavy\n");
    }
    if($weight < 1){
       die("[ERROR] weight $weight must be heavier than 0\n");
    }
}

# Print the command-line help on standard output. The option letters shown
# here must match the GetOptions specification of the main program
# (-V for --verbose; --checkpoint takes an integer value).
sub usage() {
    print <<EOS;
Usage: oarsub [OPTIONS] [-I|<SCRIPT>]
Submit a job to the OAR batch scheduler
Options are:
 -I, --interactive             request an interactive job
 -l, --resource=<LIST>         set the resource list
 -q, --queue=<QUEUE>           set the queue to submit the job to
 -p, --property=<LIST>         set the property list
 -r, --reservation=<DATE>      request a reservation
 -c, --connect=<RESERVATION>   connect a reservation
 -s, --stagein=<DIR|TGZ>       set the stagein directory or archive
 -m, --stagein-md5sum=<MD5SUM> set the stagein file md5sum
 -V, --verbose                 increase verbosity
 -k, --checkpoint=<INT>        enable the job to be checkpointed
 -h, --help                    print this help message
EOS
}

#
# Main
#

# Settings from oar.conf: OAR server endpoint, stagein storage directory,
# and the host used to reach deploy-queue jobs (the OAR server by default).
init_conf("oar.conf");
$remote_host = get_conf("SERVER_HOSTNAME");
$remote_port = get_conf("SERVER_PORT");
$stageindir  = get_conf("STAGEIN_DIR");

$deploy_hostname = get_conf("DEPLOY_HOSTNAME");
$deploy_hostname = $remote_host unless (defined($deploy_hostname));

# OARDIR is where the OAR helper programs (oarexecuser.sh, bipbip) live.
defined($ENV{OARDIR}) or die("OARDIR env variable must be defined\n");
my $binpath = $ENV{OARDIR}."/";

# A configured default node weight replaces the built-in default.
$weight = get_conf("NODE_DEFAULT_WEIGHT") if (is_conf("NODE_DEFAULT_WEIGHT"));

# Command line. gnu_getopt turns on bundling, under which single-letter
# options are case-sensitive, so both "-v" and "-V" are declared for
# --verbose.
Getopt::Long::Configure ("gnu_getopt");

my $options_ok = GetOptions ("resource|l=s" => \@resource,
                             "queue|q=s"   => \$queueName,
                             "interactive|I"  => \$interactive,
                             "verbose|v|V" => \$verbose,
                             "property|p=s" => \$jobSQLproperties,
                             "reservation|r=s" => \$reservation,
                             "connect|c=i" => \$connectResa,
                             "stagein|s=s" => \$stagein,
                             "stagein-md5sum|m=s" => \$md5sum,
                             "checkpoint|k=i" => \$checkpoint,
                             "help|h" => \$sos
                            );

# GetOptions has already reported the offending option; do not go on and
# submit a job built from a partially parsed command line.
if (!$options_ok) {
    usage();
    exit(1);
}

if (defined $sos){
    usage();
    exit(0);
}

# A script is required unless the job is interactive or a reservation.
if ((@ARGV != 1) && ($interactive == 0) && ($reservation == 0)){
    usage();
    exit(1);
}

# stagein machinery
# A stagein directory is first archived into a temporary .tgz. The archive
# is identified by its md5sum and copied to STAGEIN_DIR only when
# iolib::get_stagein_id() finds no stagein with that md5sum. The
# lookup/upload runs between iolib::get_lock() and iolib::release_lock() on
# the md5sum (third argument 3600, presumably a timeout in seconds).
# External commands get the user-supplied paths as separate arguments, so
# no shell ever parses them.
if (defined $stagein) {
    print "Setting up stagein...\n";
    if (-d $stagein) {
        print "Archiving the content of the directory \"$stagein\" for the job stagein...\n";
        my (undef, $filename) = tempfile(SUFFIX => ".oar-stagein.tgz", OPEN => 0);
        system("tar", "cfz", $filename, $stagein)
            and die "Failed to archive the directory: $?\n";
        print "Stagein archive = $filename\n";
        print "(You may save this file if you plan to submit other jobs later with the same stagein)\n";
        $stagein = $filename;
        $md5sum = undef;
    }
    ( -r $stagein ) or die "Stagein file not found: $stagein\n";
    unless (defined $md5sum) {
        print "Computing stagein md5sum...\n";
        open(my $md5_fh, '-|', 'md5sum', $stagein)
            or die "Failed to run md5sum: $!\n";
        my $md5_line = <$md5_fh>;
        close($md5_fh);
        $md5_line = "" unless (defined($md5_line));
        # GNU md5sum starts the line with a backslash when it has to escape
        # the file name; drop it so that only the digest is kept.
        $md5_line =~ s/^\\//;
        ($md5sum) = split(" ", $md5_line);
        print "md5sum = $md5sum\n";
        print "(You may use the -m option with this md5sum for other job submitions with the same stagein)\n";
    }
    # The md5sum (possibly given with -m) becomes a lock name and a file name
    # under STAGEIN_DIR: accept only a hexadecimal MD5 digest.
    (defined($md5sum) && ($md5sum =~ /^[0-9a-fA-F]{32}$/))
        or die "Invalid stagein md5sum: $md5sum\n";
    $base = iolib::connect();
    iolib::get_lock($base,$md5sum,3600) or die "Failed to lock stagein\n";
    $idFile = iolib::get_stagein_id($base,$md5sum);
    if (defined $idFile) {
        print "This stagein is already stored on the server.\n";
    } else {
        my $location = "$stageindir/$md5sum";
        my $method = "FILE";
        my $compression = "tar.gz";
        print "Uploading stagein...\n";
        system("scp", $stagein, $location) and die "Stagein upload failed\n";
        my @stats = stat $stagein;
        # $stats[7] is the archive size in bytes.
        $idFile = iolib::set_stagein($base,$md5sum,$location,$method,$compression,$stats[7]);
        defined $idFile or die "Failed to setup stagein\n";
    }
    iolib::release_lock($base,$md5sum) or die "Failed to unlock stagein\n";
    iolib::disconnect($base);
    print "Stagein completed.\n";
}

$base = iolib::connect();

# Connect to a reservation
# With -c <job id> and -I, open an interactive session through ssh on the
# first host of a Running, Scheduled reservation owned by the caller (or
# by oar). Deploy-queue jobs are reached through DEPLOY_HOSTNAME instead.
if (defined($connectResa)){
    my $lusr= getpwuid($ENV{SUDO_UID});
    my $job = iolib::get_job($base, $connectResa);
    if ((($lusr eq $job->{'user'}) or ($lusr eq "oar")) && ($job->{'reservation'} eq "Scheduled") && ($interactive == 1) && ($job->{'state'} eq "Running")) {
        my @hosts = iolib::get_job_host_distinct($base,$connectResa);
        # All database lookups are done: do not keep the connection open for
        # the whole interactive session.
        iolib::disconnect($base);
        my $hostToConnectViaSSH = $hosts[0];
        #deploy part
        if ($job->{'queueName'} eq "deploy"){
            $hostToConnectViaSSH = $deploy_hostname;
        }
        my $OAR_NB_NODES = scalar(@hosts);
        # The login shell is element 8 of getpwnam's list; pop() is not
        # reliable because some systems append an $expire field.
        my $shell = (getpwnam($lusr))[8];
        # NOTE(review): values interpolated below (e.g. the launching
        # directory) are not shell-quoted for the remote command line.
        my $cmd = "ssh -t $hostToConnectViaSSH \"sh -c \\\"TTY=\\\\\\\$(tty) && test -e \\\\\\\$TTY && sudo chown $lusr:oar \\\\\\\$TTY && sudo chmod 660 \\\\\\\$TTY \\\" && sudo su - $lusr -c \\\"$binpath/oarexecuser.sh /tmp/OAR_$connectResa $OAR_NB_NODES $connectResa $lusr $shell $job->{launchingDirectory} I\\\"\"";
        #essential : you become oar instead of the user
        #UID=EUID
        $< = $>;
        print("Connect to OAR reservation $connectResa via the node $hostToConnectViaSSH\n");
        system($cmd);
        print("Disconnect from OAR reservation $connectResa\n");
    }else{
        iolib::disconnect($base);
        print("/!\\ You cannot connect to the job $connectResa (maybe you are not the right user OR the job $connectResa is not a reservation OR this job is not Running OR it is not an Interactive connection)\n");
        exit(1);
    }
    exit(0);
}
# End connection to a reservation

# Resources requested on the command line: -l may be repeated, and each
# value is a comma-separated list of name=value pairs (nodes, walltime,
# weight). "if (@resource)" replaces defined(@resource), which is a fatal
# error since Perl 5.22; an array is true in boolean context when non-empty.
if (@resource) {
    foreach my $i (@resource){
        push(@resource_list, split(',',$i));
    }

    foreach my $index (0 .. $#resource_list) {
        ($res_type,$res_value) = split('=', $resource_list[$index]);
        if ($res_type eq 'nodes') {
            $nbnodes = $res_value;
        }elsif ($res_type eq 'walltime') {
            $walltime = $res_value;
            # A walltime without a seconds field gets ":00" appended.
            ($w_h,$w_mn,$w_sec) = split(':',$walltime);
            if (not defined($w_sec)) {
                $walltime = $walltime.":00";
            }
        }elsif ($res_type eq 'weight'){
            $weight = $res_value;
        }else{
            print("/!\\I don t recognize the resource $res_type\n");
        }
    }
}

# Register the job. A batch (PASSIVE) job takes its command from the script
# argument and its #OAR directives; an INTERACTIVE job has no command and
# always needs the local callback server, as does a reservation.
if ($interactive == 0) {
    $exec = $ARGV[0];
    scan_script($exec) if ($exec ne "");

    # WARNING: we are not the real user, we are oar user!!!
    # so $exec properties are not correct
    if (($exec !~ m/^\/.+$/m) && (-e "$exec")) {
        $exec = $ENV{PWD}."/".$exec ;
    }

    $cmd_executor = "Qsub\n";
    $mode = "PASSIVE";
    print_asked_resources() if (defined($verbose));
    check_params();

    if ($reservation ne "0"){
        init_TCP_server();
        print("Host:Port = $host:$serverport \n ");
    }
} else {
    $cmd_executor = "Qsub -I\n";
    init_TCP_server();
    print("Host:Port = $host:$serverport \n ");
    $mode = "INTERACTIVE";
    print_asked_resources() if (defined($verbose));
    check_params();
}

# Both modes register the job with the same call; "NULL" means no stagein.
my $stageinRef = defined($idFile) ? $idFile : "NULL";
$idJob = iolib::add_micheline_job($base, $mode, $nbnodes, $weight, $exec,
                                  "$host:$serverport", $walltime, $queueName,
                                  $jobSQLproperties, $reservation, $stageinRef,
                                  $checkpoint);
print("IdJob = $idJob \n");

iolib::disconnect($base);

# A negative id signals that the job could not be registered.
if ($idJob < 0){
    print("Error in oarsub, verify your syntax or call administrator\n");
    exit(2);
}

#Signal Almighty
# Send $cmd_executor to the OAR server ("Almighty"). For a reservation or an
# interactive job, then wait for the runner to call back on the local
# server socket ($server) and act on its one-line answer.

my $socket = IO::Socket::INET->new(PeerAddr => $remote_host,
                                   PeerPort => $remote_port,
                                   Proto    => "tcp",
                                   Type     => SOCK_STREAM
                                  );

if (not $socket) {
    # The job is already registered: delete it (without notifying the
    # unreachable server) and report the failure through the exit status.
    qdel(1);
    print("Couldn't connect executor $remote_host:$remote_port so I kill this job. Is OAR started?\n");
    exit(1);
}

print($socket $cmd_executor);
my $answer;

if ($reservation ne "0"){
    #Reservation mode
    print("Reservation mode : waiting validation\n");
    my $client = $server->accept();
    $answer = <$client>;
    # Compare the runner's message without its line terminator, if any.
    chomp($answer) if (defined($answer));
    if (defined($verbose)) {
        print("Runner says $answer\n");
    }
    if ($answer eq "GOOD RESERVATION"){
        print("Reservation valid --> OK\n");
    }else{
        print("Reservation not valid --> KO\n");
    }
}elsif ($interactive==1) {
    #Interactive mode
    print("Interactive mode : waiting \n");
    my $client = $server->accept();
    $answer = <$client>;
    chomp($answer) if (defined($answer));
    if (defined($verbose)) {
        print("Runner says $answer\n");
    }
    if ($answer eq "GOOD JOB"){
        # bipbip runs in a child process; the parent waits for it and exits
        # with the child's exit status.
        my $pid = fork;
        if (!defined($pid)) {
            # qdel(0) deletes the job, notifies Almighty and exits(1).
            print("Cannot fork to start the interactive job: $!\n");
            qdel(0);
            exit(1);
        }
        if ($pid == 0){
            #CHILD
            #Pid process registered by bipbip.pl
            $ENV{PATH}="/bin:/usr/bin:/usr/local/bin";
            $ENV{IFS}="";
            $ENV{ENV}="";
            $ENV{USER}="oar";
            $ENV{USERNAME}="oar";
            $ENV{LOGNAME}="oar";
            # List form: no shell involved. If exec fails, the child must not
            # fall through into the parent's code below.
            exec("$binpath/bipbip", $idJob)
                or do {
                    print("Cannot execute $binpath/bipbip: $!\n");
                    exit(1);
                };
        }
        wait;
        my $exit_value = $? >> 8;
        exit($exit_value);
    }else{
        print("Sorry but the system returned : $answer\n");
    }
}

exit 0;
