# Source code for atomscale.timeseries.provider
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import Any, ClassVar, Generic, TypeVar
from pandas import DataFrame
from atomscale.core import BaseClient
# Type variable for the result type a provider returns; it parameterizes
# TimeseriesProvider (Generic[R]) and is the return type of build_result.
R = TypeVar("R")
# Maps each statistic suffix (appended to a source name) to the text that is
# appended to that source name's label.
_STAT_SUFFIX_LABELS: Mapping[str, str] = {
    "_z_score": "Z Score",
    "_ema_z_score": "EMA Z Score",
    "_anomaly_probability": "Anomaly Probability",
}


def extend_with_statistics(rename_map: Mapping[str, str]) -> dict[str, str]:
    """Add suffix-based statistical labels to a rename map.

    For every ``source_name -> label`` pair in *rename_map*, one additional
    entry is derived per suffix in ``_STAT_SUFFIX_LABELS``. For example,
    ``{"x": "X"}`` also yields ``"x_z_score" -> "X Z Score"``.

    Entries given explicitly in *rename_map* take precedence: a derived key
    that is already present in the input is left untouched instead of being
    overwritten by the generated label.

    Args:
        rename_map: Mapping from source names to display labels.

    Returns:
        A new dict holding the original entries plus the derived statistic
        entries. The input mapping is not modified.
    """
    extended = dict(rename_map)
    for source_name, label in rename_map.items():
        for suffix, suffix_label in _STAT_SUFFIX_LABELS.items():
            derived_name = f"{source_name}{suffix}"
            # Never clobber a label the caller supplied explicitly.
            if derived_name in rename_map:
                continue
            extended[derived_name] = f"{label} {suffix_label}"
    return extended
[docs]
class TimeseriesProvider(ABC, Generic[R]):
    """Per-domain strategy interface for parsing timeseries data.

    Subclasses must implement ``fetch_raw``, ``to_dataframe`` and
    ``build_result``. The snapshot-related methods are optional override
    points whose base implementations report that no snapshots exist.
    """

    # Canonical domain name; used as this provider's key in the registry.
    TYPE: ClassVar[str]

    # ------------------------------------------------------------------
    # Required hooks
    # ------------------------------------------------------------------

    @abstractmethod
    def fetch_raw(self, client: BaseClient, data_id: str) -> Any:
        """Issue the HTTP GET request(s) that retrieve the raw payload(s)."""

    @abstractmethod
    def to_dataframe(self, raw: Any) -> DataFrame:
        """Turn the raw payload into a tidy DataFrame.

        Domain-specific renames and indexing are applied here.
        """

    @abstractmethod
    def build_result(
        self,
        client: BaseClient,
        data_id: str,
        data_type: str,
        ts_df: DataFrame,
    ) -> R:
        """Construct the time series result object."""

    # ------------------------------------------------------------------
    # Optional override points
    # ------------------------------------------------------------------

    def snapshot_url(self, data_id: str) -> str:  # noqa: ARG002
        """Return the API endpoint exposing extracted/snapshot frames.

        The base implementation returns an empty string.
        """
        no_endpoint = ""
        return no_endpoint

    def snapshot_image_uuids(
        self,
        frames_payload: dict[str, Any],  # noqa: ARG002
    ) -> list[dict]:
        """Extract snapshot requests from the frames payload.

        The base implementation yields no snapshots and returns a new empty
        list on every call.
        """
        no_requests: list[dict] = []
        return no_requests

    def fetch_snapshot(
        self,
        client: BaseClient,  # noqa: ARG002
        req: dict,  # noqa: ARG002
    ) -> Any | None:
        """Resolve a single snapshot request into a domain-specific ImageResult.

        The base implementation resolves nothing and returns ``None``.
        """
        return None