host.host

Async host abstraction: the Host protocol, BaseHost ABC, and run helpers.

otto.host.host.get_logging_command_output_enabled() bool

Return True if command-output logging is enabled on the active context.

otto.host.host.is_dry_run() bool

Return True if dry-run mode is enabled on the active context.

class otto.host.host.ShellCommand(cmd: str, expects: tuple[str | Pattern[str], str] | list[tuple[str | Pattern[str], str]] | None = None, timeout: float | None = None, log: LogMode | None = None)

Bases: object

A command plus the per-command options that should be used to run it.

Fields left as None inherit from the run-level kwargs on Host.run(). A scalar Expect value is accepted for expects for ergonomics; it is normalized to a one-element list before execution.

cmd : str

Command string to execute.

expects : tuple[str | Pattern[str], str] | list[tuple[str | Pattern[str], str]] | None

Per-command expects. None inherits the run-level expects value.

timeout : float | None

Per-command timeout cap. None inherits the run-level timeout value.

In list form, the effective timeout is always bounded by the remaining cumulative budget.

log : LogMode | None

Per-command logging disposition. None inherits the run-level log value.

class otto.host.host.RunResult(status: Status, statuses: list[CommandStatus])

Bases: object

Unified result of Host.run() regardless of how many commands ran.

statuses always has one entry per issued command. status is the aggregate: Status.Success when every entry is ok, otherwise the first non-ok status encountered (matching the old tuple-form semantics).

status : Status

Aggregate status across all commands.

statuses : list[CommandStatus]

Per-command statuses in execution order.

property only : CommandStatus

Return the sole CommandStatus when exactly one command ran.

Raises ValueError otherwise — useful for single-command call sites that want to read fields directly without unpacking.

class otto.host.host.Host(*args, **kwargs)

Bases: Protocol

Structural protocol defining the public interface every otto host must satisfy.

Implementations of Host connect otto to a specific target type (SSH, serial console, QEMU, etc.). BaseHost provides concrete default implementations for the shared mechanics; individual host classes such as UnixHost or EmbeddedHost inherit from BaseHost and implement the family-specific hooks.

log : LogMode

Standing per-host logging disposition. Composed with the per-command mode via effective_mode at the emit seam.

id : str

Unique identifier for this host.

name : str

Human-readable name for this host.

resources : set[str]

Resources required to reserve this host.

products : list['Product']

Software-under-test deployed to this host (default empty).

power_control : PowerController | None

Pluggable power backend, or None when this host can’t be power-controlled.

async interact() None

Open an interactive shell bridged to the local terminal.

async run(cmds: str | ShellCommand | Sequence[str | ShellCommand], expects: tuple[str | Pattern[str], str] | list[tuple[str | Pattern[str], str]] | None = None, timeout: float | None = None, log: LogMode = LogMode.NORMAL, sudo: bool = False) RunResult

Run one or more commands on the host and collect their results.

Parameters:
  • cmds – A single command or a sequence of commands to run in order. Strings and ShellCommand objects may be mixed.

  • expects – Optional (pattern, response) pair(s) for interactive prompts. Inherited by each command unless overridden per-command.

  • timeout – Per-command timeout for a single command, or a cumulative budget shared across all commands in a sequence. None means no limit.

  • log – Whether to log command output for this call.

  • sudo – If True, each command is run with elevated privileges. Implementations that do not support elevation raise NotImplementedError.

Returns:

A RunResult aggregating each command’s status and output.

async oneshot(cmd: str, timeout: float | None = None, log: LogMode = LogMode.NORMAL) CommandStatus

Run a single command outside the typical stateful run workflow.

Concurrency safety is implementation-dependent. Host families with an independent exec primitive (e.g. UnixHost, LocalHost) open a fresh connection or subprocess per call, so oneshot is safe to use concurrently from multiple coroutines. Families exposing only a single console (e.g. EmbeddedHost) share the persistent session and are not concurrency-safe — see the concrete class.

Returns the CommandStatus for the command.

async open_session(name: str) HostSession

Open a named auxiliary session on this host.

Named sessions are independent of the host’s default persistent session and of each other, allowing concurrent shell interactions. The caller is responsible for closing the returned HostSession when done.

async send(text: str, log: LogMode = LogMode.NORMAL) None

Send raw text to the host’s persistent session without waiting for a response.

Useful for driving interactive prompts or menu-driven interfaces where a full run() round-trip is not appropriate.

async expect(pattern: str | Pattern[str], timeout: float = 30.0) str

Wait for pattern to appear in the host’s session output.

Parameters:
  • pattern – A literal string or compiled regex to match against output.

  • timeout – Maximum seconds to wait before raising a timeout error.

Returns:

The matched text.

async get(src_files: list[Path] | Path, dest_dir: Path) tuple[Status, str]

Download one or more files from the host to a local directory.

Returns a (Status, message) tuple: Success with an empty message on success; a non-ok status with a diagnostic on failure.

async put(src_files: list[Path] | Path, dest_dir: Path) tuple[Status, str]

Upload one or more local files to a directory on the host.

Returns a (Status, message) tuple: Success with an empty message on success; a non-ok status with a diagnostic on failure.

async power(state: str | None = None) tuple[Status, str]

Power this host on, off, or toggle (when state is None).

Returns a (Status, message) tuple.

async reboot(hard: bool = False, wait: bool = False, timeout: float = 600.0) tuple[Status, str]

Reboot this host.

hard=False issues an in-shell reboot; hard=True power-cycles via the PowerController. When wait is True, blocks until the host is reachable again or timeout seconds have elapsed. Returns a (Status, message) tuple.

async shutdown() tuple[Status, str]

Power this host off from its own shell.

Distinct from power() ('off'), which uses an external power controller. Returns a (Status, message) tuple.

async is_reachable(timeout: float = 10.0) bool

Return True if the host responds to a connection probe within timeout seconds.

async wait_until_up(timeout: float, interval: float = 2.0) bool

Poll until the host is reachable or timeout seconds elapse.

Returns True if reachable before the deadline, False otherwise.

async wait_until_down(timeout: float, interval: float = 2.0) bool

Poll until the host is unreachable or timeout seconds elapse.

Returns True if unreachable before the deadline, False otherwise.

async close() None

Close the host’s persistent session and release any held resources.

async stage() tuple[Status, str]

Stage every product onto this host (transfer/place, no install).

Returns a (Status, message) tuple.

async install(stage_only: bool = False) tuple[Status, str]

Stage and then install every product on this host.

When stage_only is True, stops after staging without installing. Returns a (Status, message) tuple, short-circuiting on the first failure.

async uninstall() tuple[Status, str]

Uninstall every product from this host (best-effort).

Returns a (Status, message) tuple.

async is_installed() bool

Return True iff at least one product is declared and all are installed.

async is_uninstalled() bool

Return True iff is_installed() returns False.

class otto.host.host.BaseHost

Bases: ABC

Abstract base class providing shared mechanics for all host implementations.

BaseHost implements the cross-cutting concerns that every host family needs — command budgeting, dry-run stubs, product lifecycle, power/reboot orchestration, and the repeat-command scheduler. Concrete host classes (UnixHost, EmbeddedHost, etc.) inherit from BaseHost, implement the family-specific hooks (_run_one, oneshot, _soft_reboot, …), and satisfy the Host protocol.

id : str
name : str
log : LogMode
resources : set[str]
products : list['Product']
power_control : PowerController | None
async switch_user(user: str = '', password: str | None = None) None

Switch the persistent session to another user via su.

Default raises — only posix-shell hosts (via PosixPrivilege) support su.

as_user(user: str = 'root', password: str | None = None) NoReturn

Async context manager to run a block as user.

Default raises — only posix-shell hosts (via PosixPrivilege) support su-based user switching.

property current_user : str

User this host’s default shell session is currently running as.

Seeded from the login user; changes only through switch_user() / as_user(). See current_user for named sessions.

async interact() None

Open an interactive shell bridged to the local terminal.

Subclasses implement _interact to do the actual protocol work. This wrapper exists so CLI and SDK callers have a single public entry point.

stdin and stdout are bridged directly to the remote terminal and the session is recorded to the otto log. Press Ctrl+] to disconnect locally without ending the remote session; type exit or logout to end the session normally.

async run(cmds: ~types.Annotated[str | ~otto.host.host.ShellCommand | ~collections.abc.Sequence[str | ~otto.host.host.ShellCommand], ~otto.utils.Arg(variadic=True, elem_type=str, name=None, help=Command(s) to run.)], expects: ~types.Annotated[tuple[str | ~re.Pattern[str], str] | list[tuple[str | ~re.Pattern[str], str]] | None, <otto.utils._Exclude object at 0x7c9c99ff98c0>] | None = None, timeout: ~types.Annotated[float | None, ~otto.utils.Opt(elem_type=None, name=None, help=Per-command/cumulative timeout (seconds).)] | None = None, log: ~otto.logger.mode.Annotated[~otto.logger.mode.LogMode, <otto.utils._Exclude object at 0x7c9c99ff98c0>] = LogMode.NORMAL, sudo: bool = False) RunResult

Execute one or more commands on the host via the persistent shell session.

The session is stateful: working directory changes (cd), exported environment variables, and other shell state persist between calls, just as they would in an interactive terminal.

Parameters:
  • cmds – A single command (str or ShellCommand) or a sequence of commands. Strings and ShellCommand objects may be mixed. For single-command calls, read the result via result.only (or result.statuses[0]).

  • expects – Default (pattern, response) pair(s) for interactive prompts. Accepts a single Expect tuple or a list of them. Each command inherits this value unless its own ShellCommand.expects is set.

  • timeout – For a single command, the per-command timeout. For a sequence, a cumulative timeout shared across all commands — each command receives the remaining budget; when exhausted, remaining commands are skipped with Status.Error. ShellCommand.timeout caps the per-command value but is still bounded by the remaining budget.

  • sudo – If True, each command is rewritten through _elevate before execution. Hosts that do not support elevation (e.g. embedded/RTOS) raise NotImplementedError — see _elevate.

Returns:

RunResult with the aggregate Status and a list of per-command CommandStatus entries.

See also

oneshot(): stateless, concurrent-safe alternative for one-off commands.

async oneshot(cmd: str, timeout: float | None = None, log: LogMode = LogMode.NORMAL) CommandStatus

Run a single command outside the persistent shell session. Subclasses must override.

async open_session(name: str) HostSession

Open a named auxiliary session on this host. Subclasses must override.

async send(text: str, log: LogMode = LogMode.NORMAL) None

Send raw text to the host’s persistent session. Subclasses must override.

async expect(pattern: str | Pattern[str], timeout: float = 30.0) str

Wait for pattern in the session output. Subclasses must override.

async get(src_files: list[Path] | Path, dest_dir: Path) tuple[Status, str]

Download files from the host to a local directory. Subclasses must override.

async put(src_files: list[Path] | Path, dest_dir: Path) tuple[Status, str]

Upload local files to a directory on the host. Subclasses must override.

async close() None

Close the persistent session and release held resources. Subclasses must override.

async stage() tuple[Status, str]

Stage every product onto this host (transfer/place, no install).

Iterates products in declaration order, returning the first non-ok (Status, str); an empty list is a successful no-op.

async install(stage_only: bool = False) tuple[Status, str]

Stage, then install every product.

Calls stage() first; returns early if stage_only is set or the stage step failed. Otherwise installs each product in declaration order, short-circuiting on the first failure. Projects may override for cross-product ordering/dependencies.

async uninstall() tuple[Status, str]

Uninstall every product (best-effort).

Attempts every product even if one fails, returning the first non-ok result seen (so cleanup is not abandoned halfway).

async is_installed() bool

Return True iff there is at least one product and all are installed.

An empty products list is not installed (avoids the vacuous-truth surprise of all([])).

async is_uninstalled() bool

Inverse of is_installed().

async power(state: Annotated[str | None, Arg(variadic=False, elem_type=None, name=None, help=None)] | None = None) tuple[Status, str]

Power this host 'on'/'off', or toggle when state is None.

Toggling reads the controller’s status(); if the controller can’t report state, pass an explicit state.

async reboot(hard: bool = False, wait: bool = False, timeout: float = 600.0) tuple[Status, str]

Reboot this host.

hard=False (default) issues the in-shell reboot command (_soft_reboot); hard=True power-cycles via the PowerController. When wait, block on wait_until_up() (up to timeout, default 10 minutes); if the host is still unreachable when timeout expires, the result is downgraded to Failed.

async shutdown() tuple[Status, str]

Power this host off from its own shell (distinct from external power('off')).

Per-family override; default raises.

async is_reachable(timeout: float = 10.0) bool

Whether this host answers a lightweight connection probe.

Per-family override; default raises (no generic probe).

async wait_until_up(timeout: float, interval: float = 2.0) bool

Poll is_reachable() until reachable or timeout. Returns success.

async wait_until_down(timeout: float, interval: float = 2.0) bool

Poll is_reachable() until not reachable or timeout.

start_repeat(name: str, cmds: list[str] | str, interval: timedelta, times: int = -1, duration: timedelta = datetime.timedelta(days=999999999, seconds=86399, microseconds=999999), until: datetime = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc), on_result: Callable[[str, datetime, list[CommandStatus]], None] | None = None, max_history: int = 1000) None

Start a named background task that runs cmds repeatedly at interval.

Parameters:
  • name – Unique label for this repeat task. Raises RuntimeError if a task with the same name is already running.

  • cmds – Command string or list of command strings to run each cycle.

  • interval – Time between the start of successive runs.

  • times – Maximum number of cycles, or -1 for unlimited.

  • duration – Stop after this wall-clock duration, or timedelta.max for unlimited.

  • until – Stop at this UTC datetime, or the module sentinel for unlimited.

  • on_result – Optional callback invoked after each cycle with the task name, timestamp, and per-command statuses from that run.

  • max_history – Maximum number of past run results to retain (ring buffer).

async stop_repeat(name: str) None

Cancel and await the named background repeat task.

async stop_all_repeats() None

Cancel and await all running background repeat tasks.

repeat_results(name: str) list[tuple[datetime, list[CommandStatus]]]

Return the stored run history for the named repeat task.

Each entry is a (timestamp, statuses) pair recorded at the end of one cycle. At most max_history entries are kept (oldest discarded first, as set in start_repeat()).

class otto.host.host.HostFilter(name='')

Bases: Filter

Console-side suppress filter: drops QUIET/NEVER records and honors the global flag.

Attached to the console + console.log handlers only — verbose.log keeps the records (see management).

The per-host standing mode is now folded into each record’s log_mode via BaseHost._effective_log at the emit seam, so the filter decides purely on record.log_mode plus the global command-output flag.

filter(record: LogRecord) bool

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

class otto.host.host.SuppressCommandOutput(host: Host | None = None)

Bases: object

Suppress command/output logging for one host or globally.

On enter, the prior state is snapshotted; on exit it is restored. That makes nesting safe — an inner context cannot clobber an outer one — and makes concurrent per-host suppressions race-free, since each context only touches its own host’s log attribute.

The no-host (global) path mutates log_command_output on the active OttoContext when one is present. When no context is active the call is a no-op (there is nothing to suppress). Prefer the per-host form when suppressing work that runs concurrently.

host : Host | None = None

Host object to suppress. If not provided, all host output is affected.