Skip to content

Buckets API

Bases: Client

HTTP Client for interacting with the Buckets backend.

Parameters:

Name Type Description Default
Client client

The base class for interacting with the backend.

required

Returns:

Name Type Description
Buckets

A client for interacting with the Buckets backend.

audit

Python
audit() -> Dict[str, Any]

Audit work buckets backend.

The audit process retries failed work, expires any work past the expiration time and checks for any stale work older than 7 days.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The audit results.

Source code in workflow/http/buckets.py
Python
def audit(self) -> Dict[str, Any]:
    """Audit work buckets backend.

    The audit process retries failed work, expires any work past the
    expiration time and checks for any stale work older than 7 days.

    Returns:
        Dict[str, Any]: The audit results.
    """
    reply: Dict[str, Any] = {}
    urls: Dict[str, str] = {
        "failed": "/audit/failed",
        "expired": "/audit/expired",
        "stale": "/audit/stale/7.0",
    }
    with self.session as session:
        for process, url in urls.items():
            response: Response = session.get(url=f"{self.baseurl}{url}")
            response.raise_for_status()
            reply[process] = response.json()
    return reply

delete_ids

Python
delete_ids(ids: List[str]) -> bool

Delete works from the buckets backend with the given ids.

Parameters:

Name Type Description Default
ids List[str]

The IDs of the works to delete.

required

Returns:

Name Type Description
bool bool

Whether the works were deleted successfully.

Source code in workflow/http/buckets.py
Python
@retry(wait=wait_random(min=0.1, max=2), stop=(stop_after_delay(30)))
def delete_ids(self, ids: List[str]) -> bool:
    """Delete works from the buckets backend with the given ids.

    Args:
        ids (List[str]): The IDs of the works to delete.

    Returns:
        bool: Whether the works were deleted successfully.
    """
    with self.session as session:
        response: Response = session.delete(
            url=f"{self.baseurl}/work", params={"ids": ids}
        )
        response.raise_for_status()
    return response.json()

delete_many

Python
delete_many(
    pipeline: str,
    status: Optional[str] = None,
    events: Optional[List[int]] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[str] = None,
    force: bool = False,
) -> bool

Delete works belonging to a pipeline from the buckets backend.

If a status is provided, only works with that status will be deleted. If an event number is provided, only works with that event will be deleted.

Parameters:

Name Type Description Default
pipeline str

The pipeline to delete works from.

required
status Optional[List[str]]

The status to delete works with. e.g. ["queued"].

None
event Optional[List[int]]

The event to delete works with.

required
force bool

Whether to force the deletion without requiring user confirmation. Defaults to False.

False

Returns:

Name Type Description
bool bool

Whether any works were deleted.

Source code in workflow/http/buckets.py
Python
def delete_many(
    self,
    pipeline: str,
    status: Optional[str] = None,
    events: Optional[List[int]] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[str] = None,
    force: bool = False,
) -> bool:
    """Delete works belonging to a pipeline from the buckets backend.

    If a status is provided, only works with that status will be deleted.
    If an event number is provided, only works with that event will be deleted.

    Args:
        pipeline (str): The pipeline to delete works from.
        status (Optional[List[str]]): The status to delete works with.
            e.g. ["queued"].
        event (Optional[List[int]]): The event to delete works with.
        force (bool, optional): Whether to force the deletion without requiring
            user confirmation. Defaults to False.

    Returns:
        bool: Whether any works were deleted.
    """
    query: Dict[str, Any] = {"pipeline": pipeline}
    query.update({"status": status} if status else {})
    query.update({"event": {"$in": events}} if events else {})
    query.update({"tags": {"$in": tags}} if tags else {})
    query.update({"config.parent": parent} if parent else {})
    projection = {"id": True}
    result = self.view(query, projection)
    ids: List[str] = []
    if result:
        ids = [work["id"] for work in result]
    # Get user confirmation before deleting
    if ids and not force:
        msg = f"Are you sure you want to delete {len(ids)} works?"
        # Display upto 5 ids only
        msg += "\n\tids: " + ", ".join(ids[:5]) + ("..." if len(ids) > 5 else "")
        msg += "\n\tstatus: " + status if status else ""
        msg += "\n\tevents: " + ", ".join(map(str, events)) if events else ""
        msg += "\n\ttags: " + ", ".join(tags) if tags else ""
        msg += "\n\tparent: " + parent if parent else ""
        force = confirmation(msg)
    if ids and force:
        return self.delete_ids(ids)
    return False

deposit

Python
deposit(
    works: List[Dict[str, Any]], return_ids: bool = False
) -> Union[bool, List[str]]

Deposit works into the buckets backend.

Parameters:

Name Type Description Default
works List[Dict[str, Any]]

The payload from the Work Object.

required
return_ids bool

Whether to return the ids of the works. Defaults to False.

False

Raises:

Type Description
RequestException

If the request fails.

Returns:

Type Description
Union[bool, List[str]]

Union[bool, List[str]]: Whether the request was successful or the ids of the works.

Examples:

from chime_frb_api.buckets import Buckets from chime_frb_api.tasks import Work work = Work(pipeline="sample") buckets.deposit([work.payload]) True buckets.deposit([work.payload], return_ids=True) ["5f9b5e1b7e5c4b5eb1b""]

Source code in workflow/http/buckets.py
Python
@retry(wait=wait_random(min=0.1, max=2), stop=(stop_after_delay(30)))
def deposit(
    self, works: List[Dict[str, Any]], return_ids: bool = False
) -> Union[bool, List[str]]:
    """Deposit works into the buckets backend.

    Args:
        works (List[Dict[str, Any]]): The payload from the Work Object.
        return_ids (bool, optional): Whether to return the ids of the works.
            Defaults to False.

    Raises:
        RequestException: If the request fails.

    Returns:
        Union[bool, List[str]]:
            Whether the request was successful or the ids of the works.

    Examples:
    >>> from chime_frb_api.buckets import Buckets
    >>> from chime_frb_api.tasks import Work
    >>> work = Work(pipeline="sample")
    >>> buckets.deposit([work.payload])
    True
    >>> buckets.deposit([work.payload], return_ids=True)
    ["5f9b5e1b7e5c4b5eb1b""]
    """
    params: Dict[str, Any] = {"return_ids": return_ids}
    with self.session as session:
        response: Response = session.post(
            url=f"{self.baseurl}/work?{urlencode(params)}",
            json=works,
            params=params,
            timeout=self.timeout,
        )
        response.raise_for_status()
    if return_ids:
        return response.json()
    return True

info

Python
info() -> Dict[str, Any]

Get the version of the buckets backend.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The version of the buckets backend.

Source code in workflow/http/buckets.py
Python
def info(self) -> Dict[str, Any]:
    """Get the version of the buckets backend.

    Returns:
        Dict[str, Any]: The version of the buckets backend.
    """
    client_info = self.model_dump()
    with self.session as session:
        response: Response = session.get(url=f"{self.baseurl}/version")
        response.raise_for_status()
    server_info = response.json()
    return {"client": client_info, "server": server_info}

pipelines

Python
pipelines() -> List[str]

View the current pipelines in the buckets backend.

Returns:

Type Description
List[str]

List[str]: The current pipelines.

Source code in workflow/http/buckets.py
Python
def pipelines(self) -> List[str]:
    """View the current pipelines in the buckets backend.

    Returns:
        List[str]: The current pipelines.
    """
    with self.session as session:
        response: Response = session.get(url=f"{self.baseurl}/status/pipelines")
        response.raise_for_status()
    return response.json()

status

Python
status(pipeline: Optional[str] = None) -> Dict[str, Any]

View the status of the buckets backend.

If overall is True, the status of all pipelines will be returned.

Parameters:

Name Type Description Default
pipeline Optional[str]

The pipeline to return the status of.

None

Returns:

Type Description
Dict[str, Any]

List[Dict[str, Any]]: The status of the buckets backend.

Source code in workflow/http/buckets.py
Python
def status(self, pipeline: Optional[str] = None) -> Dict[str, Any]:
    """View the status of the buckets backend.

    If overall is True, the status of all pipelines will be returned.

    Args:
        pipeline (Optional[str], optional): The pipeline to return the status of.

    Returns:
        List[Dict[str, Any]]: The status of the buckets backend.
    """
    url: str = f"{self.baseurl}/status"
    if pipeline:
        url = f"{url}/details/{pipeline}"
    with self.session as session:
        response: Response = session.get(url=url)
        response.raise_for_status()
    return response.json()

update

Python
update(works: List[Dict[str, Any]]) -> bool

Update works in the buckets backend.

Parameters:

Name Type Description Default
works List[Dict[str, Any]]

The payload from the Work Object.

required

Returns:

Name Type Description
bool bool

Whether the works were updated successfully.

Source code in workflow/http/buckets.py
Python
@retry(wait=wait_random(min=0.1, max=2), stop=(stop_after_delay(30)))
def update(self, works: List[Dict[str, Any]]) -> bool:
    """Update works in the buckets backend.

    Args:
        works (List[Dict[str, Any]]): The payload from the Work Object.

    Returns:
        bool: Whether the works were updated successfully.
    """
    with self.session as session:
        response: Response = session.put(url=f"{self.baseurl}/work", json=works)
        response.raise_for_status()
    return response.json()

view

Python
view(
    query: Dict[str, Any],
    projection: Dict[str, bool] = {},
    skip: int = 0,
    limit: Optional[int] = 100,
) -> List[Dict[str, Any]]

View works in the buckets backend.

Parameters:

Name Type Description Default
query Dict[str, Any]

The query to filter the works with.

required
projection Dict[str, bool]

The projection to use to map the output.

{}
skip int

The number of works to skip. Defaults to 0.

0
limit Optional[int]

The number of works to limit to. Defaults to 100.

100

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: The works matching the query.

Source code in workflow/http/buckets.py
Python
def view(
    self,
    query: Dict[str, Any],
    projection: Dict[str, bool] = {},
    skip: int = 0,
    limit: Optional[int] = 100,
) -> List[Dict[str, Any]]:
    """View works in the buckets backend.

    Args:
        query (Dict[str, Any]): The query to filter the works with.
        projection (Dict[str, bool]): The projection to use to map the output.
        skip (int, optional): The number of works to skip. Defaults to 0.
        limit (Optional[int], optional): The number of works to limit to.
            Defaults to 100.

    Returns:
        List[Dict[str, Any]]: The works matching the query.
    """
    projection.update({"_id": False})
    payload = {
        "query": query,
        "projection": projection,
        "skip": skip,
        "limit": limit,
    }
    with self.session as session:
        response: Response = session.post(url=f"{self.baseurl}/view", json=payload)
        response.raise_for_status()
    return response.json()

withdraw

Python
withdraw(
    pipeline: Union[str, List[str]],
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[List[str]] = None,
) -> Dict[str, Any]

Withdraw queued work from the buckets backend.

Parameters:

Name Type Description Default
pipeline str | List[str]

The pipeline to withdraw work from.

required
event Optional[List[int]]

The event to filter by.

None
site Optional[str]

The site to filter by.

None
priority Optional[int]

The priority to withdraw from.

None
user Optional[str]

The user to filter by.

None
tags Optional[List[str]]

The tags to filter by.

None
parent Optional[str]

The parent to filter by.

None

Raises:

Type Description
RequestException

If the request fails.

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The work withdrawn.

Source code in workflow/http/buckets.py
Python
def withdraw(
    self,
    pipeline: Union[str, List[str]],
    event: Optional[List[int]] = None,
    site: Optional[str] = None,
    priority: Optional[int] = None,
    user: Optional[str] = None,
    tags: Optional[List[str]] = None,
    parent: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Withdraw `queued` work from the buckets backend.

    Args:
        pipeline (str | List[str]): The pipeline to withdraw work from.
        event (Optional[List[int]], optional): The event to filter by.
        site (Optional[str], optional): The site to filter by.
        priority (Optional[int], optional): The priority to withdraw from.
        user (Optional[str], optional): The user to filter by.
        tags (Optional[List[str]], optional): The tags to filter by.
        parent (Optional[str], optional): The parent to filter by.

    Raises:
        RequestException: If the request fails.

    Returns:
        Dict[str, Any]: The work withdrawn.
    """
    if isinstance(pipeline, str):
        pipeline = [pipeline]
    query: Dict[str, Any] = {"pipeline": {"$in": pipeline}}
    query.update({"site": site} if site else {})
    query.update({"priority": priority} if priority else {})
    query.update({"user": user} if user else {})
    query.update({"event": {"$in": event}} if event else {})
    query.update({"tags": {"$in": tags}} if tags else {})
    query.update({"config.parent": {"$in": parent}} if parent else {})
    with self.session as session:
        response: Response = session.post(
            url=f"{self.baseurl}/work/withdraw", json=query
        )
        response.raise_for_status()
    return response.json()