commit

datatrail.commit.commit_api

Implement the following REST endpoints.

  1. *Create a dataset.
  2. *Register files to a dataset.
  3. *Update transformation status of file replicas.
  4. Fetch active replications.
  5. Fetch upcoming replications.
  6. Fetch datasets.
  7. Fetch content of a dataset.

add_file_replicas async

Python
add_file_replicas(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[bool, str]

Add file replicas.

Source code in datatrail/commit/commit_api.py
Python
async def add_file_replicas(
    session: Union[AsyncSession, sessionmaker],
    payload: Dict[str, Any],
    test_mode: bool = False,
) -> Union[bool, str]:
    """Add file replicas."""
    try:
        storage_map = db_queries.get_storage_map(session)
        # TODO Check if files already exist for this dataset.
        replicas = []
        for fr in payload["file_replicas"]:
            r = common.FileReplica(**fr).dict()
            r["deletion_state"] = models.TransformationState.available
            r["deletion_state_updated_at"] = r["date_created"]
            r["replication_state"] = models.TransformationState.available
            r["replication_state_updated_at"] = r["date_created"]
            replicas.append(r)
        status = db_queries.bulk_insert_file_replicas(
            session,
            replicas,
            payload["storage_name"],
            storage_map[payload["storage_element_captured_at"]],
        )

        # identify number of replicas for each file to increment
        replica_map: Dict[str, int] = {}
        for fr in replicas:
            if fr["file_name"] in replica_map:
                replica_map[fr["file_name"]] += 1
            else:
                replica_map[fr["file_name"]] = 1
        names = [(file_name,) for file_name in replica_map]
        files = db_queries.fetch_files(
            session, names, storage_map[payload["storage_element_captured_at"]]
        )
        for f in files:
            f.num_available_replicas = len(
                [
                    fr
                    for fr in f.file_replicas
                    if fr.deletion_state != models.TransformationState.completed
                ]
            )
        session.commit()

        # Now apply the policy to the file replicas
        ds = db_queries.fetch_dataset(
            session, payload["dataset_name"], payload["dataset_scope"]
        )
        rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
        for f in files:
            file_replicas = [i for i in f.file_replicas]
            replica_locations = policy_evaluator.get_replica_locations(
                session, f, storage_map
            )
            if len(replicas):
                policy_evaluator.apply_policy_to_dataset_file_replicas(
                    file_replicas,
                    replica_locations,
                    deepcopy(rp),
                    deepcopy(dp),
                    storage_map,
                )
        session.commit()
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        log.error(f"Payload was: {payload}")
        return str(e)
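
A minimal usage sketch, not taken from the source above: it shows only the payload keys that add_file_replicas reads, assumes common.FileReplica accepts file_name and date_created plus whatever other fields the model requires, and assumes session is a session produced by the service's sessionmaker. All names are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative payload; only keys read by add_file_replicas are shown, and the
# dataset is assumed to already exist. common.FileReplica may require fields
# beyond the two listed here.
payload = {
    "dataset_name": "9386707",               # hypothetical dataset
    "dataset_scope": "test-scope",
    "storage_name": "site-a",                # hypothetical storage element
    "storage_element_captured_at": "site-a",
    "file_replicas": [
        {
            "file_name": "data/example/file_0.dat",        # hypothetical file
            "date_created": "2024-01-01T00:00:00+00:00",
        },
    ],
}

async def example(session):
    # session: SQLAlchemy session from the service's sessionmaker (assumed).
    return await commit_api.add_file_replicas(session, payload)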

attach_datasets async

Python
attach_datasets(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]

Attach two datasets.

Source code in datatrail/commit/commit_api.py
Python
async def attach_datasets(
    session: Union[AsyncSession, sessionmaker],
    payload: Union[common.DeletionPayload, common.ReplicationPayload],
    test_mode: bool = False,
) -> Union[bool, str]:
    """Attach two datasets."""
    try:
        dis = [datasets.DatasetIdentifier(**bt) for bt in payload["belongs_to"]]
        ad = datasets.AttachDataset(
            name=payload["name"], scope=payload["scope"], belongs_to=dis
        )
        ad = ad.dict()
        # Verify that both the datasets exist before attaching.
        ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
        if ds is None:
            #  dataset test-scope, 9386707 not found
            raise DatasetNotFoundError(payload["name"], payload["scope"])
        for bt in payload["belongs_to"]:
            belongs_to_ds = db_queries.fetch_dataset(session, bt["name"], bt["scope"])
            if belongs_to_ds is None:
                raise DatasetNotFoundError(bt["name"], bt["scope"])
            # Before attaching datasets see if it is already attached
            # Attaching datasets is a long running operation since policies have to be
            # passed on to all the file replicas.
            # Avoid reattaching datasets already attached.
            for larger_dataset in ds.belongs_to:
                if (
                    larger_dataset.name == bt["name"]
                    and larger_dataset.scope == bt["scope"]
                ):
                    # If already attached then no need to attach again. Return True.
                    return True

        # Now attach datasets
        status = db_queries.attach_datasets(session, **ad)

        ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
        container: List[Any] = []
        db_queries.explore_children_datasets(ds, container)
        storage_map = db_queries.get_storage_map(session)
        for ds in container:
            if len(ds.files) > 0:
                rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
                policy_evaluator.apply_policy_to_dataset(
                    session, ds, rp, dp, storage_map
                )
        session.commit()
        return status
    except DatasetNotFoundError as error:
        session.rollback()
        log.warning(error.message)
        return error.message
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error("Something errored out while attaching elements")
        log.error(str(e))
        return str(e)
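
A hedged sketch of attaching a dataset to a larger container dataset. The payload keys (name, scope, belongs_to) come from the source above; the dataset names and scopes are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative payload: attach the child dataset to one parent dataset.
payload = {
    "name": "9386707",                                    # hypothetical child
    "scope": "test-scope",
    "belongs_to": [
        {"name": "all-events", "scope": "test-scope"},    # hypothetical parent
    ],
}

async def example(session):
    return await commit_api.attach_datasets(session, payload)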

create_dataset async

Python
create_dataset(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]

Create a dataset.

Source code in datatrail/commit/commit_api.py
Python
async def create_dataset(
    session: Union[AsyncSession, sessionmaker],
    payload: Union[common.DeletionPayload, common.ReplicationPayload],
    test_mode: bool = False,
) -> Union[bool, str]:
    """Create a dataset."""
    try:
        conf = config.fetch_config()
        storage_elements = await get_storage_elements(session)
        if isinstance(storage_elements, str):
            return False
        ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
        if ds:
            return True
        if "replication_policy" in payload:
            rp = datasets.ReplicationPolicy(**payload["replication_policy"])
            payload["replication_policy"] = rp
            replication_default = False
        else:
            payload["replication_policy"] = conf["default_replication_policy"]
            replication_default = True
        if "deletion_policy" in payload:
            dp: Any = [datasets.DeletionPolicy(**p) for p in payload["deletion_policy"]]
            # Add default policies for other sites depending on their type.
            sites = [d.storage_element for d in dp]
            for s in storage_elements:
                if s["name"] in sites:
                    continue
                policy = {
                    "storage_element": s["name"],
                    "delete_after_days": conf["default_deletion_policy"][
                        s["storage_type"]
                    ]["delete_after_days"],
                    "priority": conf["default_deletion_policy"][s["storage_type"]][
                        "priority"
                    ],
                }
                dp.append(datasets.DeletionPolicy(**policy))
            payload["deletion_policy"] = dp
            deletion_default = False
        else:
            # Add default policies for all sites depending on their type.
            ddp = []
            for s in storage_elements:
                dp = {
                    "storage_element": s["name"],
                    "delete_after_days": conf["default_deletion_policy"][
                        s["storage_type"]
                    ]["delete_after_days"],
                    "priority": conf["default_deletion_policy"][s["storage_type"]][
                        "priority"
                    ],
                }
                ddp.append(datasets.DeletionPolicy(**dp))
            payload["deletion_policy"] = ddp
            deletion_default = True
        ds = datasets.CreateDataset(**payload)
        payload = ds.dict()
        payload["replication_policy"]["default"] = replication_default
        for dp in payload["deletion_policy"]:
            dp["default"] = deletion_default
        vp = validate_policies(payload["replication_policy"], payload["deletion_policy"])
        if vp is not True:
            return vp
        status = db_queries.insert_dataset(session, **payload)
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
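
A sketch of creating a dataset with explicit policies; omitting replication_policy or deletion_policy falls back to the configured defaults, as the source above shows. Storage element names, day counts and priority values are placeholders, and the exact priority type depends on datasets.DeletionPolicy.

Python
from datatrail.commit import commit_api

# Illustrative payload; any further fields required by datasets.CreateDataset
# are omitted here.
payload = {
    "name": "9386707",                     # hypothetical dataset
    "scope": "test-scope",
    "replication_policy": {
        "preferred_storage_elements": ["site-a", "site-b"],   # hypothetical sites
    },
    "deletion_policy": [
        {"storage_element": "site-a", "delete_after_days": 30, "priority": "high"},
        {"storage_element": "site-b", "delete_after_days": 365, "priority": "low"},
    ],
}

async def example(session):
    return await commit_api.create_dataset(session, payload)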

create_storage_element async

Python
create_storage_element(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload]) -> Union[bool, str]

Create a storage element.

Source code in datatrail/commit/commit_api.py
Python
async def create_storage_element(
    session: Union[AsyncSession, sessionmaker],
    payload: Union[common.DeletionPayload, common.ReplicationPayload],
) -> Union[bool, str]:
    """Create a storage element."""
    try:
        se = db_queries.fetch_storage_element(session, payload["name"])
        if se:
            return True
        cse = storage_element.CreateStorageElement(**payload)
        payload = cse.dict()
        status = db_queries.insert_storage_element(session, **payload)
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
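
A sketch with placeholder values; only the name key is confirmed by the code above, and storage_type is assumed from how storage elements are used in create_dataset. Any other fields required by storage_element.CreateStorageElement are not shown.

Python
from datatrail.commit import commit_api

payload = {
    "name": "site-a",            # hypothetical storage element
    "storage_type": "archive",   # assumed field; see its use in create_dataset
    # ...any further fields required by storage_element.CreateStorageElement.
}

async def example(session):
    return await commit_api.create_storage_element(session, payload)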

detach_datasets async

Python
detach_datasets(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]

Detach two datasets.

Source code in datatrail/commit/commit_api.py
Python
async def detach_datasets(
    session: Union[AsyncSession, sessionmaker],
    payload: Union[common.DeletionPayload, common.ReplicationPayload],
    test_mode: bool = False,
) -> Union[bool, str]:
    """Detach two datasets."""
    try:
        dis = [datasets.DatasetIdentifier(**bt) for bt in payload["detach_from"]]
        ad = datasets.DetachDataset(
            name=payload["name"], scope=payload["scope"], detach_from=dis
        )
        ad = ad.dict()
        status = db_queries.detach_datasets(session, **ad)
        ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
        container: List[Any] = []
        db_queries.explore_children_datasets(ds, container)
        storage_map = db_queries.get_storage_map(session)
        for ds in container:
            if len(ds.files) > 0:
                rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
                policy_evaluator.apply_policy_to_dataset(
                    session, ds, rp, dp, storage_map
                )
        session.commit()
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
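
A hedged sketch mirroring attach_datasets, using the detach_from key read by the source above; dataset names and scopes are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative payload: detach the child dataset from one parent dataset.
payload = {
    "name": "9386707",                                    # hypothetical child
    "scope": "test-scope",
    "detach_from": [
        {"name": "all-events", "scope": "test-scope"},    # hypothetical parent
    ],
}

async def example(session):
    return await commit_api.detach_datasets(session, payload)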

register_files async

Python
register_files(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]

Register files to a dataset.

Source code in datatrail/commit/commit_api.py
Python
async def register_files(
    session: Union[AsyncSession, sessionmaker],
    payload: Union[common.DeletionPayload, common.ReplicationPayload],
    test_mode: bool = False,
) -> Union[bool, str]:
    """Register files to a dataset."""
    try:
        # If nothing to register
        if payload["files"] == []:
            return True
        storage_map = db_queries.get_storage_map(session)
        # Skip if files already exist for this dataset.
        conf = config.fetch_config()
        ds = db_queries.fetch_dataset(
            session, payload["dataset_name"], payload["dataset_scope"]
        )
        if ds.files:
            ds_file_names = [f.name for f in ds.files]
            fs_file_names = [f["name"] for f in payload["files"]]

            if set(fs_file_names) <= set(ds_file_names):
                return True
        for f in payload["files"]:
            # Check that name is path minus root directory.
            assert f["name"].startswith(
                "data/"
            ), "File names must start with data/, ie have site-specific root removed."
        # Continue otherwise.
        fs = [common.File(**f) for f in payload["files"]]

        pl = files.AttachFilesToDataset(
            dataset_name=payload["dataset_name"],
            dataset_scope=payload["dataset_scope"],
            files=fs,
        )
        pl = pl.dict()
        for f in pl["files"]:
            f["preferred_storage_elements"] = conf["default_replication_policy"][
                "preferred_storage_elements"
            ]
            f["storage_element_captured_at"] = storage_map[
                f["storage_element_captured_at"]
            ]
        status = db_queries.bulk_insert_files(session, **pl)
        session.commit()
        ds = db_queries.fetch_dataset(
            session, payload["dataset_name"], payload["dataset_scope"]
        )
        rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
        policy_evaluator.apply_policy_to_dataset(session, ds, rp, dp, storage_map)
        session.commit()
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
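
A sketch of registering files; file names must start with "data/" per the assertion above, and common.File may require fields beyond the two shown. All values are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative payload; the dataset is assumed to already exist.
payload = {
    "dataset_name": "9386707",              # hypothetical dataset
    "dataset_scope": "test-scope",
    "files": [
        {
            "name": "data/example/file_0.dat",         # hypothetical file
            "storage_element_captured_at": "site-a",   # hypothetical site
        },
    ],
}

async def example(session):
    return await commit_api.register_files(session, payload)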

scout_sync_storage_element async

Python
scout_sync_storage_element(session, name: str, scope: str, data: Dict[str, str], replicate_to: str) -> None

Scout sync creates replicas for existing files missing in Datatrail.

Parameters:

    session (PSQL Session): Postgres session. Required.
    name (str): Dataset name. Required.
    scope (str): Dataset scope. Required.
    data (Dict[str, str]): Dictionary mapping each file name to its md5sum at the storage element. Required.
    replicate_to (str): Storage element at which the missing replicas should be recorded. Required.

Raises:

    DatasetNotFoundError: If the dataset is not found in the database.

Returns:

    None: None.

Source code in datatrail/commit/commit_api.py
Python
async def scout_sync_storage_element(
    session, name: str, scope: str, data: Dict[str, str], replicate_to: str
) -> None:
    """Scout sync creates replicas for existing files missing in Datatrail.

    Args:
        session (PSQL Session): Postgres session.
        name (str): Dataset name.
        scope (str): Dataset scope.
        data (Dict[str, str]): Dictionary containing the file name and md5sum at se.

    Raises:
        DatasetNotFoundError: If the dataset is not found in the database.

    Returns:
        None: None.
    """
    storage_map = db_queries.get_storage_map(session)

    ds = session.query(models.Dataset).filter_by(name=name, scope=scope).first()
    if ds is None:
        raise DatasetNotFoundError(name, scope)

    for f in ds.files:
        assert (
            replicate_to in f.preferred_storage_elements
        ), f"{replicate_to} not in preferred storage elements"
        frs: Dict[int, models.FileReplica] = {}
        for fr in f.file_replicas:
            frs[fr.storage_id] = fr
        # Check if the storage element's replica is missing
        if storage_map[replicate_to] not in frs.keys():
            # Find the most recent replica
            latest_fr: Optional[models.FileReplica] = None
            for fr in frs.values():
                if latest_fr is None or latest_fr.date_created < fr.date_created:
                    latest_fr = fr
            if latest_fr is None:
                log.warning(f"No replicas found for {f.name}")
                continue

            # Check that file is in storage element's md5sum data
            se_md5 = data.get(f.name)
            if se_md5 is None:
                log.warning(f"MD5 not found for {f.name}, file not at storage element")
                continue

            # Create a new replica for the file at se
            payload = {
                "replicator": [
                    {
                        "new_replica": {
                            "file_path": f.name,
                            "md5sum": se_md5,
                            "replica_id": latest_fr.id,
                            "status": "completed",
                            "storage_name": replicate_to,
                        },
                        "current_replica": {
                            str(latest_fr.id): {
                                "status": "completed",
                                "transformation": "replication",
                            },
                        },
                        "dataset_name": name,
                        "dataset_scope": scope,
                        "storage_element_captured_at": storage_map[
                            f.storage_element_captured_at
                        ],
                    }
                ]
            }
            log.info(payload)
            await update_status(
                session,
                Work(
                    pipeline="update",
                    user="datatrail",
                    site="local",
                    parameters=payload,
                ),
            )
    return None
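
A hedged sketch of a scout sync call; the dataset, file name, md5sum and storage element are placeholders, and session is assumed to come from the service's sessionmaker.

Python
from datatrail.commit import commit_api

async def example(session):
    # Keys of `data` are file names, values are their md5sums at the target
    # storage element; everything below is illustrative.
    data = {"data/example/file_0.dat": "9e107d9d372bb6826bd81d3542a419d6"}
    await commit_api.scout_sync_storage_element(
        session,
        name="9386707",          # hypothetical dataset
        scope="test-scope",
        data=data,
        replicate_to="site-a",   # hypothetical storage element
    )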

toggle_storage_element_state async

Python
toggle_storage_element_state(session: Union[AsyncSession, sessionmaker], name: str) -> Union[bool, str]

Toggle state of storage element.

Source code in datatrail/commit/commit_api.py
Python
async def toggle_storage_element_state(
    session: Union[AsyncSession, sessionmaker], name: str
) -> Union[bool, str]:
    """Toggle state of storage element."""
    try:
        status = db_queries.toggle_storage_element_state(session, name)
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

update_deletion_policy async

Python
update_deletion_policy(session: Union[AsyncSession, sessionmaker], scope: str, name: str, deletion_policy: List[Any], test_mode: bool = False, propagate_policy: bool = True) -> Union[bool, str]

Update the deletion policy of a dataset.

Source code in datatrail/commit/commit_api.py
Python
async def update_deletion_policy(
    session: Union[AsyncSession, sessionmaker],
    scope: str,
    name: str,
    deletion_policy: List[Any],
    test_mode: bool = False,
    propagate_policy: bool = True,
) -> Union[bool, str]:
    """Update the deletion policy of a dataset."""
    try:
        udpList = []
        for dp in deletion_policy:
            p = datasets.DeletionPolicy(**dp)
            udpList.append(p.dict())
        dataset = await get_dataset(session, scope, name, test_mode=test_mode)
        if isinstance(dataset, str):
            return "Could not find dataset"
        vp = validate_policies(dataset["replication_policy"], udpList)
        if vp is not True:
            return vp
        for p in udpList:
            p["default"] = False
        status = db_queries.update_deletion_policy(session, scope, name, udpList)
        if propagate_policy:
            this_dataset = db_queries.fetch_dataset(session, name, scope)
            container: List[Any] = []
            db_queries.explore_children_datasets(this_dataset, container)
            storage_map = db_queries.get_storage_map(session)
            for ds in container:
                if len(ds.files) > 0:
                    rp_ds, dp_ds = policy_evaluator.determine_policies(
                        session, ds, storage_map
                    )
                    policy_evaluator.apply_policy_to_dataset(
                        session, ds, rp_ds, dp_ds, storage_map
                    )
        session.commit()
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
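
A sketch of updating a deletion policy; the storage_element, delete_after_days and priority keys come from how policies are built in create_dataset, but the values (and the priority type) are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative policy list covering both preferred storage elements.
new_policy = [
    {"storage_element": "site-a", "delete_after_days": 30, "priority": "high"},
    {"storage_element": "site-b", "delete_after_days": 365, "priority": "low"},
]

async def example(session):
    return await commit_api.update_deletion_policy(
        session, scope="test-scope", name="9386707", deletion_policy=new_policy
    )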

update_deletion_state async

Python
update_deletion_state(replica: models.FileReplica, deletion_state: str) -> models.FileReplica

Update the deletion state and time of the replica.

Source code in datatrail/commit/commit_api.py
Python
async def update_deletion_state(
    replica: models.FileReplica,
    deletion_state: str,
) -> models.FileReplica:
    """Update the deletion state and time of the replica."""
    replica.deletion_state = models.TransformationState[deletion_state]
    replica.deletion_state_updated_at = pytz.utc.localize(datetime.datetime.utcnow())
    return replica

update_deletion_status_full_dataset async

Python
update_deletion_status_full_dataset(session: Union[AsyncSession, sessionmaker], payload: Dict[str, dict], test_mode: bool = False) -> Union[bool, str]

Update the transformation status of the file replicas.

Source code in datatrail/commit/commit_api.py
Python
async def update_deletion_status_full_dataset(
    session: Union[AsyncSession, sessionmaker],
    payload: Dict[str, dict],
    test_mode: bool = False,
) -> Union[bool, str]:
    """Update the transformation status of the file replicas."""
    try:
        storage_name = payload["storage_name"]
        status = payload["payload"]
        storage_map = db_queries.get_storage_map(session)

        for replica_id, fr_status in status.items():
            fr = db_queries.fetch_file_replica_by_id(session, replica_id)
            if fr.storage_id == storage_map[storage_name]:
                fr = await update_deletion_state(fr, fr_status)
                f = fr.belongs_to
                if fr_status == "completed":
                    f.num_available_replicas = len(
                        [
                            r
                            for r in f.file_replicas
                            if r.deletion_state != models.TransformationState.completed
                        ]
                    )

        session.commit()
        return True
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
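
A sketch of the payload shape read by the source above: storage_name names the storage element, and the inner payload maps file-replica ids to a deletion state name such as "completed". The ids and names are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative payload; replica ids are hypothetical and the state must be a
# models.TransformationState name.
payload = {
    "storage_name": "site-a",
    "payload": {
        "101": "completed",
        "102": "completed",
    },
}

async def example(session):
    return await commit_api.update_deletion_status_full_dataset(session, payload)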

update_replication_policy async

Python
update_replication_policy(session: Union[AsyncSession, sessionmaker], scope: str, name: str, replication_policy: datasets.ReplicationPolicy, test_mode: bool = False, propagate_policy: bool = True) -> Union[bool, str]

Update the replication policy of a dataset.

Source code in datatrail/commit/commit_api.py
Python
async def update_replication_policy(
    session: Union[AsyncSession, sessionmaker],
    scope: str,
    name: str,
    replication_policy: datasets.ReplicationPolicy,
    test_mode: bool = False,
    propagate_policy: bool = True,
) -> Union[bool, str]:
    """Update the replication policy of a dataset."""
    try:
        rp = datasets.ReplicationPolicy(**replication_policy)
        urp = datasets.UpdateReplicationPolicy(name=name, scope=scope, policy=rp)
        urp = urp.dict()
        dataset = await get_dataset(session, scope, name, test_mode=test_mode)
        if isinstance(dataset, str):
            return "Could not find dataset"
        vp = validate_policies(urp["policy"], dataset["deletion_policy"])
        if vp is not True:
            return vp
        urp["policy"]["default"] = False
        status = db_queries.update_replication_policy(session, **urp)
        if propagate_policy:
            this_dataset = db_queries.fetch_dataset(session, name, scope)
            container: List[Any] = []
            db_queries.explore_children_datasets(this_dataset, container)
            storage_map = db_queries.get_storage_map(session)
            for ds in container:
                if len(ds.files) > 0:
                    rp_ds, dp_ds = policy_evaluator.determine_policies(
                        session, ds, storage_map
                    )
                    policy_evaluator.apply_policy_to_dataset(
                        session, ds, rp_ds, dp_ds, storage_map
                    )
        session.commit()
        return status
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
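
A sketch of updating a replication policy; preferred_storage_elements is the only policy key visible in this module, so anything else datasets.ReplicationPolicy accepts is omitted. Names are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative policy; storage element names are placeholders.
new_policy = {"preferred_storage_elements": ["site-a", "site-b"]}

async def example(session):
    return await commit_api.update_replication_policy(
        session, scope="test-scope", name="9386707", replication_policy=new_policy
    )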

update_replication_state async

Python
update_replication_state(storage_map: dict, replica: models.FileReplica, replication_state: str) -> models.FileReplica

Update the replication state and time of the replica.

Source code in datatrail/commit/commit_api.py
Python
async def update_replication_state(
    storage_map: dict,
    replica: models.FileReplica,
    replication_state: str,
) -> models.FileReplica:
    """Update the replication state and time of the replica."""
    replica.replication_state = models.TransformationState[replication_state]
    replica.replication_state_updated_at = pytz.utc.localize(datetime.datetime.utcnow())
    if replication_state == "completed":
        pse = deepcopy(replica.belongs_to.preferred_storage_elements)
        if replica.replicate_to is not None:
            conf = config.fetch_config()
            if storage_map[replica.replicate_to] in pse:
                pse.remove(storage_map[replica.replicate_to])
            if storage_map[replica.storage_id] in pse:
                pse.remove(storage_map[replica.storage_id])
            if pse == []:
                replica.replicate_to = None
            else:
                can_replicate_to = conf["replicate_to"].get(
                    storage_map[replica.storage_id]
                )
                if can_replicate_to is None:
                    replica.replicate_to = None
                if can_replicate_to in pse:
                    replica.replicate_to = storage_map[can_replicate_to]
    return replica

update_transformation_status async

Python
update_transformation_status(session: Union[AsyncSession, sessionmaker], payload: Dict[int, common.TransformationStatus], test_mode: bool = False) -> Union[bool, str]

Update the transformation status of the file replica.

Source code in datatrail/commit/commit_api.py
Python
async def update_transformation_status(
    session: Union[AsyncSession, sessionmaker],
    payload: Dict[int, common.TransformationStatus],
    test_mode: bool = False,
) -> Union[bool, str]:
    """Update the transformation status of the file replica."""
    try:
        files = set()
        for replica_id, transformation_status in payload.items():
            transformation_status = common.TransformationStatus(
                **transformation_status
            ).dict()
            replica = db_queries.fetch_file_replica_by_id(session, replica_id)
            if transformation_status["transformation"] == "replication":
                storage_map = db_queries.get_storage_map(session)
                replica = await update_replication_state(
                    storage_map, replica, transformation_status["status"]
                )
            else:
                # transformation is deletion
                replica = await update_deletion_state(
                    replica, transformation_status["status"]
                )
                if (
                    transformation_status["status"] == "completed"
                    and replica.belongs_to.name not in files
                ):
                    f = replica.belongs_to
                    f.num_available_replicas = len(
                        [
                            fr
                            for fr in f.file_replicas
                            if fr.deletion_state != models.TransformationState.completed
                        ]
                    )
                    files.add(f.name)
        session.commit()
        return True
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)
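
A sketch of the payload read by the source above: each key is a file-replica id and each value carries the transformation and its new status; "deletion" is assumed as the non-replication transformation name, per the else branch. Ids are placeholders.

Python
from datatrail.commit import commit_api

# Illustrative payload; statuses must be models.TransformationState names.
payload = {
    101: {"transformation": "replication", "status": "completed"},
    102: {"transformation": "deletion", "status": "completed"},
}

async def example(session):
    return await commit_api.update_transformation_status(session, payload)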

validate_policies

Python
validate_policies(replication_policy: datasets.ReplicationPolicy, deletion_policy: List[Any]) -> Union[bool, str]

Validate policies.

Ensure that each preferred storage element has a deletion policy.

Source code in datatrail/commit/commit_api.py
Python
def validate_policies(
    replication_policy: datasets.ReplicationPolicy,
    deletion_policy: List[Any],
) -> Union[bool, str]:
    """Validate policies.

    Ensure that each preferred storage element has a deletion policy.
    """
    valid: Union[bool, str] = True
    pse = replication_policy["preferred_storage_elements"]
    deletion_storage_elements = [d["storage_element"] for d in deletion_policy]
    for se in pse:
        if se not in deletion_storage_elements:
            valid = f"Error! No deletion policy found for {se}."
    return valid
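
Because validate_policies only subscripts its arguments, it can be exercised directly with plain dicts; this sketch uses placeholder storage element names and shows the error string produced when a preferred storage element has no deletion policy.

Python
from datatrail.commit.commit_api import validate_policies

replication_policy = {"preferred_storage_elements": ["site-a", "site-b"]}
deletion_policy = [{"storage_element": "site-a"}]  # no policy for site-b

print(validate_policies(replication_policy, deletion_policy))
# -> "Error! No deletion policy found for site-b."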