Work Object

Bases: BaseModel

Work Object.

Parameters:

Name Type Description Default
BaseModel BaseModel

Pydantic BaseModel.

required

Attributes:

Name Type Description
pipeline str

Name of the pipeline. (Required) Automatically reformatted to hyphen-case: lowercased, with spaces and underscores replaced by dashes.

site str

Site where the work will be performed. (Required)

user str

User who created the work. (Required)

function Optional[str]

Name of the function to run as function(**parameters). Cannot be set together with command.

command Optional[List[str]]

Command to run as subprocess.run(command).

parameters Optional[Dict[str, Any]]

Parameters to pass to the function.

results Optional[Dict[str, Any]]

Results of the work.

products Optional[Dict[str, Any]]

Products of the work.

plots Optional[Dict[str, Any]]

Plots of the work.

event Optional[List[int]]

Event IDs of the work.

tags Optional[List[str]]

Tags of the work.

timeout int

Timeout for the work in seconds. Default is 3600 seconds.

retries int

Number of retries for the work. Default is 2 retries.

priority int

Priority of the work. Default is 3.

config WorkConfig

Configuration of the work.

notify Notify

Notification configuration of the work.

id str

ID of the work.

creation float

Creation time of the work.

start float

Start time of the work.

stop float

Stop time of the work.

status str

Status of the work.

Raises:

Type Description
ValueError

If the work is not valid.

Returns:

Name Type Description
Work

Work object.

Example
Python
from chime_frb_api.workflow import Work

work = Work(pipeline="test-pipeline", site="chime", user="shinybrar")
work.deposit(return_ids=True)
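
The `function` and `command` attributes describe two mutually exclusive ways of running a unit of work. The sketch below is not the library's runner. It uses a hypothetical `run_work` helper that takes plain arguments instead of a `Work` object, and it assumes that `function` is a dotted import path, to illustrate how `function(**parameters)` and `subprocess.run(command)` could be dispatched.

```python
import importlib
import subprocess
import sys
from typing import Any, Dict, List, Optional


def run_work(
    function: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    command: Optional[List[str]] = None,
) -> Any:
    """Hypothetical helper: run either a named function or a command."""
    if function and command:
        # Same rule that Work validation enforces.
        raise ValueError("command and function cannot be set together.")
    if function:
        # Assumption: the name is a dotted path such as "json.dumps".
        module_name, _, func_name = function.rpartition(".")
        func = getattr(importlib.import_module(module_name), func_name)
        return func(**(parameters or {}))
    if command:
        # check=False: the caller inspects returncode instead of catching.
        return subprocess.run(command, capture_output=True, text=True, check=False)
    return None


# Function-style work: json.dumps(obj={"a": 1})
function_result = run_work(function="json.dumps", parameters={"obj": {"a": 1}})

# Command-style work: run a tiny Python one-liner as a subprocess.
command_result = run_work(command=[sys.executable, "-c", "print('ok')"])
```

Here `function_result` is the JSON string produced by `json.dumps`, and `command_result` is a `subprocess.CompletedProcess` whose `stdout` and `returncode` can be inspected.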

payload property

Python
payload: Dict[str, Any]

Return the dictionary representation of the work.

Returns:

Type Description
Dict[str, Any]

The payload of the work. Non-instanced attributes are excluded from the payload.

Config

Pydantic Config.

delete

Python
delete(**kwargs: Dict[str, Any]) -> bool

Delete work from the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API. The work that is deleted is the one identified by this object's id.

{}

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in chime_frb_api/workflow/work.py
Python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def delete(self, **kwargs: Dict[str, Any]) -> bool:
    """Delete work from the buckets backend.

    Args:
        ids (List[str]): List of ids to delete.

    Returns:
        bool: True if successful, False otherwise.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.delete_ids([str(self.id)])
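
The `@retry(...)` decorator on `delete` (also used on `deposit` and `update`) uses names that match the tenacity library. Assuming that is where they come from, a call that raises is retried after a random wait of 0.5 to 1.5 seconds until 30 seconds have elapsed. The standard-library sketch below imitates that policy with a hypothetical `retry_with_random_wait` decorator. It is not tenacity's implementation: once the time budget is spent it re-raises the last exception, whereas tenacity raises `RetryError` by default. The example uses much shorter waits so that it finishes quickly.

```python
import random
import time
from functools import wraps
from typing import Any, Callable


def retry_with_random_wait(
    min_wait: float = 0.5, max_wait: float = 1.5, max_delay: float = 30.0
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator: retry on exceptions with a random pause."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            started = time.monotonic()
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Give up once the time budget is spent.
                    if time.monotonic() - started >= max_delay:
                        raise
                    time.sleep(random.uniform(min_wait, max_wait))

        return wrapper

    return decorator


attempts = {"count": 0}


@retry_with_random_wait(min_wait=0.01, max_wait=0.02, max_delay=1.0)
def flaky_delete() -> bool:
    """Fails twice, then succeeds, to exercise the retry loop."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("backend unavailable")
    return True


result = flaky_delete()
```

In this sketch only exceptions trigger a retry. A return value of `False` is handed back to the caller unchanged, which also matches tenacity's default retry condition.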

deposit

Python
deposit(return_ids: bool = False, **kwargs: Dict[str, Any]) -> Union[bool, List[str]]

Deposit work to the buckets backend.

Parameters:

Name Type Description Default
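return_ids bool

Passed through to the Buckets API deposit call to request the IDs of the deposited work.

False
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}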

Returns:

Name Type Description
bool Union[bool, List[str]]

True if successful, False otherwise. When return_ids is True, the return type indicates that a list of work IDs is returned instead.

Source code in chime_frb_api/workflow/work.py
Python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def deposit(
    self, return_ids: bool = False, **kwargs: Dict[str, Any]
) -> Union[bool, List[str]]:
    """Deposit work to the buckets backend.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.deposit(works=[self.payload], return_ids=return_ids)

from_dict classmethod

Python
from_dict(payload: Dict[str, Any]) -> Work

Create a work from a dictionary.

Parameters:

Name Type Description Default
payload Dict[str, Any]

The dictionary.

required

Returns:

Name Type Description
Work Work

The work.

Source code in chime_frb_api/workflow/work.py
Python
@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> "Work":
    """Create a work from a dictionary.

    Args:
        payload (Dict[str, Any]): The dictionary.

    Returns:
        Work: The work.
    """
    return cls(**payload)

from_json classmethod

Python
from_json(json_str: str) -> Work

Create a work from a json string.

Parameters:

Name Type Description Default
json_str str

The json string.

required

Returns:

Name Type Description
Work Work

The work.

Source code in chime_frb_api/workflow/work.py
Python
@classmethod
def from_json(cls, json_str: str) -> "Work":
    """Create a work from a json string.

    Args:
        json_str (str): The json string.

    Returns:
        Work: The work.
    """
    return cls(**loads(json_str))
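
Both constructors unpack a mapping into the class constructor, so a dictionary and its JSON encoding produce equal objects. The sketch below shows this with a hypothetical `MiniWork` dataclass that carries only the three required fields. It is a stand-in for illustration, not the pydantic-based `Work` model, and it assumes that `loads` in the source above is `json.loads`.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class MiniWork:
    """Hypothetical stand-in for Work with only the required fields."""

    pipeline: str
    site: str
    user: str

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "MiniWork":
        # Same shape as Work.from_dict: unpack the dictionary.
        return cls(**payload)

    @classmethod
    def from_json(cls, json_str: str) -> "MiniWork":
        # Same shape as Work.from_json: decode, then unpack.
        return cls(**json.loads(json_str))


payload = {"pipeline": "test-pipeline", "site": "chime", "user": "shinybrar"}
from_dict_work = MiniWork.from_dict(payload)
from_json_work = MiniWork.from_json(json.dumps(payload))
```

The two objects compare equal, and `asdict(from_json_work)` gives back the original dictionary.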

post_init

Python
post_init(values: Dict[str, Any])

Initialize work attributes after validation.

Source code in chime_frb_api/workflow/work.py
Python
@root_validator
def post_init(cls, values: Dict[str, Any]):
    """Initialize work attributes after validation."""
    # Check if the pipeline name has any character that is uppercase
    reformatted: bool = False
    for char in values["pipeline"]:
        if char.isupper():
            values["pipeline"] = values["pipeline"].lower()
            reformatted = True
            break

    if any(char in {" ", "_"} for char in values["pipeline"]):
        values["pipeline"] = values["pipeline"].replace(" ", "-")
        values["pipeline"] = values["pipeline"].replace("_", "-")
        reformatted = True

    # Check if the pipeline has any character that is not alphanumeric or dash
    for char in values["pipeline"]:
        if not char.isalnum() and char not in ["-"]:
            raise ValueError(
                "pipeline name can only contain letters, numbers & dashes"
            )

    if reformatted:
        warn(
            SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
            stacklevel=2,
        )

    # Set creation time if not already set
    if values.get("creation") is None:
        values["creation"] = time()
    # Update tags from environment variable WORKFLOW_TAGS
    if environ.get("WORKFLOW_TAGS"):
        env_tags: List[str] = str(environ.get("WORKFLOW_TAGS")).split(",")
        # If tags are already set, append the new ones
        if values.get("tags"):
            values["tags"] = values["tags"] + env_tags
        else:
            values["tags"] = env_tags
        # Remove duplicates
        values["tags"] = list(set(values["tags"]))

    # Check if both command and function are set
    if values.get("command") and values.get("function"):
        raise ValueError("command and function cannot be set together.")

    if not values.get("token"):  # type: ignore
        msg = "workflow token required after v4.0.0."
        warn(
            FutureWarning(msg),
            stacklevel=2,
        )
    return values
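
The pipeline-name and tag handling in the validator above can be tried in isolation. The sketch below restates those two rules as standalone functions. `normalize_pipeline` and `merge_tags` are hypothetical names introduced for illustration and are not part of `chime_frb_api`. The environment value is passed in as an argument rather than read from `WORKFLOW_TAGS`, and the warnings are omitted.

```python
from typing import List, Optional


def normalize_pipeline(name: str) -> str:
    """Hypothetical helper restating the pipeline-name rules."""
    # Lowercase, then turn spaces and underscores into dashes.
    normalized = name.lower().replace(" ", "-").replace("_", "-")
    # Anything other than letters, digits and dashes is rejected.
    if not all(char.isalnum() or char == "-" for char in normalized):
        raise ValueError(
            "pipeline name can only contain letters, numbers & dashes"
        )
    return normalized


def merge_tags(
    tags: Optional[List[str]], env_value: Optional[str]
) -> Optional[List[str]]:
    """Hypothetical helper restating the WORKFLOW_TAGS merge."""
    if not env_value:
        # No environment tags: leave the existing tags untouched.
        return tags
    merged = (tags or []) + env_value.split(",")
    # Sorted here for a stable result; the validator uses list(set(...)),
    # which removes duplicates but does not guarantee any order.
    return sorted(set(merged))


pipeline = normalize_pipeline("Test Pipeline_v2")
tags = merge_tags(["a", "b"], "b,c")
```

With these inputs `pipeline` is `"test-pipeline-v2"` and `tags` is `["a", "b", "c"]`. A name such as `"bad/name"` raises `ValueError`.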

update

Python
update(**kwargs: Dict[str, Any]) -> bool

Update work in the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in chime_frb_api/workflow/work.py
Python
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def update(self, **kwargs: Dict[str, Any]) -> bool:
    """Update work in the buckets backend.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.update([self.payload])

withdraw classmethod

Python
withdraw(pipeline: str, event: Optional[List[int]] = None, site: Optional[str] = None, priority: Optional[int] = None, user: Optional[str] = None, tags: Optional[List[str]] = None, parent: Optional[str] = None, **kwargs: Dict[str, Any]) -> Optional[Work]

Withdraw work from the buckets backend.

Parameters:

Name Type Description Default
pipeline str

Name of the pipeline.

required
event Optional[List[int]]

Event IDs forwarded to the Buckets withdraw call.

None
site Optional[str]

Site forwarded to the Buckets withdraw call.

None
priority Optional[int]

Priority forwarded to the Buckets withdraw call.

None
user Optional[str]

User forwarded to the Buckets withdraw call.

None
tags Optional[List[str]]

Tags forwarded to the Buckets withdraw call.

None
parent Optional[str]

Value of parent forwarded to the Buckets withdraw call.

None
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
Work Optional[Work]

Work object, or None if the backend returned no payload.

Source code in chime_frb_api/workflow/work.py
Python
@classmethod
def withdraw(
    cls,
    pipeline: str,
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[str] = None,
    **kwargs: Dict[str, Any],
) -> Optional["Work"]:
    """Withdraw work from the buckets backend.

    Args:
        pipeline (str): Name of the pipeline.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        Work: Work object.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    payload = buckets.withdraw(
        pipeline=pipeline,
        event=event,
        site=site,
        priority=priority,
        user=user,
        tags=tags,
        parent=parent,
    )
    if payload:
        return cls.from_dict(payload)
    return None
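
Because `withdraw` returns `None` when the backend has nothing to hand out, callers need to handle that case before using the result. The sketch below shows the pattern against a hypothetical in-memory `FakeBuckets` class. It stands in for the real Buckets backend, whose behavior is not documented here, and it works on plain payload dictionaries rather than `Work` objects.

```python
from typing import Any, Dict, List, Optional


class FakeBuckets:
    """Hypothetical in-memory stand-in for the Buckets backend."""

    def __init__(self) -> None:
        self.queue: List[Dict[str, Any]] = []

    def deposit(self, works: List[Dict[str, Any]]) -> bool:
        self.queue.extend(works)
        return True

    def withdraw(self, pipeline: str) -> Optional[Dict[str, Any]]:
        # Hand out the first queued payload for this pipeline, if any.
        for index, payload in enumerate(self.queue):
            if payload["pipeline"] == pipeline:
                return self.queue.pop(index)
        return None


def withdraw_work(buckets: FakeBuckets, pipeline: str) -> Optional[Dict[str, Any]]:
    """Same tail as Work.withdraw: a payload if one exists, else None."""
    payload = buckets.withdraw(pipeline=pipeline)
    if payload:
        return dict(payload)
    return None


buckets = FakeBuckets()
buckets.deposit(
    works=[{"pipeline": "test-pipeline", "site": "chime", "user": "shinybrar"}]
)

first = withdraw_work(buckets, "test-pipeline")   # a payload dictionary
second = withdraw_work(buckets, "test-pipeline")  # None: the queue is now empty
```

A worker loop built on this pattern would check for `None` and wait or exit instead of trying to run a missing unit of work.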