import glob import sys import time from serial import * class Controller: def __init__(self, controller_id: int, programmator_id: int): self.id = controller_id self.prog_id = programmator_id self.ser = Serial() self.packet_id = 0 self.checksum = 0 self.buffer = bytearray() self.read_buffer = bytearray() def connect(self, port: str) -> None: self.ser = Serial( port, baudrate=250_000, bytesize=8, parity='N', stopbits=2, timeout=0, xonxoff=False, rtscts=False ) print(f'Connected to {port}') def close(self) -> None: self.ser.close() print(f'Closed connection to {self.ser.port}') def writeBytes(self, bytesToWrite: bytes) -> None: for b in bytesToWrite: self.buffer.append(b) self.checksum += b def readBytes(self, cnt: int) -> bytes: buf = self.ser.read(cnt) for b in buf: self.read_buffer.append(b) return buf def flush(self) -> None: self.ser.write(self.buffer) print(self.buffer) self.buffer.clear() self.checksum = 0 def validateChecksum(self, checksum: int) -> bool: s = 0 for b in self.read_buffer: s += b print(f'Checksum calculated: {s}, required: {checksum}') return checksum == s def sendCondition(self) -> None: self.ser.baudrate = 250_000 / 5 self.ser.write(b'\x00') self.ser.baudrate = 250_000 def sendPacketHead(self, packetLen: int) -> None: self.writeBytes(b'\xCC\x01') self.writeBytes(packetLen.to_bytes(1, byteorder='big')) self.writeBytes(self.id.to_bytes(6, byteorder='big')) self.writeBytes(self.prog_id.to_bytes(6, byteorder='big')) self.writeBytes(self.packet_id.to_bytes(1, byteorder='big')) self.writeBytes(b'\x01') self.writeBytes(bytes(3)) def waitForStartBytes(self) -> bool: x = 0 counter = 10 while counter > 0: prev = x time.sleep(0.05) x = self.readBytes(1) if x == prev: counter = counter - 1 print(f'x: {x}, prev: {prev}') if prev == b'\xCC' and x == b'\x01': break if counter == 0: print('No data!') return False return True # CURRENTS # def sendWriteCurrentsPacket(self, currents: list) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + len(currents) + 2) self.writeBytes(b'\x30\xBE\xEE') self.writeBytes(len(currents).to_bytes(1, byteorder='big')) for current in currents: self.writeBytes(current.to_bytes(1, byteorder='big')) self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def sendReadCurrentsPacket(self) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2) self.writeBytes(b'\x20\xBE\xEF\x00') self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def readCurrentsPacket(self) -> list: self.read_buffer.clear() if not self.waitForStartBytes(): return [] self.readBytes(21) valueCount = self.readBytes(1) values = [] for _ in range(valueCount[0]): values.append(self.readBytes(1)[0]) chsum = int.from_bytes(self.ser.read(2), byteorder='big') if not self.validateChecksum(chsum): print('Invalid checksum!') return [] return values # DMX addresses # def sendWriteAddressPacket(self, address: int) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2 + 2) self.writeBytes(b'\x30\xBF\x20\x02') self.writeBytes(address.to_bytes(2, byteorder='big')) self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def readWriteAddressPacket(self) -> bool: self.read_buffer.clear() if not self.waitForStartBytes(): return False self.readBytes(22) chsum = int.from_bytes(self.ser.read(2), byteorder='big') if not self.validateChecksum(chsum): print('Invalid checksum!') return False return True def sendReadAddressPacket(self) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2) self.writeBytes(b'\x20\xBF\x20\x00') self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def readAddressPacket(self) -> int: self.read_buffer.clear() if not self.waitForStartBytes(): return -1 self.readBytes(22) address = int.from_bytes(self.readBytes(2), byteorder='big') chsum = int.from_bytes(self.ser.read(2), byteorder='big') if not self.validateChecksum(chsum): print('Invalid checksum!') return -1 return address # Device info # def sendReadDevInfoPacket(self) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2) self.writeBytes(b'\x20\xBF\x00\x00') self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def readDevInfoPacket(self) -> tuple: self.read_buffer.clear() if not self.waitForStartBytes(): return () self.readBytes(32) mode_footprint = int.from_bytes(self.readBytes(2), byteorder='big') mode_number = int.from_bytes(self.readBytes(1), byteorder='big') mode_count = int.from_bytes(self.readBytes(1), byteorder='big') address = int.from_bytes(self.readBytes(2), byteorder='big') self.readBytes(3) print(f'Read bytes: {self.read_buffer}') chsum = int.from_bytes(self.ser.read(2), byteorder='big') if not self.validateChecksum(chsum): print('Invalid checksum!') return () return mode_footprint, mode_number, mode_count, address # Modes # def sendWriteModePacket(self, mode_number: int) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 1 + 2) self.writeBytes(b'\x30\xBF\x10\x01') self.writeBytes(mode_number.to_bytes(1, byteorder='big')) self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def readWriteModePacket(self) -> bool: self.read_buffer.clear() if not self.waitForStartBytes(): return False self.readBytes(22) chsum = int.from_bytes(self.ser.read(2), byteorder='big') if not self.validateChecksum(chsum): print('Invalid checksum!') return False return True def sendReadModeInfoPacket(self, mode_number: int) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 1 + 2) self.writeBytes(b'\x20\xBF\x11\x01') self.writeBytes(mode_number.to_bytes(1, byteorder='big')) self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def readModeInfoPacket(self) -> tuple: self.read_buffer.clear() if not self.waitForStartBytes(): return () self.readBytes(23) mode_footprint = int.from_bytes(self.readBytes(2), byteorder='big') mode_name = self.readBytes(12).decode() print(f'Read bytes: {self.read_buffer}') chsum = int.from_bytes(self.ser.read(2), byteorder='big') if not self.validateChecksum(chsum): print('Invalid checksum!') return () return mode_footprint, mode_name # Power # def sendWritePowerPacket(self, powers: list) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + len(powers) + 2) self.writeBytes(b'\x30\xBE\x11') self.writeBytes(len(powers).to_bytes(1, byteorder='big')) for data in powers: self.writeBytes(data.to_bytes(1, byteorder='big')) self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def sendReadPowerPacket(self) -> None: self.sendCondition() self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2) self.writeBytes(b'\x20\xBE\x10\x00') self.writeBytes(self.checksum.to_bytes(2, byteorder='big')) self.flush() def readPowerPacket(self) -> list: self.read_buffer.clear() if not self.waitForStartBytes(): return [] self.readBytes(21) valueCount = self.readBytes(1) values = [] for _ in range(valueCount[0]): values.append(self.readBytes(1)[0]) chsum = int.from_bytes(self.ser.read(2), byteorder='big') if not self.validateChecksum(chsum): print('Invalid checksum!') return [] return values class ControllerInterface: def __init__(self, controller: Controller): self.controller = controller def connect(self, port: str) -> None: self.controller.connect(port) def close(self) -> None: self.controller.close() # Currents # def writeCurrents(self, currents: list) -> bool: try: self.controller.sendWriteCurrentsPacket(currents) time.sleep(0.002) self.controller.sendWriteCurrentsPacket(currents) time.sleep(1) self.controller.sendWriteCurrentsPacket(currents) time.sleep(0.002) self.controller.sendWriteCurrentsPacket(currents) self.controller.packet_id = self.controller.packet_id + 1 return True except SerialException as e: print(e) return False def readCurrents(self) -> list: try: self.controller.sendReadCurrentsPacket() self.controller.packet_id = self.controller.packet_id + 1 return self.controller.readCurrentsPacket() except SerialException as e: print(e) return [] # DMX Addresses # def writeAddress(self, address: int) -> bool: try: self.controller.sendWriteAddressPacket(address) self.controller.packet_id = self.controller.packet_id + 1 self.controller.readWriteAddressPacket() return True except SerialException as e: print(e) return False def readAddress(self) -> int: try: self.controller.sendReadAddressPacket() self.controller.packet_id = self.controller.packet_id + 1 return self.controller.readAddressPacket() except SerialException as e: print(e) return -1 # Device info # def readDeviceInfo(self) -> tuple: try: self.controller.sendReadDevInfoPacket() self.controller.packet_id = self.controller.packet_id + 1 return self.controller.readDevInfoPacket() except SerialException as e: print(e) return () # Modes # def writeMode(self, mode_number: int) -> bool: try: self.controller.sendWriteModePacket(mode_number) self.controller.readWriteModePacket() self.controller.packet_id = self.controller.packet_id + 1 return True except SerialException as e: print(e) return False def readMode(self, mode_index: int) -> tuple: try: self.controller.sendReadModeInfoPacket(mode_index + 1) return self.controller.readModeInfoPacket() except SerialException as e: print(e) return () def readModes(self) -> list: try: self.controller.sendReadDevInfoPacket() self.controller.packet_id = self.controller.packet_id + 1 info = self.controller.readDevInfoPacket() if not info: return [] mode_footprint, mode_number, mode_count, address = info modes = [] for i in range(mode_count): mode = self.readMode(i) if not mode: return [] modes.append(mode) time.sleep(0.05) i = 0 for m in modes: i = i + 1 print(f'Mode {i}: {m}') return modes except SerialException as e: print(e) return [] # Powers # def writePowers(self, currents: list, voltages: list, power: int, koeffs: list, a_max: int) -> bool: try: data = currents + voltages + [power] + koeffs + [a_max] print(f'Data: {data}') self.controller.sendWritePowerPacket(data) time.sleep(0.002) self.controller.sendWritePowerPacket(data) time.sleep(1) self.controller.sendWritePowerPacket(data) time.sleep(0.002) self.controller.sendWritePowerPacket(data) self.controller.packet_id = self.controller.packet_id + 1 return True except SerialException as e: print(e) return False def readPowers(self) -> list: try: self.controller.sendReadPowerPacket() self.controller.packet_id = self.controller.packet_id + 1 return self.controller.readPowerPacket() except SerialException as e: print(e) return [] # source: https://stackoverflow.com/a/14224477 @staticmethod def readComPorts() -> list: ports = [] if sys.platform.startswith('win'): ports = ['COM%s' % (i + 1) for i in range(256)] elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'): ports = glob.glob('/dev/tty[A-Za-z]*') result = [] for port in ports: try: s = Serial(port) s.close() result.append(port) except (OSError, SerialException): pass return result