import threading import time class ControllerBase: def __init__(self, control_rate=0.1): self.robot = None self.controlling = False self.target_pos = None self.control_thread = None self.control_rate = control_rate def control_loop(self): while self.controlling: if self.target_pos is not None: state = self.get_measured_position() control = self.compute_control(state) if self.controlling: self.apply_control(control) if self.robot is not None: self.robot.send_cmd(u1=0, u2=0) # stop robot def set_target_position(self, target_pos): self.target_pos = target_pos def get_measured_position(self): if self.robot is not None: return self.robot.get_measurement() else: raise Exception("error: controller cannot get measurement!\n" " there is no robot attached to the controller!") def compute_control(self, state): return 0.0, 0.0 def apply_control(self, control): if self.robot is not None: self.robot.send_cmd(u1=control[0], u2=control[1]) else: raise Exception("error: controller cannot apply control!\n" " there is no robot attached to the controller!") time.sleep(self.control_rate) def attach_robot(self, robot): self.robot = robot def start(self): self.controlling = True # start control thread self.control_thread = threading.Thread(target=self.control_loop) self.control_thread.start() def stop(self): # pause controller self.controlling = False if self.control_thread is not None: self.control_thread.join()