# --------------------------------------- # MicroPython ESP32 - Guardián Fénix FINAL # Enviromental(BME680) + ADXL345 # I am Guardian Fenix # Samuel Alexader # --------------------------------------- import network, socket, time,struct, math, ujson, gc from machine import Pin, I2C, PWM,UART import adxl345 import neopixel # ========================= # DFPLAYER (UART2) # ========================= DF_UART_ID = 2 DF_TX_PIN = 21 # ESP32 TX -> DFPlayer RX DF_RX_PIN = 20 # ESP32 RX <- DFPlayer TX (optional, but harmless) DF_BUSY_PIN = 47 # DFPlayer BUSY -> ESP32 GPIO (usually LOW when playing) DF_BAUD = 9600 _START = 0x7E _VER = 0xFF _LEN = 0x06 _ACK = 0x00 _END = 0xEF CMD_SET_VOLUME = 0x06 CMD_PLAY_TRACK = 0x03 CMD_PLAY_FOLDER = 0x0F CMD_STOP = 0x16 CMD_RESET = 0x0C def _checksum(cmd, p1, p2): s = _VER + _LEN + cmd + _ACK + p1 + p2 c = (0 - s) & 0xFFFF return (c >> 8) & 0xFF, c & 0xFF class DFPlayer: def __init__(self, uart: UART, busy_pin=None): self.uart = uart self.busy = Pin(busy_pin, Pin.IN, Pin.PULL_UP) if busy_pin is not None else None time.sleep_ms(800) def _send(self, cmd, param=0): p1 = (param >> 8) & 0xFF p2 = param & 0xFF ch, cl = _checksum(cmd, p1, p2) frame = bytes([_START, _VER, _LEN, cmd, _ACK, p1, p2, ch, cl, _END]) self.uart.write(frame) def reset(self): self._send(CMD_RESET, 0) time.sleep_ms(1200) def volume(self, level): level = max(0, min(30, int(level))) self._send(CMD_SET_VOLUME, level) def stop(self): self._send(CMD_STOP, 0) def play_track(self, track_num): self._send(CMD_PLAY_TRACK, int(track_num)) def play_folder_file(self, folder, file): folder = max(1, min(99, int(folder))) file = max(1, min(255, int(file))) self._send(CMD_PLAY_FOLDER, (folder << 8) | file) def is_playing(self): if self.busy is None: return None # Typical DFPlayer: BUSY LOW = playing, HIGH = idle return self.busy.value() == 0 def wait_done(self, timeout_ms=20000): """Wait for BUSY to go LOW (start) then HIGH (done).""" if self.busy is None: return False t0 = time.ticks_ms() # wait for playback to begin (BUSY goes LOW) while self.busy.value() == 1: if time.ticks_diff(time.ticks_ms(), t0) > 1000: return False time.sleep_ms(5) # wait for playback to finish (BUSY goes HIGH) while self.busy.value() == 0: if time.ticks_diff(time.ticks_ms(), t0) > timeout_ms: return False time.sleep_ms(10) return True df_uart = UART( DF_UART_ID, baudrate=DF_BAUD, bits=8, parity=None, stop=1, tx=Pin(DF_TX_PIN), rx=Pin(DF_RX_PIN), timeout=100 ) player = DFPlayer(df_uart, busy_pin=DF_BUSY_PIN) # ========================= # MODBUS RTU (RS485 UART1) # ========================= MB_UART_ID = 1 MB_TX_PIN = 45 MB_RX_PIN = 0 MB_BAUD = 9600 SLAVE_ID = 10 # RS485 direction pin (DE/RE tied together) RS485_DIR_PIN = 18 rs485_dir = Pin(RS485_DIR_PIN, Pin.OUT) rs485_dir.value(0) # receive mode mb_uart = UART( MB_UART_ID, baudrate=MB_BAUD, bits=8, parity=None, stop=1, tx=Pin(MB_TX_PIN), rx=Pin(MB_RX_PIN), timeout=200 ) def crc16(data): crc = 0xFFFF for b in data: crc ^= b for _ in range(8): crc = (crc >> 1) ^ 0xA001 if (crc & 1) else (crc >> 1) return crc def read_holding(start_reg, count): frame = bytearray(6) frame[0] = SLAVE_ID frame[1] = 3 # FC03 frame[2] = start_reg >> 8 frame[3] = start_reg & 0xFF frame[4] = count >> 8 frame[5] = count & 0xFF frame += struct.pack("h", struct.pack(">H", regs[4]))[0] / 100 hum = regs[5] / 100 pres = regs[6] / 10 # Indice de calidad del aire IAQ = regs[7] except: temp = hum = pres = 0 try: ax, ay, az = acc.read_g() vib = round(math.sqrt(ax*ax + ay*ay + az*az), 2) except: ax = ay = az = vib = 0 estado = "Normal" alerta_sensor = "" player.volume(20) # time.sleep_ms(50) # Alertas menos sensibles según estadísticas normales if vib > 1.6: estado = "ALERTA" alerta_sensor = "Vibración" elif temp > 26.7 or temp < 22: estado = "ALERTA" alerta_sensor = "Temperatura" elif hum < 30 or hum > 60: estado = "ALERTA" alerta_sensor = "Humedad" elif pres < 950 or pres > 1050: estado = "ALERTA" alerta_sensor = "Presión" elif IAQ > 100: estado = "ALERTA" alerta_sensor = "IAQ" _Volume = 15 # LED y ... if estado == "ALERTA": alarma_activa = True if alerta_sensor == "Vibración": actualizar_led_alerta('vib') player.volume(_Volume) time.sleep_ms(50) player.play_folder_file(2, 105) ok = player.wait_done(timeout_ms=25000) # time.sleep(1) elif alerta_sensor == "Temperatura": actualizar_led_alerta('temp') player.volume(_Volume) time.sleep_ms(50) player.play_folder_file(2, 105) ok = player.wait_done(timeout_ms=25000) # time.sleep(1) elif alerta_sensor == "Humedad": actualizar_led_alerta('hum') player.volume(_Volume) time.sleep_ms(50) player.play_folder_file(2, 105) ok = player.wait_done(timeout_ms=25000) # time.sleep(1) elif alerta_sensor == "Presión": actualizar_led_alerta('pres') player.volume(_Volume) time.sleep_ms(50) player.play_folder_file(2, 105) ok = player.wait_done(timeout_ms=25000) # time.sleep(1) elif alerta_sensor == "IAQ": actualizar_led_alerta('IAQ') player.volume(_Volume) time.sleep_ms(50) player.play_folder_file(2, 105) ok = player.wait_done(timeout_ms=25000) # time.sleep(1) else: if not alarma_activa: actualizar_led_alerta("normal") dato = { "temp": temp, "hum": hum, "pres": pres, "IAQ": IAQ, "vib": vib, "ax": round(ax,2), "ay": round(ay,2), "az": round(az,2), "estado": estado, "alarma": alarma_activa, "alerta_sensor": alerta_sensor } registros.append(dato) if len(registros) > MAX_REG: registros.pop(0) # ---------------------- # HTML con Canvas y alerta ambiental # ---------------------- def web_page(): return """ Guardián Fénix

🛡️ Guardián Fénix 🔥

🌿 Sensor Ambiental

Temp: -- °C
Hum: -- %
Pres: -- hPa
Alerta:

📳 Vibración

Shake: --
ax: --
ay: --
az: --
Estado: --
🚨 ALERTA!

📝 Registros manuales

#HoraTempHumPresShakeEstado

📊 Gráficas Canvas

Temperatura (°C)

Humedad (%)

Presión (hPa)

Vibración (g)

""" # ---------------------- # Servidor socket # ---------------------- addr = socket.getaddrinfo("0.0.0.0", 80)[0][-1] s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind(addr) s.listen(1) print("🌐 Servidor listo") def handle_client(cl): global alarma_activa try: req = cl.recv(1024).decode() if "GET /data" in req: cl.send("HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n") cl.send(ujson.dumps(registros)) elif "GET /reset" in req: alarma_activa = False actualizar_led_alerta("normal") cl.send("HTTP/1.1 200 OK\r\n\r\nOK") else: cl.send("HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n") cl.send(web_page()) except Exception as e: print("❌ Error:", e) cl.close() # ---------------------- # Loop principal # ---------------------- last_read = 0 while True: if time.ticks_ms() - last_read > 0.5: leer_sensores() last_read = time.ticks_ms() print("Free: ", gc.mem_free()) print("Allocated: ", gc.mem_alloc()) try: cl, addr = s.accept() handle_client(cl) except Exception as e: print("❌ Socket error:", e)