Smart Home Connect Base Concepts
Smart Home Connect does not come with a configuration language like the YAML-based configuration files of HomeAssistant or SmartHomeNG. It also does not include a graphical editor or web interface for configuring, interconnecting or automating your smart devices. Instead, Smart Home Connect is a framework for building smart home applications in Python 3. It works as a library, providing the building blocks for your application, such as interfaces for different communication protocols, timers to trigger your logic/automation functions, and a web user interface with many components.
Connectable Objects
The core concept of building up a home automation application with SHC is connecting Connectable objects. A Connectable object is a Python object that produces, consumes or stores values of a certain type. It may implement one or more of the following traits (i.e. inherit from these classes):
Writable
: The object can be updated with a new value via its Writable.write() method. It will use this value to update its internal state or send it to an external bus/interface/database/etc.
Readable
: The object has a current value that can be retrieved via its Readable.read() method.
Subscribable
: The object occasionally produces new values (e.g. values received from an external interface) and publishes them to its subscribers. A subscriber is a Writable object that has been registered via the Subscribable object’s Subscribable.subscribe() method.
Reading
: The object needs to read a value in certain situations. This may be an optional additional feature of the object (e.g. a KNX GroupAddress object answering GroupRead telegrams) or mandatory for the object’s functionality (e.g. a web UI widget which needs to get the current value when a new client connects). This is denoted by the is_reading_optional attribute of the object (class or instance attribute). In any case, the object tries to read the value of its default provider in such situations, which is a Readable object, registered via the Reading.set_provider() method.
Note
Not every Connectable object is Readable:
A connector object for a bus address, for example, may send and receive values to/from the bus (i.e. it is Writable + Subscribable) but it does not have a “current value”, which can be read.
To cache the latest received value and make it Readable, the object must be combined with a shc.Variable object.
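As a rough illustration of these traits, the following toy model in plain Python shows a bus connector that is Writable and Subscribable but not Readable, next to a variable that caches the latest value and thereby makes it Readable. It is a simplified sketch, not SHC's implementation: all `Toy…` names are hypothetical, and the origin mechanism and type checking are left out for brevity.

```python
import asyncio
from typing import Any, List


class ToySubscribable:
    """Toy stand-in for the Subscribable trait: publishes values to subscribers."""

    def __init__(self) -> None:
        self._subscribers: List[Any] = []

    def subscribe(self, subscriber: Any) -> None:
        # A subscriber is any object with an async write(value) method
        self._subscribers.append(subscriber)

    async def _publish(self, value: Any) -> None:
        for subscriber in self._subscribers:
            await subscriber.write(value)


class ToyBusAddress(ToySubscribable):
    """Writable + Subscribable, but without a readable "current value"."""

    def __init__(self) -> None:
        super().__init__()
        self.sent_to_bus: List[Any] = []

    async def write(self, value: Any) -> None:
        self.sent_to_bus.append(value)  # pretend to send the value to the bus

    async def receive_from_bus(self, value: Any) -> None:
        await self._publish(value)  # a value arrived from the bus


class ToyVariable(ToySubscribable):
    """Writable + Subscribable + Readable: caches the latest written value."""

    def __init__(self) -> None:
        super().__init__()
        self._value: Any = None

    async def write(self, value: Any) -> None:
        self._value = value
        await self._publish(value)

    async def read(self) -> Any:
        return self._value


async def demo() -> Any:
    bus_address = ToyBusAddress()
    cache = ToyVariable()
    bus_address.subscribe(cache)  # one-way connection: bus -> variable
    await bus_address.receive_from_bus(True)
    return await cache.read()  # the cached value can now be read


print(asyncio.run(demo()))
```

The bus connector alone offers no `read()` method. Only the combination with the caching variable provides a current value.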
The connect Method
Typically, you’ll want to connect two Connectable objects bidirectionally, such that new values from one object are sent/published/written to the other and vice versa.
Thus, you’ll usually need two subscribe() calls and probably additional set_provider() calls.
To shorten this procedure, every Connectable object provides the shc.base.Connectable.connect() method.
It connects two Connectable objects by
subscribing each to the other if applicable (i.e. it is Writable and the other one is Subscribable) and
setting each as the other’s default provider if applicable (i.e. it is Readable and the other is Reading) and the other explicitly requires reading values (i.e. its is_reading_optional attribute is False).
The default behaviour can be customized via the send/receive arguments (for subscribing) and the provide/read arguments (for registering as default provider).
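The default resolution described above can be sketched as a small pure function. This is an illustrative model only: `Caps` and `resolve_connect_flags` are hypothetical names, and silently ignoring an explicitly requested but unsupported direction is an assumption of this sketch, not a statement about how SHC reacts in that case.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Caps:
    """Hypothetical summary of the traits one Connectable object implements."""
    writable: bool = False
    readable: bool = False
    subscribable: bool = False
    reading: bool = False
    is_reading_optional: bool = True


def resolve_connect_flags(me: Caps, other: Caps,
                          send: Optional[bool] = None,
                          receive: Optional[bool] = None,
                          read: Optional[bool] = None,
                          provide: Optional[bool] = None) -> Dict[str, bool]:
    """Decide which links a me.connect(other) call would establish."""

    def pick(explicit: Optional[bool], possible: bool, default: bool) -> bool:
        if not possible:
            return False  # assumption of this sketch: unsupported links are skipped
        return default if explicit is None else explicit

    return {
        # Subscriptions are created by default whenever the traits allow it
        "send": pick(send, me.subscribable and other.writable, True),
        "receive": pick(receive, me.writable and other.subscribable, True),
        # Default providers are only set by default if reading is not optional
        "read": pick(read, me.reading and other.readable, not me.is_reading_optional),
        "provide": pick(provide, me.readable and other.reading, not other.is_reading_optional),
    }


variable = Caps(writable=True, readable=True, subscribable=True, reading=True)
widget = Caps(writable=True, subscribable=True, reading=True, is_reading_optional=False)
print(resolve_connect_flags(widget, variable))
```

In this model, a widget with mandatory reading gets the variable as its default provider without any extra argument, while the variable itself does not read from the widget.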
Example
Manually connecting a switch widget in the web UI to a KNX group address, via a Variable object that caches the value, would look like this:
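import shc
import shc.interfaces.knx
import shc.web
import shc.web.widgets

knx_connection = shc.interfaces.knx.KNXConnector()
web_interface = shc.web.WebServer("localhost", 8080, "index")
variable = shc.Variable(bool, "variable's name")
knx_group_address = knx_connection.group(shc.interfaces.knx.KNXGAD(1, 2, 3), dpt="1")
# Forward value updates between the variable and the KNX group address in both directions
variable.subscribe(knx_group_address)
knx_group_address.subscribe(variable)

switch_widget = shc.web.widgets.Switch("Switch Label")
# Forward value updates between the variable and the widget in both directions
variable.subscribe(switch_widget)
switch_widget.subscribe(variable)
# Let the widget read the variable's current value, e.g. when a new client connects
switch_widget.set_provider(variable)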
Using the connect() method, it can be shortened to:
import shc
import shc.interfaces.knx
import shc.web
import shc.web.widgets

knx_connection = shc.interfaces.knx.KNXConnector()
web_interface = shc.web.WebServer("localhost", 8080, "index")
# connect() returns the object it was called on, so the calls can be chained
variable = shc.Variable(bool, "variable's name")\
    .connect(knx_connection.group(shc.interfaces.knx.KNXGAD(1, 2, 3), dpt="1"))
switch_widget = shc.web.widgets.Switch("Switch Label")\
    .connect(variable)
Note
For most Connectable objects, the Reading functionality is an optional feature and not required for normal functionality.
E.g., shc.Variable can read for initialization at startup, shc.interfaces.knx.KNXGroupVar can respond to GroupRead telegrams from the KNX bus, shc.interfaces.shc_client.WebApiClientObject can do client-to-server state synchronization at client startup, etc.
Thus, the .connect() method does not set the default provider unless explicitly requested with the provide/read arguments.
However, some Connectable objects require reading from a default provider for their normal functionality, such as all WebDisplayDatapoint-based web UI widgets, which read the current value when a new client connects.
These classes typically set is_reading_optional = False, so .connect() will set the default provider unless explicitly disabled.
The origin of a New Value
When updating an object’s value using the Writable.write() method, a second argument must be provided, called origin.
It is expected to be a list of all objects that led to the update being performed, i.e. the “trace” of the update event, starting with its origin (typically an external event source/interface or a timer) and up to the object or function which now calls the write method.
This list is used by Subscribable objects to avoid recursive feedback loops:
When publishing a value, all subscribers which are already included in origin are automatically skipped, as they took part in causing this publishing event and probably will cause the same event again, closing the feedback loop.
When calling Writable.write() from within a logic handler decorated with the shc.handler() decorator, the origin argument may be omitted, since it is magically provided via a hidden context variable.
See section “Logic Handlers” below for more details.
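The loop prevention described in this section can be illustrated with two toy objects that are subscribed to each other. This is a minimal sketch under simplifying assumptions, not SHC's implementation: `ToyVariable` is a hypothetical class, and values are forwarded sequentially instead of in parallel tasks.

```python
import asyncio
from typing import Any, List


class ToyVariable:
    """Toy stand-in for a Writable + Subscribable object (not SHC's Variable)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.value: Any = None
        self.write_count = 0
        self._subscribers: List["ToyVariable"] = []

    def subscribe(self, subscriber: "ToyVariable") -> None:
        self._subscribers.append(subscriber)

    async def write(self, value: Any, origin: List[Any]) -> None:
        self.value = value
        self.write_count += 1
        await self._publish(value, origin)

    async def _publish(self, value: Any, origin: List[Any]) -> None:
        # Extend a copy of the trace with this object before passing it on
        new_origin = origin + [self]
        for subscriber in self._subscribers:
            # Skip subscribers that already took part in causing this update
            if any(subscriber is entry for entry in origin):
                continue
            await subscriber.write(value, new_origin)


async def demo() -> List[int]:
    a, b = ToyVariable("a"), ToyVariable("b")
    a.subscribe(b)
    b.subscribe(a)  # bidirectional connection: a potential feedback loop
    await a.write(1, ["external event"])
    return [a.write_count, b.write_count]


print(asyncio.run(demo()))
```

Each object is written exactly once: when `b` publishes the value, `a` is already part of the origin list and is therefore skipped.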
Typing of Connectable Objects
Connectable objects are statically typed.
This means, each object is supposed to handle (receive/publish/provide/read) only values of a defined Python type.
The object’s type is indicated by its type attribute, which may be a class attribute (if the Connectable class specifies a fixed value type) or an instance attribute (for generic Connectable classes like shc.Variable, where each instance may handle values of a different type).
The instance-specific type of generic Connectable classes may either be given to each object explicitly (as an argument to its __init__ method, as for shc.Variable) or derived from other properties of the object (like the KNX Datapoint Type of shc.interfaces.knx.KNXGroupVar objects).
When connecting two Connectable objects using Connectable.connect(), Subscribable.subscribe() or Reading.set_provider(), the consistency of the two objects’ type attributes is checked and a TypeError is raised if they don’t match.
In many cases, you’ll still want to connect those objects and make sure the value is adequately converted when being written to/read from the other object.
For this purpose, the connect, subscribe and set_provider methods provide an optional argument convert.
If convert=True is specified, the shc.conversion module is searched for a default conversion function for the relevant value types (using shc.conversion.get_converter()).
In case there is no default conversion for the relevant types or you want to convert values in a different way, subscribe, set_provider and connect allow passing callables to the convert argument (e.g. a lambda function or function reference), which are used to convert the values exchanged via this particular subscription/Reading object.
Since connect can establish a connection between two objects in both directions, its convert parameter takes a tuple of two callables: a.connect(b, convert=(a2b, b2a)), where a2b() is a function to convert a’s type to b’s type and b2a() is a function for the other direction.
Example
This code will raise a TypeError:
var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.connect(var2)
This code will make sure new values from var1 are sent to var2 after being converted to its .type and vice versa, using the trivial int→float and float→int conversions:
var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.connect(var2, convert=True)
This code will work as well, but use the ceil function for converting float values to int:
from math import ceil

var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.subscribe(var2, convert=True)
var2.subscribe(var1, convert=lambda x: ceil(x))
We can shorten this by using the connect method:
from math import ceil

var1 = shc.Variable(int)
var2 = shc.Variable(float)
var1.connect(var2, convert=(lambda x: float(x), lambda x: ceil(x)))
Logic Handlers
A logic handler is a Python function which is executed (“triggered”) by a Subscribable object when it publishes a new value.
To register a logic handler to be triggered by an object, use the object’s Subscribable.trigger() method.
This method can either be used in a functional style (subscribable_object.trigger(my_logic_handler)) or as a decorator for the logic handler (as shown in the example below).
For triggering logic handlers at defined times or in a fixed interval, you may use a Timer.
Since SHC relies completely on asyncio for (pseudo-)concurrency, all methods dealing with runtime events (including reading and writing values) are defined as asynchronous coroutines.
This also applies to logic handlers, which must be defined with async def accordingly.
When triggered, a logic handler is called with two parameters: The new value of the triggering object and the origin of the event, i.e. the list of objects publishing to/writing to/triggering one another, resulting in the logic handler being triggered.
For avoiding recursive feedback loops, the logic handler should skip execution when it (the function object) is contained in the origin list.
It should also append itself to a copy of the list and pass it to all calls of write and other logic handlers.
To help with that, there’s the shc.handler() decorator.
It automatically skips recursive execution and ensures passing of the correctly modified origin list to write calls via a hidden context variable.
Putting it all together, a logic handler may look as follows:
import datetime
import shc
import shc.interfaces.knx
import shc.timer

# `knx_interface` is assumed to be a KNXConnector instance created elsewhere
timer = shc.timer.Every(datetime.timedelta(minutes=5))
some_variable = shc.Variable(int)
some_knx_object = knx_interface.group(shc.interfaces.knx.KNXGAD(1, 2, 3), dpt="5")

@timer.trigger
@some_variable.trigger
@shc.handler()
async def my_logics(_value, _origin):
    """ Write value of `some_variable` to KNX bus every 5 minutes & when it changes, but only for values > 42 """
    # We cannot use the value provided, since it is not defined when triggered by the timer
    value = await some_variable.read()
    if value > 42:
        await some_knx_object.write(value)
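To make the role of the decorator more tangible, here is a minimal sketch of how recursion skipping and context-based origin passing could be built on Python's `contextvars` module. It only illustrates the idea. `toy_handler`, `ToyWritable` and `_current_origin` are hypothetical names, and the sketch makes no claim about how shc.handler() is implemented internally.

```python
import asyncio
import contextvars
import functools
from typing import Any, Awaitable, Callable, List, Optional

# Holds the origin list of the logic handler that is currently executing
_current_origin: contextvars.ContextVar = contextvars.ContextVar("current_origin")


def toy_handler(func: Callable[[Any, List[Any]], Awaitable[None]]):
    """Hypothetical decorator: skip recursion, expose the origin via a context variable."""

    @functools.wraps(func)
    async def wrapper(value: Any, origin: List[Any]) -> None:
        if any(entry is wrapper for entry in origin):
            return  # this handler already took part in the event: skip execution
        # Append the handler to a copy of the origin list for all nested write calls
        token = _current_origin.set(origin + [wrapper])
        try:
            await func(value, origin)
        finally:
            _current_origin.reset(token)

    return wrapper


class ToyWritable:
    """Toy object whose write() falls back to the origin from the context."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    async def write(self, value: Any, origin: Optional[List[Any]] = None) -> None:
        if origin is None:
            # Raises LookupError when called outside of a decorated handler
            origin = _current_origin.get()
        self.received.append((value, origin))


target = ToyWritable()


@toy_handler
async def double_it(value, _origin):
    await target.write(value * 2)  # no explicit origin needed


asyncio.run(double_it(21, ["timer"]))
print(target.received[0][0])
```

A context variable suits this purpose because every asyncio task runs in its own copy of the context, so concurrently running handlers do not overwrite each other's origin list.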
Warning
Since logic handlers are executed as asynchronous coroutines in the same AsyncIO event loop (thread) as all the logic of SHC, they must not block the control flow.
This means, any function call which may block the execution for more than fractions of a millisecond (e.g. file I/O, network I/O, other synchronous system calls or CPU-heavy calculations) must be turned into an asynchronous call, which is awaited—allowing the event loop to schedule other coroutines in the meantime.
This can be achieved by either replacing the blocking call with a call of an async function (using an AsyncIO-compatible library) or executing the blocking code in a different Python thread and awaiting its result with an AsyncIO Future (e.g. using asyncio.loop.run_in_executor()).
For example, instead of writing:
@shc.handler()
async def my_logic_handler(value, _origin):
    # DO NOT DO THIS!!! All of the following lines are blocking!
    with open('/tmp/hello.txt', 'w') as f:
        f.write("Hello, World!")
    some_result = some_cpu_heavy_calculation(value)
… use:
import asyncio
import aiofile  # https://pypi.org/project/aiofile/

@shc.handler()
async def my_logic_handler(value, _origin):
    # Asynchronous file I/O does not block the event loop
    async with aiofile.AIOFile('/tmp/hello.txt', 'w') as f:
        await f.write("Hello, World!")
    # Run the CPU-heavy function in the default thread pool executor and await its result
    loop = asyncio.get_running_loop()
    some_result = await loop.run_in_executor(None, some_cpu_heavy_calculation, value)
If your logic handler function does not need to interact with asynchronous functions (i.e. it does not read or write Connectables’ values or trigger other logic handlers), you may write it as a non-async function and use the shc.blocking_handler() decorator, which does the thread executor scheduling:
@shc.blocking_handler()
def my_blocking_logic_handler(value, _origin):
    with open('/tmp/hello.txt', 'w') as f:
        f.write("Hello, World!")
    some_result = some_cpu_heavy_calculation(value)
    # Unfortunately, no .write() or .read() possible here.
Tip
The shc.handler() and shc.blocking_handler() decorators take care of calling the logic handler function with the correct number of arguments:
If you don’t need the origin list, you can simply omit the second parameter of your wrapped logic handler function:
@shc.handler()
async def my_value_only_handler(value):
    await some_variable.write(value + 3)
If you don’t need the value either, you can also omit this parameter.
Hence, the logic handler from the first example above can be rewritten as:
@timer.trigger
@some_variable.trigger
@shc.handler()
async def my_logics():
    value = await some_variable.read()
    if value > 42:
        await some_knx_object.write(value)
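One way such an argument-count adaptation could work is to inspect the signature of the wrapped function once and pass only as many arguments as it declares. The following sketch uses `inspect.signature()` for this. `adapt_arity` is a hypothetical helper for illustration, not part of SHC.

```python
import asyncio
import inspect
from typing import Any, Callable, List


def adapt_arity(func: Callable[..., Any]) -> Callable[[Any, List[Any]], Any]:
    """Wrap an async function with 0, 1 or 2 parameters into a (value, origin) handler."""
    num_params = len(inspect.signature(func).parameters)
    if num_params > 2:
        raise TypeError("A logic handler takes at most two parameters (value, origin)")

    async def wrapper(value: Any, origin: List[Any]) -> None:
        # Pass only the leading arguments the wrapped function declares
        args = (value, origin)[:num_params]
        await func(*args)

    return wrapper


calls: List[Any] = []


async def no_args():
    calls.append(())


async def value_only(value):
    calls.append((value,))


async def value_and_origin(value, origin):
    calls.append((value, origin))


async def demo() -> None:
    for func in (no_args, value_only, value_and_origin):
        await adapt_arity(func)(7, ["timer"])


asyncio.run(demo())
print(calls)
```

Inspecting the signature once at decoration time keeps the per-call overhead of the wrapper small.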
shc.base Module Reference
- class shc.base.Connectable
- Variables:
type – The type of the values this object is supposed to handle
- connect(other: Connectable[S], send: Optional[bool] = None, receive: Optional[bool] = None, read: Optional[bool] = None, provide: Optional[bool] = None, convert: Union[bool, Tuple[Callable[[T], S], Callable[[S], T]]] = False) C
Subscribe self to other and set as default_provider and vice versa (depending on the two objects’ capabilities, optionalities and given parameters).
- Parameters:
other – The other Connectable object to connect with
send – Send value updates to other (i.e. subscribe other to self). Requires self to be Subscribable and other to be Writable. In this case, defaults to True if not specified.
receive – Receive value updates from other (i.e. subscribe self to other). Requires self to be Writable and other to be Subscribable. In this case, defaults to True if not specified.
read – Read values from other (i.e. set other as default provider for self). Requires self to be Reading and other to be Readable. If not specified, defaults to False unless self.is_reading_optional is set to False.
provide – Provide values to other for reading (i.e. set self as default provider for other). Requires self to be Readable and other to be Reading. If not specified, defaults to False unless other.is_reading_optional is set to False.
convert – Enable built-in type conversion for the created subscriptions/default_providers: Either a boolean or a tuple of two conversion functions. Defaults to False, i.e. a type mismatch will result in a TypeError instead of implicit conversion. Set to True to choose the appropriate conversion function automatically. Raises a TypeError if no default conversion for one of the two directions is available and that direction is used. For custom conversion functions, pass a tuple (to_other, to_self), where to_other is a (non-async) callable for converting self’s value type into other’s value type and to_self does the conversion the other way round.
- Returns:
Returns self to allow functional-style chaining
- class shc.base.Writable
- async write(value: T_con, origin: Optional[List[Any]] = None) None
Asynchronous coroutine to update the object with a new value
This method calls _write() internally for the actual implementation-specific update logic. Inheriting classes should override _write instead of this method to keep profiting from the value type checking and magic context-based origin passing features.
This method awaits the complete transmission and processing of the new value by the next stateful object/system, e.g. storing and re-publishing the value on a Variable, publishing the value from an MQTT broker on an MQTT client connector, forwarding the value to all subscribers on an Expression.
- Parameters:
value – The new value
origin – The origin / trace of the value update event, i.e. the list of objects/functions which have been publishing to/calling one another to cause this value update. It is used to avoid recursive feedback loops and may be used for origin-specific handling of the value. The last entry of the list should be the object/function calling this method.
- Raises:
TypeError – If the value’s type does not match the object’s type attribute.
- abstract async _write(value: T_con, origin: List[Any]) None
Abstract internal method containing the actual implementation-specific write-logic.
It must be overridden by classes inheriting from Writable to be updated with new values. The _write implementation does not need to check the new value’s type.
Please make sure that your _write implementation awaits the processing of the value update by the next stateful object/system before returning:
On a Variable and similar stateful re-publishing objects, which use _stateful_publishing = True, await the storing and _publish()-ing of the new value
On an Expression and similar stateless re-publishing objects, use _publish_and_wait() instead of _publish() and await its return
On external interface connectors (e.g. MQTT connector) await (at least) the successful processing of the value by the external system
This is required to make the state inconsistency mitigation method of Subscribable objects work.
- Parameters:
value – The new value to update this object with
origin – The origin / trace of the value update event. Should be passed to Subscribable._publish() if the implementing class is Subscribable and re-publishes new values.
- class shc.base.Readable
- abstract async read() T_co
Get the current value of this Connectable object.
- Raises:
UninitializedError – If the value is not (yet) specified
- class shc.base.Subscribable(*args, **kwargs)
- subscribe(subscriber: Writable[S], convert: Union[Callable[[T_co], S], bool] = False) None
Subscribe a writable object to this object to be updated, when this object publishes a new value.
The subscriber’s Writable.write() method will be called for any new value published by this object, as long as the subscriber did not lead to the relevant update of this object (i.e. is not included in the origin list). The origin list passed to the subscriber’s write method will contain this object as the last entry.
- Parameters:
subscriber – The object to subscribe for updates
convert – A callable to convert this object’s new value to the data type of the subscriber or True to choose the appropriate conversion function automatically.
- Raises:
TypeError – If the type of the subscriber does not match this object’s type and convert is False or if convert is True but no type conversion is known to convert this object’s type into the subscriber’s type.
- trigger(target: Callable[[T, List[Any]], Awaitable[None]], synchronous: bool = False) Callable[[T, List[Any]], Awaitable[None]]
Register a logic handler function to be triggered when this object is updated.
This method can be used as a decorator for custom logic handler functions. Alternatively, it can simply be called with a function object: some_subscribable.trigger(some_handler_function).
The target function must be an async coroutine that takes two arguments: The new value of this object and the origin/trace of the event (a list of objects that led to the handler being triggered). The handler function must make sure to prevent infinite recursive feedback loops: In contrast to subscribed objects, logic handler functions are also triggered if they led to the object being updated (i.e. they are already contained in the origin list). Thus, they should skip execution if called recursively. They should also append themselves to the origin list and pass the extended list to all Writable.write() calls they make.
To ensure all this for a custom handler function, use the handler() decorator:
@some_subscribable.trigger
@handler()
async def some_handler_function(value, origin):
    ...
    await some_writable.write(value + 1)
    ...
You may even use multiple trigger decorators:
@some_subscribable.trigger
@another_subscribable.trigger
@handler()
async def some_handler_function(value, origin):
    if origin[-1] is some_subscribable:
        ...
    else:
        ...
- Parameters:
target – The handler function/coroutine to be triggered on updates. Must comply with the requirements mentioned above.
synchronous – If True, the target coroutine is triggered synchronously, i.e. _publish_and_wait() will wait for it to return. This SHALL be used for internal triggering methods that re-publish the value updates of this Subscribable object. For logic handlers that might require substantial time to return, it SHALL not be used!
- Returns:
The target function (unchanged) – this allows decorator-style usage of this method
- _publish(value: T_co, origin: List[Any])
Method to publish a new value to all subscribers and trigger all registered logic handlers.
All logic handlers and Writable.write() methods are called in parallel asyncio tasks, which are not awaited to return. If _stateful_publishing is True on this object (or class), it will keep track of the currently pending publishing tasks. This information is then used to mitigate state inconsistencies, caused by value updates crossing over each other, by resetting the origin of following value updates to the respective subscribers (with pending tasks). This disables the usual loop prevention and causes the value update to travel back its originating path. Consequently, the update will override the intermediate conflicting value in all places.
Additionally, these updates with reset origin are delayed by a random time of up to MAX_CONCURRENT_UPDATE_DELAY seconds (and skipped if overtaken by another update in that time) to avoid endless update loops of two competing value updates. To avoid unnecessary value updates flying around, we limit the origin resetting to value updates that reached this object via a different path than the pending updates to the subscriber, and to value updates that already result from republishing with reset origin. Other updates should not be prone to conflicts due to the ordered asyncio task queue.
_stateful_publishing should be enabled for all objects that are Subscribable and Writable (i.e. allow sending and receiving value updates) and have either internal state (like Variables) or represent an external, stateful (or quasi-stateful) system (like a Connector object for a KNX group address/an MQTT topic). On the other hand, objects that are only Subscribable should not use _stateful_publishing.
If the object only forwards value updates without storing a state, it should await the successful publishing of the value by using the _publish_and_wait() coroutine.
- Parameters:
value – The new value to be published by this object. Must be an instance of this object’s type.
origin – The origin list of the new value, excluding this object. See The origin of a New Value for more details. `self` is appended automatically before calling the registered subscribers and logic handlers.
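The difference between publishing in background tasks and awaiting the publishing can be sketched with plain asyncio. This toy model is not SHC's implementation: `ToyPublisher` and `SlowSubscriber` are hypothetical, and the stateful mitigation logic (origin resetting, random delays) is deliberately left out.

```python
import asyncio
from typing import Any, List, Set


class ToyPublisher:
    """Hypothetical publisher with a fire-and-forget and an awaiting variant."""

    def __init__(self) -> None:
        self._subscribers: List[Any] = []
        self._pending: Set["asyncio.Task[None]"] = set()

    def subscribe(self, subscriber: Any) -> None:
        self._subscribers.append(subscriber)

    def _targets(self, origin: List[Any]) -> List[Any]:
        # Subscribers that are already part of the origin list are skipped
        return [s for s in self._subscribers
                if not any(s is entry for entry in origin)]

    def publish(self, value: Any, origin: List[Any]) -> None:
        """Schedule the write calls as tasks without waiting for them.

        Must be called while an event loop is running."""
        for subscriber in self._targets(origin):
            task = asyncio.create_task(subscriber.write(value, origin + [self]))
            self._pending.add(task)  # keep a reference until the task is done
            task.add_done_callback(self._pending.discard)

    async def publish_and_wait(self, value: Any, origin: List[Any]) -> None:
        """Run all write calls concurrently and wait until all have returned."""
        await asyncio.gather(*(subscriber.write(value, origin + [self])
                               for subscriber in self._targets(origin)))


class SlowSubscriber:
    def __init__(self) -> None:
        self.values: List[Any] = []

    async def write(self, value: Any, origin: List[Any]) -> None:
        await asyncio.sleep(0.01)  # simulate slow processing
        self.values.append(value)


async def scenario():
    publisher, slow = ToyPublisher(), SlowSubscriber()
    publisher.subscribe(slow)
    publisher.publish(1, [])  # returns immediately, nothing processed yet
    before = list(slow.values)
    await publisher.publish_and_wait(2, [])  # returns after value 2 was processed
    return before, slow.values


before, after = asyncio.run(scenario())
print(before, after)
```

Right after the fire-and-forget call the subscriber has not yet processed anything, whereas the awaiting variant only returns once its value has been handled.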
- class shc.base.Reading(*args, **kwargs)
- async _from_provider() Optional[T_con]
Private method to be used by inheriting classes to read the current value from this object’s default provider (via its Readable.read() method) and convert it to this object’s type, if necessary, using the registered converter function.
- Returns:
The default provider’s current value or None if no default provider is set or its read method raises an UninitializedError.
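The documented behaviour of returning None when no provider is set or the provider is uninitialized can be modelled in a few lines. The sketch below is a toy model with hypothetical `ToyReadable` and `ToyReading` classes and its own stand-in `UninitializedError`. It is meant to illustrate the contract, not SHC's actual code.

```python
import asyncio
from typing import Any, Callable, Optional


class UninitializedError(RuntimeError):
    """Toy stand-in for the error raised when a value is not (yet) available."""


class ToyReadable:
    def __init__(self, value: Any = None) -> None:
        self._value = value

    async def read(self) -> Any:
        if self._value is None:
            raise UninitializedError()
        return self._value


class ToyReading:
    def __init__(self) -> None:
        self._provider: Optional[ToyReadable] = None
        self._convert: Optional[Callable[[Any], Any]] = None

    def set_provider(self, provider: ToyReadable,
                     convert: Optional[Callable[[Any], Any]] = None) -> None:
        self._provider = provider
        self._convert = convert

    async def _from_provider(self) -> Optional[Any]:
        if self._provider is None:
            return None  # no default provider registered
        try:
            value = await self._provider.read()
        except UninitializedError:
            return None  # provider has no value yet
        # Apply the registered converter function, if any
        return self._convert(value) if self._convert is not None else value


async def demo():
    widget = ToyReading()
    without_provider = await widget._from_provider()
    widget.set_provider(ToyReadable())
    uninitialized = await widget._from_provider()
    widget.set_provider(ToyReadable(21), convert=float)
    converted = await widget._from_provider()
    return without_provider, uninitialized, converted


print(asyncio.run(demo()))
```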
- @shc.handler(reset_origin=False, allow_recursion=False) Callable[[Union[Callable[[T, List[Any]], Awaitable[None]], Callable[[T], Awaitable[None]], Callable[[], Awaitable[None]]]], Callable[[T, List[Any]], Awaitable[None]]]
Decorator for custom logic handler functions.
Wraps a custom logic handler function to make sure it is suited to be registered for triggering by a subscribable object with Subscribable.trigger(). It makes sure that
exceptions occurring during execution are logged,
the origin is extended with the logic handler itself and magically passed to all Writable.write() calls,
the origin can magically be passed when called directly by other logic handlers,
the execution is skipped when called recursively (i.e. the logic handler is already contained in the origin list).
It also allows defining the logic handler function with different numbers of parameters: If the function takes two parameters, the trigger value and the origin are passed. If the function takes one parameter, only the value is passed. If the function takes no parameters, it is called without arguments.
- Parameters:
reset_origin – If True, the origin which is magically passed to all write calls only contains the logic handler itself, not the previous origin list which led to the handler’s execution. This can be used to change the value of the object that triggered this logic handler. This may cause infinite recursive feedback loops, so use with care!
allow_recursion – If True, recursive execution of the handler is not skipped. The handler must check the passed values and/or the origin list itself to prevent infinite feedback loops via write calls or calls to other logic handlers – especially when used together with reset_origin.
- @shc.blocking_handler Callable[[Union[Callable[[T, List[Any]], None], Callable[[T], None], Callable[[], None]]], Callable[[T, List[Any]], Awaitable[None]]]
Decorator for custom blocking (non-async) logic handler functions.
Wraps a function to transform it into an async logic handler function, which is suited to be registered for triggering by a subscribable object with Subscribable.trigger(). The wrapped function is executed in a separate thread, using asyncio’s run_in_executor().
Like handler(), this decorator catches and logs errors and ensures that the origin can magically be passed when called directly by other logic handlers. However, since the wrapped function is not an asynchronous coroutine, it is not able to call Writable.write() or another logic handler directly. Thus, this decorator does not include special measures for preparing and passing the origin list or avoiding recursive execution. Still, it takes care of the correct number of arguments (zero to two) for calling the function.