smartplug_AWOX/smartplug.py

172 lines
6.5 KiB
Python
Raw Normal View History

2024-10-23 09:36:21 +02:00
import asyncio
import binascii
import struct
import sys
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.exc import BleakDeviceNotFoundError
from asyncio import Queue
START_OF_MESSAGE = b'\x0f'
END_OF_MESSAGE = b'\xff\xff'
# UUIDs spécifiques pour la prise Awox SMP-B16
PLUG_CMD_CHAR_UUID = "0000fff3-0000-1000-8000-00805f9b34fb"
PLUG_NOTIFY_CHAR_UUID = "0000fff4-0000-1000-8000-00805f9b34fb"
PLUG_NAME_CHAR_UUID = "0000fff6-0000-1000-8000-00805f9b34fb"
notification_queue = Queue() # Queue for handling notifications
buffer = bytearray() # Buffer for receiving notifications
def calculate_checksum(buffer):
checksum = 0
for byte in buffer:
checksum += byte
return (checksum + 1) & 0xFF
def add_checksum(command):
checksum = calculate_checksum(command)
return command + bytes([checksum])
def handle_data():
"""Processes data received from the smart plug."""
global buffer
if len(buffer) < 8: # Minimum length needed to unpack the expected data
print(f"[ERROR] Insufficient data received: {len(buffer)} bytes")
return
# Unpack state, power, and voltage
try:
if buffer[0:3] == b'\x0f\x0f\x04':
# Unpack data (state, dummy, power, voltage)
state, _, power, voltage = struct.unpack_from(">?BIB", buffer, offset=4)
power /= 1000.0 # Convert power from milliwatts to watts
# Print extracted data
print(f"State: {'on' if state else 'off'}, Power: {power:.3f} W, Voltage: {voltage} V")
elif buffer[0:5] == b'\x0f\x04\x03\x00\x00':
print(f"State: change")
else:
print("[ERROR] Unexpected data format.")
except struct.error as e:
print(f"[ERROR] Error unpacking data: {e}")
# Clear the buffer after processing
buffer.clear()
def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
"""Handler for receiving notifications from the smart plug."""
global buffer
print(f"[NOTIFICATION] Data received from {characteristic.uuid}: {binascii.hexlify(data)}")
# Append the received data to the buffer
buffer.extend(data)
# Process the data when enough bytes are in the buffer
handle_data()
async def scan_for_devices():
"""Scan for available SMP-B16 devices and display them in a formatted table."""
print("[DEBUG] Starting Bluetooth device scan... (5 seconds)")
devices = await BleakScanner.discover(timeout=5.0)
found_devices = []
for device in devices:
if device.name is not None and ("SMP-B16-FR" in device.name or "SMP-B16-GR" in device.name):
found_devices.append((device.name, device.address))
if not found_devices:
print("[DEBUG] No SMP-B16 devices found.")
else:
print(f"{'Device Name':<20}{'MAC Address':<20}")
print("-" * 40)
for name, address in found_devices:
print(f"{name:<20}{address:<20}")
print("[DEBUG] Scan completed.")
async def connect_and_write(mac_address, cmd_uuid, cmd_data, notify_uuid=None):
"""Connect to the plug and send command, optionally handle notifications."""
print(f"[DEBUG] Connecting to {mac_address}")
try:
async with BleakClient(mac_address,timeout=5.0) as client:
await client.connect()
print(f"[DEBUG] Connected to {mac_address}")
# Enable notifications if a notify UUID is provided
if notify_uuid:
await client.start_notify(notify_uuid, notification_handler)
print(f"[DEBUG] Notifications enabled for {notify_uuid}")
# Add checksum and write data to the characteristic
command_with_checksum = START_OF_MESSAGE + struct.pack("B", len(cmd_data) + 1) + cmd_data + struct.pack("B", calculate_checksum(cmd_data)) + END_OF_MESSAGE
print(f"[DEBUG] Sending command with checksum: {binascii.hexlify(command_with_checksum)}")
await client.write_gatt_char(cmd_uuid, command_with_checksum)
# If notifications are enabled, wait for the queue to return data
if notify_uuid:
await asyncio.sleep(5) # Adjust sleep duration to allow time for notifications
await client.stop_notify(notify_uuid)
print(f"[DEBUG] Notifications disabled for {notify_uuid}")
print(f"[DEBUG] Disconnected from {mac_address}")
except BleakDeviceNotFoundError:
print(f"[ERROR] Device with address {mac_address} not found.")
except Exception as e:
print(f"[ERROR] An error occurred: {str(e)}")
async def get_status(mac_address):
"""Get the status of the smart plug."""
print("[DEBUG] Getting status of the smart plug...")
await connect_and_write(mac_address, PLUG_CMD_CHAR_UUID, binascii.unhexlify('04000000'), notify_uuid=PLUG_NOTIFY_CHAR_UUID)
async def turn_on(mac_address):
"""Turn on the smart plug."""
print("[DEBUG] Turning on the smart plug...")
await connect_and_write(mac_address, PLUG_CMD_CHAR_UUID, binascii.unhexlify('0300010000'), notify_uuid=PLUG_NOTIFY_CHAR_UUID)
async def turn_off(mac_address):
"""Turn off the smart plug."""
print("[DEBUG] Turning off the smart plug...")
await connect_and_write(mac_address, PLUG_CMD_CHAR_UUID, binascii.unhexlify('0300000000'), notify_uuid=PLUG_NOTIFY_CHAR_UUID)
async def get_name(mac_address):
"""Get the name of the smart plug."""
async with BleakClient(mac_address) as client:
name_data = await client.read_gatt_char(PLUG_NAME_CHAR_UUID)
name = name_data.decode('iso-8859-1')
print(f"Smart plug name: {name}")
async def main():
if len(sys.argv) < 2:
print("Usage : python smartplug.py <MAC_ADDRESS> <command>")
print("Commands: on, off, state, conso, name, scan")
sys.exit(1)
command = sys.argv[1].lower()
if command == "scan":
await scan_for_devices()
else:
if len(sys.argv) < 3:
print("Usage: python smartplug.py <MAC_ADDRESS> <command>")
sys.exit(1)
mac_address = sys.argv[1]
command = sys.argv[2].lower()
if command == "on":
await turn_on(mac_address)
elif command == "off":
await turn_off(mac_address)
elif command == "state":
await get_status(mac_address)
elif command == "name":
await get_name(mac_address)
else:
print(f"Unknown command: {command}")
if __name__ == "__main__":
asyncio.run(main())