Asynchronous Sessions¶
Skaha now supports asynchronous sessions using the AsyncSession
class while maintaining 1-to-1 compatibility with the Session
class.
Bases: SkahaClient
Asynchronous Skaha Session Management Client.
This class provides methods to manage sessions in the Skaha system, including fetching session details, creating new sessions, retrieving logs, and destroying existing sessions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
SkahaClient
|
SkahaClient
|
Base HTTP client for making API requests. |
required |
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession(
server="https://skaha.example.com",
version="v1",
token="token",
timeout=30,
concurrency=100,
loglevel=40,
)
create
async
¶
create(name: str, image: str, cores: int = 2, ram: int = 4, kind: KINDS = 'headless', gpu: Optional[int] = None, cmd: Optional[str] = None, args: Optional[str] = None, env: Optional[Dict[str, Any]] = None, replicas: int = 1) -> List[str]
Launch a skaha session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
A unique name for the session. |
required |
image
|
str
|
Container image to use for the session. |
required |
cores
|
int
|
Number of cores. Defaults to 2. |
2
|
ram
|
int
|
Amount of RAM (GB). Defaults to 4. |
4
|
kind
|
str
|
Type of skaha session. Defaults to "headless". |
'headless'
|
gpu
|
Optional[int]
|
Number of GPUs. Defaults to None. |
None
|
cmd
|
Optional[str]
|
Command to run. Defaults to None. |
None
|
args
|
Optional[str]
|
Arguments to the command. Defaults to None. |
None
|
env
|
Optional[Dict[str, Any]]
|
Environment variables to inject. Defaults to None. |
None
|
replicas
|
int
|
Number of sessions to launch. Defaults to 1. |
1
|
Notes
The session name is suffixed with the replica number, e.g. test-1, test-2. Each container will have the following environment variables injected: * REPLICA_ID - The replica number * REPLICA_COUNT - The total number of replicas
Returns:
Type | Description |
---|---|
List[str]
|
List[str]: A list of session IDs for the launched sessions. |
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession()
>>> await session.create(
name="test",
image='images.canfar.net/skaha/terminal:1.1.1',
cores=2,
ram=8,
gpu=1,
kind="headless",
cmd="env",
env={"TEST": "test"},
replicas=2,
)
["hjko98yghj", "ikvp1jtp"]
Source code in skaha/session.py
async def create(
    self,
    name: str,
    image: str,
    cores: int = 2,
    ram: int = 4,
    kind: KINDS = "headless",
    gpu: Optional[int] = None,
    cmd: Optional[str] = None,
    args: Optional[str] = None,
    env: Optional[Dict[str, Any]] = None,
    replicas: int = 1,
) -> List[str]:
    """Launch a skaha session.

    Args:
        name (str): A unique name for the session.
        image (str): Container image to use for the session.
        cores (int, optional): Number of cores. Defaults to 2.
        ram (int, optional): Amount of RAM (GB). Defaults to 4.
        kind (str, optional): Type of skaha session. Defaults to "headless".
        gpu (Optional[int], optional): Number of GPUs. Defaults to None.
        cmd (Optional[str], optional): Command to run. Defaults to None.
        args (Optional[str], optional): Arguments to the command. Defaults to None.
        env (Optional[Dict[str, Any]], optional): Environment variables to inject.
            Defaults to None.
        replicas (int, optional): Number of sessions to launch. Defaults to 1.

    Notes:
        The session name is suffixed with the replica number, e.g. test-1, test-2.
        Each container will have the following environment variables injected:
            * REPLICA_ID - The replica number
            * REPLICA_COUNT - The total number of replicas

    Returns:
        List[str]: A list of session IDs for the launched sessions. Requests
            that raise are logged and left out of the list.

    Examples:
        >>> from skaha.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.create(
        ...     name="test",
        ...     image='images.canfar.net/skaha/terminal:1.1.1',
        ...     cores=2,
        ...     ram=8,
        ...     gpu=1,
        ...     kind="headless",
        ...     cmd="env",
        ...     env={"TEST": "test"},
        ...     replicas=2,
        ... )
        ["hjko98yghj", "ikvp1jtp"]
    """
    payloads: List[List[Tuple[str, Any]]] = build.create_parameters(
        name, image, cores, ram, kind, gpu, cmd, args, env, replicas
    )
    results: List[str] = []
    # Caps the number of POST requests that are in flight at the same time.
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

    async def bounded(parameters: List[Tuple[str, Any]]) -> Any:
        async with semaphore:
            response = await self.asynclient.post(url="session", params=parameters)
            response.raise_for_status()
            # The response body is the session ID followed by a line terminator.
            return response.text.rstrip("\r\n")

    tasks: List[Any] = [bounded(payload) for payload in payloads]
    # Lazy %-style arguments: the original message was a plain string with
    # f-string placeholders, so the braces were logged verbatim.
    log.info("Creating %s %s session[s].", replicas, kind)
    # return_exceptions=True keeps one failed request from cancelling the rest.
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, Exception):
            log.error(reply)
        elif isinstance(reply, str):
            results.append(reply)
    return results
destroy
async
¶
Destroy skaha session[s].
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids
|
Union[str, List[str]]
|
Session ID[s]. |
required |
Returns:
Type | Description |
---|---|
Dict[str, bool]
|
Dict[str, bool]: A dictionary of session IDs |
Dict[str, bool]
|
and a bool indicating if the session was destroyed. |
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession()
>>> await session.destroy(ids="hjko98yghj")
>>> await session.destroy(ids=["hjko98yghj", "ikvp1jtp"])
Source code in skaha/session.py
async def destroy(self, ids: Union[str, List[str]]) -> Dict[str, bool]:
    """Destroy skaha session[s].

    Args:
        ids (Union[str, List[str]]): Session ID[s].

    Returns:
        Dict[str, bool]: A dictionary of session IDs
            and a bool indicating if the session was destroyed. An ID whose
            request failed for any reason is reported as False.

    Examples:
        >>> from skaha.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.destroy(ids="hjko98yghj")
        >>> await session.destroy(ids=["hjko98yghj", "ikvp1jtp"])
    """
    if isinstance(ids, str):
        ids = [ids]
    results: Dict[str, bool] = {}
    # Caps the number of DELETE requests that are in flight at the same time.
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

    async def bounded(value: str) -> Tuple[str, bool]:
        async with semaphore:
            try:
                response = await self.asynclient.delete(url=f"session/{value}")
                response.raise_for_status()
                return value, True
            except HTTPError as err:
                log.error(err)
                return value, False

    tasks: List[Any] = [bounded(value) for value in ids]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    # gather() returns results in the order the awaitables were passed, so
    # each reply can be paired with the ID that produced it.
    for value, reply in zip(ids, responses):
        if isinstance(reply, tuple):
            results[reply[0]] = reply[1]
        else:
            # An exception other than HTTPError escaped the task. Log it and
            # report the ID as not destroyed instead of silently dropping it.
            log.error(reply)
            results[value] = False
    return results
destroy_with
async
¶
destroy_with(prefix: str, kind: KINDS = 'headless', status: STATUS = 'Succeeded') -> Dict[str, bool]
Destroy skaha session[s] matching search criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix
|
str
|
Prefix to match in the session name. |
required |
kind
|
KINDS
|
Type of skaha session. Defaults to "headless". |
'headless'
|
status
|
STATUS
|
Status of the session. Defaults to "Succeeded". |
'Succeeded'
|
Returns:
Type | Description |
---|---|
Dict[str, bool]
|
Dict[str, bool]: A dictionary of session IDs |
Dict[str, bool]
|
and a bool indicating if the session was destroyed. |
Notes
The prefix is case-sensitive. This method is useful for destroying multiple sessions at once.
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession()
>>> await session.destroy_with(prefix="test")
>>> await session.destroy_with(prefix="test", kind="desktop")
>>> await session.destroy_with(prefix="car", kind="carta", status="Running")
Source code in skaha/session.py
async def destroy_with(
    self, prefix: str, kind: KINDS = "headless", status: STATUS = "Succeeded"
) -> Dict[str, bool]:
    """Destroy every skaha session whose name starts with a given prefix.

    Sessions are first listed with ``fetch`` using the ``kind`` and ``status``
    filters, then those whose name starts with ``prefix`` are passed to
    ``destroy``.

    Args:
        prefix (str): Prefix to match in the session name.
        kind (KINDS): Type of skaha session. Defaults to "headless".
        status (STATUS): Status of the session. Defaults to "Succeeded".

    Returns:
        Dict[str, bool]: A dictionary of session IDs
            and a bool indicating if the session was destroyed.

    Notes:
        The prefix is case-sensitive.
        This method is useful for destroying multiple sessions at once.

    Examples:
        >>> from skaha.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.destroy_with(prefix="test")
        >>> await session.destroy_with(prefix="test", kind="desktop")
        >>> await session.destroy_with(prefix="car", kind="carta", status="Running")
    """
    candidates = await self.fetch(kind=kind, status=status)
    matching: List[str] = [
        entry["id"] for entry in candidates if entry["name"].startswith(prefix)
    ]
    return await self.destroy(matching)
fetch
async
¶
fetch(kind: Optional[KINDS] = None, status: Optional[STATUS] = None, view: Optional[VIEW] = None) -> List[Dict[str, str]]
List open sessions for the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kind
|
Optional[KINDS]
|
Session kind. Defaults to None. |
None
|
status
|
Optional[STATUS]
|
Session status. Defaults to None. |
None
|
view
|
Optional[VIEW]
|
Session view level. Defaults to None. |
None
|
Notes
By default, only the calling user's sessions are listed. If view is set to 'all', all user sessions are listed (with limited information).
Returns:
Name | Type | Description |
---|---|---|
list |
List[Dict[str, str]]
|
Sessions information. |
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession()
>>> await session.fetch(kind="notebook")
[{'id': 'vl91sfzz',
'userid': 'brars',
'runAsUID': '166169204',
'runAsGID': '166169204',
'supplementalGroups': [34241,
34337,
35124,
36227,
1902365706,
1454823273,
1025424273],
'appid': '<none>',
'image': 'images-rc.canfar.net/skaha/skaha-notebook:22.09-test',
'type': 'notebook',
'status': 'Running',
'name': 'notebook1',
'startTime': '2025-03-05T21:48:29Z',
'expiryTime': '2025-03-09T21:48:29Z',
'connectURL': 'https://canfar.net/session/notebook/some/url',
'requestedRAM': '8G',
'requestedCPUCores': '2',
'requestedGPUCores': '0',
'ramInUse': '<none>',
'gpuRAMInUse': '<none>',
'cpuCoresInUse': '<none>',
'gpuUtilization': '<none>'}]
Source code in skaha/session.py
async def fetch(
    self,
    kind: Optional[KINDS] = None,
    status: Optional[STATUS] = None,
    view: Optional[VIEW] = None,
) -> List[Dict[str, str]]:
    """List open sessions for the user.

    Args:
        kind (Optional[KINDS], optional): Session kind. Defaults to None.
        status (Optional[STATUS], optional): Session status. Defaults to None.
        view (Optional[VIEW], optional): Session view level. Defaults to None.

    Notes:
        By default, only the calling user's sessions are listed. If view is
        set to 'all', all user sessions are listed (with limited information).

    Returns:
        list: Sessions information, as decoded from the JSON response.

    Examples:
        >>> from skaha.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.fetch(kind="notebook")
        [{'id': 'vl91sfzz',
          'userid': 'brars',
          'runAsUID': '166169204',
          'runAsGID': '166169204',
          'supplementalGroups': [34241,
           34337,
           35124,
           36227,
           1902365706,
           1454823273,
           1025424273],
          'appid': '<none>',
          'image': 'images-rc.canfar.net/skaha/skaha-notebook:22.09-test',
          'type': 'notebook',
          'status': 'Running',
          'name': 'notebook1',
          'startTime': '2025-03-05T21:48:29Z',
          'expiryTime': '2025-03-09T21:48:29Z',
          'connectURL': 'https://canfar.net/session/notebook/some/url',
          'requestedRAM': '8G',
          'requestedCPUCores': '2',
          'requestedGPUCores': '0',
          'ramInUse': '<none>',
          'gpuRAMInUse': '<none>',
          'cpuCoresInUse': '<none>',
          'gpuUtilization': '<none>'}]
    """
    query: Dict[str, Any] = build.fetch_parameters(kind, status, view)
    reply: Response = await self.asynclient.get(url="session", params=query)
    # Raise on an error status before trying to decode the body.
    reply.raise_for_status()
    return reply.json()
info
async
¶
Get information about session[s].
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids
|
Union[List[str], str]
|
Session ID[s]. |
required |
Returns:
Type | Description |
---|---|
List[Dict[str, Any]]
|
List[Dict[str, Any]]: Session information, one entry per session. |
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession()
>>> await session.info(ids="hjko98yghj")
>>> await session.info(ids=["hjko98yghj", "ikvp1jtp"])
Source code in skaha/session.py
async def info(self, ids: Union[List[str], str]) -> List[Dict[str, Any]]:
    """Get information about session[s].

    Args:
        ids (Union[List[str], str]): Session ID[s].

    Returns:
        List[Dict[str, Any]]: Session information, one entry per session
            that was fetched successfully. Requests that raise are logged
            and left out of the list.

    Examples:
        >>> from skaha.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.info(ids="hjko98yghj")
        >>> await session.info(ids=["hjko98yghj", "ikvp1jtp"])
    """
    # Accept a single ID as well as a list of IDs.
    if isinstance(ids, str):
        ids = [ids]
    parameters: Dict[str, str] = {"view": "event"}
    results: List[Dict[str, Any]] = []
    # Caps the number of GET requests that are in flight at the same time.
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

    async def bounded(value: str) -> Dict[str, Any]:
        async with semaphore:
            response = await self.asynclient.get(
                url=f"session/{value}", params=parameters
            )
            response.raise_for_status()
            return response.json()

    tasks: List[Any] = [bounded(value) for value in ids]
    # return_exceptions=True keeps one failed lookup from cancelling the rest.
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, Exception):
            log.error(reply)
        elif isinstance(reply, dict):
            results.append(reply)  # type: ignore
    return results
logs
async
¶
Get logs from a session[s].
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ids
|
Union[List[str], str]
|
Session ID[s]. |
required |
verbose
|
bool
|
Print logs to stdout. Defaults to False. |
False
|
Returns:
Type | Description |
---|---|
Optional[Dict[str, str]]
|
Dict[str, str]: Logs in text/plain format, keyed by session ID. None is returned when verbose is True. |
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession()
>>> await session.logs(ids="hjko98yghj")
>>> await session.logs(ids=["hjko98yghj", "ikvp1jtp"])
Source code in skaha/session.py
async def logs(
    self, ids: Union[List[str], str], verbose: bool = False
) -> Optional[Dict[str, str]]:
    """Get logs from a session[s].

    Args:
        ids (Union[List[str], str]): Session ID[s].
        verbose (bool, optional): Print logs to stdout. Defaults to False.

    Returns:
        Optional[Dict[str, str]]: Logs in text/plain format, keyed by
            session ID. Requests that raise are logged and left out.
            None is returned when ``verbose`` is True, because the logs are
            written out instead.

    Examples:
        >>> from skaha.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.logs(ids="hjko98yghj")
        >>> await session.logs(ids=["hjko98yghj", "ikvp1jtp"])
    """
    # Accept a single ID as well as a list of IDs.
    if isinstance(ids, str):
        ids = [ids]
    parameters: Dict[str, str] = {"view": "logs"}
    results: Dict[str, str] = {}
    # Caps the number of GET requests that are in flight at the same time.
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

    async def bounded(value: str) -> Tuple[str, str]:
        async with semaphore:
            response = await self.asynclient.get(
                url=f"session/{value}", params=parameters
            )
            response.raise_for_status()
            return value, response.text

    tasks: List[Any] = [bounded(value) for value in ids]
    # return_exceptions=True keeps one failed request from cancelling the rest.
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, Exception):
            log.error(reply)
        elif isinstance(reply, tuple):
            results[reply[0]] = reply[1]
    # Print logs to stdout if verbose is set to True
    if verbose:
        for key, value in results.items():
            log.info("Session ID: %s\n", key)
            logs.stdout(value)
        return None
    return results
stats
async
¶
Get statistics for the entire skaha cluster.
Returns:
Type | Description |
---|---|
Dict[str, Any]
|
Dict[str, Any]: Cluster statistics. |
Examples:
>>> from skaha.session import AsyncSession
>>> session = AsyncSession()
>>> await session.stats()
{'instances': {
'session': 88, 'desktopApp': 30, 'headless': 0, 'total': 118},
'cores': {'requestedCPUCores': 377,
'coresAvailable': 960,
'maxCores': {'cores': 32, 'withRam': '147Gi'}},
'ram': {'maxRAM': {'ram': '226Gi', 'withCores': 32}}}
Source code in skaha/session.py
async def stats(self) -> Dict[str, Any]:
    """Get statistics for the entire skaha cluster.

    Returns:
        Dict[str, Any]: Cluster statistics, as decoded from the JSON response.

    Examples:
        >>> from skaha.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.stats()
        {'instances': {
             'session': 88, 'desktopApp': 30, 'headless': 0, 'total': 118},
         'cores': {'requestedCPUCores': 377,
             'coresAvailable': 960,
             'maxCores': {'cores': 32, 'withRam': '147Gi'}},
         'ram': {'maxRAM': {'ram': '226Gi', 'withCores': 32}}}
    """
    reply: Response = await self.asynclient.get("session", params={"view": "stats"})
    # Raise on an error status before trying to decode the body.
    reply.raise_for_status()
    return reply.json()