Workers API

CADC Certs

datatrail_admin.workers.cadc_certs

Update CADC certificate.

main

Python
main()

Update CADC certificate at CHIME and CANFAR.

Source code in datatrail_admin/workers/cadc_certs.py
Python
def main():
    """Update cadc certificate at CHIME and canfar."""
    while True:
        # extend the certificate period for the next 30 days.
        cert = CERTFILE
        netrc = "/run/secrets/CANFAR_NETRC"
        log.info(f"Renewing certificate at {SITE}.")
        cmd = (
            f"cadc-get-cert --days-valid 30 --cert-filename {cert} --netrc-file {netrc}"
        )
        result = os.system(cmd)
        if result == 0:
            print(f"certificate at {SITE} update success")
        else:
            print(f"certificate at {SITE} update failure")
        if SITE == "chime":
            log.info("Pushing new certificate to ARC.")
            cmd = f"vcp --certfile {cert} {cert} arc:projects/chime_frb/cert/"
            result = os.system(cmd)
            cmd = f"vchmod --certfile {cert} g+r arc:projects/chime_frb/cert/cadcproxy.pem chime-frb-rw"  # noqa: E501
            result = os.system(cmd)
            if result == 0:
                print("certificate at CANFAR/arc update success")
            else:
                print("certificate at CANFAR/arc update failure")
        time.sleep(3600)
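
The excerpt references module-level names (CERTFILE, SITE, log) defined outside the snippet. A minimal standalone sketch of the same renewal step, assuming placeholder paths for the certificate and netrc file:

Python
import subprocess

# Placeholder paths; the worker reads these from module-level constants.
CERTFILE = "/tmp/cadcproxy.pem"
NETRC = "/run/secrets/CANFAR_NETRC"

# cadc-get-cert writes a proxy certificate valid for the requested number
# of days, authenticating with the credentials in the netrc file.
result = subprocess.run(
    [
        "cadc-get-cert",
        "--days-valid", "30",
        "--cert-filename", CERTFILE,
        "--netrc-file", NETRC,
    ],
    capture_output=True,
    text=True,
)
print("renewed" if result.returncode == 0 else f"failed: {result.stderr}")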

Deleter

datatrail_admin.workers.deleter

Deletion worker.

main

Python
main(site: str)

Main function.

Parameters:

site (str): Site name that the deleter is running at. Required.

Source code in datatrail_admin/workers/deleter.py
Python
def main(site: str):
    """Main function.

    Args:
        site (str): Site name that deleter is running at.
    """
    while True:
        work = utilities.withdraw(f"datatrail-deleter-{site}")
        if work is not None:
            perform_deletion(work)
        time.sleep(0.1)
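
Since the loop never returns, the worker is simply launched with the site it is deployed at; a minimal usage sketch:

Python
if __name__ == "__main__":
    main("chime")  # polls the datatrail-deleter-chime bucket forever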

perform_deletion

Python
perform_deletion(work: Work, test_mode: bool = False)

Perform deletion.

Source code in datatrail_admin/workers/deleter.py
Python
def perform_deletion(work: Work, test_mode=False):  # noqa C901
    """Perform deletion."""
    dataset_name = work.parameters["dataset_name"]
    dataset_scope = work.parameters["dataset_scope"]
    if not query.dataset_check_at_minoc(dataset_scope, dataset_name):
        log.error(
            "Failed to validate existence of {dataset_name}, {dataset_scope} at Minoc. Failing !!!"  # noqa
        )
        work.status = "failure"
        if test_mode is False:
            utilities.update(work)
        return
    try:
        dataset_path = os.path.commonpath(
            [fr["file_path"] for fr in work.parameters["replica_parameters"]]
        )
        storage_name = work.parameters["replica_parameters"][0]["storage_name"]
        exists = False
        if storage_name == "chime":
            log.debug("Checking if dataset exists at CHIME.")
            exists = Path(dataset_path).exists()
        else:
            # if storage_name is not chime, assume it exists and attempt deletion
            exists = True
        if exists:
            log.debug(f"Deleting dataset from {storage_name}.")
            action.delete(storage_name, dataset_path)
        else:
            # If file doesn't exist, no deletion to perform
            log.debug(f"No replicas found at {storage_name}.")
        for replica_parameters in work.parameters["replica_parameters"]:
            replica_id = replica_parameters["id"]
            if replica_id in work.results["delete_file"]["failure"]:
                work.results["delete_file"]["failure"].remove(replica_id)
            work.results["delete_file"]["success"].append(replica_id)
            work.start = time.time()
        log.info("Deletion successful.")

        # Now update the state in the database
        try:
            requests.put(
                SERVER + "/commit/dataset/transformation-status",
                json={
                    "deleter": {
                        "dataset_name": dataset_name,
                        "dataset_scope": dataset_scope,
                        "storage_name": storage_name,
                        "status": "completed",
                    }
                },
            )
            for replica_parameters in work.parameters["replica_parameters"]:
                # Database state successfully updated; record this replica as successful
                replica_id = replica_parameters["id"]
                if replica_id in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].remove(replica_id)
                work.results["update_database"]["success"].append(replica_id)
        except ConnectionError:
            log.error(f"Request error while updating dataset: {dataset_name}")
            for replica_parameters in work.parameters["replica_parameters"]:
                replica_id = replica_parameters["id"]
                if replica_id not in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].append(replica_id)
    except Exception as e:
        log.error(str(e))
        log.error("Deletion failed.")
        for replica_parameters in work.parameters["replica_parameters"]:
            replica_id = replica_parameters["id"]
            if replica_id not in work.results["delete_file"]["failure"]:
                work.results["delete_file"]["failure"].append(replica_id)

    work.parameters["number_deletion_completed"] = len(
        work.results["delete_file"]["success"]
    )
    work.parameters["number_deletion_failed"] = len(
        work.results["delete_file"]["failure"]
    )
    work.parameters["number_db_update_completed"] = len(
        work.results["update_database"]["success"]
    )
    work.parameters["number_db_update_failed"] = len(
        work.results["update_database"]["failure"]
    )
    if (
        len(work.results["update_database"]["failure"]) != 0
        and len(work.results["delete_file"]["failure"]) != 0
    ):
        work.status = "failure"
    else:
        work.status = "success"
    if test_mode is False:
        utilities.update(work)
    return
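
perform_deletion reads a fixed shape out of work.parameters and tallies outcomes into work.results. A minimal illustrative payload it can process (values are placeholders; the keys are exactly those accessed in the code above):

Python
parameters = {
    "dataset_name": "example-dataset",           # placeholder values
    "dataset_scope": "chime.event.baseband.raw",
    "replica_parameters": [
        {"id": 42, "storage_name": "chime",
         "file_path": "/data/example-dataset/file_0.dat"},
        {"id": 43, "storage_name": "chime",
         "file_path": "/data/example-dataset/file_1.dat"},
    ],
}
results = {
    "delete_file": {"success": [], "failure": []},
    "update_database": {"success": [], "failure": []},
}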

Processed Data Fetcher

datatrail_admin.workers.processed_data_fetcher

Processed data fetcher.

fetch_processed_data

Python
fetch_processed_data()

Fetch processed data from arc.

Source code in datatrail_admin/workers/processed_data_fetcher.py
Python
def fetch_processed_data():
    """Fetch processed data from arc."""
    w = utilities.withdraw("datatrail-pull-files-from-arc")
    if w is not None:
        try:
            action.pull_file(**w.parameters)
            w.status = "success"
        except Exception as e:
            log.error(str(e))
            w.status = "failure"
        utilities.update(w)
        return 1
    else:
        return 0
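
The 0/1 return value signals whether a Work item was withdrawn and handled, which makes the function easy to drive from a polling loop; a hedged sketch (the back-off interval is an assumption):

Python
import time

while True:
    if fetch_processed_data() == 0:
        time.sleep(1)  # nothing queued; back off briefly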

Replication Healer

datatrail_admin.workers.replication_healer

Replication healer.

heal_staged_file_replicas

Python
heal_staged_file_replicas(replica_paths: List[str], replica_ids: List[str], datasets: List[str], scopes: List[str])

Heal file replicas left as staged.

Source code in datatrail_admin/workers/replication_healer.py
Python
def heal_staged_file_replicas(
    replica_paths: List[str],
    replica_ids: List[str],
    datasets: List[str],
    scopes: List[str],
):  # noqa
    """Heal file replicas left as staged."""
    # Check if at Minoc and create payload
    logger.info("Checking if replicas exist at Minoc.")
    try:
        results, _, _ = check_staged_file_replicas_replicated(
            replica_paths, replica_ids, datasets, scopes
        )
    except Exception as error:
        logger.error(error)
        logger.info("Sleeping for 30s.")
        time.sleep(30)
        return {}, [], []

    if len(results["to_update"]) > 0:
        logger.info(f"Number of replicas already at Minoc: {len(results['to_update'])}.")
        # Attempt to create new replica, status code 200 or 500 expected.
        for pl in results["to_update"]:
            logger.info(f"Creating replica for: {pl['file_replicas'][0]['file_name']}")
            try:
                r = requests.post(SERVER + "/commit/file/replicas", json=pl)

                # If 200 done, elif 500 new replica already exists update original.
                if r.status_code not in [200, 500]:
                    logger.warning(f"Issue with: {pl['file_replicas'][0]['file_name']}")
                    continue

                logger.info(
                    f"Updating replication state of original replica: {pl['file_replicas'][0]['file_name']}"  # noqa: E501
                )

                requests.put(
                    SERVER + "/commit/file/replica/transformation-status",
                    json={
                        int(pl["original_replica_id"]): {
                            "transformation": "replication",
                            "status": "completed",
                        }
                    },
                )
            except ConnectionError:
                logger.error(
                    f"Request error while updating: {pl['file_replicas'][0]['file_name']}"  # noqa: E501
                )

    if len(results["not_at_minoc"]["replica_ids"]) > 0:
        logger.info(
            f"Number of replicas not at Minoc to reset: {len(results['not_at_minoc']['replica_ids'])}."  # noqa: E501
        )
        for fr_id in results["not_at_minoc"]["replica_ids"]:
            try:
                requests.put(
                    SERVER + "/commit/file/replica/transformation-status",
                    json={
                        int(fr_id): {
                            "transformation": "replication",
                            "status": "available",
                        }
                    },
                )
            except ConnectionError:
                logger.error(f"Request error while updating id: {fr_id}")

    logger.info("Healing completed successfully!")
    return {}, [], []
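
Both healing branches drive the same transformation-status endpoint, differing only in the status they set: "completed" for replicas found at Minoc, "available" to requeue those that are not. A self-contained sketch of the reset call for a single replica id (the SERVER value is a placeholder for the module constant not shown in the excerpt):

Python
import requests

SERVER = "https://datatrail.example.org"  # placeholder for the module constant

def reset_replication_status(replica_id: int) -> None:
    """Mark a staged replica as available again so it is re-replicated."""
    requests.put(
        SERVER + "/commit/file/replica/transformation-status",
        json={replica_id: {"transformation": "replication", "status": "available"}},
        timeout=30,
    )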

Replicator

datatrail_admin.workers.replicator

Replication worker.

get_destination

Python
get_destination(file_path, storage_name, replicate_to)

Get the destination file path or URI to push the file to.

Source code in datatrail_admin/workers/replicator.py
Python
def get_destination(file_path, storage_name, replicate_to):
    """Get the destination file path or uri to push the file to."""
    if storage_name == "arc":
        f = file_path.replace("/arc/projects/chime_frb", "")
        f = f.replace("arc:projects/chime_frb", "")
    elif storage_name == "minoc":
        f = file_path.replace("cadc:CHIMEFRB", "")
    else:
        f = file_path

    if replicate_to == "minoc":
        if f[0] == "/":
            f = f[1:]
    elif replicate_to == "arc":
        f = os.path.join("arc:projects/chime_frb", f)
    return f
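
Tracing the branches for illustrative paths:

Python
# chime -> minoc: the absolute CHIME path simply loses its leading slash
get_destination("/data/chime/intensity/raw/file.msgpack", "chime", "minoc")
# -> "data/chime/intensity/raw/file.msgpack"

# arc -> minoc: the arc prefix is stripped before the leading slash is trimmed
get_destination("arc:projects/chime_frb/data/file.h5", "arc", "minoc")
# -> "data/file.h5"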

main

Python
main(site: str)

Main function.

Parameters:

site (str): Site name that the replicator is running at. Required.

Source code in datatrail_admin/workers/replicator.py
Python
def main(site: str):
    """Main function.

    Args:
        site (str): Site name that replicator is running at.
    """
    b = buckets.Buckets()
    while True:
        c = cadcclient.CADCClient()
        try:
            c.exist("no-such-file-checking-for-connectivity", certfile=CERTFILE)
        except Exception as e:
            log.error(str(e))
            log.error(
                "Either CANFAR is DOWN or the ssl certificate for authentication has expired."  # noqa
            )
            time.sleep(30)
            continue
        log.info("Looking for work...")
        work = utilities.withdraw(f"datatrail-replicator-{site}-minoc")
        if work is not None:
            log.info("Work found!")
            perform_replication(work, b)
        time.sleep(0.1)
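
The probe asks Minoc for a file that cannot exist: any clean answer proves the service and certificate work, while an exception means CANFAR is down or the certificate has expired. A condensed sketch of the same guard (canfar_reachable is a hypothetical helper name):

Python
def canfar_reachable(client, certfile: str) -> bool:
    """Probe CANFAR with a sentinel lookup; False means unreachable or bad cert."""
    try:
        client.exist("no-such-file-checking-for-connectivity", certfile=certfile)
        return True
    except Exception:
        return False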

perform_replication

Python
perform_replication(work: Work, b: buckets.Buckets, test_mode: bool = False)

Perform the replication.

Source code in datatrail_admin/workers/replicator.py
Python
def perform_replication(work: Work, b: buckets.Buckets, test_mode=False):  # noqa C901
    """Perform the replication."""
    wo_payloads: List[Work.payload] = []
    dataset_name = work.parameters["dataset_name"]
    dataset_scope = work.parameters["dataset_scope"]
    storage_element_captured_at = work.parameters["storage_element_captured_at"]
    for replica_parameters in work.parameters["replica_parameters"]:
        log.info(f"Replicating {replica_parameters['file_path']}")
        replica_id = replica_parameters["id"]
        storage_name = replica_parameters["storage_name"]
        replicate_to = replica_parameters["replicate_to"]
        file_path = replica_parameters["file_path"]
        destination = get_destination(file_path, storage_name, replicate_to)
        if replica_id not in work.results["copy_file"]["success"]:
            # Perform replication.
            try:
                exists = False
                if storage_name == "chime":
                    log.debug("Checking if file exists at CHIME.")
                    exists = Path(file_path).exists()
                    log.debug(f"File exist = {exists}.")
                else:
                    # if storage_name not chime, assume exists and attempt replication
                    exists = True
                if exists:
                    log.debug(f"Copying file from {storage_name} to {replicate_to}")
                    action.push_file(replicate_to, file_path, destination)
                    md5sum = ""
                    if replicate_to == "minoc":
                        c = cadcclient.CADCClient()
                        md5sum = c.info(destination, certfile=CERTFILE)[-1]
                    if replica_id in work.results["copy_file"]["failure"]:
                        work.results["copy_file"]["failure"].remove(replica_id)
                    work.results["copy_file"]["success"].append(replica_id)
                    # If work successfully replicating reset start time to keep work in
                    # buckets. Reset here as copying is the biggest consumer of time.
                    work.start = time.time()
                    log.info("Replication successful.")
                else:
                    # If file doesn't exist, no replication to perform
                    log.debug(f"No replica found at {storage_name}.")
            except Exception as e:
                log.error(str(e))
                md5sum = ""
                if replica_id not in work.results["copy_file"]["failure"]:
                    work.results["copy_file"]["failure"].append(replica_id)
                log.error("Replication failed.")
            finally:
                log.debug("Updating 'copy_file' results.")
                work.parameters["number_transfers_completed"] = len(
                    work.results["copy_file"]["success"]
                )
                work.parameters["number_transfers_failed"] = len(
                    work.results["copy_file"]["failure"]
                )
                utilities.update(work)

        # update the status of replication and new replica
        if replica_id not in work.results["update_database"]["success"]:
            status = "failed"
            if replica_id in work.results["copy_file"]["success"]:
                status = "completed"
            try:
                if exists:
                    log.debug("Creating new replica record.")
                    wo_payload = update_work_status(
                        replica_id,
                        "replication",
                        status,
                        replicate_to,
                        destination,
                        md5sum,
                        dataset_name,
                        dataset_scope,
                        storage_element_captured_at,
                    )
                    if test_mode is False:
                        utilities.bucket_deposit(b, [wo_payload])
                else:
                    log.debug("Updating replica's deletion state.")
                    status = "completed"
                    wo_payload = update_work_status(
                        replica_id,
                        "deletion",
                        status,
                        "",
                        "",
                        "",
                        dataset_name,
                        dataset_scope,
                        "",
                    )
                    if test_mode is False:
                        utilities.bucket_deposit(b, [wo_payload])
                    # Deletion state successfully updated; record this replica as successful
                if replica_id in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].remove(replica_id)
                work.results["update_database"]["success"].append(replica_id)
                wo_payloads.append(wo_payload)
            except Exception as e:
                log.error(str(e))
                if replica_id not in work.results["update_database"]["failure"]:
                    work.results["update_database"]["failure"].append(replica_id)
            finally:
                log.debug("Updating 'update_database' results.")
                work.parameters["number_db_update_completed"] = len(
                    work.results["update_database"]["success"]
                )
                work.parameters["number_db_update_failed"] = len(
                    work.results["update_database"]["failure"]
                )
                utilities.update(work)
    if (
        len(work.results["update_database"]["failure"]) != 0
        and len(work.results["copy_file"]["failure"]) != 0
    ):
        work.status = "failure"
    else:
        work.status = "success"
    if test_mode is False:
        utilities.update(work)
    return wo_payloads
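
perform_replication expects a richer parameter shape than the deleter: each replica additionally names its replicate_to target, and the dataset records the storage element it was captured at. A minimal illustrative payload (placeholder values; the keys are exactly those read in the code above):

Python
parameters = {
    "dataset_name": "example-dataset",
    "dataset_scope": "chime.event.baseband.raw",
    "storage_element_captured_at": "chime",
    "replica_parameters": [
        {
            "id": 42,
            "storage_name": "chime",
            "replicate_to": "minoc",
            "file_path": "/data/chime/example-dataset/file_0.dat",
        },
    ],
}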

update_work_status

Python
update_work_status(replica_id: int, transformation: str, status: str, storage_element: str, file_path: str, md5sum: str, dataset_name: str, dataset_scope: str, storage_element_captured_at: str) -> List[Dict[Any, Any]]

Update the status of work.

Source code in datatrail_admin/workers/replicator.py
Python
def update_work_status(
    replica_id: int,
    transformation: str,
    status: str,
    storage_element: str,
    file_path: str,
    md5sum: str,
    dataset_name: str,
    dataset_scope: str,
    storage_element_captured_at: str,
) -> List[Dict[Any, Any]]:
    """Update the status of work."""
    params: Dict[str, Any] = {}
    if transformation == "replication":
        params["new_replica"] = {
            "storage_name": storage_element,
            "replica_id": replica_id,
            "md5sum": md5sum,
            "file_path": file_path,
            "status": status,
        }
    params["current_replica"] = {
        replica_id: {"transformation": transformation, "status": status}  # type: ignore
    }
    params["dataset_name"] = dataset_name
    params["dataset_scope"] = dataset_scope
    params["storage_element_captured_at"] = storage_element_captured_at
    wo = Work(
        pipeline="datatrail-state-updater",
        user="datatrail",
        site="chime",
        parameters={"replicator": params},
        config={"archive": {"results": False}},
    )
    return wo.payload
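
For a completed replication to Minoc, the helper yields the payload of a datatrail-state-updater Work object. An illustrative call (placeholder argument values; the structure mirrors the code above):

Python
payload = update_work_status(
    replica_id=42,
    transformation="replication",
    status="completed",
    storage_element="minoc",
    file_path="data/chime/example-dataset/file_0.dat",
    md5sum="d41d8cd98f00b204e9800998ecf8427e",  # placeholder checksum
    dataset_name="example-dataset",
    dataset_scope="chime.event.baseband.raw",
    storage_element_captured_at="chime",
)
# The resulting Work parameters carry both the new minoc replica record and
# the transformation-status update for the original replica (id 42).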