TNO Intern

Commit 189708e6 authored by Arjo Segers's avatar Arjo Segers
Browse files

Convert selected orbit files based on selection query. Support combination of...

Convert selected orbit files based on selection query. Support combination of multiple inquiry files.
parent ef8e5762
Loading
Loading
Loading
Loading
+162 −129
Original line number Diff line number Diff line
#
# Changes
# 
# 2022-09, Arjo Segers, Met-Norway
# 2022-09, Arjo Segers
#   Updated documentation of special conversions.
#
# 2022-09, Arjo Segers, Met-Norway
# 2022-09, Arjo Segers
#   Download files during conversion, eventually remove them when done.
# 
# 2022-09, Arjo Segers, Met-Norway
# 2022-09, Arjo Segers
#   Added flags to disable compression and packing of variables on output, 
#   by default both are enabled.
#
# 2023-08, Arjo Segers
#   Convert selected orbit files based on selection query.
#   Support combination of multiple inquiry files.
#

########################################################################
###
@@ -677,7 +681,7 @@ class CSO_S5p_File( cso_file.CSO_File ) :
        import numpy
        import xarray
        import pandas
        #import scipy.interpolate
        import scipy.interpolate
        
        # info ..
        logging.info( indent+'add pixels to satellite file ...' )
@@ -1410,8 +1414,10 @@ class CSO_S5p_File( cso_file.CSO_File ) :
                    
                    # define interpolator with index ax,
                    # apply with half-level indices:
                    hp = numpy.array( scipy.interpolate.interp1d( numpy.arange(sfile.nlayer), pmid, 
                                          fill_value='extrapolate')( numpy.arange(sfile.nlayer+1)-0.5 ),
                    hp = numpy.array( 
                                scipy.interpolate.interp1d(
                                        numpy.arange(sfile.nlayer), pmid, fill_value='extrapolate'
                                    )( numpy.arange(sfile.nlayer+1)-0.5 ),
                            dtype='f4' )

                    # create variable:
@@ -1963,10 +1969,26 @@ class CSO_S5p_Convert( utopya.UtopyaRc ) :
        tfmt = '%Y-%m-%d %H:%M'
        logging.info( indent+'timerange: [%s,%s]' % (t1.strftime(tfmt),t2.strftime(tfmt)) )

        # inquire table:
        filename__template = self.GetSetting( 'inquire.file' )
        # inquire tables:
        filename__templates = self.GetSetting( 'inquire.file' ).split(';')
        # time stamp in file?
        filedate = self.GetSetting( 'inquire.filedate', default='' )
        filedates = self.GetSetting( 'inquire.filedate', default='' )
        if len(filedates) == 0 :
            filedates = ['']*len(filename__templates)
        elif len(filedates) != len(filename__templates) :
            logging.error( 'number of filedates should match with number of listing file names:' )
            logging.error( '  inquire.file      : %s' % self.GetSetting( 'inquire.file' ) )
            logging.error( '  inquire.filedate  : %s' % self.GetSetting( 'inquire.filedate' ) )
            raise Exception
        #endif
        # init storage:
        dfs = []
        # loop:
        for ifile in range(len(filename__templates)) :
            # current:
            filename__template = filename__templates[ifile].strip()
            filedate = filedates[ifile].strip()
            # time to use for templates:
            if len(filedate) > 0 :
                t0 = datetime.datetime.strptime( filedate, '%Y-%m-%d' )
            else :
@@ -1982,19 +2004,38 @@ class CSO_S5p_Convert( utopya.UtopyaRc ) :
            # info ..
            logging.info( indent+'read inquire table: %s' % filename )
            # read:
        df = pandas.read_csv( filename, sep=';', skip_blank_lines=True,
            dfs.append( pandas.read_csv( filename, sep=';', skip_blank_lines=True,
                                           parse_dates=['start_time','end_time'],
                                       dtype='str' )
                                           dtype='str' ) )
        #endfor # filenames
        # combine:
        df = pandas.concat( dfs )
        # sort by filename:
        df = df.sort_values( 'filename' )

        # info ...
        logging.info( indent+'number of files : %i' % len(df) )
        
        # selected proecessings:
        processings = self.GetSetting( 'processings' ).split()
        # selected processor versions:
        processor_versions = self.GetSetting( 'processor_versions' ).split()
        ## selected proecessings:
        #processings = self.GetSetting( 'processings' ).split()
        ## selected processor versions:
        #processor_versions = self.GetSetting( 'processor_versions' ).split()
        
        # list of ';' seperated selection expression:
        #     (%{processor_version} == '020400') & (%{processing} == 'RPRO') ; ...
        line = self.GetSetting( 'selection' )
        # replace templates:
        #     (rec['processor_version'] == '020400') & (rec['processing'] == 'RPRO') ; ...
        for key in df.keys() :
            line = line.replace( "%{"+key+"}", "rec['"+key+"']" )
        #endfor
        # split:
        selections = line.split(';')
        # info ..
        logging.info( 'selection criteria (first with matching orbit is used):' )
        for selection in selections :
            logging.info( '  %s' % selection.strip() )
        #endif

        # skip some?
        blacklist = self.GetSetting( 'blacklist', default='' ).split()
@@ -2021,77 +2062,84 @@ class CSO_S5p_Convert( utopya.UtopyaRc ) :
        # compression level:
        complevel = self.GetSetting( 'output.complevel', 'int' )

        ## max layers:
        #nlayer = self.GetSetting( 'leip.prod.xomi.max_nlayer_trop', 'int' )
        # select records with start time inside time range:
        xdf = df[ (df['start_time'] >= t1) & (df['start_time'] <= t2) ]
        # info ..
        logging.info( indent+'found %i orbits with overlap of time range ..' % len(xdf) )
        
        # add column, will be set to True if the record is used:
        df['used'] = False
        # oribit labels:
        orbits = xdf['orbit'].unique()
            
        # info ..
        logging.info( indent+'loop over files ...' )
        # loop over records:
        for indx in df.index :
            # testing ...
            #print( indx, rec )
            # record:
            rec = df.loc[indx]

            # skip if already used ..
            if rec['used'] : continue
        logging.info( indent+'loop over orbits overlapping with time interval ...' )
        # loop over orbits:
        for orbit in orbits :
            # info ...
            logging.info( indent+'  orbit "%s" ...' % orbit )

            # search for other records for same orbit:
            dfx = df[ (df['orbit'            ] == rec['orbit'            ]) & \
                      (df['processing'       ] == rec['processing'       ]) & \
                      (df['processor_version'] == rec['processor_version'])   ]
            
            # info ...
            logging.info( indent+'  input file(s):' )
            for filename in dfx['filename'] :
                logging.info( indent+'    %s ...' % os.path.basename( filename ) )
            odf = xdf[ xdf['orbit'] == orbit ]
            
            # storage for status label: "selected", "blacklisted", ...
            filestatus = {}
            # no match yet ..
            seleted = []
            # loop over selection criteria,
            # this should give either none or a single file:
            for selection in selections :
                # make empty again:
                selected = []
                # loop over records:
                for indx,rec in odf.iterrows() :
                    # skip?
                    if os.path.basename(rec['filename']) in blacklist : 
                        filestatus[rec['filename']] = 'blacklisted'
                        continue
                    #endif
            
            # one of the files explicitly mentioned as corrupted?
            # then skip entire orbit ..
            skip_record = False
            for filename in dfx['filename'] :
                if os.path.basename(filename) in blacklist :
                    # info ..
                    logging.info( indent+'    input file in blacklist, skip ...' )
                    # skip:
                    skip_record = True
                    # leave loop over file
                    break
                    # evaluate expression including 'rec[key]' values:
                    if eval( selection ) :
                        selected.append( rec['filename'] )
                        filestatus[rec['filename']] = 'selected'
                    #endif
                #endfor # records
                # exactly one? then leave:
                if len(selected) == 1 :
                    break
                elif len(selected) > 1 :
                    logging.error( 'found more than one orbit file matching selection: %s' % selection )
                    for fname in selected :
                        logging.error( '  %s' % fname )
                    #endfor
            # skip?
            if skip_record : continue
                    raise Exception
                #endif  # number found
            #endfor # selection criteria

            # filter ..
            if rec['processing'] not in processings :
                # info ..
                logging.info( indent+'    skip processing "%s" ..' % rec['processing'] )
                # skip:
                continue
            #endif
            # filter ...
            if len(processor_versions) > 0 :
                if rec['processor_version'] not in processor_versions :
                    # info ..
                    logging.info( indent+'    skip processor_version "%s" ..' % rec['processor_version'] )
                    # skip:
            # info ...
            logging.info( indent+'    available file(s):' )
            # loop:
            for fname in odf['filename'] :
                line = fname
                if fname in filestatus.keys() : line = line+' ['+filestatus[fname]+']'
                logging.info( indent+'      '+line )
            #endfor
            
            # no match?
            if len(selected) == 0 :
                # info ...
                logging.warning( '    no match with any selection criterium; next orbit ...' )
                ## degug:
                #for selection in selections :
                #    logging.warning( '    %s' % selection.strip() )
                #logging.warning( '  record:' )
                #for key in rec.keys() :
                #    logging.warning( '    "%s" : "%s"' % (key,rec[key]) )
                # next orbit:
                continue
            #endif
            #endif # filter on processor versions?
                    
            # start time of orbit:
            t0 = rec['start_time']
            
            # filter ...
            if (t0 < t1) or (t0 > t2) :
                logging.info( indent+'    outside timerange, skip ..' )
                continue
            #endif
            
            # target file:
            output_filename = t0.strftime( output_filename__template )
            for key in rec.keys() :
@@ -2177,16 +2225,9 @@ class CSO_S5p_Convert( utopya.UtopyaRc ) :
            # create?
            if create :
            
                # description of input files, used as informative attribute:
                input_filename_descr = ' '.join(dfx['filename'])
                
                # keep list of downloaded files:
                downloads = []
                
                # download input files if necessary;
                # loop over input files:
                for filename in dfx['filename'] :
            
                # input dir:
                input_dir = t0.strftime( input_dir__template )
                # replace templates:
@@ -2196,7 +2237,7 @@ class CSO_S5p_Convert( utopya.UtopyaRc ) :
                    #endif
                #endfor
                # full path:
                    input_file = os.path.join( input_dir, filename )
                input_file = os.path.join( input_dir, rec['filename'] )

                # info ..
                logging.info( '    input file: %s' % input_file )
@@ -2210,14 +2251,6 @@ class CSO_S5p_Convert( utopya.UtopyaRc ) :
                    downloads.append( input_file )
                #endif
                    
                #endfor # files with orbit slabs
                
                # NOT YET ...
                if len(dfx) > 1 :
                    logging.error( 'TO BE DONE: could not combine multiple input files yet ..' )
                    raise Exception
                #endif
                
                # download might have failed ..
                if not os.path.isfile(input_file) :
                    # info ..
@@ -2277,12 +2310,12 @@ class CSO_S5p_Convert( utopya.UtopyaRc ) :
                    # add:
                    csf.AddSelection( sfile, selected, self.rcf, self.rcbase, indent=indent+'    ' )
                    # update history:
                    history.append( 'added %i pixels from %s' % (nselected,input_filename_descr) )
                    history.append( 'added %i pixels from %s' % (nselected,os.path.basename(input_file)) )
                    # update attributes:
                    for key in ['orbit','processing','processor_version'] :
                    for key in ['orbit','processing','processor_version','collection'] :
                        attrs[key] = rec[key]
                    #endfor
                    attrs['orbit_file'] = input_filename_descr
                    attrs['orbit_file'] = input_file
                    # write:
                    csf.Write( filename=output_filename, attrs=attrs, history=history, 
                                 packed=packed, complevel=complevel )