
Work Object

Bases: BaseSettings

Workflow Work Object.

The work object defines the lifecycle of any action to be performed.

Parameters:

Name Type Description Default
BaseSettings BaseSettings

Pydantic BaseModel with settings.

required
Note

The selection priority for attributes in descending order is:

  • Arguments passed to the Work Object.
  • Environment variables with WORKFLOW_ prefix.
  • Variables from /run/secrets secrets directory.
  • The default values in the class constructor.
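The priority order above can be illustrated with a small, self-contained sketch. In the real class this resolution is presumably handled by pydantic's BaseSettings, not by code like this. resolve_setting is a hypothetical helper, and the way secret files map to attribute names (keyed here by the plain attribute name) is an assumption.

```python
from typing import Any, Mapping, Optional

# Hypothetical illustration of the documented priority order; not the library's code.
PREFIX = "WORKFLOW_"


def resolve_setting(
    name: str,
    kwargs: Mapping[str, Any],
    environ: Mapping[str, str],
    secrets: Mapping[str, str],
    defaults: Mapping[str, Any],
) -> Optional[Any]:
    """Return the value for name using the priority order listed above."""
    # 1. Arguments passed to the Work object take precedence.
    if name in kwargs:
        return kwargs[name]
    # 2. Then environment variables with the WORKFLOW_ prefix.
    env_key = PREFIX + name.upper()
    if env_key in environ:
        return environ[env_key]
    # 3. Then secrets; assumption: keyed by the attribute name.
    if name in secrets:
        return secrets[name]
    # 4. Finally, the default value (None if the attribute has no default).
    return defaults.get(name)


defaults = {"priority": 3, "timeout": 3600}
environ = {"WORKFLOW_PRIORITY": "4"}  # raw string; pydantic would coerce it to int
print(resolve_setting("priority", {}, environ, {}, defaults))                # 4
print(resolve_setting("priority", {"priority": 5}, environ, {}, defaults))   # 5
print(resolve_setting("timeout", {}, environ, {"timeout": "60"}, defaults))  # 60
```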

Environment Prefixes:

  • WORKFLOW_ for all work attributes.
  • WORKFLOW_HTTP_ for all work http attributes.
  • WORKFLOW_CONFIG_ for all work config attributes.
  • WORKFLOW_NOTIFY_ for all work notify attributes.

The size of the results dictionary is limited to 4MB. If the results dictionary is larger than 4MB, it is ignored and not mapped to the work object. Use the products attribute to store large data products.
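This page does not say how the 4MB size is measured. The sketch below assumes it is the length of the JSON-encoded dictionary in bytes and that 4MB means 4 × 1024 × 1024 bytes; fits_results_limit is a hypothetical client-side helper for spotting results that would be dropped, so large data can be moved to products instead.

```python
import json
from typing import Any, Dict

# Assumption: 4 MB = 4 * 1024 * 1024 bytes of JSON-encoded results.
RESULTS_LIMIT_BYTES = 4 * 1024 * 1024


def fits_results_limit(results: Dict[str, Any], limit: int = RESULTS_LIMIT_BYTES) -> bool:
    """Return True if the JSON-encoded results are within the size limit."""
    encoded = json.dumps(results).encode("utf-8")
    return len(encoded) <= limit


small = {"snr": 12.5, "dm": 557.0}
large = {"samples": [0.0] * 2_000_000}  # roughly 10 MB once JSON-encoded
print(fits_results_limit(small))  # True
print(fits_results_limit(large))  # False
```

Results that fail this check could be written to a file whose path is recorded in products.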

Attributes:

Name Type Description
pipeline str

Name of the pipeline. (Required)

site str

Site where the work will be performed. (Required)

user str

User who created the work. (Required)

function str

Python function to run, called as function(**parameters).

parameters Dict[str, Any]

Parameters to pass to the function.

command List[str]

Command to run as subprocess.run(command).

results Dict[str, Any]

Results from the work.

products List[str]

Data products from work.

plots List[str]

Visual plots of the work.

tags List[str]

Searchable tags of the work.

event List[int]

Unique event ID(s) the work was performed against.

id str

BSON ID of the work, set by the database.

creation float

Creation time of the work.

start float

Start time of the work, set by the backend.

stop float

Stop time of the work, set by the client.

attempt int

Attempt number of the work. Defaults to 0.

status str

Status of the work. Defaults to "created".

timeout int

Timeout for the work in seconds. Defaults to 3600 seconds.

retries int

Number of retries for the work. Defaults to 2 retries.

priority int

Priority of the work. Defaults to 3.

config Config

Configuration for the lifecycle of the work.

notify Notify

Notification configuration for the work.

workspace FilePath

Path to the active workspace configuration. Defaults to ~/.config/workflow/workspace.yml. (Excluded from payload)

token SecretStr

Workflow Access Token. (Excluded from payload)

http HTTPContext

HTTP Context for backends. (Excluded from payload)
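The function and command attributes describe two mutually exclusive ways to run work: function(**parameters) or subprocess.run(command). The sketch below shows what each form amounts to. run_work is a hypothetical helper, and treating function as a dotted import path such as "json.dumps" is an assumption this page does not confirm.

```python
import importlib
import subprocess
import sys
from typing import Any, Dict, List, Optional


def run_work(
    function: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    command: Optional[List[str]] = None,
) -> Any:
    """Hypothetical runner: call function(**parameters) or subprocess.run(command)."""
    if function and command:
        # Mirrors the documented rule that only one of the two may be set.
        raise ValueError("Only either function or command can be provided.")
    if function:
        # Assumption: function is a dotted path like "package.module.name".
        module_name, _, attr = function.rpartition(".")
        func = getattr(importlib.import_module(module_name), attr)
        return func(**(parameters or {}))
    if command:
        return subprocess.run(command, capture_output=True, text=True, check=False)
    return None  # Nothing to run.


print(run_work(function="json.dumps", parameters={"obj": {"a": 1}}))  # {"a": 1}
proc = run_work(command=[sys.executable, "-c", "print('hello')"])
print(proc.returncode, proc.stdout.strip())  # 0 hello
```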

Raises:

Type Description
ValueError

When work is invalid.

Returns:

Name Type Description
Object Work

Work object.

Example
Bash
# Set the environment variables to override the defaults.
export WORKFLOW_PRIORITY=4
Python
from workflow.definitions.work import Work

work = Work(pipeline="test", site="local", user="shinybrar")
work.payload
{
    "config": {...},
    "notify": {"slack": {}},
    "pipeline": "test",
    "site": "local",
    "user": "shinybrar",
    "timeout": 3600,
    "retries": 2,
    "priority": 4,  # Overridden by the environment variable.
    "attempt": 0,
    "status": "created",
}
work.deposit(return_ids=True)

payload property

Python
payload: Dict[str, Any]

Return the dictionary representation of the work.

Returns:

Type Description
Dict[str, Any]

The payload of the work. Non-instanced attributes are excluded from the payload.
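The exact filtering rule is not shown on this page. As a sketch, assuming "non-instanced" means attributes still set to None, and that workspace, token and http are always dropped (as the attribute table states), the payload could be built like this; build_payload is hypothetical.

```python
from typing import Any, Dict

# Attributes the attribute table marks as "Excluded from payload".
EXCLUDED = {"workspace", "token", "http"}


def build_payload(attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical: drop excluded attributes and attributes that were never set."""
    return {
        key: value
        for key, value in attributes.items()
        if key not in EXCLUDED and value is not None  # assumption: unset == None
    }


attributes = {
    "pipeline": "test",
    "site": "local",
    "user": "shinybrar",
    "function": None,
    "priority": 3,
    "token": "secret-token",
    "workspace": "~/.config/workflow/workspace.yml",
}
print(build_payload(attributes))
# {'pipeline': 'test', 'site': 'local', 'user': 'shinybrar', 'priority': 3}
```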

delete

Python
delete() -> bool

Delete work from the buckets backend.

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in workflow/definitions/work.py
Python
def delete(self) -> bool:
    """Delete work from the buckets backend.

    Returns:
        bool: True if successful, False otherwise.
    """
    return self.http.buckets.delete_ids([str(self.id)])

deposit

Python
deposit(return_ids: bool = False, timeout: float = 15.0, token: Optional[SecretStr] = None, http: Optional[HTTPContext] = None) -> Union[bool, List[str]]

Deposit work to the buckets backend.

Parameters:

Name Type Description Default
return_ids bool

Return Database ID. Defaults to False.

False
timeout float

HTTP request timeout in seconds.

15.0
token Optional[SecretStr]

Workflow Access Token.

None
http Optional[HTTPContext]

HTTP Context for backend.

None
Note

Both token and http can either be provided as arguments to the function or be inherited from the work object itself.

Returns:

Type Description
Union[bool, List[str]]

True if successful, False otherwise; a list of database IDs when return_ids is True.

Source code in workflow/definitions/work.py
Python
def deposit(
    self,
    return_ids: bool = False,
    timeout: float = 15.0,
    token: Optional[SecretStr] = None,
    http: Optional[HTTPContext] = None,
) -> Union[bool, List[str]]:
    """Deposit work to the buckets backend.

    Args:
        return_ids (bool, optional): Return Database ID. Defaults to False.
        timeout (float, optional): HTTP request timeout in seconds.
        token (Optional[SecretStr], optional): Workflow Access Token.
        http (Optional[HTTPContext], optional): HTTP Context for backend.

    Note:
        Both token and http can be either provided as arguments to the function
        or also inherited from the work object itself.

    Returns:
        Union[bool, List[str]]: True if successful, False otherwise.
    """
    self.token = token or self.token
    self.http = (
        http
        or self.http
        or HTTPContext(timeout=timeout, token=token, backends=["buckets"])
    )
    return self.http.buckets.deposit(works=[self.payload], return_ids=return_ids)
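The fallback chain in deposit (argument, then the work object's attribute, then a newly created context) can be reproduced with stand-in classes. FakeContext and FakeWork are hypothetical; they keep only the logic shown in the source above. One detail the sketch makes visible: a newly created context receives the token argument, not self.token, so an inherited token reaches the context only if HTTPContext also picks it up from elsewhere (for example the environment), which this page suggests but does not confirm.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeContext:
    """Hypothetical stand-in for HTTPContext; records what it was built with."""
    timeout: float
    token: Optional[str]


@dataclass
class FakeWork:
    """Hypothetical stand-in for Work with only the fields deposit() touches."""
    token: Optional[str] = None
    http: Optional[FakeContext] = None

    def resolve(
        self,
        timeout: float = 15.0,
        token: Optional[str] = None,
        http: Optional[FakeContext] = None,
    ) -> FakeContext:
        # Same fallback chain as deposit(): argument, then attribute, then new object.
        self.token = token or self.token
        self.http = http or self.http or FakeContext(timeout=timeout, token=token)
        return self.http


work = FakeWork(token="inherited")
ctx = work.resolve()
print(ctx.token)              # None: the new context gets the argument, not self.token
print(work.resolve() is ctx)  # True: the stored context is reused on later calls
```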

from_dict classmethod

Python
from_dict(payload: Dict[str, Any]) -> Work

Create a work from a dictionary.

Parameters:

Name Type Description Default
payload Dict[str, Any]

The dictionary.

required

Returns:

Name Type Description
Work Work

Work Object.

Source code in workflow/definitions/work.py
Python
@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> "Work":
    """Create a work from a dictionary.

    Args:
        payload (Dict[str, Any]): The dictionary.

    Returns:
        Work: Work Object.
    """
    return cls(**payload)

from_json classmethod

Python
from_json(json: str) -> Work

Create a work from a json string.

Parameters:

Name Type Description Default
json str

The json string.

required

Returns:

Name Type Description
Work Work

Work Object.

Source code in workflow/definitions/work.py
Python
@classmethod
def from_json(cls, json: str) -> "Work":
    """Create a work from a json string.

    Args:
        json (str): The json string.

    Returns:
        Work: Work Object.
    """
    return cls(**loads(json))
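Both from_dict and from_json pass the decoded mapping straight to the constructor, so a payload survives a JSON round trip as long as every value is JSON-serializable. The sketch below uses a hypothetical dataclass, MiniWork, in place of Work to show the same pattern without the library.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class MiniWork:
    """Hypothetical stand-in for Work with a few of its attributes."""
    pipeline: str
    site: str
    user: str
    priority: int = 3
    tags: Optional[List[str]] = None

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "MiniWork":
        return cls(**payload)  # same approach as Work.from_dict

    @classmethod
    def from_json(cls, text: str) -> "MiniWork":
        return cls(**json.loads(text))  # same approach as Work.from_json


original = MiniWork(pipeline="test", site="local", user="shinybrar", tags=["demo"])
restored = MiniWork.from_json(json.dumps(asdict(original)))
print(restored == original)  # True
```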

post_model_validation

Python
post_model_validation() -> Work

Validate the work model after creating the object.

Raises:

Type Description
ValueError

Raised if config.strategy is "strict" and the site provided is not listed in the workspace.

Returns:

Name Type Description
Work Work

The current work object.

Source code in workflow/definitions/work.py
Python
@model_validator(mode="after")
def post_model_validation(self) -> "Work":
    """Validate the work model after creating the object.

    Raises:
        ValueError: Raised if the site provided is not allowed in the workspace.

    Returns:
        Work: The current work object.
    """
    # Validate if the site provided is allowed in the workspace.
    if self.config.strategy == "strict":
        config: Dict[str, Any] = read.workspace(self.workspace)
        sites: List[str] = config.get("sites", [])
        if self.site not in sites:
            error = f"site {self.site} not in workspace: {sites}."
            raise ValueError(error)
    return self
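The site check only reads the workspace when config.strategy is "strict". The sketch below reproduces that logic with an in-memory dictionary standing in for the parsed workspace file, so it runs without read.workspace; check_site and the workspace contents are made up for illustration, as is the non-strict strategy value.

```python
from typing import Any, Dict, List


def check_site(site: str, strategy: str, workspace: Dict[str, Any]) -> None:
    """Raise ValueError if site is not listed in a strict workspace."""
    if strategy == "strict":
        sites: List[str] = workspace.get("sites", [])
        if site not in sites:
            raise ValueError(f"site {site} not in workspace: {sites}.")


workspace = {"sites": ["local", "chime"]}  # hypothetical parsed workspace.yml
check_site("local", "strict", workspace)   # passes silently
check_site("mars", "relaxed", workspace)   # hypothetical non-strict value: not checked
try:
    check_site("mars", "strict", workspace)
except ValueError as error:
    print(error)  # site mars not in workspace: ['local', 'chime'].
```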

pre_validation

Python
pre_validation(data: Any) -> Any

Validate the work model before creating the object.

Parameters:

Name Type Description Default
data Any

Work data to validate.

required

Raises:

Type Description
ValueError

If both function and command are set.

Returns:

Name Type Description
Any Any

Validated work data.

Source code in workflow/definitions/work.py
Python
@model_validator(mode="before")
def pre_validation(cls, data: Any) -> Any:
    """Validate the work model before creating the object.

    Args:
        data (Any): Work data to validate.

    Raises:
        ValueError: If both `function` and `command` are set.

    Returns:
        Any: Validated work data.
    """
    if data.get("function") and data.get("command"):
        raise ValueError("Only either function or command can be provided.")
    return data

update

Python
update() -> bool

Update work in the buckets backend.

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in workflow/definitions/work.py
Python
def update(self) -> bool:
    """Update work in the buckets backend.

    Returns:
        bool: True if successful, False otherwise.
    """
    return self.http.buckets.update([self.payload])

validate_creation

Python
validate_creation(creation: Optional[float]) -> float

Validate and set the creation time.

Parameters:

Name Type Description Default
creation Optional[float]

Creation time in unix timestamp.

required

Returns:

Name Type Description
float float

Creation time in unix timestamp.

Source code in workflow/definitions/work.py
Python
@field_validator("creation")
def validate_creation(cls, creation: Optional[float]) -> float:
    """Validate and set the creation time.

    Args:
        creation (Optional[float]): Creation time in unix timestamp.

    Returns:
        float: Creation time in unix timestamp.
    """
    if creation is None:
        return time()
    return creation

validate_pipeline

Python
validate_pipeline(pipeline: str) -> str

Validate the pipeline name.

Parameters:

Name Type Description Default
pipeline str

Name of the pipeline.

required

Raises:

Type Description
ValueError

If the pipeline name contains any character that is not a letter, number, or hyphen.

Returns:

Name Type Description
str str

Validated pipeline name.

Source code in workflow/definitions/work.py
Python
@field_validator("pipeline", mode="after", check_fields=True)
def validate_pipeline(cls, pipeline: str) -> str:
    """Validate the pipeline name.

    Args:
        pipeline (str): Name of the pipeline.

    Raises:
        ValueError: If the pipeline name contains any character that is not
            a letter, number, or hyphen.

    Returns:
        str: Validated pipeline name.
    """
    for char in pipeline:
        if not char.isalnum() and char not in ["-"]:
            raise ValueError(
                "pipeline name can only contain letters, numbers & hyphens."
            )
    return pipeline
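The loop above is equivalent to the one-line check below. Two consequences follow directly from the code: str.isalnum() also accepts non-ASCII letters and digits, and an empty name passes because there is no character to reject. If ASCII-only, non-empty names were intended (this page does not say), a pattern such as [A-Za-z0-9-]+ with re.fullmatch would be stricter. is_valid_pipeline is a hypothetical helper.

```python
def is_valid_pipeline(pipeline: str) -> bool:
    """Mirror validate_pipeline: every character is alphanumeric or a hyphen."""
    return all(char.isalnum() or char == "-" for char in pipeline)


names = ["test-pipeline", "test_pipeline", "my pipeline", "café-2", ""]
print({name: is_valid_pipeline(name) for name in names})
# {'test-pipeline': True, 'test_pipeline': False, 'my pipeline': False, 'café-2': True, '': True}
```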

withdraw classmethod

Python
withdraw(pipeline: Union[str, List[str]], event: Optional[List[int]] = None, site: Optional[str] = None, priority: Optional[int] = None, user: Optional[str] = None, tags: Optional[List[str]] = None, parent: Optional[List[str]] = None, timeout: float = 15.0, token: Optional[SecretStr] = None, http: Optional[HTTPContext] = None) -> Optional[Work]

Withdraw work from the buckets backend.

Parameters:

Name Type Description Default
pipeline Union[str, List[str]]

Name of the pipeline to withdraw work for.

required
event Optional[List[int]]

Unique event ids to withdraw work for.

None
site Optional[str]

Name of the site to withdraw work for.

None
priority Optional[int]

Priority of the work to withdraw.

None
user Optional[str]

Name of the user to withdraw work for.

None
tags Optional[List[str]]

List of tags to withdraw work from.

None
parent Optional[List[str]]

Parent Pipeline ID of the work to withdraw.

None
timeout float

HTTP timeout in seconds. Defaults to 15.0.

15.0
token Optional[SecretStr]

Workflow Access Token.

None
http Optional[HTTPContext]

HTTP Context for connecting to workflow servers.

None
Note

The token and timeout arguments can also be provided through the following environment variables:

Text Only
- "WORKFLOW_HTTP_TOKEN"
- "WORKFLOW_TOKEN"
- "GITHUB_TOKEN"
- "GITHUB_PAT"
- "WORKFLOW_HTTP_TIMEOUT"

The http argument can be provided to avoid creating a new HTTPContext object each time the work object interacts with a backend. This is useful when performing bulk withdrawals from the same backend.
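The note lists the variables but not the order in which they are consulted. Assuming they are checked top to bottom and the first non-empty value wins, the lookup could be sketched as follows; token_from_environment and timeout_from_environment are hypothetical helpers, not part of the library.

```python
import os
from typing import Mapping, Optional

# Order copied from the note above; treating it as a priority order is an assumption.
TOKEN_VARIABLES = ("WORKFLOW_HTTP_TOKEN", "WORKFLOW_TOKEN", "GITHUB_TOKEN", "GITHUB_PAT")


def token_from_environment(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the first non-empty token found in the environment, if any."""
    for name in TOKEN_VARIABLES:
        value = environ.get(name)
        if value:
            return value
    return None


def timeout_from_environment(
    environ: Mapping[str, str] = os.environ, default: float = 15.0
) -> float:
    """Read WORKFLOW_HTTP_TIMEOUT as seconds, falling back to the default."""
    raw = environ.get("WORKFLOW_HTTP_TIMEOUT")
    return float(raw) if raw else default


env = {"GITHUB_PAT": "ghp_example", "WORKFLOW_TOKEN": "wf_example"}
print(token_from_environment(env))    # wf_example
print(timeout_from_environment(env))  # 15.0
```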

Returns:

Type Description
Optional[Work]

Optional[Work]: The withdrawn work if successful, None otherwise.

Source code in workflow/definitions/work.py
Python
@classmethod
def withdraw(
    cls,
    pipeline: Union[str, List[str]],
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[List[str]] = None,
    timeout: float = 15.0,
    token: Optional[SecretStr] = None,
    http: Optional[HTTPContext] = None,
) -> Optional["Work"]:
    """Withdraw work from the buckets backend.

    Args:
        pipeline (Union[str, List[str]]): Name of the pipeline to withdraw work for.
        event (Optional[List[int]]): Unique event ids to withdraw work for.
        site (Optional[str]): Name of the site to withdraw work for.
        priority (Optional[int]): Priority of the work to withdraw.
        user (Optional[str]): Name of the user to withdraw work for.
        tags (Optional[List[str]]): List of tags to withdraw work from.
        parent (Optional[str]): Parent Pipeline ID of the work to withdraw.
        timeout (float, optional): HTTP timeout in seconds. Defaults to 15.0.
        token (Optional[SecretStr], optional): Workflow Access Token.
        http (Optional[HTTPContext], optional): HTTP Context for connecting to
            workflow servers.

    Note:
        The `token` and `timeout` arguments can also be provided via the
        following environment variables:

            - "WORKFLOW_HTTP_TOKEN"
            - "WORKFLOW_TOKEN"
            - "GITHUB_TOKEN"
            - "GITHUB_PAT"
            - "WORKFLOW_HTTP_TIMEOUT"

        The `http` argument can be provided to avoid creating a new HTTPContext
        object each time the work object interacts with a backend. This is useful
        when doing bulk withdrawals from the same backend.

    Returns:
        Optional[Work]: The withdrawn work if successful, None otherwise.
    """
    # The context sources environment variables, which are overridden
    # by the arguments passed to the function.
    http = http or HTTPContext(timeout=timeout, token=token, backends=["buckets"])
    payload = http.buckets.withdraw(
        pipeline=pipeline,
        event=event,
        site=site,
        priority=priority,
        user=user,
        tags=tags,
        parent=parent,
    )
    if payload:
        work = cls.from_dict(payload)
        work.http = http
        return work
    return None
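Because withdraw returns None when nothing is available, bulk withdrawal is naturally a loop that stops on None. With the real API, the equivalent would be creating one HTTPContext up front and passing it as http= on every Work.withdraw call. The sketch below shows only the loop shape, using a hypothetical in-memory FakeBuckets backend and a hypothetical drain helper instead of the library.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Optional


class FakeBuckets:
    """Hypothetical in-memory stand-in for the buckets backend."""

    def __init__(self, works: List[Dict[str, Any]]) -> None:
        self._queue: Deque[Dict[str, Any]] = deque(works)

    def withdraw(self, pipeline: str) -> Optional[Dict[str, Any]]:
        # Return the next matching work, or None when nothing is left.
        for work in list(self._queue):
            if work["pipeline"] == pipeline:
                self._queue.remove(work)
                return work
        return None


def drain(withdraw: Callable[[], Optional[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Call withdraw repeatedly until it returns None, collecting the results."""
    collected: List[Dict[str, Any]] = []
    while (work := withdraw()) is not None:
        collected.append(work)
    return collected


# One backend object is created up front and reused for every call.
buckets = FakeBuckets(
    [{"pipeline": "test", "id": "a"}, {"pipeline": "other", "id": "b"},
     {"pipeline": "test", "id": "c"}]
)
print([w["id"] for w in drain(lambda: buckets.withdraw("test"))])  # ['a', 'c']
```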