atomscale.similarity.polling

Polling utilities for similarity trajectory data.

Functions

aiter_poll_trajectory(client, source_id, *)

Asynchronously poll similarity trajectory data without blocking the event loop.

iter_poll_trajectory(client, source_id, *[, ...])

Synchronously poll similarity trajectory data, yielding DataFrames.

start_polling_trajectory_task(client, ...[, ...])

Start polling trajectory data as an asyncio.Task.

start_polling_trajectory_thread(client, ...)

Start polling trajectory data in a background daemon thread.

atomscale.similarity.polling.iter_poll_trajectory(client, source_id: str, *, interval: float = 1.0, last_n: int | None = None, distinct_by: Callable[[DataFrame], Any] | None = None, until: Callable[[DataFrame], bool] | None = None, max_polls: int | None = None, fire_immediately: bool = True, jitter: float = 0.0, on_error: Callable[[BaseException], None] | None = None) → Iterator[DataFrame]

Synchronously poll similarity trajectory data, yielding DataFrames.

Parameters:
  • client – API client instance forwarded to the provider.

  • source_id (str) – The data_id or physical_sample_id to poll trajectory for.

  • interval (float) – Seconds between polls. Defaults to 1.0.

  • last_n (int | None) – Number of most recent trajectory data points to retrieve. None retrieves all of them.

  • distinct_by (Callable[[DataFrame], Any] | None) – Optional function mapping a result to a hashable key for deduping. If provided, only results with a new key are yielded.

  • until (Callable[[DataFrame], bool] | None) – Optional predicate; stop when it returns True for a result. If None, defaults to stopping when no trajectory is active (i.e., not df["Active"].any()).

  • max_polls (int | None) – Optional maximum number of polls before stopping.

  • fire_immediately (bool) – If True, perform the first poll immediately; otherwise wait one interval before the first poll. Defaults to True.

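  • jitter (float) – Maximum random delay, in seconds. A value between 0 and jitter is added to each sleep so that many clients do not poll in lockstep (the thundering-herd problem). Values above interval are clamped to interval. Defaults to 0.0.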

  • on_error (Callable[[BaseException], None] | None) – Optional error handler called with the raised exception when a poll fails. Errors are swallowed so polling continues.

Yields:

DataFrame – Trajectory data indexed by a MultiIndex with levels ["Reference ID", "Time"].

Return type:

Iterator[DataFrame]

Notes

  • Uses drift-corrected scheduling to maintain the requested cadence even if individual polls are slow.

  • Stops when until is satisfied or max_polls is reached (if set).
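The drift-corrected scheduling and stop conditions described above can be illustrated with a minimal, library-independent sketch. It assumes a hypothetical zero-argument `fetch` callable standing in for the provider call that iter_poll_trajectory makes with client and source_id; `poll_sketch` is likewise a hypothetical name, and this is not the atomscale implementation. Ticks are placed on a fixed timeline (`next_tick += interval`), so time spent inside `fetch` does not stretch the cadence, and jitter is added to each sleep without accumulating. Two behaviors here are assumptions: a result that satisfies `until` is yielded before the loop stops, and with `until=None` the sketch keeps polling (up to `max_polls`) instead of checking an "Active" column.

```python
import random
import time
from typing import Any, Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def poll_sketch(
    fetch: Callable[[], T],
    *,
    interval: float = 1.0,
    distinct_by: Optional[Callable[[T], Any]] = None,
    until: Optional[Callable[[T], bool]] = None,
    max_polls: Optional[int] = None,
    fire_immediately: bool = True,
    jitter: float = 0.0,
    on_error: Optional[Callable[[BaseException], None]] = None,
) -> Iterator[T]:
    """Hypothetical drift-corrected poller; not the atomscale implementation."""
    jitter = min(max(jitter, 0.0), interval)  # clamp jitter into [0, interval]
    seen: set = set()
    polls = 0
    # Ticks sit on a fixed timeline, so time spent in fetch() is not added to the cadence.
    next_tick = time.monotonic() + (0.0 if fire_immediately else interval)
    while max_polls is None or polls < max_polls:
        if polls or not fire_immediately:
            now = time.monotonic()
            # After an overrun, poll right away once instead of bursting to catch up.
            next_tick = max(next_tick, now)
            time.sleep(next_tick - now + random.uniform(0.0, jitter))
        polls += 1
        next_tick += interval
        try:
            result = fetch()
        except Exception as exc:  # swallow the error so polling continues
            if on_error is not None:
                on_error(exc)
            continue
        if distinct_by is None:
            yield result
        else:
            key = distinct_by(result)
            if key not in seen:  # only yield results with a previously unseen key
                seen.add(key)
                yield result
        if until is not None and until(result):
            return
```

Realigning `next_tick` to the current time after a slow poll is one reasonable choice: the alternative (keeping every missed tick) would fire several polls back to back to catch up.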

async atomscale.similarity.polling.aiter_poll_trajectory(client, source_id: str, *, interval: float = 1.0, last_n: int | None = None, distinct_by: Callable[[DataFrame], Any] | None = None, until: Callable[[DataFrame], bool] | None = None, max_polls: int | None = None, fire_immediately: bool = True, jitter: float = 0.0, on_error: Callable[[BaseException], None] | None = None) → AsyncIterator[DataFrame]

Asynchronously poll similarity trajectory data without blocking the event loop.

Accepts the same parameters and follows the same semantics as iter_poll_trajectory.

Parameters:
  • client – API client instance forwarded to the provider.

  • source_id (str) – The data_id or physical_sample_id to poll trajectory for.

  • interval (float) – Seconds between polls. Defaults to 1.0.

  • last_n (int | None) – Number of most recent trajectory data points to retrieve. None retrieves all of them.

  • distinct_by (Callable[[DataFrame], Any] | None) – Optional function mapping a result to a hashable key for deduping. If provided, only results with a new key are yielded.

  • until (Callable[[DataFrame], bool] | None) – Optional predicate; stop when it returns True for a result. If None, defaults to stopping when no trajectory is active (i.e., not df["Active"].any()).

  • max_polls (int | None) – Optional maximum number of polls before stopping.

  • fire_immediately (bool) – If True, perform the first poll immediately; otherwise wait one interval before the first poll. Defaults to True.

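  • jitter (float) – Maximum random delay, in seconds. A value between 0 and jitter is added to each sleep so that many clients do not poll in lockstep (the thundering-herd problem). Values above interval are clamped to interval. Defaults to 0.0.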

  • on_error (Callable[[BaseException], None] | None) – Optional error handler called with the raised exception when a poll fails. Errors are swallowed so polling continues.

Yields:

DataFrame – Trajectory data indexed by a MultiIndex with levels ["Reference ID", "Time"].

Return type:

AsyncIterator[DataFrame]

Notes

  • Uses asyncio.to_thread so provider calls never block the event loop.

  • Drift-corrected scheduling preserves cadence even with slow polls.

  • Stops when until is satisfied or max_polls is reached (if set).
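The asynchronous behavior can be sketched the same way, again with a hypothetical `fetch` callable and a hypothetical `apoll_sketch` name rather than the library code. Scheduling uses `loop.time()`, and each blocking call is handed to `asyncio.to_thread` (Python 3.9+), which is what lets other tasks keep running while a slow poll is in flight. `distinct_by` is left out for brevity; it would amount to a set-based key check before each yield.

```python
import asyncio
import random
from typing import AsyncIterator, Callable, Optional, TypeVar

T = TypeVar("T")


async def apoll_sketch(
    fetch: Callable[[], T],
    *,
    interval: float = 1.0,
    until: Optional[Callable[[T], bool]] = None,
    max_polls: Optional[int] = None,
    fire_immediately: bool = True,
    jitter: float = 0.0,
    on_error: Optional[Callable[[BaseException], None]] = None,
) -> AsyncIterator[T]:
    """Hypothetical async poller; not the atomscale implementation."""
    loop = asyncio.get_running_loop()
    jitter = min(max(jitter, 0.0), interval)  # clamp jitter into [0, interval]
    polls = 0
    # Fixed timeline based on the loop's monotonic clock.
    next_tick = loop.time() + (0.0 if fire_immediately else interval)
    while max_polls is None or polls < max_polls:
        if polls or not fire_immediately:
            now = loop.time()
            next_tick = max(next_tick, now)  # realign after an overrun instead of bursting
            await asyncio.sleep(next_tick - now + random.uniform(0.0, jitter))
        polls += 1
        next_tick += interval
        try:
            # The blocking fetch runs in a worker thread; the loop keeps serving other tasks.
            result = await asyncio.to_thread(fetch)
        except Exception as exc:  # swallow the error so polling continues
            if on_error is not None:
                on_error(exc)
            continue
        yield result
        if until is not None and until(result):
            return
```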

atomscale.similarity.polling.start_polling_trajectory_thread(client, source_id: str, *, interval: float = 1.0, last_n: int | None = None, on_result: Callable[[DataFrame], None], **kwargs) → Event

Start polling trajectory data in a background daemon thread.

Wraps iter_poll_trajectory in a daemon thread and invokes on_result(result) for each yielded item. Returns a threading.Event that can be set to stop polling gracefully.

Parameters:
  • client – API client instance forwarded to the provider.

  • source_id (str) – The data_id or physical_sample_id to poll trajectory for.

  • interval (float) – Seconds between polls. Defaults to 1.0.

  • last_n (int | None) – Number of most recent trajectory data points to retrieve. None retrieves all of them.

  • on_result (Callable[[DataFrame], None]) – Callback invoked with each yielded result.

  • **kwargs – Additional keyword arguments forwarded to iter_poll_trajectory (e.g., distinct_by, until, max_polls, fire_immediately, jitter, on_error).

Returns:

Event that, when set, requests the polling thread to stop.

Return type:

threading.Event
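The daemon-thread-plus-Event pattern can be sketched with the standard library alone. `start_polling_thread_sketch` and `fetch` are hypothetical, and the loop is simplified rather than wrapping iter_poll_trajectory. One design choice worth noting: using `stop.wait(timeout)` as the sleep makes a stop request take effect immediately instead of after the current interval elapses; whether the library's thread reacts that quickly is not stated above.

```python
import threading
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def start_polling_thread_sketch(
    fetch: Callable[[], T],
    on_result: Callable[[T], None],
    *,
    interval: float = 1.0,
    on_error: Optional[Callable[[BaseException], None]] = None,
) -> threading.Event:
    """Hypothetical background poller; not the atomscale implementation."""
    stop = threading.Event()

    def run() -> None:
        next_tick = time.monotonic()
        while not stop.is_set():
            try:
                result = fetch()
            except Exception as exc:  # fetch errors do not kill the thread
                if on_error is not None:
                    on_error(exc)
            else:
                on_result(result)
            next_tick = max(next_tick + interval, time.monotonic())
            # Event.wait is an interruptible sleep: stop.set() wakes it immediately.
            if stop.wait(max(next_tick - time.monotonic(), 0.0)):
                break

    # daemon=True: the thread will not keep the interpreter alive at exit.
    threading.Thread(target=run, name="trajectory-poller", daemon=True).start()
    return stop
```

A caller keeps the returned Event and calls `stop.set()` when it no longer needs updates.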

atomscale.similarity.polling.start_polling_trajectory_task(client, source_id: str, *, interval: float = 1.0, last_n: int | None = None, on_result: Callable[[DataFrame], Any] | None = None, **kwargs) → Task[None]

Start polling trajectory data as an asyncio.Task.

Wraps aiter_poll_trajectory in a background Task. If on_result returns a coroutine, it will be awaited before the next iteration.

Parameters:
  • client – API client instance forwarded to the provider.

  • source_id (str) – The data_id or physical_sample_id to poll trajectory for.

  • interval (float) – Seconds between polls. Defaults to 1.0.

  • last_n (int | None) – Number of most recent trajectory data points to retrieve. None retrieves all of them.

  • on_result (Callable[[DataFrame], Any] | None) – Optional callback invoked with each yielded result. If it returns a coroutine, it will be awaited.

  • **kwargs – Additional keyword arguments forwarded to aiter_poll_trajectory (e.g., distinct_by, until, max_polls, fire_immediately, jitter, on_error).

Returns:

The newly created Task, already scheduled on the running event loop. Cancel it to stop polling.

Return type:

asyncio.Task[None]

Raises:

RuntimeError – If no running event loop is available when called.
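A minimal sketch of the Task wrapper, assuming a hypothetical `fetch` callable and the hypothetical name `start_polling_task_sketch`. `asyncio.get_running_loop()` raises RuntimeError when no loop is running, which mirrors the documented behavior, and `inspect.isawaitable` detects a coroutine returned by `on_result` so it can be awaited before the next poll. Scheduling here is a plain `asyncio.sleep(interval)` without drift correction.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")


def start_polling_task_sketch(
    fetch: Callable[[], T],
    on_result: Optional[Callable[[T], Any]] = None,
    *,
    interval: float = 1.0,
) -> "asyncio.Task[None]":
    """Hypothetical Task-based poller; not the atomscale implementation."""
    # Raises RuntimeError if called outside a running event loop.
    loop = asyncio.get_running_loop()

    async def run() -> None:
        while True:
            # Blocking fetch runs in a worker thread so the loop stays free.
            result = await asyncio.to_thread(fetch)
            if on_result is not None:
                outcome = on_result(result)
                if inspect.isawaitable(outcome):
                    await outcome  # finish async callbacks before the next poll
            await asyncio.sleep(interval)

    return loop.create_task(run())
```

Because an async `on_result` is awaited before the next sleep, a slow callback naturally applies back-pressure to the poller.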