robot_protocol_framework/motor_manager.py
2025-01-21 19:14:55 +08:00

82 lines
2.6 KiB
Python

import time
import threading
import traceback
import random
import os
import logging
import yaml
import typing
from unitree_actuator_sdk import *
from motor_instance import MotorInstance
base_dir = os.path.dirname(__file__)
log_config = None
if not os.path.exists(base_dir + "/logs"):
os.mkdir(base_dir + "/logs")
with open(base_dir + "/logging_config.yaml", "r") as f:
log_config = yaml.safe_load(f.read())
logging.config.dictConfig(log_config)
logger = logging.getLogger(__file__.split("/")[-1])
for handle in logger.handlers:
if isinstance(handle, logging.handlers.TimedRotatingFileHandler):
handle.suffix = "%Y-%m-%d.log"
class MotorManager(object):
transfer_thread: threading.Thread | None = None
cmd_interval_ms: int
motor_dict: dict[MotorInstance] = dict()
loop_flag: bool = False
motor_cmds: dict[str, int]
task_list: dict[str, typing.Callable] = {}
def __init__(self, cmd_interval_ms: int):
self.cmd_interval_ms = cmd_interval_ms
def register_motor(self, motor: MotorInstance):
self.motor_dict[motor.get_motor_name()] = motor
def register_task(self, task: typing.Callable, task_name: str = None) -> str:
if task_name is None or task_name == "":
task_name = task.__name__ + "-" + str(time.time * 1000)
self.task_list[task_name] = task
return task_name
def run(self):
self.loop_flag = True
self.transfer_thread = threading.Thread(target=self.loop)
self.transfer_thread.start()
def stop(self):
self.stop_without_join()
self.transfer_thread.join()
def stop_without_join(self):
self.loop_flag = False
def loop(self):
cur_time = time.time() * 1000
next_run_time = cur_time + self.cmd_interval_ms
while self.loop_flag:
for task_name, task in self.task_list.items():
try:
task(self, task_name)
except Exception as e:
logger.warning(f"run task: {task_name} has trouble: {traceback.format_exc()}")
cur_time = time.time() * 1000
time_delta = cur_time - next_run_time
if time_delta < 0:
logger.warning(f"loop run too slow, took {self.cmd_interval_ms - time_delta} ms")
next_run_time = cur_time + self.cmd_interval_ms
# timeout and no sleep
else:
sleep_seconds = (next_run_time - cur_time) / 1000
next_run_time = next_run_time + self.cmd_interval_ms
time.sleep(sleep_seconds)