Skip to content

query

datatrail.query.query_api

Implement the following REST endpoints.

  1. Create a storage element.
  2. Fetch storage elements.
  3. Toggle active state of storage element.

check_dataset_fully_transformed async

Python
check_dataset_fully_transformed(session: Union[AsyncSession, sessionmaker], name: str, scope: str, storage_name: str, transformation: str, test_mode: bool = False) -> Union[bool, str]

Check if dataset has been fully transformed.

Source code in datatrail/query/query_api.py
Python
async def check_dataset_fully_transformed(
    session: Union[AsyncSession, sessionmaker],
    name: str,
    scope: str,
    storage_name: str,
    transformation: str,
    test_mode: bool = False,
) -> Union[bool, str]:
    """Check if dataset has been fully transformed."""
    try:
        assert transformation in ["deletion", "replication"]
        if transformation == "deletion":
            return db_queries.is_dataset_fully_deleted(
                session, name, scope, storage_name
            )
        return db_queries.is_dataset_fully_replicated(session, name, scope, storage_name)
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

check_replication

Python
check_replication(item: Dict[str, Any]) -> Union[Dict[str, Any], Any]

Check if item needs to be replicated.

Source code in datatrail/query/query_api.py
Python
def check_replication(item: Dict[str, Any]) -> Union[Dict[str, Any], Any]:
    """Check if item needs to be replicated."""
    if item["replicate_to"] is not None:
        item["replicate_to"] = item["replicate_to"].name
        return item
    else:
        return item

compare_checksum

Python
compare_checksum(file1: File, file2: File) -> str

Compare checksum of two files.

Source code in datatrail/query/query_api.py
Python
def compare_checksum(file1: File, file2: File) -> str:
    """Compare checksum of two files."""
    if file1.md5sum.strip() == file2.md5sum.strip():
        return "good"
    return "bad"

dataset_belongs_to

Python
dataset_belongs_to(event_number: int, data_type: str)

Query the L4 action picker database to get the dataset name.

Source code in datatrail/query/query_api.py
Python
def dataset_belongs_to(event_number: int, data_type: str):
    """Query the L4 action picker database to get the dataset name."""
    assert data_type in ["intensity", "baseband"]
    r = requests.post(
        "https://frb.chimenet.ca/chimefrb/astro_events/get_event_dataset/",
        data={"event_nos": [event_number], "data_type": data_type},
    )
    if r.status_code in [200, 201]:
        actions = r.json()
        k = str(event_number)
        if len(actions) == 0 or actions[k] == "unregistered":
            return "event-missing-l4-actions"
        if actions[k] == "realtime-pipeline":
            return query_tsar_verification(event_number)
        return actions[k]
    return ""

dataset_scout

Python
dataset_scout(session, dataset_name: str, dataset_scope: str) -> Dict[str, int]

Return the number of file replicas for each storage element.

Parameters:

Name Type Description Default
session

Postgres session.

required
dataset_name str

Name of dataset.

required
dataset_scope str

Name of dataset scope.

required

Returns:

Name Type Description
count Dict

Number of file replicas for each storage element.

Source code in datatrail/query/query_api.py
Python
def dataset_scout(session, dataset_name: str, dataset_scope: str) -> Dict[str, int]:
    """Return the number of file replicas for each storage element.

    Args:
        session: Postgres session.
        dataset_name (str): Name of dataset.
        dataset_scope (str): Name of dataset scope.

    Returns:
        count (Dict): Number of file replicas for each storage element.
    """
    storage_map = db_queries.get_storage_map(session)
    ds = (
        session.query(Dataset)
        .filter(Dataset.name == dataset_name, Dataset.scope == dataset_scope)
        .first()
    )
    count: Dict[str, int] = {}
    for f in ds.files:
        for fr in f.file_replicas:
            if fr.deletion_state.name == "available":
                if storage_map[fr.storage_id] in count:
                    count[storage_map[fr.storage_id]] += 1
                else:
                    count[storage_map[fr.storage_id]] = 1
    return count

file_info async

Python
file_info(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[Dict[str, Any], str]

Get information about a file.

Source code in datatrail/query/query_api.py
Python
async def file_info(
    session: Union[AsyncSession, sessionmaker],
    payload: Dict[str, Any],
    test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
    """Get information about a file."""
    try:
        storage_map = db_queries.get_storage_map(session)
        f = db_queries.fetch_file(
            session,
            payload["name"],
            storage_map[payload["storage_element_captured_at"]],
        )
        item = {}
        item["id"] = f.id
        item["name"] = f.name
        item["md5sum"] = f.md5sum
        item["size_gb"] = f.size_bytes / 1024**3
        item["preferred_storage_elements"] = f.preferred_storage_elements
        item["date_created"] = f.date_created.isoformat().split("T")[0]
        item["num_replicas"] = f.num_available_replicas
        item["replica_info"] = []
        elem_dict = db_queries.get_storage_map(session)
        for fr in f.file_replicas:
            ri = {}
            ri["id"] = fr.id
            ri["storage_name"] = elem_dict[fr.storage_id]
            ri["file_path"] = fr.file_path
            ri["md5sum"] = fr.md5sum
            ri["date_created"] = None
            if fr.date_created:
                ri["date_created"] = fr.date_created.isoformat().split("T")[0]
            ri["replicate_to"] = None
            if fr.replicate_to:
                ri["replicate_to"] = elem_dict[fr.replicate_to]
            ri["replication_priority"] = None
            if fr.replication_priority:
                ri["replication_priority"] = fr.replication_priority.name
            ri["replication_state"] = fr.replication_state.name
            ri["replication_state_updated"] = None
            if fr.replication_state_updated_at:
                ri[
                    "replication_state_updated"
                ] = fr.replication_state_updated_at.isoformat().split("T")[0]
            ri["delete_after"] = None
            if fr.delete_after:
                ri["delete_after"] = fr.delete_after.isoformat().split("T")[0]
            ri["deletion_priority"] = None
            if fr.deletion_priority:
                ri["deletion_priority"] = fr.deletion_priority.name
            ri["deletion_state"] = None
            if fr.deletion_state:
                ri["deletion_state"] = fr.deletion_state.name
            ri["deletion_state_updated"] = None
            if fr.deletion_state_updated_at:
                ri[
                    "deletion_state_updated"
                ] = fr.deletion_state_updated_at.isoformat().split("T")[0]
            ri["quality"] = compare_checksum(f, fr)
            item["replica_info"].append(ri)
        return item
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

find_dataset async

Python
find_dataset(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[Dict[Any, Any], str]

Find location of the dataset.

Source code in datatrail/query/query_api.py
Python
async def find_dataset(
    session: Union[AsyncSession, sessionmaker],
    payload: Dict[str, Any],
    test_mode: bool = False,
) -> Union[Dict[Any, Any], str]:
    """Find location of the dataset."""
    try:
        name = payload["name"]
        scope = payload["scope"]
        ds = db_queries.fetch_dataset(session, name, scope)
        dataset: Dict[Any, Any] = {}
        dataset["contains_datasets"] = len(ds.datasets)
        dataset["datasets_contained"] = [name for name in ds.datasets]
        dataset["file_replica_locations"] = {}
        storage_elements = db_queries.fetch_storage_elements(session)
        storage_map = {}
        for se in storage_elements:
            storage_map[se.id] = se.name
        for f in ds.files:
            file_replicas = f.file_replicas
            for fr in file_replicas:
                if fr.deletion_state.name == "available":
                    storage_element_name = storage_map[fr.storage_id]
                    if storage_element_name not in dataset["file_replica_locations"]:
                        dataset["file_replica_locations"][storage_element_name] = [
                            fr.file_path
                        ]
                    else:
                        dataset["file_replica_locations"][storage_element_name].append(
                            fr.file_path
                        )
        return dataset
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

find_dataset_by_id async

Python
find_dataset_by_id(session: Union[AsyncSession, sessionmaker], id: int, test_mode: bool = False) -> Union[Dict[Any, Any], str]

Find dataset by id.

Source code in datatrail/query/query_api.py
Python
async def find_dataset_by_id(
    session: Union[AsyncSession, sessionmaker], id: int, test_mode: bool = False
) -> Union[Dict[Any, Any], str]:
    """Find dataset by id."""
    try:
        ds = db_queries.fetch_dataset_by_id(session, id)
        dataset: Dict[Any, Any] = {}
        dataset["name"] = ds.name
        dataset["scope"] = ds.scope
        dataset["replication_policy"] = ds.replication_policy
        dataset["deletion_policy"] = ds.deletion_policy
        dataset["files"] = []
        for f in ds.files:
            file: Dict[Any, Any] = {}
            file["name"] = f.name
            dataset["files"].append(file)
        dataset["belongs_to"] = []
        for i in ds.belongs_to:
            item: Dict[Any, Any] = {}
            item["name"] = i.name
            item["scope"] = i.scope
            dataset["belongs_to"].append(item)
        return dataset
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

find_datasets_at_storage_element async

Python
find_datasets_at_storage_element(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any], test_mode: bool = False) -> Union[Dict[str, Any], str]

Find all datasets at a storage element.

Source code in datatrail/query/query_api.py
Python
async def find_datasets_at_storage_element(
    session: Union[AsyncSession, sessionmaker],
    payload: Dict[str, Any],
    test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
    """Find all datasets at a storage element."""
    try:
        storage_element_name = payload["storage_element_name"]
        ds = db_queries.fetch_datasets_at_storage_element(session, storage_element_name)
        payload = {}
        for d in ds:
            if d[0] in payload:
                payload[d[0]].append(d[1])
            else:
                payload[d[0]] = [d[1]]
        return payload
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

find_file_by_id async

Python
find_file_by_id(session: Union[AsyncSession, sessionmaker], id: int) -> Dict[str, Any]

Find file by id.

Parameters:

Name Type Description Default
session Union[AsyncSession, sessionmaker]

Postgres session.

required
id int

File's id number.

required

Returns:

Name Type Description
file Dict

Dictionary containing file's info.

Source code in datatrail/query/query_api.py
Python
async def find_file_by_id(
    session: Union[AsyncSession, sessionmaker], id: int
) -> Dict[str, Any]:
    """Find file by id.

    Args:
        session: Postgres session.
        id (int): File's id number.

    Returns:
        file (Dict): Dictionary containing file's info.
    """
    try:
        f = db_queries.fetch_file_by_id(session, id)
        file: Dict[str, Any] = {}
        file["id"] = f.id
        file["name"] = f.name
        file["md5sum"] = f.md5sum
        file["size_gb"] = f.size_bytes / 1024**3
        file["preferred_storage_elements"] = f.preferred_storage_elements
        file["date_created"] = f.date_created.isoformat().split("T")[0]
        file["num_replicas"] = f.num_available_replicas
        file["replica_info"] = []
        elem_dict = db_queries.get_storage_map(session)
        for fr in f.file_replicas:
            ri = {}
            ri["id"] = fr.id
            ri["storage_name"] = elem_dict[fr.storage_id]
            ri["file_path"] = fr.file_path
            ri["quality"] = compare_checksum(f, fr)
            file["replica_info"].append(ri)
        return file
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return {"error": str(e)}

find_num_replicas_by_deletion_status async

Python
find_num_replicas_by_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, test_mode: bool = False) -> Union[Dict[str, int], str]

Find number of file replicas matching deletion status.

Source code in datatrail/query/query_api.py
Python
async def find_num_replicas_by_deletion_status(
    session: Union[AsyncSession, sessionmaker],
    status: str,
    duration: str,
    test_mode: bool = False,
) -> Union[Dict[str, int], str]:
    """Find number of file replicas matching deletion status."""
    try:
        deletion_state_updated_at_after = None
        if status == "completed":
            deletion_state_updated_at_after = get_state_updated_after(duration)
        num_replicas = db_queries.find_num_replicas_by_deletion_status(
            session, status, deletion_state_updated_at_after
        )
        payload = {status: num_replicas}
        return payload
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

find_num_replicas_by_replication_status async

Python
find_num_replicas_by_replication_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, test_mode: bool = False) -> Union[Dict[str, int], str]

Find number of file replicas matching replication status.

Source code in datatrail/query/query_api.py
Python
async def find_num_replicas_by_replication_status(
    session: Union[AsyncSession, sessionmaker],
    status: str,
    duration: str,
    test_mode: bool = False,
) -> Union[Dict[str, int], str]:
    """Find number of file replicas matching replication status."""
    try:
        replication_state_updated_at_after = None
        if status == "completed":
            replication_state_updated_at_after = get_state_updated_after(duration)
        num_replicas = db_queries.find_num_replicas_by_replication_status(
            session, status, replication_state_updated_at_after
        )
        payload = {status: num_replicas}
        return payload
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

find_replica_by_id async

Python
find_replica_by_id(session: Union[AsyncSession, sessionmaker], id: int) -> Dict[str, Any]

Find file replica by id.

Parameters:

Name Type Description Default
session Union[AsyncSession, sessionmaker]

Postgres session.

required
id int

File replica's id number.

required

Returns:

Name Type Description
replica Dict

Dictionary containing replica's info.

Source code in datatrail/query/query_api.py
Python
async def find_replica_by_id(
    session: Union[AsyncSession, sessionmaker], id: int
) -> Dict[str, Any]:
    """Find file replica by id.

    Args:
        session: Postgres session.
        id (int): File replica's id number.

    Returns:
        replica (Dict): Dictionary containing replica's info.
    """
    try:
        fr = db_queries.fetch_file_replica_by_id(session, id)
        storage_map = db_queries.get_storage_map(session)
        replica: Dict[str, Any] = {}
        replica["id"] = fr.id
        replica["file_id"] = fr.file_id
        replica["file_path"] = fr.file_path
        replica["md5sum"] = fr.md5sum
        replica["date_created"] = fr.date_created.isoformat().split("T")[0]
        replica["storage_id"] = storage_map[fr.storage_id]

        replica["replicate_to"] = None
        if fr.replicate_to:
            replica["replicate_to"] = storage_map[fr.replicate_to]

        replica["replication_priority"] = None
        if fr.replication_priority:
            replica["replication_priority"] = fr.replication_priority.name

        replica["replication_state"] = None
        if fr.replication_state:
            replica["replication_state"] = fr.replication_state.name

        replica["replication_state_updated_at"] = None
        if fr.replication_state_updated_at:
            replica[
                "replication_state_updated_at"
            ] = fr.replication_state_updated_at.isoformat().split("T")[0]

        replica["deletion_priority"] = None
        if fr.deletion_priority:
            replica["deletion_priority"] = fr.deletion_priority.name

        replica["deletion_state"] = None
        if fr.deletion_state:
            replica["deletion_state"] = fr.deletion_state.name

        replica["deletion_state_updated_at"] = None
        if fr.deletion_state_updated_at:
            replica[
                "deletion_state_updated_at"
            ] = fr.deletion_state_updated_at.isoformat().split("T")[0]

        replica["delete_after"] = None
        if fr.delete_after:
            replica["delete_after"] = fr.delete_after.isoformat().split("T")[0]
        return replica
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return {"error": str(e)}

find_replica_info_by_deletion_status async

Python
find_replica_info_by_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, limit: int = 100, test_mode: bool = False) -> Union[List[Dict[Any, Any]], str]

Find number of file replicas matching deletion status.

Source code in datatrail/query/query_api.py
Python
async def find_replica_info_by_deletion_status(
    session: Union[AsyncSession, sessionmaker],
    status: str,
    duration: str,
    limit: int = 100,
    test_mode: bool = False,
) -> Union[List[Dict[Any, Any]], str]:
    """Find number of file replicas matching deletion status."""
    try:
        deletion_state_updated_at_after = None
        if status == "completed":
            deletion_state_updated_at_after = get_state_updated_after(duration)
        replicas = db_queries.find_replica_info_by_deletion_status(
            session, status, limit, deletion_state_updated_at_after
        )
        payload = []
        elem_dict = db_queries.get_storage_map(session)
        for r in replicas:
            pl = {
                "file_path": r.file_path,
                "deletion_state_updated_at": r.deletion_state_updated_at.strftime(
                    "%Y-%m-%d %H:%M:%S.%f"
                ),
                "deletion_state": r.deletion_state.name,
                "storage_name": elem_dict[r.storage_id],
            }
            payload.append(pl)
        return payload
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

find_replica_info_by_replication_status async

Python
find_replica_info_by_replication_status(session: Union[AsyncSession, sessionmaker], status: str, duration: str, limit: int = 100, test_mode: bool = False) -> Union[List[Dict[Any, Any]], str]

Find number of file replicas matching replication status.

Source code in datatrail/query/query_api.py
Python
async def find_replica_info_by_replication_status(
    session: Union[AsyncSession, sessionmaker],
    status: str,
    duration: str,
    limit: int = 100,
    test_mode: bool = False,
) -> Union[List[Dict[Any, Any]], str]:
    """Find number of file replicas matching replication status."""
    try:
        replication_state_updated_at_after = None
        if status == "completed":
            replication_state_updated_at_after = get_state_updated_after(duration)
        replicas = db_queries.find_replica_info_by_replication_status(
            session, status, limit, replication_state_updated_at_after
        )
        log.info("Replicas: ", replicas)
        payload = []
        elem_dict = db_queries.get_storage_map(session)
        for r in replicas:
            pl = {
                "file_path": r.file_path,
                "replication_state_updated_at": r.replication_state_updated_at.strftime(
                    "%Y-%m-%d %H:%M:%S.%f"
                ),
                "replication_state": r.replication_state.name,
                "storage_name": elem_dict[r.storage_id],
                "replicate_to": (
                    elem_dict[r.replicate_to] if r.replicate_to is not None else None
                ),
            }
            payload.append(pl)
        return payload
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

get_dataset async

Python
get_dataset(session: Union[AsyncSession, sessionmaker], scope: str, name: str, test_mode: bool = False) -> Union[Dict[str, Any], str]

Get a specific dataset.

Source code in datatrail/query/query_api.py
Python
async def get_dataset(
    session: Union[AsyncSession, sessionmaker],
    scope: str,
    name: str,
    test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
    """Get a specific dataset."""
    try:
        d = db_queries.fetch_dataset(session, name, scope)
        item = {}
        item["name"] = d.name
        item["scope"] = d.scope
        item["replication_policy"] = d.replication_policy
        item["deletion_policy"] = d.deletion_policy
        item["num_files"] = len(d.files)
        item["belongs_to"] = []
        for i in d.belongs_to:
            item["belongs_to"].append({"scope": i.scope, "name": i.name})
        return item
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

get_dataset_by_file_replica_deletion_status async

Python
get_dataset_by_file_replica_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, storage_name: str, num_datasets: int = 10) -> List[Dataset]

Find datasets by file replica deletion status.

Parameters:

Name Type Description Default
session Union[AsyncSession, sessionmaker]

Postgres session.

required
status str

Deletion status.

required
storage_name str

Storage name.

required
num_datasets int

Number of datasets.

10

Returns:

Name Type Description
datasets List[Dataset]

List of datasets.

Source code in datatrail/query/query_api.py
Python
async def get_dataset_by_file_replica_deletion_status(
    session: Union[AsyncSession, sessionmaker],
    status: str,
    storage_name: str,
    num_datasets: int = 10,
) -> List[Dataset]:
    """Find datasets by file replica deletion status.

    Args:
        session (Union[AsyncSession, sessionmaker]): Postgres session.
        status (str): Deletion status.
        storage_name (str): Storage name.
        num_datasets (int, optional): Number of datasets.

    Returns:
        datasets (List[Dataset]): List of datasets.
    """
    try:
        storage_map = db_queries.get_storage_map(session)
        storage_id = storage_map[storage_name]
        datasets = db_queries.fetch_datasets_by_file_replica_deletion_status(
            session, status, storage_id, num_datasets
        )
        return datasets
    except Exception as error:  # pragma: no cover
        session.rollback()
        log.error(str(error))
        raise error

get_dataset_children async

Python
get_dataset_children(session: Union[AsyncSession, sessionmaker], scope: str, name: str, test_mode: bool = False) -> Union[Dict[str, Any], str]

Get dataset's children.

Source code in datatrail/query/query_api.py
Python
async def get_dataset_children(
    session: Union[AsyncSession, sessionmaker],
    scope: str,
    name: str,
    test_mode: bool = False,
) -> Union[Dict[str, Any], str]:
    """Get dataset's children."""
    try:
        d = db_queries.fetch_dataset(session, name, scope)
        children = [child.name for child in d.datasets]
        return {"name": d.name, "scope": d.scope, "contains": children}
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

get_dataset_deletion_date async

Python
get_dataset_deletion_date(session: Union[AsyncSession, sessionmaker], dataset: str) -> Dict[str, Dict[str, Any]]

List deletion date and state for dataset in all scopes dataset exists in.

Parameters:

Name Type Description Default
session Union[AsyncSession, sessionmaker]

Postgres session.

required
dataset str

Name of dataset.

required

Returns:

Name Type Description
deletion_info Dict[str, Dict[str, Any]]

Dictionary of scope containing storage elements contain deletion date and state.

Source code in datatrail/query/query_api.py
Python
async def get_dataset_deletion_date(
    session: Union[AsyncSession, sessionmaker], dataset: str
) -> Dict[str, Dict[str, Any]]:
    """List deletion date and state for dataset in all scopes dataset exists in.

    Args:
        session: Postgres session.
        dataset (str): Name of dataset.

    Returns:
        deletion_info: Dictionary of scope containing storage elements contain deletion
            date and state.
    """
    try:
        deletion_info: Dict[str, Dict[str, Any]] = {}
        storage_map = db_queries.get_storage_map(session)
        scopes = db_queries.fetch_all_dataset_scopes(session)
        datasets: Dict[str, Any] = {}
        for scope in scopes:
            ds = db_queries.fetch_dataset(session, dataset, scope)
            if ds is not None:
                datasets[scope] = ds

        for scope, ds in zip(datasets.keys(), datasets.values()):
            for f in ds.files:
                for fr in f.file_replicas:
                    se = storage_map[fr.storage_id]
                    if scope not in deletion_info.keys():
                        deletion_info[scope] = {}
                    if se not in deletion_info[scope].keys():
                        deletion_info[scope][se] = {
                            "deletion_date": fr.delete_after,
                            "deletion_state": fr.deletion_state.name,
                        }
                    elif deletion_info[scope][se]["deletion_date"] > fr.delete_after:
                        deletion_info[scope][se]["deletion_date"] = fr.delete_after
                        deletion_info[scope][se][
                            "deletion_state"
                        ] = fr.deletion_state.name
                    else:
                        pass

        for scope in deletion_info.keys():
            for se in deletion_info[scope].keys():
                deletion_info[scope][se]["deletion_date"] = deletion_info[scope][se][
                    "deletion_date"
                ].isoformat()
        return deletion_info

    except Exception as error:  # pragma: no cover
        session.rollback()
        log.error(str(error))
        raise error

get_dataset_scopes async

Python
get_dataset_scopes(session: Union[AsyncSession, sessionmaker], dataset_name: str = '', test_mode: bool = False) -> Union[List[Dict[str, Any]], str]

Get all scopes.

Source code in datatrail/query/query_api.py
Python
async def get_dataset_scopes(
    session: Union[AsyncSession, sessionmaker],
    dataset_name: str = "",
    test_mode: bool = False,
) -> Union[List[Dict[str, Any]], str]:
    """Get all scopes."""
    try:
        if dataset_name:
            return db_queries.fetch_dataset_scopes(session, dataset_name)
        else:
            return db_queries.fetch_all_dataset_scopes(session)
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

get_datasets async

Python
get_datasets(session: Union[AsyncSession, sessionmaker], scope: str, test_mode: bool = False) -> Union[List[Dict[str, Any]], str]

Get all datasets for a scope.

Source code in datatrail/query/query_api.py
Python
async def get_datasets(
    session: Union[AsyncSession, sessionmaker], scope: str, test_mode: bool = False
) -> Union[List[Dict[str, Any]], str]:
    """Get all datasets for a scope."""
    try:
        datasets = []
        for d in db_queries.fetch_datasets(session, scope):
            item = {}
            item["name"] = d.name
            item["scope"] = d.scope
            item["replication_policy"] = d.replication_policy
            item["deletion_policy"] = d.deletion_policy
            item["num_files"] = len(d.files)
            item["belongs_to"] = []
            for i in d.belongs_to:
                item["belongs_to"].append({"scope": i.scope, "name": i.name})
            datasets.append(item)
        return datasets
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

get_datasets_files async

Python
get_datasets_files(session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any]) -> Union[Dict[str, Any], str]

Get all datasets and files.

Source code in datatrail/query/query_api.py
Python
async def get_datasets_files(
    session: Union[AsyncSession, sessionmaker], payload: Dict[str, Any]
) -> Union[Dict[str, Any], str]:
    """Get all datasets and files."""
    try:
        output: Dict[str, Any] = {}
        if "dataset_scope" not in payload or "dataset_names" not in payload:
            return output
        if len(payload["dataset_names"]) == 0:
            num_datasets = db_queries.count_datasets(session, payload["dataset_scope"])
            datasets = []
            for d in db_queries.fetch_datasets_limit(
                session, payload["dataset_scope"], num=50
            ):
                item = {}
                item["name"] = d.name
                item["scope"] = d.scope
                item["replication_policy"] = d.replication_policy
                item["deletion_policy"] = d.deletion_policy
                item["num_files"] = len(d.files)
                item["belongs_to"] = []
                for i in d.belongs_to:
                    item["belongs_to"].append({"scope": i.scope, "name": i.name})
                datasets.append(item)
            output["num_datasets"] = num_datasets
            output["datasets"] = datasets
        elif len(payload["dataset_names"]) > 1:
            datasets = []
            for d in db_queries.fetch_datasets_in(
                session, payload["dataset_scope"], payload["dataset_names"][:50]
            ):
                item = {}
                item["name"] = d.name
                item["scope"] = d.scope
                item["replication_policy"] = d.replication_policy
                item["deletion_policy"] = d.deletion_policy
                item["num_files"] = len(d.files)
                item["belongs_to"] = []
                for i in d.belongs_to:
                    item["belongs_to"].append({"scope": i.scope, "name": i.name})
                datasets.append(item)
            output["datasets"] = datasets
        else:  # only one name
            d = db_queries.fetch_dataset(
                session, payload["dataset_names"][0], payload["dataset_scope"]
            )
            item = {}
            item["name"] = d.name
            item["scope"] = d.scope
            item["replication_policy"] = d.replication_policy
            item["deletion_policy"] = d.deletion_policy
            item["num_files"] = len(d.files)
            item["belongs_to"] = []
            for i in d.belongs_to:
                item["belongs_to"].append({"scope": i.scope, "name": i.name})
            output["datasets"] = [item]
            files = d.files
            items = []
            for f in files:
                item = {}
                item["name"] = f.name
                item["size_gb"] = f.size_bytes / 1024**3
                item["num_replicas"] = len(f.file_replicas)
                item["replica_info"] = []
                for fr in f.file_replicas:
                    ri = {}
                    ri["storage_element"] = db_queries.fetch_storage_element_by_id(
                        session, fr.storage_id
                    ).name
                    ri["delete_after"] = None
                    if fr.delete_after:
                        ri["delete_after"] = fr.delete_after.isoformat().split("T")[0]
                    ri["replicate_to"] = db_queries.fetch_storage_element_by_id(
                        session, fr.replicate_to
                    )
                    ri = check_replication(ri)
                    ri["quality"] = "corrupt"
                    ri["quality"] = compare_checksum(fr, f)
                    ri["deletion_state"] = fr.deletion_state.name
                    item["replica_info"].append(ri)
                items.append(item)
            output["files"] = items
        return output
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

get_file_replicas_by_deletion_status async

Python
get_file_replicas_by_deletion_status(session: Union[AsyncSession, sessionmaker], status: str, storage_name: str, num_replicas: int = 1000) -> List[FileReplica]

Find file replicas by deletion status.

Source code in datatrail/query/query_api.py
Python
async def get_file_replicas_by_deletion_status(
    session: Union[AsyncSession, sessionmaker],
    status: str,
    storage_name: str,
    num_replicas: int = 1000,
) -> List[FileReplica]:
    """Find file replicas by deletion status."""
    try:
        storage_map = db_queries.get_storage_map(session)
        storage_id = storage_map[storage_name]
        replicas = db_queries.fetch_file_replicas_by_deletion_status_at_storage_element(
            session, status, storage_id, num_replicas
        )
        return replicas

    except Exception as error:  # pragma: no cover
        session.rollback()
        log.error(str(error))
        raise error

get_file_replicas_by_replication_status async

Python
get_file_replicas_by_replication_status(session: Union[AsyncSession, sessionmaker], status: str, storage_name: str, replicate_to: str, num_replicas: int = 1000) -> List[FileReplica]

Find file replicas by replication status.

Useful for finding replicas that are at replicate_to, but do not have an entry in the Datatrail database.

Source code in datatrail/query/query_api.py
Python
async def get_file_replicas_by_replication_status(
    session: Union[AsyncSession, sessionmaker],
    status: str,
    storage_name: str,
    replicate_to: str,
    num_replicas: int = 1000,
) -> List[FileReplica]:
    """Find file replicas by replication status.

    Useful for finding replicas that are at replicate_to, but do not have an entry in the
    Datatrail database.
    """
    try:
        storage_map = db_queries.get_storage_map(session)
        storage_id = storage_map[storage_name]
        replicate_to_id = storage_map[replicate_to]
        replicas = (
            db_queries.fetch_file_replicas_by_replication_status_at_storage_element(
                session, status, storage_id, replicate_to_id, num_replicas
            )
        )
        return replicas

    except Exception as error:  # pragma: no cover
        session.rollback()
        log.error(str(error))
        raise error

get_files async

Python
get_files(session: Union[AsyncSession, sessionmaker], scope: str, name: str, test_mode: bool = False) -> Union[List[Dict[str, Any]], str]

Fetch files from a dataset.

Source code in datatrail/query/query_api.py
Python
async def get_files(
    session: Union[AsyncSession, sessionmaker],
    scope: str,
    name: str,
    test_mode: bool = False,
) -> Union[List[Dict[str, Any]], str]:
    """Fetch files from a dataset."""
    try:
        files = db_queries.fetch_files_from_dataset(session, name, scope)
        items = []
        for f in files:
            item = {}
            item["name"] = f.name
            item["md5sum"] = f.md5sum
            item["size_bytes"] = f.size_bytes
            item["num_replicas"] = len(f.file_replicas)
            item["preferred_storage_elements"] = f.preferred_storage_elements
            items.append(item)
        return items
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

get_larger_datasets async

Python
get_larger_datasets(session: Union[AsyncSession, sessionmaker], scope: str, test_mode: bool = False)

Get the larger datasets for a given scope.

Parameters:

Name Type Description Default
session Union[AsyncSession, sessionmaker]

Postgres session.

required
scope str

Name of scope.

required
test_mode bool

Activate test mode.

False
Source code in datatrail/query/query_api.py
Python
async def get_larger_datasets(
    session: Union[AsyncSession, sessionmaker],
    scope: str,
    test_mode: bool = False,
):
    """Get the larger datasets for a given scope.

    Args:
        session (Union[AsyncSession, sessionmaker]): Postgres session.
        scope (str): Name of scope.
        test_mode (bool): Activate test mode.
    """
    try:
        dss = db_queries.fetch_larger_datasets(session, scope)
        larger_datasets = [ds.name for ds in dss]
        return {"scope": scope, "larger_datasets": larger_datasets}
    except Exception as error:  # pragma: no cover
        session.rollback()
        log.error(error)
        return str(error)

get_state_updated_after

Python
get_state_updated_after(duration: str)

Get state updated after date.

Source code in datatrail/query/query_api.py
Python
def get_state_updated_after(duration: str):
    """Get state updated after date."""
    now = datetime.datetime.utcnow()
    if duration == "month":
        diff = datetime.timedelta(days=30)
    elif duration == "week":
        diff = datetime.timedelta(days=7)
    elif duration == "day":
        diff = datetime.timedelta(days=1)
    elif duration == "hour":
        diff = datetime.timedelta(hours=1)
    else:
        return None
    return pytz.utc.localize(now - diff)

get_storage_elements async

Python
get_storage_elements(session: Union[AsyncSession, sessionmaker]) -> Union[List[Dict[str, Any]], str]

Get all storage elements.

Source code in datatrail/query/query_api.py
Python
async def get_storage_elements(
    session: Union[AsyncSession, sessionmaker],
) -> Union[List[Dict[str, Any]], str]:
    """Get all storage elements."""
    try:
        elements = []
        for elem in db_queries.fetch_storage_elements(session):
            item = {}
            item["name"] = elem.name
            item["storage_type"] = elem.storage_type.name
            item["protocol"] = elem.protocol.name
            item["root"] = elem.root
            item["active"] = elem.active
            item["address"] = elem.address
            item["total_gb"] = elem.total_gb
            item["available_gb"] = elem.available_gb
            item["min_available_gb"] = elem.min_available_gb
            item[
                "available_gb_last_checked"
            ] = elem.available_gb_last_checked.isoformat()
            elements.append(item)
        return elements
    except Exception as e:  # pragma: no cover
        session.rollback()
        log.error(str(e))
        return str(e)

query_tsar_verification

Python
query_tsar_verification(event_number: int) -> str

Fetch tsar verification from frb-master's verifications.

Source code in datatrail/query/query_api.py
Python
def query_tsar_verification(event_number: int) -> str:
    """Fetch tsar verification from frb-master's verifications."""
    try:
        backend = chime_frb_api.frb_master.FRBMaster()
        verification = backend.API.get(
            f"/v1/verification/get-verification/{event_number}"
        )
    except Exception as e:
        log.error(f"Failed to get tsar classification for event: {event_number}")
        log.error(str(e))
        return ""
    if verification is None:
        return "event-missing-verification"
    user_verifications = verification.get("user_verification", [])
    ratings = []
    for uv in user_verifications:
        if uv["id"] == "l4_pipeline":
            continue
        if uv["classification"] == "KNOWN SOURCE" and uv["comments"] in "known pulsar":
            ratings.append("classified.PULSAR")
        elif uv["classification"] == "FAINT":
            ratings.append("classified.NOISE")
        elif uv["classification"] == "RFI":
            ratings.append("classified.RFI")
        else:
            # Even if one person thinks that this is an FRB,
            # save it for safety (less agreessive action).
            return "classified.FRB"
    if len(ratings) < 2:
        return "pending-tsar-classification"
    if "classified.PULSAR" in ratings:
        return "classified.PULSAR"
    elif "classified.NOISE" in ratings:
        return "classified.NOISE"
    return "classified.RFI"