Smart Home Connect Base Concepts

Smart Home Connect does not bring a configuration language like the YAML-based configuration files of HomeAssistant or SmartHomeNG. It also does not include a graphical editor or web interface for configuring your smart devices and interconnecting or automating them. Instead, Smart Home Connect is a framework for building smart home applications in Python 3. It works as a library, providing the building blocks for your application, like interfaces for different communication protocols, timers to trigger your logic/automation functions and a web user interface with many components.

Connectable Objects

The core concept of building a home automation application with Smart Home Connect (SHC) is connecting Connectable objects. A Connectable object is a Python object that produces, consumes or stores values of a certain type. It may implement one or more of the following traits (i.e. inherit from these classes):

  • Writable: The object can be updated with a new value via its Writable.write() method. It will use this value to update its internal state or send it to an external bus/interface/database/etc.

  • Readable: The object has a current value that can be retrieved via its Readable.read() method.

  • Subscribable: The object produces new values occasionally (e.g. received values from an external interface) and publishes them to its subscribers. A subscriber is a Writable object that has been registered via the Subscribable object’s Subscribable.subscribe() method.

  • Reading: The object needs to read a value in certain situations. This may be an optional additional feature of the object (e.g. a KNX GroupAddress object answering GroupRead telegrams) or mandatory for the object’s functionality (e.g. a web UI widget which needs to get the current value when a new client connects). This is denoted by the is_reading_optional attribute of the object (class or instance attribute). In any case, the object tries to read the value of its default provider in such situations. The default provider is a Readable object, registered via the Reading.set_provider() method.

Note

Not every Connectable object is Readable: A connector object for a bus address, for example, may send and receive values to/from the bus (i.e. it is Writable + Subscribable) but it does not have a “current value”, which can be read. To cache the latest received value and make it Readable, the object must be combined with a shc.Variable object.
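The difference between a connector object without a current value and a caching object can be pictured with a small standalone sketch. The classes below (ToySubscribable, ToyBusAddress, ToyCache) are hypothetical stand-ins written only for illustration. They do not use SHC and leave out typing and origin handling.

```python
import asyncio
from typing import Any, List


class ToySubscribable:
    """Publishes new values to every registered subscriber."""

    def __init__(self) -> None:
        self._subscribers: List[Any] = []

    def subscribe(self, subscriber: Any) -> None:
        self._subscribers.append(subscriber)

    async def _publish(self, value: Any) -> None:
        for subscriber in self._subscribers:
            await subscriber.write(value)


class ToyBusAddress(ToySubscribable):
    """Writable + Subscribable, but not Readable: it keeps no current value."""

    def __init__(self) -> None:
        super().__init__()
        self.sent: List[Any] = []

    async def write(self, value: Any) -> None:
        self.sent.append(value)  # stands in for sending the value to the bus

    async def receive_from_bus(self, value: Any) -> None:
        await self._publish(value)


class ToyCache(ToySubscribable):
    """Writable + Readable + Subscribable: caches the latest value."""

    def __init__(self) -> None:
        super().__init__()
        self._value: Any = None

    async def write(self, value: Any) -> None:
        self._value = value
        await self._publish(value)

    async def read(self) -> Any:
        return self._value


async def main() -> Any:
    address, cache = ToyBusAddress(), ToyCache()
    address.subscribe(cache)  # the cache is updated on every received value
    await address.receive_from_bus(21.5)
    return await cache.read()


print(asyncio.run(main()))
```

In this sketch only the cache offers read(). The bus address object can forward values but cannot answer the question "what is the current value?".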

The connect Method

Typically, you’ll want to connect two Connectable objects bidirectionally, such that new values from one object are sent/published/written to the other and vice versa. Thus, you’ll usually need two subscribe() calls and probably additional set_provider() calls. To shorten this procedure, every Connectable object provides the shc.base.Connectable.connect() method. It connects two Connectable objects by

  • subscribing each to the other if applicable (i.e. it is Writable and the other one is Subscribable) and

  • setting each as the other’s default provider if applicable (i.e. it is Readable and the other is Reading) and the other explicitly requires reading values (as specified by the is_reading_optional attribute).

The default behaviour can be customized via the send/receive arguments (for subscribing) and the provide/read arguments (for registering as default provider).
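The decision rules described above can be sketched as a small standalone function. This is not SHC's implementation: the Caps dataclass and the plan_connect() function are hypothetical names that only model which subscribe() and set_provider() calls would result from the objects' capabilities and the optional arguments.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Caps:
    """Hypothetical capability summary of one Connectable object."""
    name: str
    writable: bool = False
    readable: bool = False
    subscribable: bool = False
    reading: bool = False
    is_reading_optional: bool = True


def plan_connect(a: Caps, b: Caps,
                 send: Optional[bool] = None, receive: Optional[bool] = None,
                 read: Optional[bool] = None, provide: Optional[bool] = None) -> List[str]:
    """List the calls that a.connect(b) would make in this simplified model."""
    actions = []
    # Subscriptions default to "on" whenever both sides support them.
    if a.subscribable and b.writable and (send is None or send):
        actions.append(f"{a.name}.subscribe({b.name})")
    if b.subscribable and a.writable and (receive is None or receive):
        actions.append(f"{b.name}.subscribe({a.name})")
    # Default providers are only set if reading is mandatory or explicitly requested.
    want_read = read if read is not None else not a.is_reading_optional
    if a.reading and b.readable and want_read:
        actions.append(f"{a.name}.set_provider({b.name})")
    want_provide = provide if provide is not None else not b.is_reading_optional
    if b.reading and a.readable and want_provide:
        actions.append(f"{b.name}.set_provider({a.name})")
    return actions


variable = Caps("variable", writable=True, readable=True, subscribable=True, reading=True)
switch = Caps("switch", writable=True, subscribable=True, reading=True,
              is_reading_optional=False)
print(plan_connect(switch, variable))
```

For the modelled switch widget and variable, the plan consists of the two subscriptions plus switch.set_provider(variable), because the widget declares reading as mandatory while the variable does not.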

Example

Connecting a switch widget in the web UI to a KNX group address manually, with a Variable object in between for caching the value, would look like this:

import shc
knx_connection = shc.interfaces.knx.KNXConnector()
web_interface = shc.web.WebServer("localhost", 8080, "index")

variable = shc.Variable(bool, "variable's name")
knx_group_address = knx_connection.group(shc.interfaces.knx.KNXGAD(1, 2, 3), dpt="1")
variable.subscribe(knx_group_address)
knx_group_address.subscribe(variable)

switch_widget = shc.web.widgets.Switch("Switch Label")
variable.subscribe(switch_widget)
switch_widget.subscribe(variable)
switch_widget.set_provider(variable)

Using the connect() method, it can be shortened to:

import shc
knx_connection = shc.interfaces.knx.KNXConnector()
web_interface = shc.web.WebServer("localhost", 8080, "index")

variable = shc.Variable(bool, "variable's name")\
    .connect(knx_connection.group(shc.interfaces.knx.KNXGAD(1, 2, 3), dpt="1"))
switch_widget = shc.web.widgets.Switch("Switch Label")\
    .connect(variable)

Note

For most Connectable objects, the Reading functionality is an optional feature and not required for normal functionality. E.g., shc.Variable can read for initialization at startup, shc.interfaces.knx.KNXGroupVar can respond to GroupRead telegrams from the KNX bus, shc.interfaces.shc_client.WebApiClientObject can do client-to-server state synchronization at client startup, etc. Thus, the .connect() method does not set the default provider unless explicitly requested with the provide/read arguments. However, some Connectable objects require reading from a default provider for their normal functionality, such as all WebDisplayDatapoint-based web UI widgets, which read the current value when a new client connects. These classes typically set is_reading_optional = False, so .connect() will set the default provider unless this is explicitly disabled.

The origin of a New Value

When updating an object’s value using the Writable.write() method, a second argument must be provided, called origin. It is expected to be a list of all objects that led to the update being performed, i.e. the “trace” of the update event, starting with its origin (typically an external event source/interface or a timer) and up to the object or function which now calls the write method. This list is used by Subscribable objects to avoid recursive feedback loops: When publishing a value, all subscribers which are already included in origin are automatically skipped, as they took part in causing this publishing event and probably will cause the same event again, closing the feedback loop.
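The effect of the origin list on a bidirectional connection can be illustrated with a minimal standalone sketch. ToyVariable is a hypothetical class invented for this illustration. It awaits its subscribers directly and is not how SHC schedules updates.

```python
import asyncio
from typing import Any, List


class ToyVariable:
    """Hypothetical stand-in for a stateful Writable + Subscribable object."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.value: Any = None
        self.write_count = 0
        self._subscribers: List["ToyVariable"] = []

    def subscribe(self, subscriber: "ToyVariable") -> None:
        self._subscribers.append(subscriber)

    async def write(self, value: Any, origin: List[Any]) -> None:
        self.value = value
        self.write_count += 1
        await self._publish(value, origin)

    async def _publish(self, value: Any, origin: List[Any]) -> None:
        for subscriber in self._subscribers:
            # Skip everyone who already took part in causing this update.
            if any(subscriber is entry for entry in origin):
                continue
            # Extend a copy of the trace with this object before passing it on.
            await subscriber.write(value, origin + [self])


async def main() -> List[Any]:
    a, b = ToyVariable("a"), ToyVariable("b")
    a.subscribe(b)
    b.subscribe(a)  # bidirectional: this would loop forever without `origin`
    external_source = object()
    await a.write(42, [external_source])
    return [a.write_count, b.write_count, b.value]


print(asyncio.run(main()))
```

Each object is written exactly once: when b republishes the value, a is already part of the origin list and is therefore skipped.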

When calling Writable.write() from within a logic handler that is decorated with the shc.handler() decorator, the origin argument may be omitted, since it is magically provided via a hidden context variable. See section “Logic Handlers” below for more details.

Typing of Connectable Objects

Connectable objects are statically typed. This means, each object is supposed to handle (receive/publish/provide/read) only values of a defined Python type. The object’s type is indicated by its type attribute, which may be a class attribute (if the Connectable class specifies a fixed value type) or an instance attribute (for generic Connectable classes like shc.Variable, where each instance may handle values of a different type).

The instance-specific type of generic Connectable classes may either be given to each object explicitly (as an argument to its __init__ method as for shc.Variable) or derived from other properties of the object (like the KNX Datapoint Type of shc.interfaces.knx.KNXGroupVar objects).

When connecting two Connectable objects using Connectable.connect(), Subscribable.subscribe() or Reading.set_provider(), the consistency of the two objects’ type attributes is checked and a TypeError is raised if they don’t match. In many cases, you’ll still want to connect those objects and make sure the value is adequately converted when being written to/read from the other object. For this purpose, the connect, subscribe and set_provider methods provide an optional argument convert.

If convert=True is specified, the shc.conversion module is searched for a default conversion function for the relevant value types (using shc.conversion.get_converter()). In case there is no default conversion for the relevant types or you want to convert values in a different way, subscribe, set_provider and connect allow passing callables to the convert argument (e.g. a lambda function or function reference), which are used to convert the values exchanged via this particular subscription/Reading object. Since connect can establish a connection between two objects in both directions, its convert parameter takes a tuple of two callables: a.connect(b, convert=(a2b, b2a)), where a2b() is a function to convert a’s type to b’s type and b2a() is a function for the other direction.

Example

This code will raise a TypeError:

var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.connect(var2)

This code will make sure new values from var1 are sent to var2 after being converted to its .type and vice versa, using the trivial int→float and float→int conversions, respectively:

var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.connect(var2, convert=True)

This code will work as well, but use the ceil function for converting float values to int:

from math import ceil

var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.subscribe(var2, convert=True)
var2.subscribe(var1, convert=lambda x: ceil(x))

We can shorten this by using the connect method:

from math import ceil

var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.connect(var2, convert=(lambda x: float(x), lambda x: ceil(x)))
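How the three accepted forms of the convert argument (False, True, or a callable) might be resolved can be sketched without SHC. The registry _DEFAULT_CONVERTERS and the functions toy_get_converter() and resolve_convert() are hypothetical. The real default conversions live in shc.conversion and may behave differently, for example in how floats are turned into integers.

```python
from math import ceil
from typing import Any, Callable, Dict, Tuple, Union

Converter = Callable[[Any], Any]

# Hypothetical registry of default converters, keyed by (from_type, to_type).
_DEFAULT_CONVERTERS: Dict[Tuple[type, type], Converter] = {
    (int, float): float,
    (float, int): int,
}


def toy_get_converter(from_type: type, to_type: type) -> Converter:
    """Look up a default converter or raise a TypeError if none is registered."""
    if from_type is to_type:
        return lambda value: value
    try:
        return _DEFAULT_CONVERTERS[(from_type, to_type)]
    except KeyError:
        raise TypeError(
            f"No default conversion from {from_type.__name__} to {to_type.__name__}"
        ) from None


def resolve_convert(convert: Union[bool, Converter], from_type: type, to_type: type) -> Converter:
    """Mimic the three accepted forms of the `convert` argument."""
    if callable(convert):
        return convert  # custom conversion function
    if convert is True:
        return toy_get_converter(from_type, to_type)  # default conversion
    if from_type is not to_type:
        raise TypeError("Type mismatch and convert=False")
    return lambda value: value


print(resolve_convert(True, float, int)(2.3), resolve_convert(ceil, float, int)(2.3))
```

With the registry assumed here, the default float→int conversion truncates 2.3 to 2, while passing ceil as a custom converter yields 3.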

Logic Handlers

A logic handler is a Python function which is executed (“triggered”) by a Subscribable object when it publishes a new value. To register a logic handler to be triggered by an object, use the object’s Subscribable.trigger() method. This method can either be used in a functional style (subscribable_object.trigger(my_logic_handler)) or as a decorator for the logic handler (as shown in the example below). For triggering logic handlers at defined times or in a fixed interval, you may use a Timer.

Since SHC is completely relying on asyncio for (pseudo-)concurrency, all methods dealing with runtime events (including reading and writing values) are defined as asynchronous coroutines. This also applies to logic handlers, which must be defined with async def accordingly.

When triggered, a logic handler is called with two parameters: The new value of the triggering object and the origin of the event, i.e. the list of objects publishing to/writing to/triggering one another, resulting in the logic handler being triggered.

To avoid recursive feedback loops, the logic handler should skip execution when it (the function object) is contained in the origin list. It should also append itself to a copy of the list and pass it to all calls of write and other logic handlers. To help with that, there’s the shc.handler() decorator. It automatically skips recursive execution and ensures passing of the correctly modified origin list to write calls via a hidden context variable.

Putting it all together, a logic handler may look as follows:

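import datetime
import shc

# `knx_interface` is assumed to be an already created KNX interface object
timer = shc.timer.Every(datetime.timedelta(minutes=5))
some_variable = shc.Variable(int)
some_knx_object = knx_interface.group(shc.interfaces.knx.KNXGAD(1, 2, 3), dpt="5")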

@timer.trigger
@some_variable.trigger
@shc.handler()
async def my_logics(_value, _origin):
    """ Write value of `some_variable` to KNX bus every 5 minutes & when it changes, but only for values > 42 """
    # We cannot use the value provided, since it is not defined when triggered by the timer
    value = await some_variable.read()
    if value > 42:
        await some_knx_object.write(value)
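How a decorator can skip recursive execution and hand the extended origin list to write calls through a context variable can be sketched with the standard contextvars module. The names toy_handler, ToyWritable and _current_origin are hypothetical. This is a simplified model of the idea, not the code of shc.handler().

```python
import asyncio
import contextvars
import functools
from typing import Any, List, Optional

# Hypothetical context variable carrying the origin list of the running handler.
_current_origin = contextvars.ContextVar("toy_origin")


def toy_handler(func):
    @functools.wraps(func)
    async def wrapper(value: Any, origin: List[Any]) -> None:
        if any(entry is wrapper for entry in origin):
            return  # called recursively: skip execution
        # Make the extended origin available to all write() calls in `func`.
        token = _current_origin.set(origin + [wrapper])
        try:
            await func(value, origin)
        finally:
            _current_origin.reset(token)
    return wrapper


class ToyWritable:
    def __init__(self) -> None:
        self.log: List[Any] = []

    async def write(self, value: Any, origin: Optional[List[Any]] = None) -> None:
        if origin is None:
            origin = _current_origin.get()  # raises LookupError outside a handler
        self.log.append((value, list(origin)))


target = ToyWritable()


@toy_handler
async def double(value, _origin):
    await target.write(value * 2)  # no explicit origin needed


async def main() -> List[Any]:
    target.log.clear()
    await double(4, ["timer"])
    await double(5, ["timer", double])  # skipped: `double` is already in origin
    return target.log


print(asyncio.run(main()))
```

Only the first call reaches the target. Its recorded origin ends with the handler itself, which is what lets a later publisher detect the loop.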

Warning

Since logic handlers are executed as asynchronous coroutines in the same AsyncIO event loop (thread) as all the logic of SHC, they must not block the control flow. This means, any function call which may block the execution for more than fractions of a millisecond (e.g. file I/O, network I/O, other synchronous system calls or CPU-heavy calculations) must be turned into an asynchronous call, which is awaited—allowing the event loop to schedule other coroutines in the meantime. This can be achieved by either replacing the blocking call with a call of an async function (using an AsyncIO-compatible library) or executing the blocking code in a different Python thread and awaiting its result with an AsyncIO Future (e.g. using asyncio.loop.run_in_executor()).

For example, instead of writing:

@shc.handler()
async def my_logic_handler(value, _origin):
    # DO NOT DO THIS!!! All of the following lines are blocking!
    with open('/tmp/hello.txt', 'w') as f:
        f.write("Hello, World!")

    some_result = some_cpu_heavy_calculation(value)

… use:

import asyncio
import aiofile  # https://pypi.org/project/aiofile/

@shc.handler()
async def my_logic_handler(value, _origin):
    async with aiofile.AIOFile('/tmp/hello.txt', 'w') as f:
        await f.write("Hello, World!")

    loop = asyncio.get_running_loop()
    some_result = await loop.run_in_executor(None, some_cpu_heavy_calculation, value)

If your logic handler function does not need to interact with asynchronous functions (i.e. it does not read or write Connectables’ values or trigger other logic handlers), you may write it as a non-async function and use the shc.blocking_handler() decorator, which does the thread executor scheduling:

@shc.blocking_handler()
def my_blocking_logic_handler(value, _origin):
    with open('/tmp/hello.txt', 'w') as f:
        f.write("Hello, World!")

    some_result = some_cpu_heavy_calculation(value)

    # Unfortunately, no .write() or .read() possible here.

Tip

The shc.handler() and shc.blocking_handler() decorators take care of calling the logic handler function with the correct number of arguments: If you don’t need the origin list, you can simply omit the second parameter of your wrapped logic handler function:

@shc.handler()
async def my_value_only_handler(value):
    await some_variable.write(value + 3)

If you don’t need the value either, you can also omit this parameter. Hence, the logic handler from the first example above can be rewritten as:

@timer.trigger
@some_variable.trigger
@shc.handler()
async def my_logics():
    value = await some_variable.read()
    if value > 42:
        await some_knx_object.write(value)
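One way a decorator can support handlers with zero, one or two parameters is to inspect the wrapped function's signature once and slice the argument tuple accordingly. The following standalone sketch shows that technique. adapt_arguments is a hypothetical name, and SHC's decorators may determine the parameter count differently.

```python
import asyncio
import inspect
from typing import Any, List


def adapt_arguments(func):
    """Wrap `func` so that it can always be called as wrapper(value, origin)."""
    num_params = len(inspect.signature(func).parameters)
    if num_params > 2:
        raise TypeError("A logic handler takes at most two parameters")

    async def wrapper(value: Any, origin: List[Any]) -> Any:
        # Pass only as many leading arguments as the function declares.
        args = (value, origin)[:num_params]
        return await func(*args)

    return wrapper


@adapt_arguments
async def no_params():
    return "called without arguments"


@adapt_arguments
async def value_only(value):
    return value + 3


@adapt_arguments
async def value_and_origin(value, origin):
    return value, len(origin)


async def main() -> List[Any]:
    origin = ["some_source"]
    return [
        await no_params(1, origin),
        await value_only(1, origin),
        await value_and_origin(1, origin),
    ]


print(asyncio.run(main()))
```

All three wrapped functions are called with the same (value, origin) pair, but each one receives only the parameters it declares.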

shc.base Module Reference

class shc.base.Connectable
Variables:

type – The type of the values, this object is supposed to handle

connect(other: Connectable[S], send: Optional[bool] = None, receive: Optional[bool] = None, read: Optional[bool] = None, provide: Optional[bool] = None, convert: Union[bool, Tuple[Callable[[T], S], Callable[[S], T]]] = False) → C

Subscribe self to other and set it as default provider, and vice versa (depending on the two objects’ capabilities, optionalities and the given parameters).

Parameters:
  • other – The other Connectable object to connect with

  • send – Send value updates to other (i.e. subscribe other to self). Requires self to be Subscribable and other to be Writable. In this case, defaults to True if not specified.

  • receive – Receive value updates from other (i.e. subscribe self to other). Requires self to be Writable and other to be Subscribable. In this case, defaults to True if not specified.

  • read – Read values from other (i.e. set other as default provider for self). Requires self to be Reading and other to be Readable. If not specified, defaults to False unless self.is_reading_optional is set to False.

  • provide – Provide values to other for reading (i.e. set self as default provider for other). Requires self to be Readable and other to be Reading. If not specified, defaults to False unless other.is_reading_optional is set to False.

  • convert – Enable built-in type conversion for the created subscriptions/default_providers: Either a boolean or a tuple of two conversion functions. Defaults to False, i.e. a type mismatch will result in a TypeError instead of implicit conversion. Set to True, to choose the appropriate conversion function automatically. Raises a TypeError if no default conversion for one of the two directions is available and that direction is used. For custom conversion functions, pass a tuple (to_other, to_self), where to_other is a (non-async) callable for converting self’s value type into other’s value type and to_self does the conversion the other way round.

Returns:

Returns self to allow functional-style chaining

class shc.base.Writable
async write(value: T_con, origin: Optional[List[Any]] = None) → None

Asynchronous coroutine to update the object with a new value

This method calls _write() internally for the actual implementation-specific update logic. Inheriting classes should override _write instead of this method to keep profiting from the value type checking and magic context-based origin passing features.

This method awaits the complete transmission and processing of the new value by the next stateful object/system, e.g. storing and re-publishing the value on a Variable, publishing the value from an MQTT broker on an MQTT client connector, forwarding the value to all subscribers on an Expression.

Parameters:
  • value – The new value

  • origin – The origin / trace of the value update event, i.e. the list of objects/functions which have been publishing to/calling one another to cause this value update. It is used to avoid recursive feedback loops and may be used for origin-specific handling of the value. The last entry of the list should be the object/function calling this method.

Raises:

TypeError – If the value’s type does not match the object’s type attribute.

abstract async _write(value: T_con, origin: List[Any]) → None

Abstract internal method containing the actual implementation-specific write-logic.

It must be overridden by classes inheriting from Writable to be updated with new values. The _write implementation does not need to check the new value’s type.

Please make sure that your _write implementation awaits the processing of the value update by the next stateful object/system before returning:

  • On a Variable and similar stateful re-publishing objects, which use _stateful_publishing = True, await the storing and _publish()-ing of the new value

  • On an Expression and similar stateless re-publishing objects, use _publish_and_wait() instead of _publish() and await its return

  • On external interface connectors (e.g. MQTT connector) await (at least) the successful processing of the value by the external system

This is required to make the state inconsistency mitigation method of Subscribable objects work.

Parameters:
  • value – The new value to update this object with

  • origin – The origin / trace of the value update event. Should be passed to Subscribable._publish() if the implementing class is Subscribable and re-publishes new values.

class shc.base.Readable
abstract async read() → T_co

Get the current value of this Connectable object.

Raises:

UninitializedError – If the value is not (yet) specified

class shc.base.Subscribable(*args, **kwargs)
subscribe(subscriber: Writable[S], convert: Union[Callable[[T_co], S], bool] = False) → None

Subscribe a writable object to this object to be updated, when this object publishes a new value.

The subscriber’s Writable.write() method will be called for any new value published by this object, as long as the subscriber did not lead to the relevant update of this object (i.e. is not included in the origin list). The origin list passed to the subscriber’s write method will contain this object as the last entry.

Parameters:
  • subscriber – The object to subscribe for updates

  • convert – A callable to convert this object’s new value to the data type of the subscriber or True to choose the appropriate conversion function automatically.

Raises:

TypeError – If the type of the subscriber does not match this object’s type and convert is False or if convert is True but no type conversion is known to convert this object’s type into the subscriber’s type.

trigger(target: Callable[[T, List[Any]], Awaitable[None]], synchronous: bool = False) → Callable[[T, List[Any]], Awaitable[None]]

Register a logic handler function to be triggered when this object is updated.

This method can be used as a decorator for custom logic handler functions. Alternatively, it can simply be called with a function object: some_subscribable.trigger(some_handler_function).

The target function must be an async coroutine that takes two arguments: The new value of this object and the origin/trace of the event (a list of objects that led to the handler being triggered). The handler function must make sure to prevent infinite recursive feedback loops: In contrast to subscribed objects, logic handler functions are also triggered if they led to the object being updated (i.e. they are already contained in the origin list). Thus, they should skip execution if called recursively. They should also append themselves to the origin list and pass the extended list to all Writable.write() calls they make.

To ensure all this for a custom handler function, use the handler() decorator:

@some_subscribable.trigger
@handler()
async def some_handler_function(value, origin):
    ...
    await some_writable.write(value + 1)
    ...

You may even use multiple trigger decorators:

@some_subscribable.trigger
@another_subscribable.trigger
@handler()
async def some_handler_function(value, origin):
    if origin[-1] is some_subscribable:
        ...
    else:
        ...

Parameters:
  • target – The handler function/coroutine to be triggered on updates. Must comply with the requirements mentioned above.

  • synchronous – If True, the target coroutine is triggered synchronously, i.e. _publish_and_wait() will wait for it to return. This SHALL be used for internal triggering methods that re-publish the value updates of this Subscribable object. For logic handlers that might require substantial time to return, it SHALL NOT be used!

Returns:

The target function (unchanged) – this allows decorator-style usage of this method

_publish(value: T_co, origin: List[Any])

Method to publish a new value to all subscribers and trigger all registered logic handlers.

All logic handlers and Writable.write() methods are called in parallel asyncio tasks, which are not awaited to return. If _stateful_publishing is True on this object (or class), it will keep track of the currently pending publishing tasks. This information is then used to mitigate state inconsistencies, caused by value updates crossing over each other, by resetting the origin of following value updates to the respective subscribers (with pending tasks). This disables the usual loop prevention and causes the value update to travel back its originating path. Consequently, the update will override the intermediate conflicting value in all places.

Additionally, these updates with reset origin are delayed for a random delay up to MAX_CONCURRENT_UPDATE_DELAY seconds (and skipped if overtaken by another update in that time) to avoid endless update loops of two competing value updates. To avoid unnecessary value updates flying around, we limit the origin resetting to value updates that reached this object via a different path than the pending updates to the subscriber and value updates that already result from republishing with reset origin. Other updates should not be prone to conflicts due to the ordered asyncio task queue.

_stateful_publishing should be enabled for all objects that are Subscribable and Writable (i.e. allow sending and receiving value updates) and have either internal state (like Variables) or represent an external, stateful (or quasi-stateful) system (like a Connector object for a KNX group address/an MQTT topic). On the other hand, objects that are only Subscribable should not use _stateful_publishing.

If the object only forwards value updates without storing a state, it should await the successful publishing of the value by using the _publish_and_wait() coroutine.

Parameters:
  • value – The new value to be published by this object. Must be an instance of this object’s type.

  • origin – The origin list of the new value, excluding this object. See “The origin of a New Value” for more details. self is appended automatically before calling the registered subscribers and logic handlers.
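The difference between publishing in tasks that are not awaited and awaiting the completion of all subscribers can be shown with plain asyncio. The functions publish() and publish_and_wait() below are hypothetical simplifications. They ignore origin handling, the pending-task tracking and the delayed re-publishing described above.

```python
import asyncio
from typing import Any, List, Tuple


async def slow_subscriber(log: List[str], name: str) -> None:
    await asyncio.sleep(0.01)  # stands in for processing the value update
    log.append(name)


async def publish(log: List[str]) -> List[Any]:
    # Fire and forget: schedule one task per subscriber and return at once.
    return [asyncio.create_task(slow_subscriber(log, name)) for name in ("a", "b")]


async def publish_and_wait(log: List[str]) -> None:
    # Return only after every subscriber has processed the update.
    await asyncio.gather(*(slow_subscriber(log, name) for name in ("a", "b")))


async def main() -> Tuple[List[str], List[str], List[str]]:
    log_async: List[str] = []
    tasks = await publish(log_async)
    seen_right_after_publish = list(log_async)  # nothing has been processed yet
    await asyncio.gather(*tasks)  # only here the scheduled tasks get to finish

    log_sync: List[str] = []
    await publish_and_wait(log_sync)
    return seen_right_after_publish, sorted(log_async), sorted(log_sync)


print(asyncio.run(main()))
```

Right after publish() returns, no subscriber has seen the value yet, whereas publish_and_wait() returns with both subscribers updated. This is why a purely forwarding object should await the second variant.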

class shc.base.Reading(*args, **kwargs)
set_provider(provider: Readable[S], convert: Union[Callable[[S], T_con], bool] = False)
async _from_provider() → Optional[T_con]

Private method to be used by inheriting classes to read the current value from this object’s default provider (via its Readable.read() method) and convert it to this object’s type, if necessary, using the registered converter function.

Returns:

The default provider’s current value or None if no default provider is set or its read method raises an UninitializedError.
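The return-value contract described above (None if there is no provider or the provider is uninitialized, otherwise the possibly converted value) can be modelled in a few lines. ToyProvider, ToyReading and the local UninitializedError class are hypothetical stand-ins for this sketch and do not come from SHC.

```python
import asyncio
from typing import Any, Callable, Optional, Tuple


class UninitializedError(RuntimeError):
    """Stand-in for the error raised by read() when no value is available yet."""


class ToyProvider:
    def __init__(self, value: Any = None) -> None:
        self._value = value

    async def read(self) -> Any:
        if self._value is None:
            raise UninitializedError()
        return self._value


class ToyReading:
    def __init__(self) -> None:
        self._provider: Optional[ToyProvider] = None
        self._convert: Optional[Callable[[Any], Any]] = None

    def set_provider(self, provider: ToyProvider,
                     convert: Optional[Callable[[Any], Any]] = None) -> None:
        self._provider = provider
        self._convert = convert

    async def _from_provider(self) -> Optional[Any]:
        if self._provider is None:
            return None  # no default provider registered
        try:
            value = await self._provider.read()
        except UninitializedError:
            return None  # provider has no value yet
        return self._convert(value) if self._convert else value


async def main() -> Tuple[Any, Any, Any]:
    reader = ToyReading()
    without_provider = await reader._from_provider()
    reader.set_provider(ToyProvider())
    uninitialized = await reader._from_provider()
    reader.set_provider(ToyProvider(7), convert=float)
    converted = await reader._from_provider()
    return without_provider, uninitialized, converted


print(asyncio.run(main()))
```

A caller of such a method therefore has to handle None explicitly, since it cannot tell a missing provider from an uninitialized one.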

@shc.handler(reset_origin=False, allow_recursion=False) → Callable[[Union[Callable[[T, List[Any]], Awaitable[None]], Callable[[T], Awaitable[None]], Callable[[], Awaitable[None]]]], Callable[[T, List[Any]], Awaitable[None]]]

Decorator for custom logic handler functions.

Wraps a custom logic handler function to make sure it is suited to be registered for triggering by a subscribable object with Subscribable.trigger(). It makes sure that

  • exceptions occurring during execution are logged,

  • the origin is extended with the logic handler itself and magically passed to all Writable.write() calls,

  • the origin can magically be passed when called directly by other logic handlers, and

  • the execution is skipped when called recursively (i.e. the logic handler is already contained in the origin list).

It also allows defining the logic handler function with different numbers of parameters: If the function takes two parameters, the trigger value and the origin are passed. If the function takes one parameter, only the value is passed. If the function takes no parameters, it is called without arguments.

Parameters:
  • reset_origin – If True, the origin which is magically passed to all write calls, only contains the logic handler itself, not the previous origin list, which led to the handler’s execution. This can be used to change an object’s value, which triggered this logic handler. This may cause infinite recursive feedback loops, so use with care!

  • allow_recursion – If True, recursive execution of the handler is not skipped. The handler must check the passed values and/or the origin list itself to prevent infinite feedback loops via write calls or calls to other logic handlers – especially when used together with reset_origin.

@shc.blocking_handler → Callable[[Union[Callable[[T, List[Any]], None], Callable[[T], None], Callable[[], None]]], Callable[[T, List[Any]], Awaitable[None]]]

Decorator for custom blocking (non-async) logic handler functions.

Wraps a function to transform it into an async logic handler function, which is suited to be registered for triggering by a subscribable object with Subscribable.trigger(). The wrapped function is executed in a separate thread, using asyncio’s run_in_executor().

Like handler(), this decorator catches and logs errors and ensures that the origin can magically be passed when called directly by other logic handlers. However, since the wrapped function is not an asynchronous coroutine, it is not able to call Writable.write() or another logic handler directly. Thus, this decorator does not include special measures for preparing and passing the origin list or avoiding recursive execution. Still, it takes care of the correct number of arguments (zero to two) for calling the function.
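The core of such a wrapper, running a plain function in a worker thread and awaiting its result, can be sketched with asyncio's run_in_executor(). toy_blocking_handler is a hypothetical name. The sketch leaves out the error logging and argument-count handling mentioned above.

```python
import asyncio
import functools
import threading
from typing import Any, List, Tuple


def toy_blocking_handler(func):
    @functools.wraps(func)
    async def wrapper(value: Any, origin: List[Any]) -> Any:
        loop = asyncio.get_running_loop()
        # None selects the event loop's default ThreadPoolExecutor.
        return await loop.run_in_executor(None, func, value, origin)
    return wrapper


@toy_blocking_handler
def double_blocking(value, _origin):
    # Runs in a worker thread, so blocking calls are acceptable here.
    return value * 2, threading.get_ident()


async def main() -> Tuple[int, bool]:
    result, worker_thread = await double_blocking(21, [])
    # Compare the worker's thread id with the id of the event loop's thread.
    return result, worker_thread != threading.get_ident()


print(asyncio.run(main()))
```

The wrapped function returns its result to the awaiting coroutine while executing in a different thread than the event loop, which keeps the loop free for other coroutines in the meantime.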