Monitor Live Results#
Poll for analysis results in real-time during active streaming sessions.
When to Use This#
Watch analysis results arrive during a live RHEED stream
Update a dashboard or plot as new data comes in
Trigger actions when certain conditions are met
Tip
For already-completed data, use Analysis Results instead.
Choose Your Approach#
Approach |
Use When |
|---|---|
Synchronous loop |
Simple scripts that can block |
Background thread |
GUI apps or acquisition loops that can’t block |
Async iterator |
Asyncio applications |
Synchronous Polling#
Block and wait for each update:
from atomscale import Client
from atomscale.timeseries.polling import iter_poll
client = Client()
data_id = "YOUR_STREAM_DATA_ID"
for result in iter_poll(
client,
data_id=data_id,
interval=5.0, # Poll every 5 seconds
last_n=10, # Get last 10 rows
max_polls=20, # Stop after 20 polls
):
print(f"Latest timestamp: {result['timestamp'].iloc[-1]}")
print(result.tail())
Background Thread#
Poll without blocking your main code:
from atomscale import Client
from atomscale.timeseries.polling import start_polling_thread
client = Client()
collected = []
def on_new_data(result):
print(f"Got {len(result)} rows")
collected.append(result)
stop_event = start_polling_thread(
client,
data_id="YOUR_STREAM_DATA_ID",
interval=5.0,
last_n=10,
max_polls=20,
on_result=on_new_data,
)
# Your main code continues here...
# To stop early:
# stop_event.set()
Caution
The callback runs in the polling thread. Use queue.Queue or other
thread-safe mechanisms to update UI elements.
Async Polling#
For asyncio applications:
import asyncio
from atomscale import Client
from atomscale.timeseries.polling import aiter_poll
client = Client()
async def watch_stream():
async for result in aiter_poll(
client,
data_id="YOUR_STREAM_DATA_ID",
interval=5.0,
last_n=10,
max_polls=20,
):
print(f"Got {len(result)} rows")
asyncio.run(watch_stream())
Avoid Duplicate Processing#
Use distinct_by to skip results you’ve already seen:
def latest_timestamp(df):
if df.empty or "timestamp" not in df.columns:
return None
return df["timestamp"].iloc[-1]
for result in iter_poll(
client,
data_id=data_id,
interval=5.0,
distinct_by=latest_timestamp, # Only yield when timestamp changes
):
process(result)
Similarity Trajectory Polling#
For similarity trajectory data (structure-property relationships), use the dedicated trajectory polling functions. These auto-stop when the trajectory completes:
from atomscale.similarity.polling import iter_poll_trajectory
for result in iter_poll_trajectory(
client,
source_id="YOUR_DATA_ID", # or physical_sample_id
interval=5.0,
last_n=10,
):
print(f"Active: {result['Active'].any()}")
if not result["Active"].any():
print("Trajectory complete")
The trajectory functions mirror the timeseries API:
iter_poll_trajectory()- Synchronous loopstart_polling_trajectory_thread()- Background threadaiter_poll_trajectory()- Async iteratorstart_polling_trajectory_task()- Async background task