MQTT Interface

class shc.interfaces.mqtt.MQTTClientInterface(hostname: str = 'localhost', port: int = 1883, username: Optional[str] = None, password: Optional[str] = None, client_id: Optional[str] = None, protocol: Union[ProtocolVersion, int] = ProtocolVersion.V311, auto_reconnect: bool = True, failsafe_start: bool = False)

SHC interface for connecting to MQTT brokers.

The interface allows to create subscribable and writable connector objects for subscribing and publishing to individual MQTT topics. Additionally it provides generic MQTT client functionality, that can be used by other interfaces/adapters for implementing special protocols over MQTT (e.g. TasmotaInterface for IoT devices running the Tasmota firmware).

Internally, the interface uses a asyncio_mqtt.Client object, which itself uses a paho.mqtt.client.Client. The connection options (hostname, port, username, password, client_id and protocol are passed to that client implementation. TLS encrypted connections are currently not supported.

By default, the interface features automatic reconnect on MQTT connection loss, but only if the initial connection attempt is successful. This behaviour can be customized using the auto_reconnect and failsafe_start parameters.

For interacting with “raw” MQTT topics, use one of the following methods for creating connector objects:

Parameters:
  • hostname – MQTT broker hostname or IP address

  • port – MQTT broker port

  • username – MQTT broker login username (no login is performed if None)

  • password – MQTT broker login password (no login is performed if None)

  • client_id – MQTT client name (if None, it is auto-generated by the Paho MQTT client)

  • protocol – MQTT protocol version (from asyncio_mqtt library), defaults to MQTT 3.11

  • auto_reconnect – If True (default), the client tries to reconnect automatically on connection errors with exponential backoff (1 * 1.25^n seconds sleep). Otherwise, the complete SHC system is shut down, when a connection error occurs.

  • failsafe_start – If True and auto_reconnect is True, the API client allows SHC to start up, even if the MQTT connection can not be established in the first try. The connection is retried in background with exponential backoff (see auto_reconnect option). Otherwise (default), the first connection attempt on startup is not retried and will shutdown the SHC application on failure, even if auto_reconnect is True.

topic_raw(topic: str, subscribe_topics: Optional[str] = None, qos: int = 0, retain: bool = False, force_mqtt_subscription: bool = False) RawMQTTTopicVariable

Create a connector for publishing and receiving MQTT messages to/form a single topic as raw bytes

Parameters:
  • topic – The MQTT topic to publish messages to

  • subscribe_topics – The topic filter to subscribe to. If None (default) it will be equal to topic. This can be used to subscribe to a wildcard topic filter instead of only the single publishing topic. Attention: subscribe_topics must always include the topic!

  • qos – The MQTT QoS for publishing messages to that topic (subscription QoS is currently always 0)

  • retain – The MQTT retain bit for publishing messages to that topic

  • force_mqtt_subscription – Force subscription to the MQTT topic(s) even if there are no SHC-internal Subscribers or logic handlers registered to this object. This is required for the write method to await the successful publishing of a published message to all clients. If there are local Subscribers or triggered logic handlers and force_mqtt_subscription is False, the client will not subscribe to the MQTT broker for this topic, so write() of this connector object will return immediately.

topic_string(topic: str, subscribe_topics: Optional[str] = None, qos: int = 0, retain: bool = False, force_mqtt_subscription: bool = False) StringMQTTTopicVariable

Create a connector for publishing and receiving MQTT messages to/form a single topic as UTF-8 encoded strings

Parameters:
  • topic – The MQTT topic to publish messages to

  • subscribe_topics – The topic filter to subscribe to. If None (default) it will be equal to topic. This can be used to subscribe to a wildcard topic filter instead of only the single publishing topic. Attention: subscribe_topics must always include the topic!

  • qos – The MQTT QoS for publishing messages to that topic (subscription QoS is currently always 0)

  • retain – The MQTT retain bit for publishing messages to that topic

  • force_mqtt_subscription – Force subscription to the MQTT topic(s) even if there are no SHC-internal Subscribers or logic handlers registered to this object. This is required for the write method to await the successful publishing of a published message to all clients. If there are local Subscribers or triggered logic handlers and force_mqtt_subscription is False, the client will not subscribe to the MQTT broker for this topic, so write() of this connector object will return immediately.

topic_json(type_: Type[T], topic: str, subscribe_topics: Optional[str] = None, qos: int = 0, retain: bool = False, force_mqtt_subscription: bool = False) JSONMQTTTopicVariable[T]

Create a connector for publishing and receiving arbitrary values as JSON-encoded MQTT messages

The connector uses SHC’s generic JSON encoding/decoding system from the shc.conversion module. It has built-in support for basic datatypes, typing.NamedTuple-based types and enum.Enum-based types. For handling a custom JSON message format, you should create a custom Python type and specify JSON encoder and decoder functions for it, using shc.conversion.register_json_conversion(). Additionally, you’ll want to add default type conversion functions from/to default types, using register_converter(), so that type conversion can happen on-the-fly between connected objects.

Parameters:
  • type – The connector’s value type (used for its .type attribute and for chosing the correct JSON decoding function)

  • topic – The MQTT topic to publish messages to

  • subscribe_topics – The topic filter to subscribe to. If None (default) it will be equal to topic. This can be used to subscribe to a wildcard topic filter instead of only the single publishing topic. Attention: subscribe_topics must always include the topic!

  • qos – The MQTT QoS for publishing messages to that topic (subscription QoS is currently always 0)

  • retain – The MQTT retain bit for publishing messages to that topic

  • force_mqtt_subscription – Force subscription to the MQTT topic(s) even if there are no SHC-internal Subscribers or logic handlers registered to this object. This is required for the write method to await the successful publishing of a published message to all clients. If there are local Subscribers or triggered logic handlers and force_mqtt_subscription is False, the client will not subscribe to the MQTT broker for this topic, so write() of this connector object will return immediately.

async publish_message(topic: str, payload: bytes, qos: int = 0, retain: bool = False) None

Generic coroutine for publishing an MQTT message to be used by higher-level interfaces

Parameters:
  • topic – The MQTT topic to send the message to

  • payload – The raw MQTT message payload

  • qos – The MQTT QoS value of the published message

  • retain – The MQTT retain bit for publishing

register_filtered_receiver(topic_filter: str, receiver: Callable[[MQTTMessage], None], subscription_qos: int = 0) None

Subscribe to an MQTT topic filter and register a callback function to be called when a message matching this filter is received.

Warning: If multiple receivers with intersecting topic filters are registered, the receiver behaviour for MQTT messages seems to be undefined. Each receiver will still only receive the correct messages, but QoS guarantees will probably not be hold. Esp., messages might be received multiple times.

Parameters:
  • topic_filter – An MQTT topic filter, possible containing wildcard characters. It is used for subscribing to these topics and for filtering the received messages.

  • receiver – The callback function to be called when a matching MQTT message is received. The function must have a single free positional argument for taking the MQTTMessage object.

  • subscription_qos – The subscription QoS