import glob
import sys
import time

# Star import kept so that names re-exported through this module stay available.
from serial import *  # noqa: F401,F403


class ControllerRefined:
    """Low-level packet transport to a controller over a serial port.

    Outgoing bytes are staged in ``buffer`` (with a running byte sum in
    ``checksum``) and sent in one write by :meth:`flush`. Incoming bytes are
    accumulated in ``read_buffer`` so their sum can be validated against the
    checksum that trails each response.
    """

    # Line speed used for regular traffic.
    BAUDRATE = 250_000
    # Reduced speed used by sendCondition() for the single zero byte.
    CONDITION_BAUDRATE = BAUDRATE // 5
    # Upper bound, in seconds, for waiting on the remainder of a packet.
    READ_TIMEOUT = 0.1

    def __init__(self, controller_id: int, programmator_id: int):
        """Store the two 6-byte ids written into every packet header."""
        self.id = controller_id
        self.prog_id = programmator_id
        self.ser = Serial()
        self.packet_id = 0
        self.checksum = 0
        self.buffer = bytearray()
        self.read_buffer = bytearray()
        self.readDop = False  # READ MODE BUG FIX TODO

    def connect(self, port: str) -> None:
        """Open ``port`` (250000 baud, 8N2, non-blocking reads).

        A port that is still open from an earlier call is closed first so its
        handle is not leaked.
        """
        if self.ser.is_open:
            self.ser.close()
        self.ser = Serial(
            port,
            baudrate=self.BAUDRATE,
            bytesize=8,
            parity='N',
            stopbits=2,
            timeout=0,  # non-blocking: read() returns whatever is available
            xonxoff=False,
            rtscts=False,
        )
        print(f'Connected to {port}')

    def close(self) -> None:
        """Close the serial port."""
        self.ser.close()
        print(f'Closed connection to {self.ser.port}')

    def writeBytes(self, bytesToWrite: bytes) -> None:
        """Stage bytes for sending and add them to the running checksum."""
        self.buffer.extend(bytesToWrite)
        self.checksum += sum(bytesToWrite)

    def readBytes(self, cnt: int) -> bytes:
        """Read up to ``cnt`` bytes without waiting and record them.

        Because the port is non-blocking, fewer than ``cnt`` bytes (possibly
        none) may be returned.
        """
        buf = self.ser.read(cnt)
        self.read_buffer.extend(buf)
        return buf

    def _readExact(self, cnt: int, record: bool = True) -> bytes:
        """Read ``cnt`` bytes, polling for at most ``READ_TIMEOUT`` seconds.

        Returns the bytes received so far when the deadline passes, so the
        result may be shorter than ``cnt``; callers must check its length.
        With ``record`` set, the bytes are also appended to ``read_buffer``.
        """
        deadline = time.monotonic() + self.READ_TIMEOUT
        chunk = bytearray()
        while True:
            chunk += self.ser.read(cnt - len(chunk))
            if len(chunk) >= cnt or time.monotonic() >= deadline:
                break
            time.sleep(0.001)
        if record:
            self.read_buffer.extend(chunk)
        return bytes(chunk)

    def flush(self) -> None:
        """Write the staged packet to the port and reset the staging state.

        The buffer and checksum are reset even when the write fails, so a
        failed packet cannot leak into the next one.
        """
        try:
            self.ser.write(self.buffer)
            print(self.buffer)
        finally:
            self.buffer.clear()
            self.checksum = 0
        # NOTE(review): this always prints 0 because the checksum was just
        # reset; kept as in the original output.
        print(f'flush, checksum = {self.checksum}')

    def validateChecksum(self, checksum: int) -> bool:
        """Return True when the sum of ``read_buffer`` equals ``checksum``."""
        s = sum(self.read_buffer)
        print(f'Checksum calculated: {s}, required: {checksum}')
        return checksum == s

    def sendCondition(self) -> None:
        """Send one zero byte at a fifth of the normal baud rate.

        Presumably this acts as the start/break condition of the protocol -
        confirm against the device documentation. The normal baud rate is
        restored even if the write fails.
        """
        self.ser.baudrate = self.CONDITION_BAUDRATE
        try:
            self.ser.write(b'\x00')
        finally:
            self.ser.baudrate = self.BAUDRATE

    def sendPacketHead(self, packetLen: int) -> None:
        """Stage the packet header.

        Layout: start bytes 0xCC 0x01, one length byte, 6-byte controller id,
        6-byte programmator id, one packet-id byte, then 0x01 0x00 0x00 0x00.
        """
        self.writeBytes(b'\xCC\x01')
        self.writeBytes(packetLen.to_bytes(1, byteorder='big'))
        self.writeBytes(self.id.to_bytes(6, byteorder='big'))
        self.writeBytes(self.prog_id.to_bytes(6, byteorder='big'))
        self.writeBytes(self.packet_id.to_bytes(1, byteorder='big'))
        self.writeBytes(b'\x01')
        self.writeBytes(bytes(3))

    def waitForStartBytes(self) -> bool:
        """Poll the port until the start bytes 0xCC 0x01 are seen.

        One byte is read every 50 ms. Each read that equals the previous read
        (for example two empty reads in a row) uses up one of ten attempts;
        when none are left, False is returned.
        """
        x = 0  # initial sentinel; never equal to a bytes object
        counter = 10
        while counter > 0:
            prev = x
            time.sleep(0.05)
            x = self.readBytes(1)
            if x == prev:
                counter -= 1
            print(f'x: {x}, prev: {prev}')
            if prev == b'\xCC' and x == b'\x01':
                break
        if counter == 0:
            print('No data!')
            return False
        return True

    def sendRequest(self, command_code: int, target: tuple, data: list) -> None:
        """Build and send one request packet.

        Args:
            command_code: one-byte command code.
            target: pair of one-byte values identifying the request target.
            data: payload values, one byte each.
        """
        # Start from a clean slate in case an earlier request failed midway.
        self.buffer.clear()
        self.checksum = 0

        self.sendCondition()
        self.sendPacketHead(
            1 +  # Packet len
            6 * 2 +  # Controller id + programmator id
            1 +  # Send number
            4 +  # 0x01 0x00 0x00 0x00 bytes
            4 +  # Command code (1) + target (2) + value count (1)
            len(data)  # One byte per value (0 when there is no data)
            + 2  # Checksum
        )
        self.writeBytes(command_code.to_bytes(1, byteorder='big'))
        self.writeBytes(target[0].to_bytes(1, byteorder='big'))
        self.writeBytes(target[1].to_bytes(1, byteorder='big'))
        self.writeBytes(len(data).to_bytes(1, byteorder='big'))
        for value in data:
            self.writeBytes(value.to_bytes(1, byteorder='big'))
        # The trailing checksum is the sum of every byte staged so far.
        self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
        self.flush()

    def readData(self) -> list:
        """Read one response packet and return its payload values.

        Returns:
            The payload as a list of ints, or an empty list when no start
            bytes arrive, the packet is truncated, or the checksum mismatches.
        """
        self.read_buffer.clear()
        if not self.waitForStartBytes():
            return []

        # Remainder of the header plus command code and target (21 bytes),
        # followed by the one-byte value count.
        header = self._readExact(21)
        valueCount = self._readExact(1)
        if len(header) < 21 or not valueCount:
            print('Incomplete packet!')
            return []

        expected = valueCount[0]
        if self.readDop:  # READ MODE BUG FIX TODO
            # NOTE(review): assumed to mean one extra trailing byte - confirm.
            expected += 1
        payload = self._readExact(expected)
        print(f'Read bytes: {self.read_buffer}')

        # The checksum bytes are deliberately kept out of read_buffer so they
        # are not part of the sum they are compared with.
        raw_checksum = self._readExact(2, record=False)
        if len(payload) < expected or len(raw_checksum) < 2:
            print('Incomplete packet!')
            return []

        chsum = int.from_bytes(raw_checksum, byteorder='big')
        if not self.validateChecksum(chsum):
            print('Invalid checksum!')
            return []
        return list(payload)


class ControllerInterface:
    """High-level commands (currents, address, modes, powers) for a controller."""

    def __init__(self, controller: ControllerRefined):
        """Wrap the transport object used for all communication."""
        self.controller = controller

    def connect(self, port: str) -> None:
        """Open the serial port on the underlying controller."""
        self.controller.connect(port)

    def close(self) -> None:
        """Close the serial port on the underlying controller."""
        self.controller.close()

    def sendPacket(self, command_code: int, target: tuple, data: list) -> bool:
        """Send a request four times without reading a response.

        The request is repeated with pauses of 2 ms, 1 s and 2 ms between the
        sends. Returns False if the serial port raises, True otherwise.
        """
        try:
            for pause in (0.002, 1, 0.002):
                self.controller.sendRequest(command_code, target, data)
                time.sleep(pause)
            self.controller.sendRequest(command_code, target, data)
            # NOTE(review): wraps at 255, so id 255 is never used - confirm.
            self.controller.packet_id = (self.controller.packet_id + 1) % 255
            return True
        except SerialException as e:
            print(e)
            return False

    def sendPacketWithResponse(self, command_code: int, target: tuple, data: list, throw_if_error=False) -> list:
        """Send a request once and return the values of the response.

        Args:
            command_code: one-byte command code.
            target: pair of one-byte values identifying the request target.
            data: payload values, one byte each.
            throw_if_error: re-raise a SerialException instead of returning [].

        Returns:
            The response values, or an empty list on failure.
        """
        try:
            self.controller.sendRequest(command_code, target, data)
            self.controller.packet_id = (self.controller.packet_id + 1) % 255
            return self.controller.readData()
        except SerialException as e:
            print(e)
            if throw_if_error:
                raise
            return []

    # Command codes placed after the packet header.
    c_codes = {
        "set": 0x30,
        "get": 0x20
    }

    # Two-byte request targets.
    targets = {
        "currents set": (0xBE, 0xEE),
        "currents get": (0xBE, 0xEF),
        "powers": (0xBE, 0x10),
        "address": (0xBF, 0x20),
        "device info": (0xBF, 0x00),
        "device mode set": (0xBF, 0x10),
        "device mode get": (0xBF, 0x11),

        ##### OLD
        # "currents set": (0xBE, 0xEE),
        # "currents get": (0xBE, 0xEF),
        # "powers set": (0xBE, 0x10),
        # "powers get": (0xBE, 0x11),
        # "address": (0xBF, 0x20),
        # "device info": (0xBF, 0x00),
        # "device mode set": (0xBF, 0x10),
        # "device mode get": (0xBF, 0x11),
    }

    # Currents #
    def writeCurrents(self, currents: list) -> bool:
        """Send the given current values; True when all sends succeeded."""
        return self.sendPacket(self.c_codes['set'], self.targets['currents set'], currents)

    def readCurrents(self) -> list:
        """Request the current values; empty list on failure."""
        return self.sendPacketWithResponse(self.c_codes['get'], self.targets['currents get'], [])

    # DMX Addresses #
    def writeAddress(self, address: int) -> bool:
        """Send a new address; False only when the serial port raised."""
        try:
            self.sendPacketWithResponse(self.c_codes['set'], self.targets['address'], [address], True)
            return True
        except SerialException:
            return False

    def readAddress(self) -> int:
        """Return the first value of the address response, or -1 on failure."""
        data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['address'], [])
        if not data:
            return -1
        return data[0]

    # Device info #
    def readDeviceInfo(self) -> tuple:
        """Request and parse the device info.

        Returns:
            ``(mode_footprint, mode_number, mode_count, address)``, or an
            empty tuple when there is no response or it is too short to parse.
        """
        data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['device info'], [])
        if not data:
            return ()
        data = data[10:]  # cut out first ten bytes
        if len(data) < 4:
            # Truncated payload: mode number / mode count are missing.
            return ()
        mode_footprint = int.from_bytes(data[0:2], byteorder='big')
        mode_number = data[2]
        mode_count = data[3]
        address = int.from_bytes(data[4:6], byteorder='big')
        return mode_footprint, mode_number, mode_count, address

    # Modes #
    def writeMode(self, mode_number: int) -> bool:
        """Send a new mode number; False only when the serial port raised."""
        try:
            self.sendPacketWithResponse(self.c_codes['set'], self.targets['device mode set'], [mode_number], True)
            return True
        except SerialException:
            return False

    def readMode(self, mode_index: int) -> tuple:
        """Request one mode description by zero-based index.

        Returns:
            ``(mode_footprint, mode_name)``, or an empty tuple on failure.
        """
        # self.controller.readDop = True # READ MODE BUG FIX TODO
        data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['device mode get'], [mode_index + 1])
        # self.controller.readDop = False # READ MODE BUG FIX TODO

        print('Read bytes:')
        for i, b in enumerate(self.controller.read_buffer):
            print(f'{i}: {b:02x}')

        if not data:
            return ()
        data = data[1:]  # cut out first byte
        mode_footprint = int.from_bytes(data[1:3], byteorder='big')
        # NOTE(review): decodes as UTF-8 and raises on invalid bytes - confirm
        # the encoding the device uses for mode names.
        mode_name = bytes(data[3:14]).decode()
        return mode_footprint, mode_name

    def readModes(self) -> list:
        """Read every mode reported by the device info.

        Returns:
            A list of ``(mode_footprint, mode_name)`` tuples, or an empty list
            when the device info or any single mode could not be read.
        """
        info = self.readDeviceInfo()
        if not info:
            return []
        mode_count = info[2]

        modes = []
        for i in range(mode_count):
            mode = self.readMode(i)
            if not mode:
                return []
            modes.append(mode)
            time.sleep(0.05)

        for number, m in enumerate(modes, start=1):
            print(f'Mode {number}: {m}')
        return modes

    # Powers #
    def writePowers(self, currents: list, voltages: list, power: int, koeffs: list, a_max: int) -> bool:
        """Send the power settings as one concatenated payload.

        The payload order is currents, voltages, power, koeffs, a_max.
        """
        data = currents + voltages + [power] + koeffs + [a_max]
        return self.sendPacket(self.c_codes['set'], self.targets['powers'], data)

    def readPowers(self) -> list:
        """Request the power settings; empty list on failure."""
        return self.sendPacketWithResponse(self.c_codes['get'], self.targets['powers'], [])

    # source: https://stackoverflow.com/a/14224477
    @staticmethod
    def readComPorts() -> list:
        """Return the serial ports that can currently be opened.

        Candidates are COM1..COM256 on Windows and ``/dev/tty[A-Za-z]*`` on
        Linux/Cygwin; other platforms yield an empty list.
        """
        ports = []
        if sys.platform.startswith('win'):
            ports = [f'COM{i}' for i in range(1, 257)]
        elif sys.platform.startswith(('linux', 'cygwin')):
            ports = glob.glob('/dev/tty[A-Za-z]*')

        result = []
        for port in ports:
            try:
                s = Serial(port)
                s.close()
                result.append(port)
            except (OSError, SerialException):
                pass
        return result