156 lines
4.1 KiB
Python
156 lines
4.1 KiB
Python
from micropython import const
|
|
import asyncio
|
|
import aioble
|
|
import bluetooth
|
|
import machine
|
|
import neopixel
|
|
import ujson as json
|
|
import time
|
|
import _thread
|
|
|
|
from machine import Pin, I2C
|
|
import bme680
|
|
|
|
# ----------------- NeoPixel -----------------
# Single on-board pixel driven from GPIO 8.
NEO_PIN = 8
NEO_COUNT = 1

np = neopixel.NeoPixel(machine.Pin(NEO_PIN), NEO_COUNT)

# Light the pixel at import time so there is a visible sign the script started.
np[0] = (127, 0, 127)
np.write()
|
|
|
|
def set_led(cmd: int):
    """Set the first NeoPixel according to a numeric command.

    Commands 0-3 each select one fixed colour tuple (0 is all-zero, i.e.
    off). Any other value leaves the pixel untouched and nothing is
    written to the strip.
    """
    # (command, colour tuple) pairs; scanned with == so matching follows
    # the same comparison rules as an if/elif chain.
    palette = (
        (0, (0, 0, 0)),
        (1, (255, 0, 0)),
        (2, (0, 255, 0)),
        (3, (0, 0, 255)),
    )
    for code, colour in palette:
        if cmd == code:
            np[0] = colour
            np.write()
            return
|
|
|
|
def decode_cmd(data: bytes):
    """Decode a written characteristic payload into an integer command.

    The payload is interpreted as an unsigned big-endian integer.

    Returns:
        The decoded integer, or None when the payload is missing, empty,
        or not a bytes-like object.
    """
    # An empty payload would otherwise decode to 0 and be mistaken for a
    # real command, so treat "nothing written" the same as None.
    if not data:
        return None
    try:
        return int.from_bytes(data, "big")
    except (TypeError, ValueError):
        # Not bytes-like: report "no command" instead of raising.
        return None
|
|
|
|
# ----------------- I2C + BME680 -----------------
# Wiring used here: SCL on GPIO5, SDA on GPIO4.
i2c = I2C(
    0,
    scl=Pin(5),
    sda=Pin(4),
    freq=400_000,
)

# Sensor driver instance bound to the bus above.
sensor = bme680.BME680(i2c)
|
|
|
|
# Shared sensor data (written by thread, read by asyncio).
# _lock guards every access to _latest.
_lock = _thread.allocate_lock()

# Most recent readings. Every value starts as "no reading yet"; gas_valid
# starts False so the snapshot never claims a valid gas reading while
# gas_ohms is still None.
_latest = {
    "t_c": None,
    "h_pct": None,
    "p_hpa": None,
    "gas_ohms": None,
    "gas_valid": False,
    "ts": 0,
}
|
|
|
|
def bme680_thread():
    """Poll the BME680 forever and publish each reading into _latest.

    Runs one measurement per loop iteration, then sleeps one second. The
    gas measurement is only enabled on every 10th iteration; on all other
    iterations the gas fields are published as ohms=None / valid=False.

    A failure while measuring or reading is printed and the loop carries
    on, so a single sensor error does not end the thread and leave
    _latest stale.
    """
    i = 0
    while True:
        i += 1

        # Gas is slow: only do it on every 10th pass (~10 seconds).
        gas_on = (i % 10 == 0)

        try:
            # Fast settings for demo
            sensor.measure(
                gas=gas_on,
                t_os=1, p_os=1, h_os=1,
                iir_filter=0,
                gas_temp=300,
                gas_ms=80
            )
            t = sensor.temperature()
            h = sensor.humidity()
            p = sensor.pressure()
            g = sensor.gas() if gas_on else {"ohms": None, "valid": False}

            # Update all fields under the lock so a reader never sees a
            # half-written snapshot.
            with _lock:
                _latest["t_c"] = round(t, 2)
                _latest["h_pct"] = round(h, 2)
                _latest["p_hpa"] = round(p, 2)
                _latest["gas_ohms"] = g["ohms"]
                _latest["gas_valid"] = bool(g["valid"])
                _latest["ts"] = time.time()
        except Exception as e:
            # Keep the previous snapshot and retry on the next pass.
            print("bme680_thread error:", e)

        time.sleep(1)


# Start sampling in the background as soon as the module is loaded.
_thread.start_new_thread(bme680_thread, ())
|
|
|
|
# ----------------- BLE UUIDs -----------------
_BLE_SERVICE_UUID = bluetooth.UUID("19b10000-e8f2-537e-4f6c-d104768a1214")
_BLE_SENSOR_CHAR_UUID = bluetooth.UUID("19b10001-e8f2-537e-4f6c-d104768a1214")  # notify
_BLE_LED_CHAR_UUID = bluetooth.UUID("19b10002-e8f2-537e-4f6c-d104768a1214")  # write

# Passed as the first positional argument of aioble.advertise().
# NOTE(review): aioble documents that argument as interval_us, so this is
# presumably 250 ms expressed in microseconds despite the _MS suffix.
# Confirm before renaming (the name is referenced elsewhere in this file).
_ADV_INTERVAL_MS = const(250_000)

# One service with two characteristics: sensor JSON (read/notify) and the
# LED command (read/write, with writes captured).
ble_service = aioble.Service(_BLE_SERVICE_UUID)
sensor_characteristic = aioble.Characteristic(
    ble_service,
    _BLE_SENSOR_CHAR_UUID,
    read=True,
    notify=True,
)
led_characteristic = aioble.Characteristic(
    ble_service,
    _BLE_LED_CHAR_UUID,
    read=True,
    write=True,
    capture=True,
)
aioble.register_services(ble_service)
|
|
|
|
def encode_json(obj) -> bytes:
    """Serialise obj to JSON text and return it as UTF-8 bytes."""
    text = json.dumps(obj)
    return text.encode("utf-8")
|
|
|
|
# ----------------- BLE tasks -----------------
|
|
async def notify_task():
    """Publish the latest sensor snapshot on the sensor characteristic.

    Every 500 ms the shared _latest dict is copied under _lock, encoded as
    JSON and written to sensor_characteristic with send_update=True, but
    only when the encoded bytes differ from the last payload sent.

    A failure in one pass is printed and the loop continues, in the same
    way as the other BLE tasks, so one bad pass does not propagate out of
    asyncio.gather() and stop the whole program.
    """
    last = None
    while True:
        try:
            # Copy under the lock; encode and send outside it so the
            # sensor thread is blocked for as short a time as possible.
            with _lock:
                payload = dict(_latest)

            b = encode_json(payload)
            if b != last:
                # NOTE(review): the JSON payload may be larger than a
                # default-MTU notification can carry; confirm how clients
                # receive the full value.
                sensor_characteristic.write(b, send_update=True)
                last = b
        except Exception as e:
            print("notify_task error:", e)

        await asyncio.sleep_ms(500)  # UI feels live at 2 Hz
|
|
|
|
async def peripheral_task():
    """Advertise, wait for a central to connect, and repeat after disconnect.

    Each pass advertises the service under the name "ESP32-BME680", logs
    the connecting device, then waits until that connection drops. Errors
    are printed and followed by a short pause before advertising again.
    """
    while True:
        try:
            advertising = aioble.advertise(
                _ADV_INTERVAL_MS,
                name="ESP32-BME680",
                services=[_BLE_SERVICE_UUID],
            )
            async with await advertising as connection:
                print("Connection from", connection.device)
                await connection.disconnected()
                print("Disconnected")
        except Exception as err:
            print("peripheral_task error:", err)
            # NOTE(review): the pause is placed inside the error path; the
            # original indentation was lost, so confirm it was not meant
            # to run after every pass.
            await asyncio.sleep_ms(200)
|
|
|
|
async def led_write_task():
    """Apply LED commands written to the LED characteristic.

    Waits for each write, decodes it with decode_cmd(), logs the command
    and the writing device, and forwards the command to set_led() unless
    decoding returned None. Errors are printed and followed by a short
    pause.
    """
    while True:
        try:
            # written() yields (connection, data) because the
            # characteristic was created with capture=True.
            conn, raw = await led_characteristic.written()
            command = decode_cmd(raw)
            print("LED cmd:", command, "from", conn.device)
            if command is not None:
                set_led(command)
        except Exception as err:
            print("led_write_task error:", err)
            # NOTE(review): the pause is placed inside the error path; the
            # original indentation was lost, so confirm it was not meant
            # to run after every write.
            await asyncio.sleep_ms(100)
|
|
|
|
async def main():
    """Run the notify, advertising and LED-write tasks concurrently."""
    tasks = (
        notify_task(),
        peripheral_task(),
        led_write_task(),
    )
    await asyncio.gather(*tasks)


# Script entry point: start the event loop when the module is loaded.
asyncio.run(main())
|
|
|