commit
datatrail.commit.commit_api ¶
Implement the following REST endpoints.
- Create a dataset.
- Register files to a dataset.
- Update transformation status of file replicas.
- Fetch active replications.
- Fetch upcoming replications.
- Fetch datasets.
- Fetch content of a dataset.
add_file_replicas async ¶
add_file_replicas(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[bool, str]
Add file replicas.
Source code in datatrail/commit/commit_api.py
async def add_file_replicas(
session: Union[AsyncSession, sessionmaker],
payload: Dict[str, Any],
test_mode: bool = False,
) -> Union[bool, str]:
"""Add file replicas."""
try:
storage_map = db_queries.get_storage_map(session)
# TODO Check if files already exist for this dataset.
replicas = []
for fr in payload["file_replicas"]:
r = common.FileReplica(**fr).dict()
r["deletion_state"] = models.TransformationState.available
r["deletion_state_updated_at"] = r["date_created"]
r["replication_state"] = models.TransformationState.available
r["replication_state_updated_at"] = r["date_created"]
replicas.append(r)
status = db_queries.bulk_insert_file_replicas(
session,
replicas,
payload["storage_name"],
storage_map[payload["storage_element_captured_at"]],
)
# identify number of replicas for each file to increment
replica_map: Dict[str, int] = {}
for fr in replicas:
if fr["file_name"] in replica_map:
replica_map[fr["file_name"]] += 1
else:
replica_map[fr["file_name"]] = 1
names = [(file_name,) for file_name in replica_map]
files = db_queries.fetch_files(
session, names, storage_map[payload["storage_element_captured_at"]]
)
for f in files:
f.num_available_replicas = len(
[
fr
for fr in f.file_replicas
if fr.deletion_state != models.TransformationState.completed
]
)
session.commit()
# Now apply the policy to the file replicas
ds = db_queries.fetch_dataset(
session, payload["dataset_name"], payload["dataset_scope"]
)
rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
for f in files:
file_replicas = [i for i in f.file_replicas]
replica_locations = policy_evaluator.get_replica_locations(
session, f, storage_map
)
if len(replicas):
policy_evaluator.apply_policy_to_dataset_file_replicas(
file_replicas,
replica_locations,
deepcopy(rp),
deepcopy(dp),
storage_map,
)
session.commit()
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
log.error(f"Payload was: {payload}")
return str(e)
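A minimal usage sketch for this helper. The payload keys (`file_replicas`, `storage_name`, `storage_element_captured_at`, `dataset_name`, `dataset_scope`) are taken from the function body above; the replica fields, names, and database URL are hypothetical, and the exact required replica fields are defined by `common.FileReplica`.

```python
import asyncio

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from datatrail.commit import commit_api

# Placeholder connection string; point this at the real Datatrail database.
engine = create_engine("postgresql://user:password@localhost:5432/datatrail")

# Hypothetical payload: one new replica captured at storage element "site_a".
payload = {
    "dataset_name": "example-dataset",
    "dataset_scope": "example-scope",
    "storage_name": "site_a",
    "storage_element_captured_at": "site_a",
    "file_replicas": [
        {
            # Fields must match common.FileReplica; these are illustrative.
            "file_name": "data/example/file_0001.dat",
            "file_path": "data/example/file_0001.dat",
            "md5sum": "d41d8cd98f00b204e9800998ecf8427e",
        }
    ],
}

with Session(engine) as session:
    # Returns the insert status on success, or an error string on failure.
    status = asyncio.run(commit_api.add_file_replicas(session, payload))
    print(status)
```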
attach_datasets async ¶
attach_datasets(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]
Attach two datasets.
Source code in datatrail/commit/commit_api.py
async def attach_datasets(
session: Union[AsyncSession, sessionmaker],
payload: Union[common.DeletionPayload, common.ReplicationPayload],
test_mode: bool = False,
) -> Union[bool, str]:
"""Attach two datasets."""
try:
dis = [datasets.DatasetIdentifier(**bt) for bt in payload["belongs_to"]]
ad = datasets.AttachDataset(
name=payload["name"], scope=payload["scope"], belongs_to=dis
)
ad = ad.dict()
# Verify that both the datasets exist before attaching.
ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
if ds is None:
# dataset test-scope, 9386707 not found
raise DatasetNotFoundError(payload["name"], payload["scope"])
for bt in payload["belongs_to"]:
belongs_to_ds = db_queries.fetch_dataset(session, bt["name"], bt["scope"])
if belongs_to_ds is None:
raise DatasetNotFoundError(bt["name"], bt["scope"])
# Before attaching datasets see if it is already attached
# Attaching datasets is a long running operation since policies have to be
# passed on to all the file replicas.
# Avoid reattaching datasets already attached.
for larger_dataset in ds.belongs_to:
if (
larger_dataset.name == bt["name"]
and larger_dataset.scope == bt["scope"]
):
# If already attached then no need to attach again. Return True.
return True
# Now attach datasets
status = db_queries.attach_datasets(session, **ad)
ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
container: List[Any] = []
db_queries.explore_children_datasets(ds, container)
storage_map = db_queries.get_storage_map(session)
for ds in container:
if len(ds.files) > 0:
rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
policy_evaluator.apply_policy_to_dataset(
session, ds, rp, dp, storage_map
)
session.commit()
return status
except DatasetNotFoundError as error:
session.rollback()
log.warning(error.message)
return error.message
except Exception as e: # pragma: no cover
session.rollback()
log.error("Something errored out while attaching elements")
log.error(str(e))
return str(e)
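A sketch of attaching a smaller dataset to a larger one, with hypothetical dataset names. The payload keys `name`, `scope`, and `belongs_to` mirror the function body above; `session` is assumed to be an open SQLAlchemy session bound to the Datatrail database (constructed as in the earlier sketch).

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def attach_example(session: Session) -> None:
    # Hypothetical payload: attach "child-dataset" to the larger "parent-dataset".
    payload = {
        "name": "child-dataset",
        "scope": "example-scope",
        "belongs_to": [{"name": "parent-dataset", "scope": "example-scope"}],
    }
    # True when attached (or already attached); an error message otherwise.
    status = await commit_api.attach_datasets(session, payload)
    print(status)
```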
create_dataset async ¶
create_dataset(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]
Create a dataset.
Source code in datatrail/commit/commit_api.py
async def create_dataset(
session: Union[AsyncSession, sessionmaker],
payload: Union[common.DeletionPayload, common.ReplicationPayload],
test_mode: bool = False,
) -> Union[bool, str]:
"""Create a dataset."""
try:
conf = config.fetch_config()
storage_elements = await get_storage_elements(session)
if isinstance(storage_elements, str):
return False
ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
if ds:
return True
if "replication_policy" in payload:
rp = datasets.ReplicationPolicy(**payload["replication_policy"])
payload["replication_policy"] = rp
replication_default = False
else:
payload["replication_policy"] = conf["default_replication_policy"]
replication_default = True
if "deletion_policy" in payload:
dp: Any = [datasets.DeletionPolicy(**p) for p in payload["deletion_policy"]]
# Add default policies for other sites depending on their type.
sites = [d.storage_element for d in dp]
for s in storage_elements:
if s["name"] in sites:
continue
policy = {
"storage_element": s["name"],
"delete_after_days": conf["default_deletion_policy"][
s["storage_type"]
]["delete_after_days"],
"priority": conf["default_deletion_policy"][s["storage_type"]][
"priority"
],
}
dp.append(datasets.DeletionPolicy(**policy))
payload["deletion_policy"] = dp
deletion_default = False
else:
# Add default policies for all sites depending on their type.
ddp = []
for s in storage_elements:
dp = {
"storage_element": s["name"],
"delete_after_days": conf["default_deletion_policy"][
s["storage_type"]
]["delete_after_days"],
"priority": conf["default_deletion_policy"][s["storage_type"]][
"priority"
],
}
ddp.append(datasets.DeletionPolicy(**dp))
payload["deletion_policy"] = ddp
deletion_default = True
ds = datasets.CreateDataset(**payload)
payload = ds.dict()
payload["replication_policy"]["default"] = replication_default
for dp in payload["deletion_policy"]:
dp["default"] = deletion_default
vp = validate_policies(payload["replication_policy"], payload["deletion_policy"])
if vp is not True:
return vp
status = db_queries.insert_dataset(session, **payload)
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
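A sketch with hypothetical names and policy values. Omitting `replication_policy` or `deletion_policy` falls back to the defaults from `config.fetch_config()`; the field names follow the policy handling in the function body, but the `priority` and `delete_after_days` values are illustrative.

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def create_example_dataset(session: Session) -> None:
    # Hypothetical payload; dropping the policy keys falls back to the defaults
    # from config.fetch_config(). Policy values here are illustrative.
    payload = {
        "name": "example-dataset",
        "scope": "example-scope",
        "replication_policy": {"preferred_storage_elements": ["site_a", "site_b"]},
        "deletion_policy": [
            {"storage_element": "site_a", "delete_after_days": 30, "priority": 1},
            {"storage_element": "site_b", "delete_after_days": 365, "priority": 2},
        ],
    }
    # True if the dataset was created (or already exists), else an error string.
    status = await commit_api.create_dataset(session, payload)
    print(status)
```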
create_storage_element async ¶
create_storage_element(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload]) -> Union[bool, str]
Create a storage element.
Source code in datatrail/commit/commit_api.py
async def create_storage_element(
session: Union[AsyncSession, sessionmaker],
payload: Union[common.DeletionPayload, common.ReplicationPayload],
) -> Union[bool, str]:
"""Create a storage element."""
try:
se = db_queries.fetch_storage_element(session, payload["name"])
if se:
return True
cse = storage_element.CreateStorageElement(**payload)
payload = cse.dict()
status = db_queries.insert_storage_element(session, **payload)
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
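A sketch with hypothetical values; only the `name` key is confirmed by the function body, the `storage_type` field is inferred from its use in `create_dataset`, and the full set of required fields is defined by `storage_element.CreateStorageElement`.

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def register_storage_element(session: Session) -> None:
    # Hypothetical payload; required fields come from
    # storage_element.CreateStorageElement, so treat these as illustrative.
    payload = {"name": "site_a", "storage_type": "archive"}
    status = await commit_api.create_storage_element(session, payload)
    print(status)  # True if created (or already registered), else an error string
```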
detach_datasets async ¶
detach_datasets(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]
Detach two datasets.
Source code in datatrail/commit/commit_api.py
async def detach_datasets(
session: Union[AsyncSession, sessionmaker],
payload: Union[common.DeletionPayload, common.ReplicationPayload],
test_mode: bool = False,
) -> Union[bool, str]:
"""Detach two datasets."""
try:
dis = [datasets.DatasetIdentifier(**bt) for bt in payload["detach_from"]]
ad = datasets.DetachDataset(
name=payload["name"], scope=payload["scope"], detach_from=dis
)
ad = ad.dict()
status = db_queries.detach_datasets(session, **ad)
ds = db_queries.fetch_dataset(session, payload["name"], payload["scope"])
container: List[Any] = []
db_queries.explore_children_datasets(ds, container)
storage_map = db_queries.get_storage_map(session)
for ds in container:
if len(ds.files) > 0:
rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
policy_evaluator.apply_policy_to_dataset(
session, ds, rp, dp, storage_map
)
session.commit()
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
register_files async ¶
register_files(session: Union[AsyncSession, sessionmaker], payload: Union[common.DeletionPayload, common.ReplicationPayload], test_mode: bool = False) -> Union[bool, str]
Register files to a dataset.
Source code in datatrail/commit/commit_api.py
async def register_files(
session: Union[AsyncSession, sessionmaker],
payload: Union[common.DeletionPayload, common.ReplicationPayload],
test_mode: bool = False,
) -> Union[bool, str]:
"""Register files to a dataset."""
try:
# If nothing to register
if payload["files"] == []:
return True
storage_map = db_queries.get_storage_map(session)
# Skip if files already exist for this dataset.
conf = config.fetch_config()
ds = db_queries.fetch_dataset(
session, payload["dataset_name"], payload["dataset_scope"]
)
if ds.files:
ds_file_names = [f.name for f in ds.files]
fs_file_names = [f["name"] for f in payload["files"]]
if set(fs_file_names) <= set(ds_file_names):
return True
for f in payload["files"]:
# Check that name is path minus root directory.
assert f["name"].startswith(
"data/"
), "File names must start with data/, ie have site-specific root removed."
# Continue otherwise.
fs = [common.File(**f) for f in payload["files"]]
pl = files.AttachFilesToDataset(
dataset_name=payload["dataset_name"],
dataset_scope=payload["dataset_scope"],
files=fs,
)
pl = pl.dict()
for f in pl["files"]:
f["preferred_storage_elements"] = conf["default_replication_policy"][
"preferred_storage_elements"
]
f["storage_element_captured_at"] = storage_map[
f["storage_element_captured_at"]
]
status = db_queries.bulk_insert_files(session, **pl)
session.commit()
ds = db_queries.fetch_dataset(
session, payload["dataset_name"], payload["dataset_scope"]
)
rp, dp = policy_evaluator.determine_policies(session, ds, storage_map)
policy_evaluator.apply_policy_to_dataset(session, ds, rp, dp, storage_map)
session.commit()
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
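A sketch with hypothetical file entries. File names must start with `data/` (site-specific root removed), as enforced by the assertion above; the remaining fields are defined by `common.File`, so the ones shown here are illustrative.

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def register_example_files(session: Session) -> None:
    # Hypothetical payload; names must start with "data/" (site-specific root
    # removed) and the full field set is defined by common.File.
    payload = {
        "dataset_name": "example-dataset",
        "dataset_scope": "example-scope",
        "files": [
            {
                "name": "data/example/file_0001.dat",
                "storage_element_captured_at": "site_a",
                "md5sum": "d41d8cd98f00b204e9800998ecf8427e",
            }
        ],
    }
    status = await commit_api.register_files(session, payload)
    print(status)
```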
scout_sync_storage_element async ¶
scout_sync_storage_element(session, name: str, scope: str, data: Dict[str, str], replicate_to: str) -> None
Scout sync creates replicas for existing files missing in Datatrail.
Parameters:

Name | Type | Description | Default
---|---|---|---
session | PSQL Session | Postgres session. | required
name | str | Dataset name. | required
scope | str | Dataset scope. | required
data | Dict[str, str] | Dictionary containing the file name and md5sum at se. | required

Raises:

Type | Description
---|---
DatasetNotFoundError | If the dataset is not found in the database.

Returns:

Name | Type | Description
---|---|---
None | None | None.
Source code in datatrail/commit/commit_api.py
async def scout_sync_storage_element(
session, name: str, scope: str, data: Dict[str, str], replicate_to: str
) -> None:
"""Scout sync creates replicas for existing files missing in Datatrail.
Args:
session (PSQL Session): Postgres session.
name (str): Dataset name.
scope (str): Dataset scope.
data (Dict[str, str]): Dictionary containing the file name and md5sum at se.
Raises:
DatasetNotFoundError: If the dataset is not found in the database.
Returns:
None: None.
"""
storage_map = db_queries.get_storage_map(session)
ds = session.query(models.Dataset).filter_by(name=name, scope=scope).first()
if ds is None:
raise DatasetNotFoundError(name, scope)
for f in ds.files:
assert (
replicate_to in f.preferred_storage_elements
), f"{replicate_to} not in preferred storage elements"
frs: Dict[int, models.FileReplica] = {}
for fr in f.file_replicas:
frs[fr.storage_id] = fr
# Check if the storage element's replica is missing
if storage_map[replicate_to] not in frs.keys():
# Find the most recent replica
latest_fr: Optional[models.FileReplica] = None
for fr in frs.values():
if latest_fr is None or latest_fr.date_created < fr.date_created:
latest_fr = fr
if latest_fr is None:
log.warning(f"No replicas found for {f.name}")
continue
# Check that file is in storage element's md5sum data
se_md5 = data.get(f.name)
if se_md5 is None:
log.warning(f"MD5 not found for {f.name}, file not at storage element")
continue
# Create a new replica for the file at se
payload = {
"replicator": [
{
"new_replica": {
"file_path": f.name,
"md5sum": se_md5,
"replica_id": latest_fr.id,
"status": "completed",
"storage_name": replicate_to,
},
"current_replica": {
str(latest_fr.id): {
"status": "completed",
"transformation": "replication",
},
},
"dataset_name": name,
"dataset_scope": scope,
"storage_element_captured_at": storage_map[
f.storage_element_captured_at
],
}
]
}
log.info(payload)
await update_status(
session,
Work(
pipeline="update",
user="datatrail",
site="local",
parameters=payload,
),
)
return None
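A sketch with hypothetical inputs: `data` maps each Datatrail file name to the md5sum reported by the target storage element, and `replicate_to` must appear in every file's preferred storage elements.

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def sync_missing_replicas(session: Session) -> None:
    # Hypothetical md5sum listing reported by the target storage element.
    data = {
        "data/example/file_0001.dat": "d41d8cd98f00b204e9800998ecf8427e",
    }
    await commit_api.scout_sync_storage_element(
        session,
        name="example-dataset",
        scope="example-scope",
        data=data,
        replicate_to="site_b",  # must be in each file's preferred storage elements
    )
```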
toggle_storage_element_state async ¶
toggle_storage_element_state(session: Union[AsyncSession, sessionmaker], name: str) -> Union[bool, str]
Toggle state of storage element.
Source code in datatrail/commit/commit_api.py
async def toggle_storage_element_state(
session: Union[AsyncSession, sessionmaker], name: str
) -> Union[bool, str]:
"""Toggle state of storage element."""
try:
status = db_queries.toggle_storage_element_state(session, name)
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
update_deletion_policy async ¶
update_deletion_policy(session: Union[AsyncSession, sessionmaker], scope: str, name: str, deletion_policy: List[Any], test_mode: bool = False, propagate_policy: bool = True) -> Union[bool, str]
Update the deletion policy of a dataset.
Source code in datatrail/commit/commit_api.py
async def update_deletion_policy(
session: Union[AsyncSession, sessionmaker],
scope: str,
name: str,
deletion_policy: List[Any],
test_mode: bool = False,
propagate_policy: bool = True,
) -> Union[bool, str]:
"""Update the deletion policy of a dataset."""
try:
udpList = []
for dp in deletion_policy:
p = datasets.DeletionPolicy(**dp)
udpList.append(p.dict())
dataset = await get_dataset(session, scope, name, test_mode=test_mode)
if isinstance(dataset, str):
return "Could not find dataset"
vp = validate_policies(dataset["replication_policy"], udpList)
if vp is not True:
return vp
for p in udpList:
p["default"] = False
status = db_queries.update_deletion_policy(session, scope, name, udpList)
if propagate_policy:
this_dataset = db_queries.fetch_dataset(session, name, scope)
container: List[Any] = []
db_queries.explore_children_datasets(this_dataset, container)
storage_map = db_queries.get_storage_map(session)
for ds in container:
if len(ds.files) > 0:
rp_ds, dp_ds = policy_evaluator.determine_policies(
session, ds, storage_map
)
policy_evaluator.apply_policy_to_dataset(
session, ds, rp_ds, dp_ds, storage_map
)
session.commit()
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
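A sketch with hypothetical policy values. Every preferred storage element of the dataset must still be covered by a deletion policy, otherwise `validate_policies` returns an error string instead of `True`.

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def extend_retention(session: Session) -> None:
    # Hypothetical policy values; every preferred storage element still needs
    # a deletion policy for validation to pass.
    new_policy = [
        {"storage_element": "site_a", "delete_after_days": 90, "priority": 1},
        {"storage_element": "site_b", "delete_after_days": 365, "priority": 2},
    ]
    status = await commit_api.update_deletion_policy(
        session,
        scope="example-scope",
        name="example-dataset",
        deletion_policy=new_policy,
        propagate_policy=True,  # re-apply policies to all child datasets
    )
    print(status)
```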
update_deletion_state async ¶
update_deletion_state(replica: models.FileReplica, deletion_state: str) -> models.FileReplica
Update the deletion state and time of the replica.
Source code in datatrail/commit/commit_api.py
async def update_deletion_state(
replica: models.FileReplica,
deletion_state: str,
) -> models.FileReplica:
"""Update the deletion state and time of the replica."""
replica.deletion_state = models.TransformationState[deletion_state]
replica.deletion_state_updated_at = pytz.utc.localize(datetime.datetime.utcnow())
return replica
update_deletion_status_full_dataset async ¶
update_deletion_status_full_dataset(session: Union[AsyncSession, sessionmaker], payload: Dict[str, dict], test_mode: bool = False) -> Union[bool, str]
Update the transformation status of the file replicas.
Source code in datatrail/commit/commit_api.py
async def update_deletion_status_full_dataset(
session: Union[AsyncSession, sessionmaker],
payload: Dict[str, dict],
test_mode: bool = False,
) -> Union[bool, str]:
"""Update the transformation status of the file replicas."""
try:
storage_name = payload["storage_name"]
status = payload["payload"]
storage_map = db_queries.get_storage_map(session)
for replica_id, fr_status in status.items():
fr = db_queries.fetch_file_replica_by_id(session, replica_id)
if fr.storage_id == storage_map[storage_name]:
fr = await update_deletion_state(fr, fr_status)
f = fr.belongs_to
if fr_status == "completed":
f.num_available_replicas = len(
[
r
for r in f.file_replicas
if r.deletion_state != models.TransformationState.completed
]
)
session.commit()
return True
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
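A sketch with hypothetical replica ids. The outer payload carries the storage element name and an inner mapping of file-replica ids to `models.TransformationState` names (e.g. "completed").

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def mark_replicas_deleted(session: Session) -> None:
    # Hypothetical replica ids; values are models.TransformationState names.
    payload = {
        "storage_name": "site_a",
        "payload": {1001: "completed", 1002: "completed"},
    }
    status = await commit_api.update_deletion_status_full_dataset(session, payload)
    print(status)  # True on success, or an error string
```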
update_replication_policy async ¶
update_replication_policy(session: Union[AsyncSession, sessionmaker], scope: str, name: str, replication_policy: datasets.ReplicationPolicy, test_mode: bool = False, propagate_policy: bool = True) -> Union[bool, str]
Update the replication policy of a dataset.
Source code in datatrail/commit/commit_api.py
async def update_replication_policy(
session: Union[AsyncSession, sessionmaker],
scope: str,
name: str,
replication_policy: datasets.ReplicationPolicy,
test_mode: bool = False,
propagate_policy: bool = True,
) -> Union[bool, str]:
"""Update the replication policy of a dataset."""
try:
rp = datasets.ReplicationPolicy(**replication_policy)
urp = datasets.UpdateReplicationPolicy(name=name, scope=scope, policy=rp)
urp = urp.dict()
dataset = await get_dataset(session, scope, name, test_mode=test_mode)
if isinstance(dataset, str):
return "Could not find dataset"
vp = validate_policies(urp["policy"], dataset["deletion_policy"])
if vp is not True:
return vp
urp["policy"]["default"] = False
status = db_queries.update_replication_policy(session, **urp)
if propagate_policy:
this_dataset = db_queries.fetch_dataset(session, name, scope)
container: List[Any] = []
db_queries.explore_children_datasets(this_dataset, container)
storage_map = db_queries.get_storage_map(session)
for ds in container:
if len(ds.files) > 0:
rp_ds, dp_ds = policy_evaluator.determine_policies(
session, ds, storage_map
)
policy_evaluator.apply_policy_to_dataset(
session, ds, rp_ds, dp_ds, storage_map
)
session.commit()
return status
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
update_replication_state async ¶
update_replication_state(storage_map: dict, replica: models.FileReplica, replication_state: str) -> models.FileReplica
Update the replication state and time of the replica.
Source code in datatrail/commit/commit_api.py
async def update_replication_state(
storage_map: dict,
replica: models.FileReplica,
replication_state: str,
) -> models.FileReplica:
"""Update the replication state and time of the replica."""
replica.replication_state = models.TransformationState[replication_state]
replica.replication_state_updated_at = pytz.utc.localize(datetime.datetime.utcnow())
if replication_state == "completed":
pse = deepcopy(replica.belongs_to.preferred_storage_elements)
if replica.replicate_to is not None:
conf = config.fetch_config()
if storage_map[replica.replicate_to] in pse:
pse.remove(storage_map[replica.replicate_to])
if storage_map[replica.storage_id] in pse:
pse.remove(storage_map[replica.storage_id])
if pse == []:
replica.replicate_to = None
else:
can_replicate_to = conf["replicate_to"].get(
storage_map[replica.storage_id]
)
if can_replicate_to is None:
replica.replicate_to = None
if can_replicate_to in pse:
replica.replicate_to = storage_map[can_replicate_to]
return replica
update_transformation_status async ¶
update_transformation_status(session: Union[AsyncSession, sessionmaker], payload: Dict[int, common.TransformationStatus], test_mode: bool = False) -> Union[bool, str]
Update the transformation status of the file replica.
Source code in datatrail/commit/commit_api.py
async def update_transformation_status(
session: Union[AsyncSession, sessionmaker],
payload: Dict[int, common.TransformationStatus],
test_mode: bool = False,
) -> Union[bool, str]:
"""Update the transformation status of the file replica."""
try:
files = set()
for replica_id, transformation_status in payload.items():
transformation_status = common.TransformationStatus(
**transformation_status
).dict()
replica = db_queries.fetch_file_replica_by_id(session, replica_id)
if transformation_status["transformation"] == "replication":
storage_map = db_queries.get_storage_map(session)
replica = await update_replication_state(
storage_map, replica, transformation_status["status"]
)
else:
# transformation is deletion
replica = await update_deletion_state(
replica, transformation_status["status"]
)
if (
transformation_status["status"] == "completed"
and replica.belongs_to.name not in files
):
f = replica.belongs_to
f.num_available_replicas = len(
[
fr
for fr in f.file_replicas
if fr.deletion_state != models.TransformationState.completed
]
)
files.add(f.name)
session.commit()
return True
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
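A sketch with hypothetical replica ids. Each entry must match `common.TransformationStatus`: `transformation` is either "replication" or "deletion", and `status` is a `TransformationState` name.

```python
from sqlalchemy.orm import Session

from datatrail.commit import commit_api


async def record_transformations(session: Session) -> None:
    # Hypothetical replica ids; each value must match common.TransformationStatus.
    payload = {
        1001: {"transformation": "replication", "status": "completed"},
        1002: {"transformation": "deletion", "status": "completed"},
    }
    status = await commit_api.update_transformation_status(session, payload)
    print(status)  # True on success, or an error string
```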
validate_policies ¶
validate_policies(replication_policy: datasets.ReplicationPolicy, deletion_policy: List[Any]) -> Union[bool, str]
Validate policies.
Ensure that each preferred storage element has a deletion policy.
Source code in datatrail/commit/commit_api.py
def validate_policies(
replication_policy: datasets.ReplicationPolicy,
deletion_policy: List[Any],
) -> Union[bool, str]:
"""Validate policies.
Ensure that each preferred storage element has a deletion policy.
"""
valid: Union[bool, str] = True
pse = replication_policy["preferred_storage_elements"]
deletion_storage_elements = [d["storage_element"] for d in deletion_policy]
for se in pse:
if se not in deletion_storage_elements:
valid = f"Error! No deletion policy found for {se}."
return valid
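A small example of the validation behaviour: "site_b" (hypothetical) is preferred for replication but has no deletion policy, so the function returns the error string rather than `True`.

```python
from datatrail.commit.commit_api import validate_policies

# Hypothetical policies: "site_b" is preferred for replication but has no
# deletion policy, so validation fails with an error string.
replication_policy = {"preferred_storage_elements": ["site_a", "site_b"]}
deletion_policy = [
    {"storage_element": "site_a", "delete_after_days": 30, "priority": 1},
]

result = validate_policies(replication_policy, deletion_policy)
print(result)  # "Error! No deletion policy found for site_b."
```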