
Work Object

Bases: BaseSettings

Workflow Work Object.

The work object defines the lifecycle of any action to be performed.

Parameters:

Name Type Description Default
BaseSettings BaseSettings

Pydantic BaseModel with settings.

required
Note

The selection priority for attributes in descending order is:

  • Arguments passed to the Work Object.
  • Environment variables with WORKFLOW_ prefix.
  • Variables from /run/secrets secrets directory.
  • The default values in the class constructor.
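The priority order above can be sketched in plain Python. This is only an illustration of the lookup order, not the library's implementation: `resolve_setting` is a hypothetical helper, and the assumption that a secret file is named after the prefixed environment variable is ours.

```python
import os
from pathlib import Path
from typing import Any, Dict, Mapping, Optional


def resolve_setting(
    name: str,
    explicit: Dict[str, Any],
    defaults: Dict[str, Any],
    env_prefix: str = "WORKFLOW_",
    secrets_dir: str = "/run/secrets",
    environ: Optional[Mapping[str, str]] = None,
) -> Any:
    """Hypothetical helper: return the first value found, highest priority first."""
    environ = os.environ if environ is None else environ

    # 1. Arguments passed directly to the object win.
    if name in explicit:
        return explicit[name]

    # 2. Environment variable with the prefix, e.g. WORKFLOW_PRIORITY.
    #    Values arrive as strings; no type coercion is attempted in this sketch.
    env_key = f"{env_prefix}{name.upper()}"
    if env_key in environ:
        return environ[env_key]

    # 3. A file in the secrets directory (file naming is an assumption here).
    secret_file = Path(secrets_dir) / env_key
    if secret_file.is_file():
        return secret_file.read_text().strip()

    # 4. Fall back to the class default.
    return defaults.get(name)
```

With `defaults = {"priority": 3}` and `WORKFLOW_PRIORITY=4` exported, the sketch returns the environment value unless `priority` is also passed explicitly.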

Environment Prefixes:

  • WORKFLOW_ for all work attributes.
  • WORKFLOW_HTTP_ for all work http attributes.
  • WORKFLOW_CONFIG_ for all work config attributes.
  • WORKFLOW_NOTIFY_ for all work notify attributes.

The size of the results dictionary is limited to 4MB. If the results dictionary is larger than 4MB, it is ignored and not mapped to the work object. Use the products attribute to store large data products.
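A client-side check can catch oversized results before they are silently ignored. The sketch below assumes the limit applies to the JSON-encoded size of the dictionary and that 4MB means 4 × 1024 × 1024 bytes; neither detail is stated above, and both helper names are hypothetical.

```python
import json
from typing import Any, Dict

# Assumption: "4MB" is interpreted as 4 MiB of JSON-encoded data.
MAX_RESULTS_BYTES = 4 * 1024 * 1024


def results_size_bytes(results: Dict[str, Any]) -> int:
    """Approximate the stored size as the UTF-8 length of the JSON encoding."""
    return len(json.dumps(results).encode("utf-8"))


def fits_in_results(results: Dict[str, Any]) -> bool:
    """Return True if the dictionary is small enough to keep in `results`."""
    return results_size_bytes(results) <= MAX_RESULTS_BYTES
```

When `fits_in_results` returns False, write the data to a file and record its path in the products attribute instead.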

Attributes:

Name Type Description
pipeline str

Name of the pipeline. (Required)

site str

Site where the work will be performed. (Required)

user str

User who created the work. (Required)

function str

Python function to run, called as function(**parameters).

parameters Dict[str, Any]

Parameters to pass to the function.

command List[str]

Command to run as subprocess.run(command).

results Dict[str, Any]

Results from the work.

products List[str]

Data products from work.

plots List[str]

Visual plots of the work.

tags List[str]

Searchable tags of the work.

event List[int]

Unique ID[s] the work was performed against.

id str

BSON ID of the work, set by the database.

creation float

Creation time of the work.

start float

Start time of the work, set by the backend.

stop float

Stop time of the work, set by the client.

attempt int

Attempt number of the work. Defaults to 0.

status str

Status of the work. Defaults to "created".

timeout int

Timeout for the work in seconds. Defaults to 3600 seconds.

retries int

Number of retries for the work. Defaults to 2 retries.

priority int

Priority of the work. Defaults to 3.

config Config

Configuration for the lifecycle of the work.

notify Notify

Notification configuration for the work.

workspace FilePath

Path to the active workspace configuration. Defaults to ~/.config/workflow/workspace.yml. (Excluded from payload)

token SecretStr

Workflow Access Token. (Excluded from payload)

http HTTPContext

HTTP Context for backends. (Excluded from payload)

Raises:

Type Description
ValueError

When work is invalid.

Returns:

Name Type Description
Object Work

Work object.

Example
Bash
# Set the environment variables to override the defaults.
export WORKFLOW_PRIORITY=4
Python
from workflow.definitions.work import Work

work = Work(pipeline="test", site="local", user="shinybrar")
work.model_dump()
{
    "config": {...},
    "notify": {"slack": {}},
    "pipeline": "test",
    "site": "local",
    "user": "shinybrar",
    "timeout": 3600,
    "retries": 2,
    "priority": 4,  # Overridden by the environment variable.
    "attempt": 0,
    "status": "created",
}
work.deposit(return_ids=True)

payload property

Return the dictionary representation of the work.

Returns:

Type Description
Dict[str, Any]

The payload of the work. Non-instanced attributes are excluded from the payload.

delete()

Delete work from the buckets backend.

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

deposit(return_ids=False, timeout=15.0, token=None, http=None)

Deposit work to the buckets backend.

Parameters:

Name Type Description Default
return_ids bool

Return Database ID. Defaults to False.

False
timeout float

HTTP request timeout in seconds.

15.0
token Optional[SecretStr]

Workflow Access Token.

None
http Optional[HTTPContext]

HTTP Context for backend.

None
Note

Both token and http can be passed as arguments to the function or inherited from the work object itself.

Returns:

Type Description
Union[bool, List[str]]

True if successful, False otherwise. When return_ids is True, a list of database IDs is returned instead.
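Because the return type is a union, callers need to branch on it. The helper below is a hypothetical sketch of one way to do so; it does not call the backend and only inspects a value shaped like the documented return type.

```python
from typing import List, Union


def describe_deposit(result: Union[bool, List[str]]) -> str:
    """Turn a deposit-style return value into a short status message."""
    if isinstance(result, list):
        # A list is treated as the database IDs of the deposited work.
        return f"deposited, ids={result}"
    return "deposited" if result else "deposit failed"
```

A caller would pass the value returned by `work.deposit(...)` straight into this helper.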

from_dict(payload) classmethod

Create a work from a dictionary.

Parameters:

Name Type Description Default
payload Dict[str, Any]

The dictionary.

required

Returns:

Name Type Description
Work Work

Work Object.

from_json(json) classmethod

Create a work from a json string.

Parameters:

Name Type Description Default
json str

The json string.

required

Returns:

Name Type Description
Work Work

Work Object.
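The relationship between the two constructors can be illustrated with a stand-in class. `WorkSketch` is hypothetical and carries only a few attributes; whether the real `from_json` delegates to `from_dict` is an assumption of this sketch.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class WorkSketch:
    """Hypothetical stand-in for Work with only the required attributes."""

    pipeline: str
    site: str
    user: str
    parameters: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "WorkSketch":
        # Keys in the payload map directly onto attributes.
        return cls(**payload)

    @classmethod
    def from_json(cls, json_str: str) -> "WorkSketch":
        # Decode the JSON string, then reuse the dictionary constructor.
        return cls.from_dict(json.loads(json_str))
```

Both constructors should yield equal objects for equivalent input, which makes a convenient round-trip check in tests.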

post_model_validation()

Validate the work model after creating the object.

Raises:

Type Description
ValueError

Raised if the site provided is not allowed in the workspace.

Returns:

Name Type Description
Work Work

The current work object.
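The site check can be pictured as a membership test. In this sketch `check_site_allowed` and its `allowed_sites` argument are hypothetical; how the workspace file actually lists its sites is not described here.

```python
from typing import Iterable


def check_site_allowed(site: str, allowed_sites: Iterable[str]) -> str:
    """Raise ValueError unless `site` is one of the sites the workspace permits."""
    allowed = list(allowed_sites)
    if site not in allowed:
        raise ValueError(f"site {site!r} is not allowed; expected one of {allowed}")
    return site
```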

pre_validation(data)

Validate the work model before creating the object.

Parameters:

Name Type Description Default
data Any

Work data to validate.

required

Raises:

Type Description
ValueError

If both function and command are set.

Returns:

Name Type Description
Any Any

Validated work data.
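The mutual-exclusion rule can be sketched as a small check on the raw input. This assumes the data arrives as a dictionary; `check_function_or_command` is a hypothetical name, not the library's validator.

```python
from typing import Any, Dict


def check_function_or_command(data: Dict[str, Any]) -> Dict[str, Any]:
    """Reject input that sets both a Python function and a shell command."""
    if data.get("function") and data.get("command"):
        raise ValueError("Only one of 'function' or 'command' may be set.")
    return data
```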

update()

Update work in the buckets backend.

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

validate_creation(creation)

Validate and set the creation time.

Parameters:

Name Type Description Default
creation Optional[float]

Creation time in unix timestamp.

required

Returns:

Name Type Description
float float

Creation time in unix timestamp.
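Since the argument is optional but the return value is always a float, a plausible reading is that a missing creation time falls back to the current time. That fallback is an assumption, and `creation_or_now` is a hypothetical helper.

```python
import time
from typing import Optional


def creation_or_now(creation: Optional[float] = None) -> float:
    """Return the given Unix timestamp, or the current time if none was given."""
    return time.time() if creation is None else float(creation)
```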

validate_pipeline(pipeline)

Validate the pipeline name.

Parameters:

Name Type Description Default
pipeline str

Name of the pipeline.

required

Raises:

Type Description
ValueError

If the pipeline name contains any character that is not

Returns:

Name Type Description
str str

Validated pipeline name.

withdraw(pipeline, event=None, site=None, priority=None, user=None, tags=None, parent=None, timeout=15.0, token=None, http=None) classmethod

Withdraw work from the buckets backend.

Parameters:

Name Type Description Default
pipeline Union[str, List[str]]

Name of the pipeline to withdraw work for.

required
event Optional[List[int]]

Unique event ids to withdraw work for.

None
site Optional[str]

Name of the site to withdraw work for.

None
priority Optional[int]

Priority of the work to withdraw.

None
user Optional[str]

Name of the user to withdraw work for.

None
tags Optional[List[str]]

List of tags to withdraw work from.

None
parent Optional[str]

Parent Pipeline ID of the work to withdraw.

None
timeout float

HTTP timeout in seconds. Defaults to 15.0.

15.0
token Optional[SecretStr]

Workflow Access Token.

None
http Optional[HTTPContext]

HTTP Context for connecting to workflow servers.

None
Note

The token and timeout arguments can also be provided via the environment:

- "WORKFLOW_HTTP_TOKEN"
- "WORKFLOW_TOKEN"
- "GITHUB_TOKEN"
- "GITHUB_PAT"
- "WORKFLOW_HTTP_TIMEOUT"

The http argument can be provided, to avoid creating a new HTTPContext object each time the work object interacts with a backend. This is useful when doing bulk withdraws from the same backend.
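Looking up a token from the variables listed above might look like the following. The sketch assumes the listed order is also the order of precedence, which the note does not state; `token_from_environment` is a hypothetical helper.

```python
import os
from typing import Mapping, Optional

# Assumption: earlier names take precedence over later ones.
TOKEN_VARIABLES = (
    "WORKFLOW_HTTP_TOKEN",
    "WORKFLOW_TOKEN",
    "GITHUB_TOKEN",
    "GITHUB_PAT",
)


def token_from_environment(
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the first non-empty token found, or None if none is set."""
    environ = os.environ if environ is None else environ
    for name in TOKEN_VARIABLES:
        value = environ.get(name)
        if value:
            return value
    return None
```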

Returns:

Type Description
Optional[Work]

The withdrawn work if successful, None otherwise.