ESPHome Server in Python

Last year I installed holiday lights on my house (previously). At the time I chose a controller that had the following key qualities: a) cheap, b) worked with WLED, c) cheap.

After using it for a season I discovered that the reason why this controller is so cheap is because it uses an ESP8266, which is fine, but it doesn't play well with my setup. For example, if I enable the Home Assistant integration the controller falls over after a few hours. It also reboots for unknowable reasons sometimes and I would come home to find the lights in their default orange color.

I probably could have fixed this with a more powerful controller. I even bought a neat ESP32 controller with a built-in wide range voltage regulator but never got around to setting it up.

Loosely, what I want is:

  1. Control the lights without random reboots
  2. Easy Home Assistant integration
  3. Easy customization
  4. No Wi-Fi
  5. Use hardware that I already have
  6. Tailscale, ideally
  7. Learn some stuff

I could have gotten the first two using a more powerful ESP32 module. Third could be done with ESPHome. Four and five are contradictory while staying within the constraint of an ESP32-based system.

Also last year, I built a little box that controls the power for my 3D printer with a Raspberry Pi Pico and a second Klipper instance (previously) so naturally I tried to get Klipper to fit in this non-3d-printer shaped hole. I tried so hard.

On the surface, Klipper appears to do everything that I want (control addressable LEDs, kind of customizable) but it makes no compromises in wanting to be a 3D printer controller. Most of the firmware is dedicated to running a motion controller, there's a lot of emphasis on scheduling things to happen in the near future, and there's a global printer object. Importantly for my purposes, there's no built-in way to set up a digital input without declaring it a button.

It's fine. Klipper is fine. It's just not built to be a generic IO platform.

So, what's a reasonable rational person to do?

Write an ESPHome Protocol Server

Of course.

There are essentially three ways to get arbitrary devices and entities to show up automatically in Home Assistant.

First, one can write a Home Assistant integration. This is fine and good but it doesn't work for me because my devices are far away from the VM that Home Assistant runs in.

Second, there's MQTT autodiscovery. I know this works because it's how my Zigbee devices integrate with HA, but I just could not make any of the existing generic autodiscovery libraries work consistently. Usually I would end up with a bunch of duplicate MQTT devices and then HA would get confused.

Third, there's ESPHome. ESPHome is a firmware for ESP modules (think: small devices with wifi like plugs, air quality monitors, etc). ESPHome belongs to the Open Home Foundation, same as Home Assistant, so it has commercial support and a first class HA integration. I already have a bunch of ESPHome devices running in my house, so it seems like a pretty natural fit.

The normal and ordinary way of using ESPHome is to write some YAML config that ESPHome compiles into a firmware for your device, then you flash the device and HA sets itself up to interact with the entities you described in YAML. What I want to do is just that last bit, the part where I can tell HA what entities I have and it sets up UI for me.

HA talks to ESPHome over what they call their "native API". The native API is a TCP-based streaming protocol where the ESPHome device is the server and Home Assistant is the client. They exchange protocol buffer encoded messages over either plain TCP or with a Noise-based encryption scheme.

Over the last week or so I built a Python implementation of that protocol named aioesphomeserver, bootstrapping off of the official aioesphomeapi client library that HA uses.

A Minimal Example

Here's a very simple example of what aioesphomeserver looks like:

import asyncio

from aioesphomeserver import (
    Device,
    SwitchEntity,
    BinarySensorEntity,
    EntityListener,
)

class SwitchListener(EntityListener):
    async def handle(self, key, message):
        sensor = self.device.get_entity("test_binary_sensor")
        if sensor != None:
            await sensor.set_state(message.state)

device = Device(
    name = "Test Device",
    mac_address = "AC:BC:32:89:0E:C9",
)

device.add_entity(
    BinarySensorEntity(
        name = "Test Binary Sensor",
    )
)

device.add_entity(
    SwitchEntity(
        name = "Test Switch",
    )
)

device.add_entity(
    SwitchListener(
        name="_listener",
        entity_id="test_switch"
    )
)

asyncio.run(device.run())

From the top, we import a bunch of stuff and then create a class that listens for messages from the device (the handle method). Then, we set up a device with a name and a fake MAC address. Device can generate a random one for you but it doesn't persist, so if you want this device to stick around in HA you should declare a static MAC.

We then add some entities to it: a binary sensor, a switch, and an instance of our switch listener configured for Test Switch.

Finally, we start the asyncio event loop.

With just that, you get the ESPHome web UI:

ESPHome Web UI

Adding the device to Home Assistant you'll see this:

Home Assistant view

AIO ESPHome Server Architecture

I tried to follow the spirit of ESPHome's architecture when writing the server.

Home Assistant device view for the test device

The Device is a central registrar for Entitys and serves as a message hub. The native API server and web server are entities that plug into the message bus, as are things like SwitchEntity and BinarySensorEntity. Everything is async using Python's asyncio.

Any entity with a run method will automatically be scheduled as a task at startup.

A Production Example

The development case for this library has been driving the addressable LEDs on my house. I found a project named u2if that turns a Raspberry Pi Pico into a USB peripheral that provides a bunch of fun stuff: GPIO, I2C, SPI, PWM, ADC as well as an addressable LED driver for WS2812-compatible lights. The fun wrinkle of the light driver is that it offloads the bitstream generation to the Pico's PIO coprocessors.

I forked u2if and added a few things:

  • RGBW support, which was already in the codebase but not available
  • Support for the Pico clone boards I have (SparkFun Pro Micro RP2040)
  • A set of effects for Neopixel along with a console-mode simulator to use while developing
  • A Docker image that bundles the firmware and the Python library

This deployment consists of:

  • A Dell Wyse 3040 thin client running Alpine Linux that already handles Z-Wave for the garage
  • SparkFun Pro Micro RP2040 running the u2if firmware connected over USB
  • Two channels of RS485 transceivers so I can get the very fast, very unforgiving light control signals 40 feet from where the 3040 is mounted to the wall to where the light power injector lives.
Project spread out on my desk, including a Dell Wyse 3040 thin client, the USB interface board in a 3d printed box, the RS485 receiver board hooked up to a spool of LEDs via a breadboard.

Here is the full script that I'm using to drive the addressable lights on my house:

import asyncio

from machine import WS2812B
from neopixel.effects import StaticEffect, BlendEffect, TwinkleEffect

from aioesphomeserver import (
    Device,
    LightEntity,
    LightStateResponse,
    EntityListener,
)

from aioesphomeapi import LightColorCapability

class LightStrip(EntityListener):
    def __init__(self, *args, strings=[], effects={}, **kwargs):
        super().__init__(*args, **kwargs)
        self.strings = strings
        self.num_pixels = sum([s[1] for s in strings])
        self.effects = effects

        self.current_effect_name = None
        self.current_effect = StaticEffect(count=self.num_pixels)
        self.white_brightness = 0.0
        self.color_brightness = 0.0

    async def handle(self, key, message):
        if type(message) != LightStateResponse:
            return

        await self.device.log(1, f"message.effect: '{message.effect}'")

        if message.effect != "" and message.effect != self.current_effect_name:
            if message.effect in self.effects:
                self.current_effect_name = message.effect
                self.current_effect = self.effects[message.effect](self.num_pixels, message)
                self.current_effect.next_frame()

        if self.current_effect:
            self.current_effect.update(message)

        self.color_brightness = message.color_brightness
        self.white_brightness = message.brightness

        if message.state == False:
            self.color_brightness = 0.0
            self.white_brightness = 0.0

    def render(self):
        pixels = []

        for i in range(self.num_pixels):
            color = self.current_effect.pixels[i]

            pixel = [
                int(color[0] * 255.0 * self.color_brightness),
                int(color[1] * 255.0 * self.color_brightness),
                int(color[2] * 255.0 * self.color_brightness),
                int(color[3] * 255.0 * self.white_brightness),
            ]

            pixels.append(pixel)

        # partition strings
        # write to each string
        cur = 0
        for string, length in self.strings:
            last = cur + length - 1
            string.write(pixels[cur:last])
            cur = last + 1

    async def run(self):
        while True:
            self.current_effect.next_frame()
            self.render()
            await asyncio.sleep(1/24.0)


device = Device(
    name = "Garage Stuff",
    mac_address = "7E:85:BA:7E:38:07",
    model = "Garage Stuff"
)

device.add_entity(LightEntity(
    name="Front Lights",
    color_modes=[LightColorCapability.ON_OFF | LightColorCapability.BRIGHTNESS | LightColorCapability.RGB | LightColorCapability.WHITE],
    effects=["Static", "Twinkle"],
))

def make_twinkle_effect(count, state):
    return BlendEffect(
        TwinkleEffect(count=count, include_white_channel=True),
        StaticEffect(count=count, color=[state.red, state.green, state.blue, state.white], include_white_channel=True),
        mode='lighten',
        include_white_channel=True,
    )

device.add_entity(LightStrip(
    name = "_front_lights_strip",
    entity_id = "front_lights",
    strings = [(WS2812B(23, rgbw = True, color_order="GRBW"), 20)],
    effects={
        "Static": lambda count, state: StaticEffect(count=count, color=[state.red, state.green, state.blue, state.white], include_white_channel=True),
        "Twinkle": make_twinkle_effect,
    },
))

asyncio.run(device.run())

The structure is basically the same as the minimal example. We import some stuff, we set up an EntityListener class, and then we set up a Device with a LightEntity and an instance of the listener .

In this case, the listener listens for state responses from a Light entity and renders pixels according to a set of effects. It also has a run method that renders the current effect out every 1/24th of a second.

Should you use this?

I don't know!

If your constraints match mine, maybe it'd be helpful. If you want to expose a thing to Home Assistant and would rather have it show up as an ESPHome device rather than, say, writing your own HA integration or messing with MQTT or writing RESTful API handlers, this would probably be useful.

That said, I think if your use case fits within ESPHome proper you should use that. ESPHome has built in drivers for so many things and is going to be better supported (i.e. people are paid to work on it).

Pretty neat though, eh?

Posted in: Home Automation  

Tagged: Home Assistant Programming