331 lines
11 KiB
Python
331 lines
11 KiB
Python
# V005.py (VERSION: 3D-PCB+GRAPH+SHAKEBAR-1)  NOTE(review): the startup banner below prints "V006" - confirm which version label is current.
|
||
print("=== V006 VERSION: 3D-PCB+GRAPH+SHAKEBAR-1 ===")
|
||
|
||
import sys
|
||
import time
|
||
import math
|
||
import asyncio
|
||
import threading
|
||
|
||
import numpy as np
|
||
from bleak import BleakClient
|
||
|
||
from PyQt5 import QtWidgets, QtCore
|
||
import pyqtgraph as pg
|
||
import pyqtgraph.opengl as gl
|
||
|
||
# ====== HARD-CODED DEVICE ======
# BLE address handed to BleakClient and the characteristic subscribed to
# for notifications.
ADDRESS = "10:51:DB:1B:E7:1E"
CHAR_UUID = "7e2a1002-1111-2222-3333-444455556666"

# ====== GRAPH SETTINGS ======
HISTORY_SECONDS = 10.0  # width of the scrolling plot window, in seconds
UI_UPDATE_MS = 50       # period of the UI refresh timer, in milliseconds
MAX_G = 4.0             # plot Y axis is fixed to -MAX_G .. +MAX_G

# ====== SHAKE METRICS SETTINGS ======
# Thresholds on the acceleration magnitude |a| in g
# (gravity alone gives ~1.0 g when stationary).
WARN_MAG_G = 1.30   # mild motion
SHAKE_MAG_G = 1.70  # strong shake

# The percentage bar is driven by the deviation from 1 g:
#   delta_g = abs(|a| - 1.0)
DELTA_G_FULL_SCALE = 1.0  # delta = 1.0 g  =>  100 %

# Most recent sample, written by the BLE notification callback and read by
# the UI timer.
latest = dict(ax=0.0, ay=0.0, az=1.0, ts=0.0, count=0, raw="")

# Plot history: parallel plain lists (time, ax, ay, az, |a|). The UI appends
# one entry per refresh and drops entries older than HISTORY_SECONDS.
t_buf, ax_buf, ay_buf, az_buf, mag_buf = [], [], [], [], []
|
||
|
||
# ---------------- BLE ----------------
|
||
|
||
def parse_csv_triplet(s: str):
    """Parse an ``"x,y,z"`` text payload into a tuple of three finite floats.

    Surrounding whitespace and double quotes are stripped before splitting.

    Args:
        s: Text payload, e.g. ``"0.01,-0.02,0.98"``.

    Returns:
        ``(x, y, z)`` as floats, or ``None`` when the payload does not have
        exactly three comma-separated fields, a field is not numeric, or a
        field is NaN / infinite.
    """
    fields = s.strip().strip('"').split(",")
    if len(fields) != 3:
        return None
    try:
        x, y, z = (float(field) for field in fields)
    except ValueError:
        return None
    # float() happily accepts "nan" and "inf". A single such sample would
    # turn the magnitude and the exponentially smoothed roll/pitch into NaN,
    # and a NaN moving average never recovers - so reject it here.
    if not (math.isfinite(x) and math.isfinite(y) and math.isfinite(z)):
        return None
    return x, y, z
|
||
|
||
def notify_handler(_sender: int, data: bytearray):
    """Notification callback: decode one packet and publish it in ``latest``.

    The decoded text is always stored under ``latest["raw"]``. When it parses
    as an ``x,y,z`` triplet the acceleration fields, the receive timestamp
    and the packet counter are updated as well; otherwise they are left
    untouched.

    Args:
        _sender: Notification source handle (unused).
        data: Raw payload bytes; undecodable bytes are ignored.
    """
    text = data.decode(errors="ignore").strip()
    latest["raw"] = text

    sample = parse_csv_triplet(text)
    if sample is not None:
        latest["ax"], latest["ay"], latest["az"] = sample
        latest["ts"] = time.time()
        latest["count"] += 1
|
||
|
||
def roll_pitch_from_accel(ax, ay, az):
    """Estimate roll and pitch, in degrees, from one acceleration sample.

    Roll is ``atan2(ay, az)``; pitch is ``atan2(-ax, sqrt(ay^2 + az^2))``.

    Args:
        ax, ay, az: Acceleration components (any common unit).

    Returns:
        ``(roll, pitch)`` in degrees.
    """
    yz_norm = math.sqrt(ay * ay + az * az)
    roll_rad = math.atan2(ay, az)
    pitch_rad = math.atan2(-ax, yz_norm)
    return math.degrees(roll_rad), math.degrees(pitch_rad)
|
||
|
||
async def ble_loop():
    """Keep a notification subscription to the device alive forever.

    Connects to ADDRESS, subscribes notify_handler to CHAR_UUID, then idles
    while the link is up. On any error, or as soon as the client reports
    that it is no longer connected, waits 2 seconds and starts over. This
    coroutine never returns.
    """
    while True:
        try:
            print(f"Connecting to {ADDRESS} ...")
            async with BleakClient(ADDRESS, timeout=15.0) as client:
                if not client.is_connected:
                    raise RuntimeError("Connect failed.")
                print("Connected. Enabling notifications...")
                await client.start_notify(CHAR_UUID, notify_handler)
                print("Notify enabled. Streaming...\n")
                # A dropped link does not raise inside this idle loop, so
                # poll the connection state; an unconditional `while True`
                # here would sleep forever and never reach the reconnect
                # path below.
                while client.is_connected:
                    await asyncio.sleep(1.0)
            print("[BLE] Error/disconnect: connection lost")
        except Exception as e:
            # Top-level retry boundary: report and fall through to the
            # reconnect delay (task cancellation is not an Exception and
            # still propagates).
            print(f"[BLE] Error/disconnect: {e}")
        print("Reconnecting in 2 seconds...\n")
        await asyncio.sleep(2.0)
|
||
|
||
def start_ble_thread():
    """Run ble_loop() on its own asyncio event loop in a daemon thread."""

    def _run_ble_forever():
        asyncio.run(ble_loop())

    worker = threading.Thread(target=_run_ble_forever, daemon=True)
    worker.start()
|
||
|
||
# ---------------- 3D helpers ----------------
|
||
|
||
def make_cuboid(size=(2.0, 1.2, 0.1)):
    """Build a triangle mesh for an axis-aligned box centred on the origin.

    Args:
        size: ``(L, W, T)`` extents along x, y and z.

    Returns:
        ``(verts, faces)``: an 8x3 float array of corner coordinates and a
        12x3 int array of vertex indices, two triangles per side.
    """
    length, width, thickness = size
    half_extents = np.array([length / 2, width / 2, thickness / 2], dtype=float)

    # Corners 0-3 lie on the z = -T/2 side, 4-7 on z = +T/2; both rings go
    # round in the same order.
    corner_signs = np.array([
        [-1, -1, -1], [+1, -1, -1], [+1, +1, -1], [-1, +1, -1],
        [-1, -1, +1], [+1, -1, +1], [+1, +1, +1], [-1, +1, +1],
    ], dtype=float)
    verts = corner_signs * half_extents

    triangles = [
        (0, 1, 2), (0, 2, 3),  # z = -T/2
        (4, 6, 5), (4, 7, 6),  # z = +T/2
        (0, 4, 5), (0, 5, 1),  # y = -W/2
        (1, 5, 6), (1, 6, 2),  # x = +L/2
        (2, 6, 7), (2, 7, 3),  # y = +W/2
        (3, 7, 4), (3, 4, 0),  # x = -L/2
    ]
    faces = np.array(triangles, dtype=int)
    return verts, faces
|
||
|
||
# ---------------- UI ----------------
|
||
|
||
class MainWindow(QtWidgets.QMainWindow):
    """Main window: 3D board view, status text, live graph and shake meter.

    A QTimer fires every UI_UPDATE_MS milliseconds and calls update_ui(),
    which reads the module-level ``latest`` sample, appends it to the
    module-level history buffers and refreshes every widget.
    """

    # state -> (caption, stylesheet) for the indicator label. Any state that
    # is not listed here is shown as "SHAKE".
    _INDICATOR_STYLES = {
        "CALM": (
            "CALM",
            "font-weight: 700; font-size: 18px; padding: 10px;"
            "border: 2px solid #555; border-radius: 10px;"
            "background: rgba(80,80,80,120); color: white;",
        ),
        "WARN": (
            "WARN",
            "font-weight: 700; font-size: 18px; padding: 10px;"
            "border: 2px solid #a86a00; border-radius: 10px;"
            "background: rgba(255,165,0,130); color: black;",
        ),
        "SHAKE": (
            "SHAKE!",
            "font-weight: 800; font-size: 20px; padding: 10px;"
            "border: 2px solid #b00000; border-radius: 10px;"
            "background: rgba(255,60,60,180); color: white;",
        ),
    }

    def __init__(self):
        """Build all widgets and start the periodic refresh timer."""
        super().__init__()
        self.setWindowTitle("ADXL345 – 3D PCB + Live Graph + Shake Meter (BLE)")

        central = QtWidgets.QWidget()
        self.setCentralWidget(central)
        outer = QtWidgets.QVBoxLayout(central)
        outer.setContentsMargins(6, 6, 6, 6)
        outer.setSpacing(6)

        # ---- 3D view ----
        self._build_3d_view(outer)

        # ---- Status label ----
        self.status = QtWidgets.QLabel()
        self.status.setStyleSheet("color:white; background:rgba(0,0,0,140); padding:6px;")
        outer.addWidget(self.status)

        # ---- Bottom area: graph + shake meter (right) ----
        bottom = QtWidgets.QHBoxLayout()
        bottom.setSpacing(10)
        outer.addLayout(bottom, stretch=2)
        self._build_plot(bottom)
        self._build_meter_panel(bottom)

        # ---- State ----
        self.roll = 0.0
        self.pitch = 0.0
        self.t0 = time.time()
        # Last state applied by set_indicator_state(); None = nothing yet.
        self._indicator_state = None

        # ---- Timer ----
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.update_ui)
        self.timer.start(UI_UPDATE_MS)

    def _build_3d_view(self, layout):
        """Create the GL view with grid, axes, board mesh and accel vector."""
        self.view3d = gl.GLViewWidget()
        self.view3d.setMinimumHeight(420)
        self.view3d.setCameraPosition(distance=6, elevation=25, azimuth=40)
        layout.addWidget(self.view3d, stretch=3)

        grid = gl.GLGridItem()
        grid.setSize(10, 10)
        grid.setSpacing(1, 1)
        self.view3d.addItem(grid)

        axis = gl.GLAxisItem()
        axis.setSize(2, 2, 2)
        self.view3d.addItem(axis)

        verts, faces = make_cuboid()
        # One RGBA colour per triangle, the same for all of them.
        colors = np.ones((faces.shape[0], 4), dtype=float)
        colors[:, 0] = 0.25
        colors[:, 1] = 0.7
        colors[:, 2] = 0.35
        colors[:, 3] = 0.9

        meshdata = gl.MeshData(vertexes=verts, faces=faces)
        self.mesh = gl.GLMeshItem(meshdata=meshdata, faceColors=colors,
                                  drawEdges=True, edgeColor=(0, 0, 0, 1))
        self.view3d.addItem(self.mesh)

        # Line from the origin to the current acceleration vector.
        self.vec = gl.GLLinePlotItem(pos=np.array([[0, 0, 0], [0, 0, 1]]),
                                     width=3, antialias=True)
        self.view3d.addItem(self.vec)

    def _build_plot(self, layout):
        """Create the scrolling graph, its four curves and threshold lines."""
        self.plot = pg.PlotWidget()
        self.plot.setMinimumHeight(230)
        self.plot.showGrid(x=True, y=True, alpha=0.25)
        self.plot.setLabel("left", "Acceleration", units="g")
        self.plot.setLabel("bottom", "Time", units="s")
        self.plot.setYRange(-MAX_G, MAX_G)
        self.plot.addLegend(offset=(10, 10))
        layout.addWidget(self.plot, stretch=5)

        # Curves
        self.curve_ax = self.plot.plot([], [], name="ax", pen=pg.mkPen('r', width=2))
        self.curve_ay = self.plot.plot([], [], name="ay", pen=pg.mkPen('g', width=2))
        self.curve_az = self.plot.plot([], [], name="az", pen=pg.mkPen('b', width=2))
        self.curve_mag = self.plot.plot([], [], name="|a|", pen=pg.mkPen('y', width=2))

        # Threshold lines (horizontal) for magnitude
        self.warn_line = pg.InfiniteLine(
            pos=+WARN_MAG_G, angle=0,
            pen=pg.mkPen((255, 165, 0), width=2, style=QtCore.Qt.DashLine))
        self.shake_line = pg.InfiniteLine(
            pos=+SHAKE_MAG_G, angle=0,
            pen=pg.mkPen((255, 60, 60), width=2, style=QtCore.Qt.DashLine))
        self.plot.addItem(self.warn_line)
        self.plot.addItem(self.shake_line)

    def _build_meter_panel(self, layout):
        """Create the right-hand panel: title, bar, readout and indicator."""
        panel = QtWidgets.QVBoxLayout()
        panel.setSpacing(6)
        layout.addLayout(panel, stretch=1)

        self.meter_title = QtWidgets.QLabel("Shake magnitude")
        self.meter_title.setStyleSheet("font-weight: 600;")
        panel.addWidget(self.meter_title)

        self.meter_bar = QtWidgets.QProgressBar()
        self.meter_bar.setRange(0, 100)
        self.meter_bar.setValue(0)
        self.meter_bar.setFormat("%p%")
        self.meter_bar.setTextVisible(True)
        self.meter_bar.setMinimumWidth(220)
        self.meter_bar.setMinimumHeight(28)
        panel.addWidget(self.meter_bar)

        self.meter_readout = QtWidgets.QLabel("|a| = 0.000 g\nΔ=0.000 g")
        panel.addWidget(self.meter_readout)

        self.indicators = QtWidgets.QLabel("CALM")
        self.indicators.setAlignment(QtCore.Qt.AlignCenter)
        self.indicators.setStyleSheet(
            "font-weight: 700; font-size: 18px; padding: 10px; "
            "border: 2px solid #555; border-radius: 10px;"
        )
        panel.addWidget(self.indicators)

        panel.addStretch(1)

    def set_indicator_state(self, state: str):
        """Show ``state`` on the indicator label.

        Args:
            state: ``"CALM"`` or ``"WARN"``; any other value is shown as
                SHAKE.
        """
        # update_ui() calls this on every timer tick. Re-applying an
        # identical stylesheet each time is wasted work, so only touch the
        # widget when the state actually changes.
        if state == self._indicator_state:
            return
        self._indicator_state = state

        caption, stylesheet = self._INDICATOR_STYLES.get(
            state, self._INDICATOR_STYLES["SHAKE"])
        self.indicators.setText(caption)
        self.indicators.setStyleSheet(stylesheet)

    @staticmethod
    def _smooth_roll(previous, target, alpha):
        """Move ``previous`` toward ``target`` (degrees) along the shorter arc.

        Roll comes from atan2 and therefore jumps between +180 and -180 when
        the board is upside down. Averaging the raw numbers across that jump
        drags the result through 0 degrees, so the step is taken on the
        wrapped difference and the result is wrapped back to [-180, 180).
        """
        step = (target - previous + 180.0) % 360.0 - 180.0
        return (previous + alpha * step + 180.0) % 360.0 - 180.0

    def update_ui(self):
        """Timer slot: pull the latest sample and refresh every widget."""
        axg, ayg, azg = latest["ax"], latest["ay"], latest["az"]

        # Magnitude (g) and its deviation from the 1 g of gravity
        mag = math.sqrt(axg*axg + ayg*ayg + azg*azg)
        delta = abs(mag - 1.0)

        # Map delta to 0..100%, clamped; the inner max() guards against a
        # zero full-scale setting.
        pct = int(max(0.0, min(1.0, delta / max(1e-6, DELTA_G_FULL_SCALE))) * 100.0)
        self.meter_bar.setValue(pct)
        self.meter_readout.setText(f"|a| = {mag:.3f} g\nΔ = {delta:.3f} g")

        # Threshold indicators
        if mag >= SHAKE_MAG_G:
            self.set_indicator_state("SHAKE")
        elif mag >= WARN_MAG_G:
            self.set_indicator_state("WARN")
        else:
            self.set_indicator_state("CALM")

        # Add to history buffers (time axis is seconds since window creation)
        t = time.time() - self.t0
        t_buf.append(t)
        ax_buf.append(axg)
        ay_buf.append(ayg)
        az_buf.append(azg)
        mag_buf.append(mag)

        # Trim buffers: drop entries older than HISTORY_SECONDS, keeping all
        # five lists the same length.
        while t_buf and (t - t_buf[0]) > HISTORY_SECONDS:
            for buf in (t_buf, ax_buf, ay_buf, az_buf, mag_buf):
                buf.pop(0)

        # Update plot
        self.curve_ax.setData(t_buf, ax_buf)
        self.curve_ay.setData(t_buf, ay_buf)
        self.curve_az.setData(t_buf, az_buf)
        self.curve_mag.setData(t_buf, mag_buf)

        if t_buf:
            self.plot.setXRange(max(0, t_buf[-1] - HISTORY_SECONDS), t_buf[-1])

        # Update 3D tilt (smoothed). Pitch stays within [-90, 90] so a plain
        # moving average is fine; roll needs the wrap-aware step.
        roll, pitch = roll_pitch_from_accel(axg, ayg, azg)
        alpha = 0.15
        self.roll = self._smooth_roll(self.roll, roll, alpha)
        self.pitch = (1 - alpha) * self.pitch + alpha * pitch

        self.mesh.resetTransform()
        self.mesh.rotate(self.pitch, 0, 1, 0)
        self.mesh.rotate(self.roll, 1, 0, 0)

        # Vector line
        self.vec.setData(pos=np.array([[0, 0, 0], [axg, ayg, azg]], dtype=float))

        # Top status text; age is 0 until the first packet has been stamped
        age = (time.time() - latest["ts"]) * 1000 if latest["ts"] else 0
        self.status.setText(
            f"Packets: {latest['count']} Age: {age:.0f} ms\n"
            f"ax={axg:+.3f}g ay={ayg:+.3f}g az={azg:+.3f}g |a|={mag:.3f}g\n"
            f"roll={self.roll:+.1f}° pitch={self.pitch:+.1f}°"
        )
|
||
|
||
def main():
    """Start the BLE worker thread, show the main window and run Qt.

    Exits the process with the Qt event loop's return code.
    """
    print("IMPORTANT: Disconnect nRF Connect (or any other BLE client) first.")
    start_ble_thread()

    # Reuse an already existing QApplication if there is one.
    existing_app = QtWidgets.QApplication.instance()
    app = existing_app if existing_app is not None else QtWidgets.QApplication(sys.argv)

    window = MainWindow()
    window.resize(1100, 820)
    window.show()

    exit_code = app.exec_()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|