# smartplug_AWOX/smartplug.py
#
# 172 lines
# 6.5 KiB
# Python

import asyncio
import binascii
import struct
import sys
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.exc import BleakDeviceNotFoundError
from asyncio import Queue
# Framing bytes: connect_and_write() puts START_OF_MESSAGE at the front of
# every command it sends and END_OF_MESSAGE at the end.
START_OF_MESSAGE = b'\x0f'
END_OF_MESSAGE = b'\xff\xff'
# GATT characteristic UUIDs specific to the Awox SMP-B16 plug.
# PLUG_CMD_CHAR_UUID: characteristic the commands are written to.
PLUG_CMD_CHAR_UUID = "0000fff3-0000-1000-8000-00805f9b34fb"
# PLUG_NOTIFY_CHAR_UUID: characteristic subscribed to for notifications.
PLUG_NOTIFY_CHAR_UUID = "0000fff4-0000-1000-8000-00805f9b34fb"
# PLUG_NAME_CHAR_UUID: characteristic read by get_name().
PLUG_NAME_CHAR_UUID = "0000fff6-0000-1000-8000-00805f9b34fb"
# Queue intended for handling notifications.
# NOTE(review): not referenced by any of the code visible here.
notification_queue = Queue()
# Accumulates raw notification bytes; notification_handler() appends to it
# and handle_data() parses and clears it.
buffer = bytearray()
def calculate_checksum(buffer):
    """Return the one-byte checksum of *buffer*.

    The checksum is the sum of every byte value plus one, truncated to the
    low eight bits.
    """
    return (sum(buffer) + 1) & 0xFF
def add_checksum(command):
    """Return *command* followed by its single checksum byte."""
    trailer = bytes((calculate_checksum(command),))
    return command + trailer
def handle_data():
    """Decode the bytes currently held in the module-level ``buffer``.

    Two frame layouts are recognised:

    * a status frame starting with ``0f 0f 04``: state, power and voltage
      are unpacked from offset 4 and printed;
    * a frame starting with ``0f 04 03 00 00``: reported as a state change.

    Anything else is reported as an unexpected format. With fewer than
    8 bytes buffered the function only reports the shortfall and leaves the
    buffer untouched; in every other case the buffer is emptied afterwards.
    """
    global buffer

    # Too short to be any frame this parser knows: keep what we have.
    if len(buffer) < 8:
        print(f"[ERROR] Insufficient data received: {len(buffer)} bytes")
        return

    status_header = b'\x0f\x0f\x04'
    change_header = b'\x0f\x04\x03\x00\x00'

    try:
        if buffer.startswith(status_header):
            # Layout from offset 4: bool state, one ignored byte,
            # 32-bit big-endian power, one byte voltage.
            state, _ignored, raw_power, voltage = struct.unpack_from(
                ">?BIB", buffer, offset=4
            )
            power = raw_power / 1000.0  # milliwatts -> watts
            print(f"State: {'on' if state else 'off'}, Power: {power:.3f} W, Voltage: {voltage} V")
        elif buffer.startswith(change_header):
            print("State: change")
        else:
            print("[ERROR] Unexpected data format.")
    except struct.error as e:
        print(f"[ERROR] Error unpacking data: {e}")

    # Whatever happened above, this frame has been consumed.
    buffer.clear()
def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """Notification callback: log the payload, buffer it, then try to parse.

    Args:
        characteristic: The characteristic the notification came from; only
            its ``uuid`` is used, for the log line.
        data: Raw bytes delivered with the notification.
    """
    global buffer
    payload_hex = binascii.hexlify(data)
    print(f"[NOTIFICATION] Data received from {characteristic.uuid}: {payload_hex}")
    # Accumulate in the shared buffer; handle_data() decides whether there
    # is enough to decode.
    buffer += data
    handle_data()
async def scan_for_devices():
    """Run a 5-second BLE scan and print the SMP-B16 plugs that were seen.

    A device is listed when its advertised name contains ``SMP-B16-FR`` or
    ``SMP-B16-GR``. Matches are printed as a two-column name/address table.
    """
    print("[DEBUG] Starting Bluetooth device scan... (5 seconds)")
    discovered = await BleakScanner.discover(timeout=5.0)

    model_tags = ("SMP-B16-FR", "SMP-B16-GR")
    matches = [
        (dev.name, dev.address)
        for dev in discovered
        # Devices may advertise without a name; skip those.
        if dev.name is not None and any(tag in dev.name for tag in model_tags)
    ]

    if matches:
        print(f"{'Device Name':<20}{'MAC Address':<20}")
        print("-" * 40)
        for name, address in matches:
            print(f"{name:<20}{address:<20}")
    else:
        print("[DEBUG] No SMP-B16 devices found.")
    print("[DEBUG] Scan completed.")
async def connect_and_write(mac_address, cmd_uuid, cmd_data, notify_uuid=None):
    """Connect to the plug, send one framed command, optionally listen for replies.

    The command is framed as ``START_OF_MESSAGE + length + cmd_data +
    checksum + END_OF_MESSAGE``, where ``length`` is ``len(cmd_data) + 1``
    (payload plus checksum byte) and the checksum covers ``cmd_data`` only.

    Args:
        mac_address: Address of the plug to connect to.
        cmd_uuid: UUID of the characteristic the framed command is written to.
        cmd_data: Raw command payload (bytes), without framing or checksum.
        notify_uuid: If given, notifications on this characteristic are
            routed to ``notification_handler`` while the command is sent and
            for 5 seconds afterwards.

    Errors are not raised to the caller; they are printed as ``[ERROR]``
    lines.
    """
    print(f"[DEBUG] Connecting to {mac_address}")
    try:
        # Entering the context manager establishes the connection (and
        # leaving it disconnects), so no explicit connect() call is made:
        # a second connect on an already-connected client is redundant and
        # may be rejected by some bleak backends.
        async with BleakClient(mac_address, timeout=5.0) as client:
            print(f"[DEBUG] Connected to {mac_address}")

            # Subscribe before writing so the reply cannot be missed.
            if notify_uuid:
                await client.start_notify(notify_uuid, notification_handler)
                print(f"[DEBUG] Notifications enabled for {notify_uuid}")

            # Frame the payload: start byte, length, payload + checksum, end marker.
            command_with_checksum = (
                START_OF_MESSAGE
                + struct.pack("B", len(cmd_data) + 1)
                + add_checksum(cmd_data)
                + END_OF_MESSAGE
            )
            print(f"[DEBUG] Sending command with checksum: {binascii.hexlify(command_with_checksum)}")
            await client.write_gatt_char(cmd_uuid, command_with_checksum)

            if notify_uuid:
                # Replies are handled by notification_handler as they
                # arrive; just keep the connection open long enough.
                await asyncio.sleep(5)
                await client.stop_notify(notify_uuid)
                print(f"[DEBUG] Notifications disabled for {notify_uuid}")
        # Reported only once the context manager has actually exited.
        print(f"[DEBUG] Disconnected from {mac_address}")
    except BleakDeviceNotFoundError:
        print(f"[ERROR] Device with address {mac_address} not found.")
    except Exception as e:
        # CLI boundary: report any other failure instead of a traceback.
        print(f"[ERROR] An error occurred: {str(e)}")
async def get_status(mac_address):
    """Send the status request to the plug and listen for its reply."""
    print("[DEBUG] Getting status of the smart plug...")
    status_request = bytes.fromhex('04000000')
    await connect_and_write(
        mac_address,
        PLUG_CMD_CHAR_UUID,
        status_request,
        notify_uuid=PLUG_NOTIFY_CHAR_UUID,
    )
async def turn_on(mac_address):
    """Send the switch-on command to the plug and listen for its reply."""
    print("[DEBUG] Turning on the smart plug...")
    switch_on = bytes.fromhex('0300010000')
    await connect_and_write(
        mac_address,
        PLUG_CMD_CHAR_UUID,
        switch_on,
        notify_uuid=PLUG_NOTIFY_CHAR_UUID,
    )
async def turn_off(mac_address):
    """Send the switch-off command to the plug and listen for its reply."""
    print("[DEBUG] Turning off the smart plug...")
    switch_off = bytes.fromhex('0300000000')
    await connect_and_write(
        mac_address,
        PLUG_CMD_CHAR_UUID,
        switch_off,
        notify_uuid=PLUG_NOTIFY_CHAR_UUID,
    )
async def get_name(mac_address):
    """Read the plug's name characteristic and print it.

    Args:
        mac_address: Address of the plug to connect to.

    Connection and read failures are printed as ``[ERROR]`` lines, in the
    same wording the command-sending path uses, rather than surfacing as an
    unhandled traceback.
    """
    try:
        async with BleakClient(mac_address) as client:
            name_data = await client.read_gatt_char(PLUG_NAME_CHAR_UUID)
    except BleakDeviceNotFoundError:
        print(f"[ERROR] Device with address {mac_address} not found.")
        return
    except Exception as e:
        # CLI boundary: report any other failure instead of a traceback.
        print(f"[ERROR] An error occurred: {str(e)}")
        return

    # ISO-8859-1 maps every byte value, so decoding cannot fail.
    name = name_data.decode('iso-8859-1')
    print(f"Smart plug name: {name}")
async def main():
    """Parse ``sys.argv`` and run the requested smart-plug command.

    Accepted invocations::

        python smartplug.py scan
        python smartplug.py <MAC_ADDRESS> <command>

    where ``<command>`` is one of ``on``, ``off``, ``state``, ``conso`` or
    ``name`` (case-insensitive). ``conso`` is listed in the usage text, so it
    is accepted and routed to the status request, whose reply carries the
    power reading.

    Exits with status 1 on missing arguments or an unknown command.
    """
    usage_lines = (
        "Usage: python smartplug.py <MAC_ADDRESS> <command>",
        "       python smartplug.py scan",
        "Commands: on, off, state, conso, name, scan",
    )

    if len(sys.argv) < 2:
        for line in usage_lines:
            print(line)
        sys.exit(1)

    # "scan" is the only command that takes no MAC address, so it is
    # recognised in the first argument position.
    if sys.argv[1].lower() == "scan":
        await scan_for_devices()
        return

    if len(sys.argv) < 3:
        for line in usage_lines:
            print(line)
        sys.exit(1)

    mac_address = sys.argv[1]
    command = sys.argv[2].lower()

    if command == "on":
        await turn_on(mac_address)
    elif command == "off":
        await turn_off(mac_address)
    elif command in ("state", "conso"):
        await get_status(mac_address)
    elif command == "name":
        await get_name(mac_address)
    else:
        print(f"Unknown command: {command}")
        # Same exit status as the other usage errors above.
        sys.exit(1)
if __name__ == "__main__":
    # Executed as a script: drive the async CLI on a fresh event loop.
    asyncio.run(main())