Compare commits

...

52 Commits

Author SHA1 Message Date
e910041c95 added possibility to load level file 2021-09-11 01:44:57 +02:00
65f195fecf implemented level generator 2021-09-11 01:44:03 +02:00
794c987899 implemented option to switch between randomly generated levels 2021-09-10 23:30:15 +02:00
e374198fef improved layout of website 2021-09-10 23:29:20 +02:00
0956f6d0fe show grayscale image by default 2021-09-10 22:04:06 +02:00
3d64e2e834 show current program in beamer mode + general improvements; stable version 2021-09-10 16:07:46 +02:00
e669b10ebc connected game to web server 2021-09-10 13:46:24 +02:00
ca8e5aac9b allow further distance from target position 2021-09-10 01:23:14 +02:00
6aef0660a3 empty response queue to avoid desync 2021-09-10 01:22:44 +02:00
885a516fe3 implemented won text and set consistent seed 2021-09-10 01:22:17 +02:00
ca1fe99ff2 stable version of the game 2021-09-10 01:04:17 +02:00
a69d45f97e make it possible to get positions outside of grid rect 2021-09-10 00:44:16 +02:00
eea1cee1ec use mpc controller with short horizon 2021-09-10 00:43:02 +02:00
1c8d4a1d57 send heartbeat while waiting for control to finish 2021-09-10 00:42:31 +02:00
029068b259 added some output 2021-09-09 21:36:38 +02:00
7962199c3a reset time when stopping controller 2021-09-09 21:36:13 +02:00
f3babdcf0a cleaner exit in case of errors 2021-09-09 21:34:54 +02:00
019c4590aa implemented communication interface with real robots 2021-09-07 22:35:02 +02:00
3082eebc8d implemented blocking events to make it possible to wait for the robot to reach the desired target position 2021-09-07 22:31:28 +02:00
2c54e56f95 also enable computation of position outside of marker rectangle and for arbitrary grid sizes 2021-09-07 22:29:27 +02:00
1db24bc573 changed some defaults 2021-09-07 00:24:05 +02:00
c65dcce176 added grid move to measurement server 2021-09-07 00:23:36 +02:00
2dc3eef2d6 adjusted key events to Qt and changed robot ips 2021-09-07 00:21:51 +02:00
bbb341ea56 improved stepping mode 2021-09-06 09:00:00 +02:00
4c3c3f973e implemented stepping mode 2021-09-06 00:34:26 +02:00
57cd60de8c use KEYUP instead of KEYDOWN because otherwise switching from beamer mode does not work 2021-09-06 00:22:58 +02:00
8c544f8fcc implemented quitting the program 2021-09-06 00:05:52 +02:00
ff6caa457f missing copy to restore initial game state 2021-09-05 23:42:08 +02:00
9eea9bd245 refactored game into Game class for better maintainability 2021-09-05 23:37:47 +02:00
945833c8ac implemented state machine and option to switch display mode for second screen 2021-09-05 20:43:22 +02:00
e931085ca8 implemented basic input 2021-09-03 22:41:35 +02:00
439cf44d70 implemented core game logic and display 2021-09-03 21:38:30 +02:00
c5f3f3babb implemented control of multiple robots through GUI using measurement server 2021-08-31 00:05:16 +02:00
d0c17c0b91 got mpc controller running again 2021-08-25 22:53:09 +02:00
b65b6568d0 run GUI in main thread and measurement server in secondary thread 2021-08-25 21:59:24 +02:00
2607e94958 enable auto-exposure by default, some cleanup 2021-08-25 20:39:39 +02:00
f9f4a2c1c6 added qt gui with changeable parameters 2020-11-28 19:57:22 +01:00
9c1115d505 use qt gui instead of opencv 2020-11-25 21:19:56 +01:00
f26f958dc7 fixed errors in pid and mpc controller 2020-11-22 15:57:53 +01:00
0fddd75393 make parameters of mpc controller changeable 2020-11-18 21:54:23 +01:00
c2307261c5 second robot controlled by mpc controller 2020-11-18 21:53:32 +01:00
2f081faee8 restructured controller to work with mpc 2020-11-18 21:38:41 +01:00
17637f427b initial commit for control commander 2020-11-17 21:24:55 +01:00
edc3e8d290 some cleanup 2020-11-17 21:23:22 +01:00
35b6c3bdfd rewrote pid controller to derive from ControllerBase. works with multiple robots now 2020-11-14 16:41:16 +01:00
2060d8eb15 fixed problem with slow event server 2020-11-14 16:40:33 +01:00
708284749d fps counter 2020-11-14 16:39:53 +01:00
a3af40b001 implemented control from multiple robots 2020-11-14 16:06:57 +01:00
47f652fd62 about to make changes s.t. PID controller derives from ControllerBase class 2020-11-14 15:22:41 +01:00
6b9373cce5 enabled better mouse interaction 2020-11-14 15:01:01 +01:00
eff5474ac2 switched to using TCP sockets for measurement and event communication 2020-11-11 21:33:48 +01:00
8fc5ad9868 measurment client and server example 2020-11-09 22:32:27 +01:00
27 changed files with 2833 additions and 593 deletions

View File

@ -0,0 +1,27 @@
import socket
import json
HOST, PORT = "localhost", 42424
def move_grid(x, y, orientation, dimx, dimy):
print("move grid")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
payload = json.dumps({"x": x, "y": y, "dimx": dimx, "dimy": dimy, "orientation": orientation})
sock.sendall(f"move_grid_blocking;{payload}\n".encode())
target_reached = False
try:
while not target_reached:
reply = sock.recv(1024)
if reply == b'':
raise RuntimeError
#print("got reply: ", reply)
target_reached = reply.decode() == 'ack\n'
print("target_reached = ", target_reached)
except RuntimeError:
# try again
print("controlling failed -> trying again")
move_grid(x, y, orientation, dimx, dimy)
if __name__ == "__main__":
move_grid(3,2,'v', 7, 4)

View File

@ -0,0 +1,542 @@
import numpy as np
import pygame
import os
import threading
from webapp import app
from event_server_comm import move_grid
BLACK = np.array([0, 0, 0], dtype=np.uint8)
WHITE = np.array([255, 255, 255], dtype=np.uint8)
GRAY = np.array([200, 200, 200], dtype=np.uint8)
RED = np.array([255, 0, 0], dtype=np.uint8)
BLUE = np.array([0, 0, 255], dtype=np.uint8)
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
GREEN = np.array([0, 255, 0], dtype=np.uint8)
pygame.init()
pygame.font.init() # you have to call this at the start,
# if you want to use this module.
myfont = pygame.font.SysFont('Comic Sans MS', 55)
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
P0_text = myfont.render('P0', False, tuple(BLACK))
tiledt = np.dtype([('color', np.uint8, 3), ('star', np.bool)])
class Board:
valid_colors = [GRAY, RED, BLUE]
def __init__(self, dim_x, dim_y, n_coins=2, file=None):
if file is None:
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
for x in range(dim_x):
for y in range(dim_y):
self.tiles[y, x]['color'] = Board.valid_colors[np.random.randint(len(Board.valid_colors))]
coins_distributed = n_coins == 0
while not coins_distributed:
coinx = np.random.randint(0, dim_x)
coiny = np.random.randint(0, dim_y)
self.tiles[coiny,coinx]['star'] = True
coins_distributed = sum([t['star'] for t in self.tiles.flatten()]) == n_coins
else:
self.tiles = np.load(file)
def render(self, scale_fac):
dimy, dimx = self.tiles.shape
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
for y in range(self.tiles.shape[0]):
for x in range(self.tiles.shape[1]):
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
pygame.draw.rect(board_surf, (0, 0, 0),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
if self.tiles[y, x]['star']:
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
return board_surf
def get_xdims(self):
return self.tiles.shape[1]
def get_ydims(self):
return self.tiles.shape[0]
# def __repr__(self):
# s = ''
# for y in range(self.tiles.shape[0]):
# for x in range(self.tiles.shape[1]):
# if (x,y) == (self.robot.x, self.robot.y):
# s += self.robot.orientation
# else:
# s += '.'
# s += '\n'
# return s
class Robot:
orientations = ['^', 'left', 'down', 'right']
resulting_orientation = {
'^': {'left': '<', 'right': '>'},
'>': {'left': '^', 'right': 'v'},
'v': {'left': '>', 'right': '<'},
'<': {'left': 'v', 'right': '^'},
}
def __init__(self, x, y, orientation, use_real_robot=False):
self.x = x
self.y = y
self.orientation = orientation
self.position_changed = False
self.use_real_robot = use_real_robot
def get_forward_coordinates(self):
# get the coordinates of the neighboring tile in the given direction
if self.orientation == '^':
return self.y - 1, self.x
elif self.orientation == '>':
return self.y, self.x + 1
elif self.orientation == 'v':
return self.y + 1, self.x
elif self.orientation == '<':
return self.y, self.x - 1
else:
raise Exception("error: undefined direction")
def get_angle(self):
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
return np.rad2deg(angle)
def render(self, scale_fac):
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
(0.25 * scale_fac, 0.25 * scale_fac),
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
return robot_surf
def update_pos(self, dimx, dimy):
if self.use_real_robot:
move_grid(self.x, self.y, self.orientation, dimx, dimy)
self.position_changed = False
def __repr__(self):
return f"({self.y}, {self.x}) - {self.orientation}"
class Command:
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
def __init__(self, action=None, color=GRAY):
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
raise ValueError("invalid values for command")
self.action = action
self.color = color
def __repr__(self):
return f"{self.action}: {self.color}"
def render(self, scale_fac):
cmd_surf = pygame.Surface((scale_fac, scale_fac))
cmd_surf.fill(tuple(self.color))
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
pygame.draw.polygon(arrow_surf, (0, 0, 0),
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
if self.action == 'forward':
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
elif self.action == 'left':
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
if self.action in {'left', 'forward', 'right'}:
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
elif self.action == 'P0':
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
return cmd_surf
class Programmer:
def __init__(self, prg):
self.prg = prg
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=GRAY)]
self.command_to_edit = 0
self.screen_rect = None
def render(self, scale_fac):
"""Render surface with possible inputs for the robot.
:return: surface of the input commands
"""
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
for i, inp in enumerate(self.available_inputs):
cmd_surf = inp.render(scale_fac)
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return inp_surf
def update_selected_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
selected_input = self.available_inputs[selected_input_index]
edited_command = self.prg.cmds[self.command_to_edit]
print("command before edit = ", edited_command)
if selected_input.action != '-':
edited_command.action = selected_input.action
else:
edited_command.color = selected_input.color
print("command after edit = ", edited_command)
class Program:
def __init__(self, robot, board, cmds):
self.cmds = cmds
self.robot = robot
self.board = board
self.prg_counter = 0
self.screen_rect = None
def step(self, state='running', check_victory=True):
if self.prg_counter >= len(self.cmds):
return 'game_over'
cmd = self.cmds[self.prg_counter]
self.prg_counter += 1
# current position
x = self.robot.x
y = self.robot.y
# current tile the robot is on
tile = self.board.tiles[y, x]
# apply next instruction of the program
if np.all(cmd.color == GRAY) or np.all(cmd.color == tile['color']):
# matching color -> execute command
if cmd.action == 'forward':
ynew, xnew = self.robot.get_forward_coordinates()
self.robot.x = xnew
self.robot.y = ynew
self.robot.position_changed = True
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
elif cmd.action in {'left', 'right'}:
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.robot.position_changed = True
elif cmd.action == 'P0':
self.prg_counter = 0
else:
#print("color not matching -> skipping command")
pass
# update state for new robot position
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
# robot leaves the board -> GAME OVER
print("GAME OVER")
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
return 'game_over'
# robot collects star on new tile
tile = self.board.tiles[self.robot.y, self.robot.x]
if tile['star']:
tile['star'] = False
if check_victory and all([not t['star'] for t in self.board.tiles.flatten()]):
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
print("YOU WON")
return 'won'
# by default we continue in the running state
return state
def render(self, scale_fac, prg_counter_override=None):
"""Render the current program. This will render all commands and highlight the next command to execute
(determined by self.prg_counter).
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
to highlight the command the user will edit.
:param scale_fac:
:param prg_counter_override:
:return:
"""
prg_counter = self.prg_counter
if prg_counter_override is not None:
prg_counter = prg_counter_override
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
for i in range(5):
if i < len(self.cmds):
cmd = self.cmds[i]
cmd_surf = cmd.render(scale_fac)
else:
cmd_surf = pygame.Surface((scale_fac,scale_fac))
cmd_surf.fill(GRAY)
if prg_counter is not None and i == prg_counter:
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return prg_surf
def get_clicked_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
print("clicked command = ", clicked_cmd)
return clicked_cmd
class Game:
def __init__(self, dimx, dimy, robotx, roboty, orientation, use_real_robot=False):
self.robot = Robot(x=robotx, y=roboty, orientation=orientation, use_real_robot=use_real_robot)
#self.board = Board(dimx, dimy)
self.board = Board(dimx, dimy, file='levels/56.npy')
# TODO fix number of commands at 5
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
self.state = 'reset'
self.prg = Program(self.robot, self.board, self.cmds)
self.programmer = Programmer(self.prg)
#self.scale_fac = 180
self.scale_fac = 125
self.beamer_mode = False
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.game_over_text = myfont.render('GAME OVER', False, WHITE)
self.won_text = myfont.render('YOU WON', False, BLACK)
self.run_text = myfont.render('RUN', False, tuple(BLACK))
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
self.step_text = myfont_small.render('STEP', False, tuple(BLACK))
self.prg_text = myfont_small.render('CURRENT PROGRAM', False, tuple(GREEN))
# save initial state
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
self.initial_board_tiles = self.board.tiles.copy()
def render(self):
"""Render the game screen.
This will render the board and the robot.
Depending on the display mode it will also render the current program and the input commands for the robot.
:param prg_counter:
:return:
"""
if self.beamer_mode:
dx = self.xoffset
dy = 0
else:
dx = int(0.05 * self.screen.get_width())
dy = int(0.05 * self.screen.get_height())
self.screen.fill(tuple(BLACK))
# render the board
board_surf = self.board.render(self.scale_fac)
# render robot onto the board surface
robot_surf = self.robot.render(self.scale_fac)
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
# render program
if self.state == 'input':
# in input mode we highlight the command which is selected for edit
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
else:
# in other modes we render the current program counter
prg_surf = self.prg.render(self.scale_fac)
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.prg.screen_rect = pygame.Rect(
(dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
# render input fields and buttons
inp_surf = self.programmer.render(self.scale_fac)
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.programmer.screen_rect = pygame.Rect(
(dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
btn_surf = pygame.Surface((3 * self.scale_fac // 2, self.scale_fac))
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
btn_surf.get_height(), btn_surf.get_width()))
if self.state == 'input':
btn_surf.fill(tuple(GREEN))
btn_surf.blit(self.run_text, (0, 10))
elif self.state == 'running':
btn_surf.fill(tuple(RED))
btn_surf.blit(self.stop_text, (0, 10))
elif self.state == 'stepping':
btn_surf.fill(tuple(YELLOW))
btn_surf.blit(self.step_text, (0, 10))
if not self.beamer_mode:
# if we are not in beamer mode we render program and inputs below the board
self.screen.blit(prg_surf, self.prg.screen_rect)
self.screen.blit(inp_surf, self.programmer.screen_rect)
self.screen.blit(btn_surf, self.btn_rect)
else:
prg_surf = pygame.transform.scale(prg_surf, (dx, dx//5))
# in beamer mode we render the program to the left of the board to appear on the laptop
self.screen.blit(prg_surf, (0,100))
prg_descr_surb = pygame.Surface((500, 100))
#prg_descr_surb.blit(self.prg_text)
self.screen.blit(self.prg_text, (50, 50))
mode_text = myfont_small.render(f'STATE: {self.state}', False, tuple(GREEN))
self.screen.blit(mode_text, (50, dx//5 + 350))
# render messages
if self.state == 'game_over':
game_over_surf = pygame.Surface(((self.screen.get_width() - dx) // 2, self.screen.get_height() // 2))
game_over_surf.fill(tuple(BLACK))
game_over_surf.blit(self.game_over_text, ((game_over_surf.get_width() - self.game_over_text.get_width()) // 2,
(game_over_surf.get_height() - self.game_over_text.get_height()) // 2))
self.screen.blit(game_over_surf, (dx + (self.screen.get_width() - dx) // 4, self.screen.get_height() // 4))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
elif self.state == 'won':
won_surf = pygame.Surface(((self.screen.get_width() - dx) // 2, self.screen.get_height() // 2))
won_surf.fill(tuple(WHITE))
won_surf.blit(self.won_text,
((won_surf.get_width() - self.won_text.get_width()) // 2,
(won_surf.get_height() - self.won_text.get_height()) // 2))
self.screen.blit(won_surf, (dx + (self.screen.get_width() - dx) // 4, self.screen.get_height() // 4))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
pygame.display.flip()
def process_inputs(self):
# proceed events
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.state = 'quit'
# handle MOUSEBUTTONUP
elif event.type == pygame.MOUSEBUTTONUP:
pos = pygame.mouse.get_pos()
# select command to edit by the programmer
if self.prg.screen_rect is not None:
if self.prg.screen_rect.collidepoint(pos):
print(f"clicked at pos = {pos}")
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
# set the selected command to a new value
if self.programmer.screen_rect is not None:
if self.programmer.screen_rect.collidepoint(pos):
self.programmer.update_selected_command(pos)
# clicked RUN/STOP button
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
print(f"clicked at pos = {pos}")
if self.state == 'running':
self.state = 'input'
elif self.state == 'input':
self.state = 'running'
elif self.state == 'stepping':
self.state = self.prg.step(self.state)
elif event.type == pygame.KEYUP:
if event.key == pygame.K_x:
if not self.beamer_mode:
# switch to beamer mode
self.xoffset = 1000
os.environ['SDL_VIDEO_WINDOW_POS'] = f'{1920-self.xoffset}, 280'
self.scale_fac = 180
self.screen = pygame.display.set_mode((self.xoffset + self.board.tiles.shape[1] * self.scale_fac,
self.board.tiles.shape[0] * self.scale_fac),
pygame.NOFRAME)
self.beamer_mode = True
else:
# switch to normal mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
self.scale_fac = 125
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.beamer_mode = False
elif event.key == pygame.K_s:
# run program
self.state = 'running'
elif event.key == pygame.K_SPACE:
if self.state != 'stepping':
self.state = 'stepping'
else:
self.state = self.prg.step(self.state)
elif event.key == pygame.K_r:
self.state = 'reset'
elif event.key == pygame.K_n:
self.initial_board_tiles = Board(self.board.get_xdims(), self.board.get_ydims()).tiles.copy()
self.state = 'reset'
elif event.type == pygame.USEREVENT:
for i, cmd in enumerate(event.cmds):
self.cmds[i].action = cmd.action
self.cmds[i].color = np.array(cmd.color, dtype=np.uint8)
self.reset()
self.state = 'running'
return self.state
def reset(self):
self.prg.prg_counter = 0
self.robot.x = self.initial_pos[0]
self.robot.y = self.initial_pos[1]
self.robot.orientation = self.initial_pos[2]
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.board.tiles = self.initial_board_tiles.copy()
return 'input'
def run(self):
running = True
while running:
self.state = self.process_inputs()
if self.state == 'input':
pass
elif self.state == 'running':
self.state = self.prg.step()
elif self.state == 'reset':
self.state = self.reset()
elif self.state == 'quit':
running = False
elif self.state == 'stepping':
pass
elif self.state == 'game_over' or self.state == 'won':
pass
else:
print("unknown state")
return
self.render()
if self.robot.position_changed:
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
pygame.time.wait(100)
if __name__ == "__main__":
# launch webapp in thread
webserver_thread = threading.Thread(target=app.run, kwargs={'host': '0.0.0.0', 'port': 5000})
webserver_thread.start()
seed = 4
np.random.seed(seed)
game = Game(dimx=7, dimy=4, robotx=5, roboty=1, orientation='>', use_real_robot=False)
game.run()
# TODOs
# - in stepping mode (s) it is possible to drive outside of the map
# - when no P0 command is present the program counter will overflow

View File

@ -0,0 +1,124 @@
from gauss_turing import Program, Board, Command, Robot
import numpy as np
import json
levels = {
0: lambda cmds: len(cmds) <= 3,
1: lambda cmds: len(cmds) == 3
}
def rate_level(robot_path, cmds, board):
path_length = len(set(robot_path))
n_cmds = len(cmds)
n_cmd_colors = len(set([tuple(c.color) for c in cmds]))
difficulty = (path_length - 1) * (n_cmds + n_cmd_colors)
# place coins on the robot path to create a solution
if difficulty > 0:
n_coins = np.random.randint(1, min(path_length, 5)) - 1
# put one coin on last tile visited
coins = [robot_path[-1]]
# distribute other coins randomly on the path
# path without first and list tile
unique_tiles = list(set(robot_path) - {robot_path[-1]} - {robot_path[0]})
for _ in range(n_coins):
c = np.random.randint(0, len(unique_tiles))
new_coin = unique_tiles.pop(c)
coins.append(new_coin)
pass
else:
coins = []
return difficulty, coins
def generate_level(dimx, dimy, max_steps=100):
n_cmds = np.random.randint(2, 6)
assert n_cmds <= 5
# generate random board without any coins
board = Board(dimx, dimy, n_coins=0)
cmds = []
actions = list(sorted(Command.valid_actions - {'-'}))
# generate random commands
for i in range(n_cmds):
action = np.random.choice(actions)
color = Board.valid_colors[np.random.randint(len(Board.valid_colors))]
cmds.append(Command(action, color))
# generate robot at random position
rx = np.random.randint(0, dimx-1)
ry = np.random.randint(0, dimy-1)
orientation = np.random.choice(['>','v','<','^'])
r = Robot(rx, ry, orientation)
prg = Program(r, board, cmds)
continue_running = True
state = 'running'
prg_counter_old = prg.prg_counter
robot_path = [(r.x, r.y)]
step = 0
while continue_running and step < max_steps:
#print(f"prg_counter = {prg.prg_counter} - robot: {r} - state: {state}")
state = prg.step(state, check_victory=False)
robot_path.append((r.x, r.y))
stuck = prg.prg_counter == prg_counter_old
prg_counter_old = prg.prg_counter
if state == 'game_over' or stuck:
continue_running = False
step += 1
last_pos = robot_path[-1]
if not ((0 <= last_pos[0] < dimx) and (0 <= last_pos[1] < dimy)):
# remove last entry of path if robot leaves the board
robot_path.pop(-1)
difficulty, coins = rate_level(robot_path, cmds, board)
# put coins on the board
for coin in coins:
board.tiles[coin[1], coin[0]]['star'] = True
n_coins = len(coins)
return difficulty, board, n_coins, set(robot_path), (rx, ry, orientation), cmds
if __name__ == "__main__":
np.random.seed(2)
levels = {}
for i in range(100):
diff, board, n_coins, robot_path, init_robot_pos, solution = generate_level(7, 4)
if diff > 0:
print("difficulty: ", diff, "n_coins: ", n_coins, "path length: ", len(robot_path))
if diff in levels:
if n_coins > levels[diff]['n_coins'] and len(robot_path) > levels[diff]['path_length']:
levels[diff] = {'board': board, 'init_robot_pos': init_robot_pos, 'solution': solution,
'n_coins': n_coins, 'path_length': len(robot_path)}
else:
levels[diff] = {'board': board, 'init_robot_pos': init_robot_pos, 'solution': solution,
'n_coins': n_coins, 'path_length': len(robot_path)}
level_info = {}
for l, data in levels.items():
np.save(f'levels/{l}.npy', data['board'].tiles)
sol = [(cmd.action, tuple(map(int, cmd.color))) for cmd in data['solution']]
level_info[l] = {'init_robot_pos': data['init_robot_pos'], 'solution': sol,
'n_coins': int(data['n_coins']), 'path_length': int(data['path_length']),
'file': f'levels/{l}.npy'}
with open('levels/level_info.json', 'w') as f:
json.dump(level_info, f, indent=4)
pass

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.2 KiB

View File

@ -0,0 +1,61 @@
.cmd-canvas {
outline: 3px solid #dddddd;;
width: 430px;
height: 120px;
margin: 50px auto 0;
position: relative;
}
.programmer-canvas {
outline: 3px solid #dddddd;;
width: 338px;
height: 220px;
margin: 50px auto 0;
position: relative;
}
.register {
outline: 3px dashed #dddddd;;
width: 120px;
height: 240px;
margin: 0px auto 0;
position: relative;
}
.box{
width: 62px;
height: 100px;
position: absolute !important;
top: 100px;
font-size: 15px;
color: #000000;
line-height: 25px;
text-align: center;
cursor: move;
}
.top-buffer { margin-top:20px; }
.center {
margin: 20px;
position: absolute;
left: 50%;
-ms-transform: translateX(-50%, -50%);
transform: translateX(-50%, -50%);
}
.cmd0 { left: 0; top: 0; background-color: #E74C3C; position: absolute !important;}
.cmd1 { left: 92px; top: 0; background-color: #8E44AD; position: absolute !important;}
.cmd2 { left: 184px; top: 0; background-color: #5DADE2; }
.cmd3 { left: 276px; top: 0; background-color: #1ABC9C; }
.cmd4 { left: 368px; top: 0; background-color: #F1C40F; }
.programmer_cmd0 { left: 0; top: 0; background-color: #E74C3C; position: absolute !important;}
.programmer_cmd1 { left: 92px; top: 0; background-color: #8E44AD; position: absolute !important;}
.programmer_cmd2 { left: 184px; top: 0; background-color: #5DADE2; }
.programmer_cmd3 { left: 276px; top: 0; background-color: #1ABC9C; }
.programmer_cmd4 { left: 38px; top: 120px; background-color: #F1C40F; }
.programmer_cmd5 { left: 138px; top: 120px; background-color: #F39C12; }
.programmer_cmd6 { right: 38px; top: 120px; background-color: #34495E; }
.card_hidden { background-color: #dddddd; }

View File

@ -0,0 +1,136 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>imaginaerraum.de</title>
<script src="static/jquery-3.5.1.js"></script>
<script src="static/jquery-ui.min.js"></script>
<script src="static/bootstrap.min.js"></script>
<link rel="stylesheet" href="static/bootstrap.css">
<link rel="stylesheet" href="static/style.css">
</head>
<body>
<!-- container -->
<div class="container">
<!-- current program -->
<div class="row top-buffer">
Aktuelles Programm:
<div class="cmd-canvas">
{% for cmd in current_program %}
<div class="box cmd{{ loop.index0 }}" id="prg_cmd{{ loop.index0 }}" data-action="{{ cmd.action }}" data-color="{{ cmd.color }}" data-img="prg_cmd_img{{ loop.index0 }}"
style="background-color: rgba({{ cmd.color[0] }},{{ cmd.color[1] }},{{ cmd.color[2] }},0.85)">
<img id="prg_cmd_img{{ loop.index0 }}" src="static/{{ cmd.action }}.png">
{{ loop.index0 }}
</div>
{% endfor %}
</div>
</div>
<!-- possible commands -->
<div class="row top-buffer">
Mögliche Befehle:
<div class="programmer-canvas">
{% for cmd in valid_commands %}
<div class="box programmer_cmd{{ loop.index0 }}" id="valid_cmd{{ loop.index0 }}" data-action="{{ cmd.action }}"
data-color="{{ cmd.color }}" data-img="valid_cmd_img{{ loop.index0 }}"
style="background-color: rgba({{ cmd.color[0] }},{{ cmd.color[1] }},{{ cmd.color[2] }},0.85)">
<img id="valid_cmd_img{{ loop.index0 }}" src="static/{{ cmd.action }}.png">
</div>
{% endfor %}
</div>
</div>
<div class="row top-buffer center">
<div class="col-sm">
<input class="btn btn-primary" id="btnSubmit" type="submit" value="Befehle abschicken" />
</div>
<div class="col-sm">
<div id = "alert_placeholder"></div>
</div>
</div>
</div><!-- container -->
<script>
$(document).ready(function () {
var please_wait = $(".notification").hide();
var box = $(".box");
bootstrap_alert = function() {};
bootstrap_alert.warning = function(message) {
$('#alert_placeholder').html('<div class="alert alert-primary"><a class="close" data-dismiss="alert">×</a><span>'+message+'</span></div>')
};
$("#btnSubmit").click(function () {
//alert("button");
var cmds = {
{% for i in range(5) %}
"cmd{{ i }}": {"color": document.getElementById("prg_cmd{{ i }}").getAttribute("data-color"),
"action": document.getElementById("prg_cmd{{ i }}").getAttribute("data-action")},
{% endfor %}
}
//bootstrap_alert.warning('Bitte warten bis alle Spieler ihre Aktion gewählt haben!'); //$(please_wait).show();
var cmds_json = JSON.stringify(cmds);
$.post("/send_cmds", {"cmds_json": cmds_json}, function (data) {
console.log(data);
/*
if (data === 'OK') {
location.reload(); // reload page to get new cards for next round
}
else {
console.log('waiting...')
$.get("/send_cmds", "", function (data) {
if (data === 'OK') {
location.reload(); // reload page to get new cards for next round
}
})
}
*/
});
});
var edited_command = null;
box.click(function () {
//debugger
if ( this.id.includes('prg_cmd') ) {
// program card clicked -> select clicked card for editing
edited_command = this;
}
else if (this.id.includes('valid_cmd')) {
// progamming card clicked -> edit currently selected card
var this_card_action = this.getAttribute('data-action');
var this_card_color = this.getAttribute('data-color');
if (!!edited_command) { // only if there is a card selected
console.log("editing command " + edited_command);
if (this_card_action === "-") {
// set color
edited_command.setAttribute("data-color", this_card_color);
edited_command.style["backgroundColor"] = this.style["backgroundColor"];
}
else {
// set action
edited_command.setAttribute("data-action", this_card_action)
var edited_cmd_img = document.getElementById(edited_command.getAttribute("data-img"));
var prg_img = document.getElementById(this.getAttribute("data-img"));
edited_cmd_img.src = prg_img.src;
}
}
}
});
});
</script>
</body>
</html>

163
gauss-turing/game/webapp.py Normal file
View File

@ -0,0 +1,163 @@
from flask import Flask, render_template, request, session, make_response
import socket
import time
import numpy as np
from playsound import playsound
from itertools import zip_longest
import random
import json
import pygame
app = Flask(__name__)
app.secret_key = b'RoboRallyRolling'
probabilities = [0.21428571428571427, 0.14285714285714285, 0.07142857142857142, 0.07142857142857142,
0.21428571428571427, 0.21428571428571427, 0.07142857142857142]
#deck = roborally.CardDeck()
class Cmd:
possible_moves = ['forward', 'left', 'right', 'P0']
possible_colors = [(200,200,200), (255, 0, 0), (0,0,255)]
def __init__(self, action, color):
self.action = action
self.color = color
def __str__(self):
return "Cmd No. " + self.action
def __repr__(self):
return self.action + " " + str(self.color)
available_robots = {
0: {'id': 11, 'ip': '192.168.1.11', 'x': 1, 'y': 2, 'orientation': '>'},
1: {'id': 12, 'ip': '192.168.1.12', 'x': 3, 'y': 3, 'orientation': '>'}
}
players = {}
class Player:
MAX_PLAYERS = 4
player_counter = 0
def __init__(self):
if Player.player_counter < Player.MAX_PLAYERS:
self.id = Player.player_counter
Player.player_counter += 1
self.max_cards = 9
self.player_hand = deck.draw_cards(self.max_cards)
print("current hand: ", [str(c) for c in self.player_hand])
self.action_count = 5
self.action_chosen = False
self.initialize_robot()
else:
print("max players reached!")
def initialize_robot(self):
x = available_robots[self.id]['x']
y = available_robots[self.id]['y']
marker_id = available_robots[self.id]['id']
ip = available_robots[self.id]['ip']
orientation = available_robots[self.id]['orientation']
self.robot = game.board.create_robot(x, y, orientation, self.id, marker_id)
if game.comm_socket is not None:
cmd = f"initialize_robot, {marker_id}, {ip}, {x-1}, {y-1}, {orientation}\n"
game.comm_socket.sendall(cmd.encode())
data = game.comm_socket.recv(32)
if data.decode() == 'OK\n':
print("robot sucessfully initialized!")
else:
print("error: could not initialize robot!")
self.robot = None
def draw_new_cards(self):
self.player_hand += deck.draw_cards(self.max_cards - len(self.player_hand))
@app.route('/send_cmds', methods=['POST', 'GET'])
def send_cmds():
if request.method == 'POST':
cmds = json.loads(request.form['cmds_json'])
cmd_list = []
for cmd_nr in [f"cmd{i}" for i in range(5)]:
cmd = cmds[cmd_nr]
action = cmd['action']
color_str = cmd['color']
color_str = color_str.strip("()").split(",")
color = tuple(map(int, color_str))
cmd_list.append(Cmd(action, color))
print("got commands: ", cmd_list)
ev = pygame.event.Event(pygame.USEREVENT, {'cmds': cmd_list})
pygame.event.post(ev)
# send commands to the game
# if game.ready():
# game.process_actions()
#
# return 'OK'
# else:
# return 'please wait'
return "OK"
elif request.method == 'GET':
# GET is used when we have to wait for other players to finish
while not game.processing_done: # wait for other players to choose commands and processing to finish
pass
return 'OK'
WHITE = (255, 255, 255)
GRAY = (200, 200, 200)
RED = (255, 0, 0)
BLUE = (0, 0, 255)
@app.route('/', methods=['GET', 'POST'])
def hello_world():
prg = [Cmd('forward', GRAY), Cmd('left', GRAY), Cmd('right', BLUE),
Cmd('P0', GRAY), Cmd('right', RED)]
valid_cmds = [Cmd('forward', WHITE), Cmd('right', WHITE), Cmd('left', WHITE),
Cmd('P0', WHITE), Cmd('-', RED), Cmd('-', BLUE), Cmd('-', GRAY)]
if request.method == 'GET':
robot_info = None
return render_template('drag_example.html', current_program=prg, valid_commands=valid_cmds, robot_info=robot_info)
elif request.method == 'POST':
# print(request.form)
if request.form.get('drag') and request.form.get('drop'):
# swap cards in the current hand
i1 = int(request.form.get('drag')) # number of first card
i2 = int(request.form.get('drop')) # number of second card
card1 = deck.deck[i1] # get card by number
card2 = deck.deck[i2]
print("swapping {} and {}".format(card1, card2))
j1 = player_hand.index(card1) # get index of card in the hand
j2 = player_hand.index(card2)
player_hand[j1], player_hand[j2] = player_hand[j2], player_hand[j1] # swap the cards in the list
# print("current hand: ", [str(c) for c in player_hand[player_id]])
return 'OK'
else:
return render_template('drag_example.html', cmds=player_hand, player_id=player_id)
if __name__ == '__main__':
#app.run(host='192.168.1.222', port=5000)
app.run(host='0.0.0.0', port=5000)

View File

@ -0,0 +1,15 @@
import socket
import json
HOST, PORT = "localhost", 42424
def move_grid(x, y, orientation, dimx, dimy):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
payload = json.dumps({"x": x, "y": y, "dimx": dimx, "dimy": dimy, "orientation": orientation})
sock.sendall(f"move_grid_blocking;{payload}\n".encode())
reply = sock.recv(1024)
print(reply)
if __name__ == "__main__":
move_grid(1,1,'^', 5, 5)

View File

@ -0,0 +1,478 @@
import numpy as np
import random
import pygame
import os
from event_server_comm import move_grid
BLACK = np.array([0, 0, 0], dtype=np.uint8)
WHITE = np.array([255, 255, 255], dtype=np.uint8)
RED = np.array([255, 0, 0], dtype=np.uint8)
BLUE = np.array([0, 0, 255], dtype=np.uint8)
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
GREEN = np.array([0, 255, 0], dtype=np.uint8)
pygame.init()
pygame.font.init() # you have to call this at the start,
# if you want to use this module.
myfont = pygame.font.SysFont('Comic Sans MS', 55)
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
P0_text = myfont.render('P0', False, tuple(BLACK))
tiledt = np.dtype([('x', np.uint8), ('y', np.uint8), ('color', np.uint8, 3), ('star', np.bool)])
class Board:
valid_colors = [WHITE, RED, BLUE]
def __init__(self, dim_x, dim_y):
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
for x in range(dim_x):
for y in range(dim_y):
self.tiles[y, x]['color'] = random.choice(Board.valid_colors)
def render(self, scale_fac):
dimy, dimx = self.tiles.shape
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
for y in range(self.tiles.shape[0]):
for x in range(self.tiles.shape[1]):
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
pygame.draw.rect(board_surf, (0, 0, 0),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
if self.tiles[y, x]['star']:
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
return board_surf
def get_xdims(self):
return self.tiles.shape[1]
def get_ydims(self):
return self.tiles.shape[0]
# def __repr__(self):
# s = ''
# for y in range(self.tiles.shape[0]):
# for x in range(self.tiles.shape[1]):
# if (x,y) == (self.robot.x, self.robot.y):
# s += self.robot.orientation
# else:
# s += '.'
# s += '\n'
# return s
class Robot:
orientations = ['^', 'left', 'down', 'right']
resulting_orientation = {
'^': {'left': '<', 'right': '>'},
'>': {'left': '^', 'right': 'v'},
'v': {'left': '>', 'right': '<'},
'<': {'left': 'v', 'right': '^'},
}
def __init__(self, x, y, orientation):
self.x = x
self.y = y
self.orientation = orientation
def get_forward_coordinates(self):
# get the coordinates of the neighboring tile in the given direction
if self.orientation == '^':
return self.y - 1, self.x
elif self.orientation == '>':
return self.y, self.x + 1
elif self.orientation == 'v':
return self.y + 1, self.x
elif self.orientation == '<':
return self.y, self.x - 1
else:
raise Exception("error: undefined direction")
def get_angle(self):
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
return np.rad2deg(angle)
def render(self, scale_fac):
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
(0.25 * scale_fac, 0.25 * scale_fac),
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
return robot_surf
def update_pos(self, dimx, dimy):
move_grid(self.x, self.y, self.orientation, dimx, dimy)
def __repr__(self):
return f"({self.y}, {self.x}) - {self.orientation}"
class Command:
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
def __init__(self, action=None, color=WHITE):
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
raise ValueError("invalid values for command")
self.action = action
self.color = color
def __repr__(self):
return f"{self.action}: {self.color}"
def render(self, scale_fac):
cmd_surf = pygame.Surface((scale_fac, scale_fac))
cmd_surf.fill(tuple(self.color))
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
pygame.draw.polygon(arrow_surf, (0, 0, 0),
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
if self.action == 'forward':
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
elif self.action == 'left':
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
if self.action in {'left', 'forward', 'right'}:
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
elif self.action == 'P0':
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
return cmd_surf
class Programmer:
def __init__(self, prg):
self.prg = prg
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=WHITE)]
self.command_to_edit = 0
self.screen_rect = None
def render(self, scale_fac):
"""Render surface with possible inputs for the robot.
:return: surface of the input commands
"""
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
for i, inp in enumerate(self.available_inputs):
cmd_surf = inp.render(scale_fac)
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return inp_surf
def update_selected_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
selected_input = self.available_inputs[selected_input_index]
edited_command = self.prg.cmds[self.command_to_edit]
print("command before edit = ", edited_command)
if selected_input.action != '-':
edited_command.action = selected_input.action
else:
edited_command.color = selected_input.color
print("command after edit = ", edited_command)
class Program:
def __init__(self, robot, board, cmds):
self.cmds = cmds
self.robot = robot
self.board = board
self.prg_counter = 0
self.screen_rect = None
def step(self, state='running'):
cmd = self.cmds[self.prg_counter]
self.prg_counter += 1
# current position
x = self.robot.x
y = self.robot.y
# current tile the robot is on
tile = self.board.tiles[y, x]
# apply next instruction of the program
if np.all(cmd.color == WHITE) or np.all(cmd.color == tile['color']):
# matching color -> execute command
if cmd.action == 'forward':
ynew, xnew = self.robot.get_forward_coordinates()
self.robot.x = xnew
self.robot.y = ynew
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
elif cmd.action in {'left', 'right'}:
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
elif cmd.action == 'P0':
self.prg_counter = 0
else:
print("color not matching -> skipping command")
# update state for new robot position
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
# robot leaves the board -> GAME OVER
print("GAME OVER")
return 'game_over'
# robot collects star on new tile
tile = self.board.tiles[self.robot.y, self.robot.x]
if tile['star']:
tile['star'] = False
if all([not t['star'] for t in self.board.tiles.flatten()]):
print("YOU WON")
return 'won'
# by default we continue in the running state
return state
def render(self, scale_fac, prg_counter_override=None):
"""Render the current program. This will render all commands and highlight the next command to execute
(determined by self.prg_counter).
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
to highlight the command the user will edit.
:param scale_fac:
:param prg_counter_override:
:return:
"""
prg_counter = self.prg_counter
if prg_counter_override is not None:
prg_counter = prg_counter_override
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
for i in range(5):
if i < len(self.cmds):
cmd = self.cmds[i]
cmd_surf = cmd.render(scale_fac)
else:
cmd_surf = pygame.Surface((scale_fac,scale_fac))
cmd_surf.fill(WHITE)
if prg_counter is not None and i == prg_counter:
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return prg_surf
def get_clicked_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
print("clicked command = ", clicked_cmd)
return clicked_cmd
class Game:
def __init__(self, dimx, dimy, robotx, roboty):
self.robot = Robot(x=robotx, y=roboty, orientation='v')
self.board = Board(dimx, dimy)
coin1x = np.random.randint(0, dimx)
coin1y = np.random.randint(0, dimy)
self.board.tiles[coin1y,coin1x]['star'] = True
self.board.tiles[3,2]['star'] = True
# TODO fix number of commands at 5
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
self.state = 'reset'
self.prg = Program(self.robot, self.board, self.cmds)
self.programmer = Programmer(self.prg)
#self.scale_fac = 180
self.scale_fac = 125
self.beamer_mode = False
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.game_over_text = myfont.render('GAME OVER', False, RED)
self.won_text = myfont.render('YOU WON', False, GREEN)
self.run_text = myfont.render('RUN', False, tuple(BLACK))
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
self.step_text = myfont_small.render('STEP', False, tuple(BLACK))
# save initial state
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
self.inital_board_tiles = self.board.tiles.copy()
def render(self):
"""Render the game screen.
This will render the board and the robot.
Depending on the display mode it will also render the current program and the input commands for the robot.
:param prg_counter:
:return:
"""
if self.beamer_mode:
dx = 0
dy = 0
else:
dx = int(0.05 * self.screen.get_width())
dy = int(0.05 * self.screen.get_height())
self.screen.fill(tuple(BLACK))
# render the board
board_surf = self.board.render(self.scale_fac)
# render robot onto the board surface
robot_surf = self.robot.render(self.scale_fac)
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
if not self.beamer_mode:
# if we are not in beamer mode we also show the current program / inputs
# render program
if self.state == 'input':
# in input mode we highlight the command which is selected for edit
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
else:
# in other modes we render the current program counter
prg_surf = self.prg.render(self.scale_fac)
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.prg.screen_rect = pygame.Rect((dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
self.screen.blit(prg_surf, self.prg.screen_rect)
# render input fields and buttons
inp_surf = self.programmer.render(self.scale_fac)
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.programmer.screen_rect = pygame.Rect((dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
self.screen.blit(inp_surf, self.programmer.screen_rect)
btn_surf = pygame.Surface((3 * self.scale_fac//2, self.scale_fac))
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
btn_surf.get_height(), btn_surf.get_width()))
if self.state == 'input':
btn_surf.fill(tuple(GREEN))
btn_surf.blit(self.run_text, (0, 10))
elif self.state == 'running':
btn_surf.fill(tuple(RED))
btn_surf.blit(self.stop_text, (0, 10))
elif self.state == 'stepping':
btn_surf.fill(tuple(YELLOW))
btn_surf.blit(self.step_text, (0, 10))
self.screen.blit(btn_surf, self.btn_rect)
# render messages
if self.state == 'game_over':
self.screen.blit(self.game_over_text, (50, 00))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
elif self.state == 'won':
self.screen.blit(self.won_text, (50, 00))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
pygame.display.flip()
def process_inputs(self):
# proceed events
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.state = 'quit'
# handle MOUSEBUTTONUP
elif event.type == pygame.MOUSEBUTTONUP:
pos = pygame.mouse.get_pos()
# select command to edit by the programmer
if self.prg.screen_rect is not None:
if self.prg.screen_rect.collidepoint(pos):
print(f"clicked at pos = {pos}")
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
# set the selected command to a new value
if self.programmer.screen_rect is not None:
if self.programmer.screen_rect.collidepoint(pos):
self.programmer.update_selected_command(pos)
# clicked RUN/STOP button
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
print(f"clicked at pos = {pos}")
if self.state == 'running':
self.state = 'input'
elif self.state == 'input':
self.state = 'running'
elif self.state == 'stepping':
self.state = self.prg.step(self.state)
elif event.type == pygame.KEYUP:
if event.key == pygame.K_x:
if not self.beamer_mode:
# switch to beamer mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '1920, 280'
self.scale_fac = 180
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
self.board.tiles.shape[0] * self.scale_fac),
pygame.NOFRAME)
self.beamer_mode = True
else:
# switch to normal mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
self.scale_fac = 125
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.beamer_mode = False
elif event.key == pygame.K_r:
# run program
self.state = 'running'
elif event.key == pygame.K_s:
if self.state != 'stepping':
self.state = 'stepping'
else:
self.state = self.prg.step(self.state)
return self.state
def reset(self):
self.prg.prg_counter = 0
self.robot.x = self.initial_pos[0]
self.robot.y = self.initial_pos[1]
self.robot.orientation = self.initial_pos[2]
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.board.tiles = self.inital_board_tiles.copy()
return 'input'
def run(self):
running = True
while running:
self.state = self.process_inputs()
if self.state == 'input':
pass
elif self.state == 'running':
self.state = self.prg.step()
elif self.state == 'reset':
self.state = self.reset()
elif self.state == 'quit':
running = False
elif self.state == 'stepping':
pass
elif self.state == 'game_over' or self.state == 'won':
pass
else:
print("unknown state")
return
self.render()
pygame.time.wait(100)
if __name__ == "__main__":
random.seed(42)
game = Game(dimx=7, dimy=4, robotx=3, roboty=1)
game.run()
# TODOs
# - in stepping mode (s) it is possible to drive outside of the map
# - when no P0 command is present the program counter will overflow

View File

@ -3,38 +3,168 @@ import numpy as np
import cv2
import os
import time
import math
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph.parametertree import Parameter, ParameterTree, parameterTypes
import pyqtgraph as pg
from shapely.geometry import LineString
from shapely.affinity import scale
from queue import Queue
import aruco
class FPSCounter:
def __init__(self):
self.fps_start_time = time.time()
self.fps_counter = 0
self.fps_display_rate = 1.0
def get_fps(self):
self.fps_counter += 1
delta_t = time.time() - self.fps_start_time
if delta_t > self.fps_display_rate:
self.fps_counter = 0
self.fps_start_time = time.time()
return self.fps_counter / delta_t
## test add/remove
## this group includes a menu allowing the user to add new parameters into its child list
class RobotMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Robot']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_robots = self.children()
current_indices = [int(r.name().split(' ')[1]) for r in current_robots]
new_index = len(current_indices) + 1
val = 0
self.addChild(dict(name=f"Robot {new_index}", type='int', value=val, removable=False,
renamable=True))
class CornerMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Corner']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_corners = self.children()
current_chars = [str(r.name().split(' ')[1]) for r in current_corners]
new_char = chr(ord(current_chars[-1]) + 1)
val = 0
self.addChild(dict(name=f"Corner {new_char}", type='int', value=val, removable=False))
class ArucoEstimator:
corner_marker_ids = {
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=7, grid_rows=4):
self.app = QtGui.QApplication([])
## Create window with GraphicsView widget
self.win = QtGui.QWidget()
self.layout = QtGui.QGridLayout()
self.win.setLayout(self.layout)
self.win.keyPressEvent = self.handleKeyPressEvent
self.win.setWindowTitle('ArucoEstimator')
self.plotwidget = pg.PlotWidget()
self.layout.addWidget(self.plotwidget)
## lock the aspect ratio so pixels are always square
self.plotwidget.setAspectLocked(True)
self.plotwidget.getPlotItem().getAxis('left').hide()
self.plotwidget.getPlotItem().getAxis('bottom').hide()
## Create image item
self.img = pg.ImageItem(border='w')
self.img.setLevels([[0, 255], [0, 255], [0, 255]])
self.img.mouseClickEvent = self.handleMouseEvent
self.plotwidget.addItem(self.img)
# fps display
self.fps_counter = FPSCounter()
self.fps_overlay = pg.TextItem('fps = 0', color=(255, 255, 0), anchor=(0,1))
self.plotwidget.addItem(self.fps_overlay)
self.invert_grayscale = False
self.draw_grid = True
self.draw_markers = True
self.draw_marker_coordinate_system = False
self.corner_marker_size = 0.075
self.corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
corner_estimates = {
self.corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
self.grid_columns = grid_columns
self.grid_rows = grid_rows
self.robot_marker_size = 0.07
if robot_marker_ids is None:
robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
for marker_id in self.robot_marker_ids])
self.draw_grid = False
robot_marker_group = [{'name': f'Robot {ind}', 'type': 'int', 'value': marker_id} for ind, marker_id in
enumerate(self.robot_marker_ids, 1)]
corner_marker_group = [{'name': f'Corner {letter}', 'type': 'int', 'value': marker_id} for letter, marker_id in
self.corner_marker_ids.items()]
self.threshold = 10.0
# parameter
params_spec = [
{'name': 'Corner marker size', 'type': 'float', 'value': self.corner_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Robot marker size', 'type': 'float', 'value': self.robot_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Draw markers', 'type': 'bool', 'value': self.draw_markers},
{'name': 'Draw marker coordinate system', 'type': 'bool', 'value': self.draw_marker_coordinate_system},
{'name': 'Threshold', 'type': 'float', 'value': self.threshold},
{'name': 'Invert grayscale', 'type': 'bool', 'value': self.invert_grayscale, 'tip': "Invert grayscale image before marker detection"},
{'name': 'Show FPS', 'type': 'bool', 'value': True, 'tip': "Display frames per second counter"},
{'name': 'Draw grid', 'type': 'bool', 'value': self.draw_grid, 'tip': "Draw grid spanned by the markers 0 - 3"},
{'name': 'Grid columns', 'type': 'int', 'value': self.grid_columns, 'tip': "Number of columns for the grid"},
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'grayscale', 'tip': "Display mode for the video"},
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
{'name': 'Controlled robot', 'type': 'list', 'values': self.robot_marker_ids, 'tip': 'Robot to control'},
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
]
self.params = Parameter.create(name='params', type='group', children=params_spec)
self.params.param('Invert grayscale').sigValueChanged.connect(lambda _, v: self.__setattr__('invert_grayscale', v))
self.params.param('Threshold').sigValueChanged.connect(lambda _, v: self.__setattr__('threshold', v))
self.params.param('Draw markers').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_markers', v))
self.params.param('Draw marker coordinate system').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_marker_coordinate_system', v))
self.params.param('Draw grid').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_grid', v))
self.params.param('Grid columns').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_columns', v))
self.params.param('Grid rows').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_rows', v))
self.params.param('Corner marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('corner_marker_size', v))
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
self.params.param('Controlled robot').sigValueChanged.connect(lambda _, v: self.event_queue.put(('controlled_robot',
{'robot_id': v})))
self.paramtree = ParameterTree()
self.paramtree.setParameters(self.params, showTop=False)
self.layout.addWidget(self.paramtree)
self.win.show() ## show widget alone in its own window
self.event_queue = Queue()
@ -42,15 +172,14 @@ class ArucoEstimator:
# Configure depth and color streams
self.pipeline = rs.pipeline()
config = rs.config()
#config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
# config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
# Start streaming
self.pipeline.start(config)
# disable auto exposure
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, False)
# enable auto exposure
self.set_autoexposure(self.params['Autoexposure'])
camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics()
@ -67,38 +196,31 @@ class ArucoEstimator:
self.pipeline = None
# create detector and get parameters
self.detector = aruco.MarkerDetector()
# self.detector.setDictionary('ARUCO_MIP_36h12')
#self.detector.setDictionary('ARUCO_MIP_16h3')
# self.detector.setDictionary('ARUCO')
#self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
# print detector parameters
print("detector params:")
for val in dir(self.detector_params):
if not val.startswith("__"):
print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
# print("detector params:")
# print(self.detector_params)
# for val in dir(self.detector_params):
# if not val.startswith("__"):
# print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters()
if use_realsense:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml"))
else:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
print(self.camparam)
def mouse_callback(self, event, px, py, flags, param):
"""
This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
if event == cv2.EVENT_LBUTTONDOWN:
self.drag_line_end = None
self.drag_line_start = None
self.previous_click = None
def set_autoexposure(self, v):
if self.pipeline is not None:
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, v)
print(color_sensor.get_option(rs.option.enable_auto_exposure))
def compute_clicked_position(self, px, py):
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
@ -118,28 +240,51 @@ class ArucoEstimator:
target_x = x1 + alpha * (x3 - x1)
target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y, 0])
print(f"target = ({target_x},{target_y})")
self.event_queue.put(('click', target))
target = np.array([target_x, target_y])
else:
print("not all markers have been detected!")
target = np.array([px, -py])
return target
def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
"""
Run the marker tracking in a loop
"""
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
def handleMouseEvent(self, event):
# get click position as distance to top-left corner of the image
px = event.pos().x()
py = self.img.height() - event.pos().y()
print(f"px = {px}, py = {py}")
if event.button() == QtCore.Qt.MouseButton.LeftButton:
# self.drag_line_start = (px, py)
# elif event == cv2.EVENT_LBUTTONUP:
self.drag_line_end = (px, py)
self.drag_line_start = (px, py)
target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
if self.drag_line_start != self.drag_line_end:
# compute target angle for clicked position
facing_pos = self.compute_clicked_position(px, py)
v = facing_pos - target_pos
target_angle = math.atan2(v[1], v[0])
else:
# determine angle from previously clicked pos (= self.drag_line_end)
if self.previous_click is not None:
previous_pos = self.compute_clicked_position(self.previous_click[0], self.previous_click[1])
v = target_pos - previous_pos
target_angle = math.atan2(v[1], v[0])
else:
target_angle = 0.0
try:
running = True
while running:
target = np.array([target_pos[0], target_pos[1], target_angle])
print(target)
self.previous_click = (px, py)
self.event_queue.put(('click', {'x': target[0], 'y': target[1], 'angle': target[2]}))
self.drag_line_start = None
elif event == cv2.EVENT_MOUSEMOVE:
if self.drag_line_start is not None:
self.drag_line_end = (px, py)
def process_frame(self):
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else:
@ -148,29 +293,41 @@ class ArucoEstimator:
t_image = time.time()
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
if invert_grayscale:
if self.invert_grayscale:
cv2.bitwise_not(gray, gray)
detector = aruco.MarkerDetector()
detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.01)
#detector_params = detector.getParameters()
# run aruco marker detection
detected_markers = self.detector.detect(gray)
detected_markers = detector.detect(gray)
# detected_markers2 = detector.detect(gray)
#gray = detector.getThresholdedImage()
display_mode = self.params.param('Display mode').value()
#print(f"detected_markers = {[marker.id for marker in detected_markers]}")
#print("threshold = ", self.threshold)
# extract data for detected markers
detected_marker_data = {}
for marker in detected_markers:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id >= 0:
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.1, self.camparam)
marker.calculateExtrinsics(self.corner_marker_size, self.camparam)
else:
marker.calculateExtrinsics(0.07, self.camparam)
marker.calculateExtrinsics(self.robot_marker_size, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
if marker.id >= 0: # draw markers onto the image
if draw_markers:
if self.draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
if draw_marker_coordinate_system:
if self.draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# store data
@ -182,29 +339,32 @@ class ArucoEstimator:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
# Show images
cv2.imshow('RoboRally', color_image)
key = cv2.waitKey(1)
# draw drag
if self.drag_line_start is not None and self.drag_line_end is not None:
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
if key > 0:
# compute frame rate
self.fps_overlay.setText(f"fps = {self.fps_counter.get_fps():.1f}")
# Show images
color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # convert to RGB
if display_mode == 'color':
self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2)))
elif display_mode == 'grayscale':
self.img.setImage(np.transpose(np.flipud(gray)))
QtCore.QTimer.singleShot(1, self.process_frame)
def handleKeyPressEvent(self, ev):
key = ev.key()
self.event_queue.put(('key', key))
if key == ord('g'):
self.draw_grid = not self.draw_grid
if key == ord('q'):
running = False
if key == ord('a'):
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
color_sensor.set_option(rs.option.enable_auto_exposure, False)
print("auto exposure OFF")
else:
color_sensor.set_option(rs.option.enable_auto_exposure, True)
print("auto exposure ON")
finally:
cv2.destroyAllWindows()
if key == QtCore.Qt.Key_Q:
if self.pipeline is not None:
# Stop streaming
self.pipeline.stop()
self.app.quit()
elif key == QtCore.Qt.Key_I:
self.invert_grayscale = not self.invert_grayscale
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data
@ -244,7 +404,7 @@ class ArucoEstimator:
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
angle = -euler_angles[2][0] * np.pi / 180.0
self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}
self.robot_marker_estimates[marker_id] = {'t': float(t_image), 'x': float(x), 'y': float(y), 'angle': float(angle)}
def all_corners_detected(self):
# checks if all corner markers have been detected at least once
@ -254,7 +414,7 @@ class ArucoEstimator:
# checks if all robot markers have been detected at least once
return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()])
def get_pos_from_grid_point(self, x, y, orientation=None):
def get_pos_from_grid_point(self, x, y, dimx, dimy, orientation=None):
"""
returns the position for the given grid point based on the current corner estimates
:param x: x position on the grid ( 0 &le x &lt number of grid columns)
@ -263,17 +423,21 @@ class ArucoEstimator:
:return: numpy array with corresponding real world x- and y-position
if orientation was specified the array also contains the matching angle for the orientation
"""
assert 0 <= x < self.grid_columns
assert 0 <= y < self.grid_rows
assert self.all_corners_detected()
if not self.all_corners_detected():
#raise RuntimeError("not all corner markers have been detected yet")
a = np.array([0,1])
b = np.array([1,1])
c = np.array([1,0])
d = np.array([0,0])
else:
# compute column line
a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
x_frac = (x + 0.5) / self.grid_columns
y_frac = (y + 0.5) / self.grid_rows
x_frac = (x + 0.5) / dimx
y_frac = (y + 0.5) / dimy
vab = b - a
vdc = c - d
@ -285,8 +449,11 @@ class ArucoEstimator:
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
column_line_orig = LineString([column_line_top, column_line_bottom])
row_line_orig = LineString([row_line_top, row_line_bottom])
column_line = scale(column_line_orig, 10, 10)
row_line = scale(row_line_orig, 10, 10)
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y])
@ -328,7 +495,7 @@ class ArucoEstimator:
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2)
frame = cv2.line(frame, tuple(corner_1_center.astype(int)), tuple(corner_2_center.astype(int)), color=(0, 0, 255), thickness=2)
return frame
def draw_grid_lines(self, frame, detected_marker_data):
@ -351,7 +518,7 @@ class ArucoEstimator:
for x in range(1, self.grid_columns):
column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
frame = cv2.line(frame, tuple(column_line_top.astype(int)), tuple(column_line_bottom.astype(int)), color=(0, 255, 0),
thickness=1)
# draw horizontal lines
@ -360,7 +527,7 @@ class ArucoEstimator:
for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
frame = cv2.line(frame, tuple(row_line_top.astype(int)), tuple(row_line_bottom.astype(int)), color=(0, 255, 0),
thickness=1)
return frame
@ -397,4 +564,9 @@ class ArucoEstimator:
if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator.run_tracking(draw_markers=True, invert_grayscale=True)
estimator.process_frame()
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()

View File

@ -143,7 +143,7 @@ class OpenLoopSolver:
def solve(self, x0, target):
target = np.asarray(target)
angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data
x0[2] = angles_unwrapped[0]
target[2] = angles_unwrapped[1]

View File

@ -0,0 +1,123 @@
import numpy as np
import time
from robot import ControlledRobot
from pid_controller import PIDController
from mpc_controller import MPCController
from event_listener import EventListener
class ControlCommander:
valid_controller_types = {'pid': PIDController,
'mpc': MPCController}
def __init__(self, id_ip_dict, controller_type='pid'):
"""
:param id_ip_dict: dictionary containing robot marker id and ip of the robot, e.g. { 12: '10.10.11.91' }
:param controller_type: string 'pid', 'mpc'; or dictionary with robot id and controller type string, e.g. { 12: 'mpc', 13: 'pid'}
"""
self.robots = {}
for id, ip in id_ip_dict.items():
self.robots[id] = ControlledRobot(id, ip)
for id, r in self.robots.items():
r.connect()
if type(controller_type) == dict:
for id, ctype in controller_type.items():
if ctype in ControlCommander.valid_controller_types:
self.robots[id].attach_controller(ControlCommander.valid_controller_types[ctype]())
else:
raise Exception(f"invalid controller type {ctype} specified for robot {id}. "
f"valid controller types are {list(ControlCommander.valid_controller_types.keys())}")
elif controller_type in ControlCommander.valid_controller_types:
for id, r in self.robots.items():
r.attach_controller(ControlCommander.valid_controller_types[controller_type]())
else:
raise Exception(f"invalid controller type {controller_type} specified. valid controller types are "
f"{list(ControlCommander.valid_controller_types.keys())}")
self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
self.current_robot_index = 0
self.controlling = False
self.running = False
def run(self):
unconnected_robots = list(filter(lambda r: not r.connected, self.robots.values()))
if len(unconnected_robots) > 0:
print(f"warning: could not connect to the following robots: {unconnected_robots}")
return
all_detected = False
while not all_detected:
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots.values()))
all_detected = len(undetected_robots) == 0
if not all_detected:
print(f"warning: no measurements available for the following robots: {undetected_robots}")
time.sleep(0.5)
print("all robots detected -> starting control")
self.running = True
while self.running:
while not self.event_listener.event_queue.empty():
event = self.event_listener.event_queue.get()
self.handle_event(event)
def handle_event(self, event):
# handle events from opencv window
print("event: ", event)
if event[0] == 'click': # non-blocking move to target pos
target = event[1]
target_pos = np.array([target['x'], target['y'], target['angle']])
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
self.robots[controlled_robot_id].move_to_pos(target_pos)
elif event[0] == 'move_blocking': # blocking move to specified target position
target = event[1]
target_pos = np.array([target['x'], target['y'], target['angle']])
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
# initialize move and wait for the robot to reach the target position
self.robots[controlled_robot_id].move_to_pos(target_pos, blocking=True)
#time.sleep(0.5)
# send confirmation to the server which initiated the command
self.event_listener.send_reply("ack\n".encode())
elif event[0] == 'key':
key = event[1]
if key == 16777235: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
for _, r in self.robots.items():
r.stop_control()
else:
print("enable control")
for _, r in self.robots.items():
r.start_control()
elif key == 16777236: # left
# switch controlled robot
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
robot = self.robots[controlled_robot_id]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
for r in self.robots:
r.stop_control()
self.running = False
if __name__ == '__main__':
id_ip_dict = {
#11: '10.10.11.88',
#12: '192.168.1.12',
13: '192.168.1.13',
#14: '10.10.11.89',
}
# controller_type = {12: 'mpc', 13: 'pid'}
controller_type = 'mpc'
rc = ControlCommander(id_ip_dict, controller_type=controller_type)
rc.run()

View File

@ -0,0 +1,61 @@
import threading
import time
class ControllerBase:
def __init__(self, control_rate=0.1):
self.robot = None
self.controlling = False
self.target_pos = None
self.control_thread = None
self.control_rate = control_rate
def control_loop(self):
while self.controlling:
if self.target_pos is not None:
state = self.get_measured_position()
control = self.compute_control(state)
if self.controlling:
self.apply_control(control)
if self.robot is not None:
self.robot.send_cmd(u1=0, u2=0) # stop robot
def set_target_position(self, target_pos):
self.target_pos = target_pos
def get_measured_position(self):
if self.robot is not None:
return self.robot.get_measurement()
else:
raise Exception("error: controller cannot get measurement!\n"
" there is no robot attached to the controller!")
def compute_control(self, state):
return 0.0, 0.0
def apply_control(self, control):
if self.robot is not None:
self.robot.send_cmd(u1=control[0], u2=control[1])
else:
raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!")
time.sleep(self.control_rate)
def attach_robot(self, robot):
self.robot = robot
def start(self):
self.controlling = True
# start control thread
self.control_thread = threading.Thread(target=self.control_loop)
self.control_thread.start()
def stop(self):
# pause controller
self.controlling = False
if self.control_thread is not None:
self.control_thread.join()

View File

@ -0,0 +1,52 @@
import socket
import threading
import queue
import json
class EventListener:
def __init__(self, event_server):
self.event_thread = threading.Thread(target=self.receive_events)
self.event_thread.daemon = True # mark thread as daemon -> it terminates automatically when program shuts down
self.event_queue = queue.Queue()
self.receiving = False
# connect to event server
print(f"connecting to event server on {event_server} ...")
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
try:
self.event_socket.connect(event_server)
self.event_socket.sendall(f"events\n".encode())
self.event_socket.settimeout(0.1)
# check if we receive data from the event server
response = self.event_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for events")
self.event_socket.settimeout(None)
# if so we start the event thread
self.event_thread.start()
else:
print(f"error: cannot communicate with the event server.\n The response was: {response}")
except socket.timeout:
print(f"error: the event server did not respond.")
except ConnectionRefusedError:
print(f"error: could not connect to event server at {event_server}.")
def send_reply(self, data):
self.event_socket.sendall(data)
def receive_events(self):
self.receiving = True
while self.receiving:
received = str(self.event_socket.recv(1024), "utf-8")
if len(received) > 0:
events = received.split('\n')
for event_json in events:
if len(event_json) > 0:
event = json.loads(event_json)
self.event_queue.put(event)
else:
self.receiving = False
print("event server seems to have shut down -> stop listening")

View File

@ -0,0 +1,62 @@
import socket
import pygame
from argparse import ArgumentParser
class ManualController:
def __init__(self, estimator):
self.estimator = estimator
self.estimator.external_keyboard_callback = self.handle_keypress
self.controlling = False
def handle_keypress(self, key):
pass
parser = ArgumentParser()
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
ip = args.ip
pygame.init()
pygame.display.set_mode((640, 480))
rc_socket = socket.socket()
try:
rc_socket.connect((ip, 1234)) # connect to robot
except socket.error:
print("could not connect to socket")
running = True
while running:
u1 = 0
u2 = 0
vmax = 1.0
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
u1 = -vmax
u2 = vmax
print(f"turn left: ({u1},{u2})")
elif event.key == pygame.K_RIGHT:
u1 = vmax
u2 = -vmax
print(f"turn right: ({u1},{u2})")
elif event.key == pygame.K_UP:
u1 = vmax
u2 = vmax
print(f"forward: ({u1},{u2})")
elif event.key == pygame.K_DOWN:
u1 = -vmax
u2 = -vmax
print(f"backward: ({u1},{u2})")
elif event.key == pygame.K_ESCAPE:
print("quit")
running = False
u1 = 0.0
u2 = 0.0
rc_socket.send(f'({u1},{u2})\n'.encode())
elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2))
rc_socket.send(f'({u1},{u2})\n'.encode())

View File

@ -2,12 +2,14 @@ import socket
HOST, PORT = "localhost", 42424
robot_id = 15
robot_id = 12
# SOCK_DGRAM is the socket type to use for UDP sockets
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT))
while True:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
#sock.sendall(f"events\n".encode()) # request events
receiving = True
while receiving:
received = str(sock.recv(1024), "utf-8")
print("Received: {}".format(received))
receiving = len(received) > 0

View File

@ -1,46 +1,151 @@
import socketserver
import threading
import time
import json
from queue import Queue
from aruco_estimator import ArucoEstimator
class MeasurementHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
data = self.request[0]
socket = self.request[1]
data = self.request.recv(1024).strip()
cur_thread = threading.current_thread()
print(f"current thread {cur_thread}")
print(f"start current thread {cur_thread}")
if 'events' in data.decode():
print(f"{cur_thread} subscribed to events")
self.request.sendall('subscribed to events\n'.encode())
# send any events in the event queue to the subscriber
event_loop_running = True
while event_loop_running:
try:
while not self.server.estimator.event_queue.empty():
event = self.server.estimator.event_queue.get()
# we distinguish two kinds of events:
if event[0] == 'response_event':
# 1. for 'response_event' events we expect the event subscriber to give a reply which will then
# by passed on to the response queue for transmitting to the original correspondent
message = event[1]['event']
#print(f"passing command {message} on to subscriber")
self.request.sendall((json.dumps(message) + '\n').encode())
reply = self.request.recv(1024)
#print(f"putting reply {reply} in response queue")
self.server.response_queue.put(reply)
else:
# 2. for other types of events we don't expect a reply and just pass them on to the subscriber
self.request.sendall((json.dumps(event) + '\n').encode())
self.server.estimator.last_event = None
time.sleep(1.0 / self.server.max_measurements_per_second)
except Exception as e:
print(f"exception in event loop: {e}")
event_loop_running = False
print("after event loop")
elif 'corners' in data.decode():
print(f"{cur_thread} subscribed to corners")
# send positions of the board markers
corner_estimates = self.server.estimator.corner_estimates
response = {}
for corner, data in corner_estimates.items():
response[corner] = {'x': data['x'], 'y': data['y']}
self.request.sendall((json.dumps(response) + '\n').encode())
elif 'move_grid_blocking' in data.decode():
print(f"{cur_thread} subscribed move_grid_blocking")
# if we receive a move_grid event
# ( e.g. move_grid_blocking;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^"} )
#move_grid_blocking;{"x": 1, "y": 2, "dimx": 7, "dimy": 4, "orientation": ">"}
# we compute the corresponding real-world position the robot should drive to
# and then create a new move event which is put in the event queue and will be propagated to the ControlCommander
data_decode = data.decode()
#print("data: ", data_decode)
payload = data_decode.split(';')[1]
#print("payload: ", payload)
grid_pos = json.loads(payload)
#print("grid_pos = ", grid_pos)
pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'],
grid_pos['dimy'], grid_pos['orientation'])
#print("pos = ", pos)
#print("event requiring response")
# put blocking move command in event queue
self.server.estimator.event_queue.put(('response_event',
{'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})}))
# wait for response of the move command
# TODO this assumes that we wait only for at most one response at a time
# we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for)
while self.server.response_queue.empty():
time.sleep(0.2)
self.request.sendall(b'.\n')
pass
while not self.server.response_queue.empty():
reply = self.server.response_queue.get()
# send back response to the original source
print(f"sending reply {reply} back to correspondent {self.request}")
self.request.sendall(reply)
else:
print(f"{cur_thread} subscribed to robot position")
# send robot position
try:
marker_id = int(data)
if marker_id in self.server.estimator.robot_marker_estimates:
while True:
socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(),
self.client_address)
time.sleep(1.0 / self.server.max_measurements_per_second)
else:
socket.sendto("error: unknown robot marker id\n".encode(),
self.client_address)
except ValueError:
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
marker_id = None
if marker_id is not None:
if marker_id in self.server.estimator.robot_marker_estimates:
marker_loop_running = True
while marker_loop_running:
try:
self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id])
+ '\n').encode())
time.sleep(1.0 / self.server.max_measurements_per_second)
except Exception as e:
print(f"exception in marker loop: {e}")
marker_loop_running = False
else:
self.request.sendall("error: unknown robot marker id\n".encode())
else:
self.request.sendall("error: data not understood. "
"expected <robot marker id (int)> or 'events'\n".encode())
print(f"end current thread {cur_thread}")
return
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
super().__init__(server_address, RequestHandlerClass)
self.estimator = estimator
self.max_measurements_per_second = max_measurements_per_second
self.response_queue = Queue()
def handle_error(self, request, client_address):
print("an error occurred -> terminating connection")
if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[15])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking)
estimator_thread.start()
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13])
with MeasurementServer(('127.0.0.1', 42424), MeasurementHandler, aruco_estimator,
max_measurements_per_second=30) as measurement_server:
measurement_server.serve_forever()
# first we start thread for the measurement server
measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
max_measurements_per_second=30)
server_thread = threading.Thread(target=measurement_server.serve_forever)
server_thread.start()
# receive with: nc 127.0.0.1 42424 -u -> 15 + Enter
# now we start the Aruco estimator GUI
aruco_estimator.process_frame()
import sys
from pyqtgraph.Qt import QtCore, QtGui
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()
#with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
# max_measurements_per_second=30) as measurement_server:
# measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -> 12 + Enter

View File

@ -0,0 +1,117 @@
from flask import Flask, render_template, request
import socket
import threading
import time
import numpy as np
from robot import Robot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
from shapely.geometry import LineString
width = 600
height = 400
robot_ids = [11, 12, 13, 14]
robot_ids_control_history = [11, 12, 13, 14]
app = Flask(__name__)
def calc_target_pos(px, py):
if rc.estimator.all_corners_detected():
# compute column line
a = np.array([rc.estimator.corner_estimates['a']['x'], rc.estimator.corner_estimates['a']['y']])
b = np.array([rc.estimator.corner_estimates['b']['x'], rc.estimator.corner_estimates['b']['y']])
c = np.array([rc.estimator.corner_estimates['c']['x'], rc.estimator.corner_estimates['c']['y']])
d = np.array([rc.estimator.corner_estimates['d']['x'], rc.estimator.corner_estimates['d']['y']])
x_frac = (px + 0.5) / width
y_frac = (py + 0.5) / height
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y, 0.0])
return point_of_intersection
@app.route('/', methods=('GET', 'POST'))
def presence():
global robot_ids_control_history
if request.method == 'GET':
return render_template('telepresence.html', oldest_control=robot_ids.index(robot_ids_control_history[0]))
elif request.method == 'POST':
x = int(float(request.form.get('x')))
y = int(float(request.form.get('y')))
print(x, y)
margin = 0.1
x = min(max(width * margin, x), width * (1 - margin))
y = min(max(height * margin, y), height * (1 - margin))
id = int(request.form.get('robot'))
robot_ids_control_history.append(robot_ids_control_history.pop(robot_ids_control_history.index(id)))
target_pos = calc_target_pos(x, y)
rc.estimator.event_queue.put(('robot', id, target_pos))
return 'OK'
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
self.robots = [Robot(11, '10.10.11.88'), Robot(12, '10.10.11.91'),
Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking, kwargs={'draw_markers':False})
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self):
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
print("starting control")
self.controller.interactive_control(robots=self.robots)
if __name__ == "__main__":
rc = RemoteController()
rc_thread = threading.Thread(target=rc.run)
rc_thread.start()
app.run('0.0.0.0')

View File

@ -1,175 +1,62 @@
import numpy as np
import time
from controller import ControllerBase
from casadi_opt import OpenLoopSolver
class MPCController:
def __init__(self, estimator):
self.t = time.time()
class MPCController(ControllerBase):
def __init__(self, N=10, T=1.0):
super().__init__()
self.t = None
self.estimator = estimator
self.controlling = False
self.ols = OpenLoopSolver(N=N, T=T)
self.ols.setup()
self.control_rate = self.ols.T / self.ols.N
self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
# integrator
self.omega_max = 5.0
self.control_scaling = 0.4
self.control_scaling = 0.5
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
def set_target_position(self, target_pos):
super(MPCController, self).set_target_position(target_pos)
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id)
def compute_control(self, state):
x_pred = np.array(state[1:])
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_pos = np.linalg.norm(x_pred[0:2] - self.target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], self.target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
if error_pos > 0.05 or error_ang > 0.2:
#if error_pos > 0.05:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
res = self.ols.solve(x_pred, self.target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
if dt_mpc < self.control_rate: # wait until next control can be applied
time.sleep(self.control_rate - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
return us1, us2
def apply_control(self, control):
if self.robot is not None:
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
u1 = control[0][i]
u2 = control[1][i]
self.robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
time.sleep(self.control_rate)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
target_pos = np.array([0.0, 0.0, 0.0])
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
else:
print("enable control")
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
# print(np.linalg.norm(x_pred-target_pos))
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
time.sleep(self.dt - dt_mpc)
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)
raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!")

View File

@ -2,48 +2,98 @@ import sys
import socket
import threading
import time
import json
from robot import Robot
from robot import Robot, ControlledRobot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
#self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
#self.robots = [Robot(14, '10.10.11.89')]
self.robots = [ControlledRobot(13, '192.168.1.13'), ControlledRobot(12, '192.168.1.12')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.controlled_robot = self.robots[0].id
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
# thread for handling events received by measurement server (from GUI)
self.event_thread = threading.Thread(target=self.handle_events)
self.event_thread.daemon = True
# connect to socket for events from GUI
try:
HOST, PORT = "localhost", 42424
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.event_socket.connect((HOST, PORT))
self.event_socket.sendall("events\n".encode())
self.event_socket.settimeout(0.1)
# check if we receive data from the measurement server
response = self.event_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for measurements")
self.event_socket.settimeout(None)
# if so we start the measurement thread
self.event_thread.start()
else:
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
except socket.timeout:
print(f"error: the measurement server did not respond with data.")
#sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
# sock.sendall(f"events\n".encode()) # request events
# receiving = True
# while receiving:
# received = str(sock.recv(1024), "utf-8")
# print("Received: {}".format(received))
# receiving = len(received) > 0
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
self.estimator_thread.start()
self.controllers = []
for r in self.robots:
c = MPCController()
self.controllers.append(c)
r.attach_controller(c)
self.controller = MPCController(self.estimator)
def handle_events(self):
receiving = True
while receiving:
received = str(self.event_socket.recv(1024), "utf-8")
if len(received) > 0:
last_received = received.split('\n')[-2]
event_type, payload = json.loads(last_received)
print(event_type, payload)
if event_type == 'click':
target_pos = (payload['x'], payload['y'], payload['angle'])
self.robot_ids[self.controlled_robot].move_to_pos(target_pos=target_pos)
elif event_type == 'controlled_robot':
self.controlled_robot = payload['robot_id']
else:
receiving = False
print(f"measurement server stopped sending event data")
def run(self):
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
#print("waiting until all markers are detected...")
#while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
# pass
print("starting control")
self.controller.interactive_control(robots=self.robots)
#self.controller.interactive_control(robots=self.robots)
for c in self.controllers:
c.start()
pass
def main(args):
@ -53,3 +103,12 @@ def main(args):
if __name__ == '__main__':
main(sys.argv)
# to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams
# marker positions and GUI events via socket
# next, launch this program (mpc_test.py)
# you can now direct the robots via the GUI (assuming four corner markers for position reference have been detected)
# click any position on the screen and the current robot will drive there
# you can also switch which robot should be controlled in the GUI
# TODO document and improve program structure

View File

@ -2,15 +2,15 @@ import numpy as np
import math
import time
from controller import ControllerBase
class PIDController:
def __init__(self, estimator):
self.t = time.time()
self.estimator = estimator
self.controlling = False
class PIDController(ControllerBase):
def __init__(self):
super().__init__()
self.t = None
self.P_angle = 0.4 # PID parameters determined by Ziegler-Nichols. measured K_crit = 1.4, T_crit = 1.5
self.P_angle = 0.4
self.I_angle = 0.35
self.D_angle = 0.1
@ -18,211 +18,66 @@ class PIDController:
self.I_pos = 0.3
self.D_pos = 0.1
self.mode = None
def move_to_pos(self, target_pos, robot, near_target_counter=5):
near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
if key == 84: # arrow up
self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
x_pred = self.get_measurement(robot.id)
if x_pred is not None:
error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
# if error_pos > 0.075 or error_ang > 0.35:
if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
# us1 = res[0]
# us2 = res[1]
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
# print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
ts = []
angles = []
target_pos = np.array([0.0, 0.0, 0.0])
e_angle_old = 0.0
e_pos_old = 0.0
i = 0.0
i_angle = 0.0
i_pos = 0.0
t0 = time.time()
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
i_angle = 0
i_pos = 0
e_pos_old = 0
e_angle_old = 0
self.mode = 'combined'
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
self.mode = None # reset control mode
else:
print("enable control")
i = 0
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 97: # a
self.mode = 'angle'
e_angle_old = 0
i = 0
self.t = time.time()
elif key == 99: # c
self.e_angle_old = 0.0
self.e_pos_old = 0.0
self.i = 0.0
self.i_angle = 0.0
self.i_pos = 0.0
def set_target_position(self, target_pos):
super(PIDController, self).set_target_position(target_pos)
self.mode = 'combined'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
self.t = time.time()
elif key == 112: # p
self.mode = 'position'
e_pos_old = 0
i = 0
self.t = time.time()
elif key == 43: # +
self.P_pos += 0.1
print("P = ", self.P_angle)
elif key == 45: # -
self.P_pos -= 0.1
print("P = ", self.P_angle)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
self.e_angle_old = 0.0
self.e_pos_old = 0.0
if self.controlling:
self.i = 0.0
self.i_angle = 0.0
self.i_pos = 0.0
def compute_control(self, state):
# measure state
x_pred = self.get_measurement(robot.id)
dt = self.t - time.time()
x_pred = state[1:]
if self.t is None:
dt = 0.1
else:
dt = time.time() - self.t # time since last control was applied
#print(f"x_pred = {x_pred}\ntarget = {target_pos}\nerror = {np.linalg.norm(target_pos - x_pred)}\n")
if self.mode == 'angle':
# compute angle such that robot faces to target point
target_angle = target_pos[2]
ts.append(time.time() - t0)
angles.append(x_pred[2])
target_angle = self.target_pos[2]
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
p = self.P_angle * e_angle
# i += self.I * dt * e # right Riemann sum
i += self.I_angle * dt * (e_angle + e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - e_angle_old)/dt
self.i += self.I_angle * dt * (e_angle + self.e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - self.e_angle_old)/dt
u1 = p - i - d
u1 = p + self.i + d
u2 = - u1
e_angle_old = e_angle
self.e_angle_old = e_angle
elif self.mode == 'combined':
# compute angle such that robot faces to target point
v = target_pos[0:2] - x_pred[0:2]
v = self.target_pos[0:2] - x_pred[0:2]
target_angle = math.atan2(v[1], v[0])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v)
e_pos = np.linalg.norm(v[0:2])
if e_pos < 0.05:
if e_pos < 0.03:
self.mode = 'angle'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
self.e_angle_old = 0
self.e_pos_old = 0
self.i_angle = 0
self.i_pos = 0
u1 = 0
u2 = 0
else:
@ -235,33 +90,31 @@ class PIDController:
e_angle += np.pi
p_angle = self.P_angle * e_angle
i_angle += self.I_angle * dt * (e_angle + e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - e_angle_old) / dt
self.i_angle += self.I_angle * dt * (e_angle + self.e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - self.e_angle_old) / dt
p_pos = self.P_pos * e_pos
i_pos += self.I_pos * dt * (e_pos + e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - e_pos_old) / dt
self.i_pos += self.I_pos * dt * (e_pos + self.e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - self.e_pos_old) / dt
if forward:
print("forward")
u1 = p_angle + p_pos - i_angle - i_pos - d_angle - d_pos
u2 = - p_angle + p_pos + i_angle - i_pos + d_angle - d_pos
u1 = p_angle + p_pos + self.i_angle + self.i_pos + d_angle + d_pos
u2 = - p_angle + p_pos - self.i_angle + self.i_pos - d_angle + d_pos
else:
print("backward")
u1 = p_angle - p_pos - i_angle + i_pos - d_angle + d_pos
u2 = - p_angle - p_pos + i_angle + i_pos + d_angle + d_pos
e_pos_old = e_pos
e_angle_old = e_angle
u1 = p_angle - p_pos + self.i_angle - self.i_pos + d_angle - d_pos
u2 = - p_angle - p_pos - self.i_angle - self.i_pos - d_angle - d_pos
self.e_pos_old = e_pos
self.e_angle_old = e_angle
else:
u1 = 0.0
u2 = 0.0
#print(f"u = ({u1}, {u2})")
robot.send_cmd(u1, u2)
return u1, u2
def stop(self):
super(PIDController, self).stop()
self.t = None
def apply_control(self, control):
self.t = time.time() # save time when the most recent control was applied
time.sleep(0.1)
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)
super(PIDController, self).apply_control(control)

View File

@ -1,12 +1,21 @@
import socket
import threading
import json
import numpy as np
import time
import math
class Robot:
def __init__(self, marker_id, ip):
def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)):
self.id = marker_id
self.pos = None
self.euler = None
self.t_last_measurement = None
self.x = None
self.y = None
self.angle = None
self.ip = ip
self.port = 1234
self.socket = socket.socket()
# currently active control
@ -15,18 +24,47 @@ class Robot:
self.connected = False
self.measurement_server = measurement_server
self.measurement_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
self.measurement_thread = threading.Thread(target=self.receive_measurements)
# mark thread as daemon -> it terminates automatically when program shuts down
self.measurement_thread.daemon = True
self.receiving = False
def connect(self):
# connect to robot
try:
print("connecting to robot {} with ip {} ...".format(self.id, self.ip))
self.socket.connect((self.ip, 1234)) # connect to robot
print(f"connecting to robot {self.ip} at {self.ip}:{self.port} ...")
self.socket.connect((self.ip, self.port)) # connect to robot
print("connected!")
self.connected = True
except socket.error:
print("could not connect to robot {} with ip {}".format(self.id, self.ip))
print(f"error: could not connect to robot {self.id} at {self.ip}:{self.port}")
# connect to measurement server
print(f"connecting to measurement server on {self.measurement_server} ...")
try:
self.measurement_socket.connect(self.measurement_server)
self.measurement_socket.sendall(f"{self.id}\n".encode())
self.measurement_socket.settimeout(0.1)
# check if we receive data from the measurement server
response = self.measurement_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for measurements")
self.measurement_socket.settimeout(None)
# if so we start the measurement thread
self.measurement_thread.start()
else:
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
except socket.timeout:
print(f"error: the measurement server did not respond with data.")
except ConnectionRefusedError:
print(f"error: could not connect to measurement server at {self.measurement_server}.")
def send_cmd(self, u1=0.0, u2=0.0):
if self.socket:
if self.socket and self.connected:
try:
self.socket.send(f'({u1},{u2})\n'.encode())
except BrokenPipeError:
@ -35,3 +73,81 @@ class Robot:
except ConnectionResetError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass
else:
print(f"error: robot {self.id} is not connected to {self.ip}")
def receive_measurements(self):
self.receiving = True
while self.receiving:
received = str(self.measurement_socket.recv(1024), "utf-8")
if len(received) > 0:
last_received = received.split('\n')[-2]
measurement = json.loads(last_received)
self.t_last_measurement = measurement['t']
self.x = measurement['x']
self.y = measurement['y']
self.angle = measurement['angle']
else:
self.receiving = False
print(f"measurement server stopped sending data for robot {self.id}")
def get_measurement(self):
return self.t_last_measurement, self.x, self.y, self.angle
def __str__(self):
connection_state = '' if self.connected else ' not'
state = self.get_measurement()
last_measurement = f'last measurement = {state}' if None not in state else 'no measurements available'
return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected ) " + last_measurement
def __repr__(self):
return str(self)
class ControlledRobot(Robot):
def __init__(self, marker_id, ip):
super().__init__(marker_id, ip)
self.controller = None
def start_control(self):
if self.controller is not None:
print("starting control")
self.controller.start()
else:
raise Exception("Error: Cannot start control: there is not controller attached to the robot!")
def stop_control(self):
if self.controller is not None:
print("stopping control")
self.controller.stop()
else:
raise Exception("Error: Cannot stop control: there is not controller attached to the robot!")
def attach_controller(self, controller):
self.controller = controller
self.controller.attach_robot(self)
def move_to_pos(self, target_pos, blocking=False):
if self.controller is not None:
if not blocking:
self.controller.set_target_position(target_pos)
else: # only return after the robot has reached the target
self.stop_control()
self.controller.set_target_position(target_pos)
self.start_control()
close_to_target = False
while not close_to_target:
current_pos = np.array([self.x, self.y, self.angle])
v = target_pos - current_pos
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v[0:2])
#print(f"target_pos = {target_pos}, current_pos = {current_pos}, e_pos = {e_pos}, e_ang = {e_angle}")
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
close_to_target = e_pos < 0.075 and abs(e_angle) < 0.1
time.sleep(0.1)
print("target reached!")
self.stop_control()
else:
raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")

View File

@ -0,0 +1,85 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Mensa-Abend Telepräsenz</title>
<style>
div.meerschweinchen {
max-width: 600px;
margin: auto;
text-align: center;
}
div.arena {
height: 400px;
width: 600px;
background-image: url("https://p.0au.de/3d62ea7/foo.png");
border: none;
text-align: center;
position: relative
}
div.arena p {
margin: 0;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%)
}
</style>
<script type=text/javascript src="{{url_for('static', filename='jquery-3.5.1.min.js') }}"></script>
</head>
<body>
<h1 style="text-align: center">Welcome to Mensa-Abend digital</h1>
<div style="text-align: center">
<p>Click anywhere to control the robots in our hackerspace. You can see them in the Zoom video.<br>
<br>
Please let us know about your study background.
</p>
</div>
<div class="meerschweinchen">
<div class=arena style="font-size: large;">
</div>
<form method="post">
<p>Choose your robot:</p>
<div>
<input type="radio" id="r11" name="robot_id" value="11">
<label for="r11">Robot 11</label>
</div>
<div>
<input type="radio" id="r12" name="robot_id" value="12">
<label for="r12">Robot 12</label>
</div>
<div>
<input type="radio" id="r13" name="robot_id" value="13">
<label for="r13">Robot 13</label>
</div>
<div>
<input type="radio" id="r14" name="robot_id" value="14">
<label for="r14">Robot 14</label>
</div>
</form>
</div>
</body>
<script>
function fireThis(e) {
var parentOffset = $(this).parent().offset();
var x = e.pageX - parentOffset.left;
var y = e.pageY - parentOffset.top;
var robot = document.querySelector('input[name = "robot_id"]:checked').value;
$.post( "/", { 'x': x, 'y': y , 'robot': robot} );
}
$('.arena').on('mousedown', fireThis);
function callRandom() {
var array = document.getElementsByName('robot_id');
array[{{ oldest_control }}].checked = true;
}
callRandom();
</script>
</html>