import logging
import os
import sys
import time
from datetime import datetime
from logging import debug, info, error
from threading import Timer

from PyQt6 import QtCore, QtWidgets, QtGui
from PyQt6.QtCore import QRegularExpression
from PyQt6.QtGui import QRegularExpressionValidator

import controller.controller_reifined as controller
from all import Ui_Controller


class Window(QtWidgets.QMainWindow):
    """Main window: lets the user pick a port, read device settings and write them back.

    All device traffic goes through ``self.controller``; the widgets come from
    the generated ``Ui_Controller`` form.
    """

    def __init__(self):
        QtWidgets.QMainWindow.__init__(self)
        self.ui = Ui_Controller()
        self.ui.setupUi(self)
        self.controller = controller.ControllerInterface(
            controller.ControllerRefined(0xFFFFFFFFFFFF, 0x112233445542))
        self.device = ""

        # dev selection
        self.hoverFilter = self.ComboBoxHoverEventFilter(self.ui.DeviceSelection, self.refreshDeviceList)
        self.connected = False

        # object containers
        self.currentFields = [self.ui.Current1Field, self.ui.Current2Field,
                              self.ui.Current3Field, self.ui.Current4Field]
        self.voltageFields = [self.ui.Voltage1Field, self.ui.Voltage2Field,
                              self.ui.Voltage3Field, self.ui.Voltage4Field]

        # qt styles
        self.redLineEdit = "QLineEdit { background: rgb(255, 0, 0); selection-background-color: rgb(111, 136, 222); }"
        self.disLineEdit = "QLineEdit { background: rgb(117, 116, 114); selection-background-color: rgb(117, 116, 114); }"
        self.defLineEdit = "QLineEdit {}"
        self.redCombobox = "QComboBox { background: rgb(255, 0, 0); selection-background-color: rgb(111, 136, 222); }"
        self.defCombobox = "QComboBox {}"

        # Other state vars
        self.last_click_time = 0
        self.savedStatus = ""
        self.modes = []

        # Mode table fields
        self.imageDialog = None
        self.imageLabel = None
        self.pixmap = None

        # connect signals and validation
        self.connectMethods()
        self.applyValidators()

    def connectMethods(self):
        """Wire widget signals to their handlers and install the hover filter."""
        self.ui.DeviceSelection.activated.connect(self.deviceSelectionCallback)
        self.ui.DeviceSelection.installEventFilter(self.hoverFilter)
        self.ui.Read.clicked.connect(self.readCallback)
        self.ui.Write.clicked.connect(self.writeCallback)
        self.ui.ModeInfo.clicked.connect(self.showModeTable)

    def applyValidators(self):
        """Attach range validators to the numeric input fields."""
        # 1 - 511 (the last alternative must be 51[01]; a bare 511 rejected 510)
        self.ui.DmxAddressField.setValidator(QRegularExpressionValidator(
            QRegularExpression("^([1-9]|[1-9][0-9]|[1-4][0-9]{2}|50[0-9]|51[01])$")))
        # 100 - 2550
        for f in self.currentFields:
            f.setValidator(QRegularExpressionValidator(
                QRegularExpression("^(?:[1-9][0-9]{2}|1[0-9]{3}|2[0-4][0-9]{2}|25[0-4][0-9]|2550)$")))
        # 2 - 60
        for f in self.voltageFields:
            f.setValidator(QRegularExpressionValidator(
                QRegularExpression("^(?:[2-9]|[1-5][0-9]|60)$")))
        # 5 - 255
        self.ui.PowerField.setValidator(QRegularExpressionValidator(
            QRegularExpression("^(?:[5-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$")))

    def resetAllStyles(self) -> None:
        """Restore the default style sheet on every field that can be highlighted."""
        for f in (*self.currentFields, *self.voltageFields):
            f.setStyleSheet(self.defLineEdit)
        self.ui.ModeSelection.setStyleSheet(self.defCombobox)
        # NOTE(review): PDop is not referenced anywhere else in this file;
        # confirm it still exists on the form.
        self.ui.PDop.setStyleSheet(self.defLineEdit)
        # PowerField can be turned red by the write verification, so reset it too.
        self.ui.PowerField.setStyleSheet(self.defLineEdit)
        self.ui.DmxAddressField.setStyleSheet(self.defLineEdit)

    def clearAllFields(self) -> None:
        """Empty every data field and make the numeric fields editable again."""
        self.ui.DmxAddressField.clear()
        self.ui.ModeSelection.clear()
        for f in (*self.currentFields, *self.voltageFields, self.ui.PowerField):
            f.setText("")
            f.setReadOnly(False)

    def resetStatus(self) -> None:
        """Show the status text that was saved by setTempStatus."""
        self.setStatus(self.savedStatus)

    def setStatus(self, text: str) -> None:
        """Print *text*, show it in the status label and let Qt repaint."""
        print(text)
        self.ui.Status.setText(text)
        # Pump the event loop so the label updates during long blocking operations.
        QtWidgets.QApplication.processEvents()

    def setTempStatus(self, new_status: str, reset_time: float) -> None:
        """Show *new_status* and restore the previous status after *reset_time* seconds."""
        if self.ui.Status.text() != new_status:
            self.savedStatus = self.ui.Status.text()
        self.setStatus(new_status)
        # A Qt single-shot timer fires on the GUI thread; a threading.Timer would
        # touch widgets from a worker thread.
        QtCore.QTimer.singleShot(int(reset_time * 1000), self.resetStatus)

    def setText(self, field, text) -> None:
        """Put *text* into *field*, making it editable with the default style."""
        field.setReadOnly(False)
        field.setStyleSheet(self.defLineEdit)
        field.setText(text)

    def setReadonly(self, fields: list) -> None:
        """Mark every field in *fields* read-only and grey it out."""
        for f in fields:
            f.setReadOnly(True)
            f.setStyleSheet(self.disLineEdit)

    def setCurrents(self, currents: list) -> None:
        """Display controller current values (field value = controller value * 10)."""
        for f, value in zip(self.currentFields, currents):
            self.setText(f, str(value * 10))

    def getCurrents(self) -> list:
        """Return the current fields as ints; an empty field counts as 0."""
        return [int(f.text()) if f.text() != "" else 0 for f in self.currentFields]

    def setVoltages(self, voltages: list) -> None:
        """Display controller voltage values (field value = int(controller value * 0.235))."""
        for f, value in zip(self.voltageFields, voltages):
            self.setText(f, str(int(value * 0.235)))

    def getVoltages(self) -> list:
        """Return the voltage fields as ints; an empty field counts as 0."""
        return [int(f.text()) if f.text() != "" else 0 for f in self.voltageFields]

    def setPower(self, power: int) -> None:
        """Display *power* in the power field."""
        self.setText(self.ui.PowerField, str(power))

    def connectionLost(self) -> None:
        """Close the controller, mark the window disconnected and reset the UI."""
        # Read the port name before closing so the message does not depend on
        # what close() leaves behind.
        port = self.controller.controller.ser.port
        self.controller.close()
        self.connected = False
        self.setStatus('Пожалуйста, выберите порт')
        self.setTempStatus(f'Подключение к {port} потеряно!', 2)
        self.resetAllStyles()
        self.clearAllFields()
        self.ui.DeviceSelection.clear()  # Manually clear device selection combobox

    def determine_capabilities(self, readModes=True) -> tuple[bool, tuple]:
        """Read address, device info, (optionally) modes, currents and powers.

        Returns ``(True, (address, dev_info, modes, currents, powers))`` on
        success and ``(False, ())`` when the address, device info or a mode
        could not be read. ``currents`` / ``powers`` are passed through as
        returned by the controller; callers treat a falsy value as
        "not supported".
        """
        port = self.controller.controller.ser.port

        # 1.1
        self.setStatus(f'Чтение адреса {port}...')
        address = self.controller.readAddress()
        if address == -1:
            self.setStatus(f'Ошибка чтения адреса {port}')
            return False, ()

        # 1.2
        self.setStatus(f'Чтение режима {port}...')
        dev_info = self.controller.readDeviceInfo()
        debug(dev_info)
        if not dev_info:
            self.setStatus(f'Ошибка чтения режима {port}')
            return False, ()

        # 1.2 (after all)
        if readModes:
            self.modes = []
            mode_count = dev_info[2]
            for index in range(mode_count):
                self.setStatus(f'Чтение режима {index + 1} {port}...')
                mode = self.controller.readMode(index)
                if not mode:
                    return False, ()
                self.modes.append(mode)
                time.sleep(0.05)

        # 1.3
        self.setStatus(f'Чтение токов {port}...')
        currents = self.controller.readCurrents()

        # 1.4
        self.setStatus(f'Чтение эл. параметров {port}...')
        powers = self.controller.readPowers()

        return True, (address, dev_info, self.modes, currents, powers)

    def readData(self, connecting=False) -> bool:
        """Read everything from the device and populate the form.

        Returns True on success. On failure the device list is cleared, the
        controller is closed and False is returned.
        """
        self.connected, data = self.determine_capabilities()
        if not self.connected:
            self.setStatus(
                f'{"Не удалось подключиться" if connecting else "Потеряно подключение"} к {self.device}')
            self.ui.DeviceSelection.clear()
            self.controller.close()
            return False

        address, dev_info, self.modes, currents, powers = data
        self.setStatus(f'Выбранный порт: {self.device}')
        self.ui.DmxAddressField.setText(str(address))

        self.ui.ModeSelection.setStyleSheet(self.defCombobox)
        self.ui.ModeSelection.clear()
        mode_names = [f'{index + 1}:{m[1]}' for index, m in enumerate(self.modes)]
        self.ui.ModeSelection.addItems(mode_names)
        active_mode = dev_info[1]
        # Guard the 1-based mode number: 0 would silently select the last entry
        # and an out-of-range value would raise IndexError.
        if 1 <= active_mode <= len(mode_names):
            self.ui.ModeSelection.setCurrentText(mode_names[active_mode - 1])
        else:
            debug(f'Active mode {active_mode} is outside of the mode list')

        if currents:
            self.setCurrents(currents)
        else:
            self.setReadonly(self.currentFields)

        if powers:
            self.setVoltages(powers[4:8])
            self.setPower(powers[8])
        else:
            self.setReadonly([*self.voltageFields, self.ui.PowerField])
        return True

    def _preparePowerWrite(self, device_powers):
        """Compute the writePowers() arguments from the form, or None if the input is unusable.

        *device_powers* is the list returned by ``readPowers()``; elements 15-16
        are read as a big-endian PWM maximum and element 17 is only logged.
        Returns ``(currents_controller, voltages_controller, p_poss, kn, Amax)``
        or ``None`` when the power field is empty or the total power is zero.
        """
        power_text = self.ui.PowerField.text()
        if power_text == "":
            return None

        currents = self.getCurrents()
        # FIX ROUNDING ISSUE: the controller value is truncated on the way back,
        # so bias the voltage up by one before scaling.
        voltages = [v + 1 for v in self.getVoltages()]
        p_poss = int(power_text)
        p_max = device_powers[17]
        pwm_max = int.from_bytes([device_powers[15], device_powers[16]], byteorder='big')

        currents_ma = [c * 0.001 for c in currents]
        currents_controller = [c // 10 for c in currents]
        voltages_controller = [int(v / 0.235) for v in voltages]
        channel_powers = [c * v for c, v in zip(currents_ma, voltages)]
        p_sum = sum(channel_powers)
        if p_sum <= 0:
            # All currents are zero/empty: the ratios below would divide by zero.
            return None

        ka = [p / p_sum for p in channel_powers]
        km = min(p_poss / p_sum, 1)
        Amax = int(km * pwm_max)
        kn = [min(int(k * 256), 255) for k in ka]

        info(f'Currents: {currents}')
        info(f'Currents controller: {currents_controller}')
        info(f'Currents ma: {currents_ma}')
        info(f'Voltages: {voltages}')
        info(f'Voltages controller: {voltages_controller}')
        info(f'Powers: {channel_powers}')
        info(f'Power sum: {p_sum}')
        info(f'Ka: {ka}')
        info(f'Km: {km}')
        info(f'Pmax: {p_max}')
        info(f'Pext: {p_poss}')
        info(f'PwmMax: {pwm_max}')
        info(f'Amax: {Amax}')
        info(f'Kn: {kn}')
        return currents_controller, voltages_controller, p_poss, kn, Amax

    def writeData(self) -> None:
        """Write the form contents to the device and optionally verify them.

        Capabilities are re-read first; currents / powers are only written when
        the device reported them. Power inputs are validated before anything is
        written so that bad input cannot leave a half-written device. When the
        "Check" box is ticked the values are read back and mismatching fields
        are highlighted in red.
        """
        success, data = self.determine_capabilities(readModes=False)
        if not success:
            self.connectionLost()
            return
        _, _, _, has_currents, device_powers = data
        port = self.controller.controller.ser.port

        power_args = None
        if device_powers:
            power_args = self._preparePowerWrite(device_powers)
            if power_args is None:
                self.setStatus(f'Выбранный порт: {port}')
                self.setTempStatus('Проверьте значения токов, напряжений и мощности', 2)
                return

        self.setStatus(f'Запись адреса в {port}...')
        address_text = self.ui.DmxAddressField.text()
        if address_text != "" and not self.controller.writeAddress(int(address_text)):
            self.connectionLost()
            return

        self.setStatus(f'Запись режима в {port}...')
        if not self.controller.writeMode(self.ui.ModeSelection.currentIndex() + 1):
            self.connectionLost()
            return

        if has_currents:
            self.setStatus(f'Запись токов в {port}...')
            currents_controller = [c // 10 for c in self.getCurrents()]
            if not self.controller.writeCurrents(currents_controller):
                self.connectionLost()
                return

        if device_powers:
            self.setStatus(f'Запись эл. параметров в {port}...')
            if not self.controller.writePowers(*power_args):
                self.connectionLost()
                return

        if not self.ui.Check.isChecked():
            self.setStatus(f'Выбранный порт: {port}')
            self.setTempStatus(f'Данные были записаны в {port}', 1)
            return

        self._verifyWrittenData(bool(has_currents), bool(device_powers))

    def _verifyWrittenData(self, has_currents: bool, has_powers: bool) -> None:
        """Read the device back and highlight every field that differs from the form."""
        port = self.controller.controller.ser.port
        time.sleep(0.5)

        dmx_save = self.ui.DmxAddressField.text()
        pers_save = self.ui.ModeSelection.currentText()
        currents_save = [entry.text() for entry in self.currentFields]
        voltages_save = [entry.text() for entry in self.voltageFields]
        p_poss_save = self.ui.PowerField.text()

        self.setStatus(f'Чтение адреса: {port}...')
        dmx_real = self.controller.readAddress()
        if dmx_real == -1:
            self.connectionLost()
            return

        self.setStatus(f'Чтение режима: {port}...')
        dev_info = self.controller.readDeviceInfo()
        if not dev_info:
            self.connectionLost()
            return
        mode_no = dev_info[1]
        # None never equals the combobox text, so an unknown mode counts as a mismatch.
        pers_real = f'{mode_no}:{self.modes[mode_no - 1][1]}' if 1 <= mode_no <= len(self.modes) else None

        currents_real = []
        if has_currents:
            self.setStatus(f'Чтение токов: {port}...')
            raw_currents = self.controller.readCurrents()
            if not raw_currents:
                self.connectionLost()
                return
            currents_real = [a * 10 for a in raw_currents]

        voltages_real = []
        p_poss_real = 0
        if has_powers:
            self.setStatus(f'Чтение эл. параметров: {port}...')
            powers_real = self.controller.readPowers()
            if not powers_real:
                self.connectionLost()
                return
            voltages_real = [int(powers_real[4 + i] * 0.235) for i in range(4)]
            p_poss_real = powers_real[8]
            if not has_currents:
                currents_real = powers_real[0:4]

        print(f'Save: {dmx_save}, {pers_save}, {currents_save}, {voltages_save}, {p_poss_save}')
        print(f'Real: {dmx_real}, {pers_real}, {currents_real}, {voltages_real}, {p_poss_real}')

        failed = False

        if dmx_save == "" or int(dmx_save) != dmx_real:
            failed = True
            self.ui.DmxAddressField.setStyleSheet(self.redLineEdit)
        else:
            self.ui.DmxAddressField.setStyleSheet(self.defLineEdit)

        if pers_save != pers_real:
            failed = True
            self.ui.ModeSelection.clear()
            self.ui.ModeSelection.setStyleSheet(self.redCombobox)
        else:
            self.ui.ModeSelection.setStyleSheet(self.defCombobox)

        for index in range(4):
            if has_currents:
                if currents_save[index] == "" or int(currents_save[index]) != currents_real[index]:
                    failed = True
                    print(f'Failed current: {index}')
                    self.currentFields[index].setStyleSheet(self.redLineEdit)
                else:
                    self.currentFields[index].setStyleSheet(self.defLineEdit)
            if has_powers:
                # Compare against the value read back from the device, as an int.
                if voltages_save[index] == "" or int(voltages_save[index]) != voltages_real[index]:
                    failed = True
                    print(f'Failed voltage: {index}')
                    self.voltageFields[index].setStyleSheet(self.redLineEdit)
                else:
                    self.voltageFields[index].setStyleSheet(self.defLineEdit)

        if has_powers:
            # The field holds text, the device returns an int: compare as ints.
            if p_poss_save == "" or int(p_poss_save) != p_poss_real:
                failed = True
                self.ui.PowerField.setStyleSheet(self.redLineEdit)
            else:
                self.ui.PowerField.setStyleSheet(self.defLineEdit)

        self.setStatus(f'Выбранный порт: {port}')
        if failed:
            self.setTempStatus('Обнаружены ошибки, повторите запись', 1)
        else:
            self.setTempStatus('Проверка прошла успешно', 1)

    def deviceSelectionCallback(self, deviceIndex: int):
        """Connect to the port picked in the combobox and read its data."""
        self.resetAllStyles()
        self.clearAllFields()
        self.device = self.ui.DeviceSelection.itemText(deviceIndex)
        self.connected = False
        self.controller.close()
        self.setStatus(f'Подключение к {self.device}...')
        self.controller.connect(self.device)
        self.readData(connecting=True)

    def readCallback(self):
        """Handle the Read button; clicks within 1 s of the previous one are ignored."""
        if time.time() - self.last_click_time < 1:
            return
        self.last_click_time = time.time()
        if not self.connected:
            self.setTempStatus('Порт не выбран!', 1)
            return
        if self.readData():
            port = self.controller.controller.ser.port
            self.setStatus(f'Выбранный порт: {port}')
            self.setTempStatus(f'Данные прочитаны, порт: {port}', 1)

    def writeCallback(self):
        """Handle the Write button; clicks within 2 s of the previous one are ignored."""
        if time.time() - self.last_click_time < 2:
            return
        self.last_click_time = time.time()
        if self.connected:
            self.writeData()
        else:
            self.setTempStatus('Порт не выбран!', 1)

    def refreshDeviceList(self):
        """Reload the port list while keeping the current selection text."""
        selected = self.ui.DeviceSelection.currentText()
        self.ui.DeviceSelection.clear()
        self.ui.DeviceSelection.addItems(self.controller.readComPorts())
        self.ui.DeviceSelection.setCurrentText(selected)

    def showModeTable(self) -> None:
        """Show the mode description image in a modal dialog."""

        def resource_path(relative_path):
            # sys._MEIPASS is only set inside a PyInstaller bundle; otherwise
            # resolve relative to the working directory.
            base_path = getattr(sys, '_MEIPASS', os.path.abspath("."))
            return os.path.join(base_path, relative_path)

        self.pixmap = QtGui.QPixmap(resource_path("../table.png"))
        if self.pixmap.width() == 0:
            self.pixmap = QtGui.QPixmap(resource_path("table.png"))
        if self.pixmap.width() == 0:
            # A missing help image must not terminate the application.
            error('Failed to load mode table image!')
            self.setTempStatus('Не удалось загрузить таблицу режимов', 2)
            return
        debug(self.pixmap.width())

        self.imageDialog = QtWidgets.QDialog()
        self.imageDialog.setWindowTitle('Описание режимов работы')
        self.imageDialog.resize(self.pixmap.width(), self.pixmap.height())
        self.imageLabel = QtWidgets.QLabel(self.imageDialog)
        self.imageLabel.resize(self.pixmap.width(), self.pixmap.height())
        self.imageLabel.setPixmap(self.pixmap)
        self.imageDialog.exec()

    class ComboBoxHoverEventFilter(QtCore.QObject):
        """Event filter that calls *onEnter* whenever the pointer enters the watched widget."""

        def __init__(self, parent, onEnter):
            super().__init__(parent)
            # NOTE(review): this attribute shadows QObject.parent(); kept for
            # compatibility with any code reading ``.parent``.
            self.parent = parent
            self.onEnter = onEnter

        def eventFilter(self, obj, event):
            if event.type() == QtCore.QEvent.Type.Enter:
                self.onEnter()
            return super().eventFilter(obj, event)


app = QtWidgets.QApplication(sys.argv)


def setup_logs() -> None:
    """Log at DEBUG level to a timestamped file under ./logs and to the console."""
    os.makedirs("logs", exist_ok=True)
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler(f'./logs/{datetime.now().strftime("%Y%m%d-%H%M%S")}.log'),
            logging.StreamHandler()
        ]
    )


if __name__ == '__main__':
    setup_logs()
    window = Window()
    window.show()
    sys.exit(app.exec())