monitor.collector
MetricCollector — collects metrics from multiple UnixHosts via asyncio.gather().
On each tick all hosts are polled simultaneously so results share one timestamp.
Supports three data sources:
- Live collection from multiple UnixHosts (asyncio.gather() per tick)
- Historical JSON files
- Historical SQLite databases (written by a previous live collection)
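The per-tick polling described above can be sketched with asyncio.gather(). This is a minimal illustration, not otto's implementation: poll_host and collect_tick are hypothetical names, and the "host" is simulated with a sleep.

```python
import asyncio
from datetime import datetime, timezone


async def poll_host(name: str, delay: float) -> dict[str, float]:
    """Hypothetical stand-in for one host's collection (not an otto API)."""
    await asyncio.sleep(delay)  # simulate the command round trip
    return {"CPU %": 10.0 + len(name)}


async def collect_tick(hosts: dict[str, float]) -> list[tuple[datetime, str, float]]:
    """Poll every host concurrently and stamp all results with one timestamp."""
    tick_time = datetime.now(timezone.utc)  # taken once per tick
    # gather() returns results in the same order as the awaitables passed in.
    results = await asyncio.gather(*(poll_host(n, d) for n, d in hosts.items()))
    rows = []
    for name, metrics in zip(hosts, results):
        for label, value in metrics.items():
            rows.append((tick_time, f"{name}/{label}", value))
    return rows
```

Calling asyncio.run(collect_tick({"router1": 0.01, "router2": 0.02})) yields one row per host, all carrying the same tick_time.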
- class otto.monitor.collector.MetricView(*args, **kwargs)
Bases: Protocol
The presentation surface a series needs to be charted.
Both MetricParser and SnmpMetric satisfy this structurally, so the record/publish path is identical whether a point came from a shell command or an SNMP OID.
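Structural satisfaction of a Protocol can be illustrated as follows. The members of MetricView are not listed on this page, so the attributes below (label, unit) and all class names are assumptions made purely for illustration.

```python
from dataclasses import dataclass
from typing import Protocol


class ChartableView(Protocol):
    """Hypothetical protocol; the real MetricView members may differ."""

    label: str
    unit: str


@dataclass
class ShellMetric:
    """Stand-in for a shell-command-backed metric (hypothetical)."""

    label: str
    unit: str
    command: str


@dataclass
class OidMetric:
    """Stand-in for an SNMP-OID-backed metric (hypothetical)."""

    label: str
    unit: str
    oid: str


def describe(view: ChartableView) -> str:
    # Neither class inherits from ChartableView; having the right
    # attributes is enough for a type checker to accept both.
    return f"{view.label} ({view.unit})"
```

Because the check is structural, describe() accepts either class without a shared base, which is what keeps a single record/publish path possible.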
- class otto.monitor.collector.MonitorTarget(host: RemoteHost, parsers: dict[str, otto.monitor.parsers.MetricParser] = <factory>, core_count: int = 1, snmp: otto.monitor.snmp.SnmpSource | None = None)
Bases: object
Pairs a UnixHost with the parser dict to use when collecting from it.
By default all hosts use DEFAULT_PARSERS. Pass a custom dict to add new metrics or override built-in commands for a specific host:
    MonitorTarget(
        host=gpu_host,
        parsers={
            **DEFAULT_PARSERS,
            'nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits': NvidiaGpuParser(),
        },
    )
In most cases you do not construct these directly; use register_host_parsers() from an init module and let the CLI build targets automatically.
Set snmp to collect from this host over SNMP instead of by running shell commands. The host then needs no shell parsers, and parsers is ignored. This is what lets otto monitor a host (embedded or Unix) over a channel separate from command execution.
- host : RemoteHost
- parsers : dict[str, MetricParser]
- snmp : SnmpSource | None = None
- class otto.monitor.collector.MetricCollector(hosts: collections.abc.Sequence[RemoteHost] | None = None, parsers: list[MetricParser] | None = None, db_path: str | None = None, targets: list[MonitorTarget] | None = None)
Bases: object
Collects numeric metrics from multiple UnixHosts and stores time-series data.
On each tick, all hosts are polled simultaneously via asyncio.gather() so that results from every host share the same timestamp.
Series keys have the form "hostname/metric_label" (e.g. "router1/CPU %").
- Parameters:
hosts – Remote hosts to monitor. Pass None (or omit) when loading historical data.
parsers – Metric parser instances. Defaults to DEFAULT_PARSERS (cpu, mem, disk, load).
db_path – Optional SQLite file for persistence. If None, data is in-memory only.
- async init_db() → None
Open a persistent aiosqlite connection with WAL mode and file lock.
Must be awaited before any DB writes. Called automatically by run(); callers that skip run() (e.g. tests) should call this explicitly.
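Enabling WAL mode on a SQLite connection can be sketched as below. To stay self-contained this uses the standard-library sqlite3 module rather than aiosqlite, and it does not model the file lock; open_wal and demo are hypothetical helpers, not otto functions.

```python
import os
import sqlite3
import tempfile


def open_wal(path: str) -> sqlite3.Connection:
    """Open a SQLite file and switch it to write-ahead logging."""
    conn = sqlite3.connect(path)
    # The pragma returns the journal mode actually in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        conn.close()
        raise RuntimeError(f"WAL mode not available, got {mode!r}")
    return conn


def demo() -> str:
    """Open a throwaway database and report its journal mode."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = open_wal(os.path.join(tmp, "metrics.db"))
        try:
            return conn.execute("PRAGMA journal_mode").fetchone()[0]
        finally:
            conn.close()
```

WAL requires a file-backed database; an in-memory database reports the mode "memory" instead, which is why the sketch checks the returned value.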
- async close() → None
Close the DB connection and all live host sessions.
Must be awaited before the surrounding event loop exits. Otherwise, subprocess transports owned by LocalSession are garbage-collected after the loop is closed and raise “Event loop is closed” from __del__.
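One way to guarantee that close() is awaited while the loop is still running is a try/finally inside the top-level coroutine. The sketch below uses a hypothetical FakeCollector stand-in that only mirrors the init_db()/close() shape; it is not the real MetricCollector.

```python
import asyncio


class FakeCollector:
    """Hypothetical stand-in exposing only init_db() and close()."""

    def __init__(self) -> None:
        self.closed = False

    async def init_db(self) -> None:
        pass  # the real method opens the database connection

    async def close(self) -> None:
        self.closed = True  # the real method releases DB and sessions


async def main(collector: FakeCollector) -> None:
    await collector.init_db()
    try:
        await asyncio.sleep(0)  # placeholder for the actual work
    finally:
        # Runs while the event loop is still alive, even if the
        # work above raises or the task is cancelled.
        await collector.close()
```

Running asyncio.run(main(FakeCollector())) closes the collector before asyncio.run() tears the loop down.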
- async run(interval: timedelta = datetime.timedelta(seconds=5), duration: timedelta | None = None) → None
Collect metrics from all hosts on each tick.
Each host’s collection is bounded by the interval: if a host does not respond in time, it is skipped for that tick and the session is recovered automatically. This prevents a single slow host from blocking collection on all other hosts.
Runs inside the caller’s event loop. Cancel the task wrapping this coroutine (or cancel it directly) to stop collection gracefully.
- Parameters:
interval – Collection interval as a timedelta.
duration – Optional total run time. None means run forever.
- async add_event(label: str, timestamp: datetime | None = None, color: str = '#888888', dash: str = 'dash', source: str = 'manual', end_timestamp: datetime | None = None) → MonitorEvent
Record a labeled event and push it to all dashboard subscribers.
- async delete_event(event_id: int) → bool
Remove an event by id. Returns True if found and removed, False otherwise.
- async update_event(event_id: int, label: str, color: str, dash: str, end_timestamp: datetime | None = None) → MonitorEvent | None
Update an existing event’s label, color, dash, and end_timestamp. Returns the updated event or None.
- get_series() → dict[str, list[MetricPoint]]
Return a snapshot of all series (metrics and per-process).
Format: {"hostname/label": [MetricPoint(ts, value, meta), ...]}
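The snapshot format above can be illustrated with a small in-memory store. Point and SeriesStore are hypothetical stand-ins for MetricPoint and the collector's internal storage; whether the real get_series() copies its lists is an assumption here.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, NamedTuple


class Point(NamedTuple):
    """Hypothetical stand-in for MetricPoint(ts, value, meta)."""

    ts: datetime
    value: float
    meta: dict[str, Any]


class SeriesStore:
    """Keeps points keyed by 'hostname/label'."""

    def __init__(self) -> None:
        self._series: dict[str, list[Point]] = defaultdict(list)

    def record(self, host: str, label: str, point: Point) -> None:
        self._series[f"{host}/{label}"].append(point)

    def snapshot(self) -> dict[str, list[Point]]:
        # Copy each list so callers cannot mutate the live data.
        return {key: list(points) for key, points in self._series.items()}


store = SeriesStore()
store.record("router1", "CPU %", Point(datetime.now(timezone.utc), 42.0, {}))
```

After this, store.snapshot() has the single key "router1/CPU %", and clearing the returned list leaves the store's own data untouched.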
- get_chart_map() → dict[str, str]
Return a mapping of series label → chart group key.
Used by the dashboard to assign historical series (loaded from /api/data) to their correct Plotly chart containers without requiring series_labels() on each parser. The map is built lazily as data arrives in _process_host_results().
- get_events() → list[MonitorEvent]
Return all recorded events in chronological order.
- get_meta() → dict[str, Any]
Return metadata for the dashboard (host names, metric labels/units, tabs).
- classmethod from_json(path: str, parsers: list[MetricParser] | None = None) → MetricCollector
Load historical metrics from a JSON file.
Expected format:
    {
      "metrics": [{"timestamp": "...", "host": "...", "label": "...", "value": 42.0}, ...],
      "events": [{"timestamp": "...", "label": "...", "source": "...", "color": "...", "dash": "..."}, ...]
    }
The host field is optional for backward compatibility.
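Reading the "metrics" array of such a file can be sketched with the standard json module. This is not the from_json() implementation: load_series is a hypothetical helper, ISO 8601 timestamps are assumed (the format above elides them), and falling back to the bare label when host is missing is likewise an assumption.

```python
import json
from collections import defaultdict
from datetime import datetime

SAMPLE = """
{
  "metrics": [
    {"timestamp": "2024-01-01T00:00:00", "host": "router1", "label": "CPU %", "value": 42.0},
    {"timestamp": "2024-01-01T00:00:05", "label": "Load", "value": 0.5}
  ],
  "events": []
}
"""


def load_series(text: str) -> dict[str, list[tuple[datetime, float]]]:
    """Group metric rows into series keyed by 'host/label'."""
    doc = json.loads(text)
    series: dict[str, list[tuple[datetime, float]]] = defaultdict(list)
    for row in doc.get("metrics", []):
        host = row.get("host")  # optional field
        # Assumption: rows without a host are keyed by label alone.
        key = f"{host}/{row['label']}" if host else row["label"]
        ts = datetime.fromisoformat(row["timestamp"])  # assumes ISO 8601
        series[key].append((ts, float(row["value"])))
    return dict(series)
```

load_series(SAMPLE) produces two series, "router1/CPU %" and "Load", each holding one (timestamp, value) pair.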
- async classmethod from_sqlite(path: str, parsers: list[MetricParser] | None = None) → MetricCollector
Load historical metrics and events from a SQLite database.