Fixed protocol - ready for pyqt

This commit is contained in:
livefish
2024-12-01 19:32:51 +03:00
parent b7d9b380f9
commit f281cea944
2 changed files with 30 additions and 18 deletions

View File

@ -4,6 +4,7 @@ import time
from serial import *
class ControllerRefined:
def __init__(self, controller_id: int, programmator_id: int):
self.id = controller_id
@ -14,8 +15,7 @@ class ControllerRefined:
self.buffer = bytearray()
self.read_buffer = bytearray()
self.readDop = False # READ MODE BUG FIX TODO
self.readDop = False # READ MODE BUG FIX TODO
def connect(self, port: str) -> None:
self.ser = Serial(
@ -123,7 +123,7 @@ class ControllerRefined:
for _ in range(valueCount[0]):
values.append(self.readBytes(1)[0])
if self.readDop: # READ MODE BUG FIX TODO
if self.readDop: # READ MODE BUG FIX TODO
values.append(self.readBytes(1)[0])
print(f'Read bytes: {self.read_buffer}')
@ -161,13 +161,15 @@ class ControllerInterface:
print(e)
return False
def sendPacketWithResponse(self, command_code: int, target: tuple, data: list) -> list:
def sendPacketWithResponse(self, command_code: int, target: tuple, data: list, throw_if_error=False) -> list:
try:
self.controller.sendRequest(command_code, target, data)
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readData()
except SerialException as e:
print(e)
if throw_if_error:
raise e
return []
c_codes = {
@ -195,8 +197,11 @@ class ControllerInterface:
# DMX Addresses #
def writeAddress(self, address: int) -> bool:
data = self.sendPacketWithResponse(self.c_codes['set'], self.targets['address'], [address])
return data != []
try:
self.sendPacketWithResponse(self.c_codes['set'], self.targets['address'], [address], True)
return True
except SerialException as e:
return False
def readAddress(self) -> int:
data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['address'], [])
@ -210,7 +215,7 @@ class ControllerInterface:
if not data:
return ()
data = data[10:] # cut out first ten bytes
data = data[10:] # cut out first ten bytes
mode_footprint = int.from_bytes(data[0:2], byteorder='big')
mode_number = data[2]
mode_count = data[3]
@ -220,19 +225,24 @@ class ControllerInterface:
# Modes #
def writeMode(self, mode_number: int) -> bool:
data = self.sendPacketWithResponse(self.c_codes['set'], self.targets['device mode set'], [mode_number])
return data != []
try:
self.sendPacketWithResponse(self.c_codes['set'], self.targets['device mode set'], [mode_number], True)
return True
except SerialException as e:
return False
def readMode(self, mode_index: int) -> tuple:
self.controller.readDop = True # READ MODE BUG FIX TODO
self.controller.readDop = True # READ MODE BUG FIX TODO
data = self.sendPacketWithResponse(self.c_codes['get'], self.targets['device mode get'], [mode_index + 1])
self.controller.readDop = False # READ MODE BUG FIX TODO
self.controller.readDop = False # READ MODE BUG FIX TODO
print(f'Read bytes: {self.controller.read_buffer}')
print('Read bytes:')
for i, b in enumerate(self.controller.read_buffer):
print(f'{i}: {b.to_bytes(1).hex()}')
if not data:
return ()
data = data[1:] # cut out first byte
data = data[1:] # cut out first byte
mode_footprint = int.from_bytes(data[1:3], byteorder='big')
mode_name = bytes(data[3:14]).decode()
@ -263,7 +273,6 @@ class ControllerInterface:
return modes
# Powers #
def writePowers(self, currents: list, voltages: list, power: int, koeffs: list, a_max: int) -> bool:
data = currents + voltages + [power] + koeffs + [a_max]

View File

@ -159,8 +159,8 @@ def readControllerCurrentsAndData() -> bool:
for index, m in enumerate(modes):
mode_names.append(f'{index + 1}:{m[1]}')
personality_combobox.configure(values=mode_names)
personality_combobox.set(mode_names[dev_info[1] - 1])
personality_combobox.configure(values=mode_names)
return values or address != -1 or modes
@ -204,17 +204,20 @@ def writeCurrentsAddressAndMode() -> None:
global modes
setStatus(f'Чтение адреса: {controller.controller.ser.port}...')
dmx_real = controller.readAddress()
setStatus(f'Чтение режима: {controller.controller.ser.port}...')
dev_info = controller.readDeviceInfo()
pers_real = f'{dev_info[1]}:{modes[dev_info[1] - 1][1]}'
setStatus(f'Чтение токов: {controller.controller.ser.port}...')
time.sleep(1)
currents_real = controller.readCurrents()
if dmx_real == -1 or not currents_real or not dev_info:
if dmx_real == -1 or not dev_info or not currents_real:
connectionLost()
return
pers_real = f'{dev_info[1]}:{modes[dev_info[1] - 1][1]}'
failed = False
if dmx_save == "" or int(dmx_save) != dmx_real:
failed = True