import threading import time class ControllerBase: def __init__(self, control_rate=0.1): self.robot = None self.controlling = False self.target_pos = None self.control_thread = None self.control_rate = control_rate def control_loop(self): while self.controlling: if self.target_pos is not None: state = self.get_measured_position() control = self.compute_control(state) self.apply_control(control) time.sleep(self.control_rate) self.apply_control((0.0,0.0)) # stop robot def set_target_position(self, target_pos): self.target_pos = target_pos def get_measured_position(self): if self.robot is not None: return self.robot.get_measurement() else: raise Exception("error: controller cannot get measurement!\n there is no robot attached to the controller!") def compute_control(self, state): return (0.0, 0.0) def apply_control(self, control): if self.robot is not None: self.robot.send_cmd(u1=control[0], u2=control[1]) else: raise Exception("error: controller cannot apply control!\n there is no robot attached to the controller!") def attach_robot(self, robot): self.robot = robot def start(self): self.controlling = True # start control thread self.control_thread = threading.Thread(target=self.control_loop) self.control_thread.start() def stop(self): # pause controller self.controlling = False if self.control_thread is not None: self.control_thread.join()