Jobs API¶
The Job and JobList classes provide a high-level interface for working with tool executions (jobs) on Deep Origin.
JobList¶
JobList represents a collection of jobs that can be monitored and managed together. It's especially useful for managing batch jobs like Docking, where a set of ligands can be batched into multiple executions on multiple resources.
Creating a JobList¶
from deeporigin.platform.job import JobList
# Fetch jobs from the API
jobs = JobList.list()
# Create from a list of job IDs
jobs = JobList.from_ids(["id-1", "id-2", "id-3"])
# Create from execution DTOs
jobs = JobList.from_dtos([dto1, dto2, dto3])
Monitoring Jobs¶
The watch() method allows you to monitor multiple jobs in real-time. It will automatically stop when all jobs reach terminal states:
# Start monitoring a list of jobs
jobs = JobList.from_ids(["id-1", "id-2", "id-3"])
jobs.watch() # Updates every 5 seconds by default
# Custom update interval
jobs.watch(interval=10) # Update every 10 seconds
# Stop monitoring manually
jobs.stop_watching()
The watch() method will (see the sketch after this list):
- Display an initial status view
- Periodically sync all jobs and update the display
- Automatically stop when all jobs are in terminal states (Succeeded, Failed, Cancelled, etc.)
- Handle errors gracefully and continue monitoring
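A minimal end-to-end sketch of this flow (the IDs are placeholders; watch() runs as a background task, so the status breakdown reflects whatever has been synced so far):
jobs = JobList.from_ids(["id-1", "id-2", "id-3"])
jobs.watch(interval=10)
# At any point (e.g., once all jobs have reached terminal states),
# inspect the status breakdown and pull out failures for a closer look
print(jobs.status)  # e.g. {"Succeeded": 2, "Failed": 1}
failed_jobs = jobs.filter(status="Failed")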
Filtering Jobs¶
The filter() method allows you to filter jobs by status, attributes, or custom predicates:
Filter by status:
# Get only succeeded jobs
succeeded_jobs = jobs.filter(status="Succeeded")
# Get only running jobs
running_jobs = jobs.filter(status="Running")
Filter by tool attributes:
# Filter by tool key
docking_jobs = jobs.filter(tool_key="deeporigin.bulk-docking")
# Filter by tool version
v1_jobs = jobs.filter(tool_version="1.0.0")
# Filter by both tool key and version
specific_tool = jobs.filter(tool_key="deeporigin.abfe-end-to-end", tool_version="1.0.0")
Filter by other attributes:
# Filter by execution ID
specific_job = jobs.filter(executionId="id-123")
# Filter by multiple attributes (AND logic)
filtered = jobs.filter(status="Running", executionId="id-123")
Filter with custom predicate:
# Filter jobs with approveAmount > 100
expensive_jobs = jobs.filter(
    predicate=lambda job: job._attributes.get("approveAmount", 0) > 100
)
# Filter by nested attribute (tool.key)
tool_jobs = jobs.filter(
    predicate=lambda job: job._attributes.get("tool", {}).get("key") == "tool1"
)
Combine filters:
# Status filter + tool filter + custom predicate
complex_filter = jobs.filter(
    status="Running",
    tool_key="deeporigin.docking",
    predicate=lambda job: "error" not in str(
        job._attributes.get("progressReport", "")
    )
)
# Status + tool key + tool version
specific_jobs = jobs.filter(
    status="Succeeded",
    tool_key="deeporigin.abfe-end-to-end",
    tool_version="1.0.0"
)
src.platform.job.JobList¶
Represents a collection of Jobs that can be monitored and managed together.
This class provides methods to track, visualize, and manage multiple jobs as a single unit, and is especially useful for managing batch jobs like Docking, where a set of ligands can be batched into multiple executions on multiple resources.
Attributes¶
status property¶
status: dict[str, int]
Get a breakdown of the statuses of all jobs in the list.
Returns:
| Type | Description |
|---|---|
| dict[str, int] | A dictionary mapping status strings to counts. |
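For example (counts shown are illustrative):
counts = jobs.status          # e.g. {"Succeeded": 12, "Running": 3, "Failed": 1}
total = sum(counts.values())  # total number of jobs in the list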
Functions¶
cancel¶
cancel(max_workers: int = 4)
Cancel all jobs in the list in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int | The maximum number of threads to use for parallel execution. | 4 |
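For example, to cancel only the jobs that are still running, using more worker threads (this combines cancel() with filter()):
# Cancel every running job in this list, 8 cancellation requests in parallel
jobs.filter(status="Running").cancel(max_workers=8)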
confirm¶
confirm(max_workers: int = 4)
Confirm all jobs in the list in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int | The maximum number of threads to use for parallel execution. | 4 |
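confirm() mirrors cancel(); for example:
# Confirm (i.e., request start of) every job in this list, 8 at a time
jobs.confirm(max_workers=8)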
filter¶
filter(
    *,
    status: Optional[str | list[str] | set[str]] = None,
    tool_key: Optional[str] = None,
    tool_version: Optional[str] = None,
    require_metadata: bool = False,
    predicate: Optional[Callable[[Job], bool]] = None,
    **kwargs: Any
) -> "JobList"
Filter jobs by status, tool attributes, other attributes, or custom predicate.
This method returns a new JobList containing only jobs that match the specified criteria. Multiple filters can be combined - keyword arguments are applied first (with AND logic), then the predicate function is applied if provided.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | Optional[str \| list[str] \| set[str]] | Filter by job status. Can be a single status string (e.g., "Succeeded"), or a list/set of statuses (e.g., ["Succeeded", "Running", "Queued"]). Checks against the job.status property. | None |
| tool_key | Optional[str] | Filter by tool key (e.g., "deeporigin.docking", "deeporigin.abfe-end-to-end"). Checks against the tool key stored in the job's attributes (tool.key). | None |
| tool_version | Optional[str] | Filter by tool version (e.g., "1.0.0"). Checks against the tool version stored in the job's attributes (tool.version). | None |
| require_metadata | bool | If True, only include jobs whose metadata exists and is not None. | False |
| predicate | Optional[Callable[[Job], bool]] | Optional callable that takes a Job and returns True/False. Applied after keyword filters. Useful for complex conditions or accessing nested attributes. | None |
| **kwargs | Any | Additional filters on job._attributes keys. Each keyword argument is treated as a key in _attributes, and the value must match exactly (equality check). | {} |
Returns:
| Type | Description |
|---|---|
| 'JobList' | A new JobList instance containing only matching jobs. |
Examples:
Filter by status:
succeeded_jobs = jobs.filter(status="Succeeded")
running_jobs = jobs.filter(status="Running")
multiple_statuses = jobs.filter(status=["Succeeded", "Running", "Queued"])
Filter by tool attributes:
docking_jobs = jobs.filter(tool_key="deeporigin.docking")
specific_version = jobs.filter(tool_key="deeporigin.abfe-end-to-end", tool_version="1.0.0")
Filter by multiple attributes:
specific_job = jobs.filter(status="Running", executionId="id-123")
Filter with custom predicate:
expensive_jobs = jobs.filter(
    predicate=lambda job: job._attributes.get("approveAmount", 0) > 100
)
Combine filters:
# Status filter + tool filter + custom predicate
complex_filter = jobs.filter(
    status="Running",
    tool_key="deeporigin.docking",
    predicate=lambda job: "error" not in str(
        job._attributes.get("progressReport", "")
    )
)
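The require_metadata flag documented above can be combined with the other filters in the same way; a short sketch:
# Keep only succeeded jobs that carry a non-None metadata payload
documented_jobs = jobs.filter(status="Succeeded", require_metadata=True)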
from_dtos classmethod¶
from_dtos(
    dtos: list[dict],
    *,
    client: Optional[DeepOriginClient] = None
) -> "JobList"
Create a JobList from a list of execution DTOs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dtos | list[dict] | A list of execution DTOs. | required |
| client | Optional[DeepOriginClient] | Optional client for API calls. | None |
Returns:
| Type | Description |
|---|---|
| 'JobList' | A new JobList instance. |
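A short sketch, assuming you already hold a list of execution DTOs (for example from a previous API response); each DTO must contain at least the executionId and status fields noted under from_dto below:
# `execution_dtos` is a list of execution dictionaries obtained elsewhere
jobs = JobList.from_dtos(execution_dtos)
print(jobs.status)  # breakdown of the wrapped executions by status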
from_ids classmethod¶
from_ids(
    ids: list[str],
    *,
    client: Optional[DeepOriginClient] = None
) -> "JobList"
Create a JobList from a list of job IDs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ids | list[str] | A list of job IDs. | required |
| client | Optional[DeepOriginClient] | Optional client for API calls. | None |
Returns:
| Type | Description |
|---|---|
| 'JobList' | A new JobList instance. |
list classmethod¶
list(
    *,
    page: Optional[int] = None,
    page_size: int = 1000,
    order: Optional[str] = None,
    filter: Optional[str] = None,
    client: Optional[DeepOriginClient] = None
) -> "JobList"
Fetch executions from the API and return a JobList.
This method automatically handles pagination, fetching all pages if necessary and combining them into a single JobList.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| page | Optional[int] | Page number to start from (default 0). If None, starts from page 0. | None |
| page_size | int | Page size of the pagination (max 10,000). | 1000 |
| order | Optional[str] | Order of the pagination, e.g., "executionId? asc", "completedAt? desc". | None |
| filter | Optional[str] | Filter applied to the data set Execution Model. | None |
| client | Optional[DeepOriginClient] | Optional client for API calls. | None |
Returns:
| Type | Description |
|---|---|
| 'JobList' | A new JobList instance containing the fetched jobs. |
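For example, to fetch all executions with a larger page size and the ordering shown above:
# Fetch every execution, 5000 per page, most recently completed first
jobs = JobList.list(page_size=5000, order="completedAt? desc")
print(jobs.status)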
show¶
show()
Display the job list view in a Jupyter notebook.
This method renders the job list view and displays it in a Jupyter notebook.
stop_watching¶
stop_watching()
Stop the background monitoring task.
This method safely cancels and cleans up any running monitoring task. It is called automatically when all jobs reach terminal states, or can be called manually to stop monitoring.
sync¶
sync(max_workers: int = 4)
Synchronize all jobs in the list in parallel.
This method updates the internal state of all jobs by fetching the latest status and progress report for each job ID in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int | The maximum number of threads to use for parallel execution. | 4 |
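For example, refresh the whole list with more worker threads and then re-render it:
jobs.sync(max_workers=8)  # fetch the latest status/progress for every job in parallel
jobs.show()               # re-render the (now up-to-date) list view in a notebook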
to_dataframe¶
to_dataframe(
    *,
    include_metadata: bool = False,
    include_inputs: bool = False,
    include_outputs: bool = False,
    resolve_user_names: bool = False,
    client: Optional[DeepOriginClient] = None
) -> DataFrame
Convert the JobList to a pandas DataFrame.
Extracts data from each job's _attributes dictionary and creates a DataFrame with the default columns: id, created_at, resource_id, completed_at, started_at, status, tool_key, tool_version, user_name, and run_duration_minutes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| include_metadata | bool | If True, include metadata column in the DataFrame. | False |
| include_inputs | bool | If True, include user_inputs column in the DataFrame. | False |
| include_outputs | bool | If True, include user_outputs column in the DataFrame. | False |
| resolve_user_names | bool | If True, resolve user IDs to user names. Requires fetching users from the API. | False |
| client | Optional[DeepOriginClient] | Optional client for API calls. Required if resolve_user_names is True. | None |
Returns:
| Type | Description |
|---|---|
| DataFrame | A pandas DataFrame with one row per job. |
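For example (the threshold is arbitrary; column names are the defaults listed above):
df = jobs.to_dataframe(include_outputs=True)
# Standard pandas operations apply to the resulting DataFrame
long_runs = df[df["run_duration_minutes"] > 60]
by_tool = df.groupby("tool_key")["status"].value_counts()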
watch¶
watch(*, interval: float = 5.0)
Start monitoring job list progress in real-time.
This method initiates a background task that periodically updates and displays the status of all jobs in the list. It will automatically stop when all jobs reach a terminal state (Succeeded, Failed, etc.). If all jobs are already in terminal states, it will display a message and show the current state once.
Job¶
Job represents a single computational job that can be monitored and managed.
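A minimal usage sketch, assuming Job is importable from the same module as JobList (the ID is a placeholder):
from deeporigin.platform.job import Job
job = Job.from_id("id-123")  # track a single execution by ID
job.watch()                  # live status updates until the job finishes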
src.platform.job.Job dataclass¶
Represents a single computational job that can be monitored and managed.
This class provides methods to track, visualize, and parse the status and progress of a job, with optional real-time updates (e.g., in Jupyter notebooks).
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Name of the job. |
Functions¶
cancel¶
cancel() -> None
Cancel the job being tracked by this instance.
This method sends a cancellation request for the job ID tracked by this instance using the utils.cancel_runs function.
confirm¶
confirm()
Confirm the job being tracked by this instance.
This method confirms the job being tracked by this instance, and requests the job to be started.
from_dto classmethod¶
from_dto(
    dto: dict, *, client: Optional[DeepOriginClient] = None
) -> "Job"
Create a Job instance from an execution DTO (Data Transfer Object).
This method constructs a Job from the full execution description without making a network request. It is faster than from_id() when you already have the execution data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dto | dict | Dictionary containing the full execution description from the API. Must contain at least 'executionId' and 'status' fields. | required |
| client | Optional[DeepOriginClient] | Optional client for API calls. | None |
Returns:
| Type | Description |
|---|---|
| 'Job' | A new Job instance constructed from the DTO. |
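A short sketch with a placeholder DTO, contrasting with from_id() (which fetches the execution over the network):
dto = {"executionId": "id-123", "status": "Running"}  # placeholder; normally comes from the API
job = Job.from_dto(dto)      # no network request needed
# job = Job.from_id("id-123") would fetch the same execution from the API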
from_id classmethod¶
from_id(
    id: str, *, client: Optional[DeepOriginClient] = None
) -> "Job"
Create a Job instance from a single ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | str | Job ID to track. | required |
| client | Optional[DeepOriginClient] | Optional client for API calls. | None |
Returns:
| Type | Description |
|---|---|
| 'Job' | A new Job instance with the given ID. |
show¶
show()
Display the job view in a Jupyter notebook.
This method renders the job view and displays it in a Jupyter notebook.
stop_watching¶
stop_watching()
Stop the background monitoring task.
This method safely cancels and cleans up any running monitoring task. It is called automatically when the job reaches a terminal state, or can be called manually to stop monitoring.
sync¶
sync()
Synchronize the job status and progress report.
This method updates the internal state by fetching the latest status and progress report for the job ID. It skips jobs that have already reached a terminal state (Succeeded or Failed).
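For example:
job.sync()         # refresh status and progress report (skipped if already terminal)
print(job.status)  # the same status property that filter() checks against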
watch¶
watch(*, interval: float = 5.0)
Start monitoring job progress in real-time.
This method initiates a background task that periodically updates and displays the job status. It will automatically stop when the job reaches a terminal state (Succeeded or Failed). If there is no active job to monitor, it will display a message and show the current state once.