Controller_powers/controller/controller_reifined.py

319 lines
10 KiB
Python

import glob
import sys
import time
from logging import info, debug, error
from serial import *
class ControllerRefined:
def __init__(self, controller_id: int, programmator_id: int):
self.id = controller_id
self.prog_id = programmator_id
self.ser = Serial()
self.packet_id = 0
self.checksum = 0
self.buffer = bytearray()
self.read_buffer = bytearray()
self.readDop = False # READ MODE BUG FIX TODO
def connect(self, port: str) -> None:
self.ser = Serial(
port, baudrate=250_000,
bytesize=8, parity='N', stopbits=2, timeout=0,
xonxoff=False, rtscts=False
)
info(f'Connected to {port}')
def close(self) -> None:
self.ser.close()
info(f'Closed connection to {self.ser.port}')
def writeBytes(self, bytesToWrite: bytes) -> None:
for b in bytesToWrite:
self.buffer.append(b)
self.checksum += b
def readBytes(self, cnt: int) -> bytes:
buf = self.ser.read(cnt)
for b in buf:
self.read_buffer.append(b)
return buf
def flush(self) -> None:
self.ser.write(self.buffer)
debug("|".join(hex(b) for b in self.buffer))
self.buffer.clear()
self.checksum = 0
debug(f'flush, checksum = {self.checksum}')
def validateChecksum(self, checksum: int) -> bool:
s = 0
for b in self.read_buffer:
s += b
debug(f'Checksum calculated: {s}, required: {checksum}')
return checksum == s
def sendCondition(self) -> None:
self.ser.baudrate = 250_000 / 5
self.ser.write(b'\x00')
self.ser.baudrate = 250_000
def sendPacketHead(self, packetLen: int) -> None:
self.writeBytes(b'\xCC\x01')
self.writeBytes(packetLen.to_bytes(1, byteorder='big'))
self.writeBytes(self.id.to_bytes(6, byteorder='big'))
self.writeBytes(self.prog_id.to_bytes(6, byteorder='big'))
self.writeBytes(self.packet_id.to_bytes(1, byteorder='big'))
self.writeBytes(b'\x01')
self.writeBytes(bytes(3))
def waitForStartBytes(self) -> bool:
wait_for = 20
x = 0
counter = wait_for
while counter > 0:
prev = x
time.sleep(0.05)
x = self.readBytes(1)
if x == prev:
counter = counter - 1
debug(f'x: {x}, prev: {prev}')
if prev == b'\xCC' and x == b'\x01':
debug(f'Got start bytes after {wait_for - counter} reads')
break
if counter == 0:
error('No data!')
return False
return True
def sendRequest(self, command_code: int, target: tuple, data: list) -> None:
self.sendCondition()
self.sendPacketHead(
1 + # Packet len
6 * 2 + # Controller id + programmator id
1 + # Send number
4 + # Ox10 0x00 0x00 0x00 Bytes
4 + # Set + request code(2) + value count
len(data) # One byte per value (if no data present converts to 0)
+ 2 # Checksum
)
self.writeBytes(command_code.to_bytes(1, byteorder='big'))
self.writeBytes(target[0].to_bytes(1, byteorder='big'))
self.writeBytes(target[1].to_bytes(1, byteorder='big'))
self.writeBytes(len(data).to_bytes(1, byteorder='big'))
for value in data:
debug(value)
if type(value) is bytes:
self.writeBytes(value)
else:
self.writeBytes(value.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readData(self) -> list:
self.read_buffer.clear()
if not self.waitForStartBytes():
return []
self.readBytes(21)
valueCount = self.readBytes(1)
values = []
for _ in range(valueCount[0]):
values.append(self.readBytes(1)[0])
if self.readDop: # READ MODE BUG FIX TODO
values.append(self.readBytes(1)[0])
debug('Read bytes: ' + "|".join(hex(b) for b in self.read_buffer))
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
error('Invalid checksum!')
return []
return values
class ControllerInterface:
def __init__(self, controller: ControllerRefined):
self.controller = controller
def connect(self, port: str) -> None:
self.controller.connect(port)
def close(self) -> None:
self.controller.close()
def sendPacket(self, command_code: int, target: tuple, data: list) -> bool:
try:
self.controller.sendRequest(command_code, target, data)
time.sleep(0.002)
self.controller.sendRequest(command_code, target, data)
time.sleep(1)
self.controller.sendRequest(command_code, target, data)
time.sleep(0.002)
self.controller.sendRequest(command_code, target, data)
self.controller.packet_id = (self.controller.packet_id + 1) % 255
return True
except SerialException or IndexError as e:
error(e)
return False
def sendPacketWithResponse(self, command_code: int, target: tuple, data: list, throw_if_error=False) -> list:
try:
self.controller.sendRequest(command_code, target, data)
self.controller.packet_id = (self.controller.packet_id + 1) % 255
return self.controller.readData()
except SerialException or IndexError as e:
error(e)
if throw_if_error:
raise e
return []
c_codes = {
"set": 0x30,
"get": 0x20
}
targets = {
"currents set": (0xBE, 0xEE),
"currents get": (0xBE, 0xEF),
"powers": (0xBE, 0x10),
"address": (0xBF, 0x20),
"device info": (0xBF, 0x00),
"device mode set": (0xBF, 0x10),
"device mode get": (0xBF, 0x11),
##### OLD
# "currents set": (0xBE, 0xEE),
# "currents get": (0xBE, 0xEF),
# "powers set": (0xBE, 0x10),
# "powers get": (0xBE, 0x11),
# "address": (0xBF, 0x20),
# "device info": (0xBF, 0x00),
# "device mode set": (0xBF, 0x10),
# "device mode get": (0xBF, 0x11),
}
# Currents #
def writeCurrents(self, currents: list) -> bool:
return self.sendPacket(self.c_codes['set'], self.targets['currents set'], currents)
def readCurrents(self) -> list:
return self.sendPacketWithResponse(self.c_codes['get'], self.targets['currents get'], [])
# DMX Addresses #
def writeAddress(self, address: int) -> bool:
try:
self.sendPacketWithResponse(self.c_codes['set'], self.targets['address'], list(address.to_bytes(2,byteorder='big')), True)
return True
except SerialException as e:
return False
def readAddress(self) -> int:
data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['address'], [])
if not data:
return -1
return int.from_bytes(data, byteorder='big')
# Device info #
def readDeviceInfo(self) -> tuple:
data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['device info'], [])
if not data:
return ()
data = data[10:] # cut out first ten bytes
mode_footprint = int.from_bytes(data[0:2], byteorder='big')
mode_number = data[2]
mode_count = data[3]
address = int.from_bytes(data[4:6], byteorder='big')
return mode_footprint, mode_number, mode_count, address
# Modes #
def writeMode(self, mode_number: int) -> bool:
try:
self.sendPacketWithResponse(self.c_codes['set'], self.targets['device mode set'], [mode_number], True)
return True
except SerialException as e:
return False
def readMode(self, mode_index: int) -> tuple:
# self.controller.readDop = True # READ MODE BUG FIX TODO
data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['device mode get'], [mode_index + 1])
# self.controller.readDop = False # READ MODE BUG FIX TODO
debug('Read bytes:')
debug("|".join(hex(b) for b in self.controller.read_buffer))
if not data:
return ()
data = data[1:] # cut out first byte
mode_footprint = int.from_bytes(data[1:3], byteorder='big')
mode_name = bytes(data[3:14]).decode()
return mode_footprint, mode_name
def readModes(self) -> list:
data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['device info'], [])
if not data:
return []
data = data[10:] # cut out first ten bytes
mode_footprint = int.from_bytes(data[0:2], byteorder='big')
mode_number = data[2]
mode_count = data[3]
address = int.from_bytes(data[4:6], byteorder='big')
modes = []
for i in range(mode_count):
mode = self.readMode(i)
if not mode:
return []
modes.append(mode)
time.sleep(0.05)
i = 0
for m in modes:
i = i + 1
debug(f'Mode {i}: {m}')
return modes
# Powers #
def writePowers(self, currents: list, voltages: list, power_poss: int, koeffs: list, a_max: int) -> bool:
data = currents + voltages + [power_poss] + koeffs + [a_max.to_bytes(2, byteorder='big')[0], a_max.to_bytes(2, byteorder='big')[1]]
return self.sendPacket(self.c_codes['set'], self.targets['powers'], data)
def readPowers(self) -> list:
return self.sendPacketWithResponse(self.c_codes['get'], self.targets['powers'], [])
# source: https://stackoverflow.com/a/14224477
@staticmethod
def readComPorts() -> list:
ports = []
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
ports = glob.glob('/dev/tty[A-Za-z]*')
result = []
for port in ports:
try:
s = Serial(port)
s.close()
result.append(port)
except (OSError, SerialException):
pass
return result