Skip to content

Protocol API

CADC

datatrail_admin.protocols.cadcclient

Class to facilitate data transfer on CANFAR using the CADC tools.

CADCClient

Class for CANFAR CADC API.

delete
Python
delete(file: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0)

Delete a file from the CANFAR file server.

Parameters:

Name Type Description Default
file List[str]

List of source files to delete.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace str

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
verbose int

Verbosity level. Defaults to 0.

0
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def delete(
    self,
    file: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
):
    """Remove a batch of files from the CANFAR file server.

    Files that CADC reports as not found are collected and logged at the
    end; they do not abort the remaining deletions.

    Args:
        file (List[str]): Names of the files to delete, relative to
            ``namespace``.
        certfile (Optional[str], optional): Certificate handed to
            ``self._connect``. Defaults to None.
        namespace (str): Minoc namespace prefixed to every file name.
            Defaults to "cadc:CHIMEFRB".
        verbose (int): 1 enables INFO logging, values above 1 enable DEBUG,
            anything else leaves the logger at WARNING. Defaults to 0.

    Raises:
        cadcutils.exceptions.HttpException: Logged, then re-raised.
        Exception: Any other failure is logged, then re-raised.
    """
    # Pick the logging level from the requested verbosity.
    if verbose > 1:
        level = "DEBUG"
    elif verbose == 1:
        level = "INFO"
    else:
        level = "WARNING"
    logger.setLevel(level)

    logger.info("Connecting to CADC...")
    _, storage, _ = self._connect(certfile=certfile)
    missing: List[str] = []
    try:
        for name in file:
            uri = namespace + "/" + name
            try:
                storage.cadcremove(uri)  # type: ignore
                logger.debug(f"Deleted {uri} ✔")
            except cadcutils.exceptions.NotFoundException as error:  # type: ignore
                # A missing file is recorded, not fatal: keep deleting the rest.
                logger.error(f"CADC Exception: {uri}")
                missing.append(str(error))
    except cadcutils.exceptions.HttpException as error:  # type: ignore
        logger.error(f"CADC Exception: {error}")
        raise
    except Exception as error:
        logger.error(f"Error: {error}")
        raise
    if missing:
        logger.error(f"Number of files not found: {len(missing)}")
        logger.error(f"Not found: {missing}")
    logger.info(f"Process {os.getpid()} finished.")
exist
Python
exist(file: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0)

Check if a file exists.

Parameters:

Name Type Description Default
file str

File to check.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace str

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
verbose int

Verbosity. Defaults to 0.

0

Returns:

Name Type Description
bool

True if file exists.

Example:

Text Only
>>> exist("/data/chime/intensity/raw/2023/01/01/")
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def exist(
    self,
    file: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
) -> bool:
    """Check if a file exists.

    The check is done by requesting the file's info from CADC; a
    ``NotFoundException`` is interpreted as "does not exist". Any other
    exception propagates to the caller.

    Args:
        file (str): File to check, relative to ``namespace``.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        verbose (int, optional): Verbosity. Defaults to 0.

    Returns:
        bool: True if file exists, False if CADC reports it as not found.

    Example:

        >>> exist("/data/chime/intensity/raw/2023/01/01/")
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    _, storage_client, _ = self._connect(certfile=certfile)
    file_uri = namespace + "/" + file

    try:
        storage_client.cadcinfo(file_uri)
    except cadcutils.exceptions.NotFoundException:
        return False
    return True
get
Python
get(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0)

Retrieve a file, stored on the CANFAR file server, and copy it locally.

Parameters:

Name Type Description Default
source List[str]

List of source files to retrieve.

required
destination List[str]

List of destination files to copy to.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace str

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
verbose int

Verbosity level. Defaults to 0.

0
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def get(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
):
    """Retrieve files stored on the CANFAR file server and copy them locally.

    ``source[i]`` is downloaded to ``destination[i]``. Files that CADC
    reports as not found are collected and logged at the end; they do not
    abort the remaining transfers.

    Args:
        source (List[str]): List of source files to retrieve, relative to
            ``namespace``.
        destination (List[str]): List of destination files to copy to.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        verbose (int): Verbosity level. Defaults to 0.

    Raises:
        AssertionError: If ``source`` and ``destination`` differ in length.
        cadcutils.exceptions.HttpException: Logged, then re-raised.
        Exception: Any other failure is logged, then re-raised.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info("Connecting to CADC...")
    _, storage, _ = self._connect(certfile=certfile)
    not_found: List[str] = []
    try:
        logger.debug("Checking source and destination length match.")
        logger.debug(f"Source length: {len(source)}")
        logger.debug(f"Destination length: {len(destination)}")
        if len(source) != len(destination):
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped when Python runs with -O; the exception type is kept.
            raise AssertionError(
                "The number of source files must match the number of destination files. "  # noqa
                f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
            )
        for filename, target in zip(source, destination):
            uri = namespace + "/" + filename
            try:
                storage.cadcget(uri, target)  # type: ignore
                logger.debug(f"{uri} -> {target} ✔")
            except cadcutils.exceptions.NotFoundException as error:  # type: ignore
                # A missing file is recorded, not fatal: keep going.
                logger.error(f"CADC Exception: {uri}")
                not_found.append(str(error))
    except cadcutils.exceptions.HttpException as error:  # type: ignore
        logger.error(f"CADC Exception: {error}")
        raise
    except Exception as error:
        logger.error(f"Error: {error}")
        raise
    if not_found:
        logger.error(f"Number of files not found: {len(not_found)}")
        logger.error(f"Not found: {not_found}")
    logger.info(f"Process {os.getpid()} finished.")
info
Python
info(filename: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0) -> Tuple[Any, Any, Any, Any, Any, Any, Any]

Get the metadata for a single file.

Parameters:

Name Type Description Default
filename str

Name of the file to get metadata for.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace _type_

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
aggregate bool

Aggregate the results. Defaults to False.

required
verbose int

Verbosity level. Defaults to 0.

0

Returns:

Type Description
Tuple[Any, Any, Any, Any, Any, Any, Any]

Tuple[Any]: Metadata for the files.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def info(
    self,
    filename: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
) -> Tuple[Any, Any, Any, Any, Any, Any, Any]:
    """Get the metadata for a single file.

    Args:
        filename (str): File to get metadata for, relative to ``namespace``.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        verbose (int, optional): Verbosity level. Defaults to 0.

    Returns:
        Tuple[Any, ...]: The ``id``, ``name``, ``size``, ``file_type``,
        ``encoding``, ``lastmod`` and ``md5sum`` attributes, in that order,
        of the object returned by ``cadcinfo``.

    Raises:
        cadcutils.exceptions.NotFoundException: If CADC does not know the
            file; the error is logged at debug level and re-raised.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    _, storage_client, _ = self._connect(certfile=certfile)
    uri = namespace + "/" + filename
    logger.info(f"Getting info for {uri}.")
    try:
        information = storage_client.cadcinfo(uri)
    except cadcutils.exceptions.NotFoundException as error:
        logger.debug(f"CADC Exception: {error}")
        raise
    # Read the instance attributes once and pick out the fields that are
    # returned to the caller.
    # NOTE(review): the previous inline comments described every field as a
    # string except ``lastmod`` (datetime) - confirm against cadcdata.
    attributes = vars(information)
    return (
        attributes["id"],
        attributes["name"],
        attributes["size"],
        attributes["file_type"],
        attributes["encoding"],
        attributes["lastmod"],
        attributes["md5sum"],
    )
pdelete
Python
pdelete(file: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', processors: int = os.cpu_count() or 1, verbose: int = 0)

Parallelly delete files from the CANFAR file server.

Parameters:

Name Type Description Default
file List[str]

List of source files to delete.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace _type_

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
processors int

Number of processes to use. Defaults to os.cpu_count() or 1.

cpu_count() or 1
verbose int

Verbosity level. Defaults to 0.

0
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def pdelete(
    self,
    file: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    processors: int = os.cpu_count() or 1,
    verbose: int = 0,
):
    """Delete files from the CANFAR file server in parallel.

    The file list is split into batches and each batch is handed to
    ``self.delete`` in its own ``DillProcess``.

    Args:
        file (List[str]): List of source files to delete.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        processors (int, optional): Maximum number of processes to use.
            Defaults to os.cpu_count() or 1.
        verbose (int, optional): Verbosity level. Defaults to 0.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    # ``split`` omits empty batches, so it can return fewer than
    # ``processors`` lists (e.g. 2 files on 8 cores). Iterate over what it
    # returned instead of indexing ``range(processors)``, which raised
    # IndexError in that case.
    batches: List[List[Any]] = split(file, processors)
    logger.info(f"Starting {len(batches)} processes.")
    processes: List[DillProcess] = [
        DillProcess(
            target=self.delete,
            args=(batch, certfile, namespace, verbose),
        )
        for batch in batches
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    # Exceptions raised in a child do not reach the parent; surface them
    # through the exit code so failures are not silent.
    failed = [proc for proc in processes if proc.exitcode != 0]
    if failed:
        logger.error(
            f"{len(failed)} of {len(processes)} delete processes exited with a non-zero exit code."  # noqa
        )
pget
Python
pget(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', processors: int = os.cpu_count() or 1, verbose: int = 0)

Parallelly retrieve files, stored at CANFAR file server, and copy it locally.

Parameters:

Name Type Description Default
source List[str]

List of source files to retrieve.

required
destination List[str]

List of destination files to copy to.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace _type_

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
processors int

Number of processes to use. Defaults to os.cpu_count() or 1.

cpu_count() or 1
verbose int

Verbosity level. Defaults to 0.

0
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def pget(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    processors: int = os.cpu_count() or 1,
    verbose: int = 0,
):
    """Retrieve files stored on the CANFAR file server in parallel.

    ``source`` and ``destination`` are split into matching batches and each
    pair of batches is handed to ``self.get`` in its own ``DillProcess``.

    Args:
        source (List[str]): List of source files to retrieve.
        destination (List[str]): List of destination files to copy to.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        processors (int, optional): Maximum number of processes to use.
            Defaults to os.cpu_count() or 1.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Raises:
        ValueError: If ``source`` and ``destination`` differ in length.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    # Both lists are batched independently below; equal lengths guarantee
    # that batch i of ``source`` lines up with batch i of ``destination``.
    if len(source) != len(destination):
        raise ValueError(
            "The number of source files must match the number of destination files. "  # noqa
            f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
        )

    # ``split`` omits empty batches, so it can return fewer than
    # ``processors`` lists. Iterate over what it returned instead of
    # indexing ``range(processors)``, which raised IndexError in that case.
    sources: List[List[Any]] = split(source, processors)
    destinations: List[List[Any]] = split(destination, processors)
    logger.info(f"Starting {len(sources)} processes.")
    processes: List[DillProcess] = [
        DillProcess(
            target=self.get,
            args=(source_batch, destination_batch, certfile, namespace, verbose),
        )
        for source_batch, destination_batch in zip(sources, destinations)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    # Exceptions raised in a child do not reach the parent; surface them
    # through the exit code so failures are not silent.
    failed = [proc for proc in processes if proc.exitcode != 0]
    if failed:
        logger.error(
            f"{len(failed)} of {len(processes)} get processes exited with a non-zero exit code."  # noqa
        )
pput
Python
pput(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', overwrite: bool = False, processors: int = os.cpu_count() or 1, verbose: int = 0)

Parallelly transfer files, stored locally, to the CANFAR file server.

Parameters:

Name Type Description Default
source List[str]

List of source files to transfer.

required
destination List[str]

List of destination files to transfer to.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace _type_

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
overwrite bool

Overwrite existing files.

False
processors int

Number of processes to use. Defaults to os.cpu_count() or 1.

cpu_count() or 1
verbose int

Verbosity level. Defaults to 0.

0
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def pput(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    overwrite: bool = False,
    processors: int = os.cpu_count() or 1,
    verbose: int = 0,
):
    """Transfer locally stored files to the CANFAR file server in parallel.

    ``source`` and ``destination`` are split into matching batches and each
    pair of batches is handed to ``self.put`` in its own ``DillProcess``.

    Args:
        source (List[str]): List of source files to transfer.
        destination (List[str]): List of destination files to transfer to.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        overwrite (bool): Overwrite existing files.
        processors (int, optional): Maximum number of processes to use.
            Defaults to os.cpu_count() or 1.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Raises:
        ValueError: If ``source`` and ``destination`` differ in length.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    # Both lists are batched independently below; equal lengths guarantee
    # that batch i of ``source`` lines up with batch i of ``destination``.
    if len(source) != len(destination):
        raise ValueError(
            "The number of source files must match the number of destination files. "  # noqa
            f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
        )

    # ``split`` omits empty batches, so it can return fewer than
    # ``processors`` lists. Iterate over what it returned instead of
    # indexing ``range(processors)``, which raised IndexError in that case.
    sources: List[List[Any]] = split(source, processors)
    destinations: List[List[Any]] = split(destination, processors)
    logger.info(f"Starting {len(sources)} processes.")
    processes: List[DillProcess] = [
        DillProcess(
            target=self.put,
            args=(
                source_batch,
                destination_batch,
                certfile,
                namespace,
                overwrite,
                verbose,
            ),
        )
        for source_batch, destination_batch in zip(sources, destinations)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    # Exceptions raised in a child do not reach the parent; surface them
    # through the exit code so failures are not silent.
    failed = [proc for proc in processes if proc.exitcode != 0]
    if failed:
        logger.error(
            f"{len(failed)} of {len(processes)} put processes exited with a non-zero exit code."  # noqa
        )
put
Python
put(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', overwrite: bool = False, verbose: int = 0)

Transfer a file, stored locally, and copy it to the CANFAR file server.

Parameters:

Name Type Description Default
source List[str]

List of source files to transfer.

required
destination List[str]

List of destination files to transfer to.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace str

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
overwrite bool

Overwrite existing files.

False
verbose int

Verbosity level. Defaults to 0.

0
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def put(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    overwrite: bool = False,
    verbose: int = 0,
):
    """Transfer locally stored files to the CANFAR file server.

    ``source[i]`` is uploaded to ``destination[i]``. Transfers for which
    CADC raises ``NotFoundException`` are collected and logged at the end;
    they do not abort the remaining transfers.

    Args:
        source (List[str]): List of source files to transfer.
        destination (List[str]): List of destination files to transfer to,
            relative to ``namespace``.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        overwrite (bool): Overwrite existing files.
        verbose (int): Verbosity level. Defaults to 0.

    Raises:
        AssertionError: If ``source`` and ``destination`` differ in length.
        cadcutils.exceptions.HttpException: Logged, then re-raised.
        Exception: Any other failure is logged, then re-raised.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info("Connecting to CADC...")
    _, storage, _ = self._connect(certfile=certfile)
    not_found: List[str] = []
    try:
        logger.debug("Checking source and destination length match.")
        logger.debug(f"Source length: {len(source)}")
        logger.debug(f"Destination length: {len(destination)}")
        if len(source) != len(destination):
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped when Python runs with -O; the exception type is kept.
            raise AssertionError(
                "The number of source files must match the number of destination files. "  # noqa
                f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
            )
        for local_file, filename in zip(source, destination):
            uri = namespace + "/" + filename
            try:
                storage.cadcput(uri, local_file, replace=overwrite)  # type: ignore
                logger.debug(f"{local_file} -> {uri} ✔")
            except cadcutils.exceptions.NotFoundException as error:  # type: ignore
                # Recorded, not fatal: keep transferring the rest.
                logger.error(f"CADC Exception: {uri}")
                not_found.append(str(error))
    except cadcutils.exceptions.HttpException as error:  # type: ignore
        logger.error(f"CADC Exception: {error}")
        raise
    except Exception as error:
        logger.error(f"Error: {error}")
        raise
    if not_found:
        logger.error(f"Number of files not found: {len(not_found)}")
        logger.error(f"Not found: {not_found}")
    logger.info(f"Process {os.getpid()} finished.")
query
Python
query(directory: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', timeout: int = 60, verbose: int = 0) -> List[str]

Get list of files in a directory.

Parameters:

Name Type Description Default
directory str

Directory to list the files of.

required
certfile str

Certificate file. Defaults to None.

None
namespace str

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
timeout int

Timeout. Defaults to 60.

60
verbose int

Verbosity. Defaults to 0.

0

Returns:

Type Description
List[str]

List[str]: List of files in the directory.

Example

query("/data/chime/intensity/raw/2023/01/01/")

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def query(
    self,
    directory: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    timeout: int = 60,
    verbose: int = 0,
) -> List[str]:
    """Get list of files in a directory.

    Runs an ADQL query against ``inventory.Artifact`` for every URI that
    starts with ``namespace/directory`` and returns the first CSV column of
    each output line.

    Args:
        directory (str): Directory to list the files of.
        certfile (str, optional): Certificate file. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        timeout (int, optional): Timeout. Defaults to 60.
        verbose (int, optional): Verbosity. Defaults to 0.

    Returns:
        List[str]: First CSV column of every line printed by the query
        client. The captured text is split on newlines, so an empty trailing
        entry is present whenever the output ends with a newline.

    Example:
        >>> query("/data/chime/intensity/raw/2023/01/01/")
    """
    # Imported locally so the block does not depend on a new module-level
    # import.
    from contextlib import redirect_stdout

    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info(f"Getting list of files in {directory}...")
    adql = f"select * from inventory.Artifact where uri like '{namespace}/{directory}%'"  # noqa
    # Collapse the double slash produced when ``directory`` starts with "/".
    adql = adql.replace("//", "/")
    logger.info(f"Running query: {adql}")
    buffer = StringIO()
    # The query client prints its result to stdout, so stdout is captured.
    # ``redirect_stdout`` restores the previous stream even if the query
    # raises, and restores whatever was installed before rather than
    # ``sys.__stdout__``.
    with redirect_stdout(buffer):
        _, _, query_client = self._connect(certfile=certfile)
        query_client.query(  # type: ignore
            query=adql,
            output_file=None,
            response_format="csv",
            tmptable=None,
            lang="ADQL",
            timeout=timeout,
            data_only=True,
            no_column_names=True,
        )
    content = buffer.getvalue()
    return [line.split(",")[0] for line in content.split("\n")]
size
Python
size(directory: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', timeout: int = 60, verbose: int = 0) -> float

Get the size of a directory in GB.

Parameters:

Name Type Description Default
directory str

Directory to get the size of.

required
certfile Optional[str]

Certificate. Defaults to None.

None
namespace _type_

Minoc Namespace. Defaults to "cadc:CHIMEFRB".

'cadc:CHIMEFRB'
timeout int

Timeout. Defaults to 60.

60
verbose int

Verbosity level. Defaults to 0.

0

Returns:

Name Type Description
float float

Size of the directory in GB.

Example

size("/data/chime/intensity/raw/2023/01/01/")

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def size(
    self,
    directory: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    timeout: int = 60,
    verbose: int = 0,
) -> float:
    """Get the size of a directory in GB.

    Runs an ADQL query that sums ``contentLength`` (scaled by 1024**3) over
    every ``inventory.Artifact`` whose URI starts with
    ``namespace/directory``.

    Args:
        directory (str): Directory to get the size of.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        timeout (int, optional): Timeout. Defaults to 60.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Returns:
        float: Size of the directory in GB.

    Raises:
        ValueError: If the first line printed by the query client cannot be
            parsed as a float.

    Example:
        >>> size("/data/chime/intensity/raw/2023/01/01/")
    """
    # Imported locally so the block does not depend on a new module-level
    # import.
    from contextlib import redirect_stdout

    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info(f"Getting size of {directory}...")
    adql = f"select sum(contentLength/1024.0/1024.0/1024.0) as numGB from inventory.Artifact where uri like '{namespace}/{directory}%'"  # noqa
    # Collapse the double slash produced when ``directory`` starts with "/".
    adql = adql.replace("//", "/")
    logger.info(f"Running query: {adql}")
    buffer = StringIO()
    # The query client prints its result to stdout, so stdout is captured.
    # ``redirect_stdout`` restores the previous stream even if the query
    # raises, and restores whatever was installed before rather than
    # ``sys.__stdout__``.
    with redirect_stdout(buffer):
        _, _, query_client = self._connect(certfile=certfile)
        query_client.query(  # type: ignore
            query=adql,
            output_file=None,
            response_format="csv",
            tmptable=None,
            lang="ADQL",
            timeout=timeout,
            data_only=True,
            no_column_names=True,
        )
    content = buffer.getvalue()
    # Only the first output line carries the summed value.
    return float(content.split("\n")[0])

DillProcess

Python
DillProcess(*args: Any, **kwargs: Any)

Bases: Process

A Process class that uses dill to serialize the target function before execution.

Parameters:

Name Type Description Default
Process object

Python Process class.

required

Initialize the DillProcess class.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def __init__(self, *args: Any, **kwargs: Any):
    """Set up the process, then replace its target with a dill payload.

    All arguments are forwarded unchanged to the parent initializer. The
    resulting ``_target`` is serialized with ``dill.dumps`` and stored back
    on the instance; ``run`` is expected to deserialize it.
    """
    super().__init__(*args, **kwargs)
    serialized_target = dill.dumps(self._target)  # type: ignore
    self._target = serialized_target
run
Python
run()

Run the DillProcess.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def run(self):
    """Run the DillProcess.

    Deserializes the target stored by ``__init__`` with ``dill.loads`` and
    calls it with the process's positional and keyword arguments. Nothing is
    called when the process was created without a target.
    """
    if not self._target:
        return
    target = dill.loads(self._target)  # type: ignore
    self._target = target
    # ``__init__`` pickles the target unconditionally, so the stored payload
    # is non-empty even for ``target=None``; check the deserialized object so
    # a target-less process is a no-op instead of failing with
    # "'NoneType' object is not callable".
    if target is not None:
        target(*self._args, **self._kwargs)  # type: ignore

split

Python
split(data: List[Any], count: int) -> List[List[Any]]

Split a list into batches.

Parameters:

Name Type Description Default
data List[Any]

List to split.

required
count int

Number of batches to split into.

required

Returns:

Type Description
List[List[Any]]

List[List[Any]]: List of batches.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
Python
def split(data: List[Any], count: int) -> List[List[Any]]:
    """Partition a list into at most ``count`` contiguous batches.

    Batch sizes differ by at most one element: the first
    ``len(data) % count`` batches receive one extra item. Batches that would
    be empty are left out, so fewer than ``count`` batches are returned when
    ``data`` has fewer than ``count`` items.

    Args:
        data (List[Any]): List to split.
        count (int): Number of batches to split into.

    Returns:
        List[List[Any]]: The non-empty batches, in the original order.
    """
    base_size, extra = divmod(len(data), count)
    chunks: List[Any] = []
    start = 0
    for position in range(count):
        # The first ``extra`` batches absorb the remainder, one item each.
        width = base_size + 1 if position < extra else base_size
        stop = start + width
        chunk = data[start:stop]
        start = stop
        if len(chunk) > 0:
            chunks.append(chunk)
    return chunks

POSIX

datatrail_admin.protocols.posix

Implement POSIX protocols to do the following.

  1. delete: delete a file on the storage element.
  2. push: replicate a file to a remote destination.
  3. pull: replicate a file from a remote location.
  4. move: move a file from one local location to another.
  5. copy: copy a file from one local location to another.

PosixApi

Python
PosixApi(**kwargs)

Bases: Protocol

Class for POSIX API.

Attributes

Protocol : Protocol

Methods

delete(path: Union[str, Path]) Delete a file on the local storage element.

push(source: Union[str, Path], destination: Union[str, Path]) Push the file to the remote destination.

pull(source: Union[str, Path], destination: Union[str, Path]) Pull the file from the remote destination.

move(source, destination) Move a file from one local location to another.

copy(source, destination) Copy a file from one local location to another.

Initialize the class.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
Python
def __init__(self, **kwargs) -> None:
    """Set up the POSIX protocol handler.

    Keyword arguments are accepted but not used: the parent initializer is
    called without any arguments.
    """
    super().__init__()
copy
Python
copy(source: Path, destination: Path) -> None

Copy a file locally from one location to another.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
Python
def copy(self, source: Path, destination: Path) -> None:
    """Copy a file or directory locally from one location to another.

    A regular file is copied with ``shutil.copy``; anything else is handed
    to ``shutil.copytree``.

    Parameters
    ----------
    source : Union[str, Path]
        Path to copy from - local host.
    destination : Union[str, Path]
        Path to copy to - local host.
    """
    # ``isinstance`` instead of ``type(...) is not Path``: pathlib hands out
    # PosixPath/WindowsPath instances, so the exact-type comparison was true
    # for every real path object.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    if source.is_file():
        shutil.copy(source, destination)
    else:
        shutil.copytree(source, destination)
delete
Python
delete(path: Path) -> None

Delete a file on the storage element.

Parameters

path : Union[str, Path] Path for the file or directory to be deleted.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
Python
def delete(self, path: Path) -> None:
    """Delete a file on the storage element.

    A path containing ``:`` is treated as ``host:path`` and removed by
    running ``rm -rf`` on that host through a ``Connection``; any other path
    is removed locally with ``rm -rf``.

    Parameters
    ----------
    path : Union[str, Path]
        Path for the file or directory to be deleted.
    """
    # Imported locally so the block does not depend on a new module-level
    # import.
    import shlex

    if not isinstance(path, Path):
        path = Path(path)
    location = path.as_posix()
    if ":" in location:
        # Split on the first colon only, so a remote path that itself
        # contains ":" no longer fails to unpack.
        host, _, remote_path = location.partition(":")
        connection = Connection(host)
        # The command is a shell string: quote the path so spaces or shell
        # metacharacters cannot change what ``rm -rf`` removes. As in the
        # local branch, the path is passed literally.
        connection.run(f"rm -rf {shlex.quote(remote_path)}")
    else:
        subprocess.call(["rm", "-rf", location])
mkdir
Python
mkdir(path: Path) -> None

Make a directory on the local storage element.

Parameters

path : Union[str, Path] Path for the new directory.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
Python
def mkdir(self, path: Path) -> None:
    """Make a directory on the local storage element.

    Missing parent directories are created as well.

    Parameters
    ----------
    path : Union[str, Path]
        Path for the new directory.

    Raises
    ------
    FileExistsError
        If ``path`` already exists.
    """
    # ``isinstance`` instead of ``type(...) is not Path``: pathlib hands out
    # PosixPath/WindowsPath instances, so the exact-type comparison was true
    # for every real path object.
    if not isinstance(path, Path):
        path = Path(path)
    path.mkdir(parents=True, exist_ok=False)
move
Python
move(source, destination)

Move a file locally from one location to another.

Parameters

source : Union[str, Path] Source path - local host. destination : Union[str, Path] Destination path - local host.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
Python
def move(self, source, destination):
    """Move a file locally from one location to another.

    Parameters
    ----------
    source : Union[str, Path]
        Source path - local host.
    destination : Union[str, Path]
        Destination path - local host.
    """
    # ``isinstance`` instead of ``type(...) is not Path``: pathlib hands out
    # PosixPath/WindowsPath instances, so the exact-type comparison was true
    # for every real path object.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    shutil.move(source, destination)
pull
Python
pull(source: Path, destination: Path) -> None

Pull the file from the remote destination.

Use rsync to pull the file from the remote destination.

Parameters

source : Union[str, Path] Source path - remote host. destination : Union[str, Path] Destination path - local host.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
Python
def pull(self, source: Path, destination: Path) -> None:
    """Pull the file from the remote destination.

    Use rsync (``rsync -azPR``) to pull the file from the remote
    destination. The exit status of rsync is not checked.

    Parameters
    ----------
    source : Union[str, Path]
        Source path - remote host.
    destination : Union[str, Path]
        Destination path - local host.
    """
    # ``isinstance`` instead of ``type(...) is not Path``: pathlib hands out
    # PosixPath/WindowsPath instances, so the exact-type comparison was true
    # for every real path object.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    subprocess.call(["rsync", "-azPR", source.as_posix(), destination.as_posix()])
push
Python
push(source: Path, destination: Path) -> None

Push the file to the remote destination.

Use rsync to push the file to the remote destination.

Parameters

source : Union[str, Path] Source path - local host. destination : Union[str, Path] Destination path - remote host.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
Python
def push(self, source: Path, destination: Path) -> None:
    """Push the file to the remote destination.

    Use rsync to push the file to the remote destination.

    Parameters
    ----------
    source : Union[str, Path]
        Source path - local host.
    destination : Union[str, Path]
        Destination path - remote host.

    Raises
    ------
    AssertionError
        If ``source`` does not exist locally.

    Notes
    -----
    ``subprocess.call`` returns rsync's exit status; it is discarded here,
    so a failed transfer does not raise.
    """
    # Use isinstance rather than an exact type() comparison: pathlib hands
    # back the concrete PosixPath/WindowsPath subclass, so `type(x) is Path`
    # never matches a real path object.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    # Kept as an assert so the exception type seen by callers is unchanged.
    assert source.exists(), "Source file does not exist."
    subprocess.call(["rsync", "-azPR", source.as_posix(), destination.as_posix()])

VOS

datatrail_admin.protocols.vosapi

Implement VOSpace protocols to do the following.

  1. delete: delete a file on the storage element.
  2. push: replicate a file to a remote destination.
  3. pull: replicate a file from a remote location.
  4. move: move a file from one local location to another.
  5. copy: copy a file from one local location to another.

VosApi

Python
VosApi(**kwargs)

Bases: Protocol

Class for VOSpace API.

Initialize the VOSpace API.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def __init__(self, **kwargs):
    """Set up the parent protocol and open the VOSpace client.

    All keyword arguments are forwarded unchanged to the parent
    initializer. The object returned by ``connect()`` is stored on
    ``self.client``.
    """
    super().__init__(**kwargs)
    # Connect only after the parent initializer has run.
    vospace_client = self.connect()
    self.client = vospace_client
chmod
Python
chmod(node, mode)

Change global permissions of the file/directory.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def chmod(self, node, mode):
    """Apply ``mode`` to a VOSpace node through the client.

    Parameters
    ----------
    node : str
        Node URI; must begin with ``arc:`` or ``vos:``.
    mode
        Passed straight through to the node object's ``chmod``.
    """
    is_vospace_uri = node.startswith(("arc:", "vos:"))
    assert is_vospace_uri
    target = self.client.get_node(node)
    target.chmod(mode)
connect
Python
connect()

Connect to the VOSpace file server.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def connect(self):
    """Build a VOSpace client configured with the module certificate file.

    Returns
    -------
    Client
        A new ``Client`` created with ``vospace_certfile=CERTFILE``.
    """
    vospace_client = Client(vospace_certfile=CERTFILE)
    return vospace_client
copy
Python
copy(source, destination)

Copy file/directory within VOSpace.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def copy(self, source, destination):
    """Copy a file/directory using the VOSpace client.

    At least one of ``source`` and ``destination`` must be a VOSpace URI
    (``arc:`` or ``vos:`` prefix); the pair is then handed to
    ``self.client.copy`` unchanged.
    """
    vospace_prefixes = ("arc:", "vos:")
    touches_vospace = source.startswith(vospace_prefixes) or destination.startswith(
        vospace_prefixes
    )
    assert touches_vospace
    self.client.copy(source, destination)
delete
Python
delete(node)

Delete the VOSpace node and everything in it.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def delete(self, node):
    """Delete the VOSpace node and everything in it.

    The node is passed to ``self.client.delete`` as given; no prefix
    check is performed here.
    """
    vospace_client = self.client
    vospace_client.delete(node)
exists
Python
exists(node)

Check if the file/directory exists on VOSpace.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def exists(self, node):
    """Report whether the node is present on VOSpace as a file or directory.

    ``isfile`` is consulted first; ``isdir`` is only called when the node
    is not a file.
    """
    file_result = self.isfile(node)
    if file_result:
        return file_result
    return self.isdir(node)
isdir
Python
isdir(node)

Check if the node is a directory.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def isdir(self, node):
    """Ask the VOSpace client whether the node is a directory.

    ``node`` must begin with ``arc:`` or ``vos:``; the client's answer is
    returned unchanged.
    """
    assert node.startswith(("arc:", "vos:"))
    is_directory = self.client.isdir(node)
    return is_directory
isfile
Python
isfile(node)

Check if the node is a file.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def isfile(self, node):
    """Ask the VOSpace client whether the node is a file.

    ``node`` must begin with ``arc:`` or ``vos:``; the client's answer is
    returned unchanged.
    """
    assert node.startswith(("arc:", "vos:"))
    is_regular_file = self.client.isfile(node)
    return is_regular_file
make_readable_by_chime_frb_ro
Python
make_readable_by_chime_frb_ro(node)

Make the node readable by the chime-frb-ro group.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def make_readable_by_chime_frb_ro(self, node):
    """Grant group read access on the node to ``chime-frb-ro``.

    Runs ``vchmod`` with ``-R g+r`` through the shell, using the module
    certificate file. ``node`` must begin with ``arc:`` or ``vos:``.
    The command's exit status is not checked.
    """
    assert node.startswith(("arc:", "vos:"))
    os.system(f"vchmod --certfile {CERTFILE} -R g+r {node} chime-frb-ro")
make_writable_by_chime_frb_admin
Python
make_writable_by_chime_frb_admin(node)

Make the node writable by chime frb admin group.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def make_writable_by_chime_frb_admin(self, node):
    """Grant group write access on the node to ``chime-frb-raw``.

    Runs ``vchmod`` with ``-R g+w`` through the shell, using the module
    certificate file. ``node`` must begin with ``arc:`` or ``vos:``.
    The command's exit status is not checked.

    NOTE(review): the method name says "admin" but the group passed to
    ``vchmod`` is ``chime-frb-raw`` - confirm this is the intended group.
    """
    assert node.startswith(("arc:", "vos:"))
    os.system(f"vchmod --certfile {CERTFILE} -R g+w {node} chime-frb-raw")
make_writable_by_chime_frb_rw
Python
make_writable_by_chime_frb_rw(node)

Make the node writable by the chime-frb-rw group.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def make_writable_by_chime_frb_rw(self, node):
    """Grant group write access on the node to ``chime-frb-rw``.

    Runs ``vchmod`` with ``-R g+w`` through the shell, using the module
    certificate file. ``node`` must begin with ``arc:`` or ``vos:``.
    The command's exit status is not checked.
    """
    assert node.startswith(("arc:", "vos:"))
    os.system(f"vchmod --certfile {CERTFILE} -R g+w {node} chime-frb-rw")
mkdir
Python
mkdir(node)

Create a directory on VOSpace.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def mkdir(self, node):
    """Create a directory on VOSpace without raising on client errors.

    Parameters
    ----------
    node : str
        VOSpace URI of the directory; must begin with ``arc:`` or ``vos:``.

    Notes
    -----
    Any exception raised by ``self.client.mkdir`` is caught and reported
    on stdout, so this call stays best-effort. The underlying error is
    included in the message because an already-existing directory is only
    one of the reasons the client can fail (permissions, connectivity,
    a missing parent, ...).
    """
    assert node.startswith("arc:") or node.startswith("vos:")
    try:
        self.client.mkdir(node)
    except Exception as error:
        # Surface the real cause instead of asserting that the directory exists.
        print(f"Could not create {node} on VOSpace (it may already exist): {error}")
move
Python
move(source, destination)

Move file/directory within VOSpace.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def move(self, source, destination):
    """Move a file/directory from one VOSpace location to another.

    Both ``source`` and ``destination`` must begin with ``arc:`` or
    ``vos:``; the source is checked first. The pair is then handed to
    ``self.client.move``.
    """
    vospace_prefixes = ("arc:", "vos:")
    assert source.startswith(vospace_prefixes)
    assert destination.startswith(vospace_prefixes)
    self.client.move(source, destination)
pull
Python
pull(source, destination)

Pull the file/directory from VOSpace to local space.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def pull(self, source, destination):
    """Copy a file/directory from VOSpace to ``destination`` with ``vcp``.

    A leading ``/arc/`` on either path is rewritten to the ``arc:`` URI
    form. After that rewrite ``source`` must begin with ``arc:`` or
    ``vos:``.

    Before copying, a local directory is created with mode ``0o777``: the
    destination minus its last ``/``-separated component, or the
    destination itself when it contains no ``/``. After the copy,
    ``chmod -R 777`` is run on the destination through the shell.
    """
    arc_mount = "/arc/"
    if source.startswith(arc_mount):
        source = "arc:" + source[len(arc_mount):]
    if destination.startswith(arc_mount):
        destination = "arc:" + destination[len(arc_mount):]
    assert source.startswith(("arc:", "vos:"))

    is_absolute = destination[0] == "/"
    if "/" in destination:
        # Drop the last component and re-join what is left.
        parent_parts = destination.split("/")[:-1]
        dest_dir = os.path.join(*parent_parts)
    else:
        dest_dir = destination
    if is_absolute:
        # Splitting removed the leading slash; put it back.
        dest_dir = "/" + dest_dir
    os.makedirs(dest_dir, mode=0o777, exist_ok=True)

    # Error with the vos python client, using vcp command in meantime.
    # Once fixed can use: self.client.copy(source, destination)
    run(f"vcp --certfile {CERTFILE} {source} {destination}")
    os.system(f"chmod -R 777 {destination}")
push
Python
push(source, destination, n_streams = 5, cache_path = None, verbose = False)

Push the file/directory to VOSpace.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def push(self, source, destination, n_streams=5, cache_path=None, verbose=False):
    """Push the file/directory to VOSpace with ``vsync``.

    Parameters
    ----------
    source : str
        Path to push. A leading ``/arc/`` is rewritten to ``arc:``.
    destination : str
        Target location. A leading ``/arc/`` is rewritten to ``arc:``;
        afterwards it must begin with ``arc:`` or ``vos:``.
    n_streams : int
        Value passed to vsync's ``-n`` option. Defaults to 5.
    cache_path : Optional[str]
        Value passed to vsync's ``--cache_filename`` option. Defaults to
        ``~/vsync_cache`` when None.
    verbose : bool
        Add ``-v`` to the vsync command line. Defaults to False.
    """
    if source.startswith("/arc/"):
        logger.info("Fixing source path '/arc/' to 'arc:'")
        source = "arc:" + source[5:]
    if destination.startswith("/arc/"):
        logger.info("Fixing destination path '/arc/' to 'arc:'")
        destination = "arc:" + destination[5:]
    if cache_path is None:
        # NOTE(review): "~" is not expanded here; presumably the command
        # runner/shell expands it - confirm.
        cache_path = (Path("~") / "vsync_cache").as_posix()
        # Log the value actually used ("~/vsync_cache", no leading dot).
        logger.info(f"Setting cache_path to {cache_path}")
    assert destination.startswith(("arc:", "vos:"))
    # The vos python client's copy is not used here (see the note in pull);
    # the vsync command line tool is invoked instead.
    verbose_flag = "-v " if verbose else ""
    run(
        f"vsync --certfile {CERTFILE} {verbose_flag}-r -n {n_streams} --cache_nodes --cache_filename {cache_path} {source} {destination}"  # noqa
    )
sync
Python
sync(source, destination)

Sync the directory structure within VOSpace.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
Python
def sync(self, source, destination):
    """Run ``vsync`` from ``source`` to ``destination`` with five streams.

    ``source`` must begin with ``arc:`` or ``vos:``; ``destination`` is
    not checked. The certificate path is read from
    ``self.config['vospace_certfile']`` and the command is executed
    through the shell; its exit status is not checked.
    """
    assert source.startswith(("arc:", "vos:"))
    certfile = self.config['vospace_certfile']
    command = f"vsync --certfile {certfile} --nstreams 5 {source} {destination}"  # noqa
    os.system(command)