2020-11-14 15:06:57 +00:00
|
|
|
import threading
|
|
|
|
import time
|
|
|
|
|
2020-11-17 20:23:22 +00:00
|
|
|
|
2020-11-14 15:06:57 +00:00
|
|
|
class ControllerBase:
|
|
|
|
def __init__(self, control_rate=0.1):
|
|
|
|
self.robot = None
|
|
|
|
self.controlling = False
|
|
|
|
self.target_pos = None
|
|
|
|
|
|
|
|
self.control_thread = None
|
|
|
|
self.control_rate = control_rate
|
|
|
|
|
|
|
|
def control_loop(self):
|
|
|
|
while self.controlling:
|
|
|
|
if self.target_pos is not None:
|
|
|
|
state = self.get_measured_position()
|
|
|
|
|
|
|
|
control = self.compute_control(state)
|
|
|
|
|
2020-11-18 20:38:41 +00:00
|
|
|
if self.controlling:
|
|
|
|
self.apply_control(control)
|
2020-11-14 15:06:57 +00:00
|
|
|
|
2021-08-25 20:53:09 +00:00
|
|
|
if self.robot is not None:
|
|
|
|
self.robot.send_cmd(u1=0, u2=0) # stop robot
|
2020-11-14 15:06:57 +00:00
|
|
|
|
|
|
|
def set_target_position(self, target_pos):
|
|
|
|
self.target_pos = target_pos
|
|
|
|
|
|
|
|
def get_measured_position(self):
|
|
|
|
if self.robot is not None:
|
|
|
|
return self.robot.get_measurement()
|
|
|
|
else:
|
2020-11-17 20:23:22 +00:00
|
|
|
raise Exception("error: controller cannot get measurement!\n"
|
|
|
|
" there is no robot attached to the controller!")
|
2020-11-14 15:06:57 +00:00
|
|
|
|
|
|
|
def compute_control(self, state):
|
2020-11-17 20:23:22 +00:00
|
|
|
return 0.0, 0.0
|
2020-11-14 15:06:57 +00:00
|
|
|
|
|
|
|
def apply_control(self, control):
|
|
|
|
if self.robot is not None:
|
|
|
|
self.robot.send_cmd(u1=control[0], u2=control[1])
|
|
|
|
else:
|
2020-11-17 20:23:22 +00:00
|
|
|
raise Exception("error: controller cannot apply control!\n"
|
|
|
|
" there is no robot attached to the controller!")
|
2020-11-18 20:38:41 +00:00
|
|
|
time.sleep(self.control_rate)
|
2020-11-14 15:06:57 +00:00
|
|
|
|
|
|
|
def attach_robot(self, robot):
|
|
|
|
self.robot = robot
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
self.controlling = True
|
|
|
|
# start control thread
|
|
|
|
self.control_thread = threading.Thread(target=self.control_loop)
|
|
|
|
self.control_thread.start()
|
|
|
|
|
|
|
|
def stop(self):
|
|
|
|
# pause controller
|
|
|
|
self.controlling = False
|
|
|
|
if self.control_thread is not None:
|
|
|
|
self.control_thread.join()
|