Skip to content

Low-Level API

Swarm

chime_frb_api.modules.swarm.Swarm

CHIME/FRB Swarm API.

Parameters:

Name Type Description Default
API

chime_frb_api.core.API Base class handling the actual HTTP requests.

required
Source code in chime_frb_api/modules/swarm.py
class Swarm:
    """CHIME/FRB Swarm API.

    Thin client over the FRB Master ``/v1/swarm`` endpoints used to spawn,
    monitor and clean up Docker Swarm jobs on the CHIME/FRB Analysis Cluster.

    Args:
        API : chime_frb_api.core.API
            Base class handling the actual HTTP requests.
    """

    def __init__(self, API: API):
        """Initialize the Swarm API."""
        self.API = API

    def get_jobs(self) -> List[str]:
        """Returns the name of all jobs on the analysis cluster.

        Returns:
            List[str]: List of job names.
        """
        jobs: List[str] = self.API.get(url="/v1/swarm/jobs")
        return jobs

    def get_job_status(self, job_name: str) -> Dict[str, str]:
        """Get job[s] status with a regex match to argument job_name.

        Args:
            job_name: Name of the job

        Returns:
            { job_name : STATUS } : dict

            Where STATUS can be,
            NEW         The job was initialized.
            PENDING     Resources for the job were allocated.
            ASSIGNED    Docker assigned the job to nodes.
            ACCEPTED    The job was accepted by a worker node.
            PREPARING   Docker is preparing the job.
            STARTING    Docker is starting the job.
            RUNNING     The job is executing.
            COMPLETE    The job exited without an error code.
            FAILED      The job exited with an error code.
            SHUTDOWN    Docker requested the job to shut down.
            REJECTED    The worker node rejected the job.
            ORPHANED    The node was down for too long.
            REMOVE      The job is not terminal but the associated job was removed
        """
        return self.API.get(f"/v1/swarm/job-status/{job_name}")

    def spawn_job(
        self,
        image_name: str,
        command: list,
        arguments: list,
        job_name: str,
        mount_archiver: bool = True,
        swarm_network: bool = True,
        job_mem_limit: int = 4294967296,
        job_mem_reservation: int = 268435456,
        job_cpu_limit: float = 1,
        job_cpu_reservation: float = 1,
        environment: Optional[dict] = None,
    ) -> Dict[str, str]:
        """Spawn a job on the CHIME/FRB Analysis Cluster.

        Args:
            image_name: Name of the container image
            command: Command to run in the container
            arguments: Arguments to the command
            job_name: Unique name for the job
            mount_archiver: Mount Site Data Archivers, by default True
            swarm_network: Mount Cluster Network, by default True
            job_mem_limit: Memory limit in bytes, by default 4294967296
            job_mem_reservation: Minimum memory reserved, by default 268435456
            job_cpu_limit: Maximum cpu cores job can use, by default 1
            job_cpu_reservation: Minimum cores reserved for the job, default 1
            environment: ENV to pass to the container, default is {}

        Returns:
            Dict[str, str]: JSON response from the spawn-job endpoint.
        """
        # None sentinel instead of a mutable default dict, so no state is
        # shared between calls.
        payload = {
            "image_name": image_name,
            "command": command,
            "arguments": arguments,
            "job_name": job_name,
            "mount_archiver": mount_archiver,
            "swarm_network": swarm_network,
            "job_mem_reservation": job_mem_reservation,
            "job_mem_limit": job_mem_limit,
            "job_cpu_limit": job_cpu_limit,
            "job_cpu_reservation": job_cpu_reservation,
            "environment": environment if environment is not None else {},
        }
        return self.API.post(url="/v1/swarm/spawn-job", json=payload)

    def get_logs(self, job_name: str) -> Dict[str, str]:
        """Return logs from a CHIME/FRB Job.

        Args:
            job_name: Unique name for the cluster job

        Returns:
            job_logs : dict
        """
        return self.API.get(f"/v1/swarm/logs/{job_name}")

    def prune_jobs(self, job_name: str) -> Dict[str, bool]:
        """Remove COMPLETED jobs with a regex match to argument job_name.

        Args:
            job_name: Unique name for the cluster job

        Returns:
            dict: {job_name : boolean}
        """
        return self.API.get(url=f"/v1/swarm/prune-job/{job_name}")

    def kill_job(self, job_name: str) -> Dict[str, bool]:
        """Remove (forcibly) job with ANY status but with an exact match to job_name.

        Args:
            job_name: Unique name for the cluster job

        Returns:
            dict: {job_name : boolean}
        """
        return self.API.get(url=f"/v1/swarm/kill-job/{job_name}")

    def kill_failed_jobs(self, job_name: Optional[str] = None) -> Dict[str, bool]:
        """Remove FAILED jobs with a regex match to job_name.

        Args:
            job_name: Unique name for the cluster job

        Returns:
            dict: {job_name : boolean}

        Raises:
            AssertionError: If job_name is not a string.
        """
        # Explicit raise (not a bare assert) so the check survives `python -O`,
        # while preserving the exception type callers may already catch.
        if not isinstance(job_name, str):
            raise AssertionError("job_name <str> is required")
        status = {}
        for job in self.get_jobs():
            if job_name in job:  # pragma: no cover
                if self.get_job_status(job)[job] == "failed":
                    status[job] = self.kill_job(job)[job]
        return status

    def jobs_running(self, job_names: List[str]) -> bool:
        """Monitor job[s] on CHIME/FRB Analysis Cluster.

        Checks whether any of the named jobs is still in a non-terminal
        (startup or running) state, i.e. not yet COMPLETE, FAILED or SHUTDOWN.

        Args:
            job_names: A list of string job_name parameters to monitor

        Returns:
            bool: True if any job is still initializing or running.
        """
        running_statuses = [
            "new",
            "pending",
            "assigned",
            "accepted",
            "preparing",
            "starting",
            "running",
        ]
        if isinstance(job_names, str):
            job_names = [job_names]
        for job in job_names:
            status = self.get_job_status(job)
            if any(state in status.values() for state in running_statuses):
                return True  # pragma: no cover
        return False

    def monitor_jobs(
        self, job_name: str, error_logs: bool = False
    ) -> bool:  # pragma: no cover
        """Continuously monitor job[s] on the CHIME/FRB Analysis Cluster.

        Blocks until all jobs matching job_name have left the initiation and
        running states, reports completion statistics, prunes completed jobs,
        and forcibly removes any jobs that could not be pruned.

        Args:
            job_name: Regular expression matching to the job_name
            error_logs: Also fetch and log the container logs of jobs that
                failed to prune, by default False

        Returns:
            bool: True if all jobs completed and were pruned, False otherwise.
        """
        log.info("================================================")
        log.info(f"Monitoring Pipeline: {job_name}")
        log.info("================================================")
        initiating = ["new", "accepted", "pending", "starting", "preparing", "assigned"]
        status = self.get_job_status(job_name)
        # Wait for every matched job to get past Docker's startup states.
        while any(state in initiating for state in status.values()):
            sleep(30)
            status = self.get_job_status(job_name)
        log.info("Pipeline Initiation: Complete")
        log.info("================================================")
        log.info("Pipeline Processing: Started")
        status = self.get_job_status(job_name)
        while "running" in status.values():
            log.info("Pipeline Processing: Running")
            sleep(120)
            status = self.get_job_status(job_name)
        log.info("Pipeline Processing: Complete")
        log.info("================================================")
        log.info("Pipeline Completion Status")
        completed = sum(1 for state in status.values() if state == "completed")
        failed = len(status) - completed
        # Guard against a zero division when the regex matched no jobs.
        total = max(len(status), 1)
        log.info(f"Completed : {(completed / total) * 100}%")
        log.info(f"Failed    : {(failed / total) * 100}%")
        log.info("================================================")
        log.info("Pipeline Cleanup: Started")
        self.prune_jobs(job_name)
        # Make sure all jobs were pruned, if not report and kill them
        # TODO: In the future respawn "failed" jobs
        status = self.get_job_status(job_name)
        if len(status.keys()) > 0:
            log.error("Pipeline Cleanup: Failed Jobs Detected")
            for job in status.keys():
                # BUG FIX: previously logged the stale loop variable `key`
                # from the completion-statistics loop instead of `job`.
                log.error(f"Job Name : {job}")
                if error_logs:
                    log.error(f"Job Logs : {self.get_logs(job)}")
                log.error(f"Job Removal: {self.kill_job(job)}")
            log.info("Pipeline Cleanup: Completed with Failed Jobs")
            return False
        log.info("Pipeline Cleanup: Completed")
        log.info("================================================")
        return True

    def spawn_baseband_job(
        self,
        event_number: int,
        task_name: str,
        arguments: Optional[list] = None,
        job_id: Optional[int] = None,
        image_name: str = "chimefrb/baseband-localization:latest",
        command: Optional[list] = None,
        job_name: Optional[str] = None,
        job_mem_limit: int = 10 * 1024**3,
        job_mem_reservation: int = 10 * 1024**3,
        environment: Optional[dict] = None,
        **kwargs,
    ) -> Dict[str, str]:  # pragma: no cover
        """Spawn a CHIME/FRB Baseband job on the Analysis Cluster.

        Args:
            event_number: ID of the event to process.
            task_name: Name of the task to run. Eg. localization
            arguments: Arguments to the command.
                       Default: [].
            job_id: ID of the job to run.
                    Default: None.
            command: The command to be run in the container.
                     Default: cluster_cli.py.
            image_name: Name of the container image to spawn the job with
                        Default: chimefrb/baseband-localization:latest
            job_name: Unique name for the cluster job
                      Default: baseband-EVENT_NUMBER-TASK_NAME[-JOB_ID]
            job_mem_limit: Memory limit of the created container in bytes
                           Default: 10 GB
            job_mem_reservation: Minimum memory reserved of the created container in
                                 bytes.
                                 Default: 10 GB
            environment: ENV variables to pass to the container
                         Default: read authentication tokens from the API
            kwargs: Additional parameters for spawn_job
        """
        # Copy the caller's dict (or start fresh) before injecting tokens:
        # mutating a caller-supplied or shared-default dict leaks credentials
        # into state that outlives this call.
        environment = dict(environment) if environment is not None else {}
        arguments = list(arguments) if arguments is not None else []
        if command is None:
            command = ["baseband_analysis/pipelines/cluster/cluster_cli.py"]
        environment.setdefault("FRB_MASTER_ACCESS_TOKEN", self.API.access_token)
        environment.setdefault("FRB_MASTER_REFRESH_TOKEN", self.API.refresh_token)

        if job_id is None:
            job_argument = []
        else:
            job_argument = ["--job-id", str(job_id)]

        if job_name is None:
            # Negative job ids are treated as "no id" for naming purposes.
            if (job_id is None) or (job_id < 0):
                job_name = f"baseband-{event_number}-{task_name}"
            else:
                job_name = f"baseband-{event_number}-{task_name}-{job_id}"

        out = self.spawn_job(
            image_name=image_name,
            command=command + [task_name],
            arguments=["--event-number", str(event_number)]
            + job_argument
            + ["--"]
            + arguments,
            job_name=job_name,
            job_mem_limit=job_mem_limit,
            job_mem_reservation=job_mem_reservation,
            job_cpu_limit=2,
            job_cpu_reservation=2,
            environment=environment,
            **kwargs,
        )

        return out
Functions
__init__(API)

Initialize the Swarm API.

Source code in chime_frb_api/modules/swarm.py
def __init__(self, API: API):
    """Initialize the Swarm API."""
    self.API = API
get_job_status(job_name)

Get job[s] status with a regex match to argument job_name.

Parameters:

Name Type Description Default
job_name str

Name of the job

required

Returns:

Type Description
Dict[str, str]

{ job_name : STATUS } : dict

Dict[str, str]

Where STATUS can be,

Dict[str, str]

NEW The job was initialized.

Dict[str, str]

PENDING Resources for the job were allocated.

Dict[str, str]

ASSIGNED Docker assigned the job to nodes.

Dict[str, str]

ACCEPTED The job was accepted by a worker node.

Dict[str, str]

PREPARING Docker is preparing the job.

Dict[str, str]

STARTING Docker is starting the job.

Dict[str, str]

RUNNING The job is executing.

Dict[str, str]

COMPLETE The job exited without an error code.

Dict[str, str]

FAILED The job exited with an error code.

Dict[str, str]

SHUTDOWN Docker requested the job to shut down.

Dict[str, str]

REJECTED The worker node rejected the job.

Dict[str, str]

ORPHANED The node was down for too long.

Dict[str, str]

REMOVE The job is not terminal but the associated job was removed

Source code in chime_frb_api/modules/swarm.py
def get_job_status(self, job_name: str) -> Dict[str, str]:
    """Get job[s] status with a regex match to argument job_name.

    Args:
        job_name: Name of the job

    Returns:
        { job_name : STATUS } : dict

        Where STATUS can be,
        NEW         The job was initialized.
        PENDING     Resources for the job were allocated.
        ASSIGNED    Docker assigned the job to nodes.
        ACCEPTED    The job was accepted by a worker node.
        PREPARING   Docker is preparing the job.
        STARTING    Docker is starting the job.
        RUNNING     The job is executing.
        COMPLETE    The job exited without an error code.
        FAILED      The job exited with an error code.
        SHUTDOWN    Docker requested the job to shut down.
        REJECTED    The worker node rejected the job.
        ORPHANED    The node was down for too long.
        REMOVE      The job is not terminal but the associated job was removed
    """
    return self.API.get(f"/v1/swarm/job-status/{job_name}")
get_jobs()

Returns the name of all jobs on the analysis cluster.

Returns:

Type Description
List[str]

List[str]: List of job names.

Source code in chime_frb_api/modules/swarm.py
def get_jobs(self) -> List[str]:
    """Returns the name of all jobs on the analysis cluster.

    Args:
        None

    Returns:
        List[str]: List of job names.
    """
    jobs: List[str] = self.API.get(url="/v1/swarm/jobs")
    return jobs
get_logs(job_name)

Return logs from a CHIME/FRB Job.

Parameters:

Name Type Description Default
job_name str

Unique name for the cluster job

required

Returns:

Name Type Description
job_logs Dict[str, str]

dict

Source code in chime_frb_api/modules/swarm.py
def get_logs(self, job_name: str) -> Dict[str, str]:
    """Return logs from a CHIME/FRB Job.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        job_logs : dict
    """
    return self.API.get(f"/v1/swarm/logs/{job_name}")
jobs_running(job_names)

Monitor job[s] on CHIME/FRB Analysis Cluster.

Monitors job[s] on the CHIME/FRB Analysis Cluster until they are either COMPLETE, FAILED or SHUTDOWN

Parameters:

Name Type Description Default
job_names List[str]

A list of string job_name parameters to monitor

required
Source code in chime_frb_api/modules/swarm.py
def jobs_running(self, job_names: List[str]) -> bool:
    """Monitor job[s] on CHIME/FRB Analysis Cluster.

    Monitors job[s] on the CHIME/FRB Analysis Cluster untill they are either
    COMPLETE, FAILED or SHUTDOWN

    Args:
        job_names: A list of string job_name paramerters to monitor
    """
    running_statuses = [
        "new",
        "pending",
        "assigned",
        "accepted",
        "preparing",
        "starting",
        "running",
    ]
    if isinstance(job_names, str):
        job_names = [job_names]
    jobs_status = {}
    for job in job_names:
        status = self.get_job_status(job)
        jobs_status[job] = status
        for running in running_statuses:
            if running in status.values():
                return True  # pragma: no cover
    return False
kill_failed_jobs(job_name=None)

Remove FAILED jobs with a regex match to job_name.

Parameters:

Name Type Description Default
job_name Optional[str]

Unique name for the cluster job

None

Returns:

Name Type Description
dict Dict[str, bool]

{job_name : boolean}

Source code in chime_frb_api/modules/swarm.py
def kill_failed_jobs(self, job_name: Optional[str] = None) -> Dict[str, bool]:
    """Remove FAILED jobs with a regex match to job_name.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        dict: {job_name : boolean}
    """
    assert isinstance(job_name, str), "job_name <str> is required"
    status = {}
    for job in self.get_jobs():
        if job_name in job:  # pragma: no cover
            if self.get_job_status(job)[job] == "failed":
                status[job] = self.kill_job(job)[job]
    return status
kill_job(job_name)

Remove (forcibly) job with ANY status but with an exact match to job_name.

Parameters:

Name Type Description Default
job_name str

Unique name for the cluster job

required

Returns:

Name Type Description
dict Dict[str, bool]

{job_name : boolean}

Source code in chime_frb_api/modules/swarm.py
def kill_job(self, job_name: str) -> Dict[str, bool]:
    """Remove (forcibly) job with ANY status but with an exact match to job_name.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        dict: {job_name : boolean}
    """
    return self.API.get(url=f"/v1/swarm/kill-job/{job_name}")
monitor_jobs(job_name, error_logs=False)

Continuously monitor job[s] on the CHIME/FRB Analysis Cluster.

Parameters:

Name Type Description Default
job_name str

Regular expression matching to the job_name

required
error_logs bool

Print error logs, by default False

False

Returns:

Name Type Description
bool bool

Status of the pipeline

Source code in chime_frb_api/modules/swarm.py
def monitor_jobs(
    self, job_name: str, error_logs: bool = False
) -> bool:  # pragma: no cover
    """Continously monitor job[s] on the CHIME/FRB Analysis Cluster.

    Args:
        job_name: Regular expression matching to the job_name
        error_logs: Print error logs, by default False

    Returns:
        bool: Status of the pipeline
    """
    log.info("================================================")
    log.info(f"Monitoring Pipeline: {job_name}")
    log.info("================================================")
    initiating = ["new", "accepted", "pending", "starting", "preparing", "assigned"]
    status = self.get_job_status(job_name)
    while any([n in initiating for n in status.values()]):
        sleep(30)
        status = self.get_job_status(job_name)
    # Initiation
    log.info("Pipeline Initiation: Complete")
    log.info("================================================")
    log.info("Pipeline Processing: Started")
    status = self.get_job_status(job_name)
    while "running" in status.values():
        log.info("Pipeline Processing: Running")
        sleep(120)
        status = self.get_job_status(job_name)
    log.info("Pipeline Processing: Complete")
    log.info("================================================")
    log.info("Pipeline Completion Status")
    completed = failed = 0
    for key, value in status.items():
        if value == "completed":
            completed += 1
        else:
            failed += 1
    log.info(f"Completed : {(completed / len(status)) * 100}%")
    log.info(f"Failed    : {(failed / len(status)) * 100}%")
    log.info("================================================")
    log.info("Pipeline Cleanup: Started")
    self.prune_jobs(job_name)
    # Make sure all jobs were pruned, if not report and kill them
    # TODO: In the future respawn "failed" jobs
    status = self.get_job_status(job_name)
    if len(status.keys()) > 0:
        log.error("Pipeline Cleanup: Failed Jobs Detected")
        for job in status.keys():
            log.error(f"Job Name : {key}")
            log.error(f"Job Removal: {self.kill_job(job)}")
        log.info("Pipeline Cleanup: Completed with Failed Jobs")
        return False
    log.info("Pipeline Cleanup: Completed")
    log.info("================================================")
    return True
prune_jobs(job_name)

Remove COMPLETED jobs with a regex match to argument job_name.

Parameters:

Name Type Description Default
job_name str

Unique name for the cluster job

required

Returns:

Name Type Description
dict Dict[str, bool]

{job_name : boolean}

Source code in chime_frb_api/modules/swarm.py
def prune_jobs(self, job_name: str) -> Dict[str, bool]:
    """Remove COMPLETED jobs with a regex match to argument job_name.

    Args:
        job_name: Unique name for the cluster job

    Returns:
        dict: {job_name : boolean}
    """
    return self.API.get(url=f"/v1/swarm/prune-job/{job_name}")
spawn_baseband_job(event_number, task_name, arguments=[], job_id=None, image_name='chimefrb/baseband-localization:latest', command=['baseband_analysis/pipelines/cluster/cluster_cli.py'], job_name=None, job_mem_limit=10 * 1024 ** 3, job_mem_reservation=10 * 1024 ** 3, environment={}, **kwargs)

Spawn a CHIME/FRB Baseband job on the Analysis Cluster.

Parameters:

Name Type Description Default
event_number int

ID of the event to process.

required
task_name str

Name of the task to run. Eg. localization

required
arguments list

Arguments to the command. Default: None.

[]
job_id Optional[int]

ID of the job to run. Default: None.

None
command list

The command to be run in the container. Default: cluster_cli.py.

['baseband_analysis/pipelines/cluster/cluster_cli.py']
image_name str

Name of the container image to spawn the job with Default: chimefrb/baseband-localization:latest

'chimefrb/baseband-localization:latest'
job_name Optional[str]

Unique name for the cluster job Default: baseband-EVENT_NUMBER-TASK_NAME-UUID_CODE

None
job_mem_limit int

Memory limit of the created container in bytes Default: 10 GB

10 * 1024 ** 3
job_mem_reservation int

Minimum memory reserved of the created container in bytes. Default: 10 GB

10 * 1024 ** 3
environment dict

ENV variables to pass to the container Default: read authentication tokens from the environment

{}
kwargs

Additional parameters for spawn_job

{}
Source code in chime_frb_api/modules/swarm.py
def spawn_baseband_job(
    self,
    event_number: int,
    task_name: str,
    arguments: list = [],
    job_id: Optional[int] = None,
    image_name: str = "chimefrb/baseband-localization:latest",
    command: list = ["baseband_analysis/pipelines/cluster/cluster_cli.py"],
    job_name: Optional[str] = None,
    job_mem_limit: int = 10 * 1024**3,
    job_mem_reservation: int = 10 * 1024**3,
    environment: dict = {},
    **kwargs,
) -> Dict[str, str]:  # pragma: no cover
    """Spawn a CHIME/FRB Baseband job on the Analysis Cluster.

    Args:
        event_number: ID of the event to process.
        task_name: Name of the task to run. Eg. localization
        arguments: Arguments to the command.
                   Default: None.
        job_id: ID of the job to run.
                Default: None.
        command: The command to be run in the container.
                 Default: cluster_cli.py.
        image_name: Name of the container image to spawn the job with
                    Default: chimefrb/baseband-analysis:latest
        job_name: Unique name for the cluster job
                  Default: baseband-EVENT_NUMBER-TASK_NAME-UUID_CODE
        job_mem_limit: Memory limit of the created container in bytes
                       Default: 10 GB
        job_mem_reservation: Minimum memory reserved of the created container in
                             bytes.
                             Default: 10 GB
        environment: ENV variables to pass to the container
                     Default: read authentication tokens from the environment
        kwargs: Additional parameters for spawn_job
    """
    environment.setdefault("FRB_MASTER_ACCESS_TOKEN", self.API.access_token)
    environment.setdefault("FRB_MASTER_REFRESH_TOKEN", self.API.refresh_token)

    if job_id is None:
        job_argument = []
    else:
        job_argument = ["--job-id", str(job_id)]

    if job_name is None:
        if (job_id is None) or (job_id < 0):
            job_name = f"baseband-{event_number}-{task_name}"
        else:
            job_name = f"baseband-{event_number}-{task_name}-{job_id}"

    out = self.spawn_job(
        image_name=image_name,
        command=command + [task_name],
        arguments=["--event-number", str(event_number)]
        + job_argument
        + ["--"]
        + arguments,
        job_name=job_name,
        job_mem_limit=job_mem_limit,
        job_mem_reservation=job_mem_reservation,
        job_cpu_limit=2,
        job_cpu_reservation=2,
        environment=environment,
        **kwargs,
    )

    return out
spawn_job(image_name, command, arguments, job_name, mount_archiver=True, swarm_network=True, job_mem_limit=4294967296, job_mem_reservation=268435456, job_cpu_limit=1, job_cpu_reservation=1, environment={})

Spawn a job on the CHIME/FRB Analysis Cluster.

Parameters:

Name Type Description Default
image_name str

Name of the container image

required
command list

Command to run in the container

required
arguments list

Arguments to the command

required
job_name str

Unique name for the job

required
mount_archiver bool

Mount Site Data Archivers, by default True

True
swarm_network bool

Mount Cluster Network, by default True

True
job_mem_limit int

Memory limit in bytes, by default 4294967296

4294967296
job_mem_reservation int

Minimum memory reserved, by default 268435456

268435456
job_cpu_limit float

Maximum cpu cores job can use, by default 1

1
job_cpu_reservation float

Minimum cores reserved for the job, default 1

1
environment dict

ENV to pass to the container, default is {}

{}

Returns:

Type Description
Dict[str, str]

JSON response from the spawn-job endpoint

Source code in chime_frb_api/modules/swarm.py
def spawn_job(
    self,
    image_name: str,
    command: list,
    arguments: list,
    job_name: str,
    mount_archiver: bool = True,
    swarm_network: bool = True,
    job_mem_limit: int = 4294967296,
    job_mem_reservation: int = 268435456,
    job_cpu_limit: float = 1,
    job_cpu_reservation: float = 1,
    environment: dict = {},
) -> Dict[str, str]:
    """Spawn a job on the CHIME/FRB Analysis Cluster.

    Args:
        image_name: Name of the container image
        command: Command to run in the container
        arguments: Arguments to the command
        job_name: Unique name for the job
        mount_archiver: Mount Site Data Archivers, by default True
        swarm_network: Mount Cluster Network, by default True
        job_mem_limit: Memory limit in bytes, by default 4294967296
        job_mem_reservation: Minimum memory reserved, by default 268435456
        job_cpu_limit: Maximum cpu cores job can use, by default 1
        job_cpu_reservation: Minimum cores reservers for the job, default 1
        environment: ENV to pass to the container, default is {}

    Returns:
        JSON
            [description]
    """
    payload = {
        "image_name": image_name,
        "command": command,
        "arguments": arguments,
        "job_name": job_name,
        "mount_archiver": mount_archiver,
        "swarm_network": swarm_network,
        "job_mem_reservation": job_mem_reservation,
        "job_mem_limit": job_mem_limit,
        "job_cpu_limit": job_cpu_limit,
        "job_cpu_reservation": job_cpu_reservation,
        "environment": environment,
    }
    return self.API.post(url="/v1/swarm/spawn-job", json=payload)

Workflow

chime_frb_api.workflow.Work

Bases: BaseModel

Work Object.

Parameters:

Name Type Description Default
BaseModel BaseModel

Pydantic BaseModel.

required

Attributes:

Name Type Description
pipeline str

Name of the pipeline. (Required) Automatically reformatted to hyphen-case.

site str

Site where the work will be performed. (Required)

user str

User who created the work. (Required)

function Optional[str]

Name of the function ran as function(**parameters).

command Optional[List[str]]

Command to run as subprocess.run(command).

parameters Optional[Dict[str, Any]]

Parameters to pass to the function.

command Optional[List[str]]

Command to run as subprocess.run(command).

results Optional[Dict[str, Any]]

Results of the work.

products Optional[Dict[str, Any]]

Products of the work.

plots Optional[Dict[str, Any]]

Plots of the work.

event Optional[List[int]]

Event ID of the work.

tags Optional[List[str]]

Tags of the work.

timeout int

Timeout for the work in seconds. Default is 3600 seconds.

retries int

Number of retries for the work. Default is 2 retries.

priority int

Priority of the work. Default is 3.

config WorkConfig

Configuration of the work.

notify Notify

Notification configuration of the work.

id str

ID of the work.

creation float

Creation time of the work.

start float

Start time of the work.

stop float

Stop time of the work.

status str

Status of the work.

Raises:

Type Description
ValueError

If the work is not valid.

Returns:

Name Type Description
Work

Work object.

Example
from chime_frb_api.workflow import Work

work = Work(pipeline="test-pipeline", site="chime", user="shinybrar")
work.deposit(return_ids=True)
Source code in chime_frb_api/workflow/work.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
class Work(BaseModel):
    """Work Object.

    Args:
        BaseModel (BaseModel): Pydantic BaseModel.

    Attributes:
        pipeline (str): Name of the pipeline. (Required)
            Automatically reformatted to hyphen-case.
        site (str): Site where the work will be performed. (Required)
        user (str): User who created the work. (Required)
        function (Optional[str]): Name of the function ran as `function(**parameters)`.
        parameters (Optional[Dict[str, Any]]): Parameters to pass to the function.
        command (Optional[List[str]]): Command to run as `subprocess.run(command)`.
        results (Optional[Dict[str, Any]]): Results of the work.
        products (Optional[List[str]]): Names of data products of the work.
        plots (Optional[List[str]]): Names of plot files of the work.
        event (Optional[List[int]]): Event ID of the work.
        tags (Optional[List[str]]): Tags of the work.
        timeout (int): Timeout for the work in seconds. Default is 3600 seconds.
        retries (int): Number of retries for the work. Default is 2 retries.
        priority (int): Priority of the work. Default is 3.
        config (WorkConfig): Configuration of the work.
        notify (Notify): Notification configuration of the work.
        id (str): ID of the work.
        creation (float): Creation time of the work.
        start (float): Start time of the work.
        stop (float): Stop time of the work.
        status (str): Status of the work.

    Raises:
        ValueError: If the work is not valid.

    Returns:
        Work: Work object.

    Example:
        ```python
        from chime_frb_api.workflow import Work

        work = Work(pipeline="test-pipeline", site="chime", user="shinybrar")
        work.deposit(return_ids=True)
        ```
    """

    class Config:
        """Pydantic Config."""

        # Validate default values and every subsequent attribute assignment.
        validate_all = True
        validate_assignment = True
        exclude_none = True

    ###########################################################################
    # Required Attributes. Set by user.
    ###########################################################################
    pipeline: StrictStr = Field(
        ...,
        min_length=1,
        description="Name of the pipeline. Automatically reformatted to hyphen-case.",
        example="example-pipeline",
    )
    site: Literal[
        "canfar",
        "cedar",
        "chime",
        "aro",
        "hco",
        "gbo",
        "kko",
        "local",
    ] = Field(
        ...,
        description="Site where the work will be performed.",
        example="chime",
    )
    user: StrictStr = Field(
        ..., description="User ID who created the work.", example="shinybrar"
    )
    # First non-empty token found among the common GitHub token env vars.
    token: Optional[SecretStr] = Field(
        default=next(
            (
                value
                for value in [
                    environ.get("GITHUB_TOKEN"),
                    environ.get("WORKFLOW_TOKEN"),
                    environ.get("GITHUB_PAT"),
                    environ.get("GITHUB_ACCESS_TOKEN"),
                    environ.get("GITHUB_PERSONAL_ACCESS_TOKEN"),
                    environ.get("GITHUB_OAUTH_TOKEN"),
                    environ.get("GITHUB_OAUTH_ACCESS_TOKEN"),
                ]
                if value is not None
            ),
            None,
        ),
        description="Github Personal Access Token.",
        example="ghp_1234567890abcdefg",
        exclude=True,
    )

    ###########################################################################
    # Optional attributes, might be provided by the user.
    ###########################################################################
    function: Optional[str] = Field(
        default=None,
        description="""
        Name of the function to run as `function(**parameters)`.
        Only either `function` or `command` can be provided.
        """,
        example="requests.get",
    )
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="""
        Parameters to pass the pipeline function.
        """,
        example={"event_number": 9385707},
    )
    command: Optional[List[str]] = Field(
        default=None,
        description="""
        Command to run as `subprocess.run(command)`.
        Note, only either `function` or `command` can be provided.
        """,
        example=["python", "example.py", "--example", "example"],
    )
    results: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Results of the work performed, if any.",
        example={"dm": 100.0, "snr": 10.0},
    )
    products: Optional[List[StrictStr]] = Field(
        default=None,
        description="""
        Name of the non-human-readable data products generated by the pipeline.
        """,
        example=["spectra.h5", "dm_vs_time.png"],
    )
    plots: Optional[List[StrictStr]] = Field(
        default=None,
        description="""
        Name of visual data products generated by the pipeline.
        """,
        example=["waterfall.png", "/arc/projects/chimefrb/9385707/9385707.png"],
    )
    event: Optional[List[int]] = Field(
        default=None,
        description="CHIME/FRB Event ID[s] the work was performed against.",
        example=[9385707, 9385708],
    )
    tags: Optional[List[str]] = Field(
        default=None,
        description="""
        Searchable tags for the work. Merged with values from env WORKFLOW_TAGS.
        """,
        example=["dm-analysis"],
    )
    timeout: int = Field(
        default=3600,
        ge=1,
        le=86400,
        description="""
        Timeout in seconds for the work to finish.
        Defaults 3600s (1 hr) with range of [1, 86400] (1s-24hrs).
        """,
        example=7200,
    )
    retries: int = Field(
        default=2,
        lt=6,
        description="Number of retries before giving up. Defaults to 2.",
        example=4,
    )
    priority: int = Field(
        default=3,
        ge=1,
        le=5,
        description="Priority of the work. Defaults to 3.",
        example=1,
    )
    config: WorkConfig = WorkConfig()
    notify: Notify = Notify()

    ###########################################################################
    # Automatically set attributes
    ###########################################################################
    id: Optional[StrictStr] = Field(
        default=None, description="Work ID created by the database."
    )
    creation: Optional[StrictFloat] = Field(
        default=None, description="Unix timestamp of when the work was created."
    )
    start: Optional[StrictFloat] = Field(
        default=None,
        description="Unix timestamp when the work was started, reset at each attempt.",
    )
    stop: Optional[StrictFloat] = Field(
        default=None,
        description="Unix timestamp when the work was stopped, reset at each attempt.",
    )
    attempt: int = Field(
        default=0, ge=0, description="Attempt number at performing the work."
    )
    status: Literal["created", "queued", "running", "success", "failure"] = Field(
        default="created", description="Status of the work."
    )
    ###########################################################################
    # Attribute setters for the work attributes
    ###########################################################################

    @root_validator
    def post_init(cls, values: Dict[str, Any]):
        """Initialize work attributes after validation."""
        # Check if the pipeline name has any character that is uppercase
        reformatted: bool = False
        for char in values["pipeline"]:
            if char.isupper():
                values["pipeline"] = values["pipeline"].lower()
                reformatted = True
                break

        if any(char in {" ", "_"} for char in values["pipeline"]):
            values["pipeline"] = values["pipeline"].replace(" ", "-")
            values["pipeline"] = values["pipeline"].replace("_", "-")
            reformatted = True

        # Check if the pipeline has any character that is not alphanumeric or dash
        for char in values["pipeline"]:
            if not char.isalnum() and char not in ["-"]:
                raise ValueError(
                    "pipeline name can only contain letters, numbers & dashes"
                )

        if reformatted:
            warn(
                SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
                stacklevel=2,
            )

        # Set creation time if not already set
        if values.get("creation") is None:
            values["creation"] = time()
        # Update tags from environment variable WORKFLOW_TAGS
        if environ.get("WORKFLOW_TAGS"):
            env_tags: List[str] = str(environ.get("WORKFLOW_TAGS")).split(",")
            # If tags are already set, append the new ones
            if values.get("tags"):
                values["tags"] = values["tags"] + env_tags
            else:
                values["tags"] = env_tags
            # Remove duplicates
            values["tags"] = list(set(values["tags"]))

        # Check if both command and function are set
        if values.get("command") and values.get("function"):
            raise ValueError("command and function cannot be set together.")

        if not values.get("token"):  # type: ignore
            msg = "workflow token required after v4.0.0."
            warn(
                FutureWarning(msg),
                stacklevel=2,
            )
        return values

    ###########################################################################
    # Work methods
    ###########################################################################

    @property
    def payload(self) -> Dict[str, Any]:
        """Return the dictionary representation of the work.

        Returns:
            Dict[str, Any]: The payload of the work.
            Non-instanced attributes are excluded from the payload.
        """
        payload: Dict[str, Any] = self.dict(exclude={"config.token"})
        return payload

    @classmethod
    def from_json(cls, json_str: str) -> "Work":
        """Create a work from a json string.

        Args:
            json_str (str): The json string.

        Returns:
            Work: The work.
        """
        return cls(**loads(json_str))

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "Work":
        """Create a work from a dictionary.

        Args:
            payload (Dict[str, Any]): The dictionary.

        Returns:
            Work: The work.
        """
        return cls(**payload)

    ###########################################################################
    # HTTP Methods
    ###########################################################################

    @classmethod
    def withdraw(
        cls,
        pipeline: str,
        event: Optional[List[int]] = None,
        site: Optional[str] = None,
        priority: Optional[int] = None,
        user: Optional[str] = None,
        **kwargs: Dict[str, Any],
    ) -> Optional["Work"]:
        """Withdraw work from the buckets backend.

        Args:
            pipeline (str): Name of the pipeline.
            event (Optional[List[int]]): Filter by event ID[s].
            site (Optional[str]): Filter by site.
            priority (Optional[int]): Filter by priority.
            user (Optional[str]): Filter by user.
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            Work: Work object, or None if no work was available.
        """
        buckets = Buckets(**kwargs)  # type: ignore
        payload = buckets.withdraw(
            pipeline=pipeline, event=event, site=site, priority=priority, user=user
        )
        if payload:
            return cls.from_dict(payload)
        return None

    @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
    def deposit(
        self, return_ids: bool = False, **kwargs: Dict[str, Any]
    ) -> Union[bool, List[str]]:
        """Deposit work to the buckets backend.

        Args:
            return_ids (bool): If True, return the deposited work IDs.
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            bool: True if successful, False otherwise.
        """
        buckets = Buckets(**kwargs)  # type: ignore
        return buckets.deposit(works=[self.payload], return_ids=return_ids)

    @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
    def update(self, **kwargs: Dict[str, Any]) -> bool:
        """Update work in the buckets backend.

        Args:
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            bool: True if successful, False otherwise.
        """
        buckets = Buckets(**kwargs)  # type: ignore
        return buckets.update([self.payload])

    @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
    def delete(self, **kwargs: Dict[str, Any]) -> bool:
        """Delete work from the buckets backend.

        Args:
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            bool: True if successful, False otherwise.
        """
        buckets = Buckets(**kwargs)  # type: ignore
        return buckets.delete_ids([str(self.id)])
Attributes
payload: Dict[str, Any] property

Return the dictionary representation of the work.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The payload of the work.

Dict[str, Any]

Non-instanced attributes are excluded from the payload.

Classes
Config

Pydantic Config.

Source code in chime_frb_api/workflow/work.py
class Config:
    """Pydantic Config."""

    # Run validators on default values as well as provided values.
    validate_all = True
    # Re-validate whenever an attribute is assigned after construction.
    validate_assignment = True
    # Drop None-valued fields from serialized output.
    exclude_none = True
Functions
delete(**kwargs)

Delete work from the buckets backend.

Parameters:

Name Type Description Default
ids List[str]

List of ids to delete.

required

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def delete(self, **kwargs: Dict[str, Any]) -> bool:
    """Remove this work item from the buckets backend by its ID.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    client = Buckets(**kwargs)  # type: ignore
    return client.delete_ids([str(self.id)])
deposit(return_ids=False, **kwargs)

Deposit work to the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool Union[bool, List[str]]

True if successful, False otherwise.

Source code in chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def deposit(
    self, return_ids: bool = False, **kwargs: Dict[str, Any]
) -> Union[bool, List[str]]:
    """Submit this work item to the buckets backend.

    Args:
        return_ids (bool): When True, return the IDs of the deposited work.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    client = Buckets(**kwargs)  # type: ignore
    return client.deposit(works=[self.payload], return_ids=return_ids)
from_dict(payload) classmethod

Create a work from a dictionary.

Parameters:

Name Type Description Default
payload Dict[str, Any]

The dictionary.

required

Returns:

Name Type Description
Work Work

The work.

Source code in chime_frb_api/workflow/work.py
@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> "Work":
    """Construct a Work instance from a dictionary payload.

    Args:
        payload (Dict[str, Any]): Mapping of work attribute names to values.

    Returns:
        Work: The instantiated work object.
    """
    return cls(**payload)
from_json(json_str) classmethod

Create a work from a json string.

Parameters:

Name Type Description Default
json_str str

The json string.

required

Returns:

Name Type Description
Work Work

The work.

Source code in chime_frb_api/workflow/work.py
@classmethod
def from_json(cls, json_str: str) -> "Work":
    """Construct a Work instance by parsing a JSON string.

    Args:
        json_str (str): JSON-encoded mapping of work attributes.

    Returns:
        Work: The instantiated work object.
    """
    attributes = loads(json_str)
    return cls(**attributes)
post_init(values)

Initialize work attributes after validation.

Source code in chime_frb_api/workflow/work.py
@root_validator
def post_init(cls, values: Dict[str, Any]):
    """Normalize and finalize work attributes after field validation."""
    name = values["pipeline"]
    reformatted = False

    # Normalize the pipeline name to lowercase hyphen-case.
    if any(char.isupper() for char in name):
        name = name.lower()
        reformatted = True
    if " " in name or "_" in name:
        name = name.replace(" ", "-").replace("_", "-")
        reformatted = True
    values["pipeline"] = name

    # Only alphanumerics and dashes are permitted after normalization.
    if not all(char.isalnum() or char == "-" for char in name):
        raise ValueError(
            "pipeline name can only contain letters, numbers & dashes"
        )

    if reformatted:
        warn(
            SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
            stacklevel=2,
        )

    # Stamp the creation time on first validation.
    if values.get("creation") is None:
        values["creation"] = time()

    # Merge (deduplicated) tags from the WORKFLOW_TAGS environment variable.
    if environ.get("WORKFLOW_TAGS"):
        env_tags: List[str] = str(environ.get("WORKFLOW_TAGS")).split(",")
        combined = (values.get("tags") or []) + env_tags
        values["tags"] = list(set(combined))

    # `command` and `function` are mutually exclusive ways to run the work.
    if values.get("command") and values.get("function"):
        raise ValueError("command and function cannot be set together.")

    if not values.get("token"):  # type: ignore
        msg = "workflow token required after v4.0.0."
        warn(
            FutureWarning(msg),
            stacklevel=2,
        )
    return values
update(**kwargs)

Update work in the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def update(self, **kwargs: Dict[str, Any]) -> bool:
    """Push the current state of this work to the buckets backend.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    client = Buckets(**kwargs)  # type: ignore
    return client.update([self.payload])
withdraw(pipeline, event=None, site=None, priority=None, user=None, **kwargs) classmethod

Withdraw work from the buckets backend.

Parameters:

Name Type Description Default
pipeline str

Name of the pipeline.

required
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
Work Optional[Work]

Work object.

Source code in chime_frb_api/workflow/work.py
@classmethod
def withdraw(
    cls,
    pipeline: str,
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    **kwargs: Dict[str, Any],
) -> Optional["Work"]:
    """Pull one matching work item from the buckets backend, if available.

    Args:
        pipeline (str): Name of the pipeline.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        Work: Work object.
    """
    client = Buckets(**kwargs)  # type: ignore
    attributes = client.withdraw(
        pipeline=pipeline, event=event, site=site, priority=priority, user=user
    )
    return cls.from_dict(attributes) if attributes else None