Data Logging

There are SHC interfaces for accepting value updates and storing them in a (typically persistent) database, together with the current timestamp. This allows retrieving historic data series for a certain value (e.g. a room temperature, measured by some thermometer connected to your SHC server). SHC provides a generic interface for retrieving such time series (“data logs”) from the underlying database, to be implemented by the respective database interface: the DataLogVariable class.

It also includes functionality to retrieve aggregated log data (e.g. the average value per time interval instead of raw data points) and to subscribe to live updates of the (raw or aggregated) time series. If the database does not support live updates natively, they are generated automatically from SHC-internal writes or by a regular timer.

Typically, DataLogVariables are used to add Log Widgets to a WebPage in the web user interface, for displaying live data plots (or listings) of the recent trend of a variable.
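
As a minimal sketch of this setup (the variable name and the use of SHC's generic connect() wiring are illustrative assumptions, not a prescribed API usage), logging a variable's values with the in-memory backend described below might look like this:

    import datetime
    import shc
    from shc.interfaces.in_memory_data_logging import InMemoryDataLogVariable

    # A float-typed state variable, e.g. fed by a thermometer interface
    room_temperature = shc.Variable(float, name="room_temperature")

    # Keep the last 6 hours of values in memory (no persistent storage)
    temperature_log = InMemoryDataLogVariable(float, keep=datetime.timedelta(hours=6))

    # Every value written to the variable is also written to the data log
    room_temperature.connect(temperature_log)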

Included interfaces with data logging support

shc.data_logging Module Reference

class shc.data_logging.DataLogVariable

Interface for objects that allow retrieving a live data series for a single variable

This interface is typically implemented by connector objects for a single time series in a time series database to allow consumers, e.g. chart widgets on the web UI, to access historic data. Such consumers typically inherit from LiveDataLogView, which provides functionality to specify a time interval, fetch recent values within that interval and push synchronized updates for new data.

This abstract class can be combined with shc.base.Readable and shc.base.Writable to form a connector implementation that allows writing new values to the time series database and reading the most recent value.

The DataLogVariable interface allows retrieving aggregated data series, i.e. calculating the minimum, maximum, average (of numeric values), or on-time (of boolean values) for each interval in an isochronous grid. This abstract class provides a pure-Python implementation of the aggregation in the retrieve_aggregated_log() method, based on the implementor’s retrieve_log() method. If the underlying database technology can calculate such aggregated data series natively, this method may be overridden to improve performance.

A DataLogVariable may be subscribable for log updates, allowing one or more LiveDataLogView instances to receive push updates after initially retrieving the current data log. To provide this functionality, implementing classes need to override subscribe_data_log(). In addition, this requires a way to retrieve the current data log state synchronized with the pushes. This must be provided by overriding the retrieve_log_sync() method.

Even if the data log backend does not provide a push/event/notification mechanism, we can generate it within SHC, as long as all new values in this specific data log are created through this DataLogVariable object (which should be Writable in this case). For this purpose, the combined WritableDataLogVariable base class should be used (in addition to possibly other base classes, like Readable).

In any case, derived classes need to implement retrieve_log().

Variables:

type – The data type of the values of the time series

async retrieve_aggregated_log(start_time: datetime, end_time: datetime, aggregation_method: AggregationMethod, aggregation_interval: timedelta) → List[Tuple[datetime, float]]

Retrieve an aggregated time series, calculated from the underlying raw time series

The returned time series will contain data points in a fixed interval, as specified by aggregation_interval. Each of these points results from aggregating the dynamic value of the raw time series within the following aggregation interval, up to (but not including) the start of the next aggregation interval. The raw time series value is considered to be constant until the next recorded data point. Thus, the aggregation algorithm considers the latest data point from the previous interval(s) as the effective time series value from the start of the interval up to the first data point within the interval.
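
For example, if the last recorded value before a 15-minute aggregation interval was 20.0 and a single value of 22.0 is recorded 10 minutes into that interval, the AVERAGE aggregation for this interval is (10 min · 20.0 + 5 min · 22.0) / 15 min ≈ 20.67.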

The aggregation function, combining the values within each single aggregation interval into a single value, is specified by the aggregation_method parameter. Note that AVERAGE, MINIMUM and MAXIMUM are only supported for numeric value types, whereas ON_TIME and ON_TIME_RATIO are only supported on boolean-typed DataLogVariables.

Parameters:
  • start_time – Begin timestamp of the first aggregation interval

  • end_time – End of the overall interval for which data is retrieved and included in the aggregation. If the overall interval is not divisible by aggregation_interval, the last aggregation interval is shortened to end at end_time.

  • aggregation_method – Enum value, indicating the aggregation function to use.

  • aggregation_interval – The duration of each single aggregation interval, i.e. the fixed temporal distance of the points of the returned aggregated time series
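
For illustration, a call retrieving 15-minute averages for a full day might look as follows (log_var is a placeholder for any numeric DataLogVariable; the timestamps should follow the same timezone convention as the underlying data log):

    from datetime import datetime, timedelta

    from shc.data_logging import AggregationMethod

    async def print_daily_averages(log_var):
        # 15-minute averages for the whole of 2024-01-01
        averages = await log_var.retrieve_aggregated_log(
            start_time=datetime(2024, 1, 1),
            end_time=datetime(2024, 1, 2),
            aggregation_method=AggregationMethod.AVERAGE,
            aggregation_interval=timedelta(minutes=15),
        )
        for timestamp, value in averages:
            print(timestamp.isoformat(), value)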

abstract async retrieve_log(start_time: datetime, end_time: datetime, include_previous: bool = True) → List[Tuple[datetime, T]]

Retrieve all log entries for this log variable in the specified time range from the data log backend/database

The method shall return a list of all log entries with a timestamp greater than or equal to start_time and less than end_time. If include_previous is True (which shall be the default value), the last entry before start_time shall also be included, if there is no entry exactly at start_time.

Parameters:
  • start_time – Begin of the time range (inclusive)

  • end_time – End of the time range (exclusive)

  • include_previous – If True (the default), the last value before start_time is also included in the result, provided there is no entry exactly at start_time
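
A minimal sketch of how an implementation over a plain, time-ordered Python list might honor these semantics (the ListBackedLogVariable class and its entries attribute are purely illustrative):

    from datetime import datetime
    from typing import List, Tuple

    from shc.data_logging import DataLogVariable

    class ListBackedLogVariable(DataLogVariable[float]):
        """Illustrative only: a DataLogVariable backed by a plain Python list."""
        type = float

        def __init__(self):
            super().__init__()
            self.entries: List[Tuple[datetime, float]] = []  # ascending by timestamp

        async def retrieve_log(self, start_time: datetime, end_time: datetime,
                               include_previous: bool = True) -> List[Tuple[datetime, float]]:
            # all entries with start_time <= timestamp < end_time
            result = [(ts, v) for ts, v in self.entries if start_time <= ts < end_time]
            if include_previous and (not result or result[0][0] != start_time):
                # additionally include the last entry before start_time, if any
                previous = [(ts, v) for ts, v in self.entries if ts < start_time]
                if previous:
                    result.insert(0, previous[-1])
            return result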

async retrieve_log_sync(start_time: datetime, end_time: datetime, include_previous: bool = True) → List[Tuple[datetime, T]]

Retrieve the current log, synchronized with push updates

If this DataLogVariable does not support push updates, i.e. is not subscribable (subscribe_data_log() raises DataLogNotSubscribable), this is typically equivalent to retrieve_log().

subscribe_data_log(subscriber: LiveDataLogView) → None

Add a LiveDataLogView as a subscriber to this log variable.

Subclasses which override this method must ensure that:

  • all data points added to the data log are pushed to all subscribed LiveDataLogViews via LiveDataLogView._new_log_values_written()

  • these pushes are synchronized with calls to retrieve_log_sync(), such that appending all pushed values to the returned result of retrieve_log_sync() yields a consistent replica of the log without missing or duplicated values. This means: all values pushed before a call to retrieve_log_sync() returns must be included in its result; all values pushed after a call to retrieve_log_sync() returns must not be included in its result.

Raises:

DataLogNotSubscribable – If this data log variable cannot provide push updates and thus is not subscribable

class shc.data_logging.AggregationMethod(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
AVERAGE = 0

The weighted average of a numeric data series within each aggregation interval

MAXIMUM = 2

The maximum numeric value in each aggregation interval

MINIMUM = 1

The minimum numeric value in each aggregation interval

ON_TIME = 3

The cumulative time the boolean value has been True in each aggregation interval, in seconds

ON_TIME_RATIO = 4

The cumulative time the boolean value has been True, as a fraction of the aggregation interval
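
For example, if a boolean value has been True for a cumulative 90 seconds within a 5-minute aggregation interval, ON_TIME yields 90.0 for that interval, while ON_TIME_RATIO yields 0.3.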

class shc.data_logging.WritableDataLogVariable(**kwargs)

Combined base class for DataLogVariables that are Writable.

In addition to the interface of the two base classes, this class allows subscription for push updates of the data log via the subscribe_data_log() method by providing synchronization of writes to the data log: All calls to write() are serialized via a mutex and will trigger a callback method on all subscribed LiveDataLogView objects. The retrieve_log_sync() method locks the same mutex to prevent concurrent writes to the data log, so that all writes are either completed before the retrieval (and thus included in the retrieved log) or queued to be written after the log retrieval returned (and thus included in later calls to the callback method).

Derived classes shall implement _write_to_data_log() (instead of the usual Writable._write() method) and retrieve_log(), as usual for DataLogVariables. In addition, derived classes shall allow setting the external_updates attribute for each instance (via an __init__ parameter) to disable push updates for data logs which are updated externally. An illustrative sketch of such a derived class is shown at the end of this class reference.

Variables:

external_updates – Specifies whether SHC-external updates to the data log are expected. If True, this effectively makes the object unsubscribable (because we cannot guarantee to reproduce a consistent log through push updates), so attached LiveDataLogViews will fall back to periodic polling of the log.

async retrieve_log_sync(start_time: datetime, end_time: datetime, include_previous: bool = True) → List[Tuple[datetime, T]]

Retrieve the current log, synchronized with push updates

If this DataLogVariable does not support push updates, i.e. is not subscribable (subscribe_data_log() raises DataLogNotSubscribable), this is typically equivalent to retrieve_log().

subscribe_data_log(subscriber: LiveDataLogView) → None

Add a LiveDataLogView as a subscriber to this log variable.

Subclasses which override this method must ensure that:

  • all data points added to the data log are pushed to all subscribed LiveDataLogViews via LiveDataLogView._new_log_values_written()

  • these pushes are synchronized with calls to retrieve_log_sync(), such that appending all pushed values to the returned result of retrieve_log_sync() yields a consistent replica of the log without missing or duplicated values. This means: all values pushed before a call to retrieve_log_sync() returns must be included in its result; all values pushed after a call to retrieve_log_sync() returns must not be included in its result.

Raises:

DataLogNotSubscribable – If this data log variable cannot provide push updates and thus is not subscribable
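
As a purely illustrative sketch of a persistent backend based on this class (the db client with its insert_many()/select_range() methods is a placeholder, and the exact _write_to_data_log() signature is an assumption that should be checked against the base class):

    from datetime import datetime
    from typing import List, Tuple

    from shc.data_logging import WritableDataLogVariable

    class MyTableLogVariable(WritableDataLogVariable[float]):
        """Illustrative only: not a real SHC interface."""
        type = float

        def __init__(self, db, table: str, external_updates: bool = False):
            super().__init__()
            self.db = db
            self.table = table
            # Set external_updates to True if other processes also insert rows into
            # this table; this disables push updates (see above).
            self.external_updates = external_updates

        async def _write_to_data_log(self, values: List[Tuple[datetime, float]]) -> None:
            # assumed signature: a batch of (timestamp, value) tuples to be appended
            await self.db.insert_many(self.table, values)

        async def retrieve_log(self, start_time: datetime, end_time: datetime,
                               include_previous: bool = True) -> List[Tuple[datetime, float]]:
            return await self.db.select_range(self.table, start_time, end_time,
                                              include_previous=include_previous)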

class shc.data_logging.LiveDataLogView(data_log: DataLogVariable[T], interval: timedelta, aggregation: Optional[AggregationMethod] = None, aggregation_interval: Optional[timedelta] = None, align_to: datetime = datetime.datetime(2020, 1, 1, 0, 0), update_interval: Optional[timedelta] = None)

Base class for consumers of a DataLogVariable, providing historic data within a fixed interval and live updates for this data.

This class provides functionality to fetch historic time series data, starting a fixed interval in the past, from a DataLogVariable and to provide push updates for new or updated values, so that the time series data can be updated in a consistent manner afterwards. Updates are either triggered via subscription of the DataLogVariable (resp. WritableDataLogVariable) or in a regular interval.

This makes it possible to initialize new copies of the recent time series data at any time and to update all these copies consistently and efficiently, e.g. to provide data for a plot of the time series to any number of web browser clients, including newly connected clients. This is achieved via the following procedure:

  1. Create an instance of a LiveDataLogView and subscribe it to the log variable. If supported (see below), it will subscribe to the variable for push updates; otherwise, it will create a timer for regular updates.

  2. When a new data log copy is to be initialized (e.g. a new client connects), call and await get_current_view(), and initialize the log copy/client with the returned data

  3. Implement _process_new_logvalues() to update all log copies/clients with the provided data points. With aggregation enabled, this will regularly include replacing the latest data point; otherwise, new data points are always appended (see the illustrative sketch at the end of this class reference).

Actual push updates (i.e. _process_new_logvalues() is called directly after a write to the data log) are only supported under the following conditions:

  • aggregation is None, and

  • the DataLogVariable is subscribable, i.e. its subscribe_data_log() method does not raise DataLogNotSubscribable. This is especially true for DataLogVariables inheriting from WritableDataLogVariable, with “external_updates” set to False.

In any other case, a periodic timer of update_interval is created to fetch new/updated values regularly and pass them to _process_new_logvalues(). However, if the DataLogVariable is subscribable, it will be used as an additional trigger for updating the values.

Parameters:
  • data_log – The DataLogVariable to retrieve the time series and updates from

  • interval – The length of the timespan to retrieve when calling get_current_view()

  • aggregation – The aggregation function to use or None to disable aggregation and retrieve raw log datapoints

  • aggregation_interval – If aggregation is enabled: The duration of each single aggregation interval

  • align_to – If aggregation is enabled: align the grid of aggregation intervals to the given timestamp, i.e. the timestamps of the aggregated values are calculated in such a way that one of them would equal the given timestamp (if the interval stretched far enough into the past or future to include it)

  • update_interval – Unless push updates are possible (see above), the period of the periodic log retrieval and update. If not specified, it is set to 1/20th of the interval, but not more than 1min.

async get_current_view(include_previous: bool = False) → Sequence[Tuple[datetime, Union[T, float]]]

Retrieve the recent log values from the DataLogVariable, as specified by this object’s constructor arguments

Retrieves and returns the data log entries from up to interval time ago, possibly aggregated as defined by aggregation, in such a way that future invocations of _process_new_logvalues() provide consistent updates to these entries.

push

Whether this view uses push updates of the values via subscription to a WritableDataLogVariable (True) or interval-based updates via log retrieval, potentially also triggered via subscription (False)
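
For illustration, a minimal consumer that simply prints new log values to the console might be sketched as follows (the exact signature of _process_new_logvalues(), here assumed to be an async method receiving a sequence of (timestamp, value) tuples, should be checked against the base class; temperature_log stands for any DataLogVariable):

    from datetime import timedelta

    from shc.data_logging import AggregationMethod, LiveDataLogView

    class ConsoleLogView(LiveDataLogView):
        async def _process_new_logvalues(self, values):
            # With aggregation enabled, the latest previously reported point may be
            # replaced; here we simply print whatever values we are handed.
            for timestamp, value in values:
                print(timestamp.isoformat(), value)

    view = ConsoleLogView(temperature_log,
                          interval=timedelta(hours=2),
                          aggregation=AggregationMethod.AVERAGE,
                          aggregation_interval=timedelta(minutes=5))

    # A newly connecting client would first be initialized with the current data:
    #     initial_data = await view.get_current_view()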

class shc.data_logging.LoggingWebUIView(data_log: DataLogVariable[T], interval: timedelta, aggregation: Optional[AggregationMethod] = None, aggregation_interval: Optional[timedelta] = None, align_to: datetime = datetime.datetime(2020, 1, 1, 0, 0), update_interval: Optional[timedelta] = None, converter: Optional[Callable[[T], Any]] = None, include_previous: bool = True)

Bases: LiveDataLogView[T], WebUIConnector, Generic[T]

A WebUIConnector which subclasses LiveDataLogView to allow retrieving and live-updating a DataLogVariable for a specified time interval via the web UI websocket.

shc.interfaces.in_memory_data_logging Module Reference

class shc.interfaces.in_memory_data_logging.InMemoryDataLogVariable(type_: Type[T], keep: timedelta)

Bases: Writable[T], DataLogVariable[T], Readable[T], Generic[T]

A single in-memory DataLogVariable, based on a simple Python list, without any persistent storage.

Data log entries that are older than the specified keep time are dropped automatically to keep memory usage limited. This class is sufficient for logging a variable value for the purpose of displaying a chart over the last few minutes (or maybe hours) and calculating aggregated values, if you don’t mind losing the historic data on every restart of the SHC application. It’s also fine for demonstration purposes (see ui_logging_showcase.py in the examples/ directory).

Parameters:
  • type_ – Value type of this connectable object (and as a DataLogVariable)

  • keep – Timespan for which to keep the values. Older values will be deleted upon the next write to the object.