feat: add record csv

This commit is contained in:
hehesheng 2025-01-21 19:15:34 +08:00
parent 24b1ef4291
commit e18d1fb7a0

View File

@ -4,6 +4,8 @@ import threading
import traceback import traceback
import random import random
import os import os
import csv
from datetime import datetime
import logging import logging
import yaml import yaml
import typing import typing
@ -47,6 +49,11 @@ class MotorManager(object):
self.cmd_interval_ms = cmd_interval_ms self.cmd_interval_ms = cmd_interval_ms
self.register_task(MotorManager.transfer_motor_cmds_task, task_name="TransferCmds") self.register_task(MotorManager.transfer_motor_cmds_task, task_name="TransferCmds")
self.register_task(MotorManager.notify_motor_data_task, task_name="NotifyData") self.register_task(MotorManager.notify_motor_data_task, task_name="NotifyData")
self.init_record_csv()
self.register_task(MotorManager.record_csv_file, task_name="RecordCSV")
def __del__(self):
pass
def register_motor(self, motor: MotorInstance): def register_motor(self, motor: MotorInstance):
self.motor_dict[motor.get_motor_name()] = motor self.motor_dict[motor.get_motor_name()] = motor
@ -93,6 +100,34 @@ class MotorManager(object):
def add_motor_data_callback(self, callback: typing.Callable): def add_motor_data_callback(self, callback: typing.Callable):
self.motor_data_callback_list.append(callback) self.motor_data_callback_list.append(callback)
def init_record_csv(self):
if not os.path.exists(base_dir + "/record"):
os.mkdir(base_dir + "/record")
datetime_str = datetime.now().strftime("%Y%m%d_%H%M%S")
with open("/".join([base_dir, "record", f"{datetime_str}_record.csv"]), "w", newline="") as csvfile:
fieldnames = ["timestamp", "motor_name", "tau", "dq", "q"]
self.recorder = csv.DictWriter(csvfile, fieldnames=fieldnames)
self.recorder.writeheader()
@staticmethod
def record_csv_file(self: "MotorManager"):
if self.recorder is None:
return
timestamp = int(time.time() * 1000)
notify_data = {}
with self.motor_cmds_and_data_sem as sem:
notify_data = copy.deepcopy(self.motor_data)
for name, data in notify_data.items():
self.recorder.writerow(
{
timestamp: timestamp,
"motor_name": name,
"tau": data.tau,
"dq": data.dq,
"q": data.q,
}
)
def run(self): def run(self):
self.loop_flag = True self.loop_flag = True
self.transfer_thread = threading.Thread(target=self.loop) self.transfer_thread = threading.Thread(target=self.loop)