......@@ -4,8 +4,8 @@
# 2023-10, Arjo Segers
# Tools to access Copernicus DataSpace.
#
# 2023-10, Arjo Segers
# Trap errors from "requests.get" during inquire.
# 2023-11, Arjo Segers
# Extended error traps.
#
########################################################################
......@@ -191,6 +191,9 @@ class CSO_DataSpace_Inquire(utopya.UtopyaRc):
# init base object:
utopya.UtopyaRc.__init__(self, rcfile=rcfile, rcbase=rcbase, env=env)
# number of seconds to wait in retry loop:
nsec_wait = 10
# url of API:
api_url = self.GetSetting("url")
# info ...
......@@ -309,8 +312,8 @@ class CSO_DataSpace_Inquire(utopya.UtopyaRc):
logging.error(f"{indent} tried {ntry} times now, exit ...")
raise Exception
else:
logging.error(f"{indent} wait ..")
time.sleep(10)
logging.error(f"{indent} wait {nsec_wait} seconds ..")
time.sleep(nsec_wait)
logging.error(f"{indent} try again ...")
ntry += 1
continue
......@@ -543,7 +546,9 @@ class CSO_DataSpace_DownloadFile(object):
"""
# modules:
import sys
import os
import time
import urllib.parse
import requests
import zipfile
......@@ -552,6 +557,9 @@ class CSO_DataSpace_DownloadFile(object):
# tools:
import cso_file
# number of seconds to wait in retry loop:
nsec_wait = 10
#
# On linux system, login/passwords for websites and ftp can be stored in "~/.netrc" file:
# ---[~/.netrc]-----------------------------------------------
......@@ -587,32 +595,54 @@ class CSO_DataSpace_DownloadFile(object):
# identity server:
domain = "identity.dataspace.copernicus.eu"
url = f"https://{domain}/auth/realms/CDSE/protocol/openid-connect/token"
try:
# send request:
r = requests.post(url, data=data)
# check status, raise error if request failed:
r.raise_for_status()
except requests.exceptions.HTTPError as err:
# info ..
msg = str(err)
logging.error(f"exception from download; message received:")
logging.error(f" {msg}")
# catch known problem ...
if msg.startswith("401 Client Error: Unauthorized for url:"):
logging.error(f"Interpretation: the (username,password) received from")
logging.error(f"your '~/.netrc' file are incorrect.")
logging.error(f"For the Copernicus DataSpace, the file should contain:")
logging.error(f" machine {p.netloc} login **** password ****")
logging.error(f"If the machine was not found, a default might have been received")
# retry loop ..
ntry = 0
while True:
# try to obtain token:
try:
# send request:
r = requests.post(url, data=data)
# check status, raise error if request failed:
r.raise_for_status()
# extract token from response:
access_token = r.json()["access_token"]
# all ok, leave try loop:
break
except requests.exceptions.HTTPError as err:
# info ..
msg = str(err)
logging.error(f"{indent} exception from token creation; message received:")
logging.error(f"{indent} {msg}")
# catch known problem ...
if msg.startswith("401 Client Error: Unauthorized for url:"):
logging.error(f"{indent} Possible causes:")
logging.error(f"{indent} * Just a random failure ...")
logging.error(f"{indent} * The (login,password) pair received from your '~/.netrc' file is incorrect.")
logging.error(f"{indent} For the Copernicus DataSpace, the file should contain:")
logging.error(f"{indent} machine {p.netloc} login **** password ****")
logging.error(f"{indent} If the machine was not found, a default might have been received.")
logging.error(f"{indent} Login received: {username}")
logging.error(f"{indent} * System maintenance? Check the Copernicus DataSpace website.")
else:
logging.error(f"{indent} Access token creation failed; server response: {r.json()}")
# endif
except:
# info ...
logging.error(f"{indent} Access token creation failed; server response: {r.json()}")
# end try
# increase counter:
ntry += 1
# switch:
if ntry == maxtry:
logging.warning(f"{indent} tried {maxtry} times; exit ...")
raise Exception
else:
raise Exception(f"Access token creation failed; server response: {r.json()}")
logging.warning(f"{indent} exception from token creation; wait {nsec_wait} seconds ...")
time.sleep(nsec_wait)
logging.warning(f"{indent} try again ...")
continue # while-loop
# endif
except:
raise Exception(f"Access token creation failed; server response: {r.json()}")
# endtry # get access token
# extract token from response:
access_token = r.json()["access_token"]
# endwhile # retry
# retry loop ..
ntry = 0
......@@ -620,107 +650,101 @@ class CSO_DataSpace_DownloadFile(object):
# try to download and save:
try:
# try to download:
try:
# fill authorization token in header:
headers = {"Authorization": f"Bearer {access_token}"}
# ensure that "~/.netrc" is ignored by passing null-authorization,
# otherwise the token in the header is overwritten by a token formed
# from the login/password in the rcfile if that is found:
r = requests.get(href, auth=NullAuth(), headers=headers, timeout=timeout)
# check status, raise error if request failed:
r.raise_for_status()
# product is a zip-file:
product_file = "product.zip"
# info ..
logging.info(f"{indent} write to {product_file} ...")
# write to temporary target first ..
tmpfile = product_file + ".tmp"
# open destination file for binary write:
with open(tmpfile, "wb") as fd:
# preferred way to write content following:
# https://docs.python-requests.org/en/master/user/quickstart/
for chunk in r.iter_content(chunk_size=128):
fd.write(chunk)
# endfor
# endwith
# rename:
os.rename(tmpfile, product_file)
# open product file:
arch = zipfile.ZipFile(product_file, mode="r")
# loop over members, probably two files in a directory:
# S5P_RPRO_L2__CH4____20200101T005246_etc/S5P_RPRO_L2__CH4____20200101T005246_etc.cdl
# S5P_RPRO_L2__CH4____20200101T005246_etc.nc
for member in arch.namelist():
# ncfile?
if member.endswith(".nc"):
# this should be the target file ..
if os.path.basename(member) != os.path.basename(output_file):
logging.error(f"member of archive file: {member}")
logging.error(f"differs from target name: {output_file}")
raise Exception
# endif
# info ..
logging.info(f"{indent} extract {member} ...")
# extract here, including leading directory:
arch.extract(member)
# info ..
logging.info(f"{indent} store ...")
# create target dir if necessary:
cso_file.CheckDir(output_file)
# move to destination:
os.rename(member, output_file)
# remove directory tree:
shutil.rmtree(os.path.dirname(member))
# only one file in package; leave loop over members
break
# fill authorization token in header:
headers = {"Authorization": f"Bearer {access_token}"}
# ensure that "~/.netrc" is ignored by passing null-authorization,
# otherwise the token in the header is overwritten by a token formed
# from the login/password in the rcfile if that is found:
r = requests.get(href, auth=NullAuth(), headers=headers, timeout=timeout)
# check status, raise error if request failed:
r.raise_for_status()
# product is a zip-file:
product_file = "product.zip"
# info ..
logging.info(f"{indent} write to {product_file} ...")
# write to temporary target first ..
tmpfile = product_file + ".tmp"
# open destination file for binary write:
with open(tmpfile, "wb") as fd:
# preferred way to write content following:
# https://docs.python-requests.org/en/master/user/quickstart/
for chunk in r.iter_content(chunk_size=128):
fd.write(chunk)
# endfor
# endwith
# rename:
os.rename(tmpfile, product_file)
# open product file:
arch = zipfile.ZipFile(product_file, mode="r")
# loop over members, probably two files in a directory:
# S5P_RPRO_L2__CH4____20200101T005246_etc/S5P_RPRO_L2__CH4____20200101T005246_etc.cdl
# S5P_RPRO_L2__CH4____20200101T005246_etc.nc
for member in arch.namelist():
# ncfile?
if member.endswith(".nc"):
# this should be the target file ..
if os.path.basename(member) != os.path.basename(output_file):
logging.error(f"member of archive file: {member}")
logging.error(f"differs from target name: {output_file}")
raise Exception
# endif
# endfor # members
# info ..
logging.info(f"{indent} remove product file ...")
# remove package:
os.remove(product_file)
except requests.exceptions.HTTPError as err:
# info ..
msg = str(err)
logging.error("exception from download; message received:")
logging.error(" %s" % msg)
# info ..
logging.info(f"{indent} extract {member} ...")
# extract here, including leading directory:
arch.extract(member)
# info ..
logging.info(f"{indent} store ...")
# create target dir if necessary:
cso_file.CheckDir(output_file)
# move to destination:
os.rename(member, output_file)
# remove directory tree:
shutil.rmtree(os.path.dirname(member))
# only one file in package; leave loop over members
break
# endif
# endfor # members
# info ..
logging.info(f"{indent} remove product file ...")
# remove package:
os.remove(product_file)
# all ok, leave retry loop:
break
except MemoryError as err:
logging.error("memory error from download; increase resources?")
# quit with error:
raise
except requests.exceptions.HTTPError as err:
# info ..
msg = str(err)
logging.error("exception from download; message received:")
logging.error(" %s" % msg)
except Exception as err:
# info ..
logging.error("from download; message received:")
logging.error(" %s" % str(err))
# quit with error:
raise
except MemoryError as err:
logging.error("memory error from download; increase resources?")
# quit with error:
raise
# endtry
except Exception as err:
# info ..
logging.error("from download; message received:")
logging.error(" %s" % str(err))
# quit with error:
raise
# error from download or save:
except:
# increase counter:
ntry += 1
# switch:
if ntry == maxtry:
logging.warning(f"{indent} tried {maxtry} times ...")
raise Exception
else:
logging.warning(f"{indent} exception from download; try again ...")
continue # while-loop
# endif
# endtry
# leave retry loop,
# either because download was ok,
# or because maximum number of retries was reached:
break
# increase counter:
ntry += 1
# switch:
if ntry == maxtry:
logging.warning(f"{indent} tried {maxtry} times; exit ...")
raise Exception
else:
logging.warning(f"{indent} exception from download; wait {nsec_wait} seconds ...")
time.sleep(nsec_wait)
logging.warning(f"{indent} try again ...")
continue # while-loop
# endif
# endwhile # retry
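# Note: the "NullAuth()" passed to "requests.get" above is not defined in this hunk;
# the usual recipe is a no-op auth handler that prevents "requests" from replacing the
# explicit "Authorization" header with credentials found in "~/.netrc".
# Minimal sketch (an assumption, not necessarily the project's exact definition),
# using the "requests" module imported above:
class NullAuth(requests.auth.AuthBase):
    """No-op authentication: leave the prepared request untouched."""

    def __call__(self, r):
        # returning the request unchanged preserves the manually set headers:
        return r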
......
......@@ -22,7 +22,8 @@
# Fixed bug in definition of listing file dates from rcfile settings.
#
# 2023-11, Arjo Segers
# Improved check on undefined 'href' fields in inquiry listing.
# Introduced error files to inform about input files that could not be downloaded
# or are corrupted, in addition to the message files that inform about zero selected pixels.
#
#
......@@ -2090,13 +2091,32 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
! skip some input files:
<rcbase>.blacklist : S5P_PAL__L2__NO2____20190806T022006_20190806T040136_09388_01_020301_20211110T020511.nc
By default the conversion stops if a file is corrupted or could not be downloaded.
To let the conversion first process all other files, an option is available to create a so-called *error file*.
An *error file* has the same name as the target file of the conversion, but with extension ``.err``
instead of ``.nc``; it contains a text message that describes what is wrong with
the source file, for example that it could not be opened. Enable the creation of error files with
the following flag::
! enable error files for missing or corrupted input files?
<rcbase>.create-error-files : True
If this flag is enabled and an error file is found instead of the target file,
the conversion simply skips this target and does not try to download the source file again.
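For illustration, the gate introduced by this flag amounts to the following check before a conversion
is attempted (a minimal sketch; the names follow the changed code further down in this commit)::

    import os

    with_error_files = True   # value of "<rcbase>.create-error-files"
    # companion file names derived from the target file:
    fname, ext = os.path.splitext(output_filename)
    output_errfile = fname + ".err"   # written when the download failed or the file is corrupted
    output_msgfile = fname + ".msg"   # written when zero pixels were selected

    if with_error_files and os.path.isfile(output_errfile):
        create = False   # skip: a previous run already flagged this target
    elif os.path.isfile(output_msgfile):
        create = False   # skip: a previous run found no pixels to select
    elif not os.path.isfile(output_filename):
        create = True    # not present yet, create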
If an input file should be converted, it is read into a :py:class:`.S5p_File` object.
The :py:meth:`SelectPixels <S5p_File.SelectPixels>` method is called to select pixels based
on criteria defined in the settings; see its documentation for how to configure the pixel selection.
This method als returns a history line to desribe the selection, which will be added as
This method also returns a history line to describe the selection, which will be added as
an attribute to the output file.
If no pixels are selected, for example because an orbit is outside the target domain,
an informative message is written to a so-called *message file*.
A *message file* has the same name as the target file of the conversion, but with extension ``.msg``
instead of ``.nc``. If this file is present, the conversion simply skips this target and,
for example, does not try to download the source file again.
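For example, an empty selection leads to a one-line *message file* instead of an output file
(sketch following the code further down in this commit)::

    if nselected == 0:
        # leave a note so that the next run can skip this orbit:
        with open(output_msgfile, "w") as f:
            f.write("no pixels selected in: %s\n" % input_file)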
The output file is created as an :py:class:`.CSO_S5p_File` object.
Its :py:meth:`AddSelection <.CSO_S5p_File.AddSelection>` method is called with the input object as argument,
and this will copy the selected pixels for variables specified in the settings.
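In code this amounts to the following (sketch; compare the reformatted call further down in this commit)::

    # create an empty output object and copy the selected pixels into it:
    csf = CSO_S5p_File()
    csf.AddSelection(sfile, selected, self.rcf, self.rcbase,
                     indent=indent + "  ")
    # record the selection in the history attribute:
    history.append("added %i pixels from %s"
                   % (nselected, os.path.basename(input_file)))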
......@@ -2157,7 +2177,6 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
import datetime
import fnmatch
import pandas
import numpy
# tools:
import cso_file
......@@ -2261,6 +2280,9 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
blacklist = self.GetSetting("blacklist", default="").split()
# select some specific files?
whitelist = self.GetSetting("whitelist", default="").split()
# write error files?
with_error_files = self.GetSetting("create-error-files", totype="bool", default=False)
# path to store download:
input_dir__template = self.GetSetting("input.dir")
......@@ -2388,12 +2410,32 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
os.makedirs(dname)
# endif
# message file:
# split filename at extension:
fname, ext = os.path.splitext(output_filename)
# error file:
output_errfile = fname + ".err"
# message file:
output_msgfile = fname + ".msg"
# messsage present?
if os.path.isfile(output_msgfile):
# error file present?
if with_error_files and os.path.isfile(output_errfile):
# info ..
logging.info(" error file present:")
# read:
with open(output_errfile, "r") as f:
lines = f.readlines()
# endwith
# info..
for line in lines:
logging.info(" %s" % line.strip())
# endfor
# info ..
logging.info(" do not try to create again ...:")
# do not create ..
create = False
#
# ~ message file present?
elif os.path.isfile(output_msgfile):
# info ..
logging.info(" message file present:")
# read:
......@@ -2402,24 +2444,27 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
# endwith
# info..
for line in lines:
logging.info(" %s" % line)
logging.info(" %s" % line.strip())
# endfor
# info ..
logging.info(" do not try to create again ...:")
# do not create ..
create = False
#
# ~ not present yet, create file?
elif not os.path.isfile(output_filename):
# info ...
logging.info(" create new file ...")
# always create:
create = True
#
# ~ renew?
elif renew:
# info ...
logging.info(" renew file ...")
# always create:
create = True
#
# ~ already a version present:
elif os.path.isfile(output_filename):
# there might be duplicated processings, with different processing times;
......@@ -2471,17 +2516,6 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
if not os.path.isfile(input_file):
# info ..
logging.info(" not present yet, download ...")
# check ..
if "href" not in rec.keys():
logging.error(f"cannot download, no 'href' column in inquiry ...")
logging.error(f"check inquiry table: {filename}")
raise Exception
# endif
if pandas.isna(rec["href"]):
logging.error(f"cannot download, empty 'href' element in inquiry ...")
logging.error(f"check inquiry table: {filename}")
raise Exception
# endif
# download ...
cso_dataspace.CSO_DataSpace_DownloadFile(rec["href"], input_file)
# store name:
......@@ -2490,14 +2524,22 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
# download might have failed ..
if not os.path.isfile(input_file):
# info ..
logging.warning(indent + " message input file, write message file ...")
# write message file:
with open(output_msgfile, "w") as f:
f.write("missing file: %s" % input_file)
# endwith
# next:
continue
# write error file or raise error?
if with_error_files:
# info ..
logging.warning(indent + " missing input file, write error file ...")
# write error file:
with open(output_errfile, "w") as f:
f.write("missing file: %s\n" % input_file)
# endwith
# next:
continue
else:
# info ..
logging.error(f"missing input file")
logging.error(f" {input_file}")
raise Exception
# endif
# endif
# info ...
......@@ -2506,21 +2548,26 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
try:
sfile = S5p_File(input_file)
except:
# info ..
logging.warning(
indent + " could not open input file, write message file ..."
)
# write message file:
with open(output_msgfile, "w") as f:
f.write("could not open file: %s" % input_file)
# endwith
# cleanup?
if downloads_cleanup and (input_file in downloads):
logging.warning(indent + " remove downloaded %s ..." % input_file)
os.remove(input_file)
# endif
# next:
continue
# write error file or raise error?
if with_error_files:
# info ..
logging.warning(f"{indent} could not open, write error file ...")
# write error file:
with open(output_errfile, "w") as f:
f.write("could not open file: %s\n" % input_file)
# endwith
# cleanup?
if downloads_cleanup and (input_file in downloads):
logging.warning(indent + " remove downloaded %s ..." % input_file)
os.remove(input_file)
# endif
# next:
continue
else:
# info ..
logging.error(f"could not open input file: {input_file}")
raise Exception
# endif
# endtry
# apply selections, return bool mask and list of history lines:
......@@ -2539,7 +2586,7 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
logging.warning(indent + " no pixels selected, write message file ...")
# write message file:
with open(output_msgfile, "w") as f:
f.write("no pixels selected in: %s" % input_file)
f.write("no pixels selected in: %s\n" % input_file)
# endwith
else:
......@@ -2549,13 +2596,11 @@ class CSO_S5p_Convert(utopya.UtopyaRc):
# init:
csf = CSO_S5p_File()
# add:
csf.AddSelection(
sfile, selected, self.rcf, self.rcbase, indent=indent + " "
)
csf.AddSelection(sfile, selected, self.rcf, self.rcbase,
indent=indent + " " )
# update history:
history.append(
"added %i pixels from %s" % (nselected, os.path.basename(input_file))
)
history.append( "added %i pixels from %s"
% (nselected, os.path.basename(input_file)) )
# update attributes:
for key in ["orbit", "processing", "processor_version", "collection"]:
attrs[key] = rec[key]
......@@ -2851,7 +2896,7 @@ class CSO_S5p_Listing(utopya.UtopyaRc):
class CSO_S5p_Download_Listing(utopya.UtopyaRc):
"""
Create *listing* file for files download from S5P data portals.
Create *listing* file for files downloaded from S5P data portals.
A *listing* file contains the names of the converted orbit files,
the time range of pixels in the file, and other information extracted from the filenames:
......