Skip to content

Low-Level API

Swarm

CHIME/FRB Swarm API.

Parameters:

Name Type Description Default
API

chime_frb_api.core.API Base class handling the actual HTTP requests.

required

Initialize the Swarm API.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def __init__(self, API: API):
    """Initialize the Swarm API.

    Args:
        API: Object stored on the instance as ``self.API``. It is documented
            as the ``chime_frb_api.core.API`` base class that performs the
            actual HTTP requests.
    """
    self.API = API

get_job_status

Python
get_job_status(job_name: str) -> Dict[str, str]

Get job[s] status with a regex match to argument job_name.

Parameters:

Name Type Description Default
job_name str

Name of the job

required

Returns:

Type Description
Dict[str, str]

{ job_name : STATUS } : dict

Dict[str, str]

Where STATUS can be,

Dict[str, str]

NEW The job was initialized.

Dict[str, str]

PENDING Resources for the job were allocated.

Dict[str, str]

ASSIGNED Docker assigned the job to nodes.

Dict[str, str]

ACCEPTED The job was accepted by a worker node.

Dict[str, str]

PREPARING Docker is preparing the job.

Dict[str, str]

STARTING Docker is starting the job.

Dict[str, str]

RUNNING The job is executing.

Dict[str, str]

COMPLETE The job exited without an error code.

Dict[str, str]

FAILED The job exited with an error code.

Dict[str, str]

SHUTDOWN Docker requested the job to shut down.

Dict[str, str]

REJECTED The worker node rejected the job.

Dict[str, str]

ORPHANED The node was down for too long.

Dict[str, str]

REMOVE The job is not terminal but the associated job was removed

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def get_job_status(self, job_name: str) -> Dict[str, str]:
    """Fetch the status of the job[s] matching ``job_name``.

    The lookup is a single GET on ``/v1/swarm/job-status/<job_name>``; the
    backend is documented to match ``job_name`` as a regular expression.

    Args:
        job_name: Name of the job

    Returns:
        dict: ``{ job_name : STATUS }`` where STATUS is one of

        NEW         The job was initialized.
        PENDING     Resources for the job were allocated.
        ASSIGNED    Docker assigned the job to nodes.
        ACCEPTED    The job was accepted by a worker node.
        PREPARING   Docker is preparing the job.
        STARTING    Docker is starting the job.
        RUNNING     The job is executing.
        COMPLETE    The job exited without an error code.
        FAILED      The job exited with an error code.
        SHUTDOWN    Docker requested the job to shut down.
        REJECTED    The worker node rejected the job.
        ORPHANED    The node was down for too long.
        REMOVE      The job is not terminal but the associated job was removed
    """
    endpoint = f"/v1/swarm/job-status/{job_name}"
    return self.API.get(endpoint)

get_jobs

Python
get_jobs() -> List[str]

Returns the name of all jobs on the analysis cluster.

Returns:

Type Description
List[str]

List[str]: List of job names.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def get_jobs(self) -> List[str]:
    """List the names of all jobs on the analysis cluster.

    Takes no arguments; issues a GET on ``/v1/swarm/jobs``.

    Returns:
        List[str]: List of job names.
    """
    return self.API.get(url="/v1/swarm/jobs")

get_logs

Python
get_logs(job_name: str) -> Dict[str, str]

Return logs from a CHIME/FRB Job.

Parameters:

Name Type Description Default
job_name str

Unique name for the cluster job

required

Returns:

Name Type Description
job_logs Dict[str, str]

dict

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def get_logs(self, job_name: str) -> Dict[str, str]:
    """Fetch the logs of a CHIME/FRB job.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        dict: The response of a GET on ``/v1/swarm/logs/<job_name>``.
    """
    endpoint = f"/v1/swarm/logs/{job_name}"
    return self.API.get(endpoint)

jobs_running

Python
jobs_running(job_names: List[str]) -> bool

Monitor job[s] on CHIME/FRB Analysis Cluster.

Monitors job[s] on the CHIME/FRB Analysis Cluster until they are either COMPLETE, FAILED or SHUTDOWN.

Parameters:

Name Type Description Default
job_names List[str]

A list of string job_name parameters to monitor

required
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def jobs_running(self, job_names: List[str]) -> bool:
    """Report whether any of the given jobs has not finished yet.

    Each name is looked up with ``get_job_status``. The check stops at the
    first lookup that reports a job in a non-terminal state, so later names
    are not queried once a running job has been found.

    Args:
        job_names: A list of string job_name parameters to check. A single
            string is accepted as well and is treated as a one-element list.

    Returns:
        bool: True if at least one reported status is new, pending, assigned,
        accepted, preparing, starting or running. False otherwise, including
        when ``job_names`` is empty.
    """
    running_statuses = (
        "new",
        "pending",
        "assigned",
        "accepted",
        "preparing",
        "starting",
        "running",
    )
    if isinstance(job_names, str):
        job_names = [job_names]
    for job in job_names:
        reported = self.get_job_status(job)
        if any(state in running_statuses for state in reported.values()):
            return True  # pragma: no cover
    return False

kill_failed_jobs

Python
kill_failed_jobs(job_name: Optional[str] = None) -> Dict[str, bool]

Remove FAILED jobs with a regex match to job_name.

Parameters:

Name Type Description Default
job_name Optional[str]

Unique name for the cluster job

None

Returns:

Name Type Description
dict Dict[str, bool]

{job_name : boolean}

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def kill_failed_jobs(self, job_name: Optional[str] = None) -> Dict[str, bool]:
    """Remove FAILED jobs whose name contains ``job_name``.

    Every job returned by ``get_jobs`` whose name contains ``job_name`` as a
    substring is checked with ``get_job_status``; those reported as
    ``"failed"`` are removed with ``kill_job``.

    Args:
        job_name: Text that must appear in the name of the jobs to remove.
            Although the default is None, a string is required.

    Returns:
        dict: {job_name : boolean} for every job that was killed.

    Raises:
        AssertionError: If ``job_name`` is not a string.
    """
    # Raised explicitly (instead of an ``assert`` statement) so the check is
    # still enforced when Python runs with -O; the exception type is unchanged.
    if not isinstance(job_name, str):
        raise AssertionError("job_name <str> is required")
    status = {}
    for job in self.get_jobs():
        if job_name in job:  # pragma: no cover
            if self.get_job_status(job)[job] == "failed":
                status[job] = self.kill_job(job)[job]
    return status

kill_job

Python
kill_job(job_name: str) -> Dict[str, bool]

Remove (forcibly) job with ANY status but with an exact match to job_name.

Parameters:

Name Type Description Default
job_name str

Unique name for the cluster job

required

Returns:

Name Type Description
dict Dict[str, bool]

{job_name : boolean}

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def kill_job(self, job_name: str) -> Dict[str, bool]:
    """Forcibly remove the job named exactly ``job_name``, whatever its status.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        dict: {job_name : boolean}
    """
    endpoint = f"/v1/swarm/kill-job/{job_name}"
    return self.API.get(url=endpoint)

monitor_jobs

Python
monitor_jobs(job_name: str, error_logs: bool = False) -> bool

Continuously monitor job[s] on the CHIME/FRB Analysis Cluster.

Parameters:

Name Type Description Default
job_name str

Regular expression matching to the job_name

required
error_logs bool

Print error logs, by default False

False

Returns:

Name Type Description
bool bool

Status of the pipeline

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def monitor_jobs(
    self, job_name: str, error_logs: bool = False
) -> bool:  # pragma: no cover
    """Continuously monitor job[s] on the CHIME/FRB Analysis Cluster.

    The method blocks while it polls ``get_job_status``: first with
    ``sleep(30)`` between polls while any job is still being set up, then with
    ``sleep(120)`` while any job is running. Afterwards it logs the share of
    completed and failed jobs, prunes the jobs with ``prune_jobs`` and kills
    whatever is still listed.

    Args:
        job_name: Regular expression matching to the job_name
        error_logs: Print error logs, by default False. Currently not used by
            the implementation.

    Returns:
        bool: True when no job is left after pruning, False when leftover
        jobs had to be killed.
    """
    separator = "================================================"
    log.info(separator)
    log.info(f"Monitoring Pipeline: {job_name}")
    log.info(separator)
    initiating = ("new", "accepted", "pending", "starting", "preparing", "assigned")
    status = self.get_job_status(job_name)
    while any(state in initiating for state in status.values()):
        sleep(30)
        status = self.get_job_status(job_name)
    # Initiation
    log.info("Pipeline Initiation: Complete")
    log.info(separator)
    log.info("Pipeline Processing: Started")
    status = self.get_job_status(job_name)
    while "running" in status.values():
        log.info("Pipeline Processing: Running")
        sleep(120)
        status = self.get_job_status(job_name)
    log.info("Pipeline Processing: Complete")
    log.info(separator)
    log.info("Pipeline Completion Status")
    # NOTE(review): the status documentation lists the success state as
    # COMPLETE while this method historically compared against "completed";
    # both spellings are accepted until the backend value is confirmed.
    total = len(status)
    completed = sum(
        1 for state in status.values() if state in ("complete", "completed")
    )
    failed = total - completed
    # No matching job at all: report 0% instead of dividing by zero.
    completed_pct = (completed / total) * 100 if total else 0.0
    failed_pct = (failed / total) * 100 if total else 0.0
    log.info(f"Completed : {completed_pct}%")
    log.info(f"Failed    : {failed_pct}%")
    log.info(separator)
    log.info("Pipeline Cleanup: Started")
    self.prune_jobs(job_name)
    # Make sure all jobs were pruned, if not report and kill them
    # TODO: In the future respawn "failed" jobs
    status = self.get_job_status(job_name)
    if status:
        log.error("Pipeline Cleanup: Failed Jobs Detected")
        for job in status:
            # Log the job being removed (previously a stale variable from the
            # completion loop was logged here).
            log.error(f"Job Name : {job}")
            log.error(f"Job Removal: {self.kill_job(job)}")
        log.info("Pipeline Cleanup: Completed with Failed Jobs")
        return False
    log.info("Pipeline Cleanup: Completed")
    log.info(separator)
    return True

prune_jobs

Python
prune_jobs(job_name: str) -> Dict[str, bool]

Remove COMPLETED jobs with a regex match to argument job_name.

Parameters:

Name Type Description Default
job_name str

Unique name for the cluster job

required

Returns:

Name Type Description
dict Dict[str, bool]

{job_name : boolean}

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def prune_jobs(self, job_name: str) -> Dict[str, bool]:
    """Remove COMPLETED jobs matching ``job_name``.

    Issues a GET on ``/v1/swarm/prune-job/<job_name>``; the backend is
    documented to match ``job_name`` as a regular expression.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        dict: {job_name : boolean}
    """
    endpoint = f"/v1/swarm/prune-job/{job_name}"
    return self.API.get(url=endpoint)

spawn_baseband_job

Python
spawn_baseband_job(event_number: int, task_name: str, arguments: list = [], job_id: Optional[int] = None, image_name: str = 'chimefrb/baseband-localization:latest', command: list = ['baseband_analysis/pipelines/cluster/cluster_cli.py'], job_name: Optional[str] = None, job_mem_limit: int = 10 * 1024 ** 3, job_mem_reservation: int = 10 * 1024 ** 3, environment: dict = {}, **kwargs: dict) -> Dict[str, str]

Spawn a CHIME/FRB Baseband job on the Analysis Cluster.

Parameters:

Name Type Description Default
event_number int

ID of the event to process.

required
task_name str

Name of the task to run. Eg. localization

required
arguments list

Arguments to the command. Default: None.

[]
job_id Optional[int]

ID of the job to run. Default: None.

None
command list

The command to be run in the container. Default: cluster_cli.py.

['baseband_analysis/pipelines/cluster/cluster_cli.py']
image_name str

Name of the container image to spawn the job with. Default: chimefrb/baseband-localization:latest

'chimefrb/baseband-localization:latest'
job_name Optional[str]

Unique name for the cluster job. Default: baseband-EVENT_NUMBER-TASK_NAME, with -JOB_ID appended when a non-negative job_id is given

None
job_mem_limit int

Memory limit of the created container in bytes Default: 10 GB

10 * 1024 ** 3
job_mem_reservation int

Minimum memory reserved of the created container in bytes. Default: 10 GB

10 * 1024 ** 3
environment dict

ENV variables to pass to the container. Default: {}; FRB_MASTER_ACCESS_TOKEN and FRB_MASTER_REFRESH_TOKEN are filled in from the API object's tokens when not supplied

{}
kwargs

Additional parameters for spawn_job

{}
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def spawn_baseband_job(
    self,
    event_number: int,
    task_name: str,
    arguments: list = [],
    job_id: Optional[int] = None,
    image_name: str = "chimefrb/baseband-localization:latest",
    command: list = ["baseband_analysis/pipelines/cluster/cluster_cli.py"],
    job_name: Optional[str] = None,
    job_mem_limit: int = 10 * 1024**3,
    job_mem_reservation: int = 10 * 1024**3,
    environment: dict = {},
    **kwargs,
) -> Dict[str, str]:  # pragma: no cover
    """Spawn a CHIME/FRB Baseband job on the Analysis Cluster.

    The job is started through ``spawn_job`` with ``command + [task_name]`` as
    the command, ``--event-number <event_number> [--job-id <job_id>] --``
    followed by ``arguments`` as the arguments, and a fixed CPU limit and
    reservation of 2.

    Args:
        event_number: ID of the event to process.
        task_name: Name of the task to run. Eg. localization
        arguments: Arguments to the command.
                   Default: [].
        job_id: ID of the job to run.
                Default: None.
        command: The command to be run in the container.
                 Default: cluster_cli.py.
        image_name: Name of the container image to spawn the job with
                    Default: chimefrb/baseband-localization:latest
        job_name: Unique name for the cluster job
                  Default: baseband-EVENT_NUMBER-TASK_NAME, with -JOB_ID
                  appended when a non-negative job_id is given
        job_mem_limit: Memory limit of the created container in bytes
                       Default: 10 GB
        job_mem_reservation: Minimum memory reserved of the created container in
                             bytes.
                             Default: 10 GB
        environment: ENV variables to pass to the container. The keys
                     FRB_MASTER_ACCESS_TOKEN and FRB_MASTER_REFRESH_TOKEN are
                     filled in from ``self.API`` when not supplied. The dict
                     passed in is not modified.
        kwargs: Additional parameters for spawn_job

    Returns:
        Dict[str, str]: Whatever ``spawn_job`` returns.
    """
    # Work on a copy: setdefault() would otherwise store the tokens in the
    # shared default dict (so the first call's tokens would be reused by every
    # later call) or silently modify the caller's dict.
    environment = dict(environment)
    environment.setdefault("FRB_MASTER_ACCESS_TOKEN", self.API.access_token)
    environment.setdefault("FRB_MASTER_REFRESH_TOKEN", self.API.refresh_token)

    job_argument = [] if job_id is None else ["--job-id", str(job_id)]

    if job_name is None:
        if (job_id is None) or (job_id < 0):
            job_name = f"baseband-{event_number}-{task_name}"
        else:
            job_name = f"baseband-{event_number}-{task_name}-{job_id}"

    return self.spawn_job(
        image_name=image_name,
        command=command + [task_name],
        arguments=["--event-number", str(event_number)]
        + job_argument
        + ["--"]
        + arguments,
        job_name=job_name,
        job_mem_limit=job_mem_limit,
        job_mem_reservation=job_mem_reservation,
        job_cpu_limit=2,
        job_cpu_reservation=2,
        environment=environment,
        **kwargs,
    )

spawn_job

Python
spawn_job(image_name: str, command: list, arguments: list, job_name: str, mount_archiver: bool = True, swarm_network: bool = True, job_mem_limit: int = 4294967296, job_mem_reservation: int = 268435456, job_cpu_limit: float = 1, job_cpu_reservation: float = 1, environment: dict = {}) -> Dict[str, str]

Spawn a job on the CHIME/FRB Analysis Cluster.

Parameters:

Name Type Description Default
image_name str

Name of the container image

required
command list

Command to run in the container

required
arguments list

Arguments to the command

required
job_name str

Unique name for the job

required
mount_archiver bool

Mount Site Data Archivers, by default True

True
swarm_network bool

Mount Cluster Network, by default True

True
job_mem_limit int

Memory limit in bytes, by default 4294967296

4294967296
job_mem_reservation int

Minimum memory reserved, by default 268435456

268435456
job_cpu_limit float

Maximum cpu cores job can use, by default 1

1
job_cpu_reservation float

Minimum cores reserved for the job, default 1

1
environment dict

ENV to pass to the container, default is {}

{}

Returns:

Type Description
Dict[str, str]

JSON response of the POST request to /v1/swarm/spawn-job.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/swarm.py
Python
def spawn_job(
    self,
    image_name: str,
    command: list,
    arguments: list,
    job_name: str,
    mount_archiver: bool = True,
    swarm_network: bool = True,
    job_mem_limit: int = 4294967296,
    job_mem_reservation: int = 268435456,
    job_cpu_limit: float = 1,
    job_cpu_reservation: float = 1,
    environment: dict = {},
) -> Dict[str, str]:
    """Spawn a job on the CHIME/FRB Analysis Cluster.

    All arguments are sent unchanged, under their own names, as the JSON body
    of a POST to ``/v1/swarm/spawn-job``.

    Args:
        image_name: Name of the container image
        command: Command to run in the container
        arguments: Arguments to the command
        job_name: Unique name for the job
        mount_archiver: Mount Site Data Archivers, by default True
        swarm_network: Mount Cluster Network, by default True
        job_mem_limit: Memory limit in bytes, by default 4294967296
        job_mem_reservation: Minimum memory reserved, by default 268435456
        job_cpu_limit: Maximum cpu cores job can use, by default 1
        job_cpu_reservation: Minimum cores reserved for the job, default 1
        environment: ENV to pass to the container, default is {}

    Returns:
        Dict[str, str]: The response of the spawn-job request.
    """
    payload = dict(
        image_name=image_name,
        command=command,
        arguments=arguments,
        job_name=job_name,
        mount_archiver=mount_archiver,
        swarm_network=swarm_network,
        job_mem_reservation=job_mem_reservation,
        job_mem_limit=job_mem_limit,
        job_cpu_limit=job_cpu_limit,
        job_cpu_reservation=job_cpu_reservation,
        environment=environment,
    )
    return self.API.post(url="/v1/swarm/spawn-job", json=payload)

Workflow

Bases: BaseModel

Work Object.

Parameters:

Name Type Description Default
BaseModel BaseModel

Pydantic BaseModel.

required

Attributes:

Name Type Description
pipeline str

Name of the pipeline. (Required) Automatically reformatted to hyphen-case.

site str

Site where the work will be performed. (Required)

user str

User who created the work. (Required)

function Optional[str]

Name of the function ran as function(**parameters).

command Optional[List[str]]

Command to run as subprocess.run(command).

parameters Optional[Dict[str, Any]]

Parameters to pass to the function.

command Optional[List[str]]

Command to run as subprocess.run(command).

results Optional[Dict[str, Any]]

Results of the work.

products Optional[Dict[str, Any]]

Products of the work.

plots Optional[Dict[str, Any]]

Plots of the work.

event Optional[List[int]]

Event ID of the work.

tags Optional[List[str]]

Tags of the work.

timeout int

Timeout for the work in seconds. Default is 3600 seconds.

retries int

Number of retries for the work. Default is 2 retries.

priority int

Priority of the work. Default is 3.

config WorkConfig

Configuration of the work.

notify Notify

Notification configuration of the work.

id str

ID of the work.

creation float

Creation time of the work.

start float

Start time of the work.

stop float

Stop time of the work.

status str

Status of the work.

Raises:

Type Description
ValueError

If the work is not valid.

Returns:

Name Type Description
Work

Work object.

Example
Python
# Work is imported from the workflow package of chime_frb_api.
from chime_frb_api.workflow import Work

# Create a work object from the three attributes marked as required.
work = Work(pipeline="test-pipeline", site="chime", user="shinybrar")
# Deposit the work to the buckets backend. return_ids=True is forwarded to the
# backend call; presumably the ids of the deposited work are then returned
# instead of a bool -- confirm against the Buckets API.
work.deposit(return_ids=True)

payload property

Python
payload: Dict[str, Any]

Return the dictionary representation of the work.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The payload of the work.

Dict[str, Any]

Non-instanced attributes are excluded from the payload.

Config

Pydantic Config.

delete

Python
delete(**kwargs: Dict[str, Any]) -> bool

Delete work from the buckets backend.

Parameters:

Name Type Description Default
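**kwargs Dict[str, Any]

Keyword arguments for the Buckets API. The work deleted is always the one the method is called on (its own id); no list of ids is accepted.

{}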

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
Python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def delete(self, **kwargs: Dict[str, Any]) -> bool:
    """Delete this work from the buckets backend.

    A ``Buckets`` client is created from ``kwargs`` and asked to delete the id
    of this work (``self.id`` converted to ``str``). The call is wrapped by the
    ``retry`` decorator configured above.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    backend = Buckets(**kwargs)  # type: ignore
    own_ids = [str(self.id)]
    return backend.delete_ids(own_ids)

deposit

Python
deposit(return_ids: bool = False, **kwargs: Dict[str, Any]) -> Union[bool, List[str]]

Deposit work to the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool Union[bool, List[str]]

True if successful, False otherwise.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
Python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def deposit(
    self, return_ids: bool = False, **kwargs: Dict[str, Any]
) -> Union[bool, List[str]]:
    """Deposit this work to the buckets backend.

    A ``Buckets`` client is created from ``kwargs`` and receives the payload of
    this work as a one-element list. The call is wrapped by the ``retry``
    decorator configured above.

    Args:
        return_ids (bool): Forwarded unchanged to ``Buckets.deposit``.
            Defaults to False.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        Union[bool, List[str]]: The value returned by ``Buckets.deposit``;
        documented as True if successful, False otherwise.
    """
    backend = Buckets(**kwargs)  # type: ignore
    own_payload = self.payload
    return backend.deposit(works=[own_payload], return_ids=return_ids)

from_dict classmethod

Python
from_dict(payload: Dict[str, Any]) -> Work

Create a work from a dictionary.

Parameters:

Name Type Description Default
payload Dict[str, Any]

The dictionary.

required

Returns:

Name Type Description
Work Work

The work.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
Python
@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> "Work":
    """Build a work from a dictionary.

    Args:
        payload (Dict[str, Any]): Mapping whose items are passed to the class
            constructor as keyword arguments.

    Returns:
        Work: The work.
    """
    work = cls(**payload)
    return work

from_json classmethod

Python
from_json(json_str: str) -> Work

Create a work from a json string.

Parameters:

Name Type Description Default
json_str str

The json string.

required

Returns:

Name Type Description
Work Work

The work.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
Python
@classmethod
def from_json(cls, json_str: str) -> "Work":
    """Build a work from a json string.

    Args:
        json_str (str): JSON text that decodes to an object; its members are
            passed to the class constructor as keyword arguments.

    Returns:
        Work: The work.
    """
    decoded = loads(json_str)
    return cls(**decoded)

post_init

Python
post_init(values: Dict[str, Any])

Initialize work attributes after validation.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
Python
@root_validator
def post_init(cls, values: Dict[str, Any]):
    """Initialize work attributes after validation.

    Steps, in order:
      1. Normalise the pipeline name: lower-case it, turn spaces and
         underscores into dashes, and reject any other non-alphanumeric
         character. A SyntaxWarning is issued when the name was changed.
      2. Set ``creation`` to the current time when it is not set.
      3. Append the comma separated tags from the WORKFLOW_TAGS environment
         variable to ``tags`` and drop duplicates, keeping first-seen order.
      4. Reject works that set both ``command`` and ``function``.
      5. Issue a FutureWarning when no ``token`` is set.

    Args:
        values (Dict[str, Any]): Field values collected so far.

    Returns:
        Dict[str, Any]: The updated values.

    Raises:
        ValueError: If the pipeline name contains characters other than
            letters, numbers and dashes, or if both command and function
            are set.
    """
    # NOTE(review): with a pydantic v1 root validator, a field that failed its
    # own validation is absent from ``values``; indexing it would raise a
    # KeyError that hides the real validation error, so normalisation is
    # skipped in that case -- confirm against the pydantic version in use.
    pipeline = values.get("pipeline")
    if pipeline is not None:
        reformatted: bool = False
        if any(char.isupper() for char in pipeline):
            pipeline = pipeline.lower()
            reformatted = True

        if " " in pipeline or "_" in pipeline:
            pipeline = pipeline.replace(" ", "-").replace("_", "-")
            reformatted = True

        # Check if the pipeline has any character that is not alphanumeric or dash
        if any(not (char.isalnum() or char == "-") for char in pipeline):
            raise ValueError(
                "pipeline name can only contain letters, numbers & dashes"
            )

        values["pipeline"] = pipeline
        if reformatted:
            warn(
                SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
                stacklevel=2,
            )

    # Set creation time if not already set
    if values.get("creation") is None:
        values["creation"] = time()

    # Update tags from environment variable WORKFLOW_TAGS
    raw_env_tags = environ.get("WORKFLOW_TAGS")
    if raw_env_tags:
        env_tags: List[str] = str(raw_env_tags).split(",")
        # If tags are already set, append the new ones
        merged = (values.get("tags") or []) + env_tags
        # Remove duplicates; dict.fromkeys keeps the first-seen order, so the
        # resulting list is the same on every run (a set would not be).
        values["tags"] = list(dict.fromkeys(merged))

    # Check if both command and function are set
    if values.get("command") and values.get("function"):
        raise ValueError("command and function cannot be set together.")

    if not values.get("token"):  # type: ignore
        msg = "workflow token required after v4.0.0."
        warn(
            FutureWarning(msg),
            stacklevel=2,
        )
    return values

update

Python
update(**kwargs: Dict[str, Any]) -> bool

Update work in the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
Python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def update(self, **kwargs: Dict[str, Any]) -> bool:
    """Update this work in the buckets backend.

    A ``Buckets`` client is created from ``kwargs`` and receives the payload of
    this work as a one-element list. The call is wrapped by the ``retry``
    decorator configured above.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    backend = Buckets(**kwargs)  # type: ignore
    own_payload = self.payload
    return backend.update([own_payload])

withdraw classmethod

Python
withdraw(pipeline: str, event: Optional[List[int]] = None, site: Optional[str] = None, priority: Optional[int] = None, user: Optional[str] = None, tags: Optional[List[str]] = None, parent: Optional[str] = None, **kwargs: Dict[str, Any]) -> Optional[Work]

Withdraw work from the buckets backend.

Parameters:

Name Type Description Default
pipeline str

Name of the pipeline.

required
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
Work Optional[Work]

Work object.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
Python
@classmethod
def withdraw(
    cls,
    pipeline: str,
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[str] = None,
    **kwargs: Dict[str, Any],
) -> Optional["Work"]:
    """Withdraw work from the buckets backend.

    A ``Buckets`` client is created from ``kwargs``; all other arguments are
    forwarded unchanged, by name, to ``Buckets.withdraw``.

    Args:
        pipeline (str): Name of the pipeline.
        event (Optional[List[int]]): Forwarded to ``Buckets.withdraw``.
        site (Optional[str]): Forwarded to ``Buckets.withdraw``.
        priority (Optional[int]): Forwarded to ``Buckets.withdraw``.
        user (Optional[str]): Forwarded to ``Buckets.withdraw``.
        tags (Optional[List[str]]): Forwarded to ``Buckets.withdraw``.
        parent (Optional[str]): Forwarded to ``Buckets.withdraw``.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        Optional[Work]: Work object built from the returned payload, or None
        when the backend returned nothing.
    """
    backend = Buckets(**kwargs)  # type: ignore
    filters = dict(
        pipeline=pipeline,
        event=event,
        site=site,
        priority=priority,
        user=user,
        tags=tags,
        parent=parent,
    )
    payload = backend.withdraw(**filters)
    return cls.from_dict(payload) if payload else None

Results

Bases: API

CHIME/FRB Backend workflow Results API.

Initialize the workflow Results API.

Parameters:

Name Type Description Default
debug bool

Whether to enable debug mode. Defaults to False.

False
base_url str

The base URL of the API. Defaults to "http://localhost:8005".

'http://localhost:8005'
authentication bool

Whether to enable authentication. Defaults to False.

False
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/results.py
Python
def __init__(
    self,
    debug: bool = False,
    base_url: str = "http://localhost:8005",
    authentication: bool = False,
    **kwargs: Dict[str, Any],
):
    """Initialize the workflow Results API.

    Delegates to ``API.__init__`` with a fixed list of default base URLs in
    addition to the arguments below.

    Args:
        debug (bool, optional): Whether to enable debug mode. Defaults to False.
        base_url (str, optional): The base URL of the API.
            Defaults to "http://localhost:8005".
        authentication (bool, optional): Whether to enable authentication.
            Defaults to False.
        **kwargs (Dict[str, Any]): Further keyword arguments passed through to
            ``API.__init__``.
    """
    known_base_urls = [
        "http://frb-vsop.chime:8005",
        "http://localhost:8005",
        "https://frb.chimenet.ca/results/",
    ]
    API.__init__(
        self,
        debug=debug,
        default_base_urls=known_base_urls,
        base_url=base_url,
        authentication=authentication,
        **kwargs,
    )

count

Python
count(pipeline: str, query: Dict[str, Any]) -> int

Retrieve the number of results filtered by the query parameters.

Parameters:

Name Type Description Default
pipeline str

Name of pipeline to query results.

required
query Dict[str, Any]

The query to filter the results with.

required

Returns:

Name Type Description
int int

The number of results that match the query.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/results.py
Python
def count(
    self,
    pipeline: str,
    query: Dict[str, Any],
) -> int:
    """Retrieve the number of results filtered by the query parameters.

    The pipeline name is added to the query under the ``"pipeline"`` key
    (replacing any value already there) and the result is POSTed to
    ``/view/count``. The ``query`` dict passed in is left untouched.

    Args:
        pipeline (str): Name of pipeline to query results.
        query (Dict[str, Any]): The query to filter the results with.

    Returns:
        int: The number of results that match the query.
    """
    # Build a new dict rather than writing the pipeline into the caller's query.
    scoped_query = {**query, "pipeline": pipeline}
    payload = {
        "query": scoped_query,
    }
    response: int = self.post("/view/count", json=payload)
    return response

delete_ids

Python
delete_ids(pipeline: str, ids: List[str]) -> bool

Delete results from the results backend with the given ids.

Parameters:

Name Type Description Default
pipeline str

Name of pipeline that the IDs are from.

required
ids List[str]

The IDs of the works to delete.

required

Returns:

Name Type Description
bool bool

Whether the results were deleted successfully.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/results.py
Python
def delete_ids(self, pipeline: str, ids: List[str]) -> bool:
    """Delete results with the given ids from the results backend.

    Sends a DELETE to ``/results`` with a single request parameter whose name
    is the pipeline and whose value is the list of ids.

    Args:
        pipeline (str): Name of pipeline that the IDs are from.
        ids (List[str]): The IDs of the works to delete.

    Returns:
        bool: Whether the results were deleted successfully.
    """
    ids_by_pipeline = {pipeline: ids}
    return self.delete(url="/results", params=ids_by_pipeline)

deposit

Python
deposit(works: List[Dict[str, Any]]) -> Dict[str, bool]

Deposit works into the results backend.

Parameters:

Name Type Description Default
works List[Dict[str, Any]]

A list of payloads from Work Objects.

required

Note: This method is not intended to be called directly by end users.

Returns:

Type Description
Dict[str, bool]

Dict[str, bool]: Dictionary of deposit results for each pipeline.

Examples:

from chime_frb_api.results import Results
from chime_frb_api.tasks import Work
work = Work.fetch(pipeline="sample")
results = Results()
status = results.deposit([work.payload])

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/results.py
Python
def deposit(self, works: List[Dict[str, Any]]) -> Dict[str, bool]:
    """Deposit works into the results backend.

    Every payload must carry a ``"status"`` of ``"success"`` or ``"failure"``;
    the whole list is then POSTed to ``/results``.

    Args:
        works (List[Dict[str, Any]]): A list of payloads from Work Objects.
    Note:
        This method is not intended to be called directly by end users.

    Returns:
        Dict[str, bool]: Dictionary of deposit results for each pipeline.

    Raises:
        AssertionError: If a work has a status other than 'success' or
            'failure'.
        KeyError: If a work has no "status" entry.

    Examples:
    >>> from chime_frb_api.results import Results
    >>> from chime_frb_api.tasks import Work
    >>> work = Work.fetch(pipeline="sample")
    >>> results = Results()
    >>> status = results.deposit([work.payload])
    """
    allowed_statuses = ("success", "failure")
    for work in works:
        # Raised explicitly (instead of an ``assert`` statement) so the check
        # is still enforced when Python runs with -O; same exception type.
        if work["status"] not in allowed_statuses:
            raise AssertionError("Work status must be 'success' or 'failure'")
    return self.post(url="/results", json=works)

status

Python
status() -> Dict[str, int]

Retrieve the overall status of the results backend.

Returns:

Type Description
Dict[str, int]

Dict[str, int]: The status of the results backend.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/results.py
Python
def status(self) -> Dict[str, int]:
    """Fetch the overall status of the results backend.

    Returns:
        Dict[str, int]: The response of a GET on ``/status``.
    """
    return self.get("/status")

update

Python
update(works: List[Dict[str, Any]]) -> bool

Update works in the results backend.

Parameters:

Name Type Description Default
works List[Dict[str, Any]]

A list of payloads from Work Objects.

required

Note: The results need to exist before they can be updated.

Returns:

Name Type Description
bool bool

Whether the works were updated successfully.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/results.py
Python
def update(self, works: List[Dict[str, Any]]) -> bool:
    """Update works in the results backend.

    The list is sent as the JSON body of a PUT to ``/results``.

    Args:
        works (List[Dict[str, Any]]): A list of payloads from Work Objects.
    Note:
        The results need to exist before they can be updated.

    Returns:
        bool: Whether the works were updated successfully.
    """
    return self.put(url="/results", json=works)

view

Python
view(pipeline: str, query: Dict[str, Any], projection: Dict[str, bool], skip: int = 0, limit: Optional[int] = 100) -> List[Dict[str, Any]]

View works in the workflow results backend.

Parameters:

Name Type Description Default
pipeline str

Name of pipeline to query results.

required
query Dict[str, Any]

The query to filter the results with.

required
projection Dict[str, bool]

The projection to use to map the output.

required
skip int

The number of works to skip. Defaults to 0.

0
limit Optional[int]

The number of works to limit to. Defaults to 100.

100

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: The works matching the query.

Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/modules/results.py
Python
def view(
    self,
    pipeline: str,
    query: Dict[str, Any],
    projection: Dict[str, bool],
    skip: int = 0,
    limit: Optional[int] = 100,
) -> List[Dict[str, Any]]:
    """View works in the workflow results backend.

    The pipeline name is added to the query under the ``"pipeline"`` key
    (replacing any value already there) and the request is POSTed to
    ``/view``. The ``query`` dict passed in is left untouched.

    Args:
        pipeline (str): Name of pipeline to query results.
        query (Dict[str, Any]): The query to filter the results with.
        projection (Dict[str, bool]): The projection to use to map the output.
        skip (int, optional): The number of works to skip. Defaults to 0.
        limit (Optional[int], optional): The number of works to limit to.
            Defaults to 100.

    Returns:
        List[Dict[str, Any]]: The works matching the query.
    """
    # Build a new dict rather than writing the pipeline into the caller's query.
    scoped_query = {**query, "pipeline": pipeline}
    payload = {
        "query": scoped_query,
        "projection": projection,
        "skip": skip,
        "limit": limit,
    }
    response: List[Dict[str, Any]] = self.post("/view", json=payload)
    return response