Compare commits

...

9 Commits

5 changed files with 535 additions and 33 deletions

View File

@ -0,0 +1,464 @@
import numpy as np
import random
import pygame
import os
BLACK = np.array([0, 0, 0], dtype=np.uint8)
WHITE = np.array([255, 255, 255], dtype=np.uint8)
RED = np.array([255, 0, 0], dtype=np.uint8)
BLUE = np.array([0, 0, 255], dtype=np.uint8)
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
GREEN = np.array([0, 255, 0], dtype=np.uint8)
pygame.init()
pygame.font.init() # you have to call this at the start,
# if you want to use this module.
myfont = pygame.font.SysFont('Comic Sans MS', 55)
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
P0_text = myfont.render('P0', False, tuple(BLACK))
tiledt = np.dtype([('x', np.uint8), ('y', np.uint8), ('color', np.uint8, 3), ('star', np.bool)])
class Board:
valid_colors = [WHITE, RED, BLUE]
def __init__(self, dim_x, dim_y):
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
for x in range(dim_x):
for y in range(dim_y):
self.tiles[y, x]['x'] = x
self.tiles[y, x]['y'] = y
self.tiles[y, x]['color'] = random.choice(Board.valid_colors)
def render(self, scale_fac):
dimy, dimx = self.tiles.shape
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
for y in range(self.tiles.shape[0]):
for x in range(self.tiles.shape[1]):
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
pygame.draw.rect(board_surf, (0, 0, 0),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
if self.tiles[y, x]['star']:
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
return board_surf
# def __repr__(self):
# s = ''
# for y in range(self.tiles.shape[0]):
# for x in range(self.tiles.shape[1]):
# if (x,y) == (self.robot.x, self.robot.y):
# s += self.robot.orientation
# else:
# s += '.'
# s += '\n'
# return s
class Robot:
orientations = ['^', 'left', 'down', 'right']
resulting_orientation = {
'^': {'left': '<', 'right': '>'},
'>': {'left': '^', 'right': 'v'},
'v': {'left': '>', 'right': '<'},
'<': {'left': 'v', 'right': '^'},
}
def __init__(self, x, y, orientation):
self.x = x
self.y = y
self.orientation = orientation
def get_forward_coordinates(self):
# get the coordinates of the neighboring tile in the given direction
if self.orientation == '^':
return self.y - 1, self.x
elif self.orientation == '>':
return self.y, self.x + 1
elif self.orientation == 'v':
return self.y + 1, self.x
elif self.orientation == '<':
return self.y, self.x - 1
else:
raise Exception("error: undefined direction")
def get_angle(self):
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
return np.rad2deg(angle)
def render(self, scale_fac):
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
(0.25 * scale_fac, 0.25 * scale_fac),
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
return robot_surf
def __repr__(self):
return f"({self.y}, {self.x}) - {self.orientation}"
class Command:
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
def __init__(self, action=None, color=WHITE):
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
raise ValueError("invalid values for command")
self.action = action
self.color = color
def __repr__(self):
return f"{self.action}: {self.color}"
def render(self, scale_fac):
cmd_surf = pygame.Surface((scale_fac, scale_fac))
cmd_surf.fill(tuple(self.color))
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
pygame.draw.polygon(arrow_surf, (0, 0, 0),
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
if self.action == 'forward':
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
elif self.action == 'left':
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
if self.action in {'left', 'forward', 'right'}:
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
elif self.action == 'P0':
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
return cmd_surf
class Programmer:
def __init__(self, prg):
self.prg = prg
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=WHITE)]
self.command_to_edit = 0
self.screen_rect = None
def render(self, scale_fac):
"""Render surface with possible inputs for the robot.
:return: surface of the input commands
"""
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
for i, inp in enumerate(self.available_inputs):
cmd_surf = inp.render(scale_fac)
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return inp_surf
def update_selected_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
selected_input = self.available_inputs[selected_input_index]
edited_command = self.prg.cmds[self.command_to_edit]
print("command before edit = ", edited_command)
if selected_input.action != '-':
edited_command.action = selected_input.action
else:
edited_command.color = selected_input.color
print("command after edit = ", edited_command)
class Program:
def __init__(self, robot, board, cmds):
self.cmds = cmds
self.robot = robot
self.board = board
self.prg_counter = 0
self.screen_rect = None
def step(self):
cmd = self.cmds[self.prg_counter]
self.prg_counter += 1
# current position
x = self.robot.x
y = self.robot.y
# current tile the robot is on
tile = self.board.tiles[y, x]
# apply next instruction of the program
if np.all(cmd.color == WHITE) or np.all(cmd.color == tile['color']):
# matching color -> execute command
if cmd.action == 'forward':
ynew, xnew = self.robot.get_forward_coordinates()
self.robot.x = xnew
self.robot.y = ynew
elif cmd.action in {'left', 'right'}:
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
elif cmd.action == 'P0':
self.prg_counter = 0
else:
print("color not matching -> skipping command")
# get all events
ev = pygame.event.get()
# proceed events
for event in ev:
# handle MOUSEBUTTONUP
if event.type == pygame.MOUSEBUTTONUP:
pos = pygame.mouse.get_pos()
if pos[0] >= 325 and pos[0] <= 400 and pos[1] >= 600 and pos[1] <= 650:
print(f"clicked at pos = {pos}")
self.state = 'input'
# update state for new robot position
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
# robot leaves the board -> GAME OVER
print("GAME OVER")
return 'game_over'
# robot collects star on new tile
tile = self.board.tiles[self.robot.y, self.robot.x]
if tile['star']:
tile['star'] = False
if all([not t['star'] for t in self.board.tiles.flatten()]):
print("YOU WON")
return 'won'
# by default we continue in the running state
return 'running'
def render(self, scale_fac, prg_counter_override=None):
"""Render the current program. This will render all commands and highlight the next command to execute
(determined by self.prg_counter).
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
to highlight the command the user will edit.
:param scale_fac:
:param prg_counter_override:
:return:
"""
prg_counter = self.prg_counter
if prg_counter_override is not None:
prg_counter = prg_counter_override
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
for i in range(5):
if i < len(self.cmds):
cmd = self.cmds[i]
cmd_surf = cmd.render(scale_fac)
else:
cmd_surf = pygame.Surface((scale_fac,scale_fac))
cmd_surf.fill(WHITE)
if prg_counter is not None and i == prg_counter:
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return prg_surf
def get_clicked_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
print("clicked command = ", clicked_cmd)
return clicked_cmd
class Game:
def __init__(self, dimx, dimy, robotx, roboty):
self.robot = Robot(x=robotx, y=roboty, orientation='v')
self.board = Board(dimx, dimy)
self.board.tiles[3,3]['star'] = True
self.board.tiles[3,2]['star'] = True
# TODO fix number of commands at 5
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
self.state = 'input'
self.prg = Program(self.robot, self.board, self.cmds)
self.programmer = Programmer(self.prg)
#self.scale_fac = 180
self.scale_fac = 125
self.beamer_mode = False
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.game_over_text = myfont.render('GAME OVER', False, RED)
self.won_text = myfont.render('YOU WON', False, GREEN)
self.run_text = myfont.render('RUN', False, tuple(BLACK))
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
# save initial state
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
self.inital_board_tiles = self.board.tiles.copy()
def render(self):
"""Render the game screen.
This will render the board and the robot.
Depending on the display mode it will also render the current program and the input commands for the robot.
:param prg_counter:
:return:
"""
if self.beamer_mode:
dx = 0
dy = 0
else:
dx = int(0.05 * self.screen.get_width())
dy = int(0.05 * self.screen.get_height())
self.screen.fill(tuple(BLACK))
# render the board
board_surf = self.board.render(self.scale_fac)
# render robot onto the board surface
robot_surf = self.robot.render(self.scale_fac)
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
if not self.beamer_mode:
# if we are not in beamer mode we also show the current program / inputs
# render program
if self.state == 'input':
# in input mode we highlight the command which is selected for edit
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
else:
# in other modes we render the current program counter
prg_surf = self.prg.render(self.scale_fac)
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.prg.screen_rect = pygame.Rect((dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
self.screen.blit(prg_surf, self.prg.screen_rect)
# render input fields and buttons
inp_surf = self.programmer.render(self.scale_fac)
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.programmer.screen_rect = pygame.Rect((dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
self.screen.blit(inp_surf, self.programmer.screen_rect)
btn_surf = pygame.Surface((3 * self.scale_fac//2, self.scale_fac))
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
btn_surf.get_height(), btn_surf.get_width()))
if self.state == 'input':
btn_surf.fill(tuple(GREEN))
btn_surf.blit(self.run_text, (0, 10))
elif self.state == 'running':
btn_surf.fill(tuple(RED))
btn_surf.blit(self.stop_text, (0, 10))
self.screen.blit(btn_surf, self.btn_rect)
# render messages
if self.state == 'game_over':
self.screen.blit(self.game_over_text, (50, 00))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
elif self.state == 'won':
self.screen.blit(self.won_text, (50, 00))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
pygame.display.flip()
def process_inputs(self):
# proceed events
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.state = 'quit'
# handle MOUSEBUTTONUP
elif event.type == pygame.MOUSEBUTTONUP:
pos = pygame.mouse.get_pos()
# select command to edit by the programmer
if self.prg.screen_rect is not None:
if self.prg.screen_rect.collidepoint(pos):
print(f"clicked at pos = {pos}")
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
# set the selected command to a new value
if self.programmer.screen_rect is not None:
if self.programmer.screen_rect.collidepoint(pos):
self.programmer.update_selected_command(pos)
# clicked RUN/STOP button
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
print(f"clicked at pos = {pos}")
if self.state == 'running':
self.state = 'input'
elif self.state == 'input':
self.state = 'running'
elif event.type == pygame.KEYUP:
if event.key == pygame.K_x:
if not self.beamer_mode:
# switch to beamer mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '1920, 280'
self.scale_fac = 180
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
self.board.tiles.shape[0] * self.scale_fac),
pygame.NOFRAME)
self.beamer_mode = True
else:
# switch to normal mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
self.scale_fac = 125
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
self.board.tiles.shape[0] * self.scale_fac + 5 * self.scale_fac))
self.beamer_mode = False
elif event.key == pygame.K_r:
# run program
self.state = 'running'
elif event.key == pygame.K_s:
self.state = 'manual'
self.prg.step()
return self.state
def reset(self):
self.prg.prg_counter = 0
self.robot.x = self.initial_pos[0]
self.robot.y = self.initial_pos[1]
self.robot.orientation = self.initial_pos[2]
self.board.tiles = self.inital_board_tiles.copy()
return 'input'
def run(self):
running = True
while running:
self.state = self.process_inputs()
if self.state == 'input':
pass
elif self.state == 'running':
self.state = self.prg.step()
elif self.state == 'reset':
self.state = self.reset()
elif self.state == 'quit':
running = False
elif self.state == 'manual':
pass
else:
print("unknown state")
return
self.render()
pygame.time.wait(100)
if __name__ == "__main__":
random.seed(42)
game = Game(dimx=7, dimy=4, robotx=3, roboty=1)
game.run()
# TODOs
# - in stepping mode (s) it is possible to drive outside of the map
# - when no P0 command is present the program counter will overflow

View File

@ -141,6 +141,7 @@ class ArucoEstimator:
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'color', 'tip': "Display mode for the video"},
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
{'name': 'Controlled robot', 'type': 'list', 'values': self.robot_marker_ids, 'tip': 'Robot to control'},
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
]
@ -156,6 +157,8 @@ class ArucoEstimator:
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
self.params.param('Controlled robot').sigValueChanged.connect(lambda _, v: self.event_queue.put(('controlled_robot',
{'robot_id': v})))
self.paramtree = ParameterTree()
self.paramtree.setParameters(self.params, showTop=False)

View File

@ -143,7 +143,7 @@ class OpenLoopSolver:
def solve(self, x0, target):
target = np.asarray(target)
angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data
x0[2] = angles_unwrapped[0]
target[2] = angles_unwrapped[1]

View File

@ -2,25 +2,12 @@ import sys
import socket
import threading
import time
import json
from robot import Robot, ControlledRobot
from mpc_controller import MPCController
# HOST, PORT = "localhost", 42424
#
# robot_id = 12
#
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.connect((HOST, PORT))
# sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
# #sock.sendall(f"events\n".encode()) # request events
# receiving = True
# while receiving:
# received = str(sock.recv(1024), "utf-8")
# print("Received: {}".format(received))
# receiving = len(received) > 0
class RemoteController:
def __init__(self):
@ -28,35 +15,84 @@ class RemoteController:
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
#self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
#self.robots = [Robot(14, '10.10.11.89')]
self.robots = [ControlledRobot(13, '192.168.1.13')]
self.robots = [ControlledRobot(13, '192.168.1.13'), ControlledRobot(12, '192.168.1.12')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.controlled_robot = self.robots[0].id
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
# thread for handling events received by measurement server (from GUI)
self.event_thread = threading.Thread(target=self.handle_events)
self.event_thread.daemon = True
# connect to socket for events from GUI
try:
HOST, PORT = "localhost", 42424
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.event_socket.connect((HOST, PORT))
self.event_socket.sendall("events\n".encode())
self.event_socket.settimeout(0.1)
# check if we receive data from the measurement server
response = self.event_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for measurements")
self.event_socket.settimeout(None)
# if so we start the measurement thread
self.event_thread.start()
else:
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
except socket.timeout:
print(f"error: the measurement server did not respond with data.")
#sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
# sock.sendall(f"events\n".encode()) # request events
# receiving = True
# while receiving:
# received = str(sock.recv(1024), "utf-8")
# print("Received: {}".format(received))
# receiving = len(received) > 0
self.t = time.time()
# start thread for marker position detection
self.controller = MPCController()
self.robots[0].attach_controller(self.controller)
self.controllers = []
for r in self.robots:
c = MPCController()
self.controllers.append(c)
r.attach_controller(c)
def handle_events(self):
receiving = True
while receiving:
received = str(self.event_socket.recv(1024), "utf-8")
if len(received) > 0:
last_received = received.split('\n')[-2]
event_type, payload = json.loads(last_received)
print(event_type, payload)
if event_type == 'click':
target_pos = (payload['x'], payload['y'], payload['angle'])
self.robot_ids[self.controlled_robot].move_to_pos(target_pos=target_pos)
elif event_type == 'controlled_robot':
self.controlled_robot = payload['robot_id']
else:
receiving = False
print(f"measurement server stopped sending event data")
def run(self):
#print("waiting until all markers are detected...")
#while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
# pass
print("starting control")
#self.controller.interactive_control(robots=self.robots)
self.controller.start()
for c in self.controllers:
c.start()
pass
@ -70,10 +106,9 @@ if __name__ == '__main__':
# to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams
# marker positions and GUI events via socket
# next, launch this program (mpc_test.py) in debug mode and set a breakpoint after self.controller.start()
# you can now set the target position for the controlled robot via move_to_pos(), e.g.
# self.robots[0].move_to_pos([0.0,0,0], True)
# next, launch this program (mpc_test.py)
# you can now direct the robots via the GUI (assuming four corner markers for position reference have been detected)
# click any position on the screen and the current robot will drive there
# you can also switch which robot should be controlled in the GUI
# TODO
# figure out how to get clicked positions from measurement server and drive robot there
# also document and improve program structure
# TODO document and improve program structure

View File

@ -95,10 +95,10 @@ class Robot:
return self.t_last_measurement, self.x, self.y, self.angle
def __str__(self):
connection_state = '' if self.connected else 'not'
connection_state = '' if self.connected else ' not'
state = self.get_measurement()
last_measurement = f'last measurement = {state}' if None not in state else 'no measurements available'
return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected) " + last_measurement
return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected ) " + last_measurement
def __repr__(self):
return str(self)