In this blog, we'll walk through how to extract sensor data from the Dyson air purifier and send it to my monitoring stack to visualise real-time temperature and air quality.
Getting started
If you've read some of my other blogs, you'll know that MQTT is a commonly used protocol for IoT communications. This device has Wi-Fi connectivity and a mobile app for controlling its functions, so I expect we'll see MQTT here too.
To confirm, we'll need to inspect the network traffic from a smartphone running the Dyson app. Let's start a packet capture on the iPhone.
When opening the Dyson app, we see an MQTT packet with a 'connect command'. That's a good sign, and it's a packet containing the username and password required to subscribe and publish to MQTT topics. Let's make a secure copy of these credentials for use in later steps.
After turning the device on/off and adjusting the fan speed, we start to see the MQTT packets and structure. The topic for issuing commands appears to be '438/YN2-AU-KJA1987A/command', where 'YN2-AU-KJA1987A' is the serial number. Looking at some other packets, we can also determine that real-time data is published to the topic '438/YN2-AU-KJA1987A/status/current' every few seconds when the app is open. The scripts below build both topics from the MQTT username, so the username captured earlier is expected to be this same serial number.
Proof of concept
Now that we have the credentials, the topic and the syntax for a few message payloads, let's create a script to request the current metrics from the device.
import paho.mqtt.client as mqtt
from datetime import datetime
import time
import configparser
# Credentials are read from the [MQTT] section of config.ini so they stay out
# of the script itself.
config = configparser.ConfigParser()
config.read('config.ini')
_mqtt_settings = config['MQTT']
USERNAME = _mqtt_settings['USERNAME']
PASSWORD = _mqtt_settings['PASSWORD']

# Address of the device's MQTT broker; replace the placeholder before running.
HOST = '<device-IP>'
PORT = 1883

# Command topic seen in the packet capture; the username fills the serial slot.
TOPIC = f'438/{USERNAME}/command'

# The %s placeholder is filled with the request timestamp at publish time.
PAYLOAD_TEMPLATE = '{"mode-reason": "LAPP","time": "%s","msg": "REQUEST-PRODUCT-ENVIRONMENT-CURRENT-SENSOR-DATA"}'
def on_connect(client, userdata, flags, rc):
    """Print the outcome of a connection attempt.

    Assigned to ``client.on_connect``. A return code of 0 is reported as a
    successful connection; any other value is printed as a failure together
    with the code.
    """
    if rc != 0:
        print("Connection failed. Return code =", rc)
        return
    print("Connected to MQTT broker")
def on_publish(client, userdata, mid):
    """Print a confirmation line; assigned to ``client.on_publish``.

    None of the callback arguments are used.
    """
    print("Message Published")
if __name__ == '__main__':
    # One-shot proof of concept: connect, publish a single sensor-data request,
    # then shut the client down again.

    # Upper bound on how long to wait for the session to come up. Without it a
    # rejected login (or unreachable device) would leave the script polling
    # forever.
    CONNECT_TIMEOUT_SECONDS = 30

    client = mqtt.Client(protocol=mqtt.MQTTv311)
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.username_pw_set(USERNAME, PASSWORD)
    client.connect(HOST, port=PORT, keepalive=60)
    client.loop_start()
    try:
        deadline = time.monotonic() + CONNECT_TIMEOUT_SECONDS
        while not client.is_connected():
            if time.monotonic() >= deadline:
                raise SystemExit("Timed out waiting for the MQTT connection")
            time.sleep(1)

        # Same UTC timestamp format as before, built without the deprecated
        # datetime.utcnow().
        current_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        PAYLOAD = PAYLOAD_TEMPLATE % current_time
        print("Publishing message to topic:", TOPIC)
        result = client.publish(TOPIC, PAYLOAD)
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            print("Message sent successfully")
        else:
            print("Failed to send message. Return code =", result.rc)
    finally:
        # Always stop the network thread and close the session, even when the
        # wait times out or publishing raises.
        client.loop_stop()
        client.disconnect()
By creating another small script that subscribes to the topic '438/YN2-AU-KJA1987A/status/current', we can see the request has been successful and we're presented with a JSON object containing the current air-quality (AQ) data.
Making the data useful
To use this data, we need to get it into Prometheus. Let's break down what needs to happen:
- connect to the MQTT broker
- subscribe to the '438/YN2-AU-KJA1987A/status/current' topic
- send a request to obtain the current sensor metrics
- parse the MQTT JSON response and transmit the values to my Prometheus push gateway
- rinse and repeat every 30 seconds
Here's what that looks like in code...
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import json
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from datetime import datetime
import time
import configparser
import ssl
import socket
import schedule
import syslog
# Credentials are read from the [MQTT] section of config.ini so they stay out
# of the script itself.
config = configparser.ConfigParser()
config.read('config.ini')
_mqtt_settings = config['MQTT']
USERNAME = _mqtt_settings['USERNAME']
PASSWORD = _mqtt_settings['PASSWORD']

# Where to fetch readings from (the device) and where to send them.
MQTT_HOST = '192.168.201.52'
MQTT_PORT = 1883
PUSH_GATEWAY_ADDRESS = 'https://monitoring.brentonbaker.com:9091'
PUSH_GATEWAY_JOB = 'dyson_aq_living_room'

# Requests are published to the command topic; readings are received on the
# status topic. The username fills the serial-number slot in both.
TOPIC_REQUEST = f'438/{USERNAME}/command'
TOPIC_RESPONSE = f'438/{USERNAME}/status/current'

# The %s placeholder is filled with the request timestamp at publish time.
PAYLOAD_TEMPLATE = '{"mode-reason": "LAPP","time": "%s","msg": "REQUEST-PRODUCT-ENVIRONMENT-CURRENT-SENSOR-DATA"}'

# One dedicated registry so only these four gauges are pushed to the gateway.
registry = CollectorRegistry()
temperature_metric = Gauge(
    'dyson_livingroom_environment_temperature',
    'Environment Temperature (Celsius)',
    registry=registry,
)
humidity_metric = Gauge(
    'dyson_livingroom_environment_humidity',
    'Environment Humidity (%)',
    registry=registry,
)
pm25_metric = Gauge(
    'dyson_livingroom_environment_pm25',
    'Particulate Matter PM2.5',
    registry=registry,
)
pm10_metric = Gauge(
    'dyson_livingroom_environment_pm10',
    'Particulate Matter PM10',
    registry=registry,
)

# Module-level client: the callbacks, job() and the entry point all share it.
client = mqtt.Client(protocol=mqtt.MQTTv311)

# Tag each syslog entry with the process id and send it to the local0 facility.
syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_LOCAL0)
def log_message(message, level=syslog.LOG_INFO):
    """Send ``message`` to syslog at priority ``level``.

    Args:
        message: Text to record.
        level: A ``syslog`` priority constant; defaults to ``syslog.LOG_INFO``.
    """
    syslog.syslog(level, message)
def on_connect(client, userdata, flags, rc):
    """Handle the result of a connection attempt; assigned to ``client.on_connect``.

    A return code of 0 is logged as a successful connection, and the client
    then subscribes to ``TOPIC_RESPONSE`` so sensor readings are delivered to
    ``on_message``. Any other return code is logged as an error (previously it
    was dropped silently, which made failed logins invisible) and no
    subscription is made.

    Args:
        client: The MQTT client the callback fired for.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    if rc != 0:
        log_message(f"Connection failed. Return code = {rc}", level=syslog.LOG_ERR)
        return
    log_message("Connected to MQTT broker")
    client.subscribe(TOPIC_RESPONSE)
def on_message(client, userdata, msg):
    """Decode an incoming message as UTF-8 and pass it to the response parser.

    Assigned to ``client.on_message``. Any exception raised while decoding or
    processing is logged at error level instead of propagating.
    """
    try:
        process_mqtt_response(msg.payload.decode('utf-8'))
    except Exception as err:
        log_message(f"Error processing MQTT message: {err}", level=syslog.LOG_ERR)
def process_mqtt_response(payload):
    """Parse a status message, update the gauges and push them to the gateway.

    Args:
        payload: JSON text of a message received on the status topic.

    A message is used only when its ``data`` member contains all of ``tact``,
    ``hact``, ``pm25`` and ``pm10``; anything else is logged as a warning and
    ignored. Every other failure (invalid JSON, a non-numeric reading, a push
    error) is logged at error level. Nothing is raised to the caller.
    """
    required_fields = ('tact', 'hact', 'pm25', 'pm10')
    try:
        response_data = json.loads(payload)
        if 'data' not in response_data or any(
            field not in response_data['data'] for field in required_fields
        ):
            log_message("Received unexpected MQTT message format. Ignoring.", level=syslog.LOG_WARNING)
            return

        readings = response_data['data']
        # The conversion treats tact as tenths of a kelvin. 0 degrees Celsius is
        # 273.15 K, not 273 K, so the previous offset read 0.15 degrees high.
        # Rounding removes floating-point noise such as 21.850000000000023.
        temperature = round(int(readings['tact']) / 10.0 - 273.15, 2)
        humidity = int(readings['hact'])
        pm25 = int(readings['pm25'])
        pm10 = int(readings['pm10'])

        temperature_metric.set(temperature)
        humidity_metric.set(humidity)
        pm25_metric.set(pm25)
        pm10_metric.set(pm10)
        log_message(f"Metrics: Temperature={temperature}, Humidity={humidity}, PM2.5={pm25}, PM10={pm10}")
        push_to_gateway(PUSH_GATEWAY_ADDRESS, job=PUSH_GATEWAY_JOB, registry=registry)
    except Exception as e:
        log_message(f"Error processing MQTT response: {e}", level=syslog.LOG_ERR)
def job():
    """Publish one request for the device's current sensor data.

    Run periodically by the scheduler. The request carries the current UTC
    time; the outcome of the publish call is written to syslog. Readings are
    not returned here: they are received on ``TOPIC_RESPONSE`` and handled by
    ``on_message``.
    """
    # Same UTC timestamp format as before, built without the deprecated
    # datetime.utcnow().
    current_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    payload = PAYLOAD_TEMPLATE % current_time
    log_message(f"Publishing message to topic: {TOPIC_REQUEST}")
    result = client.publish(TOPIC_REQUEST, payload)
    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        log_message("Message sent successfully")
    else:
        log_message(f"Failed to send message. Return code = {result.rc}", level=syslog.LOG_ERR)
if __name__ == '__main__':
    # Entry point: connect to the device, then request fresh sensor data on a
    # fixed schedule until interrupted.
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(USERNAME, PASSWORD)

    # Applies to sockets created from here on, so a dead host cannot block
    # start-up indefinitely.
    socket.setdefaulttimeout(30)
    try:
        client.connect(MQTT_HOST, port=MQTT_PORT, keepalive=60)
    except socket.timeout:
        log_message("Connection timed out.", level=syslog.LOG_ERR)
        raise SystemExit(1)
    except OSError as e:
        # Refused connection, unreachable host, DNS failure, etc. These used to
        # escape as an unlogged traceback.
        log_message(f"Connection failed: {e}", level=syslog.LOG_ERR)
        raise SystemExit(1)

    client.loop_start()
    exit_code = 0
    try:
        # Waiting inside the try block lets Ctrl-C during start-up shut down
        # cleanly too.
        while not client.is_connected():
            time.sleep(1)

        # NOTE(review): the accompanying write-up describes a 30-second cycle,
        # but the schedule here fires every 5 seconds. Confirm which is intended.
        schedule.every(5).seconds.do(job)
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        log_message(f"Unexpected error: {e}", level=syslog.LOG_ERR)
        exit_code = 1
    finally:
        # Runs on every exit path; previously the error path exited before the
        # client was stopped and disconnected.
        client.loop_stop()
        client.disconnect()
    raise SystemExit(exit_code)
Visualising the data
Adding to our existing AQ dashboard, we are now tracking the temperature and humidity from the Dyson.