atomscale.streaming#

High-performance streaming for RHEED and instrument data.

class atomscale.streaming.RHEEDStreamer(api_key: str, endpoint: str | None = None)

Bases: object

A thin, high-performance Python interface (via PyO3) for real-time RHEED frame streaming into the Atomscale platform. This class takes chunks of 8-bit frames (NumPy arrays) and uploads them for analysis while they are still being captured programmatically from a camera.

Typical usage:

  1. Instantiate the streamer.

  2. initialize(…) to create the remote data item and receive data_id.

  3. Either run(data_id, frames_iter) to stream by yielding frame chunks from a generator/iterator, or push(data_id, chunk_idx, frames) repeatedly to send chunks from your own loop.

  4. finalize(data_id) to mark the stream complete on the server.
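The sequence above can be sketched as follows. The frame generator stands in for a camera acquisition loop; the streamer calls are shown commented out because they require a valid API key and network access, and the key and argument values here are placeholders:

```python
import numpy as np

# from atomscale.streaming import RHEEDStreamer  # requires the atomscale package

def frame_chunks(n_chunks=3, chunk_size=8, h=64, w=64):
    """Yield (N, H, W) uint8 chunks, as a camera acquisition loop might produce."""
    rng = np.random.default_rng(0)
    for _ in range(n_chunks):
        yield rng.integers(0, 256, size=(chunk_size, h, w), dtype=np.uint8)

# The streaming sequence itself (placeholder key; requires network access):
# streamer = RHEEDStreamer(api_key="YOUR_API_KEY")
# data_id = streamer.initialize(fps=30.0, rotations_per_min=10.0, chunk_size=8)
# streamer.run(data_id, frame_chunks())  # blocks until all chunk uploads finish
# streamer.finalize(data_id)             # marks the stream complete

chunks = list(frame_chunks())
```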

Notes

  • Frame dtype is coerced to uint8. Shapes (H, W) or (N, H, W) are accepted; (N,H,W) is preferred for chunks.

  • Packaging happens concurrently for throughput; network PUTs are async.

  • This class is safe to call from Python; heavy work is offloaded to multithreaded async workers.

  • See also: finalize(…).

Parameters:
  • api_key (str) – Your Atomscale API key.

  • endpoint (Optional[str]) – Base API URL. Defaults to “https://api.atomscale.ai”.

Raises:

RuntimeError – If the HTTP client or async runtime cannot be constructed.

finalize(self, data_id: str) None

Explicitly closes the remote stream for the given data_id. This signals to the server that no further chunks will be uploaded and allows any downstream jobs (e.g., indexing, aggregation, or post-processing) to begin.

Typical use:

  • After run(…) returns (it waits for all chunk tasks), call finalize(data_id).

  • In push(…) mode, call finalize(data_id) only after you have pushed your last chunk and ensured any in-flight uploads have finished (since push(…) detaches tasks).

Notes

  • The operation performs a single HTTP POST to the …/end endpoint.

  • It is safe to call more than once; the server may treat it as idempotent, but repeated calls are unnecessary.

Parameters:

data_id (str) – The stream identifier returned by initialize(…).

Returns:

None

Raises:

RuntimeError – If the finalization POST fails.

See also

run(data_id, frames_iter), push(data_id, chunk_idx, frames)

initialize(self, fps: float, rotations_per_min: float, chunk_size: int, stream_name: str | None = None, physical_sample: str | None = None, project_id: str | None = None, tags: list[str] | None = None) str

Creates a new remote data item for this stream and returns its data_id. Also captures runtime configuration used for subsequent chunk uploads.

The rotational period (frames per rotation) is computed as: fpr = (fps * 60.0) / rotations_per_min. If rotations_per_min <= 0.0, the stream is treated as stationary (no rotation).

After streaming via run(…) or push(…), call finalize(data_id) to mark the stream as complete.

Parameters:
  • fps (float) – Capture rate in frames per second.

  • rotations_per_min (float) – Wafer/crystal rotations per minute; use 0.0 for stationary operation.

  • chunk_size (int) – The intended number of frames per chunk you will send with run(…) or push(…).

  • stream_name (Optional[str]) – Human-readable name shown in the platform. If None or an empty string, a default like “RHEED Stream @ 1:23PM” is used.

  • physical_sample (Optional[str]) – Name or UUID of a physical sample to associate with the data item. If a UUID is provided, it must match an existing sample. If a name is provided, it is matched case-insensitively against existing samples, or a new sample is created if no match is found.

  • project_id (Optional[str]) – UUID of a project to associate with the stream. When provided along with physical_sample, the project’s tracking_physical_sample_id configuration is automatically updated to link the physical sample to the project for growth monitoring.

  • tags (Optional[List[str]]) – List of tag names or UUIDs to attach to the data item. Names are matched case-insensitively against existing org tags; unknown names are created. UUIDs must reference existing tags.

Returns:

The created data_id for this stream.

Return type:

str

Raises:

RuntimeError – If the initialization POST fails.

push(self, data_id: str, chunk_idx: int, frames: numpy.ndarray, capture_start_ms_utc: int | None = None) None

Callback mode. Push a single chunk of frames that you produced externally (e.g., from a camera callback). Call repeatedly for subsequent chunks.

The frames argument may be (N,H,W) or (H,W); dtype is coerced to uint8.

After your last push(…), call `finalize(data_id)` to mark the stream as complete on the server.

Timestamps#

When capture_start_ms_utc is not provided, this method stamps each chunk with Utc::now() sampled the moment push() is entered, before any GIL-held packaging work. The chunk’s end_unix_ms_utc is then start + (n / fps) * 1000. Inter-chunk gaps therefore reflect real arrival jitter; intra-chunk span follows declared fps. Pass an explicit timestamp when you have a hardware clock (camera trigger, OS monotonic-to-utc conversion).
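A hypothetical helper mirroring the stamping rule above (the function is illustrative, not part of the API; the timestamp value is an arbitrary placeholder):

```python
import time

def chunk_time_bounds(n_frames: int, fps: float, capture_start_ms_utc=None):
    """Mirror the documented rule: start defaults to 'now' on entry;
    end_unix_ms_utc = start + (n / fps) * 1000."""
    start = capture_start_ms_utc if capture_start_ms_utc is not None else int(time.time() * 1000)
    end = start + int(n_frames / fps * 1000)
    return start, end

start, end = chunk_time_bounds(n_frames=60, fps=30.0, capture_start_ms_utc=1_700_000_000_000)
# 60 frames at 30 fps span 2000 ms, so end - start == 2000
```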

Parameters:
  • data_id (str) – The remote data identifier returned by initialize(…).

  • chunk_idx (int) – Zero-based index of this chunk (used in the Zarr shard path).

  • frames (numpy.ndarray) – (N,H,W) or (H,W) grayscale frames as uint8.

  • capture_start_ms_utc (Optional[int]) – Capture-start timestamp in milliseconds since UNIX epoch (UTC). Pass when you have a real capture-time clock (camera trigger, OS monotonic-to-UTC conversion). When None, sampled from Utc::now() on entry.

Returns:

None

Raises:

RuntimeError – If packaging or upload fails internally.

See also

finalize(data_id)

run(self, data_id: str, frames_iter: Iterable) None

Generator/iterator mode. Iterates frames_iter, where each yielded item is one of:

  • (N, H, W) numpy.ndarray[uint8]: a chunk of N grayscale frames

  • (H, W) numpy.ndarray[uint8]: a single frame (treated as N = 1)

  • (frames, capture_start_ms_utc) 2-tuple: frames as above + an explicit capture-start timestamp (int milliseconds since UNIX epoch, UTC). Use this when you have hardware/OS-level timestamps for each chunk (camera trigger time, OS monotonic-to-utc conversion, etc.).

For each yielded item, the method:

  1. Converts it to flat uint8 bytes,

  2. Packages the frames on a blocking worker thread,

  3. Uploads the shard,

  4. Continues to the next item while uploads proceed concurrently, for high throughput.

This method blocks until all spawned tasks complete. After run(…) returns, call finalize(data_id) to mark the stream complete on the server.

Timestamps#

When the caller does not supply a capture_start_ms_utc, this method stamps each chunk with Utc::now() sampled the moment the iterator yields, before any GIL-held packaging work. The chunk’s end_unix_ms_utc is then start + (n / fps) * 1000. Inter-chunk gaps therefore reflect real arrival jitter; intra-chunk span follows declared fps. Pass an explicit timestamp when you have a hardware clock (camera trigger, OS monotonic-to-utc conversion).
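A generator using the 2-tuple form is sketched below. The frames and clock offsets are synthetic stand-ins; `streamer` and `data_id` are placeholder names from a prior initialize(…):

```python
import time
import numpy as np

def timed_chunks(n_chunks=2, chunk_size=4, h=32, w=32, fps=30.0):
    """Yield (frames, capture_start_ms_utc) tuples, the 2-tuple form run(...) accepts."""
    t0 = int(time.time() * 1000)
    for i in range(n_chunks):
        frames = np.zeros((chunk_size, h, w), dtype=np.uint8)
        start_ms = t0 + int(i * chunk_size / fps * 1000)  # stand-in for a hardware clock
        yield frames, start_ms

# streamer.run(data_id, timed_chunks())  # placeholder names; see initialize(...)
items = list(timed_chunks())
```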

Parameters:
  • data_id (str) – The stream data ID returned by initialize(…).

  • frames_iter (Iterable) – Python iterable/generator of (N,H,W) / (H,W) uint8 arrays, or (frames, capture_start_ms_utc) tuples.

Returns:

None

Raises:

RuntimeError – If any packaging/join/upload step fails.

ValueError – If a yielded tuple’s second element is not an int.

See also

finalize(data_id)

class atomscale.streaming.TimeseriesStreamer(api_key: str, endpoint: str | None = None, points_per_chunk: int = 100)

Bases: object

A high-performance Python interface (via PyO3) for real-time instrument time series streaming into the Atomscale platform. This class takes chunks of scalar time series data and uploads them asynchronously for storage and analysis.

Typical usage:

  1. Instantiate the streamer with API key and optional endpoint

  2. initialize(…) to create the remote data item and get data_id

  3. push(…) repeatedly to send data chunks (fire-and-forget, non-blocking)
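The push-mode workflow can be sketched like this. The chunking helper is ours (not part of the API); the streamer calls are commented out because they need real credentials, and the key and stream name are placeholders:

```python
def batch_points(timestamps, values, points_per_chunk=100):
    """Split parallel timestamp/value lists into push()-sized chunks."""
    if len(timestamps) != len(values):
        raise ValueError("timestamps and values must have the same length")
    for i in range(0, len(timestamps), points_per_chunk):
        yield timestamps[i:i + points_per_chunk], values[i:i + points_per_chunk]

ts = [t * 0.1 for t in range(250)]
vs = [25.0 + 0.01 * t for t in range(250)]
chunks = list(batch_points(ts, vs))
# 250 points → chunks of 100, 100, and 50

# streamer = TimeseriesStreamer(api_key="YOUR_API_KEY")   # placeholder key
# data_id = streamer.initialize(stream_name="demo")
# for idx, (t, v) in enumerate(chunks):
#     streamer.push(data_id, idx, "temperature", t, v)    # fire-and-forget
# streamer.finalize(data_id)
```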

Notes

  • Each push() spawns an async upload task and returns immediately.

  • Chunks are ordered server-side by chunk_index, so out-of-order arrival is handled.

  • Multiple channels can be streamed for the same data_id.

Parameters:
  • api_key (str) – Your Atomscale API key.

  • endpoint (Optional[str]) – Base API URL. Defaults to “https://api.atomscale.ai”.

  • points_per_chunk (int) – Expected number of points per chunk. Defaults to 100.

Raises:

RuntimeError – If the HTTP client or async runtime cannot be constructed.

finalize(self, data_id: str) None

Finalize the stream, marking it as complete on the server.

Call this after all data has been pushed to signal that the stream is finished.

Parameters:

data_id (str) – The stream identifier returned by initialize().

Returns:

None

Raises:

RuntimeError – If the request fails.

initialize(self, stream_name: Optional[str] = None, synth_source_id: Optional[int] = None, ...) str

Initialize a new time series stream on the server.

Creates data_catalogue and processed_metrology_catalogue entries. Returns data_id to use for subsequent push() calls.

Parameters:
  • stream_name (Optional[str]) – Human-readable name for the stream.

  • synth_source_id (Optional[int]) – Growth instrument ID to link. Must belong to your organization. Use list_instruments() to see available instruments.

  • physical_sample (Optional[str]) – Name or UUID of a physical sample to associate with the data item. If a UUID is provided, it must match an existing sample. If a name is provided, it is matched case-insensitively against existing samples, or a new sample is created if no match is found.

  • project_id (Optional[str]) – UUID of a project to associate with the stream. When provided along with physical_sample, the project’s tracking_physical_sample_id configuration is automatically updated to link the physical sample to the project for growth monitoring.

  • tags (Optional[List[str]]) – List of tag names or UUIDs to attach to the data item. Names are matched case-insensitively against existing org tags; unknown names are created. UUIDs must reference existing tags.

Returns:

The data_id for this stream.

Return type:

str

Raises:

RuntimeError – If the initialization request fails or instrument not found.

push(self, data_id: str, chunk_index: int, channel_name: str, timestamps: list[float], values: list[float], ...) None

Push a single chunk of time series data for a channel. This method is fire-and-forget: it spawns an async upload task and returns immediately.

Parameters:
  • data_id (str) – The stream identifier returned by initialize().

  • chunk_index (int) – Zero-based index of this chunk for ordering.

  • channel_name (str) – Name of the data channel (e.g., “temperature”, “pressure”).

  • timestamps (list[float]) – Unix epoch timestamps in seconds.

  • values (list[float]) – Measured values corresponding to timestamps.

  • units (Optional[str]) – Units for the values.

Returns:

None

Raises:

RuntimeError – If timestamps/values have different lengths.

push_multi(self, data_id: str, chunk_index: int, channels: dict[str, dict]) None

Push data for multiple channels at once. Each channel’s data is uploaded as a separate async task.

Parameters:
  • data_id (str) – The stream identifier returned by initialize().

  • chunk_index (int) – Zero-based index of this chunk.

  • channels (dict[str, dict]) – Mapping of channel_name to channel data dict. Each channel dict should have: - “timestamps” (list[float]): Unix epoch timestamps in seconds - “values” (list[float]): Measured values - “units” (str, optional): Units for the values

Example

streamer.push_multi(data_id, 0, {
    "temperature": {"timestamps": [0.0, 0.1], "values": [25.0, 25.1], "units": "C"},
    "pressure": {"timestamps": [0.0, 0.1], "values": [1.0, 1.1]},
})

Returns:

None

Raises:

RuntimeError – If any channel has mismatched lengths.

run(self, data_id: str, channel_name: str, data_iter: Iterable[tuple[list[float], list[float]]], units: str | None = None) None

Iterator mode. Stream time series data by iterating over chunks. The chunk_index is automatically assigned based on iteration order.

Each yielded item should be a tuple of (timestamps, values) where both are lists of floats.

This method blocks until all upload tasks complete.
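A generator producing the (timestamps, values) tuples run(…) expects might look like this; the readings are synthetic, and `streamer`/`data_id` are placeholder names from a prior initialize():

```python
def window_stream(readings, window=100):
    """Yield (timestamps, values) tuples for run(); chunk_index follows iteration order."""
    for i in range(0, len(readings), window):
        batch = readings[i:i + window]
        yield [t for t, _ in batch], [v for _, v in batch]

readings = [(i * 0.5, 20.0 + i) for i in range(120)]
chunks = list(window_stream(readings))
# 120 readings → two chunks of 100 and 20 points

# streamer.run(data_id, "pressure", window_stream(readings), units="Torr")  # placeholders
```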

Parameters:
  • data_id (str) – The stream identifier returned by initialize().

  • channel_name (str) – Name of the data channel (e.g., “temperature”, “pressure”).

  • data_iter (Iterable[tuple[list[float], list[float]]]) – Iterator yielding (timestamps, values) tuples.

  • units (Optional[str]) – Units for the values.

Returns:

None

Raises:

RuntimeError – If any chunk has mismatched lengths.