query
datatrail.query.query_api ¶
Implements the following REST endpoints:
- Create a storage element.
- Fetch storage elements.
- Toggle active state of storage element.
check_dataset_fully_transformed
async
¶
check_dataset_fully_transformed(session: Union[AsyncSession, sessionmaker], name: str, scope: str, storage_name: str, transformation: str, test_mode: bool = False) -> Union[bool, str]
Check if dataset has been fully transformed.
Source code in datatrail/query/query_api.py
async def check_dataset_fully_transformed(
session: Union[AsyncSession, sessionmaker],
name: str,
scope: str,
storage_name: str,
transformation: str,
test_mode: bool = False,
) -> Union[bool, str]:
"""Check if dataset has been fully transformed."""
try:
assert transformation in ["deletion", "replication"]
if transformation == "deletion":
return db_queries.is_dataset_fully_deleted(
session, name, scope, storage_name
)
return db_queries.is_dataset_fully_replicated(session, name, scope, storage_name)
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
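Example usage, as a minimal sketch: the dataset, scope, and storage names below are hypothetical, and the session is assumed to come from an already configured sessionmaker or AsyncSession. Note the Union return: a boolean verdict on success, the exception message as a string on failure.

```python
from datatrail.query import query_api

async def is_fully_deleted(session) -> bool:
    # "my-dataset", "chime.event.baseband", and "chime" are hypothetical names.
    result = await query_api.check_dataset_fully_transformed(
        session,
        name="my-dataset",
        scope="chime.event.baseband",
        storage_name="chime",
        transformation="deletion",  # must be "deletion" or "replication"
    )
    if isinstance(result, str):  # error path returns the message string
        raise RuntimeError(f"query failed: {result}")
    return result
```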
check_replication ¶
Check if item needs to be replicated.
compare_checksum ¶
dataset_belongs_to ¶
Query the L4 action picker database to get the dataset name.
Source code in datatrail/query/query_api.py
def dataset_belongs_to(event_number: int, data_type: str):
"""Query the L4 action picker database to get the dataset name."""
assert data_type in ["intensity", "baseband"]
r = requests.post(
"https://frb.chimenet.ca/chimefrb/astro_events/get_event_dataset/",
data={"event_nos": [event_number], "data_type": data_type},
)
if r.status_code in [200, 201]:
actions = r.json()
k = str(event_number)
if len(actions) == 0 or actions[k] == "unregistered":
return "event-missing-l4-actions"
if actions[k] == "realtime-pipeline":
return query_tsar_verification(event_number)
return actions[k]
return ""
dataset_scout ¶
Return the number of file replicas for each storage element.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` |  | Postgres session. | *required* |
| `dataset_name` | `str` | Name of dataset. | *required* |
| `dataset_scope` | `str` | Name of dataset scope. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `count` | `Dict` | Number of file replicas for each storage element. |
Source code in datatrail/query/query_api.py
def dataset_scout(session, dataset_name: str, dataset_scope: str) -> Dict[str, int]:
"""Return the number of file replicas for each storage element.
Args:
session: Postgres session.
dataset_name (str): Name of dataset.
dataset_scope (str): Name of dataset scope.
Returns:
count (Dict): Number of file replicas for each storage element.
"""
storage_map = db_queries.get_storage_map(session)
ds = (
session.query(Dataset)
.filter(Dataset.name == dataset_name, Dataset.scope == dataset_scope)
.first()
)
count: Dict[str, int] = {}
for f in ds.files:
for fr in f.file_replicas:
if fr.deletion_state.name == "available":
if storage_map[fr.storage_id] in count:
count[storage_map[fr.storage_id]] += 1
else:
count[storage_map[fr.storage_id]] = 1
return count
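dataset_scout is synchronous and counts only replicas whose deletion state is available. A usage sketch with hypothetical dataset and scope names:

```python
counts = dataset_scout(session, "my-dataset", "chime.event.baseband")
for storage_element, n_replicas in counts.items():
    print(f"{storage_element}: {n_replicas} available file replicas")
```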
file_info
async
¶
file_info(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[Dict[str, Any], str]
Get information about a file.
Source code in datatrail/query/query_api.py
async def file_info(
session: Union[AsyncSession, sessionmaker],
payload: Dict[str, Any],
test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
"""Get information about a file."""
try:
storage_map = db_queries.get_storage_map(session)
f = db_queries.fetch_file(
session,
payload["name"],
storage_map[payload["storage_element_captured_at"]],
)
item = {}
item["id"] = f.id
item["name"] = f.name
item["md5sum"] = f.md5sum
item["size_gb"] = f.size_bytes / 1024**3
item["preferred_storage_elements"] = f.preferred_storage_elements
item["date_created"] = f.date_created.isoformat().split("T")[0]
item["num_replicas"] = f.num_available_replicas
item["replica_info"] = []
elem_dict = db_queries.get_storage_map(session)
for fr in f.file_replicas:
ri = {}
ri["id"] = fr.id
ri["storage_name"] = elem_dict[fr.storage_id]
ri["file_path"] = fr.file_path
ri["md5sum"] = fr.md5sum
ri["date_created"] = None
if fr.date_created:
ri["date_created"] = fr.date_created.isoformat().split("T")[0]
ri["replicate_to"] = None
if fr.replicate_to:
ri["replicate_to"] = elem_dict[fr.replicate_to]
ri["replication_priority"] = None
if fr.replication_priority:
ri["replication_priority"] = fr.replication_priority.name
ri["replication_state"] = fr.replication_state.name
ri["replication_state_updated"] = None
if fr.replication_state_updated_at:
ri[
"replication_state_updated"
] = fr.replication_state_updated_at.isoformat().split("T")[0]
ri["delete_after"] = None
if fr.delete_after:
ri["delete_after"] = fr.delete_after.isoformat().split("T")[0]
ri["deletion_priority"] = None
if fr.deletion_priority:
ri["deletion_priority"] = fr.deletion_priority.name
ri["deletion_state"] = None
if fr.deletion_state:
ri["deletion_state"] = fr.deletion_state.name
ri["deletion_state_updated"] = None
if fr.deletion_state_updated_at:
ri[
"deletion_state_updated"
] = fr.deletion_state_updated_at.isoformat().split("T")[0]
ri["quality"] = compare_checksum(f, fr)
item["replica_info"].append(ri)
return item
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
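The payload carries the file name and the storage element the file was captured at; on success the coroutine returns a nested dictionary, otherwise the error message string. A sketch with hypothetical values:

```python
payload = {
    "name": "astro_256098/data.h5",          # hypothetical file name
    "storage_element_captured_at": "chime",  # hypothetical storage element
}
info = await file_info(session, payload)
if isinstance(info, dict):
    for replica in info["replica_info"]:
        print(replica["storage_name"], replica["file_path"], replica["quality"])
```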
find_dataset
async
¶
find_dataset(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[Dict[Any, Any], str]
Find location of the dataset.
Source code in datatrail/query/query_api.py
async def find_dataset(
session: Union[AsyncSession, sessionmaker],
payload: Dict[str, Any],
test_mode: bool = False,
) -> Union[Dict[Any, Any], str]:
"""Find location of the dataset."""
try:
name = payload["name"]
scope = payload["scope"]
ds = db_queries.fetch_dataset(session, name, scope)
dataset: Dict[Any, Any] = {}
dataset["contains_datasets"] = len(ds.datasets)
dataset["datasets_contained"] = [name for name in ds.datasets]
dataset["file_replica_locations"] = {}
storage_elements = db_queries.fetch_storage_elements(session)
storage_map = {}
for se in storage_elements:
storage_map[se.id] = se.name
for f in ds.files:
file_replicas = f.file_replicas
for fr in file_replicas:
if fr.deletion_state.name == "available":
storage_element_name = storage_map[fr.storage_id]
if storage_element_name not in dataset["file_replica_locations"]:
dataset["file_replica_locations"][storage_element_name] = [
fr.file_path
]
else:
dataset["file_replica_locations"][storage_element_name].append(
fr.file_path
)
return dataset
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
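The result groups available replica paths by storage element name. A usage sketch, again with hypothetical names:

```python
located = await find_dataset(
    session, {"name": "my-dataset", "scope": "chime.event.baseband"}
)
if isinstance(located, dict):
    print("contains", located["contains_datasets"], "child datasets")
    for storage_element, paths in located["file_replica_locations"].items():
        print(f"{storage_element}: {len(paths)} replica paths")
```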
find_dataset_by_id
async
¶
find_dataset_by_id(session: Union[AsyncSession, sessionmaker], id: int, test_mode: bool = False) -> Union[Dict[Any, Any], str]
Find dataset by id.
Source code in datatrail/query/query_api.py
async def find_dataset_by_id(
session: Union[AsyncSession, sessionmaker], id: int, test_mode: bool = False
) -> Union[Dict[Any, Any], str]:
"""Find dataset by id."""
try:
ds = db_queries.fetch_dataset_by_id(session, id)
dataset: Dict[Any, Any] = {}
dataset["name"] = ds.name
dataset["scope"] = ds.scope
dataset["replication_policy"] = ds.replication_policy
dataset["deletion_policy"] = ds.deletion_policy
dataset["files"] = []
for f in ds.files:
file: Dict[Any, Any] = {}
file["name"] = f.name
dataset["files"].append(file)
dataset["belongs_to"] = []
for i in ds.belongs_to:
item: Dict[Any, Any] = {}
item["name"] = i.name
item["scope"] = i.scope
dataset["belongs_to"].append(item)
return dataset
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
find_datasets_at_storage_element
async
¶
find_datasets_at_storage_element(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[Dict[str, Any], str]
Find all datasets at a storage element.
Source code in datatrail/query/query_api.py
async def find_datasets_at_storage_element(
session: Union[AsyncSession, sessionmaker],
payload: Dict[str, Any],
test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
"""Find all datasets at a storage element."""
try:
storage_element_name = payload["storage_element_name"]
ds = db_queries.fetch_datasets_at_storage_element(session, storage_element_name)
        result: Dict[str, Any] = {}
        for d in ds:
            result.setdefault(d[0], []).append(d[1])
        return result
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
find_file_by_id
async
¶
find_file_by_id(session: Union[AsyncSession, sessionmaker], id: int) -> Dict[str, Any]
Find file by id.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` | `Union[AsyncSession, sessionmaker]` | Postgres session. | *required* |
| `id` | `int` | File's id number. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `file` | `Dict` | Dictionary containing file's info. |
Source code in datatrail/query/query_api.py
async def find_file_by_id(
session: Union[AsyncSession, sessionmaker], id: int
) -> Dict[str, Any]:
"""Find file by id.
Args:
session: Postgres session.
id (int): File's id number.
Returns:
file (Dict): Dictionary containing file's info.
"""
try:
f = db_queries.fetch_file_by_id(session, id)
file: Dict[str, Any] = {}
file["id"] = f.id
file["name"] = f.name
file["md5sum"] = f.md5sum
file["size_gb"] = f.size_bytes / 1024**3
file["preferred_storage_elements"] = f.preferred_storage_elements
file["date_created"] = f.date_created.isoformat().split("T")[0]
file["num_replicas"] = f.num_available_replicas
file["replica_info"] = []
elem_dict = db_queries.get_storage_map(session)
for fr in f.file_replicas:
ri = {}
ri["id"] = fr.id
ri["storage_name"] = elem_dict[fr.storage_id]
ri["file_path"] = fr.file_path
ri["quality"] = compare_checksum(f, fr)
file["replica_info"].append(ri)
return file
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return {"error": str(e)}
find_num_replicas_by_deletion_status
async
¶
find_num_replicas_by_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, test_mode: bool = False) -> Union[Dict[str, int], str]
Find number of file replicas matching deletion status.
Source code in datatrail/query/query_api.py
async def find_num_replicas_by_deletion_status(
session: Union[AsyncSession, sessionmaker],
status: str,
duration: str,
test_mode: bool = False,
) -> Union[Dict[str, int], str]:
"""Find number of file replicas matching deletion status."""
try:
deletion_state_updated_at_after = None
if status == "completed":
deletion_state_updated_at_after = get_state_updated_after(duration)
num_replicas = db_queries.find_num_replicas_by_deletion_status(
session, status, deletion_state_updated_at_after
)
payload = {status: num_replicas}
return payload
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
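Note that the duration window applies only when status is "completed"; for any other status the count is unbounded in time. For example:

```python
# Replicas whose deletion completed within the last 7 days. duration is one
# of "month", "week", "day", or "hour"; anything else disables the filter.
counts = await find_num_replicas_by_deletion_status(session, "completed", "week")
# On success e.g. {"completed": 1234}; on failure, the error string.
```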
find_num_replicas_by_replication_status
async
¶
find_num_replicas_by_replication_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, test_mode: bool = False) -> Union[Dict[str, int], str]
Find number of file replicas matching replication status.
Source code in datatrail/query/query_api.py
async def find_num_replicas_by_replication_status(
session: Union[AsyncSession, sessionmaker],
status: str,
duration: str,
test_mode: bool = False,
) -> Union[Dict[str, int], str]:
"""Find number of file replicas matching replication status."""
try:
replication_state_updated_at_after = None
if status == "completed":
replication_state_updated_at_after = get_state_updated_after(duration)
num_replicas = db_queries.find_num_replicas_by_replication_status(
session, status, replication_state_updated_at_after
)
payload = {status: num_replicas}
return payload
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
find_replica_by_id
async
¶
find_replica_by_id(session: Union[AsyncSession, sessionmaker], id: int) -> Dict[str, Any]
Find file replica by id.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` | `Union[AsyncSession, sessionmaker]` | Postgres session. | *required* |
| `id` | `int` | File replica's id number. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `replica` | `Dict` | Dictionary containing replica's info. |
Source code in datatrail/query/query_api.py
async def find_replica_by_id(
session: Union[AsyncSession, sessionmaker], id: int
) -> Dict[str, Any]:
"""Find file replica by id.
Args:
session: Postgres session.
id (int): File replica's id number.
Returns:
replica (Dict): Dictionary containing replica's info.
"""
try:
fr = db_queries.fetch_file_replica_by_id(session, id)
storage_map = db_queries.get_storage_map(session)
replica: Dict[str, Any] = {}
replica["id"] = fr.id
replica["file_id"] = fr.file_id
replica["file_path"] = fr.file_path
replica["md5sum"] = fr.md5sum
replica["date_created"] = fr.date_created.isoformat().split("T")[0]
replica["storage_id"] = storage_map[fr.storage_id]
replica["replicate_to"] = None
if fr.replicate_to:
replica["replicate_to"] = storage_map[fr.replicate_to]
replica["replication_priority"] = None
if fr.replication_priority:
replica["replication_priority"] = fr.replication_priority.name
replica["replication_state"] = None
if fr.replication_state:
replica["replication_state"] = fr.replication_state.name
replica["replication_state_updated_at"] = None
if fr.replication_state_updated_at:
replica[
"replication_state_updated_at"
] = fr.replication_state_updated_at.isoformat().split("T")[0]
replica["deletion_priority"] = None
if fr.deletion_priority:
replica["deletion_priority"] = fr.deletion_priority.name
replica["deletion_state"] = None
if fr.deletion_state:
replica["deletion_state"] = fr.deletion_state.name
replica["deletion_state_updated_at"] = None
if fr.deletion_state_updated_at:
replica[
"deletion_state_updated_at"
] = fr.deletion_state_updated_at.isoformat().split("T")[0]
replica["delete_after"] = None
if fr.delete_after:
replica["delete_after"] = fr.delete_after.isoformat().split("T")[0]
return replica
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return {"error": str(e)}
find_replica_info_by_deletion_status
async
¶
find_replica_info_by_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, limit: int = 100, test_mode: bool = False) -> Union[List[Dict[Any, Any]], str]
Find details of file replicas matching deletion status.
Source code in datatrail/query/query_api.py
async def find_replica_info_by_deletion_status(
session: Union[AsyncSession, sessionmaker],
status: str,
duration: str,
limit: int = 100,
test_mode: bool = False,
) -> Union[List[Dict[Any, Any]], str]:
"""Find number of file replicas matching deletion status."""
try:
deletion_state_updated_at_after = None
if status == "completed":
deletion_state_updated_at_after = get_state_updated_after(duration)
replicas = db_queries.find_replica_info_by_deletion_status(
session, status, limit, deletion_state_updated_at_after
)
payload = []
elem_dict = db_queries.get_storage_map(session)
for r in replicas:
pl = {
"file_path": r.file_path,
"deletion_state_updated_at": r.deletion_state_updated_at.strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
"deletion_state": r.deletion_state.name,
"storage_name": elem_dict[r.storage_id],
}
payload.append(pl)
return payload
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
find_replica_info_by_replication_status
async
¶
find_replica_info_by_replication_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, limit: int = 100, test_mode: bool = False) -> Union[List[Dict[Any, Any]], str]
Find details of file replicas matching replication status.
Source code in datatrail/query/query_api.py
async def find_replica_info_by_replication_status(
session: Union[AsyncSession, sessionmaker],
status: str,
duration: str,
limit: int = 100,
test_mode: bool = False,
) -> Union[List[Dict[Any, Any]], str]:
"""Find number of file replicas matching replication status."""
try:
replication_state_updated_at_after = None
if status == "completed":
replication_state_updated_at_after = get_state_updated_after(duration)
replicas = db_queries.find_replica_info_by_replication_status(
session, status, limit, replication_state_updated_at_after
)
log.info("Replicas: ", replicas)
payload = []
elem_dict = db_queries.get_storage_map(session)
for r in replicas:
pl = {
"file_path": r.file_path,
"replication_state_updated_at": r.replication_state_updated_at.strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
"replication_state": r.replication_state.name,
"storage_name": elem_dict[r.storage_id],
"replicate_to": (
elem_dict[r.replicate_to] if r.replicate_to is not None else None
),
}
payload.append(pl)
return payload
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
get_dataset
async
¶
get_dataset(session: Union[AsyncSession, sessionmaker], scope: str, name: str, test_mode: bool = False) -> Union[Dict[str, Any], str]
Get a specific dataset.
Source code in datatrail/query/query_api.py
async def get_dataset(
session: Union[AsyncSession, sessionmaker],
scope: str,
name: str,
test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
"""Get a specific dataset."""
try:
d = db_queries.fetch_dataset(session, name, scope)
item = {}
item["name"] = d.name
item["scope"] = d.scope
item["replication_policy"] = d.replication_policy
item["deletion_policy"] = d.deletion_policy
item["num_files"] = len(d.files)
item["belongs_to"] = []
for i in d.belongs_to:
item["belongs_to"].append({"scope": i.scope, "name": i.name})
return item
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
get_dataset_by_file_replica_deletion_status
async
¶
get_dataset_by_file_replica_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, storage_name: str, num_datasets: int = 10) -> List[Dataset]
Find datasets by file replica deletion status.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` | `Union[AsyncSession, sessionmaker]` | Postgres session. | *required* |
| `status` | `str` | Deletion status. | *required* |
| `storage_name` | `str` | Storage name. | *required* |
| `num_datasets` | `int` | Number of datasets. | `10` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `datasets` | `List[Dataset]` | List of datasets. |
Source code in datatrail/query/query_api.py
async def get_dataset_by_file_replica_deletion_status(
session: Union[AsyncSession, sessionmaker],
status: str,
storage_name: str,
num_datasets: int = 10,
) -> List[Dataset]:
"""Find datasets by file replica deletion status.
Args:
session (Union[AsyncSession, sessionmaker]): Postgres session.
status (str): Deletion status.
storage_name (str): Storage name.
num_datasets (int, optional): Number of datasets.
Returns:
datasets (List[Dataset]): List of datasets.
"""
try:
storage_map = db_queries.get_storage_map(session)
storage_id = storage_map[storage_name]
datasets = db_queries.fetch_datasets_by_file_replica_deletion_status(
session, status, storage_id, num_datasets
)
return datasets
except Exception as error: # pragma: no cover
session.rollback()
log.error(str(error))
raise error
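Unlike most helpers in this module, this one re-raises after rolling back instead of returning the error message, so callers should catch. Sketch (status and storage name are hypothetical values):

```python
try:
    datasets = await get_dataset_by_file_replica_deletion_status(
        session, status="completed", storage_name="chime", num_datasets=5
    )
except Exception:
    # The session has already been rolled back before the re-raise.
    datasets = []
```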
get_dataset_children
async
¶
get_dataset_children(session: Union[AsyncSession, sessionmaker], scope: str, name: str, test_mode: bool = False) -> Union[Dict[str, Any], str]
Get dataset's children.
Source code in datatrail/query/query_api.py
async def get_dataset_children(
session: Union[AsyncSession, sessionmaker],
scope: str,
name: str,
test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
"""Get dataset's children."""
try:
d = db_queries.fetch_dataset(session, name, scope)
children = [child.name for child in d.datasets]
return {"name": d.name, "scope": d.scope, "contains": children}
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
get_dataset_deletion_date
async
¶
get_dataset_deletion_date(session: Union[AsyncSession, sessionmaker], dataset: str) -> Dict[str, Dict[str, Any]]
List the deletion date and state for a dataset in every scope it exists in.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` | `Union[AsyncSession, sessionmaker]` | Postgres session. | *required* |
| `dataset` | `str` | Name of dataset. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `deletion_info` | `Dict[str, Dict[str, Any]]` | Dictionary keyed by scope; each scope maps storage elements to their deletion date and state. |
Source code in datatrail/query/query_api.py
async def get_dataset_deletion_date(
session: Union[AsyncSession, sessionmaker], dataset: str
) -> Dict[str, Dict[str, Any]]:
"""List deletion date and state for dataset in all scopes dataset exists in.
Args:
session: Postgres session.
dataset (str): Name of dataset.
Returns:
deletion_info: Dictionary of scope containing storage elements contain deletion
date and state.
"""
try:
deletion_info: Dict[str, Dict[str, Any]] = {}
storage_map = db_queries.get_storage_map(session)
scopes = db_queries.fetch_all_dataset_scopes(session)
datasets: Dict[str, Any] = {}
for scope in scopes:
ds = db_queries.fetch_dataset(session, dataset, scope)
if ds is not None:
datasets[scope] = ds
        for scope, ds in datasets.items():
for f in ds.files:
for fr in f.file_replicas:
se = storage_map[fr.storage_id]
if scope not in deletion_info.keys():
deletion_info[scope] = {}
if se not in deletion_info[scope].keys():
deletion_info[scope][se] = {
"deletion_date": fr.delete_after,
"deletion_state": fr.deletion_state.name,
}
elif deletion_info[scope][se]["deletion_date"] > fr.delete_after:
deletion_info[scope][se]["deletion_date"] = fr.delete_after
deletion_info[scope][se][
"deletion_state"
] = fr.deletion_state.name
else:
pass
for scope in deletion_info.keys():
for se in deletion_info[scope].keys():
deletion_info[scope][se]["deletion_date"] = deletion_info[scope][se][
"deletion_date"
].isoformat()
return deletion_info
except Exception as error: # pragma: no cover
session.rollback()
log.error(str(error))
raise error
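Per scope and storage element, the result keeps the earliest delete_after among the dataset's replicas, serialized as an ISO date string. The shape looks roughly like this (scope and storage names are illustrative):

```python
deletion_info = await get_dataset_deletion_date(session, "my-dataset")
# {
#     "chime.event.baseband": {
#         "chime": {
#             "deletion_date": "2024-06-01T00:00:00",
#             "deletion_state": "available",
#         },
#     },
# }
```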
get_dataset_scopes
async
¶
get_dataset_scopes(session: Union[AsyncSession, sessionmaker], dataset_name: str = '', test_mode: bool = False) -> Union[List[Dict[str, Any]], str]
Get all scopes.
Source code in datatrail/query/query_api.py
async def get_dataset_scopes(
session: Union[AsyncSession, sessionmaker],
dataset_name: str = "",
test_mode: bool = False,
) -> Union[List[Dict[str, Any]], str]:
"""Get all scopes."""
try:
if dataset_name:
return db_queries.fetch_dataset_scopes(session, dataset_name)
else:
return db_queries.fetch_all_dataset_scopes(session)
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
get_datasets
async
¶
get_datasets(session: Union[AsyncSession, sessionmaker], scope: str, test_mode: bool = False) -> Union[List[Dict[str, Any]], str]
Get all datasets for a scope.
Source code in datatrail/query/query_api.py
async def get_datasets(
session: Union[AsyncSession, sessionmaker], scope: str, test_mode: bool = False
) -> Union[List[Dict[str, Any]], str]:
"""Get all datasets for a scope."""
try:
datasets = []
for d in db_queries.fetch_datasets(session, scope):
item = {}
item["name"] = d.name
item["scope"] = d.scope
item["replication_policy"] = d.replication_policy
item["deletion_policy"] = d.deletion_policy
item["num_files"] = len(d.files)
item["belongs_to"] = []
for i in d.belongs_to:
item["belongs_to"].append({"scope": i.scope, "name": i.name})
datasets.append(item)
return datasets
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
get_datasets_files
async
¶
get_datasets_files(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any]) -> Union[Dict[str, Any], str]
Get all datasets and files.
Source code in datatrail/query/query_api.py
async def get_datasets_files(
session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any]
) -> Union[Dict[str, Any], str]:
"""Get all datasets and files."""
try:
output: Dict[str, Any] = {}
if "dataset_scope" not in payload or "dataset_names" not in payload:
return output
if len(payload["dataset_names"]) == 0:
num_datasets = db_queries.count_datasets(session, payload["dataset_scope"])
datasets = []
for d in db_queries.fetch_datasets_limit(
session, payload["dataset_scope"], num=50
):
item = {}
item["name"] = d.name
item["scope"] = d.scope
item["replication_policy"] = d.replication_policy
item["deletion_policy"] = d.deletion_policy
item["num_files"] = len(d.files)
item["belongs_to"] = []
for i in d.belongs_to:
item["belongs_to"].append({"scope": i.scope, "name": i.name})
datasets.append(item)
output["num_datasets"] = num_datasets
output["datasets"] = datasets
elif len(payload["dataset_names"]) > 1:
datasets = []
for d in db_queries.fetch_datasets_in(
session, payload["dataset_scope"], payload["dataset_names"][:50]
):
item = {}
item["name"] = d.name
item["scope"] = d.scope
item["replication_policy"] = d.replication_policy
item["deletion_policy"] = d.deletion_policy
item["num_files"] = len(d.files)
item["belongs_to"] = []
for i in d.belongs_to:
item["belongs_to"].append({"scope": i.scope, "name": i.name})
datasets.append(item)
output["datasets"] = datasets
else: # only one name
d = db_queries.fetch_dataset(
session, payload["dataset_names"][0], payload["dataset_scope"]
)
item = {}
item["name"] = d.name
item["scope"] = d.scope
item["replication_policy"] = d.replication_policy
item["deletion_policy"] = d.deletion_policy
item["num_files"] = len(d.files)
item["belongs_to"] = []
for i in d.belongs_to:
item["belongs_to"].append({"scope": i.scope, "name": i.name})
output["datasets"] = [item]
files = d.files
items = []
for f in files:
item = {}
item["name"] = f.name
item["size_gb"] = f.size_bytes / 1024**3
item["num_replicas"] = len(f.file_replicas)
item["replica_info"] = []
for fr in f.file_replicas:
ri = {}
ri["storage_element"] = db_queries.fetch_storage_element_by_id(
session, fr.storage_id
).name
ri["delete_after"] = None
if fr.delete_after:
ri["delete_after"] = fr.delete_after.isoformat().split("T")[0]
ri["replicate_to"] = db_queries.fetch_storage_element_by_id(
session, fr.replicate_to
)
ri = check_replication(ri)
ri["quality"] = "corrupt"
ri["quality"] = compare_checksum(fr, f)
ri["deletion_state"] = fr.deletion_state.name
item["replica_info"].append(ri)
items.append(item)
output["files"] = items
return output
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
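Behaviour depends on how many dataset_names are supplied: none yields an overview of up to 50 datasets in the scope plus a total count, several yields summaries for up to 50 of the named datasets, and exactly one additionally yields per-file replica details. A sketch with hypothetical names:

```python
scope = "chime.event.baseband"  # hypothetical scope

# No names: up to 50 dataset summaries plus the scope's total dataset count.
overview = await get_datasets_files(
    session, {"dataset_scope": scope, "dataset_names": []}
)

# Exactly one name: a single summary plus detailed file/replica info.
detail = await get_datasets_files(
    session, {"dataset_scope": scope, "dataset_names": ["my-dataset"]}
)
if isinstance(detail, dict):
    for f in detail["files"]:
        print(f["name"], f["num_replicas"])
```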
get_file_replicas_by_deletion_status
async
¶
get_file_replicas_by_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, storage_name: str, num_replicas: int = 1000) -> List[FileReplica]
Find file replicas by deletion status.
Source code in datatrail/query/query_api.py
async def get_file_replicas_by_deletion_status(
session: Union[AsyncSession, sessionmaker],
status: str,
storage_name: str,
num_replicas: int = 1000,
) -> List[FileReplica]:
"""Find file replicas by deletion status."""
try:
storage_map = db_queries.get_storage_map(session)
storage_id = storage_map[storage_name]
replicas = db_queries.fetch_file_replicas_by_deletion_status_at_storage_element(
session, status, storage_id, num_replicas
)
return replicas
except Exception as error: # pragma: no cover
session.rollback()
log.error(str(error))
raise error
get_file_replicas_by_replication_status
async
¶
get_file_replicas_by_replication_status(session: Union[AsyncSession, sessionmaker], status: str, storage_name: str, replicate_to: str, num_replicas: int = 1000) -> List[FileReplica]
Find file replicas by replication status.
Useful for finding replicas that are at replicate_to, but do not have an entry in the Datatrail database.
Source code in datatrail/query/query_api.py
async def get_file_replicas_by_replication_status(
session: Union[AsyncSession, sessionmaker],
status: str,
storage_name: str,
replicate_to: str,
num_replicas: int = 1000,
) -> List[FileReplica]:
"""Find file replicas by replication status.
Useful for finding replicas that are at replicate_to, but do not have an entry in the
Datatrail database.
"""
try:
storage_map = db_queries.get_storage_map(session)
storage_id = storage_map[storage_name]
replicate_to_id = storage_map[replicate_to]
replicas = (
db_queries.fetch_file_replicas_by_replication_status_at_storage_element(
session, status, storage_id, replicate_to_id, num_replicas
)
)
return replicas
except Exception as error: # pragma: no cover
session.rollback()
log.error(str(error))
raise error
get_files
async
¶
get_files(session: Union[AsyncSession, sessionmaker], scope: str, name: str, test_mode: bool = False) -> Union[List[Dict[str, Any]], str]
Fetch files from a dataset.
Source code in datatrail/query/query_api.py
async def get_files(
session: Union[AsyncSession, sessionmaker],
scope: str,
name: str,
test_mode: bool = False,
) -> Union[List[Dict[str, Any]], str]:
"""Fetch files from a dataset."""
try:
files = db_queries.fetch_files_from_dataset(session, name, scope)
items = []
for f in files:
item = {}
item["name"] = f.name
item["md5sum"] = f.md5sum
item["size_bytes"] = f.size_bytes
item["num_replicas"] = len(f.file_replicas)
item["preferred_storage_elements"] = f.preferred_storage_elements
items.append(item)
return items
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
get_larger_datasets
async
¶
get_larger_datasets(session: Union[AsyncSession, sessionmaker], scope: str, test_mode: bool = False)
Get the larger datasets for a given scope.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` | `Union[AsyncSession, sessionmaker]` | Postgres session. | *required* |
| `scope` | `str` | Name of scope. | *required* |
| `test_mode` | `bool` | Activate test mode. | `False` |
Source code in datatrail/query/query_api.py
async def get_larger_datasets(
session: Union[AsyncSession, sessionmaker],
scope: str,
test_mode: bool = False,
):
"""Get the larger datasets for a given scope.
Args:
session (Union[AsyncSession, sessionmaker]): Postgres session.
scope (str): Name of scope.
test_mode (bool): Activate test mode.
"""
try:
dss = db_queries.fetch_larger_datasets(session, scope)
larger_datasets = [ds.name for ds in dss]
return {"scope": scope, "larger_datasets": larger_datasets}
except Exception as error: # pragma: no cover
session.rollback()
log.error(error)
return str(error)
get_state_updated_after ¶
Return the cutoff datetime for states updated within the given duration.
Source code in datatrail/query/query_api.py
def get_state_updated_after(duration: str):
"""Get state updated after date."""
now = datetime.datetime.utcnow()
if duration == "month":
diff = datetime.timedelta(days=30)
elif duration == "week":
diff = datetime.timedelta(days=7)
elif duration == "day":
diff = datetime.timedelta(days=1)
elif duration == "hour":
diff = datetime.timedelta(hours=1)
else:
return None
return pytz.utc.localize(now - diff)
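Recognized durations are "month" (30 days), "week", "day", and "hour"; anything else yields None. The cutoff is a timezone-aware UTC datetime:

```python
cutoff = get_state_updated_after("week")
# A datetime 7 days in the past, localized to UTC.
assert get_state_updated_after("fortnight") is None  # unrecognized duration
```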
get_storage_elements
async
¶
get_storage_elements(session: Union[AsyncSession, sessionmaker]) -> Union[List[Dict[str, Any]], str]
Get all storage elements.
Source code in datatrail/query/query_api.py
async def get_storage_elements(
session: Union[AsyncSession, sessionmaker],
) -> Union[List[Dict[str, Any]], str]:
"""Get all storage elements."""
try:
elements = []
for elem in db_queries.fetch_storage_elements(session):
item = {}
item["name"] = elem.name
item["storage_type"] = elem.storage_type.name
item["protocol"] = elem.protocol.name
item["root"] = elem.root
item["active"] = elem.active
item["address"] = elem.address
item["total_gb"] = elem.total_gb
item["available_gb"] = elem.available_gb
item["min_available_gb"] = elem.min_available_gb
item[
"available_gb_last_checked"
] = elem.available_gb_last_checked.isoformat()
elements.append(item)
return elements
except Exception as e: # pragma: no cover
session.rollback()
log.error(str(e))
return str(e)
query_tsar_verification ¶
Fetch tsar verification from frb-master's verifications.
Source code in datatrail/query/query_api.py
def query_tsar_verification(event_number: int) -> str:
"""Fetch tsar verification from frb-master's verifications."""
try:
backend = chime_frb_api.frb_master.FRBMaster()
verification = backend.API.get(
f"/v1/verification/get-verification/{event_number}"
)
except Exception as e:
log.error(f"Failed to get tsar classification for event: {event_number}")
log.error(str(e))
return ""
if verification is None:
return "event-missing-verification"
user_verifications = verification.get("user_verification", [])
ratings = []
for uv in user_verifications:
if uv["id"] == "l4_pipeline":
continue
if uv["classification"] == "KNOWN SOURCE" and uv["comments"] in "known pulsar":
ratings.append("classified.PULSAR")
elif uv["classification"] == "FAINT":
ratings.append("classified.NOISE")
elif uv["classification"] == "RFI":
ratings.append("classified.RFI")
else:
            # Even if one person thinks that this is an FRB,
            # save it for safety (less aggressive action).
return "classified.FRB"
if len(ratings) < 2:
return "pending-tsar-classification"
if "classified.PULSAR" in ratings:
return "classified.PULSAR"
elif "classified.NOISE" in ratings:
return "classified.NOISE"
return "classified.RFI"