TNO Intern

Commit b41efefb authored by Arjo Segers's avatar Arjo Segers
Browse files

Support batch job for Sun Grid Engine.

parent c0c04d8f
Loading
Loading
Loading
Loading
+320 −17
Original line number Diff line number Diff line
@@ -117,6 +117,7 @@ The classes provided by this module have been derived with the following hierarchy
      * :py:class:`.UtopyaJobScriptBatchSlurm`
      * :py:class:`.UtopyaJobScriptBatchPBS`
      * :py:class:`.UtopyaJobScriptBatchLoadLeveler`
      * :py:class:`.UtopyaJobScriptBatchSGE`

Classes
=======
@@ -803,9 +804,9 @@ class UtopyaJobScriptBatch( UtopyaJobScript ) :
        
        For each option key, a flag/value pair needs to be defined;
        in the example above, for key 'jobname' the flag/value pair 
        'J %(env:name)' is defined.
        'n %(env:name)' is defined.
        Typically, the flag could consist of just a cryptic
        letter ('J') as required by the batch system,
        letter ('n') as required by the batch system,
        while the option key is longer and more descriptive.
        
        The first part of a flag/value is the flag, separated by whitespace
@@ -1199,9 +1200,9 @@ class UtopyaJobScriptBatchLSF( UtopyaJobScriptBatch ) :
        #    bsub < jbfile
        command = 'bsub'
        # input lines:
        f = open( jbfile, 'r' )
        with open( jbfile, 'r' ) as f :
            jblines = f.read()
        f.close()
        #endwith
        # info ...
        logging.info( _indent+'command: %s' % command )
        logging.info( _indent+'stdin  : %s' % jbfile )
@@ -1257,9 +1258,9 @@ class UtopyaJobScriptBatchLSF( UtopyaJobScriptBatch ) :
        # log file for submission:
        job_info = bname+'.info'
        # write to file:
        f = open( job_info, 'w' )
        with open( job_info, 'w' ) as f :
            f.writelines(infotext)
        f.close()
        #endwith

    #enddef Submit

@@ -1449,9 +1450,9 @@ class UtopyaJobScriptBatchPBS( UtopyaJobScriptBatch ) :
        # log file for submission:
        job_info = bname+'.info'
        # write to file:
        f = open( job_info, 'w' )
        with open( job_info, 'w' ) as f :
            f.writelines(infotext)
        f.close()
        #endwith

    #enddef Submit
    
@@ -1558,6 +1559,308 @@ class UtopyaJobScriptBatchPBS( UtopyaJobScriptBatch ) :
#endclass UtopyaJobScriptBatchPBS


# ======================================================================
# ===
# === jobs submitted to Sun Grid Engine
# ===
# ======================================================================


class UtopyaJobScriptBatchSGE( UtopyaJobScriptBatch ) :

    """
    Class to create job for submission to Sun Grid Engine (SGE) batch system.
    
    Example of job options::
    
      #$ -N myjob
      #$ -o myjob.out
      #$ -e myjob.err
    
    Example of usage::
        
        # init object:
        jb = utopya.UtopyaJobScriptBatchSGE()

        # obtain line with job options from rcfile:
        options = jb.GetOptionsRc( 'UtopyaJobScriptBatchSGE.rc', rcbase='appl', \\
                                env={'name':'myjob'} )
        
        # fill script lines:
        lines = []
        lines.append( '#! /usr/bin/env python\\n' )
        lines.append( '\\n' )
        lines.append( options )
        lines.append( '\\n' )
        lines.append( '# do something:\\n' )
        lines.append( 'print( "boe!" )\\n' )
        lines.append( '\\n' )
        
        # write:
        jb.Create( 'myjob.jb', lines )
        
        # submit:
        jb.Submit( 'myjob.jb' )
    
    See also man pages of batch job commands:
            
    * :manpage:`qsub`
    * :manpage:`qdel`
    * :manpage:`qstat`

    """
    
    def GetOptionsRc( self, rcfile, rcbase='', env=None ) :
    
        """
        Return str line (with newline characters) with job options based on rcfile settings.
        
        The rcfile settings should start with '[<rcbase>.]batch.sge',
        where the rcbase might be empty or equal to '*' for default settings.
        
        Arguments:
        
        * ``rcfile`` : name of the settings file, passed to the parent class;
        * ``rcbase`` : optional prefix of the settings keys;
        * ``env``    : optional dict passed to the parent class, see the
          ``env={'name':'myjob'}`` example in the class description;
          an empty dict is used if not specified.
        
        Example settings for rcbase 'appl'::
                    
            ! job format for this application:
            appl.batch.sge.format            :  sge_format
            ! which keywords:
            appl.batch.sge.options           :  name output error
            ! values:
            appl.batch.sge.option.name       :  N myjob
            appl.batch.sge.option.output     :  o %(name).out
            appl.batch.sge.option.error      :  e %(name).err
    
            ! Define format of batch options, e.g.:
            !   #$ -flag value
            sge_format.comment       :  #
            sge_format.prefix        :  $
            sge_format.arg           :  '-'
            sge_format.assign        :  ' '
            sge_format.template      :  %(key)
            sge_format.envtemplate   :  %(env:key)

        This will return the following job options as a str with newline characters::
    
            #$ -N myjob
            #$ -o myjob.out
            #$ -e myjob.err

        """
        
        # use a fresh dict per call rather than a default shared between calls:
        if env is None : env = {}
        
        # call parent, which does the actual work for the 'batch.sge' settings:
        lines = UtopyaJobScriptBatch.GetOptionsRc( self, rcfile, 'batch.sge', rcbase=rcbase, env=env )
        
        # ok
        return lines
        
    #enddef GetOptionsRc
    
    # *
    
    def Submit( self, jbfile, _indent='' ) :
    
        """
        Submit job file to the queue using the :manpage:`qsub` command.
        
        Information on job id and commands to follow and cancel the job
        are written to a file with the same name but extension '``.info``' .
        
        The job id assigned by the batch system is written
        to a file with extension '``.pid``'.
        
        Existing files with the same base name and extension
        '``.pid``', '``.out``', or '``.err``' are removed before submission.
        
        Arguments:
        
        * ``jbfile``  : name of the job file to be submitted;
        * ``_indent`` : str that is prepended to the info messages.
        
        An :py:class:`Exception` is raised if the command could not be started;
        if no job id could be extracted from the output of the command,
        the output is logged and the original error is raised again.
        """
        
        # modules:
        import os
        import subprocess

        # basename for scripts etc is name of job file minus extension:
        bname,ext = os.path.splitext(jbfile)
        
        # pid file to store job id:
        pid_file = bname+'.pid'
        # remove if already present, otherwise it will hold a previous id
        # if something goes wrong before the subprocess is started:
        if os.path.isfile( pid_file ) : os.remove( pid_file )
        
        # qsub seems to append new output/error to existing file;
        # remove files that are probably the previous output/error:
        for fname in [ bname+'.out', bname+'.err' ] :
            if os.path.isfile( fname ) : os.remove( fname )
        #endfor

        # setup command line, job file is passed as argument:
        #    qsub jbfile
        command = 'qsub'
        # info ...
        logging.info( _indent+'command    : %s' % command )
        logging.info( _indent+'jobs script: %s' % jbfile )

        # init submission info file:
        infotext = []
        infotext.append(  '\n' )

        # start command, trap errors (command does not exist etc);
        # do not trap keyboard interrupts and system exits:
        try:
            # obtain standard output and error later on by 'communicate' :
            p = subprocess.Popen( [command,jbfile], 
                                     stdout=subprocess.PIPE, stderr=subprocess.PIPE )
        except Exception as err :
            logging.error( err )
            logging.error( 'from command : %s' % command )
            raise Exception( 'could not start command "%s" : %s' % (command,err) )
        #endtry
        # wait for command to finish, obtain standard output and error in return:
        stdout,stderr = p.communicate()
        # convert to str if necessary; replace undecodable characters
        # rather than keeping bytes that could not be split into lines below:
        if isinstance( stdout, bytes ) : stdout = stdout.decode( 'utf-8', 'replace' )
        if isinstance( stderr, bytes ) : stderr = stderr.decode( 'utf-8', 'replace' )
        # add to help info message; every line needs its own newline
        # since the lines are written to file using 'writelines':
        for text in [ stdout, stderr ] :
            for line in text.splitlines() :
                infotext.append( line+'\n' )
            #endfor
        #endfor
        # standard output is:
        #   Your job <jobnr> ("<jobname>") has been submitted
        # extract job id from line:
        try :
            job_id = stdout.split()[2]
        except IndexError :
            logging.error( 'could not extract job id from stdout:' )
            logging.error( stdout )
            # the reason for the failure is probably in the error output:
            logging.error( 'stderr:' )
            logging.error( stderr )
            logging.error( 'return code : %s' % p.returncode )
            raise
        #endtry
        
        # write job id file:
        with open( pid_file, 'w' ) as f :
            f.write( '%s\n' % job_id )
        #endwith
    
        # help text:
        infotext.append( '\n' )
        infotext.append( 'To manage SGE jobs:\n' )
        infotext.append( '\n' )
        infotext.append( '  qsub %s              # submit to queue\n' % jbfile )
        infotext.append( '  qstat -j %s          # list this job\n' % job_id )
        infotext.append( '  qdel %s              # kill this job\n' % job_id )
        infotext.append( '\n' )

        # write to logging system:
        for line in infotext :
            logging.info( _indent+line.rstrip() )
        #endfor

        # log file for submission:
        job_info = bname+'.info'
        # write to file:
        with open( job_info, 'w' ) as f :
            f.writelines(infotext)
        #endwith

    #enddef Submit
    
#    # *
#    
#    def CheckStatus( self, pid_file, _indent='' ) :
#    
#        """
#        Read the job id from the '``.pid``' file created by
#        the :py:meth:`Submit` method, and check the current status.
#        Returns a str with one of the following values:
#        
#        * '``running``' if the job is still running;
#        * '``stopped``' if the job id is not present anymore,
#          which is interpreted as that the job was stopped.
#        """
#        
#        # modules:
#        import os
#        import subprocess
#        
#        # check ...
#        if not os.path.isfile( pid_file ) :
#            logging.error( 'process id file "%s" not found' % pid_file )
#            raise Exception
#        #endif
#        # read id:
#        with open( pid_file, 'r' ) as f :
#            job_id = f.read().strip()
#        #endwith
#        # info ..
#        logging.info( _indent+'check job id %s ...' % job_id )
#        
#        # command to check status:
#        command = [ 'qstat', job_id ]
#        # start command, trap errors (command does not exist etc):
#        try:
#            # handle standard input, output, and error later on by 'communicate' :
#            p = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE )
#        except :
#            logging.error( sys.exc_info()[1] )
#            logging.error( 'from command : %s' % command )
#            raise Exception
#        #endtry
#        # send job lines to standard input, obtain standard output and error in return:
#        stdout,stderr = p.communicate()
#        # convert to str in necessary:
#        try :
#            stdout = stdout.decode()
#            stderr = stderr.decode()
#        except :
#            pass
#        #endtry
#        # cleanup:
#        outlines = []
#        for line in stdout.strip().split('\n') :
#            if len(line) > 0 : outlines.append( line )
#        #endfor
#        errlines = []
#        for line in stderr.strip().split('\n') :
#            if len(line) > 0 : errlines.append( line )
#        #endfor
#
#        # info ...
#        logging.info( _indent+'  stdout:' )
#        if len(outlines) > 0 :
#            for line in outlines : logging.info( _indent+'    '+line )
#        #endif
#        logging.info( _indent+'  stderr:' )
#        if len(errlines) > 0 :
#            for line in errlines : logging.info( _indent+'    '+line )
#        #endif
#
#        # current status:
#        if len(errlines) > 0 :
#            pstat = errlines[0]
#            if 'Unknown Job Id' in pstat :
#                status = 'stopped'
#            else :
#                logging.error( 'unsupported error message' )
#                raise Exception
#            #endif
#        else :
#            # output for running jobs:
#            #   Job id            Name             User              Time Use S Queue
#            #   ----------------  ---------------- ----------------  -------- - -----
#            #   6671814.ccapar    flexvar.var4d.i  nl5               00:01:26 R ns
#            if len(outlines) == 3 :
#                pstat = outlines[2]
#                status = 'running'
#            else :
#                logging.error( 'unsupported process stdout' )
#                raise Exception
#            #endif
#        #endif
#        # info ...
#        logging.info( _indent+'status: %s ("%s")' % (status,pstat) )
#        
#        # ok
#        return status
#        
#    #enddef CheckStatus

#endclass UtopyaJobScriptBatchSGE
    
    
# ======================================================================
# ===
# === jobs submitted to Slurm
@@ -1735,9 +2038,9 @@ class UtopyaJobScriptBatchSlurm( UtopyaJobScriptBatch ) :
        # log file for submission:
        job_info = bname+'.info'
        # write to file:
        f = open( job_info, 'w' )
        with open( job_info, 'w' ) as f :
            f.writelines(infotext)
        f.close()
        #endwith

    #enddef Submit

@@ -1910,9 +2213,9 @@ class UtopyaJobScriptBatchLoadLeveler( UtopyaJobScriptBatch ) :
        # log file for submission:
        job_info = bname+'.info'
        # write to file:
        f = open( job_info, 'w' )
        with open( job_info, 'w' ) as f :
            f.writelines(infotext)
        f.close()
        #endwith

    #enddef Submit