Creating Custom Interfaces

TODO

Interface Base and Helper Classes Reference

class shc.supervisor.AbstractInterface

Abstract base class for all SHC interface implementations

An interface is an object that is ‘started’ at SHC startup. This is typically used to run interfaces to the outside world, using network connections (or serial device connections) and background asyncio Tasks for handling incoming messages and forwarding them to Subscribable objects.

If an interface inherits from this base class, it is automatically registered for startup via main().

monitoring_connector() Readable[InterfaceStatus]

Get a connector which represents the current status of this interface for monitoring purposes.

The returned connector object must be of value type InterfaceStatus and be at least readable. The connector may also be subscribable. In this case, the interface is expected to monitor its status continuously and publish any status changes asynchronously. This is typically implemented by client interfaces that actively monitor a connection to some server. Reading from the connector shall also return the current interface status.

In the case of a readable-only connector, the interface will typically perform its health checks on-demand when the connector’s read() method is called.

Concrete interface implementation should override this method to provide their own monitoring connector implementation. There are different more specific interface base classes that to help with that:

  • ReadableStatusInterface is a simple helper for implementing a readable monitoring connector, calling an async health check method on the interface on demand

  • SubscribableStatusInterface is a simple helper for implementing a subscribable monitoring connector, which can be updated when a status change is detected

  • SupervisedClientInterface implements logic for supervision, error handling and automatic reconnect of connection clients. It provides a subscribable monitoring connector.

If the interface does not allow any reasonable status monitoring (e.g. when it’s completely stateless or failsafe or implemented to shut down the overall SHC process on error), the default implementation of this method can be used, which raises a NotImplementedError.

Raises:

NotImplementedError – when the interface does not provide any health monitoring

abstract async start() None

This coroutine is called once on each interface when the SHC application is started via main().

It may be used to initialize network connections and start background Tasks for message handling and event generation. The start coroutines of all interfaces are started in parallel. Timers and Variable’s are only initialized when all start coroutines have returned successfully.

The method should await the completion of the interface startup, i.e. only return when the interface is fully functional. In case of an error, this method may raise and exception or call interface_failure() to terminate the SHC application.

abstract async stop() None

This coroutine is called once on each interface to shut down the SHC application.

This may happen when a SIGTERM (or similar) is received or when a critical error in an interface is reported (via interface_failure() or an exception raised by an interface’s start() method). Thus, the stop method may even be called while start() is still running. It shall be able to handle that situation and stop the half-started interface.

In any case, the stop method shall cause the termination of all pending background task of the interface within a few seconds and only return when all tasks have terminated.

async shc.supervisor.interface_failure(interface_name: str = 'n/a') None

Shut down the SHC application due to a critical error in an interface

This coroutine shall be called from an interface’s background Task on a critical failure. It will shut down the SHC system gracefully and lets the Python process return with exit code 1.

Parameters:

interface_name – String identifying the interface which caused the shutdown.

class shc.interfaces._helper.ReadableStatusInterface

Abstract base class for interfaces that provide a readable monitoring connector for an Interface.

This mixin overrides the shc.supervisor.AbstractInterface.monitoring_connector() method to provide a readable connector that triggers this interfaces health checks when being read. The health checks must be implemented for each interface implementation by overriding the _get_status() method.

abstract async _get_status() InterfaceStatus

Determine the current status of the interface for monitoring purposes.

class shc.interfaces._helper.SubscribableStatusInterface

Abstract base class for interfaces that provide a readable and subscribable monitoring connector for an Interface.

This mixin overrides the shc.supervisor.AbstractInterface.monitoring_connector() method to provide a connector that allows to publish status changes of the interface. It also stores the latest status to provide to read callers.

Each interface, inheriting this mixin, should monitor its health status continuously and update the status connector

Variables:

_status_connector – The interface’s monitoring connector

class shc.interfaces._helper.SubscribableStatusConnector
update_status(status: Optional[ServiceStatus] = None, message: Optional[str] = None) None

Method to be called by the interface when its monitored status changes.

The changed status is published to subscribers and stored in this connector to be returned on subsequent read calls.

To simplify managing the interface status, this method does not take a full InterfaceStatus tuple of the new status, but instead allows to modify the individual fields of the previous status.

Parameters:
  • status – The new overall health status of the interface or None (default) to leave it unchanged

  • message – The new status message of the interface. If the interface status is OK, it should be “” (empty string). To keep the previous message, pass None or omit this parameter.

class shc.interfaces._helper.SupervisedClientInterface(auto_reconnect: bool = True, failsafe_start: bool = False)

Abstract base class for client interfaces, providing run task supervision and automatic reconnects

This class can be used as a base class for client interface implementations to simplify error handling and automatic reconnection attempts. The core of its functionality is the _supervise Task. It is started at interface start() and runs until shutdown of the interface. It starts and supervises the implementation-specific _run() Task, which typically contains a loop for handling incoming messages. In addition, it calls _connect() before and _subscribe() after starting the run task. If the run task exits unexpectedly and auto_reconnect is enabled, reconnecting the interface via _connect, _run and _subscribe is attempted. For shutting down the interface (and stopping the run task in case of a subscribe error), _disconnect must be implemented in such a way, that it shuts down the run task.

This class inherits from SubscribableStatusInterface to publish the current status of the supervised client as monitoring status. However, it only updates the status and message attributes of the InterfaceStatus, but does not touch the metrics. So, you can fill the metrics with custom values from your derived interface class via self._status_connector.update_status(metrics={…}).

__init__(auto_reconnect: bool = True, failsafe_start: bool = False)
Parameters:
  • auto_reconnect – If True (default), the supervisor tries to reconnect the interface automatically with exponential backoff (backoff_base * backoff_exponent ^ n seconds sleep), when _run exits unexpectedly or any of _connect, _run or _subscribe raise an exception. Otherwise, the complete SHC system is shut down on connection errors.

  • failsafe_start – If True and auto_reconnect is True, the interface allows SHC to start up, even if the _connect or _subscribe fails in the first try. The connection is retried in background with exponential backoff (see auto_reconnect option). Otherwise (default), the first connection attempt on startup is not retried and will raise an exception from start() on failure, even if auto_reconnect is True.

abstract async _connect() None

This coroutine is run to connect the client.

This will happen at start up and after any error (if auto_reconnect is enabled). In case of an error, no disconnect is attempted before calling _connect. Thus, this coroutine should be able to handle any connection state (not yet connected, not connected due to failed _connect attempt, broken connection, open connection with failed _subscribe call).

This method is called before starting the _run() task.

abstract async _run() None

Entrypoint for the run task, which handles messages etc. while the connection is active.

This coroutine is started in a separate task after _connect() has completed sucessfully. As soon as it is ready, it must set the _running event. Only then the _subscribe() method is called and the startup of the interface is reported as finished.

The _run coroutine should be stoppable by calling _disconnect() (i.e. it should return or raise an exception when _disconnect is called). If in doubt, just add a new asyncio.Event and use asyncio.wait() with the event’s .wait() method and your original future in all places, where you need to await a future. In addition, _run should return or raise an exception when a client error occurs, in order to trigger a reconnect attempt.

abstract async _subscribe() None

This coroutine is called after connecting the client and starting the _run() task.

It can be used to subscribe to topics, send initialization messages, etc. It will be called again after a reconnect, when an error occurs and auto_reconnect is enabled.

abstract async _disconnect() None

This coroutine is called to disconnect the client and stop the _run task.

This may happen either to shut down the interface (when stop() is called by the supervisor) or when an error occurs during _subscribe() or when any error occurred and no auto_reconnect is attempted. Thus, disconnect should be able to shut down the client in a failed state as well. It should also be idempotent, i.e. allow to be called multiple times without reconnect. This method should not raise Exceptions but instead try its best to shut down the interface.

Calling this coroutine must somehow stop the run task. I.e. _run() should return or raise an exception shortly afterwards.

async wait_running(timeout: Optional[float] = None) None

Wait for the interface to be running.

Attention: This must be called after start() has initially been called (not neccessarily after it has returned).

Parameters:

timeout – If given, this method will raise an asyncio.TimeoutError after the given timeout in seconds, if the interface has not come up by this time.