57 lines
1.7 KiB
Python
57 lines
1.7 KiB
Python
|
import threading
|
||
|
import time
|
||
|
|
||
|
class ControllerBase:
|
||
|
def __init__(self, control_rate=0.1):
|
||
|
self.robot = None
|
||
|
self.controlling = False
|
||
|
self.target_pos = None
|
||
|
|
||
|
self.control_thread = None
|
||
|
self.control_rate = control_rate
|
||
|
|
||
|
def control_loop(self):
|
||
|
while self.controlling:
|
||
|
if self.target_pos is not None:
|
||
|
state = self.get_measured_position()
|
||
|
|
||
|
control = self.compute_control(state)
|
||
|
|
||
|
self.apply_control(control)
|
||
|
|
||
|
time.sleep(self.control_rate)
|
||
|
self.apply_control((0.0,0.0)) # stop robot
|
||
|
|
||
|
def set_target_position(self, target_pos):
|
||
|
self.target_pos = target_pos
|
||
|
|
||
|
def get_measured_position(self):
|
||
|
if self.robot is not None:
|
||
|
return self.robot.get_measurement()
|
||
|
else:
|
||
|
raise Exception("error: controller cannot get measurement!\n there is no robot attached to the controller!")
|
||
|
|
||
|
def compute_control(self, state):
|
||
|
return (0.0, 0.0)
|
||
|
|
||
|
def apply_control(self, control):
|
||
|
if self.robot is not None:
|
||
|
self.robot.send_cmd(u1=control[0], u2=control[1])
|
||
|
else:
|
||
|
raise Exception("error: controller cannot apply control!\n there is no robot attached to the controller!")
|
||
|
|
||
|
def attach_robot(self, robot):
|
||
|
self.robot = robot
|
||
|
|
||
|
def start(self):
|
||
|
self.controlling = True
|
||
|
# start control thread
|
||
|
self.control_thread = threading.Thread(target=self.control_loop)
|
||
|
self.control_thread.start()
|
||
|
|
||
|
def stop(self):
|
||
|
# pause controller
|
||
|
self.controlling = False
|
||
|
if self.control_thread is not None:
|
||
|
self.control_thread.join()
|