Guard-fenix_CodigoFenix_2026/_Sort/ble/V006.py

331 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# V006.py (VERSION: 3D-PCB+GRAPH+SHAKEBAR-1)
# NOTE(review): this header previously said "V005.py"; the file path and the
# banner printed below both say V006, so the comment was presumably stale.
# The banner is printed at import time, before the imports run.
print("=== V006 VERSION: 3D-PCB+GRAPH+SHAKEBAR-1 ===")
import sys
import time
import math
import asyncio
import threading
import numpy as np
from bleak import BleakClient
from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg
import pyqtgraph.opengl as gl
# ====== HARD-CODED DEVICE ======
# ADDRESS is passed to BleakClient; CHAR_UUID is the characteristic that
# notifications are enabled on.
ADDRESS = "10:51:DB:1B:E7:1E"
CHAR_UUID = "7e2a1002-1111-2222-3333-444455556666"
# ====== GRAPH SETTINGS ======
HISTORY_SECONDS = 10.0  # width of the scrolling plot window; older samples are dropped
UI_UPDATE_MS = 50  # interval of the UI refresh timer, in milliseconds
MAX_G = 4.0  # the plot's Y axis spans -MAX_G..+MAX_G
# ====== SHAKE METRICS SETTINGS ======
# magnitude |a| in g (gravity ~1.0g when stationary)
WARN_MAG_G = 1.30 # mild motion
SHAKE_MAG_G = 1.70 # strong shake
# Use delta from 1g for percent mapping:
# delta_g = abs(|a| - 1.0)
DELTA_G_FULL_SCALE = 1.0 # delta=1.0g => 100%
# Shared latest values (written by BLE notify, read by UI)
# "ax"/"ay"/"az": last parsed sample; "ts": time.time() of that sample
# (0.0 until the first one arrives); "count": number of parsed samples;
# "raw": last decoded notification text, stored even when it fails to parse.
latest = {"ax": 0.0, "ay": 0.0, "az": 1.0, "ts": 0.0, "count": 0, "raw": ""}
# Ring buffers for plotting
# Parallel lists: index i of every buffer belongs to the same UI tick.
# They are appended to and trimmed together by the UI update.
t_buf = []
ax_buf = []
ay_buf = []
az_buf = []
mag_buf = []
# ---------------- BLE ----------------
def parse_csv_triplet(s: str):
    """Parse a ``"x,y,z"`` string into a tuple of three finite floats.

    Surrounding whitespace and double quotes are stripped before splitting
    on commas.

    Args:
        s: Text payload, e.g. ``'0.01,-0.02,0.98'`` (optionally quoted).

    Returns:
        ``(x, y, z)`` as floats, or ``None`` when the text does not contain
        exactly three comma-separated fields, a field is not a number, or a
        field is NaN/infinite.
    """
    fields = s.strip().strip('"').split(",")
    if len(fields) != 3:
        return None
    try:
        x, y, z = (float(field) for field in fields)
    except ValueError:
        return None
    # float() happily accepts "nan", "inf" and "infinity"; such values are
    # never a usable reading and would poison the magnitude/tilt math and
    # the plot, so treat them like any other malformed packet.
    if not (math.isfinite(x) and math.isfinite(y) and math.isfinite(z)):
        return None
    return x, y, z
def notify_handler(_sender: int, data: bytearray):
    """Notification callback: decode one payload and publish it in ``latest``.

    The decoded text is always stored under ``latest["raw"]``. When it
    parses as a CSV triplet, the axis values, the arrival timestamp and the
    packet counter are updated as well; otherwise the previous sample is
    left untouched.

    Args:
        _sender: Notification source supplied by the BLE library (unused).
        data: Raw notification bytes.
    """
    text = data.decode(errors="ignore").strip()
    latest["raw"] = text

    sample = parse_csv_triplet(text)
    if sample is not None:
        x, y, z = sample
        latest["ax"] = x
        latest["ay"] = y
        latest["az"] = z
        latest["ts"] = time.time()
        latest["count"] = latest["count"] + 1
def roll_pitch_from_accel(ax, ay, az):
    """Return ``(roll, pitch)`` in degrees computed from an acceleration vector.

    Roll is ``atan2(ay, az)``; pitch is ``atan2(-ax, sqrt(ay^2 + az^2))``.

    Args:
        ax: X component of the acceleration.
        ay: Y component of the acceleration.
        az: Z component of the acceleration.

    Returns:
        Tuple ``(roll_degrees, pitch_degrees)``.
    """
    yz_norm = math.sqrt(ay * ay + az * az)
    roll_rad = math.atan2(ay, az)
    pitch_rad = math.atan2(-ax, yz_norm)
    return math.degrees(roll_rad), math.degrees(pitch_rad)
async def ble_loop():
    """Keep a notification subscription to the device alive forever.

    Connects to ``ADDRESS``, enables notifications on ``CHAR_UUID`` (handled
    by ``notify_handler``) and then watches the link once per second. Any
    failure, including the link dropping, is printed and followed by a
    reconnect attempt two seconds later. This coroutine never returns.
    """
    while True:
        try:
            print(f"Connecting to {ADDRESS} ...")
            async with BleakClient(ADDRESS, timeout=15.0) as client:
                if not client.is_connected:
                    raise RuntimeError("Connect failed.")
                print("Connected. Enabling notifications...")
                await client.start_notify(CHAR_UUID, notify_handler)
                print("Notify enabled. Streaming...\n")
                # Poll the link state instead of sleeping unconditionally:
                # a peripheral that drops the connection raises nothing
                # here, so an unconditional loop would idle forever and
                # never reach the reconnect path below.
                while client.is_connected:
                    await asyncio.sleep(1.0)
            # Leaving the polling loop means the link is gone; route it
            # through the same report-and-retry path as other failures.
            raise ConnectionError("Device disconnected.")
        except Exception as e:
            # Deliberately broad: this is the reconnect boundary and every
            # failure is handled the same way (report, wait, retry).
            print(f"[BLE] Error/disconnect: {e}")
            print("Reconnecting in 2 seconds...\n")
            await asyncio.sleep(2.0)
def start_ble_thread():
    """Start ``ble_loop`` on its own asyncio event loop in a daemon thread.

    Because the thread is a daemon, it does not keep the process alive once
    the main thread exits.
    """

    def _run_ble_forever():
        # asyncio.run creates a fresh event loop owned by this thread.
        asyncio.run(ble_loop())

    worker = threading.Thread(target=_run_ble_forever, daemon=True)
    worker.start()
# ---------------- 3D helpers ----------------
def make_cuboid(size=(2.0, 1.2, 0.1)):
    """Build the triangle mesh of an origin-centred, axis-aligned box.

    Args:
        size: ``(length, width, thickness)`` extents along X, Y and Z.

    Returns:
        ``(verts, faces)``: ``verts`` is an ``(8, 3)`` float array of corner
        coordinates and ``faces`` is a ``(12, 3)`` int array of vertex
        indices, two triangles per side of the box.
    """
    length, width, thickness = size
    half_x, half_y, half_z = length / 2, width / 2, thickness / 2

    # Corners 0-3 lie on the z = -half_z face, corners 4-7 on z = +half_z,
    # each listed in the same order around the rectangle.
    corner_signs = (
        (-1, -1, -1), (1, -1, -1), (1, 1, -1), (-1, 1, -1),
        (-1, -1, 1), (1, -1, 1), (1, 1, 1), (-1, 1, 1),
    )
    verts = np.array(
        [[sx * half_x, sy * half_y, sz * half_z] for sx, sy, sz in corner_signs],
        dtype=float,
    )

    # Bottom and top faces first, then the four sides; each side quad
    # (a, b, c, d) is split into triangles (a, b, c) and (a, c, d).
    triangles = [
        (0, 1, 2), (0, 2, 3),
        (4, 6, 5), (4, 7, 6),
    ]
    side_quads = ((0, 4, 5, 1), (1, 5, 6, 2), (2, 6, 7, 3), (3, 7, 4, 0))
    for a, b, c, d in side_quads:
        triangles.append((a, b, c))
        triangles.append((a, c, d))
    faces = np.array(triangles, dtype=int)

    return verts, faces
# ---------------- UI ----------------
class MainWindow(QtWidgets.QMainWindow):
    """Main window: 3D board view, status line, live plot and shake meter.

    A timer calls ``update_ui`` every ``UI_UPDATE_MS`` milliseconds. Each
    tick reads the most recent sample from the module-level ``latest`` dict
    (written by the BLE notification handler), appends it to the module-level
    history buffers and refreshes every widget.
    """

    # state -> (label text, stylesheet). Any state other than "CALM" or
    # "WARN" is displayed as a shake.
    _INDICATOR_LOOKS = {
        "CALM": (
            "CALM",
            "font-weight: 700; font-size: 18px; padding: 10px;"
            "border: 2px solid #555; border-radius: 10px;"
            "background: rgba(80,80,80,120); color: white;",
        ),
        "WARN": (
            "WARN",
            "font-weight: 700; font-size: 18px; padding: 10px;"
            "border: 2px solid #a86a00; border-radius: 10px;"
            "background: rgba(255,165,0,130); color: black;",
        ),
        "SHAKE": (
            "SHAKE!",
            "font-weight: 800; font-size: 20px; padding: 10px;"
            "border: 2px solid #b00000; border-radius: 10px;"
            "background: rgba(255,60,60,180); color: white;",
        ),
    }

    # Indicator state currently applied to the label; None until
    # set_indicator_state has styled it for the first time.
    _indicator_state = None

    def __init__(self):
        """Build all widgets and start the periodic refresh timer."""
        super().__init__()
        self.setWindowTitle("ADXL345 3D PCB + Live Graph + Shake Meter (BLE)")

        central = QtWidgets.QWidget()
        self.setCentralWidget(central)
        outer = QtWidgets.QVBoxLayout(central)
        outer.setContentsMargins(6, 6, 6, 6)
        outer.setSpacing(6)

        # ---- 3D view ----
        self._build_3d_view(outer)

        # ---- Status label ----
        self.status = QtWidgets.QLabel()
        self.status.setStyleSheet("color:white; background:rgba(0,0,0,140); padding:6px;")
        outer.addWidget(self.status)

        # ---- Bottom area: graph + shake meter (right) ----
        bottom = QtWidgets.QHBoxLayout()
        bottom.setSpacing(10)
        outer.addLayout(bottom, stretch=2)
        self._build_plot(bottom)
        self._build_meter_panel(bottom)

        # ---- State ----
        self.roll = 0.0  # smoothed roll shown on the 3D board, degrees
        self.pitch = 0.0  # smoothed pitch shown on the 3D board, degrees
        self.t0 = time.time()  # origin of the plot's time axis

        # ---- Timer ----
        # Parented to the window so Qt ties the timer's lifetime to it.
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_ui)
        self.timer.start(UI_UPDATE_MS)

    def _build_3d_view(self, layout):
        """Create the OpenGL view with grid, axes, board mesh and vector line."""
        self.view3d = gl.GLViewWidget()
        self.view3d.setMinimumHeight(420)
        self.view3d.setCameraPosition(distance=6, elevation=25, azimuth=40)
        layout.addWidget(self.view3d, stretch=3)

        grid = gl.GLGridItem()
        grid.setSize(10, 10)
        grid.setSpacing(1, 1)
        self.view3d.addItem(grid)

        axis = gl.GLAxisItem()
        axis.setSize(2, 2, 2)
        self.view3d.addItem(axis)

        verts, faces = make_cuboid()
        # One RGBA colour per triangle, the same for every face.
        colors = np.ones((faces.shape[0], 4), dtype=float)
        colors[:, 0] = 0.25
        colors[:, 1] = 0.7
        colors[:, 2] = 0.35
        colors[:, 3] = 0.9
        meshdata = gl.MeshData(vertexes=verts, faces=faces)
        self.mesh = gl.GLMeshItem(meshdata=meshdata, faceColors=colors,
                                  drawEdges=True, edgeColor=(0, 0, 0, 1))
        self.view3d.addItem(self.mesh)

        # Line from the origin to the current acceleration vector.
        self.vec = gl.GLLinePlotItem(pos=np.array([[0, 0, 0], [0, 0, 1]]),
                                     width=3, antialias=True)
        self.view3d.addItem(self.vec)

    def _build_plot(self, layout):
        """Create the scrolling acceleration plot, its curves and threshold lines."""
        self.plot = pg.PlotWidget()
        self.plot.setMinimumHeight(230)
        self.plot.showGrid(x=True, y=True, alpha=0.25)
        self.plot.setLabel("left", "Acceleration", units="g")
        self.plot.setLabel("bottom", "Time", units="s")
        self.plot.setYRange(-MAX_G, MAX_G)
        self.plot.addLegend(offset=(10, 10))
        layout.addWidget(self.plot, stretch=5)

        # Curves
        self.curve_ax = self.plot.plot([], [], name="ax", pen=pg.mkPen('r', width=2))
        self.curve_ay = self.plot.plot([], [], name="ay", pen=pg.mkPen('g', width=2))
        self.curve_az = self.plot.plot([], [], name="az", pen=pg.mkPen('b', width=2))
        self.curve_mag = self.plot.plot([], [], name="|a|", pen=pg.mkPen('y', width=2))

        # Threshold lines (horizontal) for magnitude
        self.warn_line = pg.InfiniteLine(
            pos=+WARN_MAG_G, angle=0,
            pen=pg.mkPen((255, 165, 0), width=2, style=QtCore.Qt.DashLine))
        self.shake_line = pg.InfiniteLine(
            pos=+SHAKE_MAG_G, angle=0,
            pen=pg.mkPen((255, 60, 60), width=2, style=QtCore.Qt.DashLine))
        self.plot.addItem(self.warn_line)
        self.plot.addItem(self.shake_line)

    def _build_meter_panel(self, layout):
        """Create the right-hand panel: title, percent bar, readout, indicator."""
        panel = QtWidgets.QVBoxLayout()
        panel.setSpacing(6)
        layout.addLayout(panel, stretch=1)

        self.meter_title = QtWidgets.QLabel("Shake magnitude")
        self.meter_title.setStyleSheet("font-weight: 600;")
        panel.addWidget(self.meter_title)

        self.meter_bar = QtWidgets.QProgressBar()
        self.meter_bar.setRange(0, 100)
        self.meter_bar.setValue(0)
        self.meter_bar.setFormat("%p%")
        self.meter_bar.setTextVisible(True)
        self.meter_bar.setMinimumWidth(220)
        self.meter_bar.setMinimumHeight(28)
        panel.addWidget(self.meter_bar)

        self.meter_readout = QtWidgets.QLabel("|a| = 0.000 g\nΔ=0.000 g")
        panel.addWidget(self.meter_readout)

        self.indicators = QtWidgets.QLabel("CALM")
        self.indicators.setAlignment(QtCore.Qt.AlignCenter)
        self.indicators.setStyleSheet(
            "font-weight: 700; font-size: 18px; padding: 10px; "
            "border: 2px solid #555; border-radius: 10px;"
        )
        panel.addWidget(self.indicators)
        panel.addStretch(1)

    def set_indicator_state(self, state: str):
        """Show ``state`` on the indicator label.

        Args:
            state: ``"CALM"`` or ``"WARN"``; any other value is shown as the
                shake alert.
        """
        if state not in ("CALM", "WARN"):
            state = "SHAKE"
        # This is called on every UI tick; re-applying an identical
        # stylesheet each time is wasted work, so only touch the label when
        # the state actually changes.
        if state == self._indicator_state:
            return
        self._indicator_state = state
        text, style = self._INDICATOR_LOOKS[state]
        self.indicators.setText(text)
        self.indicators.setStyleSheet(style)

    def update_ui(self):
        """Refresh the meter, plot, 3D view and status text from ``latest``."""
        axg, ayg, azg = latest["ax"], latest["ay"], latest["az"]

        # Magnitude (g) and its distance from the 1 g expected at rest.
        mag = math.sqrt(axg*axg + ayg*ayg + azg*azg)
        delta = abs(mag - 1.0)

        # Map delta to 0..100%; the max() guards against a zero full scale.
        pct = int(max(0.0, min(1.0, delta / max(1e-6, DELTA_G_FULL_SCALE))) * 100.0)
        self.meter_bar.setValue(pct)
        self.meter_readout.setText(f"|a| = {mag:.3f} g\nΔ = {delta:.3f} g")

        # Threshold indicators
        if mag >= SHAKE_MAG_G:
            self.set_indicator_state("SHAKE")
        elif mag >= WARN_MAG_G:
            self.set_indicator_state("WARN")
        else:
            self.set_indicator_state("CALM")

        # Add to history buffers
        t = time.time() - self.t0
        t_buf.append(t)
        ax_buf.append(axg)
        ay_buf.append(ayg)
        az_buf.append(azg)
        mag_buf.append(mag)

        # Trim buffers: count the leading samples that fell out of the
        # window, then drop them with one slice deletion per buffer rather
        # than a pop(0) per sample per buffer.
        stale = 0
        while stale < len(t_buf) and (t - t_buf[stale]) > HISTORY_SECONDS:
            stale += 1
        if stale:
            for buf in (t_buf, ax_buf, ay_buf, az_buf, mag_buf):
                del buf[:stale]

        # Update plot
        self.curve_ax.setData(t_buf, ax_buf)
        self.curve_ay.setData(t_buf, ay_buf)
        self.curve_az.setData(t_buf, az_buf)
        self.curve_mag.setData(t_buf, mag_buf)
        if t_buf:
            self.plot.setXRange(max(0, t_buf[-1] - HISTORY_SECONDS), t_buf[-1])

        # Update 3D tilt (exponentially smoothed so the board does not jitter)
        roll, pitch = roll_pitch_from_accel(axg, ayg, azg)
        alpha = 0.15
        self.roll = (1 - alpha) * self.roll + alpha * roll
        self.pitch = (1 - alpha) * self.pitch + alpha * pitch
        self.mesh.resetTransform()
        self.mesh.rotate(self.pitch, 0, 1, 0)
        self.mesh.rotate(self.roll, 1, 0, 0)

        # Vector line
        self.vec.setData(pos=np.array([[0, 0, 0], [axg, ayg, azg]], dtype=float))

        # Top status text; age is reported as 0 until the first packet arrives.
        age = (time.time() - latest["ts"]) * 1000 if latest["ts"] else 0
        self.status.setText(
            f"Packets: {latest['count']} Age: {age:.0f} ms\n"
            f"ax={axg:+.3f}g ay={ayg:+.3f}g az={azg:+.3f}g |a|={mag:.3f}g\n"
            f"roll={self.roll:+.1f}° pitch={self.pitch:+.1f}°"
        )
def main():
    """Start the BLE worker, show the main window and run the Qt event loop.

    Exits the process with the event loop's return code.
    """
    print("IMPORTANT: Disconnect nRF Connect (or any other BLE client) first.")
    start_ble_thread()

    # Reuse an existing QApplication if one is already running.
    qt_app = QtWidgets.QApplication.instance()
    if qt_app is None:
        qt_app = QtWidgets.QApplication(sys.argv)

    window = MainWindow()
    window.resize(1100, 820)
    window.show()

    exit_code = qt_app.exec_()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()