First commit

This commit is contained in:
Nicolas 2024-10-23 09:36:21 +02:00
commit 675fca88b8
2 changed files with 224 additions and 0 deletions

53
README.md Normal file
View File

@ -0,0 +1,53 @@
# SmartPlug AWOX
## Presentation
Fondée en 2003, la société AwoX est un pure-player des technologies et des objets connectés dédiés à lunivers du Smart Home (Maison intelligente). Le groupe est un acteur majeur de léclairage connecté, avec la plus large gamme dampoules LED intelligentes et daccessoires connectés pour la maison, et de laudio connecté multi-room, ces deux univers constituant les segments les plus dynamiques du marché du Smart Home.
Soutenue par Innovacom dès 2005, AwoX fut en 2014 la première société sur les objets connectés côtée en Bourse en Europe.
[AWOX](https://play-lh.googleusercontent.com/EEnSfuFX9gIVBCiI8lwxkJFgvo7Zv_-imTmbdIsEu6hvIVoPMkQdBTr1C01_4wbUisEc)
## Logiciel
Le script ecrit en Python avec la librairie Bleak permet d'allumer ou d'eteindre une prise commande en bluetooth. Cela fonctionne sous MacOS (devrait fonctionner sur autres systemes) avec les prises SMP-B16-FR et SMP-B16-GR.
## Installation de l'environnement
Installation de la librairie Bluetooth Bleak
```
python3 -m venv venv
source ./venv/bin/activate
pip install bleak
```
## Lancement de l'application
Scanner les appareils
```
user % source venv/bin/activate
user % python3 smartplug.py scan
[DEBUG] Starting Bluetooth device scan... (5 seconds)
Device Name MAC Address
----------------------------------------
SMP-B16-GR EADD44B9-E19E-50A6-CED1-FB458857E3E5
```
Allumer une prise
```
user % source venv/bin/activate
user % python3 smartplug.py EADD44B9-E19E-50A6-CED1-FB458857E3E5 on
[DEBUG] Turning on the smart plug...
[DEBUG] Connecting to EADD44B9-E19E-50A6-CED1-FB458857E3E5
[ERROR] Device with address EADD44B9-E19E-50A6-CED1-FB458857E3E5 not found.
```
Eteindre une prise
```
user % source venv/bin/activate
user % python3 smartplug.py EADD44B9-E19E-50A6-CED1-FB458857E3E5 off
[DEBUG] Turning off the smart plug...
[DEBUG] Connecting to EADD44B9-E19E-50A6-CED1-FB458857E3E5
[ERROR] Device with address EADD44B9-E19E-50A6-CED1-FB458857E3E5 not found.
```

171
smartplug.py Normal file
View File

@ -0,0 +1,171 @@
import asyncio
import binascii
import struct
import sys
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.exc import BleakDeviceNotFoundError
from asyncio import Queue
START_OF_MESSAGE = b'\x0f'
END_OF_MESSAGE = b'\xff\xff'
# UUIDs spécifiques pour la prise Awox SMP-B16
PLUG_CMD_CHAR_UUID = "0000fff3-0000-1000-8000-00805f9b34fb"
PLUG_NOTIFY_CHAR_UUID = "0000fff4-0000-1000-8000-00805f9b34fb"
PLUG_NAME_CHAR_UUID = "0000fff6-0000-1000-8000-00805f9b34fb"
notification_queue = Queue() # Queue for handling notifications
buffer = bytearray() # Buffer for receiving notifications
def calculate_checksum(buffer):
checksum = 0
for byte in buffer:
checksum += byte
return (checksum + 1) & 0xFF
def add_checksum(command):
checksum = calculate_checksum(command)
return command + bytes([checksum])
def handle_data():
"""Processes data received from the smart plug."""
global buffer
if len(buffer) < 8: # Minimum length needed to unpack the expected data
print(f"[ERROR] Insufficient data received: {len(buffer)} bytes")
return
# Unpack state, power, and voltage
try:
if buffer[0:3] == b'\x0f\x0f\x04':
# Unpack data (state, dummy, power, voltage)
state, _, power, voltage = struct.unpack_from(">?BIB", buffer, offset=4)
power /= 1000.0 # Convert power from milliwatts to watts
# Print extracted data
print(f"State: {'on' if state else 'off'}, Power: {power:.3f} W, Voltage: {voltage} V")
elif buffer[0:5] == b'\x0f\x04\x03\x00\x00':
print(f"State: change")
else:
print("[ERROR] Unexpected data format.")
except struct.error as e:
print(f"[ERROR] Error unpacking data: {e}")
# Clear the buffer after processing
buffer.clear()
def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
"""Handler for receiving notifications from the smart plug."""
global buffer
print(f"[NOTIFICATION] Data received from {characteristic.uuid}: {binascii.hexlify(data)}")
# Append the received data to the buffer
buffer.extend(data)
# Process the data when enough bytes are in the buffer
handle_data()
async def scan_for_devices():
"""Scan for available SMP-B16 devices and display them in a formatted table."""
print("[DEBUG] Starting Bluetooth device scan... (5 seconds)")
devices = await BleakScanner.discover(timeout=5.0)
found_devices = []
for device in devices:
if device.name is not None and ("SMP-B16-FR" in device.name or "SMP-B16-GR" in device.name):
found_devices.append((device.name, device.address))
if not found_devices:
print("[DEBUG] No SMP-B16 devices found.")
else:
print(f"{'Device Name':<20}{'MAC Address':<20}")
print("-" * 40)
for name, address in found_devices:
print(f"{name:<20}{address:<20}")
print("[DEBUG] Scan completed.")
async def connect_and_write(mac_address, cmd_uuid, cmd_data, notify_uuid=None):
"""Connect to the plug and send command, optionally handle notifications."""
print(f"[DEBUG] Connecting to {mac_address}")
try:
async with BleakClient(mac_address,timeout=5.0) as client:
await client.connect()
print(f"[DEBUG] Connected to {mac_address}")
# Enable notifications if a notify UUID is provided
if notify_uuid:
await client.start_notify(notify_uuid, notification_handler)
print(f"[DEBUG] Notifications enabled for {notify_uuid}")
# Add checksum and write data to the characteristic
command_with_checksum = START_OF_MESSAGE + struct.pack("B", len(cmd_data) + 1) + cmd_data + struct.pack("B", calculate_checksum(cmd_data)) + END_OF_MESSAGE
print(f"[DEBUG] Sending command with checksum: {binascii.hexlify(command_with_checksum)}")
await client.write_gatt_char(cmd_uuid, command_with_checksum)
# If notifications are enabled, wait for the queue to return data
if notify_uuid:
await asyncio.sleep(5) # Adjust sleep duration to allow time for notifications
await client.stop_notify(notify_uuid)
print(f"[DEBUG] Notifications disabled for {notify_uuid}")
print(f"[DEBUG] Disconnected from {mac_address}")
except BleakDeviceNotFoundError:
print(f"[ERROR] Device with address {mac_address} not found.")
except Exception as e:
print(f"[ERROR] An error occurred: {str(e)}")
async def get_status(mac_address):
"""Get the status of the smart plug."""
print("[DEBUG] Getting status of the smart plug...")
await connect_and_write(mac_address, PLUG_CMD_CHAR_UUID, binascii.unhexlify('04000000'), notify_uuid=PLUG_NOTIFY_CHAR_UUID)
async def turn_on(mac_address):
"""Turn on the smart plug."""
print("[DEBUG] Turning on the smart plug...")
await connect_and_write(mac_address, PLUG_CMD_CHAR_UUID, binascii.unhexlify('0300010000'), notify_uuid=PLUG_NOTIFY_CHAR_UUID)
async def turn_off(mac_address):
"""Turn off the smart plug."""
print("[DEBUG] Turning off the smart plug...")
await connect_and_write(mac_address, PLUG_CMD_CHAR_UUID, binascii.unhexlify('0300000000'), notify_uuid=PLUG_NOTIFY_CHAR_UUID)
async def get_name(mac_address):
"""Get the name of the smart plug."""
async with BleakClient(mac_address) as client:
name_data = await client.read_gatt_char(PLUG_NAME_CHAR_UUID)
name = name_data.decode('iso-8859-1')
print(f"Smart plug name: {name}")
async def main():
if len(sys.argv) < 2:
print("Usage : python smartplug.py <MAC_ADDRESS> <command>")
print("Commands: on, off, state, conso, name, scan")
sys.exit(1)
command = sys.argv[1].lower()
if command == "scan":
await scan_for_devices()
else:
if len(sys.argv) < 3:
print("Usage: python smartplug.py <MAC_ADDRESS> <command>")
sys.exit(1)
mac_address = sys.argv[1]
command = sys.argv[2].lower()
if command == "on":
await turn_on(mac_address)
elif command == "off":
await turn_off(mac_address)
elif command == "state":
await get_status(mac_address)
elif command == "name":
await get_name(mac_address)
else:
print(f"Unknown command: {command}")
if __name__ == "__main__":
asyncio.run(main())