61 lines
1.8 KiB
Python
61 lines
1.8 KiB
Python
import threading
|
|
import time
|
|
|
|
|
|
class ControllerBase:
|
|
def __init__(self, control_rate=0.1):
|
|
self.robot = None
|
|
self.controlling = False
|
|
self.target_pos = None
|
|
|
|
self.control_thread = None
|
|
self.control_rate = control_rate
|
|
|
|
def control_loop(self):
|
|
while self.controlling:
|
|
if self.target_pos is not None:
|
|
state = self.get_measured_position()
|
|
|
|
control = self.compute_control(state)
|
|
|
|
if self.controlling:
|
|
self.apply_control(control)
|
|
|
|
self.apply_control((0.0, 0.0)) # stop robot
|
|
|
|
def set_target_position(self, target_pos):
|
|
self.target_pos = target_pos
|
|
|
|
def get_measured_position(self):
|
|
if self.robot is not None:
|
|
return self.robot.get_measurement()
|
|
else:
|
|
raise Exception("error: controller cannot get measurement!\n"
|
|
" there is no robot attached to the controller!")
|
|
|
|
def compute_control(self, state):
|
|
return 0.0, 0.0
|
|
|
|
def apply_control(self, control):
|
|
if self.robot is not None:
|
|
self.robot.send_cmd(u1=control[0], u2=control[1])
|
|
else:
|
|
raise Exception("error: controller cannot apply control!\n"
|
|
" there is no robot attached to the controller!")
|
|
time.sleep(self.control_rate)
|
|
|
|
def attach_robot(self, robot):
|
|
self.robot = robot
|
|
|
|
def start(self):
|
|
self.controlling = True
|
|
# start control thread
|
|
self.control_thread = threading.Thread(target=self.control_loop)
|
|
self.control_thread.start()
|
|
|
|
def stop(self):
|
|
# pause controller
|
|
self.controlling = False
|
|
if self.control_thread is not None:
|
|
self.control_thread.join()
|