Compare commits

..

10 Commits

8 changed files with 600 additions and 40 deletions

View File

@ -0,0 +1,27 @@
import socket
import json
HOST, PORT = "localhost", 42424
def move_grid(x, y, orientation, dimx, dimy):
print("move grid")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
payload = json.dumps({"x": x, "y": y, "dimx": dimx, "dimy": dimy, "orientation": orientation})
sock.sendall(f"move_grid_blocking;{payload}\n".encode())
target_reached = False
try:
while not target_reached:
reply = sock.recv(1024)
if reply == b'':
raise RuntimeError
#print("got reply: ", reply)
target_reached = reply.decode() == 'ack\n'
print("target_reached = ", target_reached)
except RuntimeError:
# try again
print("controlling failed -> trying again")
move_grid(x, y, orientation, dimx, dimy)
if __name__ == "__main__":
move_grid(3,2,'v', 7, 4)

View File

@ -0,0 +1,499 @@
import numpy as np
import random
import pygame
import os
from event_server_comm import move_grid
BLACK = np.array([0, 0, 0], dtype=np.uint8)
WHITE = np.array([255, 255, 255], dtype=np.uint8)
RED = np.array([255, 0, 0], dtype=np.uint8)
BLUE = np.array([0, 0, 255], dtype=np.uint8)
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
GREEN = np.array([0, 255, 0], dtype=np.uint8)
pygame.init()
pygame.font.init() # you have to call this at the start,
# if you want to use this module.
myfont = pygame.font.SysFont('Comic Sans MS', 55)
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
P0_text = myfont.render('P0', False, tuple(BLACK))
tiledt = np.dtype([('x', np.uint8), ('y', np.uint8), ('color', np.uint8, 3), ('star', np.bool)])
class Board:
valid_colors = [WHITE, RED, BLUE]
def __init__(self, dim_x, dim_y):
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
for x in range(dim_x):
for y in range(dim_y):
self.tiles[y, x]['color'] = random.choice(Board.valid_colors)
def render(self, scale_fac):
dimy, dimx = self.tiles.shape
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
for y in range(self.tiles.shape[0]):
for x in range(self.tiles.shape[1]):
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
pygame.draw.rect(board_surf, (0, 0, 0),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
if self.tiles[y, x]['star']:
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
return board_surf
def get_xdims(self):
return self.tiles.shape[1]
def get_ydims(self):
return self.tiles.shape[0]
# def __repr__(self):
# s = ''
# for y in range(self.tiles.shape[0]):
# for x in range(self.tiles.shape[1]):
# if (x,y) == (self.robot.x, self.robot.y):
# s += self.robot.orientation
# else:
# s += '.'
# s += '\n'
# return s
class Robot:
orientations = ['^', 'left', 'down', 'right']
resulting_orientation = {
'^': {'left': '<', 'right': '>'},
'>': {'left': '^', 'right': 'v'},
'v': {'left': '>', 'right': '<'},
'<': {'left': 'v', 'right': '^'},
}
def __init__(self, x, y, orientation):
self.x = x
self.y = y
self.orientation = orientation
self.position_changed = False
def get_forward_coordinates(self):
# get the coordinates of the neighboring tile in the given direction
if self.orientation == '^':
return self.y - 1, self.x
elif self.orientation == '>':
return self.y, self.x + 1
elif self.orientation == 'v':
return self.y + 1, self.x
elif self.orientation == '<':
return self.y, self.x - 1
else:
raise Exception("error: undefined direction")
def get_angle(self):
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
return np.rad2deg(angle)
def render(self, scale_fac):
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
(0.25 * scale_fac, 0.25 * scale_fac),
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
return robot_surf
def update_pos(self, dimx, dimy):
move_grid(self.x, self.y, self.orientation, dimx, dimy)
self.position_changed = False
def __repr__(self):
return f"({self.y}, {self.x}) - {self.orientation}"
class Command:
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
def __init__(self, action=None, color=WHITE):
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
raise ValueError("invalid values for command")
self.action = action
self.color = color
def __repr__(self):
return f"{self.action}: {self.color}"
def render(self, scale_fac):
cmd_surf = pygame.Surface((scale_fac, scale_fac))
cmd_surf.fill(tuple(self.color))
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
pygame.draw.polygon(arrow_surf, (0, 0, 0),
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
if self.action == 'forward':
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
elif self.action == 'left':
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
if self.action in {'left', 'forward', 'right'}:
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
elif self.action == 'P0':
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
return cmd_surf
class Programmer:
def __init__(self, prg):
self.prg = prg
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=WHITE)]
self.command_to_edit = 0
self.screen_rect = None
def render(self, scale_fac):
"""Render surface with possible inputs for the robot.
:return: surface of the input commands
"""
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
for i, inp in enumerate(self.available_inputs):
cmd_surf = inp.render(scale_fac)
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return inp_surf
def update_selected_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
selected_input = self.available_inputs[selected_input_index]
edited_command = self.prg.cmds[self.command_to_edit]
print("command before edit = ", edited_command)
if selected_input.action != '-':
edited_command.action = selected_input.action
else:
edited_command.color = selected_input.color
print("command after edit = ", edited_command)
class Program:
def __init__(self, robot, board, cmds):
self.cmds = cmds
self.robot = robot
self.board = board
self.prg_counter = 0
self.screen_rect = None
def step(self, state='running'):
cmd = self.cmds[self.prg_counter]
self.prg_counter += 1
# current position
x = self.robot.x
y = self.robot.y
# current tile the robot is on
tile = self.board.tiles[y, x]
# apply next instruction of the program
if np.all(cmd.color == WHITE) or np.all(cmd.color == tile['color']):
# matching color -> execute command
if cmd.action == 'forward':
ynew, xnew = self.robot.get_forward_coordinates()
self.robot.x = xnew
self.robot.y = ynew
self.robot.position_changed = True
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
elif cmd.action in {'left', 'right'}:
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.robot.position_changed = True
elif cmd.action == 'P0':
self.prg_counter = 0
else:
print("color not matching -> skipping command")
# update state for new robot position
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
# robot leaves the board -> GAME OVER
print("GAME OVER")
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
return 'game_over'
# robot collects star on new tile
tile = self.board.tiles[self.robot.y, self.robot.x]
if tile['star']:
tile['star'] = False
if all([not t['star'] for t in self.board.tiles.flatten()]):
print("YOU WON")
return 'won'
# by default we continue in the running state
return state
def render(self, scale_fac, prg_counter_override=None):
"""Render the current program. This will render all commands and highlight the next command to execute
(determined by self.prg_counter).
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
to highlight the command the user will edit.
:param scale_fac:
:param prg_counter_override:
:return:
"""
prg_counter = self.prg_counter
if prg_counter_override is not None:
prg_counter = prg_counter_override
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
for i in range(5):
if i < len(self.cmds):
cmd = self.cmds[i]
cmd_surf = cmd.render(scale_fac)
else:
cmd_surf = pygame.Surface((scale_fac,scale_fac))
cmd_surf.fill(WHITE)
if prg_counter is not None and i == prg_counter:
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return prg_surf
def get_clicked_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
print("clicked command = ", clicked_cmd)
return clicked_cmd
class Game:
def __init__(self, dimx, dimy, robotx, roboty):
self.robot = Robot(x=robotx, y=roboty, orientation='v')
self.board = Board(dimx, dimy)
coin1x = np.random.randint(0, dimx)
coin1y = np.random.randint(0, dimy)
self.board.tiles[coin1y,coin1x]['star'] = True
self.board.tiles[3,2]['star'] = True
# TODO fix number of commands at 5
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
self.state = 'won'
self.prg = Program(self.robot, self.board, self.cmds)
self.programmer = Programmer(self.prg)
#self.scale_fac = 180
self.scale_fac = 125
self.beamer_mode = False
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.game_over_text = myfont.render('GAME OVER', False, BLACK)
self.won_text = myfont.render('YOU WON', False, BLACK)
self.run_text = myfont.render('RUN', False, tuple(BLACK))
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
self.step_text = myfont_small.render('STEP', False, tuple(BLACK))
# save initial state
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
self.inital_board_tiles = self.board.tiles.copy()
def render(self):
"""Render the game screen.
This will render the board and the robot.
Depending on the display mode it will also render the current program and the input commands for the robot.
:param prg_counter:
:return:
"""
if self.beamer_mode:
dx = 0
dy = 0
else:
dx = int(0.05 * self.screen.get_width())
dy = int(0.05 * self.screen.get_height())
self.screen.fill(tuple(BLACK))
# render the board
board_surf = self.board.render(self.scale_fac)
# render robot onto the board surface
robot_surf = self.robot.render(self.scale_fac)
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
if not self.beamer_mode:
# if we are not in beamer mode we also show the current program / inputs
# render program
if self.state == 'input':
# in input mode we highlight the command which is selected for edit
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
else:
# in other modes we render the current program counter
prg_surf = self.prg.render(self.scale_fac)
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.prg.screen_rect = pygame.Rect((dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
self.screen.blit(prg_surf, self.prg.screen_rect)
# render input fields and buttons
inp_surf = self.programmer.render(self.scale_fac)
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.programmer.screen_rect = pygame.Rect((dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
self.screen.blit(inp_surf, self.programmer.screen_rect)
btn_surf = pygame.Surface((3 * self.scale_fac//2, self.scale_fac))
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
btn_surf.get_height(), btn_surf.get_width()))
if self.state == 'input':
btn_surf.fill(tuple(GREEN))
btn_surf.blit(self.run_text, (0, 10))
elif self.state == 'running':
btn_surf.fill(tuple(RED))
btn_surf.blit(self.stop_text, (0, 10))
elif self.state == 'stepping':
btn_surf.fill(tuple(YELLOW))
btn_surf.blit(self.step_text, (0, 10))
self.screen.blit(btn_surf, self.btn_rect)
# render messages
if self.state == 'game_over':
game_over_surf = pygame.Surface((self.screen.get_width() // 2, self.screen.get_height() // 2))
game_over_surf.fill(tuple(GREEN))
game_over_surf.blit(self.game_over_text, ((game_over_surf.get_width() - self.game_over_text.get_width()) // 2,
(game_over_surf.get_height() - self.game_over_text.get_height()) // 2))
self.screen.blit(game_over_surf, (self.screen.get_width() // 4, self.screen.get_height() // 4))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
elif self.state == 'won':
won_surf = pygame.Surface((self.screen.get_width() // 2, self.screen.get_height() // 2))
won_surf.fill(tuple(GREEN))
won_surf.blit(self.won_text,
((won_surf.get_width() - self.won_text.get_width()) // 2,
(won_surf.get_height() - self.won_text.get_height()) // 2))
self.screen.blit(won_surf, (self.screen.get_width() // 4, self.screen.get_height() // 4))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
pygame.display.flip()
def process_inputs(self):
# proceed events
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.state = 'quit'
# handle MOUSEBUTTONUP
elif event.type == pygame.MOUSEBUTTONUP:
pos = pygame.mouse.get_pos()
# select command to edit by the programmer
if self.prg.screen_rect is not None:
if self.prg.screen_rect.collidepoint(pos):
print(f"clicked at pos = {pos}")
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
# set the selected command to a new value
if self.programmer.screen_rect is not None:
if self.programmer.screen_rect.collidepoint(pos):
self.programmer.update_selected_command(pos)
# clicked RUN/STOP button
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
print(f"clicked at pos = {pos}")
if self.state == 'running':
self.state = 'input'
elif self.state == 'input':
self.state = 'running'
elif self.state == 'stepping':
self.state = self.prg.step(self.state)
elif event.type == pygame.KEYUP:
if event.key == pygame.K_x:
if not self.beamer_mode:
# switch to beamer mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '1920, 280'
self.scale_fac = 180
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
self.board.tiles.shape[0] * self.scale_fac),
pygame.NOFRAME)
self.beamer_mode = True
else:
# switch to normal mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
self.scale_fac = 125
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.beamer_mode = False
elif event.key == pygame.K_s:
# run program
self.state = 'running'
elif event.key == pygame.K_SPACE:
if self.state != 'stepping':
self.state = 'stepping'
else:
self.state = self.prg.step(self.state)
elif event.key == pygame.K_r:
self.state = 'reset'
return self.state
def reset(self):
self.prg.prg_counter = 0
self.robot.x = self.initial_pos[0]
self.robot.y = self.initial_pos[1]
self.robot.orientation = self.initial_pos[2]
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.board.tiles = self.inital_board_tiles.copy()
return 'input'
def run(self):
running = True
while running:
self.state = self.process_inputs()
if self.state == 'input':
pass
elif self.state == 'running':
self.state = self.prg.step()
elif self.state == 'reset':
self.state = self.reset()
elif self.state == 'quit':
running = False
elif self.state == 'stepping':
pass
elif self.state == 'game_over' or self.state == 'won':
pass
else:
print("unknown state")
return
self.render()
if self.robot.position_changed:
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
pygame.time.wait(100)
if __name__ == "__main__":
seed = 2
random.seed(seed)
np.random.seed(seed)
game = Game(dimx=7, dimy=4, robotx=3, roboty=1)
game.run()
# TODOs
# - in stepping mode (s) it is possible to drive outside of the map
# - when no P0 command is present the program counter will overflow

View File

@ -10,6 +10,7 @@ import pyqtgraph as pg
from shapely.geometry import LineString
from shapely.affinity import scale
from queue import Queue
import aruco
@ -448,8 +449,11 @@ class ArucoEstimator:
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
column_line_orig = LineString([column_line_top, column_line_bottom])
row_line_orig = LineString([row_line_top, row_line_bottom])
column_line = scale(column_line_orig, 10, 10)
row_line = scale(row_line_orig, 10, 10)
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y])

View File

@ -57,8 +57,8 @@ class ControlCommander:
if not all_detected:
print(f"warning: no measurements available for the following robots: {undetected_robots}")
time.sleep(0.5)
print("all robots detected -> starting control")
print("starting control")
self.running = True
while self.running:
while not self.event_listener.event_queue.empty():
@ -111,13 +111,13 @@ class ControlCommander:
if __name__ == '__main__':
id_ip_dict = {
#11: '10.10.11.88',
12: '192.168.1.12',
#12: '192.168.1.12',
13: '192.168.1.13',
#14: '10.10.11.89',
}
# controller_type = {12: 'mpc', 13: 'pid'}
controller_type = 'pid'
controller_type = 'mpc'
rc = ControlCommander(id_ip_dict, controller_type=controller_type)
rc.run()

View File

@ -12,30 +12,38 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
data = self.request.recv(1024).strip()
cur_thread = threading.current_thread()
print(f"current thread {cur_thread}")
print(f"start current thread {cur_thread}")
if 'events' in data.decode():
print(f"{cur_thread} subscribed to events")
self.request.sendall('subscribed to events\n'.encode())
# send any events in the event queue to the subscriber
while True:
while not self.server.estimator.event_queue.empty():
event = self.server.estimator.event_queue.get()
# we distinguish two kinds of events:
if event[0] == 'response_event':
# 1. for 'response_event' events we expect the event subscriber to give a reply which will then
# by passed on to the response queue for transmitting to the original correspondent
message = event[1]['event']
#print(f"passing command {message} on to subscriber")
self.request.sendall((json.dumps(message) + '\n').encode())
reply = self.request.recv(1024)
#print(f"putting reply {reply} in response queue")
self.server.response_queue.put(reply)
else:
# 2. for other types of events we don't expect a reply and just pass them on to the subscriber
self.request.sendall((json.dumps(event) + '\n').encode())
self.server.estimator.last_event = None
time.sleep(1.0 / self.server.max_measurements_per_second)
event_loop_running = True
while event_loop_running:
try:
while not self.server.estimator.event_queue.empty():
event = self.server.estimator.event_queue.get()
# we distinguish two kinds of events:
if event[0] == 'response_event':
# 1. for 'response_event' events we expect the event subscriber to give a reply which will then
# by passed on to the response queue for transmitting to the original correspondent
message = event[1]['event']
#print(f"passing command {message} on to subscriber")
self.request.sendall((json.dumps(message) + '\n').encode())
reply = self.request.recv(1024)
#print(f"putting reply {reply} in response queue")
self.server.response_queue.put(reply)
else:
# 2. for other types of events we don't expect a reply and just pass them on to the subscriber
self.request.sendall((json.dumps(event) + '\n').encode())
self.server.estimator.last_event = None
time.sleep(1.0 / self.server.max_measurements_per_second)
except Exception as e:
print(f"exception in event loop: {e}")
event_loop_running = False
print("after event loop")
elif 'corners' in data.decode():
print(f"{cur_thread} subscribed to corners")
# send positions of the board markers
corner_estimates = self.server.estimator.corner_estimates
response = {}
@ -43,19 +51,23 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
response[corner] = {'x': data['x'], 'y': data['y']}
self.request.sendall((json.dumps(response) + '\n').encode())
elif 'move_grid_blocking' in data.decode():
print(f"{cur_thread} subscribed move_grid_blocking")
# if we receive a move_grid event
# ( e.g. move_grid;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^","require_response":True} )
# ( e.g. move_grid_blocking;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^"} )
#move_grid_blocking;{"x": 1, "y": 2, "dimx": 7, "dimy": 4, "orientation": ">"}
# we compute the corresponding real-world position the robot should drive to
# and then create a new move event which is put in the event queue and will be propagated to the ControlCommander
data_decode = data.decode()
# print("data: ", data_decode)
payload = data.decode().split(';')[1]
#print("data: ", data_decode)
payload = data_decode.split(';')[1]
#print("payload: ", payload)
grid_pos = json.loads(payload)
# print("grid_pos = ", grid_pos)
#print("grid_pos = ", grid_pos)
pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'],
grid_pos['dimy'], grid_pos['orientation'])
# print("pos = ", pos)
# print("event requiring response")
#print("pos = ", pos)
#print("event requiring response")
# put blocking move command in event queue
self.server.estimator.event_queue.put(('response_event',
{'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})}))
@ -64,13 +76,17 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
# TODO this assumes that we wait only for at most one response at a time
# we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for)
while self.server.response_queue.empty():
time.sleep(0.2)
self.request.sendall(b'.\n')
pass
reply = self.server.response_queue.get()
while not self.server.response_queue.empty():
reply = self.server.response_queue.get()
# send back response to the original source
#print(f"sending reply {reply} back to correspondent {self.request}")
print(f"sending reply {reply} back to correspondent {self.request}")
self.request.sendall(reply)
else:
print(f"{cur_thread} subscribed to robot position")
# send robot position
try:
marker_id = int(data)
@ -79,15 +95,22 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
if marker_id is not None:
if marker_id in self.server.estimator.robot_marker_estimates:
while True:
self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id])
marker_loop_running = True
while marker_loop_running:
try:
self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id])
+ '\n').encode())
time.sleep(1.0 / self.server.max_measurements_per_second)
time.sleep(1.0 / self.server.max_measurements_per_second)
except Exception as e:
print(f"exception in marker loop: {e}")
marker_loop_running = False
else:
self.request.sendall("error: unknown robot marker id\n".encode())
else:
self.request.sendall("error: data not understood. "
"expected <robot marker id (int)> or 'events'\n".encode())
print(f"end current thread {cur_thread}")
return
@ -105,7 +128,7 @@ class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[12, 13])
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13])
# first we start thread for the measurement server
measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,

View File

@ -6,7 +6,7 @@ from casadi_opt import OpenLoopSolver
class MPCController(ControllerBase):
def __init__(self, N=20, T=1.0):
def __init__(self, N=10, T=1.0):
super().__init__()
self.t = None

View File

@ -70,9 +70,9 @@ class PIDController(ControllerBase):
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v)
e_pos = np.linalg.norm(v[0:2])
if e_pos < 0.05:
if e_pos < 0.03:
self.mode = 'angle'
self.e_angle_old = 0
self.e_pos_old = 0
@ -111,6 +111,10 @@ class PIDController(ControllerBase):
u2 = 0.0
return u1, u2
def stop(self):
super(PIDController, self).stop()
self.t = None
def apply_control(self, control):
self.t = time.time() # save time when the most recent control was applied
super(PIDController, self).apply_control(control)

View File

@ -111,12 +111,14 @@ class ControlledRobot(Robot):
def start_control(self):
if self.controller is not None:
print("starting control")
self.controller.start()
else:
raise Exception("Error: Cannot start control: there is not controller attached to the robot!")
def stop_control(self):
if self.controller is not None:
print("stopping control")
self.controller.stop()
else:
raise Exception("Error: Cannot stop control: there is not controller attached to the robot!")
@ -141,8 +143,9 @@ class ControlledRobot(Robot):
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v[0:2])
#print(f"target_pos = {target_pos}, current_pos = {current_pos}, e_pos = {e_pos}, e_ang = {e_angle}")
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
close_to_target = e_pos < 0.05 and abs(e_angle) < 0.1
close_to_target = e_pos < 0.075 and abs(e_angle) < 0.1
time.sleep(0.1)
print("target reached!")
self.stop_control()