Skip to content

Work Object

chime_frb_api.workflow.Work

Bases: BaseModel

Work Object.

Parameters:

Name Type Description Default
BaseModel BaseModel

Pydantic BaseModel.

required

Attributes:

Name Type Description
pipeline str

Name of the pipeline. (Required) Automatically reformatted to hyphen-case.

site str

Site where the work will be performed. (Required)

user str

User who created the work. (Required)

function Optional[str]

Name of the function run as function(**parameters).

command Optional[List[str]]

Command to run as subprocess.run(command).

parameters Optional[Dict[str, Any]]

Parameters to pass to the function.

command Optional[List[str]]

Command to run as subprocess.run(command).

results Optional[Dict[str, Any]]

Results of the work.

products Optional[List[str]]

Products of the work.

plots Optional[List[str]]

Plots of the work.

event Optional[List[int]]

Event ID(s) of the work.

tags Optional[List[str]]

Tags of the work.

timeout int

Timeout for the work in seconds. Default is 3600 seconds.

retries int

Number of retries for the work. Default is 2 retries.

priority int

Priority of the work. Default is 3.

config WorkConfig

Configuration of the work.

notify Notify

Notification configuration of the work.

id str

ID of the work.

creation float

Creation time of the work.

start float

Start time of the work.

stop float

Stop time of the work.

status str

Status of the work.

Raises:

Type Description
ValueError

If the work is not valid.

Returns:

Name Type Description
Work

Work object.

Example
from chime_frb_api.workflow import Work

# Create a work item for the "test-pipeline" pipeline and deposit it,
# asking the backend for the IDs of the deposited work.
work = Work(site="chime", user="shinybrar", pipeline="test-pipeline")
ids = work.deposit(return_ids=True)
Source code in chime_frb_api/workflow/work.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
class Work(BaseModel):
    """Work Object.

    Args:
        BaseModel (BaseModel): Pydantic BaseModel.

    Attributes:
        pipeline (str): Name of the pipeline. (Required)
            Automatically reformatted to hyphen-case.
        site (str): Site where the work will be performed. (Required)
        user (str): User who created the work. (Required)
        token (Optional[SecretStr]): Workflow token, read from the first set
            variable among GITHUB_TOKEN, WORKFLOW_TOKEN, GITHUB_PAT,
            GITHUB_ACCESS_TOKEN, GITHUB_PERSONAL_ACCESS_TOKEN,
            GITHUB_OAUTH_TOKEN and GITHUB_OAUTH_ACCESS_TOKEN when a work is
            created. Excluded from serialization.
        function (Optional[str]): Name of the function run as `function(**parameters)`.
        parameters (Optional[Dict[str, Any]]): Parameters to pass to the function.
        command (Optional[List[str]]): Command to run as `subprocess.run(command)`.
        results (Optional[Dict[str, Any]]): Results of the work.
        products (Optional[List[str]]): Names of the data products of the work.
        plots (Optional[List[str]]): Names of the plots of the work.
        event (Optional[List[int]]): Event ID(s) of the work.
        tags (Optional[List[str]]): Tags of the work, merged with WORKFLOW_TAGS.
        timeout (int): Timeout for the work in seconds. Default is 3600 seconds.
        retries (int): Number of retries for the work. Default is 2 retries.
        priority (int): Priority of the work, in [1, 5]. Default is 3.
        config (WorkConfig): Configuration of the work.
        notify (Notify): Notification configuration of the work.
        id (str): ID of the work.
        creation (float): Creation time of the work.
        start (float): Start time of the work.
        stop (float): Stop time of the work.
        attempt (int): Attempt number at performing the work.
        status (str): Status of the work.

    Raises:
        ValueError: If the work is not valid.

    Returns:
        Work: Work object.

    Example:
        ```python
        from chime_frb_api.workflow import Work

        work = Work(pipeline="test-pipeline", site="chime", user="shinybrar")
        work.deposit(return_ids=True)
        ```
    """

    class Config:
        """Pydantic Config."""

        validate_all = True
        validate_assignment = True
        # NOTE(review): `exclude_none` does not look like a pydantic v1 Config
        # option, so `.dict()` may still emit None values — confirm.
        exclude_none = True

    ###########################################################################
    # Required Attributes. Set by user.
    ###########################################################################
    pipeline: StrictStr = Field(
        ...,
        min_length=1,
        description="Name of the pipeline. Automatically reformatted to hyphen-case.",
        example="example-pipeline",
    )
    site: Literal[
        "canfar",
        "cedar",
        "chime",
        "aro",
        "hco",
        "gbo",
        "kko",
        "local",
    ] = Field(
        ...,
        description="Site where the work will be performed.",
        example="chime",
    )
    user: StrictStr = Field(
        ..., description="User ID who created the work.", example="shinybrar"
    )
    token: Optional[SecretStr] = Field(
        # Evaluated each time a Work is created rather than once when the
        # class body runs at import, so tokens exported after import are
        # still picked up. The first variable that is set wins.
        default_factory=lambda: next(
            (
                value
                for value in (
                    environ.get(name)
                    for name in (
                        "GITHUB_TOKEN",
                        "WORKFLOW_TOKEN",
                        "GITHUB_PAT",
                        "GITHUB_ACCESS_TOKEN",
                        "GITHUB_PERSONAL_ACCESS_TOKEN",
                        "GITHUB_OAUTH_TOKEN",
                        "GITHUB_OAUTH_ACCESS_TOKEN",
                    )
                )
                if value is not None
            ),
            None,
        ),
        description="Github Personal Access Token.",
        example="ghp_1234567890abcdefg",
        exclude=True,
    )

    ###########################################################################
    # Optional attributes, might be provided by the user.
    ###########################################################################
    function: Optional[str] = Field(
        default=None,
        description="""
        Name of the function to run as `function(**parameters)`.
        Only either `function` or `command` can be provided.
        """,
        example="requests.get",
    )
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="""
        Parameters to pass the pipeline function.
        """,
        example={"event_number": 9385707},
    )
    command: Optional[List[str]] = Field(
        default=None,
        description="""
        Command to run as `subprocess.run(command)`.
        Note, only either `function` or `command` can be provided.
        """,
        example=["python", "example.py", "--example", "example"],
    )
    results: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Results of the work performed, if any.",
        example={"dm": 100.0, "snr": 10.0},
    )
    products: Optional[List[StrictStr]] = Field(
        default=None,
        description="""
        Name of the non-human-readable data products generated by the pipeline.
        """,
        example=["spectra.h5", "dm_vs_time.png"],
    )
    plots: Optional[List[StrictStr]] = Field(
        default=None,
        description="""
        Name of visual data products generated by the pipeline.
        """,
        example=["waterfall.png", "/arc/projects/chimefrb/9385707/9385707.png"],
    )
    event: Optional[List[int]] = Field(
        default=None,
        description="CHIME/FRB Event ID[s] the work was performed against.",
        example=[9385707, 9385708],
    )
    tags: Optional[List[str]] = Field(
        default=None,
        description="""
        Searchable tags for the work. Merged with values from env WORKFLOW_TAGS.
        """,
        example=["dm-analysis"],
    )
    timeout: int = Field(
        default=3600,
        ge=1,
        le=86400,
        description="""
        Timeout in seconds for the work to finish.
        Defaults 3600s (1 hr) with range of [1, 86400] (1s-24hrs).
        """,
        example=7200,
    )
    retries: int = Field(
        default=2,
        lt=6,
        description="Number of retries before giving up. Defaults to 2.",
        example=4,
    )
    priority: int = Field(
        default=3,
        ge=1,
        le=5,
        description="Priority of the work. Defaults to 3.",
        example=1,
    )
    config: WorkConfig = WorkConfig()
    notify: Notify = Notify()

    ###########################################################################
    # Automaticaly set attributes
    ###########################################################################
    id: Optional[StrictStr] = Field(
        default=None, description="Work ID created by the database."
    )
    creation: Optional[StrictFloat] = Field(
        default=None, description="Unix timestamp of when the work was created."
    )
    start: Optional[StrictFloat] = Field(
        default=None,
        description="Unix timestamp when the work was started, reset at each attempt.",
    )
    stop: Optional[StrictFloat] = Field(
        default=None,
        description="Unix timestamp when the work was stopped, reset at each attempt.",
    )
    attempt: int = Field(
        default=0, ge=0, description="Attempt number at performing the work."
    )
    status: Literal["created", "queued", "running", "success", "failure"] = Field(
        default="created", description="Status of the work."
    )
    ###########################################################################
    # Attribute setters for the work attributes
    ###########################################################################

    @root_validator
    def post_init(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Initialize work attributes after validation.

        Normalizes the pipeline name to hyphen-case, stamps the creation time,
        merges tags from the WORKFLOW_TAGS environment variable and checks
        that `command` and `function` are not both set.

        Args:
            values (Dict[str, Any]): Field values that passed validation.

        Raises:
            ValueError: If the pipeline name contains characters other than
                letters, numbers and dashes, or if both `command` and
                `function` are set.

        Returns:
            Dict[str, Any]: The updated field values.
        """
        # A field that failed its own validation is missing from `values`.
        # Skip the pipeline handling in that case so pydantic reports that
        # ValidationError instead of a bare KeyError escaping from here.
        pipeline: Optional[str] = values.get("pipeline")
        if pipeline is not None:
            reformatted = False
            if any(char.isupper() for char in pipeline):
                pipeline = pipeline.lower()
                reformatted = True
            if " " in pipeline or "_" in pipeline:
                pipeline = pipeline.replace(" ", "-").replace("_", "-")
                reformatted = True
            if not all(char.isalnum() or char == "-" for char in pipeline):
                raise ValueError(
                    "pipeline name can only contain letters, numbers & dashes"
                )
            values["pipeline"] = pipeline
            if reformatted:
                warn(
                    SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
                    stacklevel=2,
                )

        # Set creation time if not already set
        if values.get("creation") is None:
            values["creation"] = time()

        # Merge tags from the comma-separated WORKFLOW_TAGS variable. Blank
        # entries (e.g. from "a,,b" or a trailing comma) are dropped. Duplicates
        # are removed while keeping first-seen order, which stays stable when
        # the validator reruns on assignment.
        env_value = environ.get("WORKFLOW_TAGS")
        if env_value:
            env_tags: List[str] = [
                tag.strip() for tag in env_value.split(",") if tag.strip()
            ]
            merged: List[str] = (values.get("tags") or []) + env_tags
            values["tags"] = list(dict.fromkeys(merged))

        # Check if both command and function are set
        if values.get("command") and values.get("function"):
            raise ValueError("command and function cannot be set together.")

        if not values.get("token"):  # type: ignore
            msg = "workflow token required after v4.0.0."
            warn(
                FutureWarning(msg),
                stacklevel=2,
            )
        return values

    ###########################################################################
    # Work methods
    ###########################################################################

    @property
    def payload(self) -> Dict[str, Any]:
        """Return the dictionary representation of the work.

        `token` is excluded through its field definition, and `token` is also
        excluded from the nested `config` model.

        Returns:
            Dict[str, Any]: The payload of the work.
        """
        # Nested exclusion in pydantic's `.dict()` takes a mapping. A dotted
        # string such as "config.token" is matched as a top-level field name
        # and therefore excludes nothing.
        payload: Dict[str, Any] = self.dict(exclude={"config": {"token"}})
        return payload

    @classmethod
    def from_json(cls, json_str: str) -> "Work":
        """Create a work from a JSON string.

        Args:
            json_str (str): JSON object whose keys are Work field names.

        Returns:
            Work: The validated work.
        """
        fields = loads(json_str)
        return cls(**fields)

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "Work":
        """Build a work from a mapping of field values.

        Args:
            payload (Dict[str, Any]): Field names mapped to their values.

        Returns:
            Work: The validated work.
        """
        instance = cls(**payload)
        return instance

    ###########################################################################
    # HTTP Methods
    ###########################################################################

    @classmethod
    def withdraw(
        cls,
        pipeline: str,
        event: Optional[List[int]] = None,
        site: Optional[str] = None,
        priority: Optional[int] = None,
        user: Optional[str] = None,
        **kwargs: Dict[str, Any],
    ) -> Optional["Work"]:
        """Withdraw work from the buckets backend.

        Args:
            pipeline (str): Name of the pipeline.
            event (Optional[List[int]]): Event ID(s), forwarded to `Buckets.withdraw`.
            site (Optional[str]): Site, forwarded to `Buckets.withdraw`.
            priority (Optional[int]): Priority, forwarded to `Buckets.withdraw`.
            user (Optional[str]): User, forwarded to `Buckets.withdraw`.
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            Optional[Work]: The withdrawn work, or None when the backend
            returns an empty payload.
        """
        buckets = Buckets(**kwargs)  # type: ignore
        payload = buckets.withdraw(
            pipeline=pipeline, event=event, site=site, priority=priority, user=user
        )
        if payload:
            return cls.from_dict(payload)
        return None

    # NOTE: with no `retry=` condition, tenacity retries on any exception
    # until the 30 s stop delay elapses.
    @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
    def deposit(
        self, return_ids: bool = False, **kwargs: Dict[str, Any]
    ) -> Union[bool, List[str]]:
        """Deposit work to the buckets backend.

        Args:
            return_ids (bool): Forwarded to `Buckets.deposit`. When True, the
                IDs of the deposited work are returned instead of a bool.
                Defaults to False.
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            Union[bool, List[str]]: True if successful, False otherwise, or
            the list of deposited work IDs when `return_ids` is True.
        """
        buckets = Buckets(**kwargs)  # type: ignore
        return buckets.deposit(works=[self.payload], return_ids=return_ids)

    @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
    def update(self, **kwargs: Dict[str, Any]) -> bool:
        """Push this work's current state to the buckets backend.

        Args:
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            bool: True if successful, False otherwise.
        """
        client = Buckets(**kwargs)  # type: ignore
        works = [self.payload]
        return client.update(works)

    @retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
    def delete(self, **kwargs: Dict[str, Any]) -> bool:
        """Delete work from the buckets backend.

        Args:
            **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

        Returns:
            bool: True if successful, False otherwise. If the work has no `id`
            yet, False is returned without contacting the backend.
        """
        # Without this guard, str(None) would send the literal ID "None".
        if self.id is None:
            return False
        buckets = Buckets(**kwargs)  # type: ignore
        return buckets.delete_ids([str(self.id)])
Attributes
payload: Dict[str, Any] property

Return the dictionary representation of the work.

Returns:

Type Description
Dict[str, Any]

The payload of the work. Non-instanced attributes are excluded from the payload.

Classes
Config

Pydantic Config.

Source code in chime_frb_api/workflow/work.py
class Config:
    """Pydantic model configuration for Work."""

    validate_assignment = True
    validate_all = True
    exclude_none = True
Functions
delete(**kwargs)

Delete work from the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def delete(self, **kwargs: Dict[str, Any]) -> bool:
    """Delete work from the buckets backend.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise. If the work has no `id`
        yet, False is returned without contacting the backend.
    """
    # Without this guard, str(None) would send the literal ID "None".
    if self.id is None:
        return False
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.delete_ids([str(self.id)])
deposit(return_ids=False, **kwargs)

Deposit work to the buckets backend.

Parameters:

Name Type Description Default
return_ids bool

If True, return the IDs of the deposited work instead of a bool.

False
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool Union[bool, List[str]]

True if successful, False otherwise; the list of deposited work IDs when return_ids is True.

Source code in chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def deposit(
    self, return_ids: bool = False, **kwargs: Dict[str, Any]
) -> Union[bool, List[str]]:
    """Deposit work to the buckets backend.

    Args:
        return_ids (bool): Forwarded to `Buckets.deposit`. When True, the
            IDs of the deposited work are returned instead of a bool.
            Defaults to False.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        Union[bool, List[str]]: True if successful, False otherwise, or the
        list of deposited work IDs when `return_ids` is True.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    return buckets.deposit(works=[self.payload], return_ids=return_ids)
from_dict(payload) classmethod

Create a work from a dictionary.

Parameters:

Name Type Description Default
payload Dict[str, Any]

The dictionary.

required

Returns:

Name Type Description
Work Work

The work.

Source code in chime_frb_api/workflow/work.py
@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> "Work":
    """Build a work from a mapping of field values.

    Args:
        payload (Dict[str, Any]): Field names mapped to their values.

    Returns:
        Work: The validated work.
    """
    instance = cls(**payload)
    return instance
from_json(json_str) classmethod

Create a work from a json string.

Parameters:

Name Type Description Default
json_str str

The json string.

required

Returns:

Name Type Description
Work Work

The work.

Source code in chime_frb_api/workflow/work.py
@classmethod
def from_json(cls, json_str: str) -> "Work":
    """Create a work from a JSON string.

    Args:
        json_str (str): JSON object whose keys are Work field names.

    Returns:
        Work: The validated work.
    """
    fields = loads(json_str)
    return cls(**fields)
post_init(values)

Initialize work attributes after validation.

Source code in chime_frb_api/workflow/work.py
@root_validator
def post_init(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Initialize work attributes after validation.

    Normalizes the pipeline name to hyphen-case, stamps the creation time,
    merges tags from the WORKFLOW_TAGS environment variable and checks that
    `command` and `function` are not both set.

    Args:
        values (Dict[str, Any]): Field values that passed validation.

    Raises:
        ValueError: If the pipeline name contains characters other than
            letters, numbers and dashes, or if both `command` and `function`
            are set.

    Returns:
        Dict[str, Any]: The updated field values.
    """
    # A field that failed its own validation is missing from `values`. Skip
    # the pipeline handling in that case so pydantic reports that
    # ValidationError instead of a bare KeyError escaping from here.
    pipeline: Optional[str] = values.get("pipeline")
    if pipeline is not None:
        reformatted = False
        if any(char.isupper() for char in pipeline):
            pipeline = pipeline.lower()
            reformatted = True
        if " " in pipeline or "_" in pipeline:
            pipeline = pipeline.replace(" ", "-").replace("_", "-")
            reformatted = True
        if not all(char.isalnum() or char == "-" for char in pipeline):
            raise ValueError(
                "pipeline name can only contain letters, numbers & dashes"
            )
        values["pipeline"] = pipeline
        if reformatted:
            warn(
                SyntaxWarning(f"pipeline reformatted to {values['pipeline']}"),
                stacklevel=2,
            )

    # Set creation time if not already set
    if values.get("creation") is None:
        values["creation"] = time()

    # Merge tags from the comma-separated WORKFLOW_TAGS variable. Blank
    # entries are dropped. Duplicates are removed while keeping first-seen
    # order.
    env_value = environ.get("WORKFLOW_TAGS")
    if env_value:
        env_tags: List[str] = [
            tag.strip() for tag in env_value.split(",") if tag.strip()
        ]
        merged: List[str] = (values.get("tags") or []) + env_tags
        values["tags"] = list(dict.fromkeys(merged))

    # Check if both command and function are set
    if values.get("command") and values.get("function"):
        raise ValueError("command and function cannot be set together.")

    if not values.get("token"):  # type: ignore
        msg = "workflow token required after v4.0.0."
        warn(
            FutureWarning(msg),
            stacklevel=2,
        )
    return values
update(**kwargs)

Update work in the buckets backend.

Parameters:

Name Type Description Default
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}

Returns:

Name Type Description
bool bool

True if successful, False otherwise.

Source code in chime_frb_api/workflow/work.py
@retry(wait=wait_random(min=0.5, max=1.5), stop=(stop_after_delay(30)))
def update(self, **kwargs: Dict[str, Any]) -> bool:
    """Push this work's current state to the buckets backend.

    Args:
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        bool: True if successful, False otherwise.
    """
    client = Buckets(**kwargs)  # type: ignore
    works = [self.payload]
    return client.update(works)
withdraw(pipeline, event=None, site=None, priority=None, user=None, **kwargs) classmethod

Withdraw work from the buckets backend.

Parameters:

Name Type Description Default
pipeline str

Name of the pipeline.

required
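event Optional[List[int]]

Event ID(s), forwarded to Buckets.withdraw.

None
site Optional[str]

Site, forwarded to Buckets.withdraw.

None
priority Optional[int]

Priority, forwarded to Buckets.withdraw.

None
user Optional[str]

User, forwarded to Buckets.withdraw.

None
**kwargs Dict[str, Any]

Keyword arguments for the Buckets API.

{}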

Returns:

Name Type Description
Work Optional[Work]

Work object.

Source code in chime_frb_api/workflow/work.py
@classmethod
def withdraw(
    cls,
    pipeline: str,
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    **kwargs: Dict[str, Any],
) -> Optional["Work"]:
    """Withdraw work from the buckets backend.

    Args:
        pipeline (str): Name of the pipeline.
        event (Optional[List[int]]): Event ID(s), forwarded to `Buckets.withdraw`.
        site (Optional[str]): Site, forwarded to `Buckets.withdraw`.
        priority (Optional[int]): Priority, forwarded to `Buckets.withdraw`.
        user (Optional[str]): User, forwarded to `Buckets.withdraw`.
        **kwargs (Dict[str, Any]): Keyword arguments for the Buckets API.

    Returns:
        Optional[Work]: The withdrawn work, or None when the backend returns
        an empty payload.
    """
    buckets = Buckets(**kwargs)  # type: ignore
    payload = buckets.withdraw(
        pipeline=pipeline, event=event, site=site, priority=priority, user=user
    )
    if payload:
        return cls.from_dict(payload)
    return None