2024-11-27 22:51:07 +03:00

474 lines
14 KiB
Python

import glob
import sys
import time
from serial import *
class Controller:
def __init__(self, controller_id: int, programmator_id: int):
self.id = controller_id
self.prog_id = programmator_id
self.ser = Serial()
self.packet_id = 0
self.checksum = 0
self.buffer = bytearray()
self.read_buffer = bytearray()
def connect(self, port: str) -> None:
self.ser = Serial(
port, baudrate=250_000,
bytesize=8, parity='N', stopbits=2, timeout=0,
xonxoff=False, rtscts=False
)
print(f'Connected to {port}')
def close(self) -> None:
self.ser.close()
print(f'Closed connection to {self.ser.port}')
def writeBytes(self, bytesToWrite: bytes) -> None:
for b in bytesToWrite:
self.buffer.append(b)
self.checksum += b
def readBytes(self, cnt: int) -> bytes:
buf = self.ser.read(cnt)
for b in buf:
self.read_buffer.append(b)
return buf
def flush(self) -> None:
self.ser.write(self.buffer)
print(self.buffer)
self.buffer.clear()
self.checksum = 0
def validateChecksum(self, checksum: int) -> bool:
s = 0
for b in self.read_buffer:
s += b
print(f'Checksum calculated: {s}, required: {checksum}')
return checksum == s
def sendCondition(self) -> None:
self.ser.baudrate = 250_000 / 5
self.ser.write(b'\x00')
self.ser.baudrate = 250_000
def sendPacketHead(self, packetLen: int) -> None:
self.writeBytes(b'\xCC\x01')
self.writeBytes(packetLen.to_bytes(1, byteorder='big'))
self.writeBytes(self.id.to_bytes(6, byteorder='big'))
self.writeBytes(self.prog_id.to_bytes(6, byteorder='big'))
self.writeBytes(self.packet_id.to_bytes(1, byteorder='big'))
self.writeBytes(b'\x01')
self.writeBytes(bytes(3))
def waitForStartBytes(self) -> bool:
x = 0
counter = 10
while counter > 0:
prev = x
time.sleep(0.05)
x = self.readBytes(1)
if x == prev:
counter = counter - 1
print(f'x: {x}, prev: {prev}')
if prev == b'\xCC' and x == b'\x01':
break
if counter == 0:
print('No data!')
return False
return True
# CURRENTS #
def sendWriteCurrentsPacket(self, currents: list) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + len(currents) + 2)
self.writeBytes(b'\x30\xBE\xEE')
self.writeBytes(len(currents).to_bytes(1, byteorder='big'))
for current in currents:
self.writeBytes(current.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def sendReadCurrentsPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBE\xEF\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readCurrentsPacket(self) -> list:
self.read_buffer.clear()
if not self.waitForStartBytes():
return []
self.readBytes(21)
valueCount = self.readBytes(1)
values = []
for _ in range(valueCount[0]):
values.append(self.readBytes(1)[0])
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return []
return values
# DMX addresses #
def sendWriteAddressPacket(self, address: int) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2 + 2)
self.writeBytes(b'\x30\xBF\x20\x02')
self.writeBytes(address.to_bytes(2, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readWriteAddressPacket(self) -> bool:
self.read_buffer.clear()
if not self.waitForStartBytes():
return False
self.readBytes(22)
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return False
return True
def sendReadAddressPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBF\x20\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readAddressPacket(self) -> int:
self.read_buffer.clear()
if not self.waitForStartBytes():
return -1
self.readBytes(22)
address = int.from_bytes(self.readBytes(2), byteorder='big')
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return -1
return address
# Device info #
def sendReadDevInfoPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBF\x00\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readDevInfoPacket(self) -> tuple:
self.read_buffer.clear()
if not self.waitForStartBytes():
return ()
self.readBytes(32)
mode_footprint = int.from_bytes(self.readBytes(2), byteorder='big')
mode_number = int.from_bytes(self.readBytes(1), byteorder='big')
mode_count = int.from_bytes(self.readBytes(1), byteorder='big')
address = int.from_bytes(self.readBytes(2), byteorder='big')
self.readBytes(3)
print(f'Read bytes: {self.read_buffer}')
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return ()
return mode_footprint, mode_number, mode_count, address
# Modes #
def sendWriteModePacket(self, mode_number: int) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 1 + 2)
self.writeBytes(b'\x30\xBF\x10\x01')
self.writeBytes(mode_number.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readWriteModePacket(self) -> bool:
self.read_buffer.clear()
if not self.waitForStartBytes():
return False
self.readBytes(22)
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return False
return True
def sendReadModeInfoPacket(self, mode_number: int) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 1 + 2)
self.writeBytes(b'\x20\xBF\x11\x01')
self.writeBytes(mode_number.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readModeInfoPacket(self) -> tuple:
self.read_buffer.clear()
if not self.waitForStartBytes():
return ()
self.readBytes(23)
mode_footprint = int.from_bytes(self.readBytes(2), byteorder='big')
mode_name = self.readBytes(12).decode()
print(f'Read bytes: {self.read_buffer}')
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return ()
return mode_footprint, mode_name
# Power #
def sendWritePowerPacket(self, powers: list) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + len(powers) + 2)
self.writeBytes(b'\x30\xBE\x11')
self.writeBytes(len(powers).to_bytes(1, byteorder='big'))
for data in powers:
self.writeBytes(data.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def sendReadPowerPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBE\x10\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readPowerPacket(self) -> list:
self.read_buffer.clear()
if not self.waitForStartBytes():
return []
self.readBytes(21)
valueCount = self.readBytes(1)
values = []
for _ in range(valueCount[0]):
values.append(self.readBytes(1)[0])
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return []
return values
class ControllerInterface:
def __init__(self, controller: Controller):
self.controller = controller
def connect(self, port: str) -> None:
self.controller.connect(port)
def close(self) -> None:
self.controller.close()
# Currents #
def writeCurrents(self, currents: list) -> bool:
try:
self.controller.sendWriteCurrentsPacket(currents)
time.sleep(0.002)
self.controller.sendWriteCurrentsPacket(currents)
time.sleep(1)
self.controller.sendWriteCurrentsPacket(currents)
time.sleep(0.002)
self.controller.sendWriteCurrentsPacket(currents)
self.controller.packet_id = self.controller.packet_id + 1
return True
except SerialException as e:
print(e)
return False
def readCurrents(self) -> list:
try:
self.controller.sendReadCurrentsPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readCurrentsPacket()
except SerialException as e:
print(e)
return []
# DMX Addresses #
def writeAddress(self, address: int) -> bool:
try:
self.controller.sendWriteAddressPacket(address)
self.controller.packet_id = self.controller.packet_id + 1
self.controller.readWriteAddressPacket()
return True
except SerialException as e:
print(e)
return False
def readAddress(self) -> int:
try:
self.controller.sendReadAddressPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readAddressPacket()
except SerialException as e:
print(e)
return -1
# Device info #
def readDeviceInfo(self) -> tuple:
try:
self.controller.sendReadDevInfoPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readDevInfoPacket()
except SerialException as e:
print(e)
return ()
# Modes #
def writeMode(self, mode_number: int) -> bool:
try:
self.controller.sendWriteModePacket(mode_number)
self.controller.readWriteModePacket()
self.controller.packet_id = self.controller.packet_id + 1
return True
except SerialException as e:
print(e)
return False
def readMode(self, mode_index: int) -> tuple:
try:
self.controller.sendReadModeInfoPacket(mode_index + 1)
return self.controller.readModeInfoPacket()
except SerialException as e:
print(e)
return ()
def readModes(self) -> list:
try:
self.controller.sendReadDevInfoPacket()
self.controller.packet_id = self.controller.packet_id + 1
info = self.controller.readDevInfoPacket()
if not info:
return []
mode_footprint, mode_number, mode_count, address = info
modes = []
for i in range(mode_count):
mode = self.readMode(i)
if not mode:
return []
modes.append(mode)
time.sleep(0.05)
i = 0
for m in modes:
i = i + 1
print(f'Mode {i}: {m}')
return modes
except SerialException as e:
print(e)
return []
# Powers #
def writePowers(self, currents: list, voltages: list, power: int, koeffs: list, a_max: int) -> bool:
try:
data = currents + voltages + [power] + koeffs + [a_max]
print(f'Data: {data}')
self.controller.sendWritePowerPacket(data)
time.sleep(0.002)
self.controller.sendWritePowerPacket(data)
time.sleep(1)
self.controller.sendWritePowerPacket(data)
time.sleep(0.002)
self.controller.sendWritePowerPacket(data)
self.controller.packet_id = self.controller.packet_id + 1
return True
except SerialException as e:
print(e)
return False
def readPowers(self) -> list:
try:
self.controller.sendReadPowerPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readPowerPacket()
except SerialException as e:
print(e)
return []
# source: https://stackoverflow.com/a/14224477
@staticmethod
def readComPorts() -> list:
ports = []
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
ports = glob.glob('/dev/tty[A-Za-z]*')
result = []
for port in ports:
try:
s = Serial(port)
s.close()
result.append(port)
except (OSError, SerialException):
pass
return result