Jobs API

The Job and JobList classes provide a high-level interface for working with tool executions (jobs) on Deep Origin.

JobList

JobList represents a collection of jobs that can be monitored and managed together. It's especially useful for managing batch jobs like Docking, where a set of ligands can be batched into multiple executions on multiple resources.

Creating a JobList

from deeporigin.platform.job import JobList

# Fetch jobs from the API
jobs = JobList.list()

# Create from a list of job IDs
jobs = JobList.from_ids(["id-1", "id-2", "id-3"])

# Create from execution DTOs
jobs = JobList.from_dtos([dto1, dto2, dto3])

Monitoring Jobs

The watch() method allows you to monitor multiple jobs in real-time. It will automatically stop when all jobs reach terminal states:

# Start monitoring a list of jobs
jobs = JobList.from_ids(["id-1", "id-2", "id-3"])
jobs.watch()  # Updates every 5 seconds by default

# Custom update interval
jobs.watch(interval=10)  # Update every 10 seconds

# Stop monitoring manually
jobs.stop_watching()

The watch() method will:

- Display an initial status view
- Periodically sync all jobs and update the display
- Automatically stop when all jobs are in terminal states (Succeeded, Failed, Cancelled, etc.)
- Handle errors gracefully and continue monitoring

Filtering Jobs

The filter() method allows you to filter jobs by status, attributes, or custom predicates:

Filter by status:

# Get only succeeded jobs
succeeded_jobs = jobs.filter(status="Succeeded")

# Get only running jobs
running_jobs = jobs.filter(status="Running")

Filter by tool attributes:

# Filter by tool key
docking_jobs = jobs.filter(tool_key="deeporigin.bulk-docking")

# Filter by tool version
v1_jobs = jobs.filter(tool_version="1.0.0")

# Filter by both tool key and version
specific_tool = jobs.filter(tool_key="deeporigin.abfe-end-to-end", tool_version="1.0.0")

Filter by other attributes:

# Filter by execution ID
specific_job = jobs.filter(executionId="id-123")

# Filter by multiple attributes (AND logic)
filtered = jobs.filter(status="Running", executionId="id-123")

Filter with custom predicate:

# Filter jobs with approveAmount > 100
expensive_jobs = jobs.filter(
    predicate=lambda job: job._attributes.get("approveAmount", 0) > 100
)

# Filter by nested attribute (tool.key)
tool_jobs = jobs.filter(
    predicate=lambda job: job._attributes.get("tool", {}).get("key") == "tool1"
)

Combine filters:

# Status filter + tool filter + custom predicate
complex_filter = jobs.filter(
    status="Running",
    tool_key="deeporigin.docking",
    predicate=lambda job: "error" not in str(
        job._attributes.get("progressReport", "")
    )
)

# Status + tool key + tool version
specific_jobs = jobs.filter(
    status="Succeeded",
    tool_key="deeporigin.abfe-end-to-end",
    tool_version="1.0.0"
)

src.platform.job.JobList

Represents a collection of Jobs that can be monitored and managed together.

This class provides methods to track, visualize, and manage multiple jobs as a single unit, and is especially useful for managing batch jobs like Docking, where a set of ligands can be batched into multiple executions on multiple resources.

Attributes

jobs instance-attribute

jobs = jobs

status property

status: dict[str, int]

Get a breakdown of the statuses of all jobs in the list.

Returns:

- dict[str, int]: A dictionary mapping status strings to counts.
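
A quick sketch of how this property is typically used (the counts shown are purely illustrative):

counts = jobs.status
print(counts)  # e.g. {"Succeeded": 12, "Running": 3, "Failed": 1}
total = sum(counts.values())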

Functions

cancel

cancel(max_workers: int = 4)

Cancel all jobs in the list in parallel.

Parameters:

- max_workers (int): The maximum number of threads to use for parallel execution. Default: 4
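
A minimal usage sketch (the worker count is arbitrary):

# cancel every job in the list, using up to 8 threads
jobs.cancel(max_workers=8)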

confirm

confirm(max_workers: int = 4)

Confirm all jobs in the list in parallel.

Parameters:

- max_workers (int): The maximum number of threads to use for parallel execution. Default: 4
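
A minimal usage sketch:

# confirm all jobs in the list, requesting that they be started
jobs.confirm()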

filter

filter(
    *,
    status: Optional[str | list[str] | set[str]] = None,
    tool_key: Optional[str] = None,
    tool_version: Optional[str] = None,
    require_metadata: bool = False,
    predicate: Optional[Callable[[Job], bool]] = None,
    **kwargs: Any
) -> "JobList"

Filter jobs by status, tool attributes, other attributes, or custom predicate.

This method returns a new JobList containing only jobs that match the specified criteria. Multiple filters can be combined - keyword arguments are applied first (with AND logic), then the predicate function is applied if provided.

Parameters:

- status (Optional[str | list[str] | set[str]]): Filter by job status. Can be a single status string (e.g., "Succeeded"), or a list/set of statuses (e.g., ["Succeeded", "Running", "Queued"]). Checks against the job.status property. Default: None
- tool_key (Optional[str]): Filter by tool key (e.g., "deeporigin.docking", "deeporigin.abfe-end-to-end"). Checks against job._attributes["tool"]["key"]. Default: None
- tool_version (Optional[str]): Filter by tool version (e.g., "1.0.0"). Checks against job._attributes["tool"]["version"]. Default: None
- require_metadata (bool): If True, only include jobs whose metadata exists and is not None. Default: False
- predicate (Optional[Callable[[Job], bool]]): Optional callable that takes a Job and returns True/False. Applied after keyword filters. Useful for complex conditions or accessing nested attributes. Default: None
- **kwargs (Any): Additional filters on job._attributes keys. Each keyword argument is treated as a key in _attributes, and the value must match exactly (equality check). Default: {}

Returns:

- JobList: A new JobList instance containing only matching jobs.

Examples:

Filter by status::

succeeded_jobs = jobs.filter(status="Succeeded")
running_jobs = jobs.filter(status="Running")
multiple_statuses = jobs.filter(status=["Succeeded", "Running", "Queued"])

Filter by tool attributes::

docking_jobs = jobs.filter(tool_key="deeporigin.docking")
specific_version = jobs.filter(tool_key="deeporigin.abfe-end-to-end", tool_version="1.0.0")

Filter by multiple attributes::

specific_job = jobs.filter(status="Running", executionId="id-123")

Filter with custom predicate::

expensive_jobs = jobs.filter(
    predicate=lambda job: job._attributes.get("approveAmount", 0) > 100
)

Combine filters::

# Status filter + tool filter + custom predicate
complex_filter = jobs.filter(
    status="Running",
    tool_key="deeporigin.docking",
    predicate=lambda job: "error" not in str(
        job._attributes.get("progressReport", "")
    )
)

from_dtos classmethod

from_dtos(
    dtos: list[dict],
    *,
    client: Optional[DeepOriginClient] = None
) -> "JobList"

Create a JobList from a list of execution DTOs.

Parameters:

- dtos (list[dict]): A list of execution DTOs. Required.
- client (Optional[DeepOriginClient]): Optional client for API calls. Default: None

Returns:

- JobList: A new JobList instance.

from_ids classmethod

from_ids(
    ids: list[str],
    *,
    client: Optional[DeepOriginClient] = None
) -> "JobList"

Create a JobList from a list of job IDs.

Parameters:

- ids (list[str]): A list of job IDs. Required.
- client (Optional[DeepOriginClient]): Optional client for API calls. Default: None

Returns:

- JobList: A new JobList instance.

list classmethod

list(
    *,
    page: Optional[int] = None,
    page_size: int = 1000,
    order: Optional[str] = None,
    filter: Optional[str] = None,
    client: Optional[DeepOriginClient] = None
) -> "JobList"

Fetch executions from the API and return a JobList.

This method automatically handles pagination, fetching all pages if necessary and combining them into a single JobList.

Parameters:

- page (Optional[int]): Page number to start from. If None, starts from page 0. Default: None
- page_size (int): Page size of the pagination (max 10,000). Default: 1000
- order (Optional[str]): Order of the pagination, e.g., "executionId? asc", "completedAt? desc". Default: None
- filter (Optional[str]): Filter applied to the data set (Execution Model). Default: None
- client (Optional[DeepOriginClient]): Optional client for API calls. Default: None

Returns:

- JobList: A new JobList instance containing the fetched jobs.
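
A brief sketch of fetching jobs with a custom page size (the value is arbitrary; all pages are fetched and combined automatically):

from deeporigin.platform.job import JobList

jobs = JobList.list(page_size=500)
print(jobs.status)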

show

show()

Display the job list view in a Jupyter notebook.

This method renders the job list view and displays it in a Jupyter notebook.

stop_watching

stop_watching()

Stop the background monitoring task.

This method safely cancels and cleans up any running monitoring task. It is called automatically when all jobs reach terminal states, or can be called manually to stop monitoring.

sync

sync(max_workers: int = 4)

Synchronize all jobs in the list in parallel.

This method updates the internal state of all jobs by fetching the latest status and progress report for each job ID in parallel.

Parameters:

Name Type Description Default
max_workers int

The maximum number of threads to use for parallel execution.

4
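
A minimal sketch of a manual refresh, as an alternative to watch():

# refresh all jobs in parallel, then inspect the status breakdown
jobs.sync(max_workers=8)
print(jobs.status)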

to_dataframe

to_dataframe(
    *,
    include_metadata: bool = False,
    include_inputs: bool = False,
    include_outputs: bool = False,
    resolve_user_names: bool = False,
    client: Optional[DeepOriginClient] = None
) -> DataFrame

Convert the JobList to a pandas DataFrame.

Extracts data from each job's _attributes dictionary and creates a DataFrame with the default columns: id, created_at, resource_id, completed_at, started_at, status, tool_key, tool_version, user_name, and run_duration_minutes.

Parameters:

- include_metadata (bool): If True, include a metadata column in the DataFrame. Default: False
- include_inputs (bool): If True, include a user_inputs column in the DataFrame. Default: False
- include_outputs (bool): If True, include a user_outputs column in the DataFrame. Default: False
- resolve_user_names (bool): If True, resolve user IDs to user names. Requires fetching users from the API. Default: False
- client (Optional[DeepOriginClient]): Optional client for API calls. Required if resolve_user_names is True. Default: None

Returns:

- DataFrame: A pandas DataFrame with one row per job.
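
A short sketch of typical usage (the grouping is illustrative; the column names are the defaults listed above):

df = jobs.to_dataframe(include_metadata=True)

# e.g., average run time per tool
print(df.groupby("tool_key")["run_duration_minutes"].mean())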

watch

watch(*, interval: float = 5.0)

Start monitoring job list progress in real-time.

This method initiates a background task that periodically updates and displays the status of all jobs in the list. It will automatically stop when all jobs reach a terminal state (Succeeded, Failed, etc.). If all jobs are already in terminal states, it will display a message and show the current state once.

Job

Job represents a single computational job that can be monitored and managed.

src.platform.job.Job dataclass

Represents a single computational job that can be monitored and managed.

This class provides methods to track, visualize, and parse the status and progress of a job, with optional real-time updates (e.g., in Jupyter notebooks).

Attributes:

- name (str): Name of the job.

Attributes

client class-attribute instance-attribute

client: Optional[DeepOriginClient] = None

name instance-attribute

name: str

status class-attribute instance-attribute

status: Optional[str] = None

Functions

cancel

cancel() -> None

Cancel the job being tracked by this instance.

This method sends a cancellation request for the job ID tracked by this instance using the utils.cancel_runs function.

confirm

confirm()

Confirm the job being tracked by this instance.

This method confirms the job being tracked by this instance, and requests the job to be started.

from_dto classmethod

from_dto(
    dto: dict, *, client: Optional[DeepOriginClient] = None
) -> "Job"

Create a Job instance from an execution DTO (Data Transfer Object).

This method constructs a Job from the full execution description without making a network request. It is faster than from_id() when you already have the execution data.

Parameters:

- dto (dict): Dictionary containing the full execution description from the API. Must contain at least 'executionId' and 'status' fields. Required.
- client (Optional[DeepOriginClient]): Optional client for API calls. Default: None

Returns:

- Job: A new Job instance constructed from the DTO.
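
A minimal sketch, assuming Job is importable from the same module as JobList and using a hypothetical DTO stub containing only the required fields:

from deeporigin.platform.job import Job

dto = {"executionId": "id-123", "status": "Running"}  # hypothetical stub
job = Job.from_dto(dto)
print(job.status)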

from_id classmethod

from_id(
    id: str, *, client: Optional[DeepOriginClient] = None
) -> "Job"

Create a Job instance from a single ID.

Parameters:

- id (str): Job ID to track. Required.
- client (Optional[DeepOriginClient]): Optional client for API calls. Default: None

Returns:

- Job: A new Job instance with the given ID.
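
A brief sketch (the ID is a placeholder):

job = Job.from_id("id-123")
job.watch()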

show

show()

Display the job view in a Jupyter notebook.

This method renders the job view and displays it in a Jupyter notebook.

stop_watching

stop_watching()

Stop the background monitoring task.

This method safely cancels and cleans up any running monitoring task. It is called automatically when the job reaches a terminal state, or can be called manually to stop monitoring.

sync

sync()

Synchronize the job status and progress report.

This method updates the internal state by fetching the latest status and progress report for the job ID. The update is skipped if the job has already reached a terminal state (Succeeded or Failed).
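
A quick sketch of a one-off refresh:

job.sync()
print(job.status)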

watch

watch(*, interval: float = 5.0)

Start monitoring job progress in real-time.

This method initiates a background task that periodically updates and displays the job status. It will automatically stop when the job reaches a terminal state (Succeeded or Failed). If there is no active job to monitor, it will display a message and show the current state once.
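
A minimal sketch (interval in seconds; stop_watching() can be called to end monitoring early):

job.watch(interval=10)

# ...later, if needed:
job.stop_watching()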