Low-Level API¶
Swarm¶
CHIME/FRB Swarm API.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `API` | `chime_frb_api.core.API` | Base class handling the actual HTTP requests. | required |

Initialize the Swarm API.

Source code in `chime_frb_api/modules/swarm.py`
get_job_status ¶
Get job[s] status with a regex match to argument job_name.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_name` | `str` | Name of the job | required |
Returns:

| Type | Description |
| --- | --- |
| `Dict[str, str]` | `{job_name: STATUS}` |

Where STATUS can be:

- `NEW`: The job was initialized.
- `PENDING`: Resources for the job were allocated.
- `ASSIGNED`: Docker assigned the job to nodes.
- `ACCEPTED`: The job was accepted by a worker node.
- `PREPARING`: Docker is preparing the job.
- `STARTING`: Docker is starting the job.
- `RUNNING`: The job is executing.
- `COMPLETE`: The job exited without an error code.
- `FAILED`: The job exited with an error code.
- `SHUTDOWN`: Docker requested the job to shut down.
- `REJECTED`: The worker node rejected the job.
- `ORPHANED`: The node was down for too long.
- `REMOVE`: The job is not terminal but the associated job was removed.
Source code in `chime_frb_api/modules/swarm.py`

```python
def get_job_status(self, job_name: str) -> Dict[str, str]:
    """Get job[s] status with a regex match to argument job_name.

    Args:
        job_name: Name of the job

    Returns:
        { job_name : STATUS } : dict
        Where STATUS can be,
        NEW        The job was initialized.
        PENDING    Resources for the job were allocated.
        ASSIGNED   Docker assigned the job to nodes.
        ACCEPTED   The job was accepted by a worker node.
        PREPARING  Docker is preparing the job.
        STARTING   Docker is starting the job.
        RUNNING    The job is executing.
        COMPLETE   The job exited without an error code.
        FAILED     The job exited with an error code.
        SHUTDOWN   Docker requested the job to shut down.
        REJECTED   The worker node rejected the job.
        ORPHANED   The node was down for too long.
        REMOVE     The job is not terminal but the associated job was removed.
    """
    return self.API.get(f"/v1/swarm/job-status/{job_name}")
```
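The status vocabulary above splits into non-terminal states (the job is still moving through Docker's lifecycle) and terminal states. A small helper along these lines can summarize the dict returned by `get_job_status`; this is an illustrative sketch, not part of `chime_frb_api`, and it assumes the lowercase status strings the API returns:

```python
from collections import Counter
from typing import Dict, Tuple

# Non-terminal states, per the table above (lowercase, as returned by the API).
RUNNING_STATES = {"new", "pending", "assigned", "accepted",
                  "preparing", "starting", "running"}

def summarize_statuses(statuses: Dict[str, str]) -> Tuple[Counter, bool]:
    """Count states and report whether any job is still in flight."""
    counts = Counter(statuses.values())
    in_flight = any(state in RUNNING_STATES for state in statuses.values())
    return counts, in_flight
```

For example, `summarize_statuses({"job-1": "running", "job-2": "failed"})` reports one job still in flight, which is the same check `jobs_running` performs below.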
get_jobs ¶
Returns the name of all jobs on the analysis cluster.
Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of job names. |

Source code in `chime_frb_api/modules/swarm.py`
get_logs ¶
Return logs from a CHIME/FRB Job.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_name` | `str` | Unique name for the cluster job | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `job_logs` | `Dict[str, str]` | dict |

Source code in `chime_frb_api/modules/swarm.py`
jobs_running ¶
Monitor job[s] on CHIME/FRB Analysis Cluster.
Monitors job[s] on the CHIME/FRB Analysis Cluster until they are either COMPLETE, FAILED, or SHUTDOWN.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_names` | `List[str]` | A list of job_name parameters to monitor | required |

Source code in `chime_frb_api/modules/swarm.py`
```python
def jobs_running(self, job_names: List[str]) -> bool:
    """Monitor job[s] on CHIME/FRB Analysis Cluster.

    Monitors job[s] on the CHIME/FRB Analysis Cluster until they are either
    COMPLETE, FAILED or SHUTDOWN.

    Args:
        job_names: A list of job_name parameters to monitor
    """
    running_statuses = [
        "new",
        "pending",
        "assigned",
        "accepted",
        "preparing",
        "starting",
        "running",
    ]
    if isinstance(job_names, str):
        job_names = [job_names]
    jobs_status = {}
    for job in job_names:
        status = self.get_job_status(job)
        jobs_status[job] = status
        for running in running_statuses:
            if running in status.values():
                return True  # pragma: no cover
    return False
```
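`jobs_running` answers a single yes/no question, so callers typically wrap it in a polling loop with a timeout. A generic sketch of that pattern (the `wait_until_done` helper is hypothetical; in practice you would pass `lambda: swarm.jobs_running(job_names)` as the predicate):

```python
import time
from typing import Callable

def wait_until_done(still_running: Callable[[], bool],
                    interval: float = 30.0,
                    timeout: float = 3600.0) -> bool:
    """Poll `still_running` until it returns False or `timeout` elapses.

    Returns True if the jobs finished before the deadline, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not still_running():
            return True
        time.sleep(interval)
    return False
```

Choosing `interval` is a trade-off between responsiveness and load on the Swarm API; the library's own `monitor_jobs` below uses 30 s and 120 s sleeps for the same reason.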
kill_failed_jobs ¶
Remove FAILED jobs with a regex match to job_name.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_name` | `Optional[str]` | Unique name for the cluster job | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dict` | `Dict[str, bool]` | {job_name : boolean} |

Source code in `chime_frb_api/modules/swarm.py`
```python
def kill_failed_jobs(self, job_name: Optional[str] = None) -> Dict[str, bool]:
    """Remove FAILED jobs with a regex match to job_name.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        dict: {job_name : boolean}
    """
    assert isinstance(job_name, str), "job_name <str> is required"
    status = {}
    for job in self.get_jobs():
        if job_name in job:  # pragma: no cover
            if self.get_job_status(job)[job] == "failed":
                status[job] = self.kill_job(job)[job]
    return status
```
kill_job ¶
Remove (forcibly) job with ANY status but with an exact match to job_name.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_name` | `str` | Unique name for the cluster job | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dict` | `Dict[str, bool]` | {job_name : boolean} |

Source code in `chime_frb_api/modules/swarm.py`
monitor_jobs ¶
Continuously monitor job[s] on the CHIME/FRB Analysis Cluster.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_name` | `str` | Regular expression matching to the job_name | required |
| `error_logs` | `bool` | Print error logs, by default False | `False` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | Status of the pipeline |

Source code in `chime_frb_api/modules/swarm.py`
```python
def monitor_jobs(
    self, job_name: str, error_logs: bool = False
) -> bool:  # pragma: no cover
    """Continuously monitor job[s] on the CHIME/FRB Analysis Cluster.

    Args:
        job_name: Regular expression matching to the job_name
        error_logs: Print error logs, by default False

    Returns:
        bool: Status of the pipeline
    """
    log.info("================================================")
    log.info(f"Monitoring Pipeline: {job_name}")
    log.info("================================================")
    initiating = ["new", "accepted", "pending", "starting", "preparing", "assigned"]
    status = self.get_job_status(job_name)
    while any([n in initiating for n in status.values()]):
        sleep(30)
        status = self.get_job_status(job_name)
    # Initiation
    log.info("Pipeline Initiation: Complete")
    log.info("================================================")
    log.info("Pipeline Processing: Started")
    status = self.get_job_status(job_name)
    while "running" in status.values():
        log.info("Pipeline Processing: Running")
        sleep(120)
        status = self.get_job_status(job_name)
    log.info("Pipeline Processing: Complete")
    log.info("================================================")
    log.info("Pipeline Completion Status")
    completed = failed = 0
    for key, value in status.items():
        if value == "complete":
            completed += 1
        else:
            failed += 1
    log.info(f"Completed : {(completed / len(status)) * 100}%")
    log.info(f"Failed    : {(failed / len(status)) * 100}%")
    log.info("================================================")
    log.info("Pipeline Cleanup: Started")
    self.prune_jobs(job_name)
    # Make sure all jobs were pruned; if not, report and kill them
    # TODO: In the future respawn "failed" jobs
    status = self.get_job_status(job_name)
    if len(status.keys()) > 0:
        log.error("Pipeline Cleanup: Failed Jobs Detected")
        for job in status.keys():
            log.error(f"Job Name   : {job}")
            log.error(f"Job Removal: {self.kill_job(job)}")
        log.info("Pipeline Cleanup: Completed with Failed Jobs")
        return False
    log.info("Pipeline Cleanup: Completed")
    log.info("================================================")
    return True
```
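The completion-percentage arithmetic in the middle of `monitor_jobs` can be factored out as a pure function, which also makes the reporting logic easy to test. This sketch (the helper name is ours, not the library's) assumes the lowercase terminal state `"complete"` that matches the documented `COMPLETE` state:

```python
from typing import Dict, Tuple

def completion_report(status: Dict[str, str]) -> Tuple[float, float]:
    """Percentage of jobs that completed vs. failed, as monitor_jobs logs them.

    At this stage every job is terminal, so anything other than "complete"
    is counted as failed.
    """
    if not status:
        return 0.0, 0.0
    completed = sum(1 for state in status.values() if state == "complete")
    failed = len(status) - completed
    return completed / len(status) * 100, failed / len(status) * 100
```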
prune_jobs ¶
Remove COMPLETED jobs with a regex match to argument job_name.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_name` | `str` | Unique name for the cluster job | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dict` | `Dict[str, bool]` | {job_name : boolean} |

Source code in `chime_frb_api/modules/swarm.py`
spawn_baseband_job ¶
```python
spawn_baseband_job(event_number: int, task_name: str, arguments: list = [], job_id: Optional[int] = None, image_name: str = 'chimefrb/baseband-localization:latest', command: list = ['baseband_analysis/pipelines/cluster/cluster_cli.py'], job_name: Optional[str] = None, job_mem_limit: int = 10 * 1024 ** 3, job_mem_reservation: int = 10 * 1024 ** 3, environment: dict = {}, **kwargs: dict) -> Dict[str, str]
```
Spawn a CHIME/FRB Baseband job on the Analysis Cluster.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event_number` | `int` | ID of the event to process. | required |
| `task_name` | `str` | Name of the task to run, e.g. localization. | required |
| `arguments` | `list` | Arguments to the command. | `[]` |
| `job_id` | `Optional[int]` | ID of the job to run. | `None` |
| `command` | `list` | The command to be run in the container. | `['baseband_analysis/pipelines/cluster/cluster_cli.py']` |
| `image_name` | `str` | Name of the container image to spawn the job with. | `'chimefrb/baseband-localization:latest'` |
| `job_name` | `Optional[str]` | Unique name for the cluster job. Default: baseband-EVENT_NUMBER-TASK_NAME-UUID_CODE | `None` |
| `job_mem_limit` | `int` | Memory limit of the created container in bytes. | `10 * 1024 ** 3` (10 GiB) |
| `job_mem_reservation` | `int` | Minimum memory reserved for the created container in bytes. | `10 * 1024 ** 3` (10 GiB) |
| `environment` | `dict` | ENV variables to pass to the container. Default: read authentication tokens from the environment. | `{}` |
| `kwargs` | `dict` | Additional parameters for spawn_job. | `{}` |
Source code in `chime_frb_api/modules/swarm.py`

```python
def spawn_baseband_job(
    self,
    event_number: int,
    task_name: str,
    arguments: list = [],
    job_id: Optional[int] = None,
    image_name: str = "chimefrb/baseband-localization:latest",
    command: list = ["baseband_analysis/pipelines/cluster/cluster_cli.py"],
    job_name: Optional[str] = None,
    job_mem_limit: int = 10 * 1024**3,
    job_mem_reservation: int = 10 * 1024**3,
    environment: dict = {},
    **kwargs,
) -> Dict[str, str]:  # pragma: no cover
    """Spawn a CHIME/FRB Baseband job on the Analysis Cluster.

    Args:
        event_number: ID of the event to process.
        task_name: Name of the task to run. Eg. localization
        arguments: Arguments to the command. Default: None.
        job_id: ID of the job to run. Default: None.
        command: The command to be run in the container.
            Default: cluster_cli.py.
        image_name: Name of the container image to spawn the job with.
            Default: chimefrb/baseband-analysis:latest
        job_name: Unique name for the cluster job.
            Default: baseband-EVENT_NUMBER-TASK_NAME-UUID_CODE
        job_mem_limit: Memory limit of the created container in bytes.
            Default: 10 GB
        job_mem_reservation: Minimum memory reserved of the created container
            in bytes. Default: 10 GB
        environment: ENV variables to pass to the container.
            Default: read authentication tokens from the environment
        kwargs: Additional parameters for spawn_job
    """
    environment.setdefault("FRB_MASTER_ACCESS_TOKEN", self.API.access_token)
    environment.setdefault("FRB_MASTER_REFRESH_TOKEN", self.API.refresh_token)
    if job_id is None:
        job_argument = []
    else:
        job_argument = ["--job-id", str(job_id)]
    if job_name is None:
        if (job_id is None) or (job_id < 0):
            job_name = f"baseband-{event_number}-{task_name}"
        else:
            job_name = f"baseband-{event_number}-{task_name}-{job_id}"
    out = self.spawn_job(
        image_name=image_name,
        command=command + [task_name],
        arguments=["--event-number", str(event_number)]
        + job_argument
        + ["--"]
        + arguments,
        job_name=job_name,
        job_mem_limit=job_mem_limit,
        job_mem_reservation=job_mem_reservation,
        job_cpu_limit=2,
        job_cpu_reservation=2,
        environment=environment,
        **kwargs,
    )
    return out
```
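The job name and CLI argument layout that `spawn_baseband_job` derives can be traced with a small pure function. This is an illustrative re-statement of the naming logic in the source above (the helper itself is hypothetical), useful for predicting what name to pass to `get_job_status` or `prune_jobs` later:

```python
from typing import List, Optional, Tuple

def baseband_job_spec(event_number: int, task_name: str,
                      job_id: Optional[int] = None,
                      extra: Optional[List[str]] = None) -> Tuple[str, List[str]]:
    """Mirror spawn_baseband_job's job naming and argument ordering."""
    job_argument = [] if job_id is None else ["--job-id", str(job_id)]
    if job_id is None or job_id < 0:
        name = f"baseband-{event_number}-{task_name}"
    else:
        name = f"baseband-{event_number}-{task_name}-{job_id}"
    # Everything after "--" is forwarded verbatim to the task.
    args = ["--event-number", str(event_number)] + job_argument + ["--"] + (extra or [])
    return name, args
```

For example, event 65540476 with task `localization` and `job_id=7` yields the job name `baseband-65540476-localization-7`, which a later `get_job_status("baseband-65540476")` regex match would pick up.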
spawn_job ¶
```python
spawn_job(image_name: str, command: list, arguments: list, job_name: str, mount_archiver: bool = True, swarm_network: bool = True, job_mem_limit: int = 4294967296, job_mem_reservation: int = 268435456, job_cpu_limit: float = 1, job_cpu_reservation: float = 1, environment: dict = {}) -> Dict[str, str]
```
Spawn a job on the CHIME/FRB Analysis Cluster.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `image_name` | `str` | Name of the container image | required |
| `command` | `list` | Command to run in the container | required |
| `arguments` | `list` | Arguments to the command | required |
| `job_name` | `str` | Unique name for the job | required |
| `mount_archiver` | `bool` | Mount Site Data Archivers, by default True | `True` |
| `swarm_network` | `bool` | Mount Cluster Network, by default True | `True` |
| `job_mem_limit` | `int` | Memory limit in bytes, by default 4294967296 (4 GiB) | `4294967296` |
| `job_mem_reservation` | `int` | Minimum memory reserved in bytes, by default 268435456 (256 MiB) | `268435456` |
| `job_cpu_limit` | `float` | Maximum CPU cores the job can use, by default 1 | `1` |
| `job_cpu_reservation` | `float` | Minimum cores reserved for the job, by default 1 | `1` |
| `environment` | `dict` | ENV to pass to the container, default is {} | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, str]` | JSON response from the Swarm API. |

Source code in `chime_frb_api/modules/swarm.py`
```python
def spawn_job(
    self,
    image_name: str,
    command: list,
    arguments: list,
    job_name: str,
    mount_archiver: bool = True,
    swarm_network: bool = True,
    job_mem_limit: int = 4294967296,
    job_mem_reservation: int = 268435456,
    job_cpu_limit: float = 1,
    job_cpu_reservation: float = 1,
    environment: dict = {},
) -> Dict[str, str]:
    """Spawn a job on the CHIME/FRB Analysis Cluster.

    Args:
        image_name: Name of the container image
        command: Command to run in the container
        arguments: Arguments to the command
        job_name: Unique name for the job
        mount_archiver: Mount Site Data Archivers, by default True
        swarm_network: Mount Cluster Network, by default True
        job_mem_limit: Memory limit in bytes, by default 4294967296
        job_mem_reservation: Minimum memory reserved, by default 268435456
        job_cpu_limit: Maximum cpu cores job can use, by default 1
        job_cpu_reservation: Minimum cores reserved for the job, default 1
        environment: ENV to pass to the container, default is {}

    Returns:
        JSON response from the Swarm API.
    """
    payload = {
        "image_name": image_name,
        "command": command,
        "arguments": arguments,
        "job_name": job_name,
        "mount_archiver": mount_archiver,
        "swarm_network": swarm_network,
        "job_mem_reservation": job_mem_reservation,
        "job_mem_limit": job_mem_limit,
        "job_cpu_limit": job_cpu_limit,
        "job_cpu_reservation": job_cpu_reservation,
        "environment": environment,
    }
    return self.API.post(url="/v1/swarm/spawn-job", json=payload)
```
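Since `spawn_job` simply posts a JSON payload to `/v1/swarm/spawn-job`, it can be useful to assemble and sanity-check that payload before submitting it. The sketch below (the `build_spawn_payload` helper is ours, not part of the library) mirrors the payload keys from the source above and adds one check the library does not perform, that the memory reservation does not exceed the limit:

```python
def build_spawn_payload(image_name: str, command: list, arguments: list,
                        job_name: str,
                        job_mem_limit: int = 4 * 1024**3,
                        job_mem_reservation: int = 256 * 1024**2,
                        **overrides) -> dict:
    """Assemble the JSON body that spawn_job posts, with a sanity check."""
    if job_mem_reservation > job_mem_limit:
        raise ValueError("job_mem_reservation must not exceed job_mem_limit")
    payload = {
        "image_name": image_name,
        "command": command,
        "arguments": arguments,
        "job_name": job_name,
        "mount_archiver": True,
        "swarm_network": True,
        "job_mem_limit": job_mem_limit,
        "job_mem_reservation": job_mem_reservation,
        "job_cpu_limit": 1,
        "job_cpu_reservation": 1,
        "environment": {},
    }
    payload.update(overrides)  # e.g. job_cpu_limit=2 for baseband jobs
    return payload
```

Note the defaults encode 4 GiB (`4 * 1024**3 == 4294967296`) and 256 MiB (`256 * 1024**2 == 268435456`), matching the integer defaults in the signature above.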
Workflow¶
Bases: BaseModel
Work Object.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `BaseModel` | `BaseModel` | Pydantic BaseModel. | required |
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `pipeline` | `str` | Name of the pipeline. (Required) Automatically reformatted to hyphen-case. |
| `site` | `str` | Site where the work will be performed. (Required) |
| `user` | `str` | User who created the work. (Required) |
| `function` | `Optional[str]` | Name of the function to run. |
| `command` | `Optional[List[str]]` | Command to run. |
| `parameters` | `Optional[Dict[str, Any]]` | Parameters to pass to the function. |
| `results` | `Optional[Dict[str, Any]]` | Results of the work. |
| `products` | `Optional[Dict[str, Any]]` | Products of the work. |
| `plots` | `Optional[Dict[str, Any]]` | Plots of the work. |
| `event` | `Optional[List[int]]` | Event ID of the work. |
| `tags` | `Optional[List[str]]` | Tags of the work. |
| `timeout` | `int` | Timeout for the work in seconds. Default is 3600 seconds. |
| `retries` | `int` | Number of retries for the work. Default is 2 retries. |
| `priority` | `int` | Priority of the work. Default is 3. |
| `config` | `WorkConfig` | Configuration of the work. |
| `notify` | `Notify` | Notification configuration of the work. |
| `id` | `str` | ID of the work. |
| `creation` | `float` | Creation time of the work. |
| `start` | `float` | Start time of the work. |
| `stop` | `float` | Stop time of the work. |
| `status` | `str` | Status of the work. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the work is not valid. |

Returns:

| Name | Description |
| --- | --- |
| `Work` | Work object. |
payload property ¶

Return the dictionary representation of the work.

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Any]` | The payload of the work. Non-instanced attributes are excluded from the payload. |
Config ¶
Pydantic Config.
delete ¶
Delete work from the buckets backend.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ids` | `List[str]` | List of ids to delete. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | True if successful, False otherwise. |

Source code in `chime_frb_api/workflow/work.py`
```python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def delete(self, **kwargs: Dict[str, Any]) -> bool:
    """Delete work from the buckets backend.

    Args:
        ids (List[str]): List of ids to delete.

    Returns:
        bool: True if successful, False otherwise.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.delete_ids([str(self.id)])
```
deposit ¶
Deposit work to the buckets backend.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` | `Dict[str, Any]` | Keyword arguments for the Buckets API. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `Union[bool, List[str]]` | True if successful, False otherwise. |

Source code in `chime_frb_api/workflow/work.py`
```python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def deposit(
    self, return_ids: bool = False, **kwargs: Dict[str, Any]
) -> Union[bool, List[str]]:
    """Deposit work to the buckets backend.

    Args:
        return_ids (bool): Return the ids of the deposited works instead.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.deposit(works=[self.payload], return_ids=return_ids)
```
from_dict classmethod ¶

Create a work from a dictionary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `payload` | `Dict[str, Any]` | The dictionary. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Work` | `Work` | The work. |

Source code in `chime_frb_api/workflow/work.py`
from_json classmethod ¶

Create a work from a json string.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `json_str` | `str` | The json string. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Work` | `Work` | The work. |

Source code in `chime_frb_api/workflow/work.py`
post_init ¶
Initialize work attributes after validation.
Source code in `chime_frb_api/workflow/work.py`

```python
@root_validator
def post_init(cls, values: Dict[str, Any]):
    """Initialize work attributes after validation."""
    # Check if the pipeline name has any character that is uppercase
    reformatted: bool = False
    for char in values["pipeline"]:
        if char.isupper():
            values["pipeline"] = values["pipeline"].lower()
            reformatted = True
            break
    if any(char in {" ", "_"} for char in values["pipeline"]):
        values["pipeline"] = values["pipeline"].replace(" ", "-")
        values["pipeline"] = values["pipeline"].replace("_", "-")
        reformatted = True
    # Check if the pipeline has any character that is not alphanumeric or dash
    for char in values["pipeline"]:
        if not char.isalnum() and char not in ["-"]:
            raise ValueError(
                "pipeline name can only contain letters, numbers & dashes"
            )
    if reformatted:
        warn(
            SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
            stacklevel=2,
        )
    # Set creation time if not already set
    if values.get("creation") is None:
        values["creation"] = time()
    # Update tags from environment variable WORKFLOW_TAGS
    if environ.get("WORKFLOW_TAGS"):
        env_tags: List[str] = str(environ.get("WORKFLOW_TAGS")).split(",")
        # If tags are already set, append the new ones
        if values.get("tags"):
            values["tags"] = values["tags"] + env_tags
        else:
            values["tags"] = env_tags
        # Remove duplicates
        values["tags"] = list(set(values["tags"]))
    # Check if both command and function are set
    if values.get("command") and values.get("function"):
        raise ValueError("command and function cannot be set together.")
    if not values.get("token"):  # type: ignore
        msg = "workflow token required after v4.0.0."
        warn(
            FutureWarning(msg),
            stacklevel=2,
        )
    return values
```
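The pipeline-name normalization in `post_init` (lowercase, then spaces and underscores to dashes, then reject anything that is not alphanumeric or a dash) can be captured as a standalone function. The sketch below is our approximation of that logic, slightly stricter than the validator since it matches ASCII only:

```python
import re

def normalize_pipeline(name: str) -> str:
    """Approximate post_init's pipeline normalization (illustrative sketch).

    Lowercases the name, converts spaces and underscores to dashes, and
    rejects any remaining character outside [a-z0-9-].
    """
    normalized = name.lower().replace(" ", "-").replace("_", "-")
    if not re.fullmatch(r"[a-z0-9-]+", normalized):
        raise ValueError("pipeline name can only contain letters, numbers & dashes")
    return normalized
```

So a `Work(pipeline="My Pipeline_v2", ...)` would emit a `SyntaxWarning` and proceed with the hyphen-case name `my-pipeline-v2`.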
update ¶
Update work in the buckets backend.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` | `Dict[str, Any]` | Keyword arguments for the Buckets API. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | True if successful, False otherwise. |

Source code in `chime_frb_api/workflow/work.py`
```python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def update(self, **kwargs: Dict[str, Any]) -> bool:
    """Update work in the buckets backend.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.update([self.payload])
```
withdraw classmethod ¶

```python
withdraw(pipeline: str, event: Optional[List[int]] = None, site: Optional[str] = None, priority: Optional[int] = None, user: Optional[str] = None, tags: Optional[List[str]] = None, parent: Optional[str] = None, **kwargs: Dict[str, Any]) -> Optional[Work]
```
Withdraw work from the buckets backend.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pipeline` | `str` | Name of the pipeline. | required |
| `**kwargs` | `Dict[str, Any]` | Keyword arguments for the Buckets API. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Work` | `Optional[Work]` | Work object. |

Source code in `chime_frb_api/workflow/work.py`
```python
@classmethod
def withdraw(
    cls,
    pipeline: str,
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[str] = None,
    **kwargs: Dict[str, Any],
) -> Optional["Work"]:
    """Withdraw work from the buckets backend.

    Args:
        pipeline (str): Name of the pipeline.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        Work: Work object.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    payload = buckets.withdraw(
        pipeline=pipeline,
        event=event,
        site=site,
        priority=priority,
        user=user,
        tags=tags,
        parent=parent,
    )
    if payload:
        return cls.from_dict(payload)
    return None
```
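Taken together, `deposit`, `withdraw`, and `update` form a claim-and-report lifecycle against the Buckets backend: a producer deposits work, a worker withdraws it (which marks it as claimed), and the worker updates it with results. The toy in-memory stand-in below (`MemoryBucket` is hypothetical, not the real Buckets API) illustrates those state transitions without any network access:

```python
from typing import Any, Dict, List, Optional

class MemoryBucket:
    """Toy stand-in for the Buckets backend (illustrative only)."""

    def __init__(self) -> None:
        self._works: List[Dict[str, Any]] = []

    def deposit(self, work: Dict[str, Any]) -> bool:
        """Producer side: enqueue a work payload."""
        self._works.append(dict(work))
        return True

    def withdraw(self, pipeline: str) -> Optional[Dict[str, Any]]:
        """Worker side: claim the first unclaimed work for a pipeline."""
        for work in self._works:
            if work.get("pipeline") == pipeline and work.get("status") is None:
                work["status"] = "running"
                return work
        return None  # nothing to do, mirroring Work.withdraw returning None

    def update(self, work: Dict[str, Any]) -> bool:
        """Worker side: write back results for a claimed work, matched by id."""
        for i, existing in enumerate(self._works):
            if existing.get("id") == work.get("id"):
                self._works[i] = dict(work)
                return True
        return False
```

The real backend adds prioritization, retries, and timeouts on top of this pattern, but the withdraw-marks-claimed semantics are the core idea.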
Results¶
Bases: API
CHIME/FRB Backend workflow Results API.
Initialize the workflow Results API.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `debug` | `bool` | Whether to enable debug mode. Defaults to False. | `False` |
| `base_url` | `str` | The base URL of the API. Defaults to "http://localhost:8005". | `'http://localhost:8005'` |
| `authentication` | `bool` | Whether to enable authentication. Defaults to False. | `False` |

Source code in `chime_frb_api/modules/results.py`
```python
def __init__(
    self,
    debug: bool = False,
    base_url: str = "http://localhost:8005",
    authentication: bool = False,
    **kwargs: Dict[str, Any],
):
    """Initialize the workflow Results API.

    Args:
        debug (bool, optional): Whether to enable debug mode. Defaults to False.
        base_url (str, optional): The base URL of the API.
            Defaults to "http://localhost:8005".
        authentication (bool, optional): Whether to enable authentication.
            Defaults to False.
    """
    API.__init__(
        self,
        debug=debug,
        default_base_urls=[
            "http://frb-vsop.chime:8005",
            "http://localhost:8005",
            "https://frb.chimenet.ca/results/",
        ],
        base_url=base_url,
        authentication=authentication,
        **kwargs,
    )
```
count ¶
Retrieve the number of results filtered by the query parameters.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pipeline` | `str` | Name of pipeline to query results. | required |
| `query` | `Dict[str, Any]` | The query to filter the results with. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `int` | `int` | The number of results that match the query. |

Source code in `chime_frb_api/modules/results.py`
```python
def count(
    self,
    pipeline: str,
    query: Dict[str, Any],
) -> int:
    """Retrieve the number of results filtered by the query parameters.

    Args:
        pipeline (str): Name of pipeline to query results.
        query (Dict[str, Any]): The query to filter the results with.

    Returns:
        int: The number of results that match the query.
    """
    query["pipeline"] = pipeline
    payload = {
        "query": query,
    }
    response: int = self.post("/view/count", json=payload)
    return response
```
delete_ids ¶
Delete results from the results backend with the given ids.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pipeline` | `str` | Name of pipeline that the IDs are from. | required |
| `ids` | `List[str]` | The IDs of the works to delete. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | Whether the results were deleted successfully. |

Source code in `chime_frb_api/modules/results.py`
```python
def delete_ids(self, pipeline: str, ids: List[str]) -> bool:
    """Delete results from the results backend with the given ids.

    Args:
        pipeline (str): Name of pipeline that the IDs are from.
        ids (List[str]): The IDs of the works to delete.

    Returns:
        bool: Whether the results were deleted successfully.
    """
    return self.delete(url="/results", params={pipeline: ids})
```
deposit ¶
Deposit works into the results backend.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `works` | `List[Dict[str, Any]]` | A list of payloads from Work Objects. | required |

Note: This method is not intended to be called directly by end users.

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, bool]` | Dictionary of deposit results for each pipeline. |

Examples:

```python
>>> from chime_frb_api.results import Results
>>> from chime_frb_api.tasks import Work
>>> work = Work.fetch(pipeline="sample")
>>> results = Results()
>>> status = results.deposit([work.payload])
```

Source code in `chime_frb_api/modules/results.py`
```python
def deposit(self, works: List[Dict[str, Any]]) -> Dict[str, bool]:
    """Deposit works into the results backend.

    Args:
        works (List[Dict[str, Any]]): A list of payloads from Work Objects.

    Note:
        This method is not intended to be called directly by end users.

    Returns:
        Dict[str, bool]: Dictionary of deposit results for each pipeline.

    Examples:
        >>> from chime_frb_api.results import Results
        >>> from chime_frb_api.tasks import Work
        >>> work = Work.fetch(pipeline="sample")
        >>> results = Results()
        >>> status = results.deposit([work.payload])
    """
    for work in works:
        assert work["status"] in [
            "success",
            "failure",
        ], "Work status must be 'success' or 'failure'"
    return self.post(url="/results", json=works)
```
status ¶
Retrieve the overall status of the results backend.
Returns:

| Type | Description |
| --- | --- |
| `Dict[str, int]` | The status of the results backend. |

Source code in `chime_frb_api/modules/results.py`
update ¶
Update works in the results backend.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `works` | `List[Dict[str, Any]]` | A list of payloads from Work Objects. | required |

Note: The results need to exist before they can be updated.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | Whether the works were updated successfully. |

Source code in `chime_frb_api/modules/results.py`
```python
def update(self, works: List[Dict[str, Any]]) -> bool:
    """Update works in the results backend.

    Args:
        works (List[Dict[str, Any]]): A list of payloads from Work Objects.

    Note:
        The results need to exist before they can be updated.

    Returns:
        bool: Whether the works were updated successfully.
    """
    response: bool = self.put(url="/results", json=works)
    return response
```
view ¶
```python
view(pipeline: str, query: Dict[str, Any], projection: Dict[str, bool], skip: int = 0, limit: Optional[int] = 100) -> List[Dict[str, Any]]
```
View works in the workflow results backend.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pipeline` | `str` | Name of pipeline to query results. | required |
| `query` | `Dict[str, Any]` | The query to filter the results with. | required |
| `projection` | `Dict[str, bool]` | The projection to use to map the output. | required |
| `skip` | `int` | The number of works to skip. Defaults to 0. | `0` |
| `limit` | `Optional[int]` | The number of works to limit to. Defaults to 100. | `100` |

Returns:

| Type | Description |
| --- | --- |
| `List[Dict[str, Any]]` | The works matching the query. |

Source code in `chime_frb_api/modules/results.py`
```python
def view(
    self,
    pipeline: str,
    query: Dict[str, Any],
    projection: Dict[str, bool],
    skip: int = 0,
    limit: Optional[int] = 100,
) -> List[Dict[str, Any]]:
    """View works in the workflow results backend.

    Args:
        pipeline (str): Name of pipeline to query results.
        query (Dict[str, Any]): The query to filter the results with.
        projection (Dict[str, bool]): The projection to use to map the output.
        skip (int, optional): The number of works to skip. Defaults to 0.
        limit (Optional[int], optional): The number of works to limit to.
            Defaults to 100.

    Returns:
        List[Dict[str, Any]]: The works matching the query.
    """
    query["pipeline"] = pipeline
    payload = {
        "query": query,
        "projection": projection,
        "skip": skip,
        "limit": limit,
    }
    response: List[Dict[str, Any]] = self.post("/view", json=payload)
    return response
```
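Because `view` caps each call at `limit` results, retrieving a large result set means paging with `skip`. A sketch of that pattern follows; `iterate_view` and the injected `post` callable are hypothetical helpers (in practice `post` would be `results.post`), not part of `chime_frb_api`:

```python
from typing import Any, Callable, Dict, Iterator, List

def iterate_view(post: Callable[[str, Dict[str, Any]], List[Dict[str, Any]]],
                 pipeline: str,
                 query: Dict[str, Any],
                 projection: Dict[str, bool],
                 page_size: int = 100) -> Iterator[Dict[str, Any]]:
    """Page through /view with skip/limit, yielding one result at a time."""
    skip = 0
    while True:
        payload = {
            "query": {**query, "pipeline": pipeline},
            "projection": projection,
            "skip": skip,
            "limit": page_size,
        }
        page = post("/view", payload)
        if not page:
            return
        yield from page
        if len(page) < page_size:
            return  # short page means we reached the end
        skip += page_size
```

Keeping `projection` tight (only the fields you need) matters more as the result set grows, since every page carries the full projected documents over the wire.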