Reverse Engineering the Dyson Air Purifier

In this post, we'll work out how to extract sensor data from a Dyson air purifier and send it to my monitoring stack for use on my home dashboards.

Let's get snooping

If you've read some of my other posts, you'll know that MQTT is a commonly used protocol for IoT communications. This device has WiFi connectivity and a mobile app for controlling its functions, so I expect we'll see MQTT in action here too.

To confirm, we'll need to inspect the network traffic from the smartphone running the Dyson app. Let's start a packet capture on the iPhone.

On opening the Dyson app, we see an MQTT CONNECT packet. This is a good sign: the packet carries the username and password required to subscribe and publish to MQTT topics. Let's make a secure copy of these credentials for use in later steps.
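
As a rough sketch of what a "secure copy" could look like, the snippet below writes the captured credentials to an INI file with owner-only permissions. The [MQTT] section and the USERNAME and PASSWORD keys match what the scripts later in this post read; the save_credentials helper itself is a hypothetical name of mine, not part of any Dyson tooling.

```python
import configparser
import os


def save_credentials(path, username, password):
    """Write the captured MQTT credentials to an INI file (hypothetical helper)."""
    config = configparser.ConfigParser()
    # configparser treats option names case-insensitively, so
    # config['MQTT']['USERNAME'] still works when the file is read back.
    config['MQTT'] = {'USERNAME': username, 'PASSWORD': password}
    # A newly created file gets mode 0600 so other local users cannot read it.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as handle:
        config.write(handle)
```

Calling save_credentials('config.ini', '<username>', '<password>') with the values from the capture produces a file the later scripts can load with config.read('config.ini').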

After turning the device on and off and adjusting the fan speed a few times, the structure of the MQTT traffic becomes clear. The topic for issuing commands appears to be '438/YN2-AU-KJA1987A/command', where 'YN2-AU-KJA1987A' is the serial number. The serial number also appears to double as the MQTT username, which is why the scripts below build their topics from it. Looking at some other packets, we can also determine that real-time data is published to the topic '438/YN2-AU-KJA1987A/status/current' every few seconds while the app is open.
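
To make the topic layout concrete, here's a small sketch that builds both topics from a serial number. The dyson_topics helper is a hypothetical name, and treating the leading '438' as a per-model prefix is an assumption on my part; the capture only shows that this particular device uses it.

```python
def dyson_topics(serial, prefix='438'):
    """Build the command and status topics seen in the capture (hypothetical helper).

    The '438' prefix is assumed to be specific to this model.
    """
    base = f'{prefix}/{serial}'
    return {
        'command': f'{base}/command',        # where the app publishes requests
        'status': f'{base}/status/current',  # where the device publishes readings
    }


topics = dyson_topics('YN2-AU-KJA1987A')
print(topics['command'])
print(topics['status'])
```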

Proof of concept

Now that we have the credentials, the topics and the syntax for a few message payloads, let's create a script to request the current metrics from the device.

import paho.mqtt.client as mqtt
from datetime import datetime
import time
import configparser

config = configparser.ConfigParser()
config.read('config.ini')

USERNAME = config['MQTT']['USERNAME']
PASSWORD = config['MQTT']['PASSWORD']

HOST = '<device-IP>'
PORT = 1883
TOPIC = '438/' + USERNAME + '/command'


PAYLOAD_TEMPLATE = '{"mode-reason": "LAPP","time": "%s","msg": "REQUEST-PRODUCT-ENVIRONMENT-CURRENT-SENSOR-DATA"}'

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT broker")
    else:
        print("Connection failed. Return code =", rc)

def on_publish(client, userdata, mid):
    print("Message Published")

if __name__ == '__main__':
    client = mqtt.Client(protocol=mqtt.MQTTv311)
    client.on_connect = on_connect
    client.on_publish = on_publish

    client.username_pw_set(USERNAME, PASSWORD)
    client.connect(HOST, port=PORT, keepalive=60)

    client.loop_start()  

    while not client.is_connected():
        time.sleep(1)


    current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    PAYLOAD = PAYLOAD_TEMPLATE % current_time

    print("Publishing message to topic:", TOPIC)
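    result = client.publish(TOPIC, PAYLOAD)

    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        # Block until the network loop has actually sent the message,
        # otherwise we may disconnect before it leaves the queue
        result.wait_for_publish()
        print("Message sent successfully")
    else:
        print("Failed to send message. Return code =", result.rc)

    client.loop_stop()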
    client.disconnect()

By creating another small script that subscribes to the topic '438/YN2-AU-KJA1987A/status/current', we can see the request has been successful and we're presented with a JSON object containing the current air quality (AQ) data.
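
For reference, a minimal sketch of such a subscriber might look like the following. It reuses the same config.ini and the paho-mqtt 1.x style client construction as the script above; the format_status and run_subscriber names are my own, and the pretty-printing is just a convenience for reading the output.

```python
import configparser
import json


def format_status(raw_payload):
    """Decode an MQTT payload and pretty-print it if it is valid JSON."""
    text = raw_payload.decode('utf-8', errors='replace')
    try:
        return json.dumps(json.loads(text), indent=2, sort_keys=True)
    except json.JSONDecodeError:
        return text  # not JSON: show the raw text instead


def run_subscriber(host, port=1883, config_path='config.ini'):
    """Connect to the device and print every status message until interrupted."""
    # Imported here so format_status() can be used without paho-mqtt installed.
    import paho.mqtt.client as mqtt

    config = configparser.ConfigParser()
    config.read(config_path)
    username = config['MQTT']['USERNAME']
    password = config['MQTT']['PASSWORD']
    topic = '438/' + username + '/status/current'

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            client.subscribe(topic)
        else:
            print('Connection failed. Return code =', rc)

    def on_message(client, userdata, msg):
        print(msg.topic)
        print(format_status(msg.payload))

    client = mqtt.Client(protocol=mqtt.MQTTv311)
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(username, password)
    client.connect(host, port=port, keepalive=60)
    client.loop_forever()  # blocks until interrupted with Ctrl+C
```

Running run_subscriber('<device-IP>') in one terminal and the request script in another should print each status message as it arrives.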

Making the data useful

To use this data, we need to get it into Prometheus. Let's break down what needs to happen:

  1. connect to the MQTT broker
  2. subscribe to the '438/YN2-AU-KJA1987A/status/current' topic
  3. send a request to obtain the current sensor metrics
  4. parse the MQTT JSON response and transmit the values to my Prometheus push gateway
  5. rinse and repeat every 30 seconds

Here's what that looks like in code...

#!/usr/bin/env python3

import paho.mqtt.client as mqtt
import json
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from datetime import datetime
import time
import configparser
import ssl
import socket
import schedule
import syslog

config = configparser.ConfigParser()
config.read('config.ini')

USERNAME = config['MQTT']['USERNAME']
PASSWORD = config['MQTT']['PASSWORD']
MQTT_HOST = '192.168.201.52'
MQTT_PORT = 1883
PUSH_GATEWAY_ADDRESS = 'https://monitoring.brentonbaker.com:9091'
PUSH_GATEWAY_JOB = 'dyson_aq_living_room'

TOPIC_REQUEST = '438/' + USERNAME + '/command'
TOPIC_RESPONSE = '438/' + USERNAME + '/status/current'

PAYLOAD_TEMPLATE = '{"mode-reason": "LAPP","time": "%s","msg": "REQUEST-PRODUCT-ENVIRONMENT-CURRENT-SENSOR-DATA"}'


registry = CollectorRegistry()
temperature_metric = Gauge('dyson_livingroom_environment_temperature', 'Environment Temperature (Celsius)',
                           registry=registry)
humidity_metric = Gauge('dyson_livingroom_environment_humidity', 'Environment Humidity (%)', registry=registry)
pm25_metric = Gauge('dyson_livingroom_environment_pm25', 'Particulate Matter PM2.5', registry=registry)
pm10_metric = Gauge('dyson_livingroom_environment_pm10', 'Particulate Matter PM10', registry=registry)

client = mqtt.Client(protocol=mqtt.MQTTv311)


syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_LOCAL0)

def log_message(message, level=syslog.LOG_INFO):
    syslog.syslog(level, message)

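def on_connect(client, userdata, flags, rc):
    if rc == 0:
        log_message("Connected to MQTT broker")
        # Subscribe here so the subscription is restored after a reconnect
        client.subscribe(TOPIC_RESPONSE)
    else:
        log_message(f"Connection failed. Return code = {rc}", level=syslog.LOG_ERR)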

def on_message(client, userdata, msg):
    try:
        payload = msg.payload.decode('utf-8')
        process_mqtt_response(payload)
    except Exception as e:
        log_message(f"Error processing MQTT message: {e}", level=syslog.LOG_ERR)

def process_mqtt_response(payload):
    try:
        response_data = json.loads(payload)
        if 'data' in response_data and 'tact' in response_data['data'] and 'hact' in response_data['data'] \
                and 'pm25' in response_data['data'] and 'pm10' in response_data['data']:
            temperature = int(response_data['data']['tact']) / 10.0 - 273.15  # tact is in tenths of a kelvin; convert to Celsius
            humidity = int(response_data['data']['hact'])
            pm25 = int(response_data['data']['pm25'])
            pm10 = int(response_data['data']['pm10'])


            temperature_metric.set(temperature)
            humidity_metric.set(humidity)
            pm25_metric.set(pm25)
            pm10_metric.set(pm10)

            log_message(f"Metrics: Temperature={temperature}, Humidity={humidity}, PM2.5={pm25}, PM10={pm10}")

            push_to_gateway(PUSH_GATEWAY_ADDRESS, job=PUSH_GATEWAY_JOB, registry=registry)
        else:
            log_message("Received unexpected MQTT message format. Ignoring.", level=syslog.LOG_WARNING)
    except Exception as e:
        log_message(f"Error processing MQTT response: {e}", level=syslog.LOG_ERR)

def job():
    current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    PAYLOAD = PAYLOAD_TEMPLATE % current_time

    log_message(f"Publishing message to topic: {TOPIC_REQUEST}")
    result = client.publish(TOPIC_REQUEST, PAYLOAD)

    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        log_message("Message sent successfully")
    else:
        log_message(f"Failed to send message. Return code = {result.rc}", level=syslog.LOG_ERR)

if __name__ == '__main__':
    client.on_connect = on_connect
    client.on_message = on_message

    client.username_pw_set(USERNAME, PASSWORD)

    socket.setdefaulttimeout(30)

    try:
        client.connect(MQTT_HOST, port=MQTT_PORT, keepalive=60)
    except socket.timeout:
        log_message("Connection timed out.", level=syslog.LOG_ERR)
        exit(1)

    client.loop_start()

    while not client.is_connected():
        time.sleep(1)

    # Request fresh sensor data every 30 seconds
    schedule.every(30).seconds.do(job)

    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        log_message(f"Unexpected error: {e}", level=syslog.LOG_ERR)
        exit(1)

    client.loop_stop()
    client.disconnect()
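
To check the parsing logic without a device on hand, the conversion can be pulled out into a pure function and fed a sample message. This is only a sketch: parse_environment is a hypothetical helper, and the values in the sample payload are invented for illustration (only the field names tact, hact, pm25 and pm10 come from the script above).

```python
import json

REQUIRED_FIELDS = ('tact', 'hact', 'pm25', 'pm10')


def parse_environment(payload):
    """Return a dict of readings, or None if the message lacks the expected fields."""
    message = json.loads(payload)
    data = message.get('data') if isinstance(message, dict) else None
    if not isinstance(data, dict) or not all(f in data for f in REQUIRED_FIELDS):
        return None
    return {
        # tact is reported in tenths of a kelvin
        'temperature_c': int(data['tact']) / 10.0 - 273.15,
        'humidity_pct': int(data['hact']),
        'pm25': int(data['pm25']),
        'pm10': int(data['pm10']),
    }


# Invented sample values, for illustration only
sample = '{"data": {"tact": "2951", "hact": "0045", "pm25": "0003", "pm10": "0005"}}'
readings = parse_environment(sample)
print(round(readings['temperature_c'], 2), readings['humidity_pct'])
```

Keeping the parsing separate from the Prometheus push also makes it easy to see which messages on the status topic get ignored: anything without all four fields returns None.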

Displaying the data

With the new metrics added to our existing AQ dashboard, we're now tracking the temperature and humidity reported by the Dyson.