atomscale.streaming
High-performance streaming for RHEED and instrument data.
- class atomscale.streaming.RHEEDStreamer(api_key: str, endpoint: str | None = None)
Bases:
object

A thin, high-performance Python interface (via PyO3) for real-time RHEED frame streaming into the Atomscale platform. This class takes chunks of 8-bit frames (NumPy arrays) and uploads them for analysis while they are being captured from a camera.
Typical usage:
1) Instantiate the streamer.
2) initialize(…) to create the remote data item and receive data_id.
3a) run(data_id, frames_iter) to stream by yielding frame chunks from a generator/iterator, or
3b) push(data_id, chunk_idx, frames) repeatedly to send chunks from your own loop.
4) finalize(data_id) to mark the stream complete on the server.
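The steps above can be sketched end to end as follows. The frame source here is synthetic and the parameter values are illustrative; the network calls are guarded behind an environment variable, so only the local part runs without credentials:

```python
import os
import numpy as np

FPS = 30.0   # illustrative capture rate
CHUNK = 16   # frames per chunk

def frame_chunks(n_chunks: int, chunk: int = CHUNK, h: int = 64, w: int = 64):
    """Yield synthetic (N, H, W) uint8 chunks, standing in for a camera feed."""
    rng = np.random.default_rng(0)
    for _ in range(n_chunks):
        yield rng.integers(0, 256, size=(chunk, h, w), dtype=np.uint8)

chunks = list(frame_chunks(3))
assert all(c.shape == (CHUNK, 64, 64) and c.dtype == np.uint8 for c in chunks)

# Streaming requires network access and a real API key, so it is guarded:
if os.environ.get("ATOMSCALE_API_KEY"):
    from atomscale.streaming import RHEEDStreamer

    streamer = RHEEDStreamer(api_key=os.environ["ATOMSCALE_API_KEY"])
    data_id = streamer.initialize(fps=FPS, rotations_per_min=10.0, chunk_size=CHUNK)
    streamer.run(data_id, frame_chunks(3))  # blocks until all uploads finish
    streamer.finalize(data_id)              # mark the stream complete
```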
Notes
Frame dtype is coerced to uint8. Shapes (H, W) or (N, H, W) are accepted; (N, H, W) is preferred for chunks.
Packaging happens concurrently for throughput; network PUTs are async.
This class is safe to call from Python; heavy work is offloaded to multithreaded async workers.
See also: finalize(…).
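The shape and dtype rule above can be mirrored in plain NumPy. This is an illustrative stand-in for the coercion, not the library's internal code:

```python
import numpy as np

def to_chunk(frames) -> np.ndarray:
    """Coerce dtype to uint8 and promote a single (H, W) frame to (1, H, W)."""
    a = np.asarray(frames, dtype=np.uint8)
    if a.ndim == 2:          # single frame -> chunk of one
        a = a[np.newaxis, ...]
    if a.ndim != 3:
        raise ValueError(f"expected (H, W) or (N, H, W), got shape {a.shape}")
    return a

single = to_chunk(np.zeros((48, 64), dtype=np.float32))  # dtype is coerced
assert single.shape == (1, 48, 64) and single.dtype == np.uint8

batch = to_chunk(np.zeros((8, 48, 64), dtype=np.uint8))  # preferred chunk form
assert batch.shape == (8, 48, 64)
```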
- Parameters:
api_key (str) – Your Atomscale API key.
endpoint (Optional[str]) – Base API URL. Defaults to “https://api.atomscale.ai”.
- Raises:
RuntimeError – If the HTTP client or async runtime cannot be constructed.
- finalize(self, data_id: str) → None
Explicitly closes the remote stream for the given data_id. This signals to the server that no further chunks will be uploaded and allows any downstream jobs (e.g., indexing, aggregation, or post-processing) to begin.
Typical use:
- After run(…) returns (it waits for all chunk tasks), call finalize(data_id).
- In push(…) mode, call finalize(data_id) only after you have pushed your last chunk and ensured any in-flight uploads have finished (since push(…) detaches tasks).
Notes
The operation performs a single HTTP POST to the …/end endpoint.
It is safe to call more than once; the server may treat it as idempotent, but repeated calls are unnecessary.
- Parameters:
data_id (str) – The stream identifier returned by initialize(…).
- Returns:
None
- Raises:
RuntimeError – If the finalization POST fails.
See also
run(data_id, frames_iter), push(data_id, chunk_idx, frames)
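A minimal sketch of the push-mode sequence described above, with the network calls guarded behind an environment variable. The frames are synthetic, and the fixed pause before finalize(…) is only a placeholder for whatever in-flight-upload accounting your integration uses:

```python
import os
import numpy as np

# Prepare a few chunks up front (synthetic frames stand in for camera output).
rng = np.random.default_rng(0)
chunks = [rng.integers(0, 256, size=(8, 64, 64), dtype=np.uint8) for _ in range(4)]

if os.environ.get("ATOMSCALE_API_KEY"):
    from atomscale.streaming import RHEEDStreamer

    streamer = RHEEDStreamer(api_key=os.environ["ATOMSCALE_API_KEY"])
    data_id = streamer.initialize(fps=30.0, rotations_per_min=0.0, chunk_size=8)

    for chunk_idx, frames in enumerate(chunks):
        streamer.push(data_id, chunk_idx, frames)  # detaches an upload task

    # push(...) detaches its tasks, so give in-flight uploads time to drain
    # before finalizing (a fixed pause is only a placeholder strategy).
    import time; time.sleep(5.0)
    streamer.finalize(data_id)
```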
- initialize(self, fps: float, rotations_per_min: float, chunk_size: int, stream_name: str | None = None, physical_sample: str | None = None, project_id: str | None = None, tags: list[str] | None = None) → str
Creates a new remote data item for this stream and returns its data_id. Also captures runtime configuration used for subsequent chunk uploads.
The rotational period (frames per rotation) is computed as: fpr = (fps * 60.0) / rotations_per_min. If rotations_per_min <= 0.0, the stream is treated as stationary (no rotation).
After streaming via run(…) or push(…), call finalize(data_id) to mark the stream as complete.
- Parameters:
fps (float) – Capture rate in frames per second.
rotations_per_min (float) – Wafer/crystal rotations per minute; use 0.0 for stationary operation.
chunk_size (int) – The intended number of frames per chunk you will send with run(…) or push(…).
stream_name (Optional[str]) – Human-readable name shown in the platform. If None or an empty string, a default like “RHEED Stream @ 1:23PM” is used.
physical_sample (Optional[str]) – Name or UUID of a physical sample to associate with the data item. If a UUID is provided, it must match an existing sample. If a name is provided, it is matched case-insensitively against existing samples, or a new sample is created if no match is found.
project_id (Optional[str]) – UUID of a project to associate with the stream. When provided along with physical_sample, the project’s tracking_physical_sample_id configuration is automatically updated to link the physical sample to the project for growth monitoring.
tags (Optional[List[str]]) – List of tag names or UUIDs to attach to the data item. Names are matched case-insensitively against existing org tags; unknown names are created. UUIDs must reference existing tags.
- Returns:
The created data_id for this stream.
- Return type:
str
- Raises:
RuntimeError – If the initialization POST fails.
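A sketch of an initialize(…) call using the documented keyword arguments. All names and values here are illustrative, and the actual call is guarded behind an environment variable:

```python
import os

# Keyword arguments assembled up front (names/values are illustrative):
init_kwargs = dict(
    fps=30.0,
    rotations_per_min=10.0,
    chunk_size=16,
    stream_name="GaAs growth run 42",
    physical_sample="wafer-A7",   # matched case-insensitively, or created
    tags=["mbe", "calibration"],  # names are matched or created; UUIDs must exist
)

if os.environ.get("ATOMSCALE_API_KEY"):
    from atomscale.streaming import RHEEDStreamer

    streamer = RHEEDStreamer(api_key=os.environ["ATOMSCALE_API_KEY"])
    data_id = streamer.initialize(**init_kwargs)  # returns the stream's data_id
```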
- push(self, data_id: str, chunk_idx: int, frames: numpy.ndarray, capture_start_ms_utc: int | None = None) → None
Callback mode. Push a single chunk of frames that you produced externally (e.g., from a camera callback). Call repeatedly for subsequent chunks.
The frames argument may be (N,H,W) or (H,W); dtype is coerced to uint8.
After your last push(…), call finalize(data_id) to mark the stream as complete on the server.
Timestamps
When capture_start_ms_utc is not provided, this method stamps each chunk with Utc::now() sampled the moment push() is entered, before any GIL-held packaging work. The chunk’s end_unix_ms_utc is then start + (n / fps) * 1000. Inter-chunk gaps therefore reflect real arrival jitter; intra-chunk span follows declared fps. Pass an explicit timestamp when you have a hardware clock (camera trigger, OS monotonic-to-utc conversion).
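The end-timestamp arithmetic can be checked with a one-line helper (the function name is illustrative):

```python
def chunk_end_ms(start_ms_utc: int, n_frames: int, fps: float) -> int:
    """end_unix_ms_utc = start + (n / fps) * 1000 (span follows the declared fps)."""
    return int(start_ms_utc + (n_frames / fps) * 1000.0)

# 16 frames at 30 fps span ~533 ms within the chunk:
assert chunk_end_ms(1_700_000_000_000, 16, 30.0) == 1_700_000_000_533
```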
- Parameters:
data_id (str) – The remote data identifier returned by initialize(…).
chunk_idx (int) – Zero-based index of this chunk (used in the Zarr shard path).
frames (numpy.ndarray) – (N, H, W) or (H, W) grayscale frames as uint8.
capture_start_ms_utc (Optional[int]) – Capture-start timestamp in milliseconds since the UNIX epoch (UTC). Pass when you have a real capture-time clock (camera trigger, OS monotonic-to-utc conversion). When None, sampled from Utc::now() on entry.
- Returns:
None
- Raises:
RuntimeError – If packaging or upload fails internally.
See also
finalize(data_id)
- run(self, data_id: str, frames_iter: Iterable) → None
Generator/iterator mode. Iterates frames_iter, where each yielded item is one of:
(N, H, W) numpy.ndarray[uint8]: a chunk of N grayscale frames
(H, W) numpy.ndarray[uint8]: a single frame (treated as N = 1)
(frames, capture_start_ms_utc) 2-tuple: frames as above + an explicit capture-start timestamp (int milliseconds since UNIX epoch, UTC). Use this when you have hardware/OS-level timestamps for each chunk (camera trigger time, OS monotonic-to-utc conversion, etc.).
For each yielded item, the method:
1) converts the frames to flat uint8 bytes,
2) packages them on a blocking worker thread,
3) uploads the shard,
4) proceeds concurrently for high throughput.
This method blocks until all spawned tasks complete. After run(…) returns, call finalize(data_id) to mark the stream complete on the server.
Timestamps
When the caller does not supply a capture_start_ms_utc, this method stamps each chunk with Utc::now() sampled the moment the iterator yields, before any GIL-held packaging work. The chunk’s end_unix_ms_utc is then start + (n / fps) * 1000. Inter-chunk gaps therefore reflect real arrival jitter; intra-chunk span follows declared fps. Pass an explicit timestamp when you have a hardware clock (camera trigger, OS monotonic-to-utc conversion).
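A sketch of a generator that supplies explicit per-chunk timestamps to run(…). Here time.time() stands in for a hardware/OS capture clock, the frames are synthetic, and the network calls are guarded:

```python
import os
import time
import numpy as np

FPS, CHUNK = 30.0, 8

def timestamped_chunks(n_chunks: int):
    """Yield (frames, capture_start_ms_utc) tuples; the timestamp stands in
    for a hardware/OS capture clock (int milliseconds since the UNIX epoch)."""
    rng = np.random.default_rng(0)
    for _ in range(n_chunks):
        start_ms = int(time.time() * 1000)
        yield rng.integers(0, 256, size=(CHUNK, 48, 64), dtype=np.uint8), start_ms

items = list(timestamped_chunks(2))
assert all(isinstance(ts, int) and f.dtype == np.uint8 for f, ts in items)

if os.environ.get("ATOMSCALE_API_KEY"):
    from atomscale.streaming import RHEEDStreamer

    streamer = RHEEDStreamer(api_key=os.environ["ATOMSCALE_API_KEY"])
    data_id = streamer.initialize(fps=FPS, rotations_per_min=0.0, chunk_size=CHUNK)
    streamer.run(data_id, timestamped_chunks(100))  # blocks until uploads finish
    streamer.finalize(data_id)
```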
- Parameters:
data_id (str) – The stream data ID returned by initialize(…).
frames_iter (Iterable) – Python iterable/generator of (N, H, W) / (H, W) uint8 arrays, or (frames, capture_start_ms_utc) tuples.
- Returns:
None
- Raises:
RuntimeError – If any packaging/join/upload step fails.
ValueError – If a yielded tuple’s second element is not an int.
See also
finalize(data_id)
- class atomscale.streaming.TimeseriesStreamer(api_key: str, endpoint: str | None = None, points_per_chunk: int = 100)
Bases:
object

A high-performance Python interface (via PyO3) for real-time instrument time series streaming into the Atomscale platform. This class takes chunks of scalar time series data and uploads them asynchronously for storage and analysis.
Typical usage:
1) Instantiate the streamer with API key and optional endpoint.
2) initialize(…) to create the remote data item and get data_id.
3) push(…) repeatedly to send data chunks (fire-and-forget, non-blocking).
4) finalize(…) to mark the stream as complete on the server.
Notes
Each push() spawns an async upload task and returns immediately.
Chunks are ordered server-side by chunk_index, so out-of-order arrival is handled.
Multiple channels can be streamed for the same data_id.
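The steps above can be sketched as follows. The readings are synthetic, the stream and channel names are illustrative, and the network calls are guarded behind an environment variable:

```python
import os
import time

POINTS = 100  # matches the default points_per_chunk

# Build a few (timestamps, values) chunks up front (synthetic readings).
t0 = time.time()
chunk_data = []
for i in range(3):
    ts = [t0 + (i * POINTS + j) for j in range(POINTS)]             # seconds since epoch
    vals = [25.0 + 0.01 * (i * POINTS + j) for j in range(POINTS)]  # e.g. a temperature ramp
    chunk_data.append((ts, vals))

if os.environ.get("ATOMSCALE_API_KEY"):
    from atomscale.streaming import TimeseriesStreamer

    streamer = TimeseriesStreamer(
        api_key=os.environ["ATOMSCALE_API_KEY"], points_per_chunk=POINTS
    )
    data_id = streamer.initialize(stream_name="furnace log")  # illustrative name
    for idx, (ts, vals) in enumerate(chunk_data):
        streamer.push(data_id, idx, "temperature", ts, vals, units="C")  # fire-and-forget
    streamer.finalize(data_id)
```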
- Parameters:
api_key (str) – Your Atomscale API key.
endpoint (Optional[str]) – Base API URL. Defaults to “https://api.atomscale.ai”.
points_per_chunk (int) – Expected number of points per chunk. Defaults to 100.
- Raises:
RuntimeError – If the HTTP client or async runtime cannot be constructed.
- finalize(self, data_id: str) → None
Finalize the stream, marking it as complete on the server.
Call this after all data has been pushed to signal that the stream is finished.
- Parameters:
data_id (str) – The stream identifier returned by initialize().
- Returns:
None
- Raises:
RuntimeError – If the request fails.
- initialize(self, stream_name: Optional[str] = None, synth_source_id: Optional[int] = None, ...) → str
Initialize a new time series stream on the server.
Creates data_catalogue and processed_metrology_catalogue entries. Returns data_id to use for subsequent push() calls.
- Parameters:
stream_name (Optional[str]) – Human-readable name for the stream.
synth_source_id (Optional[int]) – Growth instrument ID to link. Must belong to your organization. Use list_instruments() to see available instruments.
physical_sample (Optional[str]) – Name or UUID of a physical sample to associate with the data item. If a UUID is provided, it must match an existing sample. If a name is provided, it is matched case-insensitively against existing samples, or a new sample is created if no match is found.
project_id (Optional[str]) – UUID of a project to associate with the stream. When provided along with physical_sample, the project’s tracking_physical_sample_id configuration is automatically updated to link the physical sample to the project for growth monitoring.
tags (Optional[List[str]]) – List of tag names or UUIDs to attach to the data item. Names are matched case-insensitively against existing org tags; unknown names are created. UUIDs must reference existing tags.
- Returns:
The data_id for this stream.
- Return type:
str
- Raises:
RuntimeError – If the initialization request fails or the instrument is not found.
- push(self, data_id: str, chunk_index: int, channel_name: str, timestamps: list[float], values: list[float], ...) → None
Push a single chunk of time series data for a channel. This method is fire-and-forget: it spawns an async upload task and returns immediately.
- Parameters:
data_id (str) – The stream identifier returned by initialize().
chunk_index (int) – Zero-based index of this chunk for ordering.
channel_name (str) – Name of the data channel (e.g., “temperature”, “pressure”).
timestamps (list[float]) – Unix epoch timestamps in seconds.
values (list[float]) – Measured values corresponding to timestamps.
units (Optional[str]) – Units for the values.
- Returns:
None
- Raises:
RuntimeError – If timestamps/values have different lengths.
- push_multi(self, data_id: str, chunk_index: int, channels: dict[str, dict]) → None
Push data for multiple channels at once. Each channel’s data is uploaded as a separate async task.
- Parameters:
data_id (str) – The stream identifier returned by initialize().
chunk_index (int) – Zero-based index of this chunk.
channels (dict[str, dict]) – Mapping of channel_name to channel data dict. Each channel dict should have:
- "timestamps" (list[float]): Unix epoch timestamps in seconds
- "values" (list[float]): Measured values
- "units" (str, optional): Units for the values
Example
streamer.push_multi(data_id, 0, {
    "temperature": {"timestamps": [0.0, 0.1], "values": [25.0, 25.1], "units": "C"},
    "pressure": {"timestamps": [0.0, 0.1], "values": [1.0, 1.1]},
})
- Returns:
None
- Raises:
RuntimeError – If any channel has mismatched lengths.
- run(self, data_id: str, channel_name: str, data_iter: Iterable[tuple[list[float], list[float]]], units: str | None = None) → None
Iterator mode. Stream time series data by iterating over chunks. The chunk_index is automatically assigned based on iteration order.
Each yielded item should be a tuple of (timestamps, values) where both are lists of floats.
This method blocks until all upload tasks complete.
- Parameters:
data_id (str) – The stream identifier returned by initialize().
channel_name (str) – Name of the data channel (e.g., “temperature”, “pressure”).
data_iter (Iterable[tuple[list[float], list[float]]]) – Iterator yielding (timestamps, values) tuples.
units (Optional[str]) – Units for the values.
- Returns:
None
- Raises:
RuntimeError – If any chunk has mismatched lengths.
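A sketch of iterator mode for time series: a generator yields (timestamps, values) tuples and run(…) assigns chunk indices in iteration order. Data, names, and units here are illustrative, and the network calls are guarded:

```python
import os

def reading_chunks(n_chunks: int, points: int = 50):
    """Yield (timestamps, values) tuples; chunk_index is assigned by run(...)."""
    for i in range(n_chunks):
        base = float(i * points)
        timestamps = [base + j * 0.1 for j in range(points)]  # synthetic epoch seconds
        values = [1.0 + 0.001 * j for j in range(points)]     # e.g. pressure readings
        yield timestamps, values

pairs = list(reading_chunks(2))
assert all(len(ts) == len(vs) == 50 for ts, vs in pairs)

if os.environ.get("ATOMSCALE_API_KEY"):
    from atomscale.streaming import TimeseriesStreamer

    streamer = TimeseriesStreamer(api_key=os.environ["ATOMSCALE_API_KEY"])
    data_id = streamer.initialize(stream_name="pressure log")  # illustrative name
    streamer.run(data_id, "pressure", reading_chunks(2), units="Torr")  # blocks
    streamer.finalize(data_id)
```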