# V005.py (VERSION: 3D-PCB+GRAPH+SHAKEBAR-1)
#
# Live viewer for accelerometer samples streamed over BLE as "ax,ay,az" text:
# a 3D board that tilts with the measured acceleration, a scrolling graph of
# the three axes plus magnitude, and a shake meter with CALM/WARN/SHAKE states.
#
# NOTE(review): the header says V005 while the banner below prints V006 --
# confirm which version label is intended.

# The banner is emitted at import time, before the remaining imports run.
print("=== V006 VERSION: 3D-PCB+GRAPH+SHAKEBAR-1 ===")

import sys
import time
import math
import asyncio
import threading
from typing import Optional, Tuple

import numpy as np
from bleak import BleakClient
from PyQt5 import QtWidgets, QtCore
import pyqtgraph as pg
import pyqtgraph.opengl as gl

# ====== HARD-CODED DEVICE ======
ADDRESS = "10:51:DB:1B:E7:1E"
CHAR_UUID = "7e2a1002-1111-2222-3333-444455556666"

# ====== GRAPH SETTINGS ======
HISTORY_SECONDS = 10.0  # width of the scrolling time window
UI_UPDATE_MS = 50       # UI refresh timer period
MAX_G = 4.0             # symmetric Y range of the graph

# ====== SHAKE METRICS SETTINGS ======
# magnitude |a| in g (gravity ~1.0g when stationary)
WARN_MAG_G = 1.30   # mild motion
SHAKE_MAG_G = 1.70  # strong shake

# Use delta from 1g for percent mapping:
#   delta_g = abs(|a| - 1.0)
DELTA_G_FULL_SCALE = 1.0  # delta=1.0g => 100%

# Shared latest values (written by BLE notify, read by UI)
latest = {"ax": 0.0, "ay": 0.0, "az": 1.0, "ts": 0.0, "count": 0, "raw": ""}

# Ring buffers for plotting (parallel lists, one entry per UI tick)
t_buf = []
ax_buf = []
ay_buf = []
az_buf = []
mag_buf = []


# ---------------- BLE ----------------
def parse_csv_triplet(s: str) -> Optional[Tuple[float, float, float]]:
    """Parse ``"ax,ay,az"`` text into a tuple of three finite floats.

    Surrounding whitespace and double quotes are stripped first.

    Returns:
        ``(ax, ay, az)``, or ``None`` when the text does not have exactly
        three comma-separated fields, a field is not a number, or a field
        is NaN/infinite.
    """
    parts = s.strip().strip('"').split(",")
    if len(parts) != 3:
        return None
    try:
        values = (float(parts[0]), float(parts[1]), float(parts[2]))
    except ValueError:
        return None
    # float() accepts "nan" and "inf"; such values would poison the magnitude,
    # the plots and the tilt filter, so treat them as a malformed packet.
    if not all(math.isfinite(v) for v in values):
        return None
    return values


def notify_handler(_sender: object, data: bytearray) -> None:
    """BLE notification callback: decode a packet and publish it in ``latest``.

    The raw text is always stored under ``"raw"``; the sample fields are only
    updated when the packet parses successfully.
    """
    text = data.decode(errors="ignore").strip()
    latest["raw"] = text
    sample = parse_csv_triplet(text)
    if sample is None:
        return
    ax, ay, az = sample
    # Publish every field of the sample with one dict call so a reader that
    # snapshots `latest` with .copy() does not see a mix of two packets.
    # (Relies on CPython running each of these dict calls as a single step.)
    latest.update(ax=ax, ay=ay, az=az, ts=time.time(), count=latest["count"] + 1)


def roll_pitch_from_accel(ax, ay, az):
    """Return ``(roll, pitch)`` in degrees derived from an acceleration vector."""
    roll = math.degrees(math.atan2(ay, az))
    pitch = math.degrees(math.atan2(-ax, math.sqrt(ay * ay + az * az)))
    return roll, pitch


async def ble_loop():
    """Connect to the device, enable notifications, and reconnect forever.

    Any error (including a dropped connection surfacing as an exception) is
    printed and followed by a 2 second pause before the next attempt.
    """
    while True:
        try:
            print(f"Connecting to {ADDRESS} ...")
            async with BleakClient(ADDRESS, timeout=15.0) as client:
                if not client.is_connected:
                    raise RuntimeError("Connect failed.")
                print("Connected. Enabling notifications...")
                await client.start_notify(CHAR_UUID, notify_handler)
                print("Notify enabled. Streaming...\n")
                # Nothing else to do here; notifications arrive via the callback.
                while True:
                    await asyncio.sleep(1.0)
        except Exception as e:
            # Top-level boundary of the BLE task: report and retry.
            print(f"[BLE] Error/disconnect: {e}")
            print("Reconnecting in 2 seconds...\n")
            await asyncio.sleep(2.0)


def start_ble_thread():
    """Run ``ble_loop`` on its own asyncio event loop in a daemon thread."""
    t = threading.Thread(target=lambda: asyncio.run(ble_loop()), daemon=True)
    t.start()


# ---------------- 3D helpers ----------------
def make_cuboid(size=(2.0, 1.2, 0.1)):
    """Build a box mesh centred on the origin.

    Args:
        size: ``(length, width, thickness)`` along x, y and z.

    Returns:
        ``(verts, faces)``: an 8x3 float array of corner coordinates and a
        12x3 int array of triangle vertex indices (two triangles per side).
    """
    length, width, thickness = size
    x, y, z = length / 2, width / 2, thickness / 2
    verts = np.array(
        [
            [-x, -y, -z], [x, -y, -z], [x, y, -z], [-x, y, -z],
            [-x, -y, z], [x, -y, z], [x, y, z], [-x, y, z],
        ],
        dtype=float,
    )
    faces = np.array(
        [
            [0, 1, 2], [0, 2, 3],
            [4, 6, 5], [4, 7, 6],
            [0, 4, 5], [0, 5, 1],
            [1, 5, 6], [1, 6, 2],
            [2, 6, 7], [2, 7, 3],
            [3, 7, 4], [3, 4, 0],
        ],
        dtype=int,
    )
    return verts, faces


def _blend_angle_deg(current, target, alpha):
    """Move ``current`` a fraction ``alpha`` toward ``target`` along the shorter arc.

    Both angles are in degrees; the result is wrapped into [-180, 180).
    """
    diff = (target - current + 180.0) % 360.0 - 180.0
    return (current + alpha * diff + 180.0) % 360.0 - 180.0


# ---------------- UI ----------------
class MainWindow(QtWidgets.QMainWindow):
    """Main window: 3D tilt view, status line, live graph and shake meter."""

    # Smoothing factor of the tilt low-pass filter (weight of the new sample).
    _TILT_ALPHA = 0.15

    # Indicator state -> (label text, stylesheet).
    _INDICATOR_STYLES = {
        "CALM": (
            "CALM",
            "font-weight: 700; font-size: 18px; padding: 10px;"
            "border: 2px solid #555; border-radius: 10px;"
            "background: rgba(80,80,80,120); color: white;",
        ),
        "WARN": (
            "WARN",
            "font-weight: 700; font-size: 18px; padding: 10px;"
            "border: 2px solid #a86a00; border-radius: 10px;"
            "background: rgba(255,165,0,130); color: black;",
        ),
        "SHAKE": (
            "SHAKE!",
            "font-weight: 800; font-size: 20px; padding: 10px;"
            "border: 2px solid #b00000; border-radius: 10px;"
            "background: rgba(255,60,60,180); color: white;",
        ),
    }

    def __init__(self):
        super().__init__()
        self.setWindowTitle("ADXL345 – 3D PCB + Live Graph + Shake Meter (BLE)")

        central = QtWidgets.QWidget()
        self.setCentralWidget(central)
        outer = QtWidgets.QVBoxLayout(central)
        outer.setContentsMargins(6, 6, 6, 6)
        outer.setSpacing(6)

        self._build_3d_view(outer)

        # ---- Status label ----
        self.status = QtWidgets.QLabel()
        self.status.setStyleSheet("color:white; background:rgba(0,0,0,140); padding:6px;")
        outer.addWidget(self.status)

        # ---- Bottom area: graph + shake meter (right) ----
        bottom = QtWidgets.QHBoxLayout()
        bottom.setSpacing(10)
        outer.addLayout(bottom, stretch=2)
        self._build_plot(bottom)
        self._build_meter_panel(bottom)

        # ---- State ----
        self.roll = 0.0
        self.pitch = 0.0
        self.t0 = time.time()
        # Text of the indicator state last applied by set_indicator_state();
        # None means no state has been applied yet.
        self._indicator_text = None

        # ---- Timer ----
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.update_ui)
        self.timer.start(UI_UPDATE_MS)

    def _build_3d_view(self, layout):
        """Create the OpenGL view with grid, axes, board mesh and vector line."""
        self.view3d = gl.GLViewWidget()
        self.view3d.setMinimumHeight(420)
        self.view3d.setCameraPosition(distance=6, elevation=25, azimuth=40)
        layout.addWidget(self.view3d, stretch=3)

        grid = gl.GLGridItem()
        grid.setSize(10, 10)
        grid.setSpacing(1, 1)
        self.view3d.addItem(grid)

        axis = gl.GLAxisItem()
        axis.setSize(2, 2, 2)
        self.view3d.addItem(axis)

        verts, faces = make_cuboid()
        # One RGBA colour per triangle, all the same.
        colors = np.tile(np.array([0.25, 0.7, 0.35, 0.9], dtype=float), (faces.shape[0], 1))
        meshdata = gl.MeshData(vertexes=verts, faces=faces)
        self.mesh = gl.GLMeshItem(
            meshdata=meshdata,
            faceColors=colors,
            drawEdges=True,
            edgeColor=(0, 0, 0, 1),
        )
        self.view3d.addItem(self.mesh)

        # Line from the origin to the current acceleration vector.
        self.vec = gl.GLLinePlotItem(
            pos=np.array([[0, 0, 0], [0, 0, 1]]), width=3, antialias=True
        )
        self.view3d.addItem(self.vec)

    def _build_plot(self, layout):
        """Create the graph, its four curves and the two threshold lines."""
        self.plot = pg.PlotWidget()
        self.plot.setMinimumHeight(230)
        self.plot.showGrid(x=True, y=True, alpha=0.25)
        self.plot.setLabel("left", "Acceleration", units="g")
        self.plot.setLabel("bottom", "Time", units="s")
        self.plot.setYRange(-MAX_G, MAX_G)
        self.plot.addLegend(offset=(10, 10))
        layout.addWidget(self.plot, stretch=5)

        # Curves
        self.curve_ax = self.plot.plot([], [], name="ax", pen=pg.mkPen('r', width=2))
        self.curve_ay = self.plot.plot([], [], name="ay", pen=pg.mkPen('g', width=2))
        self.curve_az = self.plot.plot([], [], name="az", pen=pg.mkPen('b', width=2))
        self.curve_mag = self.plot.plot([], [], name="|a|", pen=pg.mkPen('y', width=2))

        # Threshold lines (horizontal) for magnitude
        self.warn_line = pg.InfiniteLine(
            pos=WARN_MAG_G,
            angle=0,
            pen=pg.mkPen((255, 165, 0), width=2, style=QtCore.Qt.DashLine),
        )
        self.shake_line = pg.InfiniteLine(
            pos=SHAKE_MAG_G,
            angle=0,
            pen=pg.mkPen((255, 60, 60), width=2, style=QtCore.Qt.DashLine),
        )
        self.plot.addItem(self.warn_line)
        self.plot.addItem(self.shake_line)

    def _build_meter_panel(self, layout):
        """Create the shake meter column: title, bar, readout and indicator."""
        panel = QtWidgets.QVBoxLayout()
        panel.setSpacing(6)
        layout.addLayout(panel, stretch=1)

        self.meter_title = QtWidgets.QLabel("Shake magnitude")
        self.meter_title.setStyleSheet("font-weight: 600;")
        panel.addWidget(self.meter_title)

        self.meter_bar = QtWidgets.QProgressBar()
        self.meter_bar.setRange(0, 100)
        self.meter_bar.setValue(0)
        self.meter_bar.setFormat("%p%")
        self.meter_bar.setTextVisible(True)
        self.meter_bar.setMinimumWidth(220)
        self.meter_bar.setMinimumHeight(28)
        panel.addWidget(self.meter_bar)

        self.meter_readout = QtWidgets.QLabel("|a| = 0.000 g\nΔ=0.000 g")
        panel.addWidget(self.meter_readout)

        self.indicators = QtWidgets.QLabel("CALM")
        self.indicators.setAlignment(QtCore.Qt.AlignCenter)
        self.indicators.setStyleSheet(
            "font-weight: 700; font-size: 18px; padding: 10px; "
            "border: 2px solid #555; border-radius: 10px;"
        )
        panel.addWidget(self.indicators)
        panel.addStretch(1)

    def set_indicator_state(self, state: str):
        """Show ``state`` on the indicator label.

        ``"CALM"`` and ``"WARN"`` have their own look; any other value is
        shown as SHAKE. The label is left untouched when the requested state
        is the one already applied, so the periodic UI tick does not re-apply
        an identical stylesheet.
        """
        text, style = self._INDICATOR_STYLES.get(state, self._INDICATOR_STYLES["SHAKE"])
        if text == self._indicator_text:
            return
        self._indicator_text = text
        self.indicators.setText(text)
        self.indicators.setStyleSheet(style)

    def update_ui(self):
        """Timer tick: refresh meter, history, graph, 3D view and status text."""
        # Take one snapshot so every widget in this tick shows the same sample.
        snap = latest.copy()
        axg, ayg, azg = snap["ax"], snap["ay"], snap["az"]

        # Magnitude (g) and its deviation from 1 g
        mag = math.sqrt(axg * axg + ayg * ayg + azg * azg)
        delta = abs(mag - 1.0)

        self._update_meter(mag, delta)
        self._record_sample(axg, ayg, azg, mag)
        self._update_plot()
        self._update_orientation(axg, ayg, azg)
        self._update_status(snap, axg, ayg, azg, mag)

    def _update_meter(self, mag, delta):
        """Update the progress bar, numeric readout and threshold indicator."""
        # Map delta to 0..100%
        fraction = max(0.0, min(1.0, delta / max(1e-6, DELTA_G_FULL_SCALE)))
        self.meter_bar.setValue(int(fraction * 100.0))
        self.meter_readout.setText(f"|a| = {mag:.3f} g\nΔ = {delta:.3f} g")

        # Threshold indicators
        if mag >= SHAKE_MAG_G:
            self.set_indicator_state("SHAKE")
        elif mag >= WARN_MAG_G:
            self.set_indicator_state("WARN")
        else:
            self.set_indicator_state("CALM")

    def _record_sample(self, axg, ayg, azg, mag):
        """Append one sample to the history buffers and drop expired entries."""
        t = time.time() - self.t0
        t_buf.append(t)
        ax_buf.append(axg)
        ay_buf.append(ayg)
        az_buf.append(azg)
        mag_buf.append(mag)

        # Count the leading entries older than the window, then remove them
        # from every buffer with a single slice deletion each.
        stale = 0
        while stale < len(t_buf) and (t - t_buf[stale]) > HISTORY_SECONDS:
            stale += 1
        if stale:
            for buf in (t_buf, ax_buf, ay_buf, az_buf, mag_buf):
                del buf[:stale]

    def _update_plot(self):
        """Push the history buffers to the curves and scroll the X range."""
        self.curve_ax.setData(t_buf, ax_buf)
        self.curve_ay.setData(t_buf, ay_buf)
        self.curve_az.setData(t_buf, az_buf)
        self.curve_mag.setData(t_buf, mag_buf)
        if t_buf:
            self.plot.setXRange(max(0, t_buf[-1] - HISTORY_SECONDS), t_buf[-1])

    def _update_orientation(self, axg, ayg, azg):
        """Rotate the board mesh by the smoothed tilt and redraw the vector."""
        roll, pitch = roll_pitch_from_accel(axg, ayg, azg)
        alpha = self._TILT_ALPHA
        # Roll comes from atan2 and wraps at +/-180 degrees, so blend along the
        # shorter arc; a plain weighted average would swing through 0 when the
        # reading flips sign near upside-down.
        self.roll = _blend_angle_deg(self.roll, roll, alpha)
        # Pitch is limited to [-90, 90] and never wraps: plain low-pass.
        self.pitch = (1 - alpha) * self.pitch + alpha * pitch

        self.mesh.resetTransform()
        self.mesh.rotate(self.pitch, 0, 1, 0)
        self.mesh.rotate(self.roll, 1, 0, 0)

        # Vector line
        self.vec.setData(pos=np.array([[0, 0, 0], [axg, ayg, azg]], dtype=float))

    def _update_status(self, snap, axg, ayg, azg, mag):
        """Write packet count, sample age, axis values and tilt to the status label."""
        # ts stays 0.0 until the first valid packet; show an age of 0 until then.
        age = (time.time() - snap["ts"]) * 1000 if snap["ts"] else 0
        self.status.setText(
            f"Packets: {snap['count']} Age: {age:.0f} ms\n"
            f"ax={axg:+.3f}g ay={ayg:+.3f}g az={azg:+.3f}g |a|={mag:.3f}g\n"
            f"roll={self.roll:+.1f}° pitch={self.pitch:+.1f}°"
        )


def main():
    """Start the BLE thread, show the main window and run the Qt event loop."""
    print("IMPORTANT: Disconnect nRF Connect (or any other BLE client) first.")
    start_ble_thread()

    # Reuse an existing QApplication if there is one.
    app = QtWidgets.QApplication.instance()
    if app is None:
        app = QtWidgets.QApplication(sys.argv)

    win = MainWindow()
    win.resize(1100, 820)
    win.show()
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()