monitor.collector

MetricCollector — collects metrics from multiple RemoteHosts via asyncio.gather().

On each tick all hosts are polled simultaneously so results share one timestamp.

Supports three data sources:
  1. Live collection from multiple RemoteHosts (asyncio.gather() per tick)

  2. Historical JSON files

  3. Historical SQLite databases (written by a previous live collection)

class otto.monitor.collector.MonitorTarget(host, parsers=<factory>, core_count=1)

Bases: object

Pairs a RemoteHost with the parser dict to use when collecting from it.

By default all hosts use DEFAULT_PARSERS. Pass a custom dict to add new metrics or override built-in commands for a specific host:

MonitorTarget(
    host=gpu_host,
    parsers={
        **DEFAULT_PARSERS,
        'nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader,nounits': NvidiaGpuParser(),
    },
)

In most cases you do not construct these directly — use register_host_parsers() from an init module and let the CLI build targets automatically.

host : --is-rst--:py:class:`~otto.host.remoteHost.RemoteHost`
parsers : --is-rst--:py:class:`dict`\ \[:py:class:`str`, :py:class:`~otto.monitor.parsers.MetricParser`]
core_count : --is-rst--:py:class:`int` = 1
class otto.monitor.collector.MetricCollector(hosts=None, parsers=None, db_path=None, targets=None)

Bases: object

Collects numeric metrics from multiple RemoteHosts and stores time-series data.

On each tick, all hosts are polled simultaneously via asyncio.gather() so that results from every host share the same timestamp.

Series keys have the form "hostname/metric_label" (e.g. "router1/CPU %").

Parameters:
  • hosts – Remote hosts to monitor. Pass None (or omit) when loading historical data.

  • parsers – Metric parser instances. Defaults to DEFAULT_PARSERS (cpu, mem, disk, load).

  • db_path – Optional SQLite file for persistence. If None, data is in-memory only.

async init_db()

Open a persistent aiosqlite connection with WAL mode and file lock.

Must be awaited before any DB writes. Called automatically by run(); callers that skip run() (e.g. tests) should call this explicitly.

Return type:

None

async close_db()

Close the persistent DB connection and release the file lock.

Return type:

None

async close()

Close the DB connection and all live host sessions.

Must be awaited before the surrounding event loop exits — otherwise subprocess transports owned by LocalSession are GC’d after the loop is closed and raise “Event loop is closed” from __del__.

Return type:

None

async run(interval=datetime.timedelta(seconds=5), duration=None)

Collect metrics from all hosts on each tick.

Each host’s collection is bounded by the interval — if a host does not respond in time, it is skipped for that tick and the session is recovered automatically. This prevents a single slow host from blocking collection on all other hosts.

Runs inside the caller’s event loop. Cancel the task wrapping this coroutine (or cancel it directly) to stop collection gracefully.

Parameters:
  • interval – Collection interval as a timedelta.

  • duration – Optional total run time. None means run forever.

Return type:

None

async add_event(label, timestamp=None, color='#888888', dash='dash', source='manual', end_timestamp=None)

Record a labeled event and push it to all dashboard subscribers.

Return type:

MonitorEvent

async delete_event(event_id)

Remove an event by id. Returns True if found and removed, False otherwise.

Return type:

bool

async update_event(event_id, label, color, dash, end_timestamp=None)

Update an existing event’s label, color, dash, and end_timestamp. Returns the updated event or None.

Return type:

MonitorEvent | None

get_series()

Return a snapshot of all series (metrics and per-process).

Format: {"hostname/label": [(ts, value, meta), ...]}

Return type:

dict[str, list[tuple[datetime, float, dict[str, Any] | None]]]

get_chart_map()

Return a mapping of series label → chart group key.

Used by the dashboard to assign historical series (loaded from /api/data) to their correct Plotly chart containers without requiring series_labels() on each parser. The map is built lazily as data arrives in _process_host_results().

Return type:

dict[str, str]

get_events()

Return all recorded events in chronological order.

Return type:

list[MonitorEvent]

get_meta()

Return metadata for the dashboard (host names, metric labels/units, tabs).

Return type:

dict[str, Any]

subscribe()

Register a new SSE subscriber and return its queue.

Return type:

Queue[dict[str, Any]]

unsubscribe(q)
Return type:

None

classmethod from_json(path, parsers=None)

Load historical metrics from a JSON file.

Expected format:

{
  "metrics": [{"timestamp": "...", "host": "...", "label": "...", "value": 42.0}, ...],
  "events":  [{"timestamp": "...", "label": "...", "source": "...",
               "color": "...", "dash": "..."}, ...]
}

The host field is optional for backward compatibility.

Return type:

MetricCollector

async classmethod from_sqlite(path, parsers=None)

Load historical metrics and events from a SQLite database.

Return type:

MetricCollector

export_json(path)

Export all collected data to a JSON file.

Return type:

None

to_json()

Serialize all metrics and events to a JSON string compatible with --file.

Return type:

str