Guard-fenix_CodigoFenix_2026/_Sort/ble/V007.py

295 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# bme680_ui.py (VERSION: BME680-GRAPH+BAR-1)
# Startup banner: printed at import time so the console shows which script
# revision is running.
print("=== BME680 UI VERSION: BME680-GRAPH+BAR-1 ===")
import asyncio
import threading
import time
import math
import sys
from bleak import BleakClient
from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg
# ====== HARD-CODED DEVICE + UUIDs ======
# Change ADDRESS if your BME680 device uses a different MAC.
ADDRESS = "98:88:E0:76:30:56"
# Change CHAR_UUID if it differs from the one used by the ADXL project.
CHAR_UUID = "7e2a0002-1111-2222-3333-444455556666"

# ====== UI SETTINGS ======
# 2 minutes of history (BME readings change more slowly than accelerometer data).
HISTORY_SECONDS = 120.0
# 250 ms timer => 4 Hz UI refresh, which is plenty for environmental data.
UI_UPDATE_MS = 250

# "Air change" percent meter tuning (based on gas resistance change).
# The meter shows the delta from a slowly-updated baseline.
BASELINE_ALPHA = 0.01  # smaller alpha => slower, steadier baseline
DELTA_FULL_SCALE = 20000  # ohms of delta that map to 100% (tune this)

# Thresholds for indicator states (percent).
WARN_PCT = 30
ALERT_PCT = 60

# Most recent sample, written by the BLE notification handler and read by
# the UI timer.
latest = {
    "t": None,  # temperature, °C
    "h": None,  # relative humidity, %
    "gas": None,  # gas resistance, ohms
    "p": None,  # pressure, hPa
    "alt": None,  # altitude, m
    "ts": 0.0,  # time.time() of the last successfully parsed packet
    "count": 0,  # number of successfully parsed packets
    "raw": "",  # last decoded notification payload, parsed or not
}

# Plot history buffers (six independent lists, index-aligned by sample).
t_buf, temp_buf, hum_buf, press_buf, gas_buf, alt_buf = ([] for _ in range(6))

# Running baseline for the "change" meter; None until the first sample arrives.
gas_baseline = None
def parse_csv_5(s: str):
    """Parse one notification payload of five comma-separated numbers.

    The expected field order is: temperature, humidity, gas resistance
    (ohms), pressure (hPa), altitude (m). Surrounding whitespace and
    double quotes around the whole payload are ignored.

    Args:
        s: Decoded text of a single notification.

    Returns:
        A 5-tuple of floats, or None when the payload does not have exactly
        five fields, a field is not numeric, or a field is NaN/infinite.
    """
    fields = s.strip().strip('"').split(",")
    if len(fields) != 5:
        return None
    try:
        values = tuple(float(field) for field in fields)
    except ValueError:
        return None
    # float() happily accepts "nan" and "inf". A single non-finite gas
    # reading would permanently corrupt the exponentially-averaged baseline
    # kept by the UI, so such packets are treated as malformed.
    if not all(math.isfinite(value) for value in values):
        return None
    return values
def notify_handler(_sender: int, data: bytearray):
    """Handle one BLE notification and publish it into ``latest``.

    The decoded payload is always stored under ``latest["raw"]``. When it
    parses as a five-value sample, the measurement fields, the receive
    timestamp and the packet counter are updated as well; otherwise the
    previous measurement is left in place.

    Args:
        _sender: Notification source passed by the BLE library (unused).
        data: Raw notification bytes.
    """
    text = data.decode(errors="ignore").strip()
    latest["raw"] = text

    sample = parse_csv_5(text)
    if sample is not None:
        # Same field order as the parser returns them.
        for key, value in zip(("t", "h", "gas", "p", "alt"), sample):
            latest[key] = value
        latest["ts"] = time.time()
        latest["count"] += 1
async def ble_loop():
    """Connect to the sensor, stream notifications, and reconnect forever.

    Each session connects to ``ADDRESS``, subscribes ``notify_handler`` to
    ``CHAR_UUID`` and then idles while notifications arrive. Any failure,
    including a lost link, is printed and followed by a 2 second pause
    before the next connection attempt. This coroutine never returns.
    """
    while True:
        try:
            print(f"Connecting to {ADDRESS} ...")
            async with BleakClient(ADDRESS, timeout=15.0) as client:
                if not client.is_connected:
                    raise RuntimeError("Connect failed.")
                print("Connected. Enabling notifications...")
                await client.start_notify(CHAR_UUID, notify_handler)
                print("Notify enabled. Streaming...\n")
                # Poll the link state: sleeping alone never raises when the
                # peripheral goes away, so without this check a dropped
                # connection would leave the loop idling forever and the
                # reconnect path below would never run.
                while client.is_connected:
                    await asyncio.sleep(1.0)
                # Route a lost link through the same reconnect handling as
                # every other failure.
                raise RuntimeError("Device disconnected.")
        except Exception as e:
            print(f"[BLE] Error/disconnect: {e}")
            print("Reconnecting in 2 seconds...\n")
            await asyncio.sleep(2.0)
def start_ble_thread():
    """Start the BLE loop on a daemon thread with its own asyncio event loop."""

    def _run_ble_forever():
        # asyncio.run creates and owns an event loop for this thread.
        asyncio.run(ble_loop())

    worker = threading.Thread(target=_run_ble_forever, daemon=True)
    worker.start()
class MainWindow(QtWidgets.QMainWindow):
    """Main window: live BME680 plots plus an "air change" meter.

    Layout: a status label on top; below it, two stacked plots
    (temperature/humidity and pressure/gas) on the left and a meter panel
    (progress bar, numeric readout, state indicator) on the right. A QTimer
    calls ``update_ui`` every ``UI_UPDATE_MS`` milliseconds to pull the most
    recent sample from the module-level ``latest`` dict.
    """

    # Indicator text and stylesheet per state name. Any state name other
    # than "CALM" or "WARN" is shown as the alert ("BIG CHANGE") style.
    _STATE_STYLES = {
        "CALM": (
            "CALM",
            "font-weight: 800; font-size: 20px; padding: 10px;"
            "border: 2px solid #555; border-radius: 10px;"
            "background: rgba(80,80,80,120); color: white;",
        ),
        "WARN": (
            "CHANGE",
            "font-weight: 800; font-size: 20px; padding: 10px;"
            "border: 2px solid #a86a00; border-radius: 10px;"
            "background: rgba(255,165,0,140); color: black;",
        ),
        "ALERT": (
            "BIG CHANGE",
            "font-weight: 900; font-size: 20px; padding: 10px;"
            "border: 2px solid #b00000; border-radius: 10px;"
            "background: rgba(255,60,60,180); color: white;",
        ),
    }

    # Text of the indicator state currently applied; None until the first
    # set_state() call so that call always styles the label.
    _state_text = None

    def __init__(self):
        """Build the widget tree and start the periodic UI refresh timer."""
        super().__init__()
        self.setWindowTitle("BME680 Live Graph + Air-Change Meter (BLE)")

        central = QtWidgets.QWidget()
        self.setCentralWidget(central)
        outer = QtWidgets.QVBoxLayout(central)
        outer.setContentsMargins(8, 8, 8, 8)
        outer.setSpacing(8)

        self.status = QtWidgets.QLabel("Waiting for data...")
        self.status.setStyleSheet("color:white; background:rgba(0,0,0,140); padding:8px;")
        outer.addWidget(self.status)

        # Bottom area: plots on the left, meter on the right.
        row = QtWidgets.QHBoxLayout()
        row.setSpacing(10)
        outer.addLayout(row)

        # One plot with multiple y-scales is messy; we'll use 2 plots stacked.
        plots_col = QtWidgets.QVBoxLayout()
        plots_col.setSpacing(8)
        row.addLayout(plots_col, stretch=5)

        # Plot 1: Temp + Humidity
        self.plot1 = pg.PlotWidget()
        self.plot1.showGrid(x=True, y=True, alpha=0.25)
        self.plot1.setLabel("left", "Temp (°C) / Humidity (%)")
        self.plot1.setLabel("bottom", "Time (s)")
        self.plot1.addLegend(offset=(10, 10))
        plots_col.addWidget(self.plot1)
        self.curve_temp = self.plot1.plot([], [], name="Temp °C", pen=pg.mkPen('r', width=2))
        self.curve_hum = self.plot1.plot([], [], name="Humidity %", pen=pg.mkPen('g', width=2))

        # Plot 2: Pressure + Gas (gas is big so it'll dominate visually; that's ok)
        self.plot2 = pg.PlotWidget()
        self.plot2.showGrid(x=True, y=True, alpha=0.25)
        self.plot2.setLabel("left", "Pressure (hPa) / Gas (Ω)")
        self.plot2.setLabel("bottom", "Time (s)")
        self.plot2.addLegend(offset=(10, 10))
        plots_col.addWidget(self.plot2)
        self.curve_press = self.plot2.plot([], [], name="Pressure hPa", pen=pg.mkPen('c', width=2))
        self.curve_gas = self.plot2.plot([], [], name="Gas Ω", pen=pg.mkPen('y', width=2))

        # Meter panel on right
        panel = QtWidgets.QVBoxLayout()
        panel.setSpacing(8)
        row.addLayout(panel, stretch=1)

        title = QtWidgets.QLabel("Air change (gas Δ)")
        title.setStyleSheet("font-weight: 700;")
        panel.addWidget(title)

        self.bar = QtWidgets.QProgressBar()
        self.bar.setRange(0, 100)
        self.bar.setValue(0)
        self.bar.setMinimumWidth(240)
        self.bar.setMinimumHeight(28)
        self.bar.setFormat("%p%")
        panel.addWidget(self.bar)

        self.readout = QtWidgets.QLabel("gas: --- Ω\nbaseline: --- Ω\nΔ: --- Ω")
        panel.addWidget(self.readout)

        self.ind = QtWidgets.QLabel("CALM")
        self.ind.setAlignment(QtCore.Qt.AlignCenter)
        # Initial look comes from the same table set_state() uses, so the
        # CALM style is defined in exactly one place.
        self.set_state("CALM")
        panel.addWidget(self.ind)
        panel.addStretch(1)

        # Plot time axis is seconds since the window was created.
        self.t0 = time.time()
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.update_ui)
        self.timer.start(UI_UPDATE_MS)

    def set_state(self, state: str):
        """Show the indicator for ``state``.

        Args:
            state: "CALM" or "WARN"; any other value selects the alert
                ("BIG CHANGE") appearance.
        """
        text, style = self._STATE_STYLES.get(state, self._STATE_STYLES["ALERT"])
        # update_ui() calls this on every timer tick; skip the stylesheet
        # call when nothing changed instead of re-applying an identical one.
        if text == self._state_text:
            return
        self._state_text = text
        self.ind.setText(text)
        self.ind.setStyleSheet(style)

    def update_ui(self):
        """Timer slot: refresh meter, plots and status from ``latest``.

        Does nothing until the first sample has been received. Updates the
        module-level ``gas_baseline`` and appends one point to each history
        buffer on every call.
        """
        global gas_baseline
        if latest["t"] is None:
            return

        now = time.time()
        t = now - self.t0
        temp = latest["t"]
        hum = latest["h"]
        gas = latest["gas"]
        pres = latest["p"]
        alt = latest["alt"]

        # Update baseline slowly (exponential moving average); the first
        # sample seeds it.
        if gas_baseline is None:
            gas_baseline = gas
        else:
            gas_baseline = (1.0 - BASELINE_ALPHA) * gas_baseline + BASELINE_ALPHA * gas

        # Meter: |gas - baseline| scaled so DELTA_FULL_SCALE ohms == 100%,
        # clamped to 0..100.
        delta = abs(gas - gas_baseline)
        fraction = delta / max(1e-6, DELTA_FULL_SCALE)
        pct = int(max(0.0, min(1.0, fraction)) * 100.0)
        self.bar.setValue(pct)

        if pct >= ALERT_PCT:
            self.set_state("ALERT")
        elif pct >= WARN_PCT:
            self.set_state("WARN")
        else:
            self.set_state("CALM")

        self.readout.setText(
            f"gas: {gas:,.0f} Ω\nbaseline: {gas_baseline:,.0f} Ω\nΔ: {delta:,.0f} Ω"
        )

        # Buffers
        t_buf.append(t)
        temp_buf.append(temp)
        hum_buf.append(hum)
        press_buf.append(pres)
        gas_buf.append(gas)

        # Trim: count the leading samples older than the history window,
        # then drop them from every buffer with one slice deletion each.
        stale = 0
        while stale < len(t_buf) and (t - t_buf[stale]) > HISTORY_SECONDS:
            stale += 1
        if stale:
            for buf in (t_buf, temp_buf, hum_buf, press_buf, gas_buf):
                del buf[:stale]

        # Update plots
        self.curve_temp.setData(t_buf, temp_buf)
        self.curve_hum.setData(t_buf, hum_buf)
        self.curve_press.setData(t_buf, press_buf)
        self.curve_gas.setData(t_buf, gas_buf)

        if t_buf:
            # Keep both plots scrolled to the most recent history window.
            xmin = max(0, t_buf[-1] - HISTORY_SECONDS)
            xmax = t_buf[-1]
            self.plot1.setXRange(xmin, xmax)
            self.plot2.setXRange(xmin, xmax)

        # Age of the newest packet; 0 when no timestamp has been recorded.
        age_ms = (now - latest["ts"]) * 1000.0 if latest["ts"] else 0.0
        self.status.setText(
            f"Packets: {latest['count']} Age: {age_ms:.0f} ms\n"
            f"T={temp:.2f} °C RH={hum:.1f}% P={pres:.1f} hPa Gas={gas:,.0f} Ω Alt={alt:.1f} m"
        )
def main():
    """Start the BLE worker, show the main window, and run the Qt event loop.

    Exits the process with the event loop's return code.
    """
    print("IMPORTANT: Disconnect nRF Connect (or any other BLE client) first.")
    start_ble_thread()

    # Reuse an existing QApplication if one is already present.
    qt_app = QtWidgets.QApplication.instance()
    if qt_app is None:
        qt_app = QtWidgets.QApplication(sys.argv)

    window = MainWindow()
    window.resize(1200, 720)
    window.show()

    exit_code = qt_app.exec_()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()