monitor.collector
MetricCollector collects metrics from multiple RemoteHosts via asyncio.gather().
On each tick, all hosts are polled simultaneously so that results share one timestamp.
Supports three data sources:
- Live collection from multiple RemoteHosts (asyncio.gather() per tick)
- Historical JSON files
- Historical SQLite databases (written by a previous live collection)
- class otto.monitor.collector.MonitorTarget(host, parsers=<factory>, core_count=1)
Bases: object
Pairs a RemoteHost with the parser dict to use when collecting from it.
By default all hosts use DEFAULT_PARSERS. Pass a custom dict to add new metrics or override built-in commands for a specific host:

    MonitorTarget(
        host=gpu_host,
        parsers={
            **DEFAULT_PARSERS,
            'nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits': NvidiaGpuParser(),
        },
    )

In most cases you do not construct these directly; use register_host_parsers() from an init module and let the CLI build targets automatically.
- host : otto.host.remoteHost.RemoteHost
- parsers : dict[str, otto.monitor.parsers.MetricParser]
- core_count : int = 1
- class otto.monitor.collector.MetricCollector(hosts=None, parsers=None, db_path=None, targets=None)
Bases: object
Collects numeric metrics from multiple RemoteHosts and stores time-series data.
On each tick, all hosts are polled simultaneously via asyncio.gather() so that results from every host share the same timestamp.
Series keys have the form "hostname/metric_label" (e.g. "router1/CPU %").
- Parameters:
hosts – Remote hosts to monitor. Pass None (or omit) when loading historical data.
parsers – Metric parser instances. Defaults to DEFAULT_PARSERS (cpu, mem, disk, load).
db_path – Optional SQLite file for persistence. If None, data is in-memory only.
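The per-tick polling described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the collector's actual code: `poll_host` and `collect_tick` are hypothetical names, and the returned metric values are made up. It shows one timestamp taken per tick and series keys built in the "hostname/metric_label" form.

```python
import asyncio
from datetime import datetime, timezone


# Hypothetical stand-in for polling a RemoteHost; the real collector
# runs parser commands on each host instead of returning fixed values.
async def poll_host(hostname: str) -> dict:
    await asyncio.sleep(0)  # simulate I/O
    return {"CPU %": 12.5, "Mem %": 40.0}


async def collect_tick(hostnames: list) -> dict:
    # One timestamp is taken for the whole tick, so every host's
    # results share it regardless of which host answers first.
    ts = datetime.now(timezone.utc)
    # gather() runs all polls concurrently and returns results in
    # the same order as the inputs.
    results = await asyncio.gather(*(poll_host(h) for h in hostnames))
    series = {}
    for hostname, metrics in zip(hostnames, results):
        for label, value in metrics.items():
            series[f"{hostname}/{label}"] = (ts, value)
    return series
```

Calling `asyncio.run(collect_tick(["router1", "router2"]))` yields keys such as "router1/CPU %", all carrying the same timestamp.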
- async init_db()
Open a persistent aiosqlite connection with WAL mode and file lock.
Must be awaited before any DB writes. Called automatically by run(); callers that skip run() (e.g. tests) should call this explicitly.
- Return type: None
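For readers unfamiliar with WAL mode, the sketch below shows how it is typically enabled on a SQLite file. It deliberately uses the standard-library `sqlite3` module rather than aiosqlite, does not cover the file lock, and `open_wal_connection` is a hypothetical helper name, so treat it as an illustration of the pragma rather than of what init_db() does internally.

```python
import os
import sqlite3
import tempfile


def open_wal_connection(db_path: str) -> sqlite3.Connection:
    """Open a SQLite file and switch its journal to write-ahead logging."""
    conn = sqlite3.connect(db_path)
    # The pragma returns the journal mode actually in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    if mode.lower() != "wal":
        conn.close()
        raise RuntimeError(f"could not enable WAL, got {mode!r}")
    return conn


# Usage: WAL needs a real file; an in-memory database reports "memory".
db_file = os.path.join(tempfile.mkdtemp(), "metrics.db")
conn = open_wal_connection(db_file)
conn.close()
```

WAL mode is stored in the database file, so later connections to the same file continue to use it.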
- async close()
Close the DB connection and all live host sessions.
Must be awaited before the surrounding event loop exits. Otherwise, subprocess transports owned by LocalSession are garbage-collected after the loop is closed and raise “Event loop is closed” from __del__.
- Return type: None
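One way to make sure close() is awaited while the loop is still running is a try/finally inside the top-level coroutine. The sketch below uses a hypothetical `FakeCollector` with a simplified `run(ticks)` signature in place of the real class, so only the ordering is meant to carry over.

```python
import asyncio


class FakeCollector:
    """Hypothetical stand-in with the same run()/close() shape."""

    def __init__(self) -> None:
        self.closed = False

    async def run(self, ticks: int) -> None:
        for _ in range(ticks):
            await asyncio.sleep(0)  # pretend to collect

    async def close(self) -> None:
        self.closed = True


async def main(collector: FakeCollector) -> None:
    try:
        await collector.run(ticks=3)
    finally:
        # Awaited inside the coroutine, so cleanup finishes before
        # asyncio.run() closes the event loop.
        await collector.close()
```

Running `asyncio.run(main(FakeCollector()))` executes close() even if run() raises or is cancelled.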
- async run(interval=datetime.timedelta(seconds=5), duration=None)
Collect metrics from all hosts on each tick.
Each host’s collection is bounded by the interval: if a host does not respond in time, it is skipped for that tick and the session is recovered automatically. This prevents a single slow host from blocking collection on all other hosts.
Runs inside the caller’s event loop. Cancel the task wrapping this coroutine (or cancel it directly) to stop collection gracefully.
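The bounded per-host collection can be approximated with `asyncio.wait_for` around each poll. This is a minimal sketch under assumptions: `poll_with_timeout`, `collect_tick_bounded`, and `fake_poll` are hypothetical names, and the automatic session recovery mentioned above is not modelled.

```python
import asyncio


async def poll_with_timeout(hostname, poll, timeout_s):
    """Return (hostname, result), or (hostname, None) on timeout."""
    try:
        result = await asyncio.wait_for(poll(hostname), timeout=timeout_s)
        return hostname, result
    except asyncio.TimeoutError:
        # The slow host is skipped for this tick only.
        return hostname, None


async def collect_tick_bounded(hostnames, poll, interval_s):
    # Each host gets its own timeout, so one slow host cannot delay
    # the results of the others.
    pairs = await asyncio.gather(
        *(poll_with_timeout(h, poll, interval_s) for h in hostnames)
    )
    return {h: r for h, r in pairs if r is not None}


# Hypothetical poll function: the host named "slow" never answers in time.
async def fake_poll(hostname):
    await asyncio.sleep(1.0 if hostname == "slow" else 0.0)
    return {"CPU %": 1.0}
```

With a 0.05 second bound, `asyncio.run(collect_tick_bounded(["fast", "slow"], fake_poll, 0.05))` returns data for "fast" only.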
- async add_event(label, timestamp=None, color='#888888', dash='dash', source='manual', end_timestamp=None)
Record a labeled event and push it to all dashboard subscribers.
- Return type:
- async delete_event(event_id)
Remove an event by id. Returns True if found and removed, False otherwise.
- Return type: bool
- async update_event(event_id, label, color, dash, end_timestamp=None)
Update an existing event’s label, color, dash, and end_timestamp. Returns the updated event, or None.
- Return type: MonitorEvent | None
- get_series()
Return a snapshot of all series (metrics and per-process).
Format: {"hostname/label": [(ts, value, meta), ...]}
- Return type: dict[str, list[tuple[datetime, float, dict[str, Any] | None]]]
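As an illustration of working with this snapshot format, the sketch below reduces a series dict to the most recent value per host and label. `latest_by_host` is a hypothetical helper, and splitting the key on the first "/" assumes hostnames contain no slash.

```python
from datetime import datetime
from typing import Any, Optional

# Shape of one data point in the snapshot: (ts, value, meta).
Point = tuple[datetime, float, Optional[dict[str, Any]]]


def latest_by_host(series: dict[str, list[Point]]) -> dict[str, dict[str, float]]:
    """Map host -> label -> most recent value."""
    out: dict[str, dict[str, float]] = {}
    for key, points in series.items():
        if not points:
            continue  # nothing recorded yet for this series
        # Assumption: the hostname itself contains no "/".
        host, _, label = key.partition("/")
        # Pick by timestamp rather than relying on list order.
        newest = max(points, key=lambda p: p[0])
        out.setdefault(host, {})[label] = newest[1]
    return out
```

Feeding it the return value of get_series() would give a compact "current state" view, for example for a status line.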
- get_chart_map()
Return a mapping of series label → chart group key.
Used by the dashboard to assign historical series (loaded from /api/data) to their correct Plotly chart containers without requiring series_labels() on each parser. The map is built lazily as data arrives in _process_host_results().
- Return type: dict[str, str]
- get_events()
Return all recorded events in chronological order.
- Return type: list[MonitorEvent]
- get_meta()
Return metadata for the dashboard (host names, metric labels/units, tabs).
- Return type: dict[str, Any]
- subscribe()
Register a new SSE subscriber and return its queue.
- Return type: Queue[dict[str, Any]]
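A queue-per-subscriber fan-out, which is one common way to feed SSE clients, might look like the sketch below. `Broadcaster` and `publish` are hypothetical names and the message shape is invented; the real collector's internals may differ.

```python
import asyncio
from typing import Any


class Broadcaster:
    """Hypothetical fan-out: every subscriber gets its own queue."""

    def __init__(self) -> None:
        self._subscribers: list = []

    def subscribe(self) -> asyncio.Queue:
        # Call from inside a running event loop.
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def publish(self, message: dict[str, Any]) -> None:
        # Each subscriber receives the same message object;
        # unbounded queues mean put_nowait() never raises QueueFull.
        for queue in self._subscribers:
            queue.put_nowait(message)
```

An SSE handler would then loop on `await queue.get()` and write each message to its client, so a slow client only backs up its own queue.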
- classmethod from_json(path, parsers=None)
Load historical metrics from a JSON file.
Expected format:

    {
        "metrics": [{"timestamp": "...", "host": "...", "label": "...", "value": 42.0}, ...],
        "events": [{"timestamp": "...", "label": "...", "source": "...", "color": "...", "dash": "..."}, ...]
    }

The host field is optional for backward compatibility.
- Return type: