Added powers to controller

This commit is contained in:
livefish
2024-11-24 19:09:14 +03:00
commit bf5a7328c2
8 changed files with 983 additions and 0 deletions

3
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

10
.idea/PowerUI.iml generated Normal file
View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.12" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/PowerUI.iml" filepath="$PROJECT_DIR$/.idea/PowerUI.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

467
controller/controller.py Normal file
View File

@ -0,0 +1,467 @@
import glob
import sys
import time
from serial import *
class Controller:
def __init__(self, controller_id: int, programmator_id: int):
self.id = controller_id
self.prog_id = programmator_id
self.ser = Serial()
self.packet_id = 0
self.checksum = 0
self.buffer = bytearray()
self.read_buffer = bytearray()
def connect(self, port: str) -> None:
self.ser = Serial(
port, baudrate=250_000,
bytesize=8, parity='N', stopbits=2, timeout=0,
xonxoff=False, rtscts=False
)
print(f'Connected to {port}')
def close(self) -> None:
self.ser.close()
print(f'Closed connection to {self.ser.port}')
def writeBytes(self, bytesToWrite: bytes) -> None:
for b in bytesToWrite:
self.buffer.append(b)
self.checksum += b
def readBytes(self, cnt: int) -> bytes:
buf = self.ser.read(cnt)
for b in buf:
self.read_buffer.append(b)
return buf
def flush(self) -> None:
self.ser.write(self.buffer)
print(self.buffer)
self.buffer.clear()
self.checksum = 0
def validateChecksum(self, checksum: int) -> bool:
s = 0
for b in self.read_buffer:
s += b
return checksum == s
def sendCondition(self) -> None:
self.ser.baudrate = 250_000 / 5
self.ser.write(b'\x00')
self.ser.baudrate = 250_000
def sendPacketHead(self, packetLen: int) -> None:
self.writeBytes(b'\xCC\x01')
self.writeBytes(packetLen.to_bytes(1, byteorder='big'))
self.writeBytes(self.id.to_bytes(6, byteorder='big'))
self.writeBytes(self.prog_id.to_bytes(6, byteorder='big'))
self.writeBytes(self.packet_id.to_bytes(1, byteorder='big'))
self.writeBytes(b'\x01')
self.writeBytes(bytes(3))
def waitForStartBytes(self) -> bool:
x = 0
counter = 10
while counter > 0:
prev = x
time.sleep(0.05)
x = self.readBytes(1)
if x == prev:
counter = counter - 1
print(f'x: {x}, prev: {prev}')
if prev == b'\xCC' and x == b'\x01':
break
if counter == 0:
print('No data!')
return False
return True
# CURRENTS #
def sendWriteCurrentsPacket(self, currents: list) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + len(currents) + 2)
self.writeBytes(b'\x30\xBE\xEE')
self.writeBytes(len(currents).to_bytes(1, byteorder='big'))
for current in currents:
self.writeBytes(current.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def sendReadCurrentsPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBE\xEF\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readCurrentsPacket(self) -> list:
self.read_buffer.clear()
if not self.waitForStartBytes():
return []
self.readBytes(21)
valueCount = self.readBytes(1)
values = []
for _ in range(valueCount[0]):
values.append(self.readBytes(1)[0])
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return []
return values
# DMX addresses #
def sendWriteAddressPacket(self, address: int) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2 + 2)
self.writeBytes(b'\x30\xBF\x20\x02')
self.writeBytes(address.to_bytes(2, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readWriteAddressPacket(self) -> bool:
self.read_buffer.clear()
if not self.waitForStartBytes():
return False
self.readBytes(22)
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return False
return True
def sendReadAddressPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBF\x20\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readAddressPacket(self) -> int:
self.read_buffer.clear()
if not self.waitForStartBytes():
return -1
self.readBytes(22)
address = int.from_bytes(self.readBytes(2), byteorder='big')
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return -1
return address
# Device info #
def sendReadDevInfoPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBF\x00\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readDevInfoPacket(self) -> tuple:
self.read_buffer.clear()
if not self.waitForStartBytes():
return ()
self.readBytes(32)
mode_footprint = int.from_bytes(self.readBytes(2), byteorder='big')
mode_number = int.from_bytes(self.readBytes(1), byteorder='big')
mode_count = int.from_bytes(self.readBytes(1), byteorder='big')
address = int.from_bytes(self.readBytes(2), byteorder='big')
self.readBytes(3)
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return ()
return mode_footprint, mode_number, mode_count, address
# Modes #
def sendWriteModePacket(self, mode_number: int) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 1 + 2)
self.writeBytes(b'\x30\xBF\x10\x01')
self.writeBytes(mode_number.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readWriteModePacket(self) -> bool:
self.read_buffer.clear()
if not self.waitForStartBytes():
return False
self.readBytes(22)
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return False
return True
def sendReadModeInfoPacket(self, mode_number: int) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 1 + 2)
self.writeBytes(b'\x20\xBF\x11\x01')
self.writeBytes(mode_number.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readModeInfoPacket(self) -> tuple:
self.read_buffer.clear()
if not self.waitForStartBytes():
return ()
self.readBytes(23)
mode_footprint = int.from_bytes(self.readBytes(2), byteorder='big')
mode_name = self.readBytes(12).decode()
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return ()
return mode_footprint, mode_name
# Power #
def sendWritePowerPacket(self, powers: list) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + len(powers) + 2)
self.writeBytes(b'\x30\xBE\x11')
self.writeBytes(len(powers).to_bytes(1, byteorder='big'))
for data in powers:
self.writeBytes(data.to_bytes(1, byteorder='big'))
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def sendReadPowerPacket(self) -> None:
self.sendCondition()
self.sendPacketHead(1 + 6 * 2 + 1 + 4 + 4 + 2)
self.writeBytes(b'\x20\xBE\x10\x00')
self.writeBytes(self.checksum.to_bytes(2, byteorder='big'))
self.flush()
def readPowerPacket(self) -> list:
self.read_buffer.clear()
if not self.waitForStartBytes():
return []
self.readBytes(21)
valueCount = self.readBytes(1)
values = []
for _ in range(valueCount[0]):
values.append(self.readBytes(1)[0])
chsum = int.from_bytes(self.ser.read(2), byteorder='big')
if not self.validateChecksum(chsum):
print('Invalid checksum!')
return []
return values
class ControllerInterface:
def __init__(self, controller: Controller):
self.controller = controller
def connect(self, port: str) -> None:
self.controller.connect(port)
def close(self) -> None:
self.controller.close()
# Currents #
def writeCurrents(self, currents: list) -> bool:
try:
self.controller.sendWriteCurrentsPacket(currents)
time.sleep(0.002)
self.controller.sendWriteCurrentsPacket(currents)
time.sleep(1)
self.controller.sendWriteCurrentsPacket(currents)
time.sleep(0.002)
self.controller.sendWriteCurrentsPacket(currents)
self.controller.packet_id = self.controller.packet_id + 1
return True
except SerialException as e:
print(e)
return False
def readCurrents(self) -> list:
try:
self.controller.sendReadCurrentsPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readCurrentsPacket()
except SerialException as e:
print(e)
return []
# DMX Addresses #
def writeAddress(self, address: int) -> bool:
try:
self.controller.sendWriteAddressPacket(address)
self.controller.readWriteAddressPacket()
self.controller.packet_id = self.controller.packet_id + 1
return True
except SerialException as e:
print(e)
return False
def readAddress(self) -> int:
try:
self.controller.sendReadAddressPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readAddressPacket()
except SerialException as e:
print(e)
return -1
# Device info #
def readDeviceInfo(self) -> tuple:
try:
self.controller.sendReadDevInfoPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readDevInfoPacket()
except SerialException as e:
print(e)
return ()
# Modes #
def writeMode(self, mode_number: int) -> bool:
try:
self.controller.sendWriteModePacket(mode_number)
self.controller.readWriteModePacket()
self.controller.packet_id = self.controller.packet_id + 1
return True
except SerialException as e:
print(e)
return False
def read_mode(self, mode_index: int) -> tuple:
try:
self.controller.sendReadModeInfoPacket(mode_index + 1)
return self.controller.readModeInfoPacket()
except SerialException as e:
print(e)
return ()
def readModes(self) -> list:
try:
self.controller.sendReadDevInfoPacket()
self.controller.packet_id = self.controller.packet_id + 1
info = self.controller.readDevInfoPacket()
if not info:
return []
mode_footprint, mode_number, mode_count, address = info
modes = []
for i in range(mode_count):
mode = self.read_mode(i)
if not mode:
return []
modes.append(mode)
time.sleep(0.05)
i = 0
for m in modes:
i = i + 1
print(f'Mode {i}: {m}')
return modes
except SerialException as e:
print(e)
return []
# Powers #
def writePowers(self, currents: list, voltages: list, power: int, koeffs: list, a_max: int) -> bool:
try:
data = currents + voltages + [power] + koeffs + [a_max]
print(f'Data: {data}')
self.controller.sendWritePowerPacket(data)
time.sleep(0.002)
self.controller.sendWritePowerPacket(data)
time.sleep(1)
self.controller.sendWritePowerPacket(data)
time.sleep(0.002)
self.controller.sendWritePowerPacket(data)
self.controller.packet_id = self.controller.packet_id + 1
return True
except SerialException as e:
print(e)
return False
def readPowers(self) -> list:
try:
self.controller.sendReadPowerPacket()
self.controller.packet_id = self.controller.packet_id + 1
return self.controller.readPowerPacket()
except SerialException as e:
print(e)
return []
# source: https://stackoverflow.com/a/14224477
@staticmethod
def readComPorts() -> list:
ports = []
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
ports = glob.glob('/dev/tty[A-Za-z]*')
result = []
for port in ports:
try:
s = Serial(port)
s.close()
result.append(port)
except (OSError, SerialException):
pass
return result

467
interface.py Normal file
View File

@ -0,0 +1,467 @@
import base64
import time
from threading import Timer
import controller.controller as controller
import tkinter as tk
from tkinter import ttk
controller = controller.ControllerInterface(controller.Controller(0xFFFFFFFFFFFF, 0x112233445542))
win = tk.Tk()
device_choice = ttk.Combobox()
selected_port = tk.StringVar()
dmx_address_field = ttk.Entry()
personality_combobox = ttk.Combobox(win)
failed_to_read_style = ttk.Style(win)
failed_to_read_style.configure('failedToRead.TEntry', foreground="red")
status = tk.StringVar()
status.set('Пожалуйста, выберите COM порт')
saved_status = ''
current_entries = list()
modes = []
connected = False
last_click_time = time.time()
check = tk.IntVar()
def setStatus(new_status: str):
status.set(new_status)
win.update()
win.update_idletasks()
def resetStatus() -> None:
global saved_status
setStatus(saved_status)
def setTempStatus(new_status: str, reset_time: float):
global saved_status
if status.get() != new_status:
saved_status = status.get()
setStatus(new_status)
Timer(reset_time, resetStatus).start()
def resetAllStyles() -> None:
for e in current_entries:
e.configure(style='TEntry')
dmx_address_field.configure(style='TEntry')
personality_combobox.configure(style='TCombobox')
def clearAllFields() -> None:
dmx_address_field.delete(0, tk.END)
personality_combobox.set('')
personality_combobox.configure(values=[])
for e in current_entries:
e.delete(0, tk.END)
def comPortSelected(event) -> None:
global connected
connected = False
resetAllStyles()
clearAllFields()
setStatus(f'Подключение к {selected_port.get()}...')
controller.close()
controller.connect(selected_port.get())
print(selected_port.get())
if readControllerCurrentsAndData():
setStatus(f'Выбранный порт: {selected_port.get()}')
connected = True
else:
setStatus(f'Не удалось подключиться к {selected_port.get()}')
device_choice.set('')
connected = False
controller.close()
def refreshComPorts(event) -> None:
device_choice['values'] = controller.readComPorts()
def connectionLost() -> None:
controller.close()
setStatus('Пожалуйста, выберите порт')
setTempStatus(f'Подключение к {controller.controller.ser.port} потеряно!', 2)
resetAllStyles()
device_choice.set('')
clearAllFields()
def readControllerCurrentsAndData() -> bool:
setStatus(f'Чтение токов {controller.controller.ser.port}...')
values = controller.readCurrents()
if not values:
for e in current_entries:
e.configure(style='failedToRead.TEntry')
else:
index = 0
for e in current_entries:
e.configure(style='TEntry')
e.delete(0, tk.END)
e.insert(tk.END, values[index] * 10)
index = index + 1
setStatus(f'Чтение адреса {controller.controller.ser.port}...')
address = controller.readAddress()
if address == -1:
dmx_address_field.configure(style='failedToRead.TEntry')
else:
dmx_address_field.configure(style='TEntry')
dmx_address_field.delete(0, tk.END)
dmx_address_field.insert(tk.END, str(address))
global modes
modes = []
setStatus(f'Чтение режимов {controller.controller.ser.port}...')
dev_info = controller.readDeviceInfo()
if dev_info:
mode_count = dev_info[2]
for index in range(mode_count):
setStatus(f'Чтение режима {index + 1} {controller.controller.ser.port}...')
mode = controller.read_mode(index)
if not mode:
modes = []
break
modes.append(mode)
time.sleep(0.05)
if not modes:
personality_combobox.set('ERROR')
else:
personality_combobox.configure(style='TCombobox')
personality_combobox.configure(values=modes)
mode_names = []
for index, m in enumerate(modes):
mode_names.append(f'{index + 1}:{m[1]}')
personality_combobox.configure(values=mode_names)
personality_combobox.set(mode_names[dev_info[1] - 1])
return values or address != -1 or modes
def writeCurrentsAddressAndMode() -> None:
toWrite = []
for e in current_entries:
x = e.get()
if x != "":
toWrite.append(int(x) // 10)
else:
toWrite.append(0)
setStatus(f'Запись токов в {controller.controller.ser.port}...')
print(f'Writing... {toWrite}')
if not controller.writeCurrents(toWrite):
connectionLost()
return
setStatus(f'Запись адреса в {controller.controller.ser.port}...')
if dmx_address_field.get() != "" and not controller.writeAddress(int(dmx_address_field.get())):
connectionLost()
return
setStatus(f'Запись режима в {controller.controller.ser.port}...')
if not controller.writeMode(personality_combobox.current() + 1):
connectionLost()
return
if check.get() == 0:
setStatus(f'Выбранный порт: {controller.controller.ser.port}')
setTempStatus(f'Данные были записаны в {controller.controller.ser.port}', 1)
else:
time.sleep(0.5)
dmx_save = dmx_address_field.get()
pers_save = personality_combobox.get()
currents_save = []
for entry in current_entries:
currents_save.append(entry.get())
global modes
setStatus(f'Чтение адреса: {controller.controller.ser.port}...')
dmx_real = controller.readAddress()
setStatus(f'Чтение режима: {controller.controller.ser.port}...')
dev_info = controller.readDeviceInfo()
pers_real = f'{dev_info[1]}:{modes[dev_info[1] - 1][1]}'
setStatus(f'Чтение токов: {controller.controller.ser.port}...')
time.sleep(1)
currents_real = controller.readCurrents()
if dmx_real == -1 or not currents_real or not dev_info:
connectionLost()
return
failed = False
if dmx_save == "" or int(dmx_save) != dmx_real:
failed = True
dmx_address_field.configure(style='failedToRead.TEntry')
else:
dmx_address_field.configure(style='TEntry')
if pers_save != pers_real:
failed = True
personality_combobox.set('ERROR')
else:
personality_combobox.configure(style='TCombobox')
for index in range(4):
if currents_save[index] == "" or int(currents_save[index]) != currents_real[index] * 10:
failed = True
current_entries[index].configure(style='failedToRead.TEntry')
else:
current_entries[index].configure(style='TEntry')
setStatus(f'Выбранный порт: {controller.controller.ser.port}')
if failed:
setTempStatus('Обнаружены ошибки, повторите запись', 1)
else:
setTempStatus('Проверка прошла успешно', 1)
def show_personality_callback() -> None:
popup = tk.Toplevel(win)
popup.grab_set()
popup.resizable(False, False)
popup.title('Информация о режимах')
columns = (
("Номер", 50),
("Название", 100),
("Каналов", 65),
("Плавность", 80),
("Поведение без DMX", 160),
("Описание", 200),
)
for index, c in enumerate(columns):
label = tk.Label(popup, text=f'{c[0]}', width=c[1] // 8, borderwidth=1, relief="solid")
label.grid(row=0, column=index, sticky="nsew")
mode_info = [
(1, "1CH MAX FST", 'нет', 'максимальная яркость'),
(2, "1CH MAX SLO", 'есть', 'максимальная яркость'),
(3, "1CH KEEP FST", 'нет', 'сохранение кадра'),
(4, "1CH KEEP SLO", 'есть', 'сохранение кадра'),
(5, "1CH DARK FST", 'нет', 'гашение'),
(6, "1CH DARK SLO", 'есть', 'гашение'),
(7, "2CH MAX FST", 'нет', 'максимальная яркость'),
(8, "2CH MAX SLO", 'есть', 'максимальная яркость'),
(9, "2CH KEEP FST", 'нет', 'сохранение кадра'),
(10, "2CH KEEP SLO", 'есть', 'сохранение кадра'),
(11, "2CH DARK FST", 'нет', 'гашение'),
(12, "2CH DARK FST", 'есть', 'гашение'),
(13, "3 COLOR CHG", 'нет', 'сохранение кадра'),
(14, "4 COLOR CHG", 'нет', 'сохранение кадра'),
(15, "4CH MAXA FST", 'нет', 'максимальная яркость всех'),
(16, "4CH MAXA SLO", 'есть', 'максимальная яркость всех'),
(17, "4CH MAXW FST", 'нет', 'максимальная яркость белый'),
(18, "4CH MAXW SLO", 'есть', 'максимальная яркость.белый'),
(19, "4CH KEEP FST", 'нет', 'сохранение кадра'),
(20, "4CH KEEP SLO", 'есть', 'сохранение кадра'),
(21, "4CH DARK FST", 'нет', 'гашение'),
(22, "4CH DARK SLO", 'есть', 'гашение'),
(23, "4CH RB3S FST", 'нет', 'RGB перелив синхронизированный'),
(24, "4CH RB3S SLO", 'есть', 'RGB перелив синхронизированный'),
(25, "4CH RB3A FST", 'нет', 'RGB перелив индивидуальный'),
(26, "4CH RB3A SLO", 'есть', 'RGB перелив индивидуальный'),
(27, "4CH RB4S FST", 'нет', 'RGBW перелив синхронизированный'),
(28, "4CH RB4S SLO", 'есть', 'RGBW перелив синхронизированный'),
(29, "4CH RB4A FST", 'нет', ' RGBW перелив индивидуальный '),
(30, "4CH RB4A SLO", 'есть', 'RGBW перелив индивидуальный '),
]
for index in range(len(mode_info)):
for j in range(2):
label = tk.Label(popup, text=f'{mode_info[index][j]}', borderwidth=1, relief="solid")
label.grid(row=index + 1, column=j, sticky="nsew")
for index in range(len(mode_info)):
for j in range(2, 4):
label = tk.Label(popup, text=f'{mode_info[index][j]}', borderwidth=1, relief="solid")
label.grid(row=index + 1, column=j + 1, sticky="nsew")
descr = [
('Режим одноканального диммера. \nВсе выходные каналы работают параллельно', 6),
('Режим двухканального диммера. \nВыходные каналы 1,2 и 3,4 работают параллельно', 6),
(
'Режим RGB перелива с изменением яркости каждого канала. \nW не управляется. Последний канал скорость перелива. ',
1),
(
'Режим RGB перелива с изменением яркости каждого канала. \nW управляется отдельно по DMX. Последний канал скорость перелива',
1),
('Режим четырехканального диммера. \nВсе выходные каналы работают независимо', 16)
]
delta = 0
for index, c in enumerate(descr):
label = tk.Label(popup, text=f'{c[0]}', borderwidth=1, relief="solid")
label.grid(row=delta + 1, column=5, sticky="nsew", rowspan=c[1])
delta += c[1]
chan = [(1, 6), (2, 6), (4, 1), (5, 1), (4, 16)]
delta = 0
for index, c in enumerate(chan):
label = tk.Label(popup, text=f'{c[0]}', borderwidth=1, relief="solid")
label.grid(row=delta + 1, column=2, sticky="nsew", rowspan=c[1])
delta += c[1]
def readCallback() -> None:
global last_click_time
if time.time() - last_click_time < 1:
return
last_click_time = time.time()
if connected:
if not readControllerCurrentsAndData():
connectionLost()
return
setStatus(f'Выбранный порт: {controller.controller.ser.port}')
setTempStatus(f'Токи прочитаны, порт: {controller.controller.ser.port}', 1)
else:
setTempStatus('Порт не выбран!', 1)
def writeCallback() -> None:
global last_click_time
if time.time() - last_click_time < 2:
return
last_click_time = time.time()
if connected:
writeCurrentsAddressAndMode()
else:
setTempStatus('Порт не выбран!', 1)
def validate_dmx_address_entry(entry_data: str) -> bool:
if entry_data.isdigit():
return 1 <= int(entry_data) <= 512
if not entry_data:
return True
return False
def validate_current_entry(entry_data: str) -> bool:
if entry_data.isdigit():
return int(entry_data) <= 2500
if not entry_data:
return True
return False
def finish():
win.destroy()
controller.close()
if __name__ == "__main__":
controller.writePowers([100, 200, 300, 400], [300, 2000, 2341, 10], 49, [100, 240, 30, 239], 52)
win.configure(width=270, height=350)
win.resizable(False, False)
win.title("Controllers")
win.protocol("WM_DELETE_WINDOW", finish)
com_port_label = ttk.Label(win)
com_port_label.configure(font="{Arial} 12 {}", text='COM порт:')
com_port_label.place(anchor="nw", relx=0.05, rely=0.05)
device_choice = ttk.Combobox(win, name="device_choice", textvariable=selected_port)
device_choice.configure(justify="center")
device_choice.place(anchor="nw", relx=0.40, rely=0.04, relwidth=0.4, relheight=0.08)
device_choice['values'] = controller.readComPorts()
device_choice['state'] = 'readonly'
device_choice.bind('<<ComboboxSelected>>', comPortSelected)
device_choice.bind('<Enter>', refreshComPorts)
dmx_address_label = ttk.Label(win)
dmx_address_label.configure(font="{Arial} 12 {}", text='DMX Адрес:')
dmx_address_label.place(anchor="nw", relx=0.05, rely=0.15)
validate_callback = win.register(validate_dmx_address_entry)
dmx_address_field = ttk.Entry(win)
dmx_address_field.configure(font="{Arial} 12 {}", justify="center", validate='key',
validatecommand=(validate_callback, '%P'))
dmx_address_field.place(anchor="nw", relx=0.40, rely=0.15, relwidth=0.2, relheight=0.06)
personality_label = ttk.Label(win)
personality_label.configure(font="{Arial} 12 {}", text='Personality:')
personality_label.place(anchor="nw", relx=0.05, rely=0.26)
image = base64.b64decode(
b'iVBORw0KGgoAAAANSUhEUgAAABAAAAAQCAYAAAAf8/9hAAAAAXNSR0IArs4c6QAAAnxJREFUOE+lk01IVUEUx/9zZ+Z+Pg2DsIWiSQaBYrgoREqkyFUEQWihqyQR+6KtywrbVERUhkQSCG1q1aJNkYmiqSUUQQuJapFZfvR8+nz3Y07cufLSPlYd7r0wc87/N/9zZ4bhP4P9S3/wzAgFyoCKFAgBXtxu+GvtH5NN5yZpMcsRKQD5LIPgBBVm8bKvboMmPzh0fpgWcx6yOQIzGOqrCrGzzMGqrzD8LoMPX/3ErAow3lu7Dr3Ww96uKYrFnstxrasc1RUuokhBKUIUEXoezODpm2VtS0U+xm/u0hD9iW1/zxgAY/GDnvZScM7wZCKNw3WFqCqz8fmbj/ZbMzofR255Dq9665Phno4pinumeMQYXJvDV0BEDNXlNq6eKMaX+RCtN2JAsgiiABPXqxiL//Zc1knMGAzEGHIhIYgYGGe4cHwL9td4GBjKoH8ok3cAAiavVDLWeGqM0jlLr7ziK2R9giEMCJOjs6kIbY2FePvJR/fDNCK9etIDEWH04jbGGjpHaCl0sbAUIlTQQmEK3fedzmLkAoWT/T+wFDJwYegdSnaD8Ly7hLF9HYP0cd5FEIslh7SS90BNCs31KUzPRrg75kPGYMlh8AQgoPD47NbET+mRMVIG10UaYAuYtkCBJ0BSwHQETEvofOwiDmM1g0enSxPAjqODtMK8NUAirq2wcflYEZ69D3D/tYLlSj2vWzSAgRbv1zmIIRXNo0SmDdPiunB3pYNLLUUYng7RNx7B9kxYnoTjSrD0LO61lWwExJDtrRPEbUsDTFuieLNEzhCQjtQAr0DC9Bfy4vxJXH8jK9uGyE5tguk6MB0JyxVIeQKmEeief7+9PwEI7uIRQxNy9wAAAABJRU5ErkJgggAA')
personality_button = ttk.Button(win, command=show_personality_callback)
question_image = tk.PhotoImage(data=image, format='png')
personality_button.configure(compound="right", default="normal", image=question_image)
personality_button.place(anchor="nw", relx=0.87, rely=0.246, relwidth=0.1)
personality_combobox_font = ('TkDefaultFont', 9)
personality_combobox = ttk.Combobox(win, font=personality_combobox_font)
personality_combobox.configure(
exportselection=True,
justify="left",
state="readonly",
takefocus=False)
personality_combobox.place(anchor="nw", relx=0.40, rely=0.246, relwidth=0.45, relheight=0.085)
for i in range(4):
channel_label = ttk.Label(win)
channel_label.configure(font="{Arial} 12 {}", text=f'Ток канал {i + 1}:')
channel_label.place(anchor="nw", relx=0.05, rely=0.38 + 0.08 * i)
validate_callback = win.register(validate_current_entry)
ent = ttk.Entry(win)
ent.configure(font="{Arial} 12 {}", justify="center", validate='key',
validatecommand=(validate_callback, '%P'))
ent.place(anchor="nw", relx=0.40, rely=0.38 + 0.08 * i, relwidth=0.2, relheight=0.06)
current_entries.append(ent)
ma_label = ttk.Label(win)
ma_label.configure(font="{Arial} 12 {}", text='mA')
ma_label.place(anchor="nw", relx=0.65, rely=0.38 + 0.08 * i)
read_button = ttk.Button(win)
read_button.configure(text='Чтение', command=readCallback)
read_button.place(anchor="nw", relx=0.05, rely=0.7, relwidth=0.37)
write_button = ttk.Button(win)
write_button.configure(text='Запись', command=writeCallback)
write_button.place(anchor="nw", relx=0.6, rely=0.7, relwidth=0.37)
check_checkbox = ttk.Checkbutton(win, variable=check)
check_checkbox.configure(text='Проверить')
check_checkbox.place(anchor="n", relx=0.5, rely=0.8)
status_label = ttk.Label(win, textvariable=status)
status_label.place(anchor="center", relx=0.5, rely=0.91)
win.mainloop()

16
main.py Normal file
View File

@ -0,0 +1,16 @@
# This is a sample Python script.
# Press Shift+F10 to execute it or replace it with your code.
# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.
def print_hi(name):
# Use a breakpoint in the code line below to debug your script.
print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint.
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
print_hi('PyCharm')
# See PyCharm help at https://www.jetbrains.com/help/pycharm/