TNO Intern

Commit 565137a6 authored by Arjo Segers's avatar Arjo Segers
Browse files

Select input file with latest processor version or production time if multiple...

Select input file with latest processor version or production time if multiple are selected. Convert multiple files if orbit is stored in patches. Check on duplicate output files.
parent 97781891
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -308,6 +308,8 @@ cso.tutorial.inquire-table-dataspace.url : https://stac.data
! collection name:
cso.tutorial.inquire-table-dataspace.collections             :  sentinel-5p-l2-no2-rpro \
                                                                sentinel-5p-l2-no2-offl
!!~ near-real time processing:
!cso.tutorial.inquire-table-dataspace.collections             :  sentinel-5p-l2-no2-nrti

! target area;
!!~ empty for no limitation:
@@ -375,6 +377,8 @@ my.tutorial.selection : C03
!
cso.tutorial.convert.selection                     :  (%{collection} == '03') and (%{processing} == 'RPRO') ; \
                                                      (%{collection} == '03') and (%{processing} == 'OFFL')
!!~ for near-real-time processing:
!cso.tutorial.convert.selection                     :  %{collection} == '03'

! input directory;
! files are searched here or downloaded to if not present yet;
@@ -425,7 +429,8 @@ cso.tutorial.convert.filter.quality.units : 1
! - times are taken from mid of selection, rounded to hours
! - use '%{processing}' for the processing name
! - use '%{orbit}' for orbit number
cso.tutorial.convert.output.filename            :  ${my.work}/CSO-data/${my.region}/S5p/NO2/${my.tutorial.selection}/%Y/%m/S5p_%{processing}_NO2_%{orbit}.nc
! - include start time to have different files if orbit is stored in patches
cso.tutorial.convert.output.filename            :  ${my.work}/CSO-data/${my.region}/S5p/NO2/${my.tutorial.selection}/%Y/%m/S5p_%{processing}_NO2_%{orbit}_%Y%m%dT%H%M%S.nc

! pack variables on output:
cso.tutorial.convert.output.packed              :  True
+90 −29
Original line number Diff line number Diff line
@@ -29,6 +29,8 @@
#
# 2026-04, Arjo Segers
#   Initialize columns of listing file.
#   Added production time of data files to listings.
#   Select file with latest production time if multiple are found.
#

########################################################################
@@ -185,7 +187,7 @@ class CSO_ColHubMirror_Inquire(utopya.UtopyaRc):

            # initialize for (re)creation,
            # define extra columns to ensure correct content even for empty files:
            listing = cso_file.CSO_Listing( columns=["orbit","processing","collection","processor_version","href"] )
            listing = cso_file.CSO_Listing( columns=["orbit","processing","collection","processor_version","production_time","href"] )

            # archive directories
            archive_dirs = self.GetSetting("dir").split()
@@ -242,7 +244,7 @@ class CSO_ColHubMirror_Inquire(utopya.UtopyaRc):
                                        orbit,
                                        collection,
                                        processor_version,
                                        prod_time,
                                        production_time,
                                    ) = rest.split("_")
                                except:
                                    logging.error(
@@ -267,6 +269,7 @@ class CSO_ColHubMirror_Inquire(utopya.UtopyaRc):
                                data["processing"] = processing
                                data["collection"] = collection
                                data["processor_version"] = processor_version
                                data["production_time"] = production_time
                                data["href"] = os.path.join(root, fname)
                                # update record:
                                listing.UpdateRecord(fname, data, indent=f"{indent}    ")
@@ -413,8 +416,10 @@ class CSO_ColHubMirror_Missing(utopya.UtopyaRc):
            # info ..
            logging.info(f"{indent}create %s ..." % lst_file)

            # initialize for (re)creation:
            listing = cso_file.CSO_Listing()
            # initialize for (re)creation,
            # define extra columns to ensure correct content even for empty files:
            listing = cso_file.CSO_Listing( columns=["orbit","processing","collection","processor_version","production_time","href"] )


            # table with all available files:
            listfile_all = self.GetSetting("all.file")
@@ -460,6 +465,8 @@ class CSO_ColHubMirror_Missing(utopya.UtopyaRc):
            logging.info(f"{indent}loop over available orbits ...")
            # loop:
            for orbit in orbits:
                ## testing ...
                #if orbit != '40182' : continue
                # info ..
                logging.info(f"{indent}  orbit '{orbit}' ...")

@@ -469,11 +476,66 @@ class CSO_ColHubMirror_Missing(utopya.UtopyaRc):
                # nothing selected?
                if len(xlst) == 0:
                    continue
                #
                elif len(xlst) == 1:
                    # single record:
                    rec = xlst.GetRecord(irec=0)
                #
                else:
                    # multiple records ...
                    # init name of selected file:
                    selected_filename = None
                    #
                    # 1st adhoc fix ...
                    # check if these are the same data, but the file was recreated later (and the older one was not removed ...)
                    #    S5P_RPRO_L2__CO_____20250715T142651_20250715T161018_40182_03_020800_20251103T112819.nc
                    #    S5P_RPRO_L2__CO_____20250715T142651_20250715T161018_40182_03_020800_20251030T133312.nc
                    # init basenames:
                    basenames = []
                    # loop over filenames:
                    for filename in xlst.df.index:
                        # basename without production time:
                        basename = filename.rsplit(sep="_",maxsplit=1)[0]
                        # store:
                        if basename not in basenames:
                            basenames.append( basename )
                        #endif
                        # first?
                        if selected_filename is None:
                            # store first values:
                            selected_filename = filename
                        else:
                            # newer?
                            if filename > selected_filename:
                                selected_filename = filename
                            #endif
                        #endif
                    #endfor # filenames
                    # should be the same ..
                    if len(basenames) == 1:
                        # info ...
                        logging.info(f"{indent}    selected single file based on latest production time:")
                        logging.info(f"{indent}      {selected_filename}")
                    else:
                        # info ...
                        logging.warning(f"{indent}    checked on production time, but found multiple base names:")
                        for basename in basenames:
                            logging.warning(f"{indent}      {basename}")
                        #endfor
                        # reset selected name, check below:
                        selected_filename = None
                    #endif
                    #
                    # none of the ad hoc fixes led to the selection of a single file ...
                    if selected_filename is None:
                        logging.error(f"multiple records selected, something wrong with files or selection?")
                        raise Exception
                    #endif
                    #
                    # extract record:
                    rec = xlst.GetRecord( name=selected_filename )
                # endif

                # loop over selected records:
                for irec in range(len(xlst)):
                    # current:
                    rec = xlst.GetRecord(irec)
                ## info ..
                # logging.info( f"{indent}      {fname}" )
                # set flag:
@@ -497,7 +559,6 @@ class CSO_ColHubMirror_Missing(utopya.UtopyaRc):
                    # add record to list:
                    listing.UpdateRecord(rec["filename"], rec)
                # endif
                # endfor

                ## TESTING ...
                # if len(listing) > 0 :
+17 −4
Original line number Diff line number Diff line
@@ -55,6 +55,8 @@
#   Fixed definition of bounding box for global selection.
#   Added increasing delays to avoid rate limit errors from STAC catalogue inquiry.
#   Check on different paths to 'href' attribute in stac inquiry.
#   Trap files that could not be downloaded.
#   Added production time of data files to listings.
#


@@ -378,7 +380,7 @@ class CSO_DataSpace_Inquire(utopya.UtopyaRc):
                            platform_name, processing, rest = product_id.split("_", 2)
                            product_type = rest[0:10]
                            parts = rest[11:].split("_")
                            start_time, end_time, orbit, collection, processor_version, prod_time = parts
                            start_time, end_time, orbit, collection, processor_version, production_time = parts

                            # convert:
                            tfmt = "%Y%m%dT%H%M%S"
@@ -427,6 +429,7 @@ class CSO_DataSpace_Inquire(utopya.UtopyaRc):
                                "processing": [processing],
                                "collection": [collection],
                                "processor_version": [processor_version],
                                "production_time": [production_time],
                                "filename": [filename],
                                "href": [href],
                            }
@@ -783,8 +786,12 @@ class CSO_DataSpace_Downloader(object):
        # check ...
        if npfile != 1:
            print(f"ERROR - found {npfile} files in S3 bucket for product: {product}")
            if npfile == 0:
                raise Exception(f"file not found in S3 bucket")
            else:
                raise Exception
            #endif
        #endif

        # loop over (single) files:
        for pfile in pfiles:
@@ -893,8 +900,14 @@ class CSO_DataSpace_Downloader(object):

                # unknown ...
                except Exception as err:
                    # check on known errors ..
                    if "file not found in S3 bucket" in str(err):
                        logging.warning(f"{indent}  WARNING - could not download: {href}")
                        break
                    else:
                        logging.error(f"{indent}unknown error:")
                        logging.error(f"{indent}  {str(err)}")
                    #endif

                # endtry

+21 −6
Original line number Diff line number Diff line
@@ -68,6 +68,9 @@
# 2026-04, Arjo Segers
#   Added optional "columns" argument to initialize empty listing object.
#
# 2026-05, Arjo Segers
#   Support `name` argument for GetRecord method.
#

########################################################################
###
@@ -1407,11 +1410,24 @@ class CSO_Listing(object):

    # *

    def GetRecord(self, irec):
    def GetRecord(self, irec=None, name=None):
        """
        Return record for corresponding 0-based row index.
        Return record for corresponding 0-based row index or index name.
        """
        
        # modules:
        import numpy
        
        # by name?
        if name is not None:
            indx, = numpy.where( self.df.index == name )
            if len(indx) == 0:
                logging.error(f"value `{name}` not found in index")
                raise Exception
            #endif
            irec = indx[0]
        #endif

        # extract:
        rec = self.df.iloc[irec].copy()
        # add index values:
@@ -1604,8 +1620,7 @@ class CSO_Listing(object):
                (%{processor_version} == '020400') & (%{processing} == 'RPRO') ; ...

          This is evaluated after the previously described selections.
          The result should be None or exactly one record.
          Eventually skip files in ``blacklists``.
          Eventually skip files in ``blacklist``.

        """

@@ -1650,7 +1665,7 @@ class CSO_Listing(object):
            # endif
            # select:
            df = df[df[key] == value]
        # endfor
        # endfor # key/value pairs

        # *

@@ -1734,7 +1749,7 @@ class CSO_Listing(object):
                df = df.loc[selected]
            # endif

        # endif
        # endif # expr defined

        # *

+301 −259

File changed.

Preview size limit exceeded, changes collapsed.