Work Object¶
Bases: BaseModel
Work Object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
BaseModel |
BaseModel
|
Pydantic BaseModel. |
required |
Attributes:
Name | Type | Description |
---|---|---|
pipeline |
str
|
Name of the pipeline. (Required) Automatically reformated to hyphen-case. |
site |
str
|
Site where the work will be performed. (Required) |
user |
str
|
User who created the work. (Required) |
function |
Optional[str]
|
Name of the function ran as |
command |
Optional[List[str]]
|
Command to run as |
parameters |
Optional[Dict[str, Any]]
|
Parameters to pass to the function. |
command |
Optional[List[str]]
|
Command to run as |
results |
Optional[Dict[str, Any]]
|
Results of the work. |
products |
Optional[Dict[str, Any]]
|
Products of the work. |
plots |
Optional[Dict[str, Any]]
|
Plots of the work. |
event |
Optional[List[int]]
|
Event ID of the work. |
tags |
Optional[List[str]]
|
Tags of the work. |
timeout |
int
|
Timeout for the work in seconds. Default is 3600 seconds. |
retries |
int
|
Number of retries for the work. Default is 2 retries. |
priority |
int
|
Priority of the work. Default is 3. |
config |
WorkConfig
|
Configuration of the work. |
notify |
Notify
|
Notification configuration of the work. |
id |
str
|
ID of the work. |
creation |
float
|
Creation time of the work. |
start |
float
|
Start time of the work. |
stop |
float
|
Stop time of the work. |
status |
str
|
Status of the work. |
Raises:
Type | Description |
---|---|
ValueError
|
If the work is not valid. |
Returns:
Name | Type | Description |
---|---|---|
Work |
Work object. |
Example
payload
property
¶
Return the dictioanary representation of the work.
Returns:
Type | Description |
---|---|
Dict[str, Any]
|
Dict[str, Any]: The payload of the work. |
Dict[str, Any]
|
Non-instanced attributes are excluded from the payload. |
Config ¶
Pydantic Config.
delete ¶
Delete work from the buckets backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids |
List[str]
|
List of ids to delete. |
required |
Returns:
Name | Type | Description |
---|---|---|
bool |
bool
|
True if successful, False otherwise. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def delete(self, **kwargs: Dict[str, Any]) -> bool:
"""Delete work from the buckets backend.
Args:
ids (List[str]): List of ids to delete.
Returns:
bool: True if successful, False otherwise.
"""
buckets = Buckets(**kwargs) # type: ignore
return buckets.delete_ids([str(self.id)])
deposit ¶
Deposit work to the buckets backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Dict[str, Any]
|
Keyword arguments for the Buckets API. |
{}
|
Returns:
Name | Type | Description |
---|---|---|
bool |
Union[bool, List[str]]
|
True if successful, False otherwise. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def deposit(
self, return_ids: bool = False, **kwargs: Dict[str, Any]
) -> Union[bool, List[str]]:
"""Deposit work to the buckets backend.
Args:
**kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.
Returns:
bool: True if successful, False otherwise.
"""
buckets = Buckets(**kwargs) # type: ignore
return buckets.deposit(works=[self.payload], return_ids=return_ids)
from_dict
classmethod
¶
Create a work from a dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload |
Dict[str, Any]
|
The dictionary. |
required |
Returns:
Name | Type | Description |
---|---|---|
Work |
Work
|
The work. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
from_json
classmethod
¶
Create a work from a json string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_str |
str
|
The json string. |
required |
Returns:
Name | Type | Description |
---|---|---|
Work |
Work
|
The work. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
post_init ¶
Initialize work attributes after validation.
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
@root_validator
def post_init(cls, values: Dict[str, Any]):
"""Initialize work attributes after validation."""
# Check if the pipeline name has any character that is uppercase
reformatted: bool = False
for char in values["pipeline"]:
if char.isupper():
values["pipeline"] = values["pipeline"].lower()
reformatted = True
break
if any(char in {" ", "_"} for char in values["pipeline"]):
values["pipeline"] = values["pipeline"].replace(" ", "-")
values["pipeline"] = values["pipeline"].replace("_", "-")
reformatted = True
# Check if the pipeline has any character that is not alphanumeric or dash
for char in values["pipeline"]:
if not char.isalnum() and char not in ["-"]:
raise ValueError(
"pipeline name can only contain letters, numbers & dashes"
)
if reformatted:
warn(
SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
stacklevel=2,
)
# Set creation time if not already set
if values.get("creation") is None:
values["creation"] = time()
# Update tags from environment variable WORKFLOW_TAGS
if environ.get("WORKFLOW_TAGS"):
env_tags: List[str] = str(environ.get("WORKFLOW_TAGS")).split(",")
# If tags are already set, append the new ones
if values.get("tags"):
values["tags"] = values["tags"] + env_tags
else:
values["tags"] = env_tags
# Remove duplicates
values["tags"] = list(set(values["tags"]))
# Check if both command and function are set
if values.get("command") and values.get("function"):
raise ValueError("command and function cannot be set together.")
if not values.get("token"): # type: ignore
msg = "workflow token required after v4.0.0."
warn(
FutureWarning(msg),
stacklevel=2,
)
return values
update ¶
Update work in the buckets backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Dict[str, Any]
|
Keyword arguments for the Buckets API. |
{}
|
Returns:
Name | Type | Description |
---|---|---|
bool |
bool
|
True if successful, False otherwise. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def update(self, **kwargs: Dict[str, Any]) -> bool:
"""Update work in the buckets backend.
Args:
**kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.
Returns:
bool: True if successful, False otherwise.
"""
buckets = Buckets(**kwargs) # type: ignore
return buckets.update([self.payload])
withdraw
classmethod
¶
withdraw(pipeline: str, event: Optional[List[int]] = None, site: Optional[str] = None, priority: Optional[int] = None, user: Optional[str] = None, tags: Optional[List[str]] = None, parent: Optional[str] = None, **kwargs: Dict[str, Any]) -> Optional[Work]
Withdraw work from the buckets backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
str
|
Name of the pipeline. |
required |
**kwargs |
Dict[str, Any]
|
Keyword arguments for the Buckets API. |
{}
|
Returns:
Name | Type | Description |
---|---|---|
Work |
Optional[Work]
|
Work object. |
Source code in /home/runner/.cache/pypoetry/virtualenvs/handbook-G9X_rrhO-py3.10/lib/python3.10/site-packages/chime_frb_api/workflow/work.py
@classmethod
def withdraw(
cls,
pipeline: str,
event: Optional[List[int]] = None,
site: Optional[str] = None,
priority: Optional[int] = None,
user: Optional[str] = None,
tags: Optional[List[str]] = None,
parent: Optional[str] = None,
**kwargs: Dict[str, Any],
) -> Optional["Work"]:
"""Withdraw work from the buckets backend.
Args:
pipeline (str): Name of the pipeline.
**kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.
Returns:
Work: Work object.
"""
buckets = Buckets(**kwargs) # type: ignore
payload = buckets.withdraw(
pipeline=pipeline,
event=event,
site=site,
priority=priority,
user=user,
tags=tags,
parent=parent,
)
if payload:
return cls.from_dict(payload)
return None