Work Object
Bases: BaseSettings
Workflow Work Object.
The work object defines the lifecycle of any action to be performed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
BaseSettings | BaseSettings | Pydantic BaseModel with settings. | required |
Note
The selection priority for attributes in descending order is:
- Arguments passed to the Work object.
- Environment variables with the WORKFLOW_ prefix.
- Variables from the /run/secrets secrets directory.
- The default values in the class constructor.
Environment prefixes:
- WORKFLOW_ for all work attributes.
- WORKFLOW_HTTP_ for all work http attributes.
- WORKFLOW_CONFIG_ for all work config attributes.
- WORKFLOW_NOTIFY_ for all work notify attributes.
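The priority order above can be illustrated with a small standard-library sketch. It is not how Work or pydantic resolves settings internally. The `resolve` helper, its arguments, and the convention of naming the secret file after the lowercased environment key are all assumptions made for illustration only.

```python
from pathlib import Path
from typing import Any, Dict, Optional


def resolve(
    name: str,
    arguments: Dict[str, Any],
    defaults: Dict[str, Any],
    environ: Dict[str, str],
    secrets_dir: Optional[Path] = None,
    prefix: str = "WORKFLOW_",
) -> Any:
    """Return the value of `name` from the highest-priority source that has it."""
    # 1. Arguments passed to the constructor win.
    if name in arguments:
        return arguments[name]
    # 2. Environment variables carrying the prefix come next.
    key = f"{prefix}{name.upper()}"
    if key in environ:
        return environ[key]
    # 3. Then a file in the secrets directory (file naming is hypothetical).
    if secrets_dir is not None:
        secret = secrets_dir / key.lower()
        if secret.is_file():
            return secret.read_text().strip()
    # 4. Finally, fall back to the class default.
    return defaults[name]


defaults = {"priority": 3, "timeout": 3600}
env = {"WORKFLOW_PRIORITY": "4"}

from_argument = resolve("priority", {"priority": 1}, defaults, env)
from_environment = resolve("priority", {}, defaults, env)
from_default = resolve("timeout", {}, defaults, env)
```

In this sketch the environment value stays the string "4"; a settings model would normally coerce it to the declared field type.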
The size of the results dictionary is limited to 4MB. If the results dictionary is larger than 4MB, it is ignored and not mapped to the work object. Use the products attribute to store large data products.
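A client can check the size of a results dictionary before attaching it to the work. The sketch below is only a rough pre-check: the page does not say how the size is measured, so it assumes the JSON-encoded UTF-8 byte length and reads 4MB as 4 * 1024 * 1024 bytes. The helper names are hypothetical.

```python
import json
from typing import Any, Dict

# Assumption: "4MB" is interpreted as 4 MiB of JSON-encoded data.
MAX_RESULTS_BYTES = 4 * 1024 * 1024


def results_size(results: Dict[str, Any]) -> int:
    """Size of the results dictionary once serialized to JSON, in bytes."""
    return len(json.dumps(results).encode("utf-8"))


def fits_limit(results: Dict[str, Any]) -> bool:
    """True if the serialized results stay within the assumed limit."""
    return results_size(results) <= MAX_RESULTS_BYTES


small = {"snr": 12.5, "dm": 348.8}
large = {"samples": [0.0] * 2_000_000}  # roughly 10 MB once serialized

small_ok = fits_limit(small)
large_ok = fits_limit(large)
```

When the check fails, the large payload would be written to a file and referenced through the products attribute instead.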
Attributes:
Name | Type | Description |
---|---|---|
pipeline | str | Name of the pipeline. (Required) |
site | str | Site where the work will be performed. (Required) |
user | str | User who created the work. (Required) |
function | str | Python function to run |
parameters | Dict[str, Any] | Parameters to pass to the function. |
command | List[str] | Command to run as |
results | Dict[str, Any] | Results from the work. |
products | List[str] | Data products from work. |
plots | List[str] | Visual plots of the work. |
tags | List[str] | Searchable tags of the work. |
event | List[int] | Unique ID(s) the work was performed against. |
id | str | BSON ID of the work, set by the database. |
creation | float | Creation time of the work. |
start | float | Start time of the work, set by the backend. |
stop | float | Stop time of the work, set by the client. |
attempt | int | Attempt number of the work. Defaults to 0. |
status | str | Status of the work. Defaults to "created". |
timeout | int | Timeout for the work in seconds. Defaults to 3600 seconds. |
retries | int | Number of retries for the work. Defaults to 2 retries. |
priority | int | Priority of the work. Defaults to 3. |
config | Config | Configuration for the lifecycle of the work. |
notify | Notify | Notification configuration for the work. |
workspace | FilePath | Path to the active workspace configuration. Defaults to |
token | SecretStr | Workflow Access Token. (Excluded from payload) |
http | HTTPContext | HTTP Context for backends. (Excluded from payload) |
Raises:
Type | Description |
---|---|
ValueError | When work is invalid. |
Returns:
Name | Type | Description |
---|---|---|
Object | Work | Work object. |
Example
from workflow.definitions.work import Work

work = Work(pipeline="test", site="local", user="shinybrar")
work.model_dump()
{
    "config": {...},
    "notify": {"slack": {}},
    "pipeline": "test",
    "site": "local",
    "user": "shinybrar",
    "timeout": 3600,
    "retries": 2,
    "priority": 4,  # Overridden by the environment variable.
    "attempt": 0,
    "status": "created",
}
work.deposit(return_ids=True)
payload property
Return the dictionary representation of the work.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The payload of the work. Non-instanced attributes are excluded from the payload. |
delete
Delete work from the buckets backend.
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if successful, False otherwise. |
deposit
deposit(return_ids: bool = False, timeout: float = 15.0, token: Optional[SecretStr] = None, http: Optional[HTTPContext] = None) -> Union[bool, List[str]]
Deposit work to the buckets backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
return_ids | bool | Return Database ID. Defaults to False. | False |
timeout | float | HTTP request timeout in seconds. | 15.0 |
token | Optional[SecretStr] | Workflow Access Token. | None |
http | Optional[HTTPContext] | HTTP Context for backend. | None |
Note
Both token and http can be passed as arguments to the function. When omitted, they are inherited from the work object itself.
Returns:
Type | Description |
---|---|
Union[bool, List[str]] | True if successful, False otherwise. |
Source code in workflow/definitions/work.py
def deposit(
    self,
    return_ids: bool = False,
    timeout: float = 15.0,
    token: Optional[SecretStr] = None,
    http: Optional[HTTPContext] = None,
) -> Union[bool, List[str]]:
    """Deposit work to the buckets backend.

    Args:
        return_ids (bool, optional): Return Database ID. Defaults to False.
        timeout (float, optional): HTTP request timeout in seconds.
        token (Optional[SecretStr], optional): Workflow Access Token.
        http (Optional[HTTPContext], optional): HTTP Context for backend.

    Note:
        Both token and http can be either provided as arguments to the function
        or also inherited from the work object itself.

    Returns:
        Union[bool, List[str]]: True if successful, False otherwise.
    """
    self.token = token or self.token
    self.http = (
        http
        or self.http
        or HTTPContext(timeout=timeout, token=token, backends=["buckets"])
    )
    return self.http.buckets.deposit(works=[self.payload], return_ids=return_ids)
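The `http or self.http or HTTPContext(...)` expression in the source above is a first-truthy-wins fallback chain. The sketch below reproduces only that selection logic; `StubContext` and `pick_context` are hypothetical stand-ins, not part of the workflow library.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubContext:
    """Hypothetical stand-in for HTTPContext; it only records where it came from."""

    label: str


def pick_context(
    argument: Optional[StubContext], attribute: Optional[StubContext]
) -> StubContext:
    # Same shape as `http or self.http or HTTPContext(...)`:
    # the explicit argument wins, then the object's own context,
    # and a new context is built only when both are missing.
    return argument or attribute or StubContext(label="new")


from_argument = pick_context(StubContext("argument"), StubContext("attribute"))
from_attribute = pick_context(None, StubContext("attribute"))
from_scratch = pick_context(None, None)
```

One detail visible in the source listing: when a new context has to be built, it receives the token argument as passed to deposit, not the token stored on the work object.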
from_dict classmethod
Create a work from a dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload | Dict[str, Any] | The dictionary. | required |
Returns:
Name | Type | Description |
---|---|---|
Work | Work | Work Object. |
from_json classmethod
Create a work from a json string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json | str | The json string. | required |
Returns:
Name | Type | Description |
---|---|---|
Work | Work | Work Object. |
post_model_validation
Validate the work model after creating the object.
Raises:
Type | Description |
---|---|
ValueError | If the site provided is not allowed in the workspace. The check runs only when config.strategy is "strict". |
Returns:
Name | Type | Description |
---|---|---|
Work | Work | The current work object. |
Source code in workflow/definitions/work.py
@model_validator(mode="after")
def post_model_validation(self) -> "Work":
    """Validate the work model after creating the object.

    Raises:
        ValueError: Raised if,
            the site provided is not allowed in the workspace.

    Returns:
        Work: The current work object.
    """
    # Validate if the site provided is allowed in the workspace.
    if self.config.strategy == "strict":
        config: Dict[str, Any] = read.workspace(self.workspace)
        sites: List[str] = config.get("sites", [])
        if self.site not in sites:
            error = f"site {self.site} not in workspace: {sites}."
            raise ValueError(error)
    return self
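The site check can be exercised in isolation with a plain function. This is a minimal sketch under stated assumptions: `check_site` is a hypothetical helper, the workspace is passed in as an already-loaded dictionary rather than read from disk, and "relaxed" is only a placeholder for any strategy other than "strict".

```python
from typing import Any, Dict, List


def check_site(site: str, strategy: str, workspace: Dict[str, Any]) -> str:
    """Raise ValueError if `site` is not listed, but only under the strict strategy."""
    if strategy == "strict":
        sites: List[str] = workspace.get("sites", [])
        if site not in sites:
            raise ValueError(f"site {site} not in workspace: {sites}.")
    return site


workspace = {"sites": ["local", "chime"]}

accepted = check_site("local", "strict", workspace)
# Any non-strict strategy skips the check entirely.
unchecked = check_site("elsewhere", "relaxed", workspace)

try:
    check_site("elsewhere", "strict", workspace)
    rejected = False
except ValueError:
    rejected = True
```

A workspace without a "sites" key behaves like an empty list, so every site is rejected under the strict strategy.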
pre_validation
Validate the work model before creating the object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | Work data to validate. | required |
Raises:
Type | Description |
---|---|
ValueError | If both function and command are set. |
Returns:
Name | Type | Description |
---|---|---|
Any | Any | Validated work data. |
Source code in workflow/definitions/work.py
@model_validator(mode="before")
def pre_validation(cls, data: Any) -> Any:
    """Validate the work model before creating the object.

    Args:
        data (Any): Work data to validate.

    Raises:
        ValueError: If both `function` and `command` are set.

    Returns:
        Any: Validated work data.
    """
    if data.get("function") and data.get("command"):
        raise ValueError("Only either function or command can be provided.")
    return data
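Because the check above relies on truthiness, an empty string or an empty list counts as "not set". The sketch below shows that behaviour on plain dictionaries; `check_exclusive` is a hypothetical helper that copies the condition, and the function path and command values are made up.

```python
from typing import Any, Dict


def check_exclusive(data: Dict[str, Any]) -> Dict[str, Any]:
    """Reject input that sets both `function` and `command` to truthy values."""
    if data.get("function") and data.get("command"):
        raise ValueError("Only either function or command can be provided.")
    return data


# Only one of the two is set: accepted.
only_function = check_exclusive({"function": "package.module.run"})
only_command = check_exclusive({"command": ["ls", "-l"]})

# An empty command list is falsy, so this also passes.
empty_command = check_exclusive({"function": "package.module.run", "command": []})

try:
    check_exclusive({"function": "package.module.run", "command": ["ls"]})
    both_rejected = False
except ValueError:
    both_rejected = True
```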
update
Update work in the buckets backend.
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if successful, False otherwise. |
validate_creation
Validate and set the creation time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
creation | Optional[float] | Creation time in unix timestamp. | required |
Returns:
Name | Type | Description |
---|---|---|
float | float | Creation time in unix timestamp. |
Source code in workflow/definitions/work.py
@field_validator("creation")
def validate_creation(cls, creation: Optional[float]) -> float:
    """Validate and set the creation time.

    Args:
        creation (Optional[float]): Creation time in unix timestamp.

    Returns:
        float: Creation time in unix timestamp.
    """
    if creation is None:
        return time()
    return creation
validate_pipeline
Validate the pipeline name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline | str | Name of the pipeline. | required |
Raises:
Type | Description |
---|---|
ValueError | If the pipeline name contains any character that is not a letter, a number, or a hyphen. |
Returns:
Name | Type | Description |
---|---|---|
str | str | Validated pipeline name. |
Source code in workflow/definitions/work.py
@field_validator("pipeline", mode="after", check_fields=True)
def validate_pipeline(cls, pipeline: str) -> str:
    """Validate the pipeline name.

    Args:
        pipeline (str): Name of the pipeline.

    Raises:
        ValueError: If the pipeline name contains any character that is not
            a letter, a number, or a hyphen.

    Returns:
        str: Validated pipeline name.
    """
    for char in pipeline:
        if not char.isalnum() and char not in ["-"]:
            raise ValueError(
                "pipeline name can only contain letters, numbers & hyphens."
            )
    return pipeline
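The character rule can be tried out on a few names with a small predicate. This is a sketch, not library code: `is_valid_pipeline` is a hypothetical helper that applies the same per-character test as the loop above.

```python
def is_valid_pipeline(name: str) -> bool:
    """True if every character is alphanumeric or a hyphen."""
    return all(char.isalnum() or char == "-" for char in name)


hyphenated = is_valid_pipeline("chime-frb-1")
underscored = is_valid_pipeline("my_pipeline")
spaced = is_valid_pipeline("my pipeline")
# str.isalnum() is Unicode-aware, so accented letters pass as well.
accented = is_valid_pipeline("café")
# With no characters to reject, an empty name passes this particular check.
empty = is_valid_pipeline("")
```

Two consequences of using `str.isalnum()` are worth knowing: non-ASCII letters and digits are accepted, and the loop never runs on an empty string, so this check alone does not reject an empty name.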
withdraw classmethod
withdraw(pipeline: Union[str, List[str]], event: Optional[List[int]] = None, site: Optional[str] = None, priority: Optional[int] = None, user: Optional[str] = None, tags: Optional[List[str]] = None, parent: Optional[List[str]] = None, timeout: float = 15.0, token: Optional[SecretStr] = None, http: Optional[HTTPContext] = None) -> Optional[Work]
Withdraw work from the buckets backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline | Union[str, List[str]] | Name of the pipeline to withdraw work for. | required |
event | Optional[List[int]] | Unique event ids to withdraw work for. | None |
site | Optional[str] | Name of the site to withdraw work for. | None |
priority | Optional[int] | Priority of the work to withdraw. | None |
user | Optional[str] | Name of the user to withdraw work for. | None |
tags | Optional[List[str]] | List of tags to withdraw work from. | None |
parent | Optional[List[str]] | Parent Pipeline ID of the work to withdraw. | None |
timeout | float | HTTP timeout in seconds. Defaults to 15.0. | 15.0 |
token | Optional[SecretStr] | Workflow Access Token. | None |
http | Optional[HTTPContext] | HTTP Context for connecting to workflow servers. | None |
Note
The token and timeout arguments can also be provided via the environment:
- "WORKFLOW_HTTP_TOKEN"
- "WORKFLOW_TOKEN"
- "GITHUB_TOKEN"
- "GITHUB_PAT"
- "WORKFLOW_HTTP_TIMEOUT"
The http argument can be provided to avoid creating a new HTTPContext object each time the work object interacts with a backend. This is useful when doing bulk withdraws from the same backend.
Returns:
Type | Description |
---|---|
Optional[Work] | The withdrawn work if successful, None otherwise. |
Source code in workflow/definitions/work.py
@classmethod
def withdraw(
    cls,
    pipeline: Union[str, List[str]],
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[List[str]] = None,
    timeout: float = 15.0,
    token: Optional[SecretStr] = None,
    http: Optional[HTTPContext] = None,
) -> Optional["Work"]:
    """Withdraw work from the buckets backend.

    Args:
        pipeline (Union[str, List[str]]): Name of the pipeline to withdraw work for.
        event (Optional[List[int]]): Unique event ids to withdraw work for.
        site (Optional[str]): Name of the site to withdraw work for.
        priority (Optional[int]): Priority of the work to withdraw.
        user (Optional[str]): Name of the user to withdraw work for.
        tags (Optional[List[str]]): List of tags to withdraw work from.
        parent (Optional[List[str]]): Parent Pipeline ID of the work to withdraw.
        timeout (float, optional): HTTP timeout in seconds. Defaults to 15.0.
        token (Optional[SecretStr], optional): Workflow Access Token.
        http (Optional[HTTPContext], optional): HTTP Context for connecting to
            workflow servers.

    Note:
        `token` and `timeout` arguments can also be provided via the environment,

        - "WORKFLOW_HTTP_TOKEN"
        - "WORKFLOW_TOKEN"
        - "GITHUB_TOKEN"
        - "GITHUB_PAT"
        - "WORKFLOW_HTTP_TIMEOUT"

        The `http` argument can be provided, to avoid creating a new HTTPContext
        object each time the work object interacts with a backend. This is useful
        when doing bulk withdraws from the same backend.

    Returns:
        Optional[Work]: The withdrawn work if successful, None otherwise.
    """
    # Context is used to source environment variables, which are overwritten
    # by the arguments passed to the function.
    http = http or HTTPContext(timeout=timeout, token=token, backends=["buckets"])
    payload = http.buckets.withdraw(
        pipeline=pipeline,
        event=event,
        site=site,
        priority=priority,
        user=user,
        tags=tags,
        parent=parent,
    )
    if payload:
        work = cls.from_dict(payload)
        work.http = http
        return work
    return None
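The bulk-withdraw pattern mentioned in the note, building one context and reusing it until the backend has nothing left, can be sketched without a running server. Everything below is a stand-in: `StubBuckets`, `StubContext`, and `withdraw_all` are hypothetical and only imitate the shape of `http.buckets.withdraw(...)` returning a payload or None.

```python
from typing import Any, Dict, List, Optional


class StubBuckets:
    """Hypothetical in-memory stand-in for the buckets backend."""

    def __init__(self, queue: List[Dict[str, Any]]) -> None:
        self.queue = queue

    def withdraw(self, pipeline: str) -> Optional[Dict[str, Any]]:
        # Hand out the first queued payload for the pipeline, or None if empty.
        for index, payload in enumerate(self.queue):
            if payload["pipeline"] == pipeline:
                return self.queue.pop(index)
        return None


class StubContext:
    """Hypothetical stand-in for HTTPContext that counts how often it is built."""

    created = 0

    def __init__(self, buckets: StubBuckets) -> None:
        StubContext.created += 1
        self.buckets = buckets


def withdraw_all(pipeline: str, http: StubContext) -> List[Dict[str, Any]]:
    """Withdraw until the backend returns nothing, reusing one context."""
    works: List[Dict[str, Any]] = []
    while True:
        payload = http.buckets.withdraw(pipeline=pipeline)
        if not payload:
            break
        works.append(payload)
    return works


queue = [
    {"pipeline": "test", "event": [1]},
    {"pipeline": "other", "event": [2]},
    {"pipeline": "test", "event": [3]},
]
http = StubContext(StubBuckets(queue))  # built once, outside the loop
drained = withdraw_all("test", http)
```

With the real class, the equivalent would be to construct a single HTTPContext and pass it as the http argument on every Work.withdraw call, so that no new context is created per request.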