monitor.collector

MetricCollector — collects metrics from multiple UnixHosts via asyncio.gather().

On each tick all hosts are polled simultaneously so results share one timestamp.

Supports three data sources:
  1. Live collection from multiple UnixHosts (asyncio.gather() per tick)

  2. Historical JSON files

  3. Historical SQLite databases (written by a previous live collection)

class otto.monitor.collector.MetricView(*args, **kwargs)

Bases: Protocol

The presentation surface a series needs to be charted.

Both MetricParser and SnmpMetric satisfy this structurally, so the record/publish path is identical whether a point came from a shell command or an SNMP OID.

chart : str
y_title : str
unit : str
tab : str
tab_label : str
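The structural typing described above can be sketched as follows. This is a minimal illustration, not the library's definition: `MetricViewSketch`, `ShellCpuMetric`, and `describe` are hypothetical names, and the attribute values are made up. Only the five attribute names come from the listing above.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class MetricViewSketch(Protocol):
    """Hypothetical stand-in for MetricView: five presentation attributes."""

    chart: str
    y_title: str
    unit: str
    tab: str
    tab_label: str


@dataclass
class ShellCpuMetric:
    """Hypothetical metric class; it does NOT inherit from the protocol."""

    chart: str = "cpu"
    y_title: str = "CPU %"
    unit: str = "%"
    tab: str = "system"
    tab_label: str = "System"


def describe(view: MetricViewSketch) -> str:
    """Build a chart heading using only the protocol's attributes."""
    return f"[{view.tab_label}] {view.y_title} ({view.unit})"
```

Any object exposing the same five attributes would pass through `describe` unchanged, which is the property that lets shell-backed and SNMP-backed series share one code path.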
class otto.monitor.collector.MonitorTarget(host: RemoteHost, parsers: dict[str, MetricParser] = <factory>, core_count: int = 1, snmp: SnmpSource | None = None)

Bases: object

Pairs a UnixHost with the parser dict to use when collecting from it.

By default all hosts use DEFAULT_PARSERS. Pass a custom dict to add new metrics or override built-in commands for a specific host:

MonitorTarget(
    host=gpu_host,
    parsers={
        **DEFAULT_PARSERS,
        'nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits': NvidiaGpuParser(),
    },
)

In most cases you do not construct these directly — use register_host_parsers() from an init module and let the CLI build targets automatically.

Set snmp to collect from this host over SNMP instead of by running shell commands — the host then needs no shell parsers, and parsers is ignored. This is what lets otto monitor a host (embedded or Unix) over a channel separate from command execution.

host : RemoteHost
parsers : dict[str, MetricParser]
core_count : int = 1
snmp : SnmpSource | None = None
class otto.monitor.collector.MetricCollector(hosts: collections.abc.Sequence[RemoteHost] | None = None, parsers: list[MetricParser] | None = None, db_path: str | None = None, targets: list[MonitorTarget] | None = None)

Bases: object

Collects numeric metrics from multiple UnixHosts and stores time-series data.

On each tick, all hosts are polled simultaneously via asyncio.gather() so that results from every host share the same timestamp.

Series keys have the form "hostname/metric_label" (e.g. "router1/CPU %").
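A small sketch of building and splitting keys of this form. The helper names are hypothetical, and the sketch assumes hostnames never contain "/" (labels may), so it splits at the first slash only.

```python
def series_key(hostname: str, label: str) -> str:
    """Join a hostname and a metric label into a series key."""
    return f"{hostname}/{label}"


def split_series_key(key: str) -> tuple[str, str]:
    """Split at the FIRST slash so labels such as 'Disk /var %' survive."""
    host, _, label = key.partition("/")
    return host, label
```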

Parameters:
  • hosts – Remote hosts to monitor. Pass None (or omit) when loading historical data.

  • parsers – Metric parser instances. Defaults to DEFAULT_PARSERS (cpu, mem, disk, load).

  • db_path – Optional SQLite file for persistence. If None, data is in-memory only.

async init_db() → None

Open a persistent aiosqlite connection with WAL mode and file lock.

Must be awaited before any DB writes. Called automatically by run(); callers that skip run() (e.g. tests) should call this explicitly.
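For readers unfamiliar with WAL mode, the sketch below shows how a connection is switched to it. It uses the synchronous standard-library sqlite3 module rather than aiosqlite, does not model the file lock, and is not taken from init_db() itself; `open_wal` and `demo_journal_mode` are hypothetical helpers.

```python
import os
import sqlite3
import tempfile


def open_wal(path: str) -> sqlite3.Connection:
    """Open a SQLite file and switch its journal to write-ahead logging."""
    conn = sqlite3.connect(path)
    # The PRAGMA returns the journal mode that is actually in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        conn.close()
        raise RuntimeError(f"could not enable WAL, journal mode is {mode!r}")
    return conn


def demo_journal_mode() -> str:
    """Create a throwaway database and report its journal mode."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = open_wal(os.path.join(tmp, "metrics.db"))
        try:
            return conn.execute("PRAGMA journal_mode").fetchone()[0]
        finally:
            conn.close()
```

WAL applies only to file-backed databases, which is why the demo writes to a temporary directory instead of using ":memory:".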

async close_db() → None

Close the persistent DB connection and release the file lock.

async close() → None

Close the DB connection and all live host sessions.

Must be awaited before the surrounding event loop exits. Otherwise, subprocess transports owned by LocalSession are garbage-collected after the loop has closed, and their __del__ raises “Event loop is closed”.
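One way to guarantee that ordering is a try/finally inside the coroutine passed to asyncio.run(), sketched below. `FakeCollector` is a hypothetical stand-in so the example runs without otto installed; only the init_db() and close() method names come from this page.

```python
import asyncio


class FakeCollector:
    """Hypothetical stand-in exposing the two methods used below."""

    def __init__(self) -> None:
        self.closed = False

    async def init_db(self) -> None:
        pass  # a real collector would open its DB connection here

    async def close(self) -> None:
        self.closed = True  # a real collector would close DB and sessions


async def main(collector: FakeCollector) -> None:
    try:
        await collector.init_db()
        # ... record points, add events, etc. ...
    finally:
        # Runs while the loop is still alive, even if the body raised.
        await collector.close()
```

Calling `asyncio.run(main(collector))` then closes the loop only after close() has finished.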

async run(interval: timedelta = datetime.timedelta(seconds=5), duration: timedelta | None = None) → None

Collect metrics from all hosts on each tick.

Each host’s collection is bounded by the interval — if a host does not respond in time, it is skipped for that tick and the session is recovered automatically. This prevents a single slow host from blocking collection on all other hosts.

Runs inside the caller’s event loop. Cancel the task wrapping this coroutine (or cancel it directly) to stop collection gracefully.

Parameters:
  • interval – Collection interval as a timedelta.

  • duration – Optional total run time. None means run forever.
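The per-host time bound described above can be sketched with asyncio.wait_for() inside asyncio.gather(). This is an illustration of the technique under stated assumptions, not the body of run(): `poll_tick` and the poller callables are hypothetical, each host yields a single float, and session recovery is not modeled.

```python
import asyncio
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import Optional

Poller = Callable[[], Awaitable[float]]


async def poll_tick(pollers: dict[str, Poller], interval: timedelta) -> dict[str, float]:
    """Poll every host concurrently; drop hosts that exceed the interval."""
    budget = interval.total_seconds()

    async def bounded(name: str, poll: Poller) -> tuple[str, Optional[float]]:
        try:
            return name, await asyncio.wait_for(poll(), timeout=budget)
        except asyncio.TimeoutError:
            # Slow host: skipped for this tick, others are unaffected.
            return name, None

    results = await asyncio.gather(*(bounded(n, p) for n, p in pollers.items()))
    return {name: value for name, value in results if value is not None}
```

Because each timeout is handled inside its own wrapper, one slow host never causes gather() to raise or delays the results of the others beyond the interval.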

async add_event(label: str, timestamp: datetime | None = None, color: str = '#888888', dash: str = 'dash', source: str = 'manual', end_timestamp: datetime | None = None) → MonitorEvent

Record a labeled event and push it to all dashboard subscribers.

async delete_event(event_id: int) → bool

Remove an event by id. Returns True if found and removed, False otherwise.

async update_event(event_id: int, label: str, color: str, dash: str, end_timestamp: datetime | None = None) → MonitorEvent | None

Update an existing event’s label, color, dash, and end_timestamp. Returns the updated event, or None if no event with that id exists.

get_series() → dict[str, list[MetricPoint]]

Return a snapshot of all series (metrics and per-process).

Format: {"hostname/label": [MetricPoint(ts, value, meta), ...]}

get_chart_map() → dict[str, str]

Return a mapping of series label → chart group key.

Used by the dashboard to assign historical series (loaded from /api/data) to their correct Plotly chart containers without requiring series_labels() on each parser. The map is built lazily as data arrives in _process_host_results().

get_events() → list[MonitorEvent]

Return all recorded events in chronological order.

get_meta() → dict[str, Any]

Return metadata for the dashboard (host names, metric labels/units, tabs).

subscribe() → Queue[dict[str, Any]]

Register a new SSE subscriber and return its queue.
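The queue-per-subscriber pattern can be sketched as below. `Broadcaster` and its `publish` method are hypothetical, and how MetricCollector actually pushes messages is not documented here; the sketch only assumes each subscriber gets its own asyncio.Queue.

```python
import asyncio
from typing import Any


class Broadcaster:
    """Hypothetical fan-out: every subscriber owns one queue."""

    def __init__(self) -> None:
        self._subscribers: list[asyncio.Queue[dict[str, Any]]] = []

    def subscribe(self) -> asyncio.Queue[dict[str, Any]]:
        q: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        self._subscribers.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue[dict[str, Any]]) -> None:
        if q in self._subscribers:
            self._subscribers.remove(q)

    def publish(self, message: dict[str, Any]) -> None:
        # Unbounded queues never block, so put_nowait is safe here.
        for q in self._subscribers:
            q.put_nowait(message)
```

An SSE handler would typically call subscribe() when a client connects, await `q.get()` in a loop, and call unsubscribe() in a finally block when the client disconnects.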

unsubscribe(q: Queue[dict[str, Any]]) → None

classmethod from_json(path: str, parsers: list[MetricParser] | None = None) → MetricCollector

Load historical metrics from a JSON file.

Expected format:

{
  "metrics": [{"timestamp": "...", "host": "...", "label": "...", "value": 42.0}, ...],
  "events":  [{"timestamp": "...", "label": "...", "source": "...",
               "color": "...", "dash": "..."}, ...]
}

The host field is optional for backward compatibility.
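To make the format concrete, the sketch below groups the "metrics" rows into "host/label" series. It is not from_json() itself: `group_metrics` is a hypothetical helper, and the "unknown" fallback for rows without a host is an assumption, since the page does not say what from_json() substitutes.

```python
import json
from collections import defaultdict


def group_metrics(text: str, default_host: str = "unknown") -> dict[str, list[tuple[str, float]]]:
    """Group metric rows by 'host/label'; the host field may be absent."""
    doc = json.loads(text)
    series: defaultdict[str, list[tuple[str, float]]] = defaultdict(list)
    for row in doc.get("metrics", []):
        host = row.get("host", default_host)  # assumed fallback
        series[f"{host}/{row['label']}"].append((row["timestamp"], row["value"]))
    return dict(series)
```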

async classmethod from_sqlite(path: str, parsers: list[MetricParser] | None = None) → MetricCollector

Load historical metrics and events from a SQLite database.

export_json(path: str) → None

Export all collected data to a JSON file.

to_json() → str

Serialize all metrics and events to a JSON string compatible with --file.