From e18d1fb7a008b9763c3e320c52acc0a182fbfd6e Mon Sep 17 00:00:00 2001 From: hehesheng Date: Tue, 21 Jan 2025 19:15:34 +0800 Subject: [PATCH] feat: add record csv --- motor_manager.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/motor_manager.py b/motor_manager.py index 4166e27..2e8f3e0 100644 --- a/motor_manager.py +++ b/motor_manager.py @@ -4,6 +4,8 @@ import threading import traceback import random import os +import csv +from datetime import datetime import logging import yaml import typing @@ -47,6 +49,11 @@ class MotorManager(object): self.cmd_interval_ms = cmd_interval_ms self.register_task(MotorManager.transfer_motor_cmds_task, task_name="TransferCmds") self.register_task(MotorManager.notify_motor_data_task, task_name="NotifyData") + self.init_record_csv() + self.register_task(MotorManager.record_csv_file, task_name="RecordCSV") + + def __del__(self): + pass def register_motor(self, motor: MotorInstance): self.motor_dict[motor.get_motor_name()] = motor @@ -93,6 +100,34 @@ class MotorManager(object): def add_motor_data_callback(self, callback: typing.Callable): self.motor_data_callback_list.append(callback) + def init_record_csv(self): + if not os.path.exists(base_dir + "/record"): + os.mkdir(base_dir + "/record") + datetime_str = datetime.now().strftime("%Y%m%d_%H%M%S") + with open("/".join([base_dir, "record", f"{datetime_str}_record.csv"]), "w", newline="") as csvfile: + fieldnames = ["timestamp", "motor_name", "tau", "dq", "q"] + self.recorder = csv.DictWriter(csvfile, fieldnames=fieldnames) + self.recorder.writeheader() + + @staticmethod + def record_csv_file(self: "MotorManager"): + if self.recorder is None: + return + timestamp = int(time.time() * 1000) + notify_data = {} + with self.motor_cmds_and_data_sem as sem: + notify_data = copy.deepcopy(self.motor_data) + for name, data in notify_data.items(): + self.recorder.writerow( + { + timestamp: timestamp, + "motor_name": name, + "tau": data.tau, + "dq": data.dq, + "q": data.q, + } + ) + def run(self): self.loop_flag = True self.transfer_thread = threading.Thread(target=self.loop)