diff --git a/config/tutorial/tutorial.rc b/config/tutorial/tutorial.rc index 90618234901bed49a8451b91a4fd1ce5d94fb894..b4e689d79e252b9d278dfcc73bb011ff89228c67 100644 --- a/config/tutorial/tutorial.rc +++ b/config/tutorial/tutorial.rc @@ -308,6 +308,8 @@ cso.tutorial.inquire-table-dataspace.url : https://stac.data ! collection name: cso.tutorial.inquire-table-dataspace.collections : sentinel-5p-l2-no2-rpro \ sentinel-5p-l2-no2-offl +!!~ near-real time processing: +!cso.tutorial.inquire-table-dataspace.collections : sentinel-5p-l2-no2-nrti ! target area; !!~ empty for no limitation: @@ -375,6 +377,8 @@ my.tutorial.selection : C03 ! cso.tutorial.convert.selection : (%{collection} == '03') and (%{processing} == 'RPRO') ; \ (%{collection} == '03') and (%{processing} == 'OFFL') +!!~ for near-real-time processing: +!cso.tutorial.convert.selection : %{collection} == '03' ! input directory; ! files are searched here or downloaded to if not present yet; @@ -425,7 +429,8 @@ cso.tutorial.convert.filter.quality.units : 1 ! - times are taken from mid of selection, rounded to hours ! - use '%{processing}' for the processing name ! - use '%{orbit}' for orbit number -cso.tutorial.convert.output.filename : ${my.work}/CSO-data/${my.region}/S5p/NO2/${my.tutorial.selection}/%Y/%m/S5p_%{processing}_NO2_%{orbit}.nc +! - include start time to have different files if orbit is stored in patches +cso.tutorial.convert.output.filename : ${my.work}/CSO-data/${my.region}/S5p/NO2/${my.tutorial.selection}/%Y/%m/S5p_%{processing}_NO2_%{orbit}_%Y%m%dT%H%M%S.nc ! pack variables on output: cso.tutorial.convert.output.packed : True diff --git a/src/cso/cso_colhub.py b/src/cso/cso_colhub.py index de30345a9b41c6f57c4d7aaaa9e300adac64a028..35c49843e20d1718d111c8f35e2ddf7542359f25 100644 --- a/src/cso/cso_colhub.py +++ b/src/cso/cso_colhub.py @@ -29,6 +29,8 @@ # # 2026-04, Arjo Segers # Initialize columns of listing file. +# Added production time of data files to listings. +# Select file with latest production time if multiple are found. # ######################################################################## @@ -185,7 +187,7 @@ class CSO_ColHubMirror_Inquire(utopya.UtopyaRc): # initiallize for (re)creation, # define extra columns to ensure correct content even for empty files: - listing = cso_file.CSO_Listing( columns=["orbit","processing","collection","processor_version","href"] ) + listing = cso_file.CSO_Listing( columns=["orbit","processing","collection","processor_version","production_time","href"] ) # archive directories archive_dirs = self.GetSetting("dir").split() @@ -242,7 +244,7 @@ class CSO_ColHubMirror_Inquire(utopya.UtopyaRc): orbit, collection, processor_version, - prod_time, + production_time, ) = rest.split("_") except: logging.error( @@ -267,6 +269,7 @@ class CSO_ColHubMirror_Inquire(utopya.UtopyaRc): data["processing"] = processing data["collection"] = collection data["processor_version"] = processor_version + data["production_time"] = production_time data["href"] = os.path.join(root, fname) # update record: listing.UpdateRecord(fname, data, indent=f"{indent} ") @@ -413,8 +416,10 @@ class CSO_ColHubMirror_Missing(utopya.UtopyaRc): # info .. logging.info(f"{indent}create %s ..." % lst_file) - # initiallize for (re)creation: - listing = cso_file.CSO_Listing() + # initiallize for (re)creation, + # define extra columns to ensure correct content even for empty files: + listing = cso_file.CSO_Listing( columns=["orbit","processing","collection","processor_version","production_time","href"] ) + # table with all available files: listfile_all = self.GetSetting("all.file") @@ -460,6 +465,8 @@ class CSO_ColHubMirror_Missing(utopya.UtopyaRc): logging.info(f"{indent}loop over available orbits ...") # loop: for orbit in orbits: + ## testing ... + #if orbit != '40182' : continue # info .. logging.info(f"{indent} orbit '{orbit}' ...") @@ -469,35 +476,89 @@ class CSO_ColHubMirror_Missing(utopya.UtopyaRc): # nothing selected? if len(xlst) == 0: continue + # + elif len(xlst) == 1: + # single record: + rec = xlst.GetRecord(irec=0) + # + else: + # multiple records ... + # init name of selected file: + selected_filename = None + # + # 1st adhoc fix ... + # check if these are the same data but file was recreated later (and forgotten to be removed ...) + # S5P_RPRO_L2__CO_____20250715T142651_20250715T161018_40182_03_020800_20251103T112819.nc + # S5P_RPRO_L2__CO_____20250715T142651_20250715T161018_40182_03_020800_20251030T133312.nc + # init basenames: + basenames = [] + # loop over filenames: + for filename in xlst.df.index: + # basename without production time: + basename = filename.rsplit(sep="_",maxsplit=1)[0] + # store: + if basename not in basenames: + basenames.append( basename ) + #endif + # first? + if selected_filename is None: + # store first values: + selected_filename = filename + else: + # newer? + if filename > selected_filename: + selected_filename = filename + #endif + #endif + #endfor # filenames + # should be the same .. + if len(basenames) == 1: + # info ... + logging.info(f"{indent} selected single file based on latest production time:") + logging.info(f"{indent} {selected_filename}") + else: + # info ... + logging.warning(f"{indent} checked on production time, but found multiple base names:") + for basename in basenames: + logging.warning(f"{indent} {basename}") + #endfor + # reset selected name, check below: + selected_filename = None + #endif + # + # none of the adhoc fix lead to selection of a single file ... + if selected_filename is None: + logging.error(f"multiple records selected, something wrong with files or selection?") + raise Exception + #endif + # + # extract record: + rec = xlst.GetRecord( name=selected_filename ) + # endif - # loop over selected records: - for irec in range(len(xlst)): - # current: - rec = xlst.GetRecord(irec) - ## info .. - # logging.info( f"{indent} {fname}" ) - # set flag: - found = False - # loop over mirror listings: - for listing_curr in listings_curr: - # check if already available: - found = rec["filename"] in listing_curr - # found? - if found: - # info ... - logging.info(f"{indent} file already present ...") - # leave: - break - # endif - # endfor - # not found? - if not found: + ## info .. + # logging.info( f"{indent} {fname}" ) + # set flag: + found = False + # loop over mirror listings: + for listing_curr in listings_curr: + # check if already available: + found = rec["filename"] in listing_curr + # found? + if found: # info ... - logging.info(f"{indent} file not present yet, add to list ...") - # add record to list: - listing.UpdateRecord(rec["filename"], rec) + logging.info(f"{indent} file already present ...") + # leave: + break # endif # endfor + # not found? + if not found: + # info ... + logging.info(f"{indent} file not present yet, add to list ...") + # add record to list: + listing.UpdateRecord(rec["filename"], rec) + # endif ## TESTING ... # if len(listing) > 0 : diff --git a/src/cso/cso_dataspace.py b/src/cso/cso_dataspace.py index 8f812ea5b449cfd84f36cfbf2051489d47ed34b5..c793802f12fc940e416c5e09954498d06056024e 100644 --- a/src/cso/cso_dataspace.py +++ b/src/cso/cso_dataspace.py @@ -55,6 +55,8 @@ # Fixed definition of bounding box for global selection. # Added increasing delays to avoid rate limit errors from STAC catalogue inquiry. # Check on different paths to 'href' attribute in stac inquiry. +# Trap files that could not be downloaded. +# Added production time of data files to listings. # @@ -378,7 +380,7 @@ class CSO_DataSpace_Inquire(utopya.UtopyaRc): platform_name, processing, rest = product_id.split("_", 2) product_type = rest[0:10] parts = rest[11:].split("_") - start_time, end_time, orbit, collection, processor_version, prod_time = parts + start_time, end_time, orbit, collection, processor_version, production_time = parts # convert: tfmt = "%Y%m%dT%H%M%S" @@ -427,6 +429,7 @@ class CSO_DataSpace_Inquire(utopya.UtopyaRc): "processing": [processing], "collection": [collection], "processor_version": [processor_version], + "production_time": [production_time], "filename": [filename], "href": [href], } @@ -783,7 +786,11 @@ class CSO_DataSpace_Downloader(object): # check ... if npfile != 1: print(f"ERROR - found {npfile} files in S3 bucket for product: {product}") - raise Exception + if npfile == 0: + raise Exception(f"file not found in S3 bucket") + else: + raise Exception + #endif #endif # loop over (single) files: @@ -893,8 +900,14 @@ class CSO_DataSpace_Downloader(object): # unknown ... except Exception as err: - logging.error(f"{indent}unknown error:") - logging.error(f"{indent} {str(err)}") + # check on known errors .. + if "file not found in S3 bucket" in str(err): + logging.warning(f"{indent} WARNING - could not download: {href}") + break + else: + logging.error(f"{indent}unknown error:") + logging.error(f"{indent} {str(err)}") + #endif # endtry diff --git a/src/cso/cso_file.py b/src/cso/cso_file.py index 1f121f50ccae4164ab790accc23a838050e27560..0ad1114374db6fdacfdc921f65080cffb0d7ea0d 100644 --- a/src/cso/cso_file.py +++ b/src/cso/cso_file.py @@ -68,6 +68,9 @@ # 2026-04, Arjo Segers # Added optional "columns" argument to initialize empty listing object. # +# 2026-05, Arjo Segers +# Support `name` argument for GetRecord method. +# ######################################################################## ### @@ -1407,10 +1410,23 @@ class CSO_Listing(object): # * - def GetRecord(self, irec): + def GetRecord(self, irec=None, name=None): """ - Return record for corresponding 0-based row index. + Return record for corresponding 0-based row index or index name. """ + + # modules: + import numpy + + # by name? + if name is not None: + indx, = numpy.where( self.df.index == name ) + if len(indx) == 0: + logging.error(f"value `{name}` not found in index") + raise Exception + #endif + irec = indx[0] + #endif # extract: rec = self.df.iloc[irec].copy() @@ -1604,8 +1620,7 @@ class CSO_Listing(object): (%{processor_version} == '020400') & (%{processing} == 'RPRO') ; ... This is evaluted after the previously described selections. - The result should be None or exactly one record. - Eventually skip files in ``blacklists``. + Eventually skip files in ``blacklist``. """ @@ -1650,7 +1665,7 @@ class CSO_Listing(object): # endif # select: df = df[df[key] == value] - # endfor + # endfor # key/value pairs # * @@ -1734,7 +1749,7 @@ class CSO_Listing(object): df = df.loc[selected] # endif - # endif + # endif # expr defined # * diff --git a/src/cso/cso_s5p.py b/src/cso/cso_s5p.py index f6ef687c4a1c4b9e08fcae81ce965e31f6ef3b1e..838867e17369444d536dec452036516dfc6b7b50 100644 --- a/src/cso/cso_s5p.py +++ b/src/cso/cso_s5p.py @@ -81,6 +81,13 @@ # 2026-03, Arjo Segers # Updated selection of download source for Copernicus Dataspace. # +# 2026-04, Arjo Segers +# Trap download errors. +# Select input file with latest processor version or production time if multiple are selected. +# +# 2026-05, Arjo Segers +# Convert multiple files if orbit is stored in patches. Check on duplicate output files. +# ######################################################################## ### @@ -2865,7 +2872,7 @@ class CSO_S5p_Convert(utopya.UtopyaRc): # loop over orbits: for orbit in orbits: # info ... - logging.info(indent + ' orbit "%s" ...' % orbit) + logging.info(f"{indent} orbit '{orbit}' ...") ## TESTING .. #if orbit != "21861": @@ -2879,294 +2886,310 @@ class CSO_S5p_Convert(utopya.UtopyaRc): ) # no orbit found? next: if len(odf) == 0: + # next orbit: continue - elif len(odf) > 1: - logging.error(f"found multiple records matching selection;" + - " use finer selection, or something wrong in inquiry table(s)?" ) - raise Exception #endif - # selected record: - rec = odf.GetRecord(0) - - # start time of orbit: - t0 = rec["start_time"] - - # target file: - output_filename = t0.strftime(output_filename__template) - for key in rec.keys(): - if type(rec[key]) == str: - output_filename = output_filename.replace("%{" + key + "}", rec[key]) - # endif - # endfor - # info ... - logging.info(" target file: %s" % output_filename) - - # selected productions? - if len(whitelist) > 0: - if os.path.basename(output_filename) not in whitelist: - logging.info(" not in whitelist, skip ...") - continue + + # keep list of target files to check on duplicates .. + output_filenames = [] + # loop over records: + for irec in range(len(odf)): + # info ... + logging.info(f"{indent} record {irec+1}/{len(odf)} ...") + # selected record: + rec = odf.GetRecord(irec) + + # start time of orbit: + t0 = rec["start_time"] + + # target file: + output_filename = t0.strftime(output_filename__template) + for key in rec.keys(): + if type(rec[key]) == str: + output_filename = output_filename.replace(f"%{{{key}}}", rec[key]) + # endif + # endfor + # info ... + logging.info(f"{indent} target file: {output_filename}") + + # check .. + if output_filename in output_filenames: + logging.error(f"output file already created for this record:") + logging.error(f" {output_filename}") + logging.error(f"Orbit stored in more than one file?") + logging.error(f"Then ensure to include a start-time in the output filename template:") + logging.error(f" {output_filename__template}") + raise Exception + #endif + # store: + output_filenames.append( output_filename ) + + # selected productions? + if len(whitelist) > 0: + if os.path.basename(output_filename) not in whitelist: + logging.info("{indent} not in whitelist, skip ...") + continue + # endif # endif - # endif - # output dir: - cso_file.CheckDir(output_filename, dmode=dmode) + # output dir: + cso_file.CheckDir(output_filename, dmode=dmode) - # split filename at extension: - fname, ext = os.path.splitext(output_filename) - # error file: - output_errfile = fname + ".err" - # message file: - output_msgfile = fname + ".msg" + # split filename at extension: + fname, ext = os.path.splitext(output_filename) + # error file: + output_errfile = fname + ".err" + # message file: + output_msgfile = fname + ".msg" - # error file present? - if with_error_files and os.path.isfile(output_errfile): - # info .. - logging.info(" error file present:") - # read: - with open(output_errfile, "r") as f: - lines = f.readlines() - # endwith - # info.. - for line in lines: - logging.info(" %s" % line.strip()) - # endfor - # info .. - logging.info(" do not try to create again ...:") - # do not create .. - create = False - # - # ~ message file present? - elif os.path.isfile(output_msgfile): - # info .. - logging.info(" message file present:") - # read: - with open(output_msgfile, "r") as f: - lines = f.readlines() - # endwith - # info.. - for line in lines: - logging.info(" %s" % line.strip()) - # endfor - # info .. - logging.info(" do not try to create again ...:") - # do not create .. - create = False - # - # ~ not present yet, create file? - elif not os.path.isfile(output_filename): - # info ... - logging.info(" create new file ...") - # always create: - create = True - # - # ~ renew? - elif renew: - # info ... - logging.info(" renew file ...") - # always create: - create = True - # - # ~ already a version present: - elif os.path.isfile(output_filename): - # there might be duplicated processings, with different processing times; - # filenames are sorted, so current is probably newer - # open existing file: - csf = CSO_S5p_File(output_filename) - # original orbit as attribute? - orbit_file = csf.GetAttr("orbit_file", quiet=True) - if orbit_file is None: + # error file present? + if with_error_files and os.path.isfile(output_errfile): + # info .. + logging.info(f"{indent} error file present:") + # read: + with open(output_errfile, "r") as f: + lines = f.readlines() + # endwith + # info.. + for line in lines: + logging.info(" %s" % line.strip()) + # endfor + # info .. + logging.info(f"{indent} do not try to create again ...:") + # do not create .. + create = False + # + # ~ message file present? + elif os.path.isfile(output_msgfile): + # info .. + logging.info(f"{indent} message file present:") + # read: + with open(output_msgfile, "r") as f: + lines = f.readlines() + # endwith + # info.. + for line in lines: + logging.info(f"{indent} {line.strip()}") + # endfor + # info .. + logging.info(f"{indent} do not try to create again ...:") + # do not create .. + create = False + # + # ~ not present yet, create file? + elif not os.path.isfile(output_filename): # info ... - logging.info(' replace existing file, no "orbit_file" attribute yet ...') - # attribute not found, re-create to have this attribute: + logging.info(f"{indent} create new file ...") + # always create: create = True - elif fname <= orbit_file: - # info ... - logging.info(" keep existing file, based on same or newer source file ...") - # same file, no need to re-create: - create = False - else: + # + # ~ renew? + elif renew: # info ... - logging.info( - " replace existing file, based on older processing: %s" % orbit_file - ) - # replace: + logging.info(f"{indent} renew file ...") + # always create: create = True + # + # ~ already a version present: + elif os.path.isfile(output_filename): + # there might be duplicated processings, with different processing times; + # filenames are sorted, so current is probably newer + # open existing file: + csf = CSO_S5p_File(output_filename) + # original orbit as attribute? + orbit_file = csf.GetAttr("orbit_file", quiet=True) + if orbit_file is None: + # info ... + logging.info(f"{indent} replace existing file, no 'orbit_file' attribute yet ...") + # attribute not found, re-create to have this attribute: + create = True + elif fname <= orbit_file: + # info ... + logging.info(" keep existing file, based on same or newer source file ...") + # same file, no need to re-create: + create = False + else: + # info ... + logging.info(f"{indent} replace existing file, based on older processing: {orbit_file}") + # replace: + create = True + # endif # endif - # endif - # create? - if create: - # keep list of downloaded files: - downloads = [] + # create? + if create: + # keep list of downloaded files: + downloads = [] + + # source location, either url or local mirror: + href = rec["href"] + # local file? + if os.path.isfile(href): + # this is the input file: + input_file = href + # info .. + logging.info(f"{indent} input file: {input_file}") - # source location, either url or local mirror: - href = rec["href"] - # local file? - if os.path.isfile(href): - # this is the input file: - input_file = href - # info .. - logging.info(" input file: %s" % input_file) + else: + # input dir: + input_dir = t0.strftime(input_dir__template) + # replace templates: + for key in rec.keys(): + if type(rec[key]) == str: + input_dir = input_dir.replace(f"%{{{key}}}", rec[key]) + # endif + # endfor + # full path: + input_file = os.path.join(input_dir, rec["filename"]) - else: - # input dir: - input_dir = t0.strftime(input_dir__template) - # replace templates: - for key in rec.keys(): - if type(rec[key]) == str: - input_dir = input_dir.replace("%{" + key + "}", rec[key]) + # info .. + logging.info(f"{indent} input file: {input_file}") + # check .. + if not os.path.isfile(input_file): + # info .. + logging.info(f"{indent} not present yet, download ...") + # download url: + href = rec["href"] + # initialize download? + if downloader is None: + # init downloader based on url: + if href.startswith("s3://eodata/"): + # download from Copernicus DataSpace: + downloader = cso_dataspace.CSO_DataSpace_Downloader() + # + elif "s5p-pal.com" in href: + # download from PAL: + downloader = cso_pal.CSO_PAL_Downloader() + # + else: + logging.error(f"no downloader class defined for url: {href}") + raise Exception + # endif + # endif + # download ... + downloader.DownloadFile(href, input_file, dmode=dmode, indent=" ") + # store name: + downloads.append(input_file) # endif - # endfor - # full path: - input_file = os.path.join(input_dir, rec["filename"]) - # info .. - logging.info(" input file: %s" % input_file) - # check .. - if not os.path.isfile(input_file): - # info .. - logging.info(" not present yet, download ...") - # download url: - href = rec["href"] - # initialize download? - if downloader is None: - # init downloader based on url: - if href.startswith("s3://eodata/"): - # download from Copernicus DataSpace: - downloader = cso_dataspace.CSO_DataSpace_Downloader() - # - elif "s5p-pal.com" in href: - # download from PAL: - downloader = cso_pal.CSO_PAL_Downloader() - # + # download might have failed .. + if not os.path.isfile(input_file): + # write error file or raise error? + if with_error_files: + # info .. + logging.warning( + f"{indent} missing input file, write error file ..." + ) + # write error file: + with open(output_errfile, "w") as f: + f.write("missing file: %s\n" % input_file) + # endwith + # next: + continue else: - logging.error(f"no downloader class defined for url: {href}") + # info .. + logging.error(f"missing input file") + logging.error(f" {input_file}") + logging.error(f"enable creation of *error files* to not break on this") raise Exception # endif # endif - # download ... - downloader.DownloadFile(href, input_file, dmode=dmode, indent=" ") - # store name: - downloads.append(input_file) - # endif - # download might have failed .. - if not os.path.isfile(input_file): - # write error file or raise error? + # endif # local mirror or remote source file + + # info ... + logging.info(f"{indent} open ...") + # read file, sometimes files are corrupted ... + try: + sfile = S5p_File(input_file) + except: + # write error file or raise error or error? if with_error_files: # info .. - logging.warning( - f"{indent} missing input file, write error file ..." - ) + logging.warning(f"{indent} could not open, write error file ...") # write error file: with open(output_errfile, "w") as f: - f.write("missing file: %s\n" % input_file) + f.write("could not open file: %s\n" % input_file) # endwith + # cleanup? + if downloads_cleanup and (input_file in downloads): + logging.warning(f"{indent} remove downloaded %s ..." % input_file) + os.remove(input_file) + # endif # next: continue else: # info .. - logging.error(f"missing input file") - logging.error(f" {input_file}") + logging.error(f"could not open input file: {input_file}") logging.error(f"enable creation of *error files* to not break on this") raise Exception # endif - # endif + # endtry - # endif # local mirror or remote source file + # apply selections, return bool mask and list of history lines: + selected, history = sfile.SelectPixels(self.rcf, self.rcbase, indent=" ") + # count: + nselected = selected.sum() - # info ... - logging.info(f"{indent} open ...") - # read file, sometimes files are corrupted ... - try: - sfile = S5p_File(input_file) - except: - # write error file or raise error or error? - if with_error_files: + # info ... + logging.info( + f"{indent} selected %i of %i pixels" % (nselected, selected.size) + ) + # any ? + if nselected == 0: # info .. - logging.warning(f"{indent} could not open, write error file ...") - # write error file: - with open(output_errfile, "w") as f: - f.write("could not open file: %s\n" % input_file) + logging.warning(f"{indent} no pixels selected, write message file ...") + # write message file: + with open(output_msgfile, "w") as f: + f.write("no pixels selected in: %s\n" % input_file) # endwith - # cleanup? - if downloads_cleanup and (input_file in downloads): - logging.warning(f"{indent} remove downloaded %s ..." % input_file) - os.remove(input_file) - # endif - # next: - continue - else: - # info .. - logging.error(f"could not open input file: {input_file}") - logging.error(f"enable creation of *error files* to not break on this") - raise Exception - # endif - # endtry - - # apply selections, return bool mask and list of history lines: - selected, history = sfile.SelectPixels(self.rcf, self.rcbase, indent=" ") - # count: - nselected = selected.sum() - # info ... - logging.info( - f"{indent} selected %i of %i pixels" % (nselected, selected.size) - ) - # any ? - if nselected == 0: - # info .. - logging.warning(f"{indent} no pixels selected, write message file ...") - # write message file: - with open(output_msgfile, "w") as f: - f.write("no pixels selected in: %s\n" % input_file) - # endwith + else: + # info ... + logging.info(f"{indent} create %s ..." % output_filename) + # init: + csf = CSO_S5p_File() + # add: + csf.AddSelection( + sfile, selected, self.rcf, self.rcbase, indent=f"{indent} " + ) + # update history: + history.append(f"added {nselected} pixels from {os.path.basename(input_file)}") + # update attributes: + for key in ["orbit", "processing", "processor_version", "collection"]: + attrs[key] = rec[key] + # endfor + attrs["orbit_file"] = os.path.basename(input_file) + # write: + csf.Write( + filename=output_filename, + attrs=attrs, + history=history, + packed=packed, + complevel=complevel, + ) - else: - # info ... - logging.info(f"{indent} create %s ..." % output_filename) - # init: - csf = CSO_S5p_File() - # add: - csf.AddSelection( - sfile, selected, self.rcf, self.rcbase, indent=f"{indent} " - ) - # update history: - history.append(f"added {nselected} pixels from {os.path.basename(input_file)}") - # update attributes: - for key in ["orbit", "processing", "processor_version", "collection"]: - attrs[key] = rec[key] - # endfor - attrs["orbit_file"] = os.path.basename(input_file) - # write: - csf.Write( - filename=output_filename, - attrs=attrs, - history=history, - packed=packed, - complevel=complevel, - ) + ## testing ... + # logging.warning( f"{indent}break after first created file ..." ) + # break - ## testing ... - # logging.warning( f"{indent}break after first created file ..." ) - # break + # endif # any selected - # endif # any selected + # cleanup? + if downloads_cleanup and (input_file in downloads): + logging.warning(f"{indent} remove downloaded %s ..." % input_file) + os.remove(input_file) + # endif - # cleanup? - if downloads_cleanup and (input_file in downloads): - logging.warning(f"{indent} remove downloaded %s ..." % input_file) - os.remove(input_file) - # endif + # endif # create - # endif # create + # endif # irec (patches in orbit) ## testing ... # logging.warning( 'break after first orbit' ) # break - # endfor # input files + # endfor # orbits # info ... logging.info(f"{indent}") @@ -3326,7 +3349,7 @@ class CSO_S5p_Download(utopya.UtopyaRc): df.sort_values("filename", inplace=True) # info ... - logging.info(f"{indent}number of files : %i" % len(df)) + logging.info(f"{indent}number of files : {len(df)}") # list of ';' seperated selection expression: # (%{processor_version} == '020400') & (%{processing} == 'RPRO') ; ... @@ -3334,14 +3357,14 @@ class CSO_S5p_Download(utopya.UtopyaRc): # replace templates: # (xrec['processor_version'] == '020400') & (xrec['processing'] == 'RPRO') ; ... for key in df.keys(): - line = line.replace("%{" + key + "}", "xrec['" + key + "']") + line = line.replace(f"%{{{key}}}", f"xrec['{key}']") # endfor # split: selections = line.split(";") # info .. logging.info("selection criteria (first with matching orbit is used):") for selection in selections: - logging.info(" %s" % selection.strip()) + logging.info(f" {selection.strip()}") # endif # skip some? @@ -3356,7 +3379,9 @@ class CSO_S5p_Download(utopya.UtopyaRc): logging.info(f"{indent}found %i orbits with overlap of time range .." % len(xdf)) # orbit labels: - orbits = xdf["orbit"].unique() + orbits = list( xdf["orbit"].unique() ) + # sorted version: + orbits.sort() # no download initialized yet: downloader = None @@ -3366,11 +3391,14 @@ class CSO_S5p_Download(utopya.UtopyaRc): # loop over orbits: for orbit in orbits: # info ... - logging.info(indent + ' orbit "%s" ...' % orbit) + logging.info(f"{indent} orbit '{orbit}'") # search for other records for same orbit: odf = xdf[xdf["orbit"] == orbit] + # sort on processor version and production time, newest first: + odf.sort_values(by=["processor_version","production_time"],ascending=False) + # storage for status label: "selected", "blacklisted", ... filestatus = {} # no match yet .. @@ -3380,6 +3408,9 @@ class CSO_S5p_Download(utopya.UtopyaRc): for selection in selections: # make empty again: selected = [] + # latest processor version and production time: + pversion = None + ptime = None # loop over records: for indx, xrec in odf.iterrows(): # skip? @@ -3389,20 +3420,32 @@ class CSO_S5p_Download(utopya.UtopyaRc): # endif # evaluate expression including 'xrec[key]' values: if eval(selection): + # already selected a record? then check on processor version and production time: + if len(selected) > 0: + if xrec["processor_version"] < pversion: + filestatus[xrec["filename"]] = "older processor version" + continue + #endif + if xrec["production_time"] < ptime: + filestatus[xrec["filename"]] = "older production time" + continue + #endif + #endif + # store: selected.append(xrec["filename"]) filestatus[xrec["filename"]] = "selected" rec = xrec + pversion = xrec["processor_version"] + ptime = xrec["production_time"] # endif # endfor # records # exactly one? then leave: if len(selected) == 1: break elif len(selected) > 1: - logging.error( - "found more than one orbit file matching selection: %s" % selection - ) + logging.warning(f"{indent} found more than one orbit file matching selection: {selection}") for fname in selected: - logging.error(" %s" % fname) + logging.warning(f"{indent} {fname}") # endfor raise Exception # endif # number found @@ -3460,7 +3503,7 @@ class CSO_S5p_Download(utopya.UtopyaRc): # initialize download? if downloader is None: # init downloader based on url: - if "dataspace.copernicus.eu" in href: + if href.startswith("s3://eodata/"): # download from Copernicus DataSpace: downloader = cso_dataspace.CSO_DataSpace_Downloader() # @@ -3469,7 +3512,7 @@ class CSO_S5p_Download(utopya.UtopyaRc): downloader = cso_pal.CSO_PAL_Downloader() # else: - logging.error("no downloader class defined for url: {href}") + logging.error(f"no downloader class defined for url: {href}") raise Exception # endif # endif @@ -3481,8 +3524,7 @@ class CSO_S5p_Download(utopya.UtopyaRc): # download might have failed .. if not os.path.isfile(input_file): - logging.error(f"missing input file") - logging.error(f" {input_file}") + logging.error(f"missing input file: {input_file}") raise Exception # endif @@ -3925,7 +3967,7 @@ class CSO_S5p_Download_Listing(utopya.UtopyaRc): orbit, collection, processor_version, - prod_time, + production_time, ) = rest[13:].split("_") else: product_id = rest[0:10] @@ -3935,7 +3977,7 @@ class CSO_S5p_Download_Listing(utopya.UtopyaRc): orbit, collection, processor_version, - prod_time, + production_time, ) = rest[11:].split("_") # endif except: @@ -3958,13 +4000,13 @@ class CSO_S5p_Download_Listing(utopya.UtopyaRc): data["orbit"] = orbit data["collection"] = collection data["processor_version"] = processor_version - if len(prod_time) == 8: + if len(production_time) == 8: data["processing_time"] = datetime.datetime.strptime( - prod_time, "%Y%m%d" + production_time, "%Y%m%d" ) else: data["processing_time"] = datetime.datetime.strptime( - prod_time, tfmt + production_time, tfmt ) # endif # update record: