Guard-fenix_CodigoFenix_2026/_Sort/ble/V005.py

225 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# V005.py (VERSION: 3D-PCB+GRAPH-1)
print("=== V005 VERSION: 3D-PCB+GRAPH-1 ===")
import sys
import time
import math
import asyncio
import threading
import numpy as np
from bleak import BleakClient
from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg
import pyqtgraph.opengl as gl
# ====== HARD-CODED DEVICE ======
ADDRESS = "10:51:DB:1B:E7:1E"
CHAR_UUID = "7e2a1002-1111-2222-3333-444455556666"
# ====== GRAPH SETTINGS ======
HISTORY_SECONDS = 10.0 # how many seconds of history to show
GRAPH_UPDATE_MS = 50 # graph refresh rate
MAX_G = 4.0 # y-axis range for graph
# Shared latest values (written by BLE notify, read by UI)
latest = {"ax": 0.0, "ay": 0.0, "az": 1.0, "ts": 0.0, "count": 0, "raw": ""}
# Ring buffer for plotting
t_buf = []
ax_buf = []
ay_buf = []
az_buf = []
def parse_csv_triplet(s: str):
s = s.strip().strip('"')
parts = s.split(",")
if len(parts) != 3:
return None
try:
return float(parts[0]), float(parts[1]), float(parts[2])
except ValueError:
return None
def notify_handler(_sender: int, data: bytearray):
s = data.decode(errors="ignore").strip()
latest["raw"] = s
v = parse_csv_triplet(s)
if v is None:
return
latest["ax"], latest["ay"], latest["az"] = v
latest["ts"] = time.time()
latest["count"] += 1
def roll_pitch_from_accel(ax, ay, az):
roll = math.degrees(math.atan2(ay, az))
pitch = math.degrees(math.atan2(-ax, math.sqrt(ay * ay + az * az)))
return roll, pitch
async def ble_loop():
while True:
try:
print(f"Connecting to {ADDRESS} ...")
async with BleakClient(ADDRESS, timeout=15.0) as client:
if not client.is_connected:
raise RuntimeError("Connect failed.")
print("Connected. Enabling notifications...")
await client.start_notify(CHAR_UUID, notify_handler)
print("Notify enabled. Streaming...\n")
while True:
await asyncio.sleep(1.0)
except Exception as e:
print(f"[BLE] Error/disconnect: {e}")
print("Reconnecting in 2 seconds...\n")
await asyncio.sleep(2.0)
def start_ble_thread():
t = threading.Thread(target=lambda: asyncio.run(ble_loop()), daemon=True)
t.start()
def make_cuboid(size=(2.0, 1.2, 0.1)):
L, W, T = size
x, y, z = L/2, W/2, T/2
verts = np.array([
[-x,-y,-z], [ x,-y,-z], [ x, y,-z], [-x, y,-z],
[-x,-y, z], [ x,-y, z], [ x, y, z], [-x, y, z],
], dtype=float)
faces = np.array([
[0,1,2],[0,2,3],
[4,6,5],[4,7,6],
[0,4,5],[0,5,1],
[1,5,6],[1,6,2],
[2,6,7],[2,7,3],
[3,7,4],[3,4,0],
], dtype=int)
return verts, faces
class MainWindow(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("ADXL345 3D PCB + Live Accel Graph (BLE)")
# ---- Layout container ----
central = QtWidgets.QWidget()
self.setCentralWidget(central)
vbox = QtWidgets.QVBoxLayout(central)
vbox.setContentsMargins(6, 6, 6, 6)
vbox.setSpacing(6)
# ---- 3D view ----
self.view3d = gl.GLViewWidget()
self.view3d.setMinimumHeight(430)
self.view3d.setCameraPosition(distance=6, elevation=25, azimuth=40)
vbox.addWidget(self.view3d, stretch=3)
grid = gl.GLGridItem()
grid.setSize(10, 10)
grid.setSpacing(1, 1)
self.view3d.addItem(grid)
axis = gl.GLAxisItem()
axis.setSize(2, 2, 2)
self.view3d.addItem(axis)
verts, faces = make_cuboid()
colors = np.ones((faces.shape[0], 4), dtype=float)
colors[:,0]=0.25; colors[:,1]=0.7; colors[:,2]=0.35; colors[:,3]=0.9
meshdata = gl.MeshData(vertexes=verts, faces=faces)
self.mesh = gl.GLMeshItem(meshdata=meshdata, faceColors=colors, drawEdges=True, edgeColor=(0,0,0,1))
self.view3d.addItem(self.mesh)
self.vec = gl.GLLinePlotItem(pos=np.array([[0,0,0],[0,0,1]]), width=3, antialias=True)
self.view3d.addItem(self.vec)
# Small overlay label (top-left of window)
self.label = QtWidgets.QLabel()
self.label.setStyleSheet("color:white; background:rgba(0,0,0,140); padding:6px;")
vbox.addWidget(self.label)
# ---- Graph ----
self.plot = pg.PlotWidget()
self.plot.setMinimumHeight(200)
self.plot.showGrid(x=True, y=True, alpha=0.25)
self.plot.setLabel("left", "Acceleration", units="g")
self.plot.setLabel("bottom", "Time", units="s")
self.plot.setYRange(-MAX_G, MAX_G)
self.plot.addLegend(offset=(10, 10))
vbox.addWidget(self.plot, stretch=1)
# Curves
self.curve_ax = self.plot.plot([], [], name="ax", pen=pg.mkPen('r', width=2))
self.curve_ay = self.plot.plot([], [], name="ay", pen=pg.mkPen('g', width=2))
self.curve_az = self.plot.plot([], [], name="az", pen=pg.mkPen('b', width=2))
# ---- State ----
self.roll = 0.0
self.pitch = 0.0
self.t0 = time.time()
# ---- Timers ----
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.update_ui)
self.timer.start(GRAPH_UPDATE_MS)
def update_ui(self):
axg, ayg, azg = latest["ax"], latest["ay"], latest["az"]
# Add to history buffers
t = time.time() - self.t0
t_buf.append(t)
ax_buf.append(axg)
ay_buf.append(ayg)
az_buf.append(azg)
# Trim buffers to last HISTORY_SECONDS
while t_buf and (t - t_buf[0]) > HISTORY_SECONDS:
t_buf.pop(0)
ax_buf.pop(0)
ay_buf.pop(0)
az_buf.pop(0)
# Update plot curves
self.curve_ax.setData(t_buf, ax_buf)
self.curve_ay.setData(t_buf, ay_buf)
self.curve_az.setData(t_buf, az_buf)
if t_buf:
self.plot.setXRange(max(0, t_buf[-1] - HISTORY_SECONDS), t_buf[-1])
# Update 3D block tilt (smoothed)
roll, pitch = roll_pitch_from_accel(axg, ayg, azg)
alpha = 0.15
self.roll = (1 - alpha) * self.roll + alpha * roll
self.pitch = (1 - alpha) * self.pitch + alpha * pitch
self.mesh.resetTransform()
self.mesh.rotate(self.pitch, 0, 1, 0)
self.mesh.rotate(self.roll, 1, 0, 0)
# Update accel vector
self.vec.setData(pos=np.array([[0, 0, 0], [axg, ayg, azg]], dtype=float))
# Update label
age = (time.time() - latest["ts"]) * 1000 if latest["ts"] else 0
self.label.setText(
f"Packets: {latest['count']} Age: {age:.0f} ms\n"
f"ax={axg:+.3f}g ay={ayg:+.3f}g az={azg:+.3f}g\n"
f"roll={self.roll:+.1f}° pitch={self.pitch:+.1f}°"
)
def main():
print("IMPORTANT: Disconnect nRF Connect (or any other BLE client) first.")
start_ble_thread()
app = QtWidgets.QApplication.instance()
if app is None:
app = QtWidgets.QApplication(sys.argv)
win = MainWindow()
win.resize(1000, 750)
win.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()