Protocol API¶
CADC¶
datatrail_admin.protocols.cadcclient ¶
Class to facilitate data transfer on CANFAR using the CADC tools.
CADCClient ¶
Class for CANFAR CADC API.
delete ¶
delete(file: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0)
Delete a file from the CANFAR file server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
List[str]
|
List of source files to delete. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
str
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def delete(
    self,
    file: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
):
    """Remove files from the CANFAR file server.

    Files the server reports as missing do not abort the run; they are
    collected and reported through the logger once every file has been tried.
    Any other failure is logged and re-raised.

    Args:
        file (List[str]): Names of the files to delete, relative to ``namespace``.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        verbose (int): Verbosity level (0 warning, 1 info, >1 debug). Defaults to 0.
    """
    # Translate the verbosity flag into a logger level.
    logger.setLevel("WARNING")
    if verbose >= 1:
        logger.setLevel("INFO" if verbose == 1 else "DEBUG")

    logger.info("Connecting to CADC...")
    _, client, _ = self._connect(certfile=certfile)
    missing: List[str] = []
    try:
        for name in file:
            try:
                uri = namespace + "/" + name
                client.cadcremove(uri)  # type: ignore
                logger.debug(f"Deleted {uri} ✔")
            except cadcutils.exceptions.NotFoundException as exc:  # type: ignore
                # A missing file is recorded and the loop carries on.
                logger.error(f"CADC Exception: {uri}")
                missing.append(str(exc))
            except cadcutils.exceptions.HttpException as exc:  # type: ignore
                logger.error(f"CADC Exception: {exc}")
                raise exc
    except Exception as exc:
        logger.error(f"Error: {exc}")
        raise exc

    if missing:
        logger.error(f"Number of files not found: {len(missing)}")
        logger.error(f"Not found: {missing}")
    logger.info(f"Process {os.getpid()} finished.")
exist ¶
exist(file: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0)
Check if a file exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
str
|
File to check. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
str
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
verbose |
int
|
Verbosity. Defaults to 0. |
0
|
Returns:
Name | Type | Description |
---|---|---|
bool |
True if file exists. |
Example:
>>> exist("/data/chime/intensity/raw/2023/01/01/")
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def exist(
    self,
    file: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
):
    """Report whether a file is present on the CANFAR file server.

    Args:
        file (str): File to check, relative to ``namespace``.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        verbose (int, optional): Verbosity. Defaults to 0.

    Returns:
        bool: True if the info lookup succeeds, False if it raises
        ``NotFoundException``.

    Example:
        >>> exist("/data/chime/intensity/raw/2023/01/01/")
    """
    # Translate the verbosity flag into a logger level.
    logger.setLevel("WARNING")
    if verbose >= 1:
        logger.setLevel("INFO" if verbose == 1 else "DEBUG")

    _, client, _ = self._connect(certfile=certfile)
    uri = namespace + "/" + file
    try:
        # Only the success or failure of the lookup matters; the result is unused.
        client.cadcinfo(uri)
    except cadcutils.exceptions.NotFoundException:
        return False
    return True
get ¶
get(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0)
Retrieve a file, stored on the CANFAR file server, and copy it locally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
List[str]
|
List of source files to retrieve. |
required |
destination |
List[str]
|
List of destination files to copy to. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
str
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def get(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
):
    """Retrieve files stored on the CANFAR file server and copy them locally.

    ``source[i]`` is downloaded to ``destination[i]``. Files the server reports
    as missing are collected and logged at the end; any other failure is logged
    and re-raised.

    Args:
        source (List[str]): List of source files to retrieve.
        destination (List[str]): List of destination files to copy to.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        verbose (int): Verbosity level. Defaults to 0.

    Raises:
        AssertionError: If ``source`` and ``destination`` differ in length.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info("Connecting to CADC...")
    _, storage, _ = self._connect(certfile=certfile)
    not_found: List[str] = []
    try:
        logger.debug("Checking source and destination length match.")
        logger.debug(f"Source length: {len(source)}")
        logger.debug(f"Destination length: {len(destination)}")
        if len(source) != len(destination):
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped under ``python -O``; the exception type is unchanged.
            raise AssertionError(
                "The number of source files must match the number of destination files. "  # noqa
                f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
            )
        for filename, target in zip(source, destination):
            try:
                filename = namespace + "/" + filename
                storage.cadcget(filename, target)  # type: ignore
                logger.debug(f"{filename} ➜ {target} ✔")
            except cadcutils.exceptions.NotFoundException as error:  # type: ignore
                # A missing file is recorded and the loop carries on.
                logger.error(f"CADC Exception: {filename}")
                not_found.append(str(error))
            except cadcutils.exceptions.HttpException as error:  # type: ignore
                logger.error(f"CADC Exception: {error}")
                raise error
    except Exception as error:
        logger.error(f"Error: {error}")
        raise error

    if not_found:
        logger.error(f"Number of files not found: {len(not_found)}")
        logger.error(f"Not found: {not_found}")
    logger.info(f"Process {os.getpid()} finished.")
info ¶
info(filename: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', verbose: int = 0) -> Tuple[Any, Any, Any, Any, Any, Any, Any]
Get the metadata for a single file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
str
|
Name of the file to get metadata for. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
_type_
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
aggregate |
bool
|
Aggregate the results. Defaults to False. |
required |
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Returns:
Type | Description |
---|---|
Tuple[Any, Any, Any, Any, Any, Any, Any]
|
Tuple[Any]: Metadata for the files. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def info(
    self,
    filename: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    verbose: int = 0,
) -> Tuple[Any, Any, Any, Any, Any, Any, Any]:
    """Get the metadata for a single file.

    Args:
        filename (str): Name of the file, relative to ``namespace``.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        verbose (int, optional): Verbosity level. Defaults to 0.

    Returns:
        Tuple[Any, ...]: ``(id, name, size, file_type, encoding, lastmod,
        md5sum)`` as read from the attributes of the object returned by
        ``cadcinfo``.

    Raises:
        cadcutils.exceptions.NotFoundException: Re-raised when the lookup
            reports the file as missing.
    """
    # Translate the verbosity flag into a logger level.
    logger.setLevel("WARNING")
    if verbose >= 1:
        logger.setLevel("INFO" if verbose == 1 else "DEBUG")

    _, client, _ = self._connect(certfile=certfile)
    uri = namespace + "/" + filename
    logger.info(f"Getting info for {uri}.")
    try:
        record = client.cadcinfo(uri)
    except cadcutils.exceptions.NotFoundException as exc:
        logger.debug(f"CADC Exception: {exc}")
        raise exc

    # Unpack the attributes of the returned record into a flat tuple.
    fields = vars(record)
    return (
        fields["id"],
        fields["name"],
        fields["size"],
        fields["file_type"],
        fields["encoding"],
        fields["lastmod"],
        fields["md5sum"],
    )
pdelete ¶
pdelete(file: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', processors: int = os.cpu_count() or 1, verbose: int = 0)
Delete files from the CANFAR file server in parallel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
List[str]
|
List of source files to delete. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
_type_
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
processors |
int
|
Number of processes to use. Defaults to os.cpu_count() or 1. |
cpu_count() or 1
|
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def pdelete(
    self,
    file: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    processors: int = os.cpu_count() or 1,
    verbose: int = 0,
):
    """Delete files from the CANFAR file server in parallel.

    The file list is divided into at most ``processors`` batches and each batch
    is handed to ``self.delete`` in its own ``DillProcess``. The call returns
    once every process has been joined.

    Args:
        file (List[str]): List of source files to delete.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        processors (int, optional): Maximum number of processes to use.
            Defaults to os.cpu_count() or 1.
        verbose (int, optional): Verbosity level. Defaults to 0.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    # ``split`` omits empty batches, so there can be fewer batches than
    # ``processors``. Iterating the batches avoids indexing past the end.
    batches: List[List[Any]] = split(file, processors)
    logger.info(f"Starting {len(batches)} processes.")
    processes: List[DillProcess] = [
        DillProcess(
            target=self.delete,
            args=(batch, certfile, namespace, verbose),
        )
        for batch in batches
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
pget ¶
pget(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', processors: int = os.cpu_count() or 1, verbose: int = 0)
Retrieve files stored on the CANFAR file server in parallel and copy them locally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
List[str]
|
List of source files to retrieve. |
required |
destination |
List[str]
|
List of destination files to copy to. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
_type_
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
processors |
int
|
Number of processes to use. Defaults to os.cpu_count() or 1. |
cpu_count() or 1
|
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def pget(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    processors: int = os.cpu_count() or 1,
    verbose: int = 0,
):
    """Retrieve files stored on the CANFAR file server in parallel.

    ``source`` and ``destination`` are divided into matching batches and each
    pair of batches is handed to ``self.get`` in its own ``DillProcess``. The
    call returns once every process has been joined.

    Args:
        source (List[str]): List of source files to retrieve.
        destination (List[str]): List of destination files to copy to.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        processors (int, optional): Maximum number of processes to use.
            Defaults to os.cpu_count() or 1.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Raises:
        ValueError: If ``source`` and ``destination`` differ in length.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    # Lists of different lengths would be split into batches that no longer
    # pair up, so reject them before any process is started.
    if len(source) != len(destination):
        raise ValueError(
            "The number of source files must match the number of destination files. "  # noqa
            f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
        )

    # ``split`` omits empty batches, so there can be fewer batches than
    # ``processors``. Iterating the batches avoids indexing past the end.
    sources: List[List[Any]] = split(source, processors)
    destinations: List[List[Any]] = split(destination, processors)
    logger.info(f"Starting {len(sources)} processes.")
    processes: List[DillProcess] = [
        DillProcess(
            target=self.get,
            args=(source_batch, destination_batch, certfile, namespace, verbose),
        )
        for source_batch, destination_batch in zip(sources, destinations)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
pput ¶
pput(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', overwrite: bool = False, processors: int = os.cpu_count() or 1, verbose: int = 0)
Parallelly transfer files, stored locally, to the CANFAR file server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
List[str]
|
List of source files to transfer. |
required |
destination |
List[str]
|
List of destination files to transfer to. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
_type_
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
overwrite |
bool
|
Overwrite existing files. |
False
|
processors |
int
|
Number of processes to use. Defaults to os.cpu_count() or 1. |
cpu_count() or 1
|
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def pput(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    overwrite: bool = False,
    processors: int = os.cpu_count() or 1,
    verbose: int = 0,
):
    """Transfer locally stored files to the CANFAR file server in parallel.

    ``source`` and ``destination`` are divided into matching batches and each
    pair of batches is handed to ``self.put`` in its own ``DillProcess``. The
    call returns once every process has been joined.

    Args:
        source (List[str]): List of source files to transfer.
        destination (List[str]): List of destination files to transfer to.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        overwrite (bool): Overwrite existing files.
        processors (int, optional): Maximum number of processes to use.
            Defaults to os.cpu_count() or 1.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Raises:
        ValueError: If ``source`` and ``destination`` differ in length.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    # Lists of different lengths would be split into batches that no longer
    # pair up, so reject them before any process is started.
    if len(source) != len(destination):
        raise ValueError(
            "The number of source files must match the number of destination files. "  # noqa
            f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
        )

    # ``split`` omits empty batches, so there can be fewer batches than
    # ``processors``. Iterating the batches avoids indexing past the end.
    sources: List[List[Any]] = split(source, processors)
    destinations: List[List[Any]] = split(destination, processors)
    logger.info(f"Starting {len(sources)} processes.")
    processes: List[DillProcess] = [
        DillProcess(
            target=self.put,
            args=(
                source_batch,
                destination_batch,
                certfile,
                namespace,
                overwrite,
                verbose,
            ),
        )
        for source_batch, destination_batch in zip(sources, destinations)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
put ¶
put(source: List[str], destination: List[str], certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', overwrite: bool = False, verbose: int = 0)
Transfer a file, stored locally, and copy it to the CANFAR file server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
List[str]
|
List of source files to transfer. |
required |
destination |
List[str]
|
List of destination files to transfer to. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
str
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
overwrite |
bool
|
Overwrite existing files. |
False
|
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def put(
    self,
    source: List[str],
    destination: List[str],
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    overwrite: bool = False,
    verbose: int = 0,
):
    """Transfer locally stored files to the CANFAR file server.

    ``source[i]`` is uploaded to ``destination[i]``. Transfers that raise
    ``NotFoundException`` are collected and logged at the end; any other
    failure is logged and re-raised.

    Args:
        source (List[str]): List of source files to transfer.
        destination (List[str]): List of destination files to transfer to.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        overwrite (bool): Overwrite existing files.
        verbose (int): Verbosity level. Defaults to 0.

    Raises:
        AssertionError: If ``source`` and ``destination`` differ in length.
    """
    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info("Connecting to CADC...")
    _, storage, _ = self._connect(certfile=certfile)
    not_found: List[str] = []
    try:
        logger.debug("Checking source and destination length match.")
        logger.debug(f"Source length: {len(source)}")
        logger.debug(f"Destination length: {len(destination)}")
        if len(source) != len(destination):
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped under ``python -O``; the exception type is unchanged.
            raise AssertionError(
                "The number of source files must match the number of destination files. "  # noqa
                f"Got {len(source)} source files and {len(destination)} destination files."  # noqa
            )
        for filename, local_path in zip(destination, source):
            try:
                filename = namespace + "/" + filename
                storage.cadcput(filename, local_path, replace=overwrite)  # type: ignore
                logger.debug(f"{local_path} ➜ {filename} ✔")
            except cadcutils.exceptions.NotFoundException as error:  # type: ignore
                # Recorded so the remaining files are still attempted.
                logger.error(f"CADC Exception: {filename}")
                not_found.append(str(error))
            except cadcutils.exceptions.HttpException as error:  # type: ignore
                logger.error(f"CADC Exception: {error}")
                raise error
    except Exception as error:
        logger.error(f"Error: {error}")
        raise error

    if not_found:
        logger.error(f"Number of files not found: {len(not_found)}")
        logger.error(f"Not found: {not_found}")
    logger.info(f"Process {os.getpid()} finished.")
query ¶
query(directory: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', timeout: int = 60, verbose: int = 0) -> List[str]
Get list of files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory |
str
|
Directory to list the files of. |
required |
certfile |
str
|
Certificate file. Defaults to None. |
None
|
namespace |
str
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
timeout |
int
|
Timeout. Defaults to 60. |
60
|
verbose |
int
|
Verbosity. Defaults to 0. |
0
|
Returns:
Type | Description |
---|---|
List[str]
|
List[str]: List of files in the directory. |
Example
query("/data/chime/intensity/raw/2023/01/01/")
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def query(
    self,
    directory: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    timeout: int = 60,
    verbose: int = 0,
) -> List[str]:
    """Get list of files in a directory.

    Runs an ADQL query against ``inventory.Artifact`` for every URI starting
    with ``{namespace}/{directory}``. The query client's CSV output is captured
    from stdout, and the first column of each row is returned.

    Args:
        directory (str): Directory to list the files of.
        certfile (str, optional): Certificate file. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        timeout (int, optional): Timeout. Defaults to 60.
        verbose (int, optional): Verbosity. Defaults to 0.

    Returns:
        List[str]: First CSV column of every non-empty result row.

    Example:
        >>> query("/data/chime/intensity/raw/2023/01/01/")
    """
    # Local import so this method does not depend on the module's import block.
    import contextlib

    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info(f"Getting list of files in {directory}...")
    # NOTE(review): namespace and directory are interpolated straight into the
    # ADQL text; callers must not pass untrusted values.
    query = f"select * from inventory.Artifact where uri like '{namespace}/{directory}%'"  # noqa
    # Collapse the doubled slash produced when ``directory`` starts with "/".
    query = query.replace("//", "/")
    logger.info(f"Running query: {query}")

    # With ``output_file=None`` the result is read back from stdout.
    # ``redirect_stdout`` restores the previous stream even if the query raises.
    buffer = StringIO()
    with contextlib.redirect_stdout(buffer):
        _, _, queryClient = self._connect(certfile=certfile)
        queryClient.query(  # type: ignore
            query=query,
            output_file=None,
            response_format="csv",
            tmptable=None,
            lang="ADQL",
            timeout=timeout,
            data_only=True,
            no_column_names=True,
        )
    content = buffer.getvalue()
    # Skip blank lines (e.g. the trailing newline) so no empty entry is returned.
    return [line.split(",")[0] for line in content.split("\n") if line.strip()]
size ¶
size(directory: str, certfile: Optional[str] = None, namespace: str = 'cadc:CHIMEFRB', timeout: int = 60, verbose: int = 0) -> float
Get the size of a directory in GB.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
directory |
str
|
Directory to get the size of. |
required |
certfile |
Optional[str]
|
Certificate. Defaults to None. |
None
|
namespace |
_type_
|
Minoc Namespace. Defaults to "cadc:CHIMEFRB". |
'cadc:CHIMEFRB'
|
timeout |
int
|
Timeout. Defaults to 60. |
60
|
verbose |
int
|
Verbosity level. Defaults to 0. |
0
|
Returns:
Name | Type | Description |
---|---|---|
float |
float
|
Size of the directory in GB. |
Example
size("/data/chime/intensity/raw/2023/01/01/")
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def size(
    self,
    directory: str,
    certfile: Optional[str] = None,
    namespace: str = "cadc:CHIMEFRB",
    timeout: int = 60,
    verbose: int = 0,
) -> float:
    """Get the size of a directory in GB.

    Runs an ADQL query summing ``contentLength`` (scaled by 1024**3) over every
    ``inventory.Artifact`` whose URI starts with ``{namespace}/{directory}``.
    The query client's output is captured from stdout and its first line is
    parsed.

    Args:
        directory (str): Directory to get the size of.
        certfile (Optional[str], optional): Certificate. Defaults to None.
        namespace (str, optional): Minoc Namespace. Defaults to "cadc:CHIMEFRB".
        timeout (int, optional): Timeout. Defaults to 60.
        verbose (int, optional): Verbosity level. Defaults to 0.

    Returns:
        float: Size of the directory in GB; 0.0 when the query returns an
        empty result.

    Example:
        >>> size("/data/chime/intensity/raw/2023/01/01/")
    """
    # Local import so this method does not depend on the module's import block.
    import contextlib

    # Set logging level.
    logger.setLevel("WARNING")
    if verbose == 1:
        logger.setLevel("INFO")
    elif verbose > 1:
        logger.setLevel("DEBUG")

    logger.info(f"Getting size of {directory}...")
    # NOTE(review): namespace and directory are interpolated straight into the
    # ADQL text; callers must not pass untrusted values.
    query = f"select sum(contentLength/1024.0/1024.0/1024.0) as numGB from inventory.Artifact where uri like '{namespace}/{directory}%'"  # noqa
    # Collapse the doubled slash produced when ``directory`` starts with "/".
    query = query.replace("//", "/")
    logger.info(f"Running query: {query}")

    # With ``output_file=None`` the result is read back from stdout.
    # ``redirect_stdout`` restores the previous stream even if the query raises.
    buffer = StringIO()
    with contextlib.redirect_stdout(buffer):
        _, _, queryClient = self._connect(certfile=certfile)
        queryClient.query(  # type: ignore
            query=query,
            output_file=None,
            response_format="csv",
            tmptable=None,
            lang="ADQL",
            timeout=timeout,
            data_only=True,
            no_column_names=True,
        )
    first_line = buffer.getvalue().split("\n")[0].strip()
    if not first_line:
        # An empty first line means the sum produced no value; report 0 GB
        # instead of failing on float("").
        return 0.0
    return float(first_line)
DillProcess ¶
Bases: Process
A Process class that uses dill to serialize the target function before execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
Process |
object
|
Python Process class. |
required |
Initialize the DillProcess class.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
run ¶
Run the DillProcess.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
split ¶
Split a list into batches.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
List[Any]
|
List to split. |
required |
count |
int
|
Number of batches to split into. |
required |
Returns:
Type | Description |
---|---|
List[List[Any]]
|
List[List[Any]]: List of batches. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/cadcclient.py
def split(data: List[Any], count: int) -> List[List[Any]]:
    """Divide a list into up to ``count`` contiguous batches.

    The first ``len(data) % count`` batches receive one extra element, so batch
    sizes differ by at most one. Batches that would be empty are left out, so
    fewer than ``count`` batches are returned when ``data`` is short.

    Args:
        data (List[Any]): List to split.
        count (int): Number of batches to split into.

    Returns:
        List[List[Any]]: The non-empty batches, in original order.
    """
    base, extra = divmod(len(data), count)
    result: List[Any] = []
    start = 0
    for position in range(count):
        # The leading ``extra`` batches each absorb one leftover element.
        stop = start + base + (1 if position < extra else 0)
        chunk = data[start:stop]
        start = stop
        if len(chunk) > 0:
            result.append(chunk)
    return result
POSIX¶
datatrail_admin.protocols.posix ¶
Implement POSIX protocols to do the following.
- delete: delete a file on the storage element.
- push: replicate a file to a remote destination.
- pull: replicate a file from a remote location.
- move: move a file from one local location to another.
- copy: copy a file from one local location to another.
PosixApi ¶
Bases: Protocol
Class for POSIX API.
Attributes¶
Protocol : Protocol
Methods¶
delete(path: Union[str, Path]) Delete a file on the local storage element.
push(source: Union[str, Path], destination: Union[str, Path]) Push the file to the remote destination.
pull(source: Union[str, Path], destination: Union[str, Path]) Pull the file from the remote destination.
move(source, destination) Move a file from one local location to another.
copy(source, destination) Copy a file from one local location to another.
Initialize the class.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
copy ¶
Copy a file locally from one location to another.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
def copy(self, source: Path, destination: Path) -> None:
    """Copy a file or directory locally from one location to another.

    Parameters
    ----------
    source : Union[str, Path]
        Source path. A regular file is copied with ``shutil.copy``; anything
        else is copied with ``shutil.copytree``.
    destination : Union[str, Path]
        Destination path.
    """
    # ``Path(...)`` builds a PosixPath/WindowsPath, so an exact ``type`` check
    # never matches; ``isinstance`` recognises existing Path objects.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    if source.is_file():
        shutil.copy(source, destination)
    else:
        shutil.copytree(source, destination)
delete ¶
Delete a file on the storage element.
Parameters¶
path : Union[str, Path] Path for the file or directory to be deleted.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
def delete(self, path: Path) -> None:
    """Delete a file or directory on the storage element.

    A path containing ``:`` is treated as ``host:path``: a ``Connection`` to
    ``host`` is opened and ``rm -rf`` is run there. Otherwise ``rm -rf`` is run
    locally.

    Parameters
    ----------
    path : Union[str, Path]
        Path for the file or directory to be deleted.
    """
    # ``Path(...)`` builds a PosixPath/WindowsPath, so an exact ``type`` check
    # never matches; ``isinstance`` recognises existing Path objects.
    if not isinstance(path, Path):
        path = Path(path)
    target = path.as_posix()
    if ":" in target:
        # Split on the first colon only, so a remote path that itself contains
        # ":" no longer breaks the unpacking.
        host, _, split_path = target.partition(":")
        connection = Connection(host)
        # NOTE(review): the path is interpolated into a shell command unquoted;
        # callers must not pass untrusted values.
        connection.run(f"rm -rf {split_path}")
    else:
        subprocess.call(["rm", "-rf", target])
mkdir ¶
Make a directory on the local storage element.
Parameters¶
path : Union[str, Path] Path for the new directory.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
move ¶
Move a file locally from one location to another.
Parameters¶
source : Union[str, Path] Source path - local host. destination : Union[str, Path] Destination path - local host.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
def move(self, source, destination):
    """Move a file locally from one location to another.

    Parameters
    ----------
    source : Union[str, Path]
        Source path - local host.
    destination : Union[str, Path]
        Destination path - local host.
    """
    # ``Path(...)`` builds a PosixPath/WindowsPath, so an exact ``type`` check
    # never matches; ``isinstance`` recognises existing Path objects.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    shutil.move(source, destination)
pull ¶
Pull the file from the remote destination.
Use rsync to pull the file from the remote destination.
Parameters¶
source : Union[str, Path] Source path - remote host. destination : Union[str, Path] Destination path - local host.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
def pull(self, source: Path, destination: Path) -> None:
    """Pull the file from the remote destination.

    Runs ``rsync -azPR <source> <destination>``. The rsync exit status is not
    checked.

    Parameters
    ----------
    source : Union[str, Path]
        Source path - remote host.
    destination : Union[str, Path]
        Destination path - local host.
    """
    # ``Path(...)`` builds a PosixPath/WindowsPath, so an exact ``type`` check
    # never matches; ``isinstance`` recognises existing Path objects.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    subprocess.call(["rsync", "-azPR", source.as_posix(), destination.as_posix()])
push ¶
Push the file to the remote destination.
Use rsync to push the file to the remote destination.
Parameters¶
source : Union[str, Path] Source path - local host. destination : Union[str, Path] Destination path - remote host.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/posix.py
def push(self, source: Path, destination: Path) -> None:
    """Push the file to the remote destination.

    Runs ``rsync -azPR <source> <destination>`` after checking that the source
    exists locally. The rsync exit status is not checked.

    Parameters
    ----------
    source : Union[str, Path]
        Source path - local host.
    destination : Union[str, Path]
        Destination path - remote host.

    Raises
    ------
    AssertionError
        If ``source`` does not exist.
    """
    # ``Path(...)`` builds a PosixPath/WindowsPath, so an exact ``type`` check
    # never matches; ``isinstance`` recognises existing Path objects.
    if not isinstance(source, Path):
        source = Path(source)
    if not isinstance(destination, Path):
        destination = Path(destination)
    if not source.exists():
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        raise AssertionError("Source file does not exist.")
    subprocess.call(["rsync", "-azPR", source.as_posix(), destination.as_posix()])
VOS¶
datatrail_admin.protocols.vosapi ¶
Implement VOSpace protocols to do the following.
- delete: delete a file on the storage element.
- push: replicate a file to a remote destination.
- pull: replicate a file from a remote location.
- move: move a file from one local location to another.
- copy: copy a file from one local location to another.
VosApi ¶
Bases: Protocol
Class for VOSpace API.
Initialize the VOSpace API.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
chmod ¶
Change global permissions of the file/directory.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
connect ¶
copy ¶
Copy file/directory within VOSpace.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
delete ¶
exists ¶
Check if the file/directory exists on VOSpace.
isdir ¶
Check if the node is a directory.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
isfile ¶
Check if the node is a file.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
make_readable_by_chime_frb_ro ¶
Make the node readable by the chime frb read-only group.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
make_writable_by_chime_frb_admin ¶
Make the node writable by chime frb admin group.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
make_writable_by_chime_frb_rw ¶
Make the node writable by all of chime frb group.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
mkdir ¶
Create a directory on VOSpace.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
move ¶
Move file/directory within VOSpace.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
pull ¶
Pull the file/directory from VOSpace to local space.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
def pull(self, source, destination):
    """Pull the file/directory from VOSpace to local space.

    A leading ``/arc/`` on either path is rewritten to the ``arc:`` scheme. The
    local parent directory of ``destination`` is created, the copy is done with
    the ``vcp`` command, and the destination is then made world-accessible with
    ``chmod -R 777``.
    """
    if source.startswith("/arc/"):
        source = "arc:" + source[5:]
    if destination.startswith("/arc/"):
        destination = "arc:" + destination[5:]
    assert source.startswith(("arc:", "vos:"))

    # Work out which local directory must exist before the copy.
    is_absolute = destination[0] == "/"
    if "/" in destination:
        parent_parts = destination.split("/")[:-1]
        local_dir = os.path.join(*parent_parts)
    else:
        local_dir = destination
    if is_absolute:
        # Splitting on "/" dropped the leading slash; put it back.
        local_dir = "/" + local_dir
    os.makedirs(local_dir, mode=0o777, exist_ok=True)

    # Error with the vos python client, using vcp command in meantime.
    # Once fixed can use: self.client.copy(source, destination)
    run(f"vcp --certfile {CERTFILE} {source} {destination}")
    os.system(f"chmod -R 777 {destination}")
push ¶
Push the file/directory to VOSpace.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/datatrail_admin/protocols/vosapi.py
def push(self, source, destination, n_streams=5, cache_path=None, verbose=False):
    """Push the file/directory to VOSpace.

    A leading ``/arc/`` on either path is rewritten to the ``arc:`` scheme. The
    transfer is done by running the ``vsync`` command recursively with
    ``n_streams`` streams and a node cache file.

    Args:
        source: Path of the file/directory to push.
        destination: Target path; must start with ``arc:`` or ``vos:`` after
            the ``/arc/`` rewrite.
        n_streams: Value passed to ``vsync -n``. Defaults to 5.
        cache_path: Value passed to ``--cache_filename``. Defaults to
            ``~/vsync_cache`` when None.
        verbose: Add ``-v`` to the ``vsync`` command. Defaults to False.

    Raises:
        AssertionError: If ``destination`` is not an ``arc:`` or ``vos:`` path.
    """
    if source.startswith("/arc/"):
        logger.info("Fixing source path '/arc/' to 'arc:'")
        source = "arc:" + source[5:]
    if destination.startswith("/arc/"):
        logger.info("Fixing destination path '/arc/' to 'arc:'")
        destination = "arc:" + destination[5:]
    if cache_path is None:
        cache_path = (Path("~") / "vsync_cache").as_posix()
        # Log the value actually used; the old message named a different
        # file, "~/.vsync_cache".
        logger.info(f"Setting cache_path to {cache_path}")
    if not destination.startswith(("arc:", "vos:")):
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        raise AssertionError(
            f"Destination must start with 'arc:' or 'vos:', got {destination!r}."
        )
    # Error with the vos python client, using vcp command in meantime.
    # Once fixed can use: self.client.copy(source, destination)
    verbosity = "-v " if verbose else ""
    run(
        f"vsync --certfile {CERTFILE} {verbosity}-r -n {n_streams} --cache_nodes --cache_filename {cache_path} {source} {destination}"  # noqa
    )
sync ¶
Sync the directory structure within VOSpace.