import logging
import os
import random
import time
from datetime import datetime
from logging import info, fatal, error
from random import randint
from time import sleep
from typing import Any

import controller.controller_reifined as c


def setup_logs() -> None:
    """Configure root logging to a timestamped file in ``./logs`` and to the console.

    The ``logs`` directory is created if missing. The log file is named after
    the current local time (``YYYYmmdd-HHMMSS.log``).
    """
    os.makedirs("logs", exist_ok=True)
    log_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler(f'./logs/{log_stamp}.log'),
            logging.StreamHandler(),
        ],
    )


def testFunction(func, n: int, controller: c.ControllerInterface, delay=0) -> list[int | Any]:
    """Run ``func(controller)`` ``n`` times and count the truthy results.

    Args:
        func: Test callable taking the controller and returning a truthy value
            on success.
        n: Number of repetitions.
        controller: Controller interface handed to ``func`` unchanged.
        delay: Seconds to sleep after each repetition; ``0`` disables the pause.

    Returns:
        ``[passed, n]`` where ``passed`` is the number of successful runs.
    """
    passed = 0
    for i in range(1, n + 1):
        info(f'Test {i}/{n} of {func.__name__}...')
        res = func(controller)
        if res:
            passed += 1
        info(f'Test {i}/{n} of {func.__name__}: {"successful" if res else "failed"}\n\n')
        if delay != 0:
            time.sleep(delay)

    if passed == n:
        info(f'Tests of {func.__name__} passed\n\n')
    else:
        error(f'Tests of {func.__name__} failed, passed {passed}/{n}\n\n')
    return [passed, n]


def batch_test(
        functions: list[tuple[Any, int, int]],
        test_delay: int,
        controller: c.ControllerInterface,
) -> list[int | Any]:
    """Run every ``(func, n, delay)`` entry through ``testFunction`` and log a summary.

    Args:
        functions: ``(test callable, repetitions, per-repetition delay)`` tuples.
        test_delay: Seconds to sleep after each summary line is logged.
        controller: Controller interface passed to every test.

    Returns:
        ``[fully_passed, total]`` where ``fully_passed`` counts the entries
        whose every repetition succeeded.
    """
    results = [testFunction(func, n, controller, delay) for func, n, delay in functions]

    passed_tests = 0
    info('Test results!')
    for (func, _, _), (passed, total) in zip(functions, results):
        if passed == total:
            passed_tests += 1
        info(f'Tests of {func.__name__}: passed {passed}/{total}')
        # NOTE(review): the pause runs while the summary is printed, not between
        # the test runs themselves - confirm this placement is intended.
        sleep(test_delay)

    info(f'Passed {passed_tests}/{len(functions)} tests')
    return [passed_tests, len(functions)]


def start(port: str = '/dev/ttyUSB0') -> None:
    """Set up logging, connect the controller on ``port`` and run the full batch.

    Args:
        port: Serial device passed to ``controller.connect``; defaults to the
            previously hard-coded ``/dev/ttyUSB0``.
    """
    setup_logs()

    controller = c.ControllerInterface(c.ControllerRefined(0xFFFFFFFFFFFF, 0x112233445542))
    controller.connect(port)
    info(f'Controller connected to {port}')

    batch_test(
        [
            (test_power_write, 30, 2),
            (test_power_read, 100, 0),
            (test_mode_read, 10, 0),
            (test_dev_info_read, 100, 0),
            (test_dev_info_write, 30, 1),
            (test_address_read, 100, 0),
            (test_address_write, 30, 1),
            (test_current_read, 100, 0),
            (test_current_write, 30, 1),
        ],
        2,
        controller,
    )


## TESTS ##


def test_current_read(controller: c.ControllerInterface) -> bool:
    """Read the currents once; succeed when the result is truthy."""
    info('Reading currents...')
    currents = controller.readCurrents()
    if not currents:
        error('Failed to read currents')
        return False
    info(f'Read currents: {currents}')
    return True


def test_current_write(controller: c.ControllerInterface) -> bool:
    """Write four random currents in [40, 67] and verify the read-back equals them."""
    current_min = 40
    current_max = 67
    currents = [random.randint(current_min, current_max) for _ in range(4)]

    info(f'Writing currents {currents}...')
    res = controller.writeCurrents(currents)
    if not res:
        error('Failed to write currents')
        return False
    info(f'Written currents: {res}')

    # Pause before reading the values back.
    time.sleep(1)

    real = controller.readCurrents()
    if not real:
        error('Failed to read currents')
        return False
    info(f'Read real currents: {real}')

    if currents == real:
        info('Check successful')
        return True
    error('Check failed: currents do not match')
    return False


def test_address_read(controller: c.ControllerInterface) -> bool:
    """Read the address once; ``-1`` is treated as a failed read."""
    info('Reading address...')
    address = controller.readAddress()
    if address == -1:
        error('Failed to read address')
        return False
    info(f'Read address: {address}')
    return True


def test_address_write(controller: c.ControllerInterface) -> bool:
    """Write a random address in [1, 63] and verify the read-back equals it.

    The read-back uses the same ``-1`` failure value that ``test_address_read``
    checks for, so a failed read is reported as such instead of as a mismatch.
    """
    address_min = 1
    address_max = 63
    address = random.randint(address_min, address_max)

    info(f'Writing address {address}...')
    res = controller.writeAddress(address)
    if not res:
        error('Failed to write address')
        return False
    info(f'Written address: {res}')

    # Pause before reading the value back.
    time.sleep(1)

    real = controller.readAddress()
    if real is None or real == -1:
        error('Failed to read address')
        return False
    info(f'Read real address: {real}')

    if address == real:
        info('Check successful')
        return True
    error('Check failed: addresses do not match')
    return False


def test_mode_read(controller: c.ControllerInterface) -> bool:
    """Read the modes once; succeed when the result is truthy."""
    info('Reading mode...')
    modes = controller.readModes()
    if not modes:
        error('Failed to read modes')
        return False
    info(f'Read modes: {modes}')
    return True


def test_dev_info_read(controller: c.ControllerInterface) -> bool:
    """Read the device info once; succeed when the result is truthy."""
    info('Reading dev info...')
    dev_info = controller.readDeviceInfo()
    if not dev_info:
        error('Failed to read dev_info')
        return False
    info(f'Read dev info: {dev_info}')
    return True


def test_dev_info_write(controller: c.ControllerInterface) -> bool:
    """Write a random mode index and verify the device info reports it.

    The index is drawn from ``[1, dev_info[2] - 1]`` and, after the write, is
    compared against ``dev_info[1]`` of a fresh device-info read.
    NOTE(review): presumably ``dev_info[2]`` is the mode count and
    ``dev_info[1]`` the active mode index - confirm against the controller API.
    """
    info('Reading dev info...')
    dev_info = controller.readDeviceInfo()
    if not dev_info:
        error('Failed to read dev_info')
        return False
    info(f'Read dev info: {dev_info}')

    mode_min = 1
    mode_max = dev_info[2] - 1
    if mode_max < mode_min:
        # random.randint would raise ValueError on an empty range and abort the
        # whole batch; report a failed test instead.
        error(f'No mode index to write: dev info reports {dev_info[2]}')
        return False
    mode = random.randint(mode_min, mode_max)

    info(f'Writing mode {mode}...')
    res = controller.writeMode(mode)
    if not res:
        error('Failed to write mode')
        return False
    info(f'Written mode: {mode}')

    # Pause before reading the state back.
    time.sleep(1)

    dev_info = controller.readDeviceInfo()
    if not dev_info:
        error('Failed to read dev_info')
        return False
    info(f'Read dev info: {dev_info}')

    real = controller.readMode(mode)
    if not real:
        error('Failed to read mode')
        return False
    info(f'Read real mode: {real}')

    if mode == dev_info[1]:
        info('Check successful')
        return True
    error('Check failed: mode indexes do not match')
    return False


def test_power_read(controller: c.ControllerInterface) -> bool:
    """Read the powers once; succeed when the result is truthy."""
    info('Reading powers...')
    powers = controller.readPowers()
    if not powers:
        error('Failed to read powers')
        return False
    info(f'Read powers: {powers}')
    return True


def test_power_write(controller: c.ControllerInterface) -> bool:
    """Write a random power configuration and verify the read-back equals it.

    Indices 8, 15-16 and 17 of the initial read are reused unchanged in the
    expected read-back. Currents and voltages are randomised, and the
    per-channel coefficients and ``Amax`` are derived from them.
    """
    # Ia Ib Ic Id Ua Ub Uc Ud Pposs Kn_a Kn_b Kn_c Kn_d Am_H Am_L Pwm_H Pwm_L Pm
    # 43, 41, 48, 57, 0, 0, 0, 0, 30, 0, 0, 0, 0, 0, 0, 117, 48, 30
    info('Reading powers...')
    initial = controller.readPowers()
    if not initial:
        error('Failed to read powers')
        return False
    info(f'Read powers: {initial}')

    p_max = initial[17]
    p_poss = initial[8]
    pwm_max = int.from_bytes([initial[15], initial[16]], byteorder='big')

    current_min = 400
    current_max = 670
    # Round every drawn current down to a multiple of 10.
    currents = [random.randint(current_min, current_max) // 10 * 10 for _ in range(4)]

    voltage_min = 2
    voltage_max = 60
    voltages = [random.randint(voltage_min, voltage_max) for _ in range(4)]

    currents_ma = [value * 0.001 for value in currents]
    currents_controller = [int(value * 0.1) for value in currents]
    voltages_controller = [int(value / 0.235) for value in voltages]

    channel_powers = [current * voltage for current, voltage in zip(currents_ma, voltages)]
    p_sum = sum(channel_powers)
    ka = [power / p_sum for power in channel_powers]
    km = min(p_poss / p_sum, 1)
    a_max = int(km * pwm_max)
    kn = [min(int(share * 256), 255) for share in ka]  # TODO check if 255

    info(f'Currents: {currents}')
    info(f'Currents controller: {currents_controller}')
    info(f'Currents ma: {currents_ma}')
    info(f'Voltages: {voltages}')
    info(f'Voltages controller: {voltages_controller}')
    info(f'Powers: {channel_powers}')
    info(f'Power sum: {p_sum}')
    info(f'Ka: {ka}')
    info(f'Km: {km}')
    info(f'Pmax: {p_max}')
    info(f'Pext: {p_poss}')
    info(f'PwmMax: {pwm_max}')
    info(f'Amax: {a_max}')
    info(f'Kn: {kn}')

    # TODO: firmware bug - the two slots after Kn_d are expected to read back
    # as kn[2], kn[3] rather than the big-endian bytes of Amax.
    data = (currents_controller
            + voltages_controller
            + [p_poss]
            + kn
            + [kn[2], kn[3]])

    info(f'Writing powers: {data}...')
    res = controller.writePowers(currents_controller, voltages_controller, p_poss, kn, a_max)
    if not res:
        error('Failed to write powers')
        return False
    info(f'Written powers: {data}')

    # Pause before reading the values back.
    time.sleep(1)

    real = controller.readPowers()
    if not real:
        error('Failed to read powers')
        return False

    # Append the values taken from the initial read: PWM max bytes and Pmax.
    data = data + list(pwm_max.to_bytes(2, byteorder='big')) + [p_max]
    info('Added const data to written data')
    info(f'Modded powers: {data}')
    info(f'Read rl powers: {real}')

    if real == data:
        info('Check successful')
        return True
    error('Check failed: powers do not match')
    return False