Workers API¶
CADC Certs¶
datatrail_admin.workers.cadc_certs ¶
Update CADC certificate.
main ¶
Update the CADC certificate at CHIME and CANFAR.
Source code in datatrail_admin/workers/cadc_certs.py
Python
def main():
    """Update the CADC certificate at CHIME and CANFAR."""
    while True:
        # Extend the certificate period for the next 30 days.
        cert = CERTFILE
        netrc = "/run/secrets/CANFAR_NETRC"
        log.info(f"Renewing certificate at {SITE}.")
        cmd = (
            f"cadc-get-cert --days-valid 30 --cert-filename {cert} --netrc-file {netrc}"
        )
        result = os.system(cmd)
        if result == 0:
            print(f"certificate at {SITE} update success")
        else:
            print(f"certificate at {SITE} update failure")
        if SITE == "chime":
            log.info("Pushing new certificate to ARC.")
            cmd = f"vcp --certfile {cert} {cert} arc:projects/chime_frb/cert/"
            result = os.system(cmd)
            cmd = f"vchmod --certfile {cert} g+r arc:projects/chime_frb/cert/cadcproxy.pem chime-frb-rw"  # noqa: E501
            result = os.system(cmd)
            if result == 0:
                print("certificate at CANFAR/arc update success")
            else:
                print("certificate at CANFAR/arc update failure")
        time.sleep(3600)
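The worker relies on the exit status of cadc-get-cert to report success. As a complementary check, here is a minimal sketch (not part of the worker; days_remaining is a hypothetical helper) for inspecting how many days remain on the renewed proxy certificate, assuming the cryptography package is installed:
Python
from datetime import datetime, timezone

from cryptography import x509


def days_remaining(cert_path: str) -> float:
    """Return the number of days until the certificate at cert_path expires."""
    with open(cert_path, "rb") as fh:
        cert = x509.load_pem_x509_certificate(fh.read())
    # not_valid_after_utc requires cryptography >= 42; older releases
    # expose a naive datetime via not_valid_after instead.
    remaining = cert.not_valid_after_utc - datetime.now(timezone.utc)
    return remaining.total_seconds() / 86400.0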
Deleter¶
datatrail_admin.workers.deleter ¶
Deletion worker.
main ¶
Main function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
site | str | Site name that the deleter is running at. | required |
Source code in datatrail_admin/workers/deleter.py
perform_deletion ¶
Perform deletion.
Source code in datatrail_admin/workers/deleter.py
Python
def perform_deletion(work: Work, test_mode=False):  # noqa C901
    """Perform deletion."""
    dataset_name = work.parameters["dataset_name"]
    dataset_scope = work.parameters["dataset_scope"]
    if not query.dataset_check_at_minoc(dataset_scope, dataset_name):
        log.error(
            f"Failed to validate existence of {dataset_name}, {dataset_scope} at Minoc. Failing!"  # noqa
        )
        work.status = "failure"
        if test_mode is False:
            utilities.update(work)
        return
    try:
        dataset_path = os.path.commonpath(
            [fr["file_path"] for fr in work.parameters["replica_parameters"]]
        )
        storage_name = work.parameters["replica_parameters"][0]["storage_name"]
        exists = False
        if storage_name == "chime":
            log.debug("Checking if dataset exists at CHIME.")
            exists = Path(dataset_path).exists()
        else:
            # If storage_name is not chime, assume the dataset exists and attempt deletion.
            exists = True
        if exists:
            log.debug(f"Deleting dataset from {storage_name}.")
            action.delete(storage_name, dataset_path)
        else:
            # If the files don't exist, there is no deletion to perform.
            log.debug(f"No replicas found at {storage_name}.")
        for replica_parameters in work.parameters["replica_parameters"]:
            replica_id = replica_parameters["id"]
            if replica_id in work.results["delete_file"]["failure"]:
                work.results["delete_file"]["failure"].remove(replica_id)
            work.results["delete_file"]["success"].append(replica_id)
        work.start = time.time()
        log.info("Deletion successful.")
        # Now update the state in the database.
        try:
            requests.put(
                SERVER + "/commit/dataset/transformation-status",
                json={
                    "deleter": {
                        "dataset_name": dataset_name,
                        "dataset_scope": dataset_scope,
                        "storage_name": storage_name,
                        "status": "completed",
                    }
                },
            )
            for replica_parameters in work.parameters["replica_parameters"]:
                # Deletion state successfully updated, so the work was successful.
                replica_id = replica_parameters["id"]
                if replica_id in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].remove(replica_id)
                work.results["update_database"]["success"].append(replica_id)
        except ConnectionError:
            log.error(f"Request error while updating dataset: {dataset_name}")
            for replica_parameters in work.parameters["replica_parameters"]:
                replica_id = replica_parameters["id"]
                if replica_id not in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].append(replica_id)
    except Exception as e:
        log.error(str(e))
        log.error("Deletion failed.")
        for replica_parameters in work.parameters["replica_parameters"]:
            replica_id = replica_parameters["id"]
            if replica_id not in work.results["delete_file"]["failure"]:
                work.results["delete_file"]["failure"].append(replica_id)
    work.parameters["number_deletion_completed"] = len(
        work.results["delete_file"]["success"]
    )
    work.parameters["number_deletion_failed"] = len(
        work.results["delete_file"]["failure"]
    )
    work.parameters["number_db_update_completed"] = len(
        work.results["update_database"]["success"]
    )
    work.parameters["number_db_update_failed"] = len(
        work.results["update_database"]["failure"]
    )
    if (
        len(work.results["update_database"]["failure"]) != 0
        and len(work.results["delete_file"]["failure"]) != 0
    ):
        work.status = "failure"
    else:
        work.status = "success"
    if test_mode is False:
        utilities.update(work)
    return
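For testing, the shape of the Work object that perform_deletion expects can be mocked with a simple stand-in. The field names below are taken from the listing above; the class, dataset names, and paths are illustrative only:
Python
from dataclasses import dataclass
from typing import Any, Dict


# Hypothetical stand-in mirroring only the attributes perform_deletion
# touches; the real object is a workflow Work instance.
@dataclass
class StubWork:
    parameters: Dict[str, Any]
    results: Dict[str, Any]
    status: str = "running"
    start: float = 0.0


work = StubWork(
    parameters={
        "dataset_name": "example-dataset",  # illustrative value
        "dataset_scope": "example.scope",   # illustrative value
        "replica_parameters": [
            {"id": 1, "storage_name": "chime",
             "file_path": "/data/example/0001.dat"},
        ],
    },
    results={
        "delete_file": {"success": [], "failure": []},
        "update_database": {"success": [], "failure": []},
    },
)
# Note: test_mode=True only skips the utilities.update calls; the deletion
# itself and the HTTP status update still run.
# perform_deletion(work, test_mode=True)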
Processed Data Fetcher¶
datatrail_admin.workers.processed_data_fetcher ¶
Processed data fetcher.
fetch_processed_data ¶
Fetch processed data from arc.
Source code in datatrail_admin/workers/processed_data_fetcher.py
Python
def fetch_processed_data():
    """Fetch processed data from arc."""
    w = utilities.withdraw("datatrail-pull-files-from-arc")
    if w is not None:
        try:
            action.pull_file(**w.parameters)
            w.status = "success"
        except Exception as e:
            log.error(str(e))
            w.status = "failure"
        utilities.update(w)
        return 1
    else:
        return 0
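fetch_processed_data returns 1 when a piece of work was withdrawn and processed and 0 when the bucket was empty, so a caller can back off accordingly. A minimal driver loop might look like this (the 30 s back-off is an assumption, not taken from the package):
Python
import time

while True:
    if fetch_processed_data() == 0:
        # Nothing withdrawn from "datatrail-pull-files-from-arc";
        # back off before polling again.
        time.sleep(30)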
Replication Healer¶
datatrail_admin.workers.replication_healer ¶
Replication healer.
heal_staged_file_replicas ¶
Python
heal_staged_file_replicas(replica_paths: List[str], replica_ids: List[str], datasets: List[str], scopes: List[str])
Heal file replicas left as staged.
Source code in datatrail_admin/workers/replication_healer.py
Python
def heal_staged_file_replicas(
    replica_paths: List[str],
    replica_ids: List[str],
    datasets: List[str],
    scopes: List[str],
):  # noqa
    """Heal file replicas left as staged."""
    # Check if the replicas are at Minoc and create the payload.
    logger.info("Checking if replicas exist at Minoc.")
    try:
        results, _, _ = check_staged_file_replicas_replicated(
            replica_paths, replica_ids, datasets, scopes
        )
    except Exception as error:
        logger.error(error)
        logger.info("Sleeping for 30s.")
        time.sleep(30)
        return {}, [], []
    if len(results["to_update"]) > 0:
        logger.info(f"Number of replicas already at Minoc: {len(results['to_update'])}.")
        # Attempt to create a new replica; status code 200 or 500 expected.
        for pl in results["to_update"]:
            logger.info(f"Creating replica for: {pl['file_replicas'][0]['file_name']}")
            try:
                r = requests.post(SERVER + "/commit/file/replicas", json=pl)
                # If 200, done; if 500, the new replica already exists, so
                # update the original.
                if r.status_code not in [200, 500]:
                    logger.warning(f"Issue with: {pl['file_replicas'][0]['file_name']}")
                    continue
                logger.info(
                    f"Updating replication state of original replica: {pl['file_replicas'][0]['file_name']}"  # noqa: E501
                )
                requests.put(
                    SERVER + "/commit/file/replica/transformation-status",
                    json={
                        int(pl["original_replica_id"]): {
                            "transformation": "replication",
                            "status": "completed",
                        }
                    },
                )
            except ConnectionError:
                logger.error(
                    f"Request error while updating: {pl['file_replicas'][0]['file_name']}"  # noqa: E501
                )
    if len(results["not_at_minoc"]["replica_ids"]) > 0:
        logger.info(
            f"Number of replicas not at Minoc to reset: {len(results['not_at_minoc']['replica_ids'])}."  # noqa: E501
        )
        for fr_id in results["not_at_minoc"]["replica_ids"]:
            try:
                requests.put(
                    SERVER + "/commit/file/replica/transformation-status",
                    json={
                        int(fr_id): {
                            "transformation": "replication",
                            "status": "available",
                        }
                    },
                )
            except ConnectionError:
                logger.error(f"Request error while updating id: {fr_id}")
    logger.info("Healing completed successfully!")
    return {}, [], []
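The results mapping consumed above is produced by check_staged_file_replicas_replicated. Its shape, inferred solely from the fields the healer reads (the actual return value may carry more), looks like this with illustrative values:
Python
# Inferred shape only, not the helper's documented contract.
results = {
    # Replicas confirmed at Minoc: each payload is POSTed to
    # /commit/file/replicas, then the original replica is marked completed.
    "to_update": [
        {
            "original_replica_id": "42",
            "file_replicas": [{"file_name": "0001.dat"}],
        }
    ],
    # Replicas missing from Minoc: reset to "available" for re-replication.
    "not_at_minoc": {"replica_ids": ["43", "44"]},
}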
Replicator¶
datatrail_admin.workers.replicator ¶
Replication worker.
get_destination ¶
Get the destination file path or URI to push the file to.
Source code in datatrail_admin/workers/replicator.py
Python
def get_destination(file_path, storage_name, replicate_to):
    """Get the destination file path or URI to push the file to."""
    if storage_name == "arc":
        f = file_path.replace("/arc/projects/chime_frb", "")
        f = f.replace("arc:projects/chime_frb", "")
    elif storage_name == "minoc":
        f = file_path.replace("cadc:CHIMEFRB", "")
    else:
        f = file_path
    if replicate_to == "minoc":
        if f[0] == "/":
            f = f[1:]
    elif replicate_to == "arc":
        f = os.path.join("arc:projects/chime_frb", f)
    return f
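A few worked examples, derived directly from the branches above (the paths are illustrative):
Python
>>> get_destination("/arc/projects/chime_frb/data/0001.dat", "arc", "minoc")
'data/0001.dat'
>>> get_destination("/data/example/0001.dat", "chime", "minoc")
'data/example/0001.dat'
>>> get_destination("data/example/0001.dat", "chime", "arc")
'arc:projects/chime_frb/data/example/0001.dat'
Note that os.path.join discards its first argument when the second is absolute, so a leading slash on the input path would bypass the arc:projects/chime_frb prefix in the replicate_to == "arc" branch.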
main ¶
Main function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
site | str | Site name that the replicator is running at. | required |
Source code in datatrail_admin/workers/replicator.py
Python
def main(site: str):
    """Main function.

    Args:
        site (str): Site name that the replicator is running at.
    """
    b = buckets.Buckets()
    while True:
        c = cadcclient.CADCClient()
        try:
            c.exist("no-such-file-checking-for-connectivity", certfile=CERTFILE)
        except Exception as e:
            log.error(str(e))
            log.error(
                "Either CANFAR is down or the SSL certificate for authentication has expired."  # noqa
            )
            time.sleep(30)
            continue
        log.info("Looking for work...")
        work = utilities.withdraw(f"datatrail-replicator-{site}-minoc")
        if work is not None:
            log.info("Work found!")
            perform_replication(work, b)
        time.sleep(0.1)
perform_replication ¶
Perform the replication.
Source code in datatrail_admin/workers/replicator.py
Python
def perform_replication(work: Work, b: buckets.Buckets, test_mode=False):  # noqa C901
    """Perform the replication."""
    wo_payloads: List[Work.payload] = []
    dataset_name = work.parameters["dataset_name"]
    dataset_scope = work.parameters["dataset_scope"]
    storage_element_captured_at = work.parameters["storage_element_captured_at"]
    for replica_parameters in work.parameters["replica_parameters"]:
        log.info(f"Replicating {replica_parameters['file_path']}")
        replica_id = replica_parameters["id"]
        storage_name = replica_parameters["storage_name"]
        replicate_to = replica_parameters["replicate_to"]
        file_path = replica_parameters["file_path"]
        destination = get_destination(file_path, storage_name, replicate_to)
        if replica_id not in work.results["copy_file"]["success"]:
            # Perform replication.
            try:
                exists = False
                if storage_name == "chime":
                    log.debug("Checking if file exists at CHIME.")
                    exists = Path(file_path).exists()
                    log.debug(f"File exists = {exists}.")
                else:
                    # If storage_name is not chime, assume the file exists and
                    # attempt replication.
                    exists = True
                if exists:
                    log.debug(f"Copying file from {storage_name} to {replicate_to}")
                    action.push_file(replicate_to, file_path, destination)
                    md5sum = ""
                    if replicate_to == "minoc":
                        c = cadcclient.CADCClient()
                        md5sum = c.info(destination, certfile=CERTFILE)[-1]
                    if replica_id in work.results["copy_file"]["failure"]:
                        work.results["copy_file"]["failure"].remove(replica_id)
                    work.results["copy_file"]["success"].append(replica_id)
                    # If the work is successfully replicating, reset the start
                    # time to keep the work in buckets. Reset here as copying is
                    # the biggest consumer of time.
                    work.start = time.time()
                    log.info("Replication successful.")
                else:
                    # If the file doesn't exist, there is no replication to perform.
                    log.debug(f"No replica found at {storage_name}.")
            except Exception as e:
                log.error(str(e))
                md5sum = ""
                if replica_id not in work.results["copy_file"]["failure"]:
                    work.results["copy_file"]["failure"].append(replica_id)
                log.error("Replication failed.")
            finally:
                log.debug("Updating 'copy_file' results.")
                work.parameters["number_transfers_completed"] = len(
                    work.results["copy_file"]["success"]
                )
                work.parameters["number_transfers_failed"] = len(
                    work.results["copy_file"]["failure"]
                )
                utilities.update(work)
        # Update the status of the replication and the new replica.
        if replica_id not in work.results["update_database"]["success"]:
            status = "failed"
            if replica_id in work.results["copy_file"]["success"]:
                status = "completed"
            try:
                if exists:
                    log.debug("Creating new replica record.")
                    wo_payload = update_work_status(
                        replica_id,
                        "replication",
                        status,
                        replicate_to,
                        destination,
                        md5sum,
                        dataset_name,
                        dataset_scope,
                        storage_element_captured_at,
                    )
                    if test_mode is False:
                        utilities.bucket_deposit(b, [wo_payload])
                else:
                    log.debug("Updating replica's deletion state.")
                    status = "completed"
                    wo_payload = update_work_status(
                        replica_id,
                        "deletion",
                        status,
                        "",
                        "",
                        "",
                        dataset_name,
                        dataset_scope,
                        "",
                    )
                    if test_mode is False:
                        utilities.bucket_deposit(b, [wo_payload])
                # Database state successfully updated, so the work was successful.
                if replica_id in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].remove(replica_id)
                work.results["update_database"]["success"].append(replica_id)
                wo_payloads.append(wo_payload)
            except Exception as e:
                log.error(str(e))
                if replica_id not in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].append(replica_id)
            finally:
                log.debug("Updating 'update_database' results.")
                work.parameters["number_db_update_completed"] = len(
                    work.results["update_database"]["success"]
                )
                work.parameters["number_db_update_failed"] = len(
                    work.results["update_database"]["failure"]
                )
                utilities.update(work)
    if (
        len(work.results["update_database"]["failure"]) != 0
        and len(work.results["copy_file"]["failure"]) != 0
    ):
        work.status = "failure"
    else:
        work.status = "success"
    if test_mode is False:
        utilities.update(work)
    return wo_payloads
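Each entry of work.parameters["replica_parameters"] that perform_replication iterates over must carry at least the following keys (values illustrative):
Python
replica_parameters = {
    "id": 42,                               # file replica database id
    "storage_name": "chime",                # where the file currently lives
    "replicate_to": "minoc",                # destination storage element
    "file_path": "/data/example/0001.dat",  # path at the source element
}
The surrounding work.parameters must also provide dataset_name, dataset_scope, and storage_element_captured_at, and work.results must contain copy_file and update_database mappings with success and failure lists, as in the deleter example above.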
update_work_status ¶
Python
update_work_status(replica_id: int, transformation: str, status: str, storage_element: str, file_path: str, md5sum: str, dataset_name: str, dataset_scope: str, storage_element_captured_at: str) -> List[Dict[Any, Any]]
Update the status of work.
Source code in datatrail_admin/workers/replicator.py
Python
def update_work_status(
    replica_id: int,
    transformation: str,
    status: str,
    storage_element: str,
    file_path: str,
    md5sum: str,
    dataset_name: str,
    dataset_scope: str,
    storage_element_captured_at: str,
) -> List[Dict[Any, Any]]:
    """Update the status of work."""
    params: Dict[str, Any] = {}
    if transformation == "replication":
        params["new_replica"] = {
            "storage_name": storage_element,
            "replica_id": replica_id,
            "md5sum": md5sum,
            "file_path": file_path,
            "status": status,
        }
    params["current_replica"] = {
        replica_id: {"transformation": transformation, "status": status}  # type: ignore
    }
    params["dataset_name"] = dataset_name
    params["dataset_scope"] = dataset_scope
    params["storage_element_captured_at"] = storage_element_captured_at
    wo = Work(
        pipeline="datatrail-state-updater",
        user="datatrail",
        site="chime",
        parameters={"replicator": params},
        config={"archive": {"results": False}},
    )
    return wo.payload
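For a completed replication, the call below (values illustrative) yields the payload of a Work destined for the datatrail-state-updater pipeline, with parameters["replicator"] carrying both the new_replica record and the current_replica status update:
Python
payload = update_work_status(
    replica_id=42,
    transformation="replication",
    status="completed",
    storage_element="minoc",
    file_path="data/example/0001.dat",
    md5sum="d41d8cd98f00b204e9800998ecf8427e",
    dataset_name="example-dataset",
    dataset_scope="example.scope",
    storage_element_captured_at="chime",
)
# parameters["replicator"] inside the resulting Work:
# {
#     "new_replica": {
#         "storage_name": "minoc", "replica_id": 42,
#         "md5sum": "d41d8cd98f00b204e9800998ecf8427e",
#         "file_path": "data/example/0001.dat", "status": "completed",
#     },
#     "current_replica": {42: {"transformation": "replication",
#                              "status": "completed"}},
#     "dataset_name": "example-dataset",
#     "dataset_scope": "example.scope",
#     "storage_element_captured_at": "chime",
# }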