130 lines
No EOL
4.1 KiB
Python
130 lines
No EOL
4.1 KiB
Python
# Rui Santos & Sara Santos - Random Nerd Tutorials
|
|
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-bluetooth-low-energy-ble/
|
|
|
|
from micropython import const
|
|
import asyncio
|
|
import aioble
|
|
import bluetooth
|
|
import struct
|
|
from machine import Pin
|
|
from random import randint
|
|
import machine, neopixel
|
|
|
|
n = 1
|
|
p = 8
|
|
|
|
np = neopixel.NeoPixel(machine.Pin(p), n)
|
|
|
|
np[0] = (255, 0, 0)
|
|
np.write()
|
|
|
|
# Init random value
|
|
value = 0
|
|
|
|
# See the following for generating UUIDs:
|
|
# https://www.uuidgenerator.net/
|
|
_BLE_SERVICE_UUID = bluetooth.UUID('19b10000-e8f2-537e-4f6c-d104768a1214')
|
|
_BLE_SENSOR_CHAR_UUID = bluetooth.UUID('19b10001-e8f2-537e-4f6c-d104768a1214')
|
|
_BLE_LED_UUID = bluetooth.UUID('19b10002-e8f2-537e-4f6c-d104768a1214')
|
|
# How frequently to send advertising beacons.
|
|
_ADV_INTERVAL_MS = 250_000
|
|
|
|
# Register GATT server, the service and characteristics
|
|
ble_service = aioble.Service(_BLE_SERVICE_UUID)
|
|
sensor_characteristic = aioble.Characteristic(ble_service, _BLE_SENSOR_CHAR_UUID, read=True, notify=True)
|
|
led_characteristic = aioble.Characteristic(ble_service, _BLE_LED_UUID, read=True, write=True, notify=True, capture=True)
|
|
|
|
# Register service(s)
|
|
aioble.register_services(ble_service)
|
|
|
|
# Helper to encode the data characteristic UTF-8
|
|
def _encode_data(data):
|
|
return str(data).encode('utf-8')
|
|
|
|
# Helper to decode the LED characteristic encoding (bytes).
|
|
def _decode_data(data):
|
|
try:
|
|
if data is not None:
|
|
# Decode the UTF-8 data
|
|
number = int.from_bytes(data, 'big')
|
|
return number
|
|
except Exception as e:
|
|
print("Error decoding temperature:", e)
|
|
return None
|
|
|
|
# Get sensor readings
|
|
def get_random_value():
|
|
return randint(0,100)
|
|
|
|
# Get new value and update characteristic
|
|
async def sensor_task():
|
|
while True:
|
|
value = get_random_value()
|
|
sensor_characteristic.write(_encode_data(value), send_update=True)
|
|
print('New random value written: ', value)
|
|
await asyncio.sleep_ms(1000)
|
|
|
|
# Serially wait for connections. Don't advertise while a central is connected.
|
|
async def peripheral_task():
|
|
while True:
|
|
try:
|
|
async with await aioble.advertise(
|
|
_ADV_INTERVAL_MS,
|
|
name="ESP32",
|
|
services=[_BLE_SERVICE_UUID],
|
|
) as connection:
|
|
print("Connection from", connection.device)
|
|
await connection.disconnected()
|
|
except asyncio.CancelledError:
|
|
# Catch the CancelledError
|
|
print("Peripheral task cancelled")
|
|
except Exception as e:
|
|
print("Error in peripheral_task:", e)
|
|
finally:
|
|
# Ensure the loop continues to the next iteration
|
|
await asyncio.sleep_ms(100)
|
|
|
|
async def wait_for_write():
|
|
while True:
|
|
try:
|
|
connection, data = await led_characteristic.written()
|
|
print(data)
|
|
print(type)
|
|
data = _decode_data(data)
|
|
print('Connection: ', connection)
|
|
print('Data: ', data)
|
|
if data == 0:
|
|
print('Turning LED OFF')
|
|
np[0] = (0, 0, 0)
|
|
np.write()
|
|
elif data == 1:
|
|
print('Turning LED RED')
|
|
np[0] = (255, 0, 0)
|
|
np.write()
|
|
elif data == 2:
|
|
print('Turning LED GREEN')
|
|
np[0] = (0, 255, 0)
|
|
np.write()
|
|
elif data == 3:
|
|
print('Turning LED BLUE')
|
|
np[0] = (0, 0, 255)
|
|
np.write()
|
|
else:
|
|
print('Unknown command')
|
|
except asyncio.CancelledError:
|
|
# Catch the CancelledError
|
|
print("Peripheral task cancelled")
|
|
except Exception as e:
|
|
print("Error in peripheral_task:", e)
|
|
finally:
|
|
# Ensure the loop continues to the next iteration
|
|
await asyncio.sleep_ms(100)
|
|
|
|
# Run tasks
|
|
async def main():
|
|
t1 = asyncio.create_task(sensor_task())
|
|
t2 = asyncio.create_task(peripheral_task())
|
|
t3 = asyncio.create_task(wait_for_write())
|
|
await asyncio.gather(t1, t2)
|
|
|
|
asyncio.run(main()) |