Compare commits

..

2 Commits

28 changed files with 813 additions and 2836 deletions

View File

@ -1,27 +0,0 @@
import socket
import json
HOST, PORT = "localhost", 42424
def move_grid(x, y, orientation, dimx, dimy):
print("move grid")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
payload = json.dumps({"x": x, "y": y, "dimx": dimx, "dimy": dimy, "orientation": orientation})
sock.sendall(f"move_grid_blocking;{payload}\n".encode())
target_reached = False
try:
while not target_reached:
reply = sock.recv(1024)
if reply == b'':
raise RuntimeError
#print("got reply: ", reply)
target_reached = reply.decode() == 'ack\n'
print("target_reached = ", target_reached)
except RuntimeError:
# try again
print("controlling failed -> trying again")
move_grid(x, y, orientation, dimx, dimy)
if __name__ == "__main__":
move_grid(3,2,'v', 7, 4)

View File

@ -1,542 +0,0 @@
import numpy as np
import pygame
import os
import threading
from webapp import app
from event_server_comm import move_grid
BLACK = np.array([0, 0, 0], dtype=np.uint8)
WHITE = np.array([255, 255, 255], dtype=np.uint8)
GRAY = np.array([200, 200, 200], dtype=np.uint8)
RED = np.array([255, 0, 0], dtype=np.uint8)
BLUE = np.array([0, 0, 255], dtype=np.uint8)
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
GREEN = np.array([0, 255, 0], dtype=np.uint8)
pygame.init()
pygame.font.init() # you have to call this at the start,
# if you want to use this module.
myfont = pygame.font.SysFont('Comic Sans MS', 55)
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
P0_text = myfont.render('P0', False, tuple(BLACK))
tiledt = np.dtype([('color', np.uint8, 3), ('star', np.bool)])
class Board:
valid_colors = [GRAY, RED, BLUE]
def __init__(self, dim_x, dim_y, n_coins=2, file=None):
if file is None:
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
for x in range(dim_x):
for y in range(dim_y):
self.tiles[y, x]['color'] = Board.valid_colors[np.random.randint(len(Board.valid_colors))]
coins_distributed = n_coins == 0
while not coins_distributed:
coinx = np.random.randint(0, dim_x)
coiny = np.random.randint(0, dim_y)
self.tiles[coiny,coinx]['star'] = True
coins_distributed = sum([t['star'] for t in self.tiles.flatten()]) == n_coins
else:
self.tiles = np.load(file)
def render(self, scale_fac):
dimy, dimx = self.tiles.shape
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
for y in range(self.tiles.shape[0]):
for x in range(self.tiles.shape[1]):
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
pygame.draw.rect(board_surf, (0, 0, 0),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
if self.tiles[y, x]['star']:
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
return board_surf
def get_xdims(self):
return self.tiles.shape[1]
def get_ydims(self):
return self.tiles.shape[0]
# def __repr__(self):
# s = ''
# for y in range(self.tiles.shape[0]):
# for x in range(self.tiles.shape[1]):
# if (x,y) == (self.robot.x, self.robot.y):
# s += self.robot.orientation
# else:
# s += '.'
# s += '\n'
# return s
class Robot:
orientations = ['^', 'left', 'down', 'right']
resulting_orientation = {
'^': {'left': '<', 'right': '>'},
'>': {'left': '^', 'right': 'v'},
'v': {'left': '>', 'right': '<'},
'<': {'left': 'v', 'right': '^'},
}
def __init__(self, x, y, orientation, use_real_robot=False):
self.x = x
self.y = y
self.orientation = orientation
self.position_changed = False
self.use_real_robot = use_real_robot
def get_forward_coordinates(self):
# get the coordinates of the neighboring tile in the given direction
if self.orientation == '^':
return self.y - 1, self.x
elif self.orientation == '>':
return self.y, self.x + 1
elif self.orientation == 'v':
return self.y + 1, self.x
elif self.orientation == '<':
return self.y, self.x - 1
else:
raise Exception("error: undefined direction")
def get_angle(self):
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
return np.rad2deg(angle)
def render(self, scale_fac):
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
(0.25 * scale_fac, 0.25 * scale_fac),
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
return robot_surf
def update_pos(self, dimx, dimy):
if self.use_real_robot:
move_grid(self.x, self.y, self.orientation, dimx, dimy)
self.position_changed = False
def __repr__(self):
return f"({self.y}, {self.x}) - {self.orientation}"
class Command:
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
def __init__(self, action=None, color=GRAY):
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
raise ValueError("invalid values for command")
self.action = action
self.color = color
def __repr__(self):
return f"{self.action}: {self.color}"
def render(self, scale_fac):
cmd_surf = pygame.Surface((scale_fac, scale_fac))
cmd_surf.fill(tuple(self.color))
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
pygame.draw.polygon(arrow_surf, (0, 0, 0),
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
if self.action == 'forward':
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
elif self.action == 'left':
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
if self.action in {'left', 'forward', 'right'}:
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
elif self.action == 'P0':
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
return cmd_surf
class Programmer:
def __init__(self, prg):
self.prg = prg
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=GRAY)]
self.command_to_edit = 0
self.screen_rect = None
def render(self, scale_fac):
"""Render surface with possible inputs for the robot.
:return: surface of the input commands
"""
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
for i, inp in enumerate(self.available_inputs):
cmd_surf = inp.render(scale_fac)
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return inp_surf
def update_selected_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
selected_input = self.available_inputs[selected_input_index]
edited_command = self.prg.cmds[self.command_to_edit]
print("command before edit = ", edited_command)
if selected_input.action != '-':
edited_command.action = selected_input.action
else:
edited_command.color = selected_input.color
print("command after edit = ", edited_command)
class Program:
def __init__(self, robot, board, cmds):
self.cmds = cmds
self.robot = robot
self.board = board
self.prg_counter = 0
self.screen_rect = None
def step(self, state='running', check_victory=True):
if self.prg_counter >= len(self.cmds):
return 'game_over'
cmd = self.cmds[self.prg_counter]
self.prg_counter += 1
# current position
x = self.robot.x
y = self.robot.y
# current tile the robot is on
tile = self.board.tiles[y, x]
# apply next instruction of the program
if np.all(cmd.color == GRAY) or np.all(cmd.color == tile['color']):
# matching color -> execute command
if cmd.action == 'forward':
ynew, xnew = self.robot.get_forward_coordinates()
self.robot.x = xnew
self.robot.y = ynew
self.robot.position_changed = True
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
elif cmd.action in {'left', 'right'}:
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.robot.position_changed = True
elif cmd.action == 'P0':
self.prg_counter = 0
else:
#print("color not matching -> skipping command")
pass
# update state for new robot position
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
# robot leaves the board -> GAME OVER
print("GAME OVER")
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
return 'game_over'
# robot collects star on new tile
tile = self.board.tiles[self.robot.y, self.robot.x]
if tile['star']:
tile['star'] = False
if check_victory and all([not t['star'] for t in self.board.tiles.flatten()]):
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
print("YOU WON")
return 'won'
# by default we continue in the running state
return state
def render(self, scale_fac, prg_counter_override=None):
"""Render the current program. This will render all commands and highlight the next command to execute
(determined by self.prg_counter).
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
to highlight the command the user will edit.
:param scale_fac:
:param prg_counter_override:
:return:
"""
prg_counter = self.prg_counter
if prg_counter_override is not None:
prg_counter = prg_counter_override
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
for i in range(5):
if i < len(self.cmds):
cmd = self.cmds[i]
cmd_surf = cmd.render(scale_fac)
else:
cmd_surf = pygame.Surface((scale_fac,scale_fac))
cmd_surf.fill(GRAY)
if prg_counter is not None and i == prg_counter:
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return prg_surf
def get_clicked_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
print("clicked command = ", clicked_cmd)
return clicked_cmd
class Game:
def __init__(self, dimx, dimy, robotx, roboty, orientation, use_real_robot=False):
self.robot = Robot(x=robotx, y=roboty, orientation=orientation, use_real_robot=use_real_robot)
#self.board = Board(dimx, dimy)
self.board = Board(dimx, dimy, file='levels/56.npy')
# TODO fix number of commands at 5
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
self.state = 'reset'
self.prg = Program(self.robot, self.board, self.cmds)
self.programmer = Programmer(self.prg)
#self.scale_fac = 180
self.scale_fac = 125
self.beamer_mode = False
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.game_over_text = myfont.render('GAME OVER', False, WHITE)
self.won_text = myfont.render('YOU WON', False, BLACK)
self.run_text = myfont.render('RUN', False, tuple(BLACK))
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
self.step_text = myfont_small.render('STEP', False, tuple(BLACK))
self.prg_text = myfont_small.render('CURRENT PROGRAM', False, tuple(GREEN))
# save initial state
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
self.initial_board_tiles = self.board.tiles.copy()
def render(self):
"""Render the game screen.
This will render the board and the robot.
Depending on the display mode it will also render the current program and the input commands for the robot.
:param prg_counter:
:return:
"""
if self.beamer_mode:
dx = self.xoffset
dy = 0
else:
dx = int(0.05 * self.screen.get_width())
dy = int(0.05 * self.screen.get_height())
self.screen.fill(tuple(BLACK))
# render the board
board_surf = self.board.render(self.scale_fac)
# render robot onto the board surface
robot_surf = self.robot.render(self.scale_fac)
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
# render program
if self.state == 'input':
# in input mode we highlight the command which is selected for edit
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
else:
# in other modes we render the current program counter
prg_surf = self.prg.render(self.scale_fac)
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.prg.screen_rect = pygame.Rect(
(dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
# render input fields and buttons
inp_surf = self.programmer.render(self.scale_fac)
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.programmer.screen_rect = pygame.Rect(
(dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
btn_surf = pygame.Surface((3 * self.scale_fac // 2, self.scale_fac))
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
btn_surf.get_height(), btn_surf.get_width()))
if self.state == 'input':
btn_surf.fill(tuple(GREEN))
btn_surf.blit(self.run_text, (0, 10))
elif self.state == 'running':
btn_surf.fill(tuple(RED))
btn_surf.blit(self.stop_text, (0, 10))
elif self.state == 'stepping':
btn_surf.fill(tuple(YELLOW))
btn_surf.blit(self.step_text, (0, 10))
if not self.beamer_mode:
# if we are not in beamer mode we render program and inputs below the board
self.screen.blit(prg_surf, self.prg.screen_rect)
self.screen.blit(inp_surf, self.programmer.screen_rect)
self.screen.blit(btn_surf, self.btn_rect)
else:
prg_surf = pygame.transform.scale(prg_surf, (dx, dx//5))
# in beamer mode we render the program to the left of the board to appear on the laptop
self.screen.blit(prg_surf, (0,100))
prg_descr_surb = pygame.Surface((500, 100))
#prg_descr_surb.blit(self.prg_text)
self.screen.blit(self.prg_text, (50, 50))
mode_text = myfont_small.render(f'STATE: {self.state}', False, tuple(GREEN))
self.screen.blit(mode_text, (50, dx//5 + 350))
# render messages
if self.state == 'game_over':
game_over_surf = pygame.Surface(((self.screen.get_width() - dx) // 2, self.screen.get_height() // 2))
game_over_surf.fill(tuple(BLACK))
game_over_surf.blit(self.game_over_text, ((game_over_surf.get_width() - self.game_over_text.get_width()) // 2,
(game_over_surf.get_height() - self.game_over_text.get_height()) // 2))
self.screen.blit(game_over_surf, (dx + (self.screen.get_width() - dx) // 4, self.screen.get_height() // 4))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
elif self.state == 'won':
won_surf = pygame.Surface(((self.screen.get_width() - dx) // 2, self.screen.get_height() // 2))
won_surf.fill(tuple(WHITE))
won_surf.blit(self.won_text,
((won_surf.get_width() - self.won_text.get_width()) // 2,
(won_surf.get_height() - self.won_text.get_height()) // 2))
self.screen.blit(won_surf, (dx + (self.screen.get_width() - dx) // 4, self.screen.get_height() // 4))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
pygame.display.flip()
def process_inputs(self):
# proceed events
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.state = 'quit'
# handle MOUSEBUTTONUP
elif event.type == pygame.MOUSEBUTTONUP:
pos = pygame.mouse.get_pos()
# select command to edit by the programmer
if self.prg.screen_rect is not None:
if self.prg.screen_rect.collidepoint(pos):
print(f"clicked at pos = {pos}")
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
# set the selected command to a new value
if self.programmer.screen_rect is not None:
if self.programmer.screen_rect.collidepoint(pos):
self.programmer.update_selected_command(pos)
# clicked RUN/STOP button
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
print(f"clicked at pos = {pos}")
if self.state == 'running':
self.state = 'input'
elif self.state == 'input':
self.state = 'running'
elif self.state == 'stepping':
self.state = self.prg.step(self.state)
elif event.type == pygame.KEYUP:
if event.key == pygame.K_x:
if not self.beamer_mode:
# switch to beamer mode
self.xoffset = 1000
os.environ['SDL_VIDEO_WINDOW_POS'] = f'{1920-self.xoffset}, 280'
self.scale_fac = 180
self.screen = pygame.display.set_mode((self.xoffset + self.board.tiles.shape[1] * self.scale_fac,
self.board.tiles.shape[0] * self.scale_fac),
pygame.NOFRAME)
self.beamer_mode = True
else:
# switch to normal mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
self.scale_fac = 125
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.beamer_mode = False
elif event.key == pygame.K_s:
# run program
self.state = 'running'
elif event.key == pygame.K_SPACE:
if self.state != 'stepping':
self.state = 'stepping'
else:
self.state = self.prg.step(self.state)
elif event.key == pygame.K_r:
self.state = 'reset'
elif event.key == pygame.K_n:
self.initial_board_tiles = Board(self.board.get_xdims(), self.board.get_ydims()).tiles.copy()
self.state = 'reset'
elif event.type == pygame.USEREVENT:
for i, cmd in enumerate(event.cmds):
self.cmds[i].action = cmd.action
self.cmds[i].color = np.array(cmd.color, dtype=np.uint8)
self.reset()
self.state = 'running'
return self.state
def reset(self):
self.prg.prg_counter = 0
self.robot.x = self.initial_pos[0]
self.robot.y = self.initial_pos[1]
self.robot.orientation = self.initial_pos[2]
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.board.tiles = self.initial_board_tiles.copy()
return 'input'
def run(self):
running = True
while running:
self.state = self.process_inputs()
if self.state == 'input':
pass
elif self.state == 'running':
self.state = self.prg.step()
elif self.state == 'reset':
self.state = self.reset()
elif self.state == 'quit':
running = False
elif self.state == 'stepping':
pass
elif self.state == 'game_over' or self.state == 'won':
pass
else:
print("unknown state")
return
self.render()
if self.robot.position_changed:
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
pygame.time.wait(100)
if __name__ == "__main__":
# launch webapp in thread
webserver_thread = threading.Thread(target=app.run, kwargs={'host': '0.0.0.0', 'port': 5000})
webserver_thread.start()
seed = 4
np.random.seed(seed)
game = Game(dimx=7, dimy=4, robotx=5, roboty=1, orientation='>', use_real_robot=False)
game.run()
# TODOs
# - in stepping mode (s) it is possible to drive outside of the map
# - when no P0 command is present the program counter will overflow

View File

@ -1,124 +0,0 @@
from gauss_turing import Program, Board, Command, Robot
import numpy as np
import json
levels = {
0: lambda cmds: len(cmds) <= 3,
1: lambda cmds: len(cmds) == 3
}
def rate_level(robot_path, cmds, board):
path_length = len(set(robot_path))
n_cmds = len(cmds)
n_cmd_colors = len(set([tuple(c.color) for c in cmds]))
difficulty = (path_length - 1) * (n_cmds + n_cmd_colors)
# place coins on the robot path to create a solution
if difficulty > 0:
n_coins = np.random.randint(1, min(path_length, 5)) - 1
# put one coin on last tile visited
coins = [robot_path[-1]]
# distribute other coins randomly on the path
# path without first and list tile
unique_tiles = list(set(robot_path) - {robot_path[-1]} - {robot_path[0]})
for _ in range(n_coins):
c = np.random.randint(0, len(unique_tiles))
new_coin = unique_tiles.pop(c)
coins.append(new_coin)
pass
else:
coins = []
return difficulty, coins
def generate_level(dimx, dimy, max_steps=100):
n_cmds = np.random.randint(2, 6)
assert n_cmds <= 5
# generate random board without any coins
board = Board(dimx, dimy, n_coins=0)
cmds = []
actions = list(sorted(Command.valid_actions - {'-'}))
# generate random commands
for i in range(n_cmds):
action = np.random.choice(actions)
color = Board.valid_colors[np.random.randint(len(Board.valid_colors))]
cmds.append(Command(action, color))
# generate robot at random position
rx = np.random.randint(0, dimx-1)
ry = np.random.randint(0, dimy-1)
orientation = np.random.choice(['>','v','<','^'])
r = Robot(rx, ry, orientation)
prg = Program(r, board, cmds)
continue_running = True
state = 'running'
prg_counter_old = prg.prg_counter
robot_path = [(r.x, r.y)]
step = 0
while continue_running and step < max_steps:
#print(f"prg_counter = {prg.prg_counter} - robot: {r} - state: {state}")
state = prg.step(state, check_victory=False)
robot_path.append((r.x, r.y))
stuck = prg.prg_counter == prg_counter_old
prg_counter_old = prg.prg_counter
if state == 'game_over' or stuck:
continue_running = False
step += 1
last_pos = robot_path[-1]
if not ((0 <= last_pos[0] < dimx) and (0 <= last_pos[1] < dimy)):
# remove last entry of path if robot leaves the board
robot_path.pop(-1)
difficulty, coins = rate_level(robot_path, cmds, board)
# put coins on the board
for coin in coins:
board.tiles[coin[1], coin[0]]['star'] = True
n_coins = len(coins)
return difficulty, board, n_coins, set(robot_path), (rx, ry, orientation), cmds
if __name__ == "__main__":
np.random.seed(2)
levels = {}
for i in range(100):
diff, board, n_coins, robot_path, init_robot_pos, solution = generate_level(7, 4)
if diff > 0:
print("difficulty: ", diff, "n_coins: ", n_coins, "path length: ", len(robot_path))
if diff in levels:
if n_coins > levels[diff]['n_coins'] and len(robot_path) > levels[diff]['path_length']:
levels[diff] = {'board': board, 'init_robot_pos': init_robot_pos, 'solution': solution,
'n_coins': n_coins, 'path_length': len(robot_path)}
else:
levels[diff] = {'board': board, 'init_robot_pos': init_robot_pos, 'solution': solution,
'n_coins': n_coins, 'path_length': len(robot_path)}
level_info = {}
for l, data in levels.items():
np.save(f'levels/{l}.npy', data['board'].tiles)
sol = [(cmd.action, tuple(map(int, cmd.color))) for cmd in data['solution']]
level_info[l] = {'init_robot_pos': data['init_robot_pos'], 'solution': sol,
'n_coins': int(data['n_coins']), 'path_length': int(data['path_length']),
'file': f'levels/{l}.npy'}
with open('levels/level_info.json', 'w') as f:
json.dump(level_info, f, indent=4)
pass

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.2 KiB

View File

@ -1,61 +0,0 @@
.cmd-canvas {
outline: 3px solid #dddddd;;
width: 430px;
height: 120px;
margin: 50px auto 0;
position: relative;
}
.programmer-canvas {
outline: 3px solid #dddddd;;
width: 338px;
height: 220px;
margin: 50px auto 0;
position: relative;
}
.register {
outline: 3px dashed #dddddd;;
width: 120px;
height: 240px;
margin: 0px auto 0;
position: relative;
}
.box{
width: 62px;
height: 100px;
position: absolute !important;
top: 100px;
font-size: 15px;
color: #000000;
line-height: 25px;
text-align: center;
cursor: move;
}
.top-buffer { margin-top:20px; }
.center {
margin: 20px;
position: absolute;
left: 50%;
-ms-transform: translateX(-50%, -50%);
transform: translateX(-50%, -50%);
}
.cmd0 { left: 0; top: 0; background-color: #E74C3C; position: absolute !important;}
.cmd1 { left: 92px; top: 0; background-color: #8E44AD; position: absolute !important;}
.cmd2 { left: 184px; top: 0; background-color: #5DADE2; }
.cmd3 { left: 276px; top: 0; background-color: #1ABC9C; }
.cmd4 { left: 368px; top: 0; background-color: #F1C40F; }
.programmer_cmd0 { left: 0; top: 0; background-color: #E74C3C; position: absolute !important;}
.programmer_cmd1 { left: 92px; top: 0; background-color: #8E44AD; position: absolute !important;}
.programmer_cmd2 { left: 184px; top: 0; background-color: #5DADE2; }
.programmer_cmd3 { left: 276px; top: 0; background-color: #1ABC9C; }
.programmer_cmd4 { left: 38px; top: 120px; background-color: #F1C40F; }
.programmer_cmd5 { left: 138px; top: 120px; background-color: #F39C12; }
.programmer_cmd6 { right: 38px; top: 120px; background-color: #34495E; }
.card_hidden { background-color: #dddddd; }

View File

@ -1,136 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>imaginaerraum.de</title>
<script src="static/jquery-3.5.1.js"></script>
<script src="static/jquery-ui.min.js"></script>
<script src="static/bootstrap.min.js"></script>
<link rel="stylesheet" href="static/bootstrap.css">
<link rel="stylesheet" href="static/style.css">
</head>
<body>
<!-- container -->
<div class="container">
<!-- current program -->
<div class="row top-buffer">
Aktuelles Programm:
<div class="cmd-canvas">
{% for cmd in current_program %}
<div class="box cmd{{ loop.index0 }}" id="prg_cmd{{ loop.index0 }}" data-action="{{ cmd.action }}" data-color="{{ cmd.color }}" data-img="prg_cmd_img{{ loop.index0 }}"
style="background-color: rgba({{ cmd.color[0] }},{{ cmd.color[1] }},{{ cmd.color[2] }},0.85)">
<img id="prg_cmd_img{{ loop.index0 }}" src="static/{{ cmd.action }}.png">
{{ loop.index0 }}
</div>
{% endfor %}
</div>
</div>
<!-- possible commands -->
<div class="row top-buffer">
Mögliche Befehle:
<div class="programmer-canvas">
{% for cmd in valid_commands %}
<div class="box programmer_cmd{{ loop.index0 }}" id="valid_cmd{{ loop.index0 }}" data-action="{{ cmd.action }}"
data-color="{{ cmd.color }}" data-img="valid_cmd_img{{ loop.index0 }}"
style="background-color: rgba({{ cmd.color[0] }},{{ cmd.color[1] }},{{ cmd.color[2] }},0.85)">
<img id="valid_cmd_img{{ loop.index0 }}" src="static/{{ cmd.action }}.png">
</div>
{% endfor %}
</div>
</div>
<div class="row top-buffer center">
<div class="col-sm">
<input class="btn btn-primary" id="btnSubmit" type="submit" value="Befehle abschicken" />
</div>
<div class="col-sm">
<div id = "alert_placeholder"></div>
</div>
</div>
</div><!-- container -->
<script>
$(document).ready(function () {
var please_wait = $(".notification").hide();
var box = $(".box");
bootstrap_alert = function() {};
bootstrap_alert.warning = function(message) {
$('#alert_placeholder').html('<div class="alert alert-primary"><a class="close" data-dismiss="alert">×</a><span>'+message+'</span></div>')
};
$("#btnSubmit").click(function () {
//alert("button");
var cmds = {
{% for i in range(5) %}
"cmd{{ i }}": {"color": document.getElementById("prg_cmd{{ i }}").getAttribute("data-color"),
"action": document.getElementById("prg_cmd{{ i }}").getAttribute("data-action")},
{% endfor %}
}
//bootstrap_alert.warning('Bitte warten bis alle Spieler ihre Aktion gewählt haben!'); //$(please_wait).show();
var cmds_json = JSON.stringify(cmds);
$.post("/send_cmds", {"cmds_json": cmds_json}, function (data) {
console.log(data);
/*
if (data === 'OK') {
location.reload(); // reload page to get new cards for next round
}
else {
console.log('waiting...')
$.get("/send_cmds", "", function (data) {
if (data === 'OK') {
location.reload(); // reload page to get new cards for next round
}
})
}
*/
});
});
var edited_command = null;
box.click(function () {
//debugger
if ( this.id.includes('prg_cmd') ) {
// program card clicked -> select clicked card for editing
edited_command = this;
}
else if (this.id.includes('valid_cmd')) {
// progamming card clicked -> edit currently selected card
var this_card_action = this.getAttribute('data-action');
var this_card_color = this.getAttribute('data-color');
if (!!edited_command) { // only if there is a card selected
console.log("editing command " + edited_command);
if (this_card_action === "-") {
// set color
edited_command.setAttribute("data-color", this_card_color);
edited_command.style["backgroundColor"] = this.style["backgroundColor"];
}
else {
// set action
edited_command.setAttribute("data-action", this_card_action)
var edited_cmd_img = document.getElementById(edited_command.getAttribute("data-img"));
var prg_img = document.getElementById(this.getAttribute("data-img"));
edited_cmd_img.src = prg_img.src;
}
}
}
});
});
</script>
</body>
</html>

View File

@ -1,163 +0,0 @@
from flask import Flask, render_template, request, session, make_response
import socket
import time
import numpy as np
from playsound import playsound
from itertools import zip_longest
import random
import json
import pygame
app = Flask(__name__)
app.secret_key = b'RoboRallyRolling'
probabilities = [0.21428571428571427, 0.14285714285714285, 0.07142857142857142, 0.07142857142857142,
0.21428571428571427, 0.21428571428571427, 0.07142857142857142]
#deck = roborally.CardDeck()
class Cmd:
possible_moves = ['forward', 'left', 'right', 'P0']
possible_colors = [(200,200,200), (255, 0, 0), (0,0,255)]
def __init__(self, action, color):
self.action = action
self.color = color
def __str__(self):
return "Cmd No. " + self.action
def __repr__(self):
return self.action + " " + str(self.color)
available_robots = {
0: {'id': 11, 'ip': '192.168.1.11', 'x': 1, 'y': 2, 'orientation': '>'},
1: {'id': 12, 'ip': '192.168.1.12', 'x': 3, 'y': 3, 'orientation': '>'}
}
players = {}
class Player:
MAX_PLAYERS = 4
player_counter = 0
def __init__(self):
if Player.player_counter < Player.MAX_PLAYERS:
self.id = Player.player_counter
Player.player_counter += 1
self.max_cards = 9
self.player_hand = deck.draw_cards(self.max_cards)
print("current hand: ", [str(c) for c in self.player_hand])
self.action_count = 5
self.action_chosen = False
self.initialize_robot()
else:
print("max players reached!")
def initialize_robot(self):
x = available_robots[self.id]['x']
y = available_robots[self.id]['y']
marker_id = available_robots[self.id]['id']
ip = available_robots[self.id]['ip']
orientation = available_robots[self.id]['orientation']
self.robot = game.board.create_robot(x, y, orientation, self.id, marker_id)
if game.comm_socket is not None:
cmd = f"initialize_robot, {marker_id}, {ip}, {x-1}, {y-1}, {orientation}\n"
game.comm_socket.sendall(cmd.encode())
data = game.comm_socket.recv(32)
if data.decode() == 'OK\n':
print("robot sucessfully initialized!")
else:
print("error: could not initialize robot!")
self.robot = None
def draw_new_cards(self):
self.player_hand += deck.draw_cards(self.max_cards - len(self.player_hand))
@app.route('/send_cmds', methods=['POST', 'GET'])
def send_cmds():
if request.method == 'POST':
cmds = json.loads(request.form['cmds_json'])
cmd_list = []
for cmd_nr in [f"cmd{i}" for i in range(5)]:
cmd = cmds[cmd_nr]
action = cmd['action']
color_str = cmd['color']
color_str = color_str.strip("()").split(",")
color = tuple(map(int, color_str))
cmd_list.append(Cmd(action, color))
print("got commands: ", cmd_list)
ev = pygame.event.Event(pygame.USEREVENT, {'cmds': cmd_list})
pygame.event.post(ev)
# send commands to the game
# if game.ready():
# game.process_actions()
#
# return 'OK'
# else:
# return 'please wait'
return "OK"
elif request.method == 'GET':
# GET is used when we have to wait for other players to finish
while not game.processing_done: # wait for other players to choose commands and processing to finish
pass
return 'OK'
WHITE = (255, 255, 255)
GRAY = (200, 200, 200)
RED = (255, 0, 0)
BLUE = (0, 0, 255)
@app.route('/', methods=['GET', 'POST'])
def hello_world():
prg = [Cmd('forward', GRAY), Cmd('left', GRAY), Cmd('right', BLUE),
Cmd('P0', GRAY), Cmd('right', RED)]
valid_cmds = [Cmd('forward', WHITE), Cmd('right', WHITE), Cmd('left', WHITE),
Cmd('P0', WHITE), Cmd('-', RED), Cmd('-', BLUE), Cmd('-', GRAY)]
if request.method == 'GET':
robot_info = None
return render_template('drag_example.html', current_program=prg, valid_commands=valid_cmds, robot_info=robot_info)
elif request.method == 'POST':
# print(request.form)
if request.form.get('drag') and request.form.get('drop'):
# swap cards in the current hand
i1 = int(request.form.get('drag')) # number of first card
i2 = int(request.form.get('drop')) # number of second card
card1 = deck.deck[i1] # get card by number
card2 = deck.deck[i2]
print("swapping {} and {}".format(card1, card2))
j1 = player_hand.index(card1) # get index of card in the hand
j2 = player_hand.index(card2)
player_hand[j1], player_hand[j2] = player_hand[j2], player_hand[j1] # swap the cards in the list
# print("current hand: ", [str(c) for c in player_hand[player_id]])
return 'OK'
else:
return render_template('drag_example.html', cmds=player_hand, player_id=player_id)
if __name__ == '__main__':
#app.run(host='192.168.1.222', port=5000)
app.run(host='0.0.0.0', port=5000)

View File

@ -1,15 +0,0 @@
import socket
import json
HOST, PORT = "localhost", 42424
def move_grid(x, y, orientation, dimx, dimy):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
payload = json.dumps({"x": x, "y": y, "dimx": dimx, "dimy": dimy, "orientation": orientation})
sock.sendall(f"move_grid_blocking;{payload}\n".encode())
reply = sock.recv(1024)
print(reply)
if __name__ == "__main__":
move_grid(1,1,'^', 5, 5)

View File

@ -1,478 +0,0 @@
import numpy as np
import random
import pygame
import os
from event_server_comm import move_grid
BLACK = np.array([0, 0, 0], dtype=np.uint8)
WHITE = np.array([255, 255, 255], dtype=np.uint8)
RED = np.array([255, 0, 0], dtype=np.uint8)
BLUE = np.array([0, 0, 255], dtype=np.uint8)
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
GREEN = np.array([0, 255, 0], dtype=np.uint8)
pygame.init()
pygame.font.init() # you have to call this at the start,
# if you want to use this module.
myfont = pygame.font.SysFont('Comic Sans MS', 55)
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
P0_text = myfont.render('P0', False, tuple(BLACK))
tiledt = np.dtype([('x', np.uint8), ('y', np.uint8), ('color', np.uint8, 3), ('star', np.bool)])
class Board:
valid_colors = [WHITE, RED, BLUE]
def __init__(self, dim_x, dim_y):
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
for x in range(dim_x):
for y in range(dim_y):
self.tiles[y, x]['color'] = random.choice(Board.valid_colors)
def render(self, scale_fac):
dimy, dimx = self.tiles.shape
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
for y in range(self.tiles.shape[0]):
for x in range(self.tiles.shape[1]):
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
pygame.draw.rect(board_surf, (0, 0, 0),
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
if self.tiles[y, x]['star']:
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
return board_surf
def get_xdims(self):
return self.tiles.shape[1]
def get_ydims(self):
return self.tiles.shape[0]
# def __repr__(self):
# s = ''
# for y in range(self.tiles.shape[0]):
# for x in range(self.tiles.shape[1]):
# if (x,y) == (self.robot.x, self.robot.y):
# s += self.robot.orientation
# else:
# s += '.'
# s += '\n'
# return s
class Robot:
orientations = ['^', 'left', 'down', 'right']
resulting_orientation = {
'^': {'left': '<', 'right': '>'},
'>': {'left': '^', 'right': 'v'},
'v': {'left': '>', 'right': '<'},
'<': {'left': 'v', 'right': '^'},
}
def __init__(self, x, y, orientation):
self.x = x
self.y = y
self.orientation = orientation
def get_forward_coordinates(self):
# get the coordinates of the neighboring tile in the given direction
if self.orientation == '^':
return self.y - 1, self.x
elif self.orientation == '>':
return self.y, self.x + 1
elif self.orientation == 'v':
return self.y + 1, self.x
elif self.orientation == '<':
return self.y, self.x - 1
else:
raise Exception("error: undefined direction")
def get_angle(self):
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
return np.rad2deg(angle)
def render(self, scale_fac):
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
(0.25 * scale_fac, 0.25 * scale_fac),
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
return robot_surf
def update_pos(self, dimx, dimy):
move_grid(self.x, self.y, self.orientation, dimx, dimy)
def __repr__(self):
return f"({self.y}, {self.x}) - {self.orientation}"
class Command:
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
def __init__(self, action=None, color=WHITE):
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
raise ValueError("invalid values for command")
self.action = action
self.color = color
def __repr__(self):
return f"{self.action}: {self.color}"
def render(self, scale_fac):
cmd_surf = pygame.Surface((scale_fac, scale_fac))
cmd_surf.fill(tuple(self.color))
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
pygame.draw.polygon(arrow_surf, (0, 0, 0),
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
if self.action == 'forward':
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
elif self.action == 'left':
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
if self.action in {'left', 'forward', 'right'}:
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
elif self.action == 'P0':
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
return cmd_surf
class Programmer:
def __init__(self, prg):
self.prg = prg
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=WHITE)]
self.command_to_edit = 0
self.screen_rect = None
def render(self, scale_fac):
"""Render surface with possible inputs for the robot.
:return: surface of the input commands
"""
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
for i, inp in enumerate(self.available_inputs):
cmd_surf = inp.render(scale_fac)
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return inp_surf
def update_selected_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
selected_input = self.available_inputs[selected_input_index]
edited_command = self.prg.cmds[self.command_to_edit]
print("command before edit = ", edited_command)
if selected_input.action != '-':
edited_command.action = selected_input.action
else:
edited_command.color = selected_input.color
print("command after edit = ", edited_command)
class Program:
def __init__(self, robot, board, cmds):
self.cmds = cmds
self.robot = robot
self.board = board
self.prg_counter = 0
self.screen_rect = None
def step(self, state='running'):
cmd = self.cmds[self.prg_counter]
self.prg_counter += 1
# current position
x = self.robot.x
y = self.robot.y
# current tile the robot is on
tile = self.board.tiles[y, x]
# apply next instruction of the program
if np.all(cmd.color == WHITE) or np.all(cmd.color == tile['color']):
# matching color -> execute command
if cmd.action == 'forward':
ynew, xnew = self.robot.get_forward_coordinates()
self.robot.x = xnew
self.robot.y = ynew
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
elif cmd.action in {'left', 'right'}:
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
elif cmd.action == 'P0':
self.prg_counter = 0
else:
print("color not matching -> skipping command")
# update state for new robot position
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
# robot leaves the board -> GAME OVER
print("GAME OVER")
return 'game_over'
# robot collects star on new tile
tile = self.board.tiles[self.robot.y, self.robot.x]
if tile['star']:
tile['star'] = False
if all([not t['star'] for t in self.board.tiles.flatten()]):
print("YOU WON")
return 'won'
# by default we continue in the running state
return state
def render(self, scale_fac, prg_counter_override=None):
"""Render the current program. This will render all commands and highlight the next command to execute
(determined by self.prg_counter).
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
to highlight the command the user will edit.
:param scale_fac:
:param prg_counter_override:
:return:
"""
prg_counter = self.prg_counter
if prg_counter_override is not None:
prg_counter = prg_counter_override
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
for i in range(5):
if i < len(self.cmds):
cmd = self.cmds[i]
cmd_surf = cmd.render(scale_fac)
else:
cmd_surf = pygame.Surface((scale_fac,scale_fac))
cmd_surf.fill(WHITE)
if prg_counter is not None and i == prg_counter:
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
return prg_surf
def get_clicked_command(self, pos):
print(f"clicked at pos = {pos}")
xoffset = pos[0] - self.screen_rect.x
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
print("clicked command = ", clicked_cmd)
return clicked_cmd
class Game:
def __init__(self, dimx, dimy, robotx, roboty):
self.robot = Robot(x=robotx, y=roboty, orientation='v')
self.board = Board(dimx, dimy)
coin1x = np.random.randint(0, dimx)
coin1y = np.random.randint(0, dimy)
self.board.tiles[coin1y,coin1x]['star'] = True
self.board.tiles[3,2]['star'] = True
# TODO fix number of commands at 5
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
self.state = 'reset'
self.prg = Program(self.robot, self.board, self.cmds)
self.programmer = Programmer(self.prg)
#self.scale_fac = 180
self.scale_fac = 125
self.beamer_mode = False
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.game_over_text = myfont.render('GAME OVER', False, RED)
self.won_text = myfont.render('YOU WON', False, GREEN)
self.run_text = myfont.render('RUN', False, tuple(BLACK))
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
self.step_text = myfont_small.render('STEP', False, tuple(BLACK))
# save initial state
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
self.inital_board_tiles = self.board.tiles.copy()
def render(self):
"""Render the game screen.
This will render the board and the robot.
Depending on the display mode it will also render the current program and the input commands for the robot.
:param prg_counter:
:return:
"""
if self.beamer_mode:
dx = 0
dy = 0
else:
dx = int(0.05 * self.screen.get_width())
dy = int(0.05 * self.screen.get_height())
self.screen.fill(tuple(BLACK))
# render the board
board_surf = self.board.render(self.scale_fac)
# render robot onto the board surface
robot_surf = self.robot.render(self.scale_fac)
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
if not self.beamer_mode:
# if we are not in beamer mode we also show the current program / inputs
# render program
if self.state == 'input':
# in input mode we highlight the command which is selected for edit
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
else:
# in other modes we render the current program counter
prg_surf = self.prg.render(self.scale_fac)
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.prg.screen_rect = pygame.Rect((dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
self.screen.blit(prg_surf, self.prg.screen_rect)
# render input fields and buttons
inp_surf = self.programmer.render(self.scale_fac)
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
self.programmer.screen_rect = pygame.Rect((dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
self.screen.blit(inp_surf, self.programmer.screen_rect)
btn_surf = pygame.Surface((3 * self.scale_fac//2, self.scale_fac))
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
btn_surf.get_height(), btn_surf.get_width()))
if self.state == 'input':
btn_surf.fill(tuple(GREEN))
btn_surf.blit(self.run_text, (0, 10))
elif self.state == 'running':
btn_surf.fill(tuple(RED))
btn_surf.blit(self.stop_text, (0, 10))
elif self.state == 'stepping':
btn_surf.fill(tuple(YELLOW))
btn_surf.blit(self.step_text, (0, 10))
self.screen.blit(btn_surf, self.btn_rect)
# render messages
if self.state == 'game_over':
self.screen.blit(self.game_over_text, (50, 00))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
elif self.state == 'won':
self.screen.blit(self.won_text, (50, 00))
pygame.display.update()
pygame.time.wait(1500)
self.state = 'reset'
pygame.display.flip()
def process_inputs(self):
# proceed events
for event in pygame.event.get():
if event.type == pygame.QUIT:
self.state = 'quit'
# handle MOUSEBUTTONUP
elif event.type == pygame.MOUSEBUTTONUP:
pos = pygame.mouse.get_pos()
# select command to edit by the programmer
if self.prg.screen_rect is not None:
if self.prg.screen_rect.collidepoint(pos):
print(f"clicked at pos = {pos}")
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
# set the selected command to a new value
if self.programmer.screen_rect is not None:
if self.programmer.screen_rect.collidepoint(pos):
self.programmer.update_selected_command(pos)
# clicked RUN/STOP button
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
print(f"clicked at pos = {pos}")
if self.state == 'running':
self.state = 'input'
elif self.state == 'input':
self.state = 'running'
elif self.state == 'stepping':
self.state = self.prg.step(self.state)
elif event.type == pygame.KEYUP:
if event.key == pygame.K_x:
if not self.beamer_mode:
# switch to beamer mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '1920, 280'
self.scale_fac = 180
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
self.board.tiles.shape[0] * self.scale_fac),
pygame.NOFRAME)
self.beamer_mode = True
else:
# switch to normal mode
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
self.scale_fac = 125
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
self.beamer_mode = False
elif event.key == pygame.K_r:
# run program
self.state = 'running'
elif event.key == pygame.K_s:
if self.state != 'stepping':
self.state = 'stepping'
else:
self.state = self.prg.step(self.state)
return self.state
def reset(self):
self.prg.prg_counter = 0
self.robot.x = self.initial_pos[0]
self.robot.y = self.initial_pos[1]
self.robot.orientation = self.initial_pos[2]
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
self.board.tiles = self.inital_board_tiles.copy()
return 'input'
def run(self):
running = True
while running:
self.state = self.process_inputs()
if self.state == 'input':
pass
elif self.state == 'running':
self.state = self.prg.step()
elif self.state == 'reset':
self.state = self.reset()
elif self.state == 'quit':
running = False
elif self.state == 'stepping':
pass
elif self.state == 'game_over' or self.state == 'won':
pass
else:
print("unknown state")
return
self.render()
pygame.time.wait(100)
if __name__ == "__main__":
random.seed(42)
game = Game(dimx=7, dimy=4, robotx=3, roboty=1)
game.run()
# TODOs
# - in stepping mode (s) it is possible to drive outside of the map
# - when no P0 command is present the program counter will overflow

View File

@ -3,168 +3,38 @@ import numpy as np
import cv2 import cv2
import os import os
import time import time
import math
from pyqtgraph.Qt import QtCore, QtGui
from pyqtgraph.parametertree import Parameter, ParameterTree, parameterTypes
import pyqtgraph as pg
from shapely.geometry import LineString from shapely.geometry import LineString
from shapely.affinity import scale
from queue import Queue from queue import Queue
import aruco import aruco
class FPSCounter:
def __init__(self):
self.fps_start_time = time.time()
self.fps_counter = 0
self.fps_display_rate = 1.0
def get_fps(self):
self.fps_counter += 1
delta_t = time.time() - self.fps_start_time
if delta_t > self.fps_display_rate:
self.fps_counter = 0
self.fps_start_time = time.time()
return self.fps_counter / delta_t
## test add/remove
## this group includes a menu allowing the user to add new parameters into its child list
class RobotMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Robot']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_robots = self.children()
current_indices = [int(r.name().split(' ')[1]) for r in current_robots]
new_index = len(current_indices) + 1
val = 0
self.addChild(dict(name=f"Robot {new_index}", type='int', value=val, removable=False,
renamable=True))
class CornerMarkerGroup(parameterTypes.GroupParameter):
def __init__(self, **opts):
opts['type'] = 'group'
opts['addText'] = "Add Marker"
opts['addList'] = ['Corner']
parameterTypes.GroupParameter.__init__(self, **opts)
def addNew(self, typ):
current_corners = self.children()
current_chars = [str(r.name().split(' ')[1]) for r in current_corners]
new_char = chr(ord(current_chars[-1]) + 1)
val = 0
self.addChild(dict(name=f"Corner {new_char}", type='int', value=val, removable=False))
class ArucoEstimator: class ArucoEstimator:
def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=7, grid_rows=4): corner_marker_ids = {
self.app = QtGui.QApplication([]) 'a': 0,
'b': 1,
'c': 2,
'd': 3
}
## Create window with GraphicsView widget corner_estimates = {
self.win = QtGui.QWidget() 'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
self.layout = QtGui.QGridLayout() 'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
self.win.setLayout(self.layout) 'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
self.win.keyPressEvent = self.handleKeyPressEvent def __init__(self, robot_marker_ids=None, use_realsense=True, grid_columns=8, grid_rows=8):
self.win.setWindowTitle('ArucoEstimator')
self.plotwidget = pg.PlotWidget()
self.layout.addWidget(self.plotwidget)
## lock the aspect ratio so pixels are always square
self.plotwidget.setAspectLocked(True)
self.plotwidget.getPlotItem().getAxis('left').hide()
self.plotwidget.getPlotItem().getAxis('bottom').hide()
## Create image item
self.img = pg.ImageItem(border='w')
self.img.setLevels([[0, 255], [0, 255], [0, 255]])
self.img.mouseClickEvent = self.handleMouseEvent
self.plotwidget.addItem(self.img)
# fps display
self.fps_counter = FPSCounter()
self.fps_overlay = pg.TextItem('fps = 0', color=(255, 255, 0), anchor=(0,1))
self.plotwidget.addItem(self.fps_overlay)
self.invert_grayscale = False
self.draw_grid = True
self.draw_markers = True
self.draw_marker_coordinate_system = False
self.corner_marker_size = 0.075
self.corner_marker_ids = {
'a': 0,
'b': 1,
'c': 2,
'd': 3
}
self.corner_estimates = {
'a': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'b': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'c': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
'd': {'pixel_coordinate': None, 'x': None, 'y': None, 'n_estimates': 0},
}
self.grid_columns = grid_columns self.grid_columns = grid_columns
self.grid_rows = grid_rows self.grid_rows = grid_rows
self.robot_marker_size = 0.07
if robot_marker_ids is None: if robot_marker_ids is None:
robot_marker_ids = [] robot_marker_ids = []
self.robot_marker_ids = robot_marker_ids self.robot_marker_ids = robot_marker_ids
self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None}) self.robot_marker_estimates = dict([(marker_id, {'t': None, 'x': None, 'y': None, 'angle': None})
for marker_id in self.robot_marker_ids]) for marker_id in self.robot_marker_ids])
robot_marker_group = [{'name': f'Robot {ind}', 'type': 'int', 'value': marker_id} for ind, marker_id in self.draw_grid = False
enumerate(self.robot_marker_ids, 1)]
corner_marker_group = [{'name': f'Corner {letter}', 'type': 'int', 'value': marker_id} for letter, marker_id in
self.corner_marker_ids.items()]
self.threshold = 10.0
# parameter
params_spec = [
{'name': 'Corner marker size', 'type': 'float', 'value': self.corner_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Robot marker size', 'type': 'float', 'value': self.robot_marker_size, 'siPrefix': True,
'suffix': 'm', 'limits': (0.01, 1), 'step': 0.001},
{'name': 'Draw markers', 'type': 'bool', 'value': self.draw_markers},
{'name': 'Draw marker coordinate system', 'type': 'bool', 'value': self.draw_marker_coordinate_system},
{'name': 'Threshold', 'type': 'float', 'value': self.threshold},
{'name': 'Invert grayscale', 'type': 'bool', 'value': self.invert_grayscale, 'tip': "Invert grayscale image before marker detection"},
{'name': 'Show FPS', 'type': 'bool', 'value': True, 'tip': "Display frames per second counter"},
{'name': 'Draw grid', 'type': 'bool', 'value': self.draw_grid, 'tip': "Draw grid spanned by the markers 0 - 3"},
{'name': 'Grid columns', 'type': 'int', 'value': self.grid_columns, 'tip': "Number of columns for the grid"},
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'grayscale', 'tip': "Display mode for the video"},
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
{'name': 'Controlled robot', 'type': 'list', 'values': self.robot_marker_ids, 'tip': 'Robot to control'},
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
]
self.params = Parameter.create(name='params', type='group', children=params_spec)
self.params.param('Invert grayscale').sigValueChanged.connect(lambda _, v: self.__setattr__('invert_grayscale', v))
self.params.param('Threshold').sigValueChanged.connect(lambda _, v: self.__setattr__('threshold', v))
self.params.param('Draw markers').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_markers', v))
self.params.param('Draw marker coordinate system').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_marker_coordinate_system', v))
self.params.param('Draw grid').sigValueChanged.connect(lambda _, v: self.__setattr__('draw_grid', v))
self.params.param('Grid columns').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_columns', v))
self.params.param('Grid rows').sigValueChanged.connect(lambda _, v: self.__setattr__('grid_rows', v))
self.params.param('Corner marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('corner_marker_size', v))
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
self.params.param('Controlled robot').sigValueChanged.connect(lambda _, v: self.event_queue.put(('controlled_robot',
{'robot_id': v})))
self.paramtree = ParameterTree()
self.paramtree.setParameters(self.params, showTop=False)
self.layout.addWidget(self.paramtree)
self.win.show() ## show widget alone in its own window
self.event_queue = Queue() self.event_queue = Queue()
@ -172,14 +42,15 @@ class ArucoEstimator:
# Configure depth and color streams # Configure depth and color streams
self.pipeline = rs.pipeline() self.pipeline = rs.pipeline()
config = rs.config() config = rs.config()
# config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30) #config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)
config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30) config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
# Start streaming # Start streaming
self.pipeline.start(config) self.pipeline.start(config)
# enable auto exposure # disable auto exposure
self.set_autoexposure(self.params['Autoexposure']) color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
color_sensor.set_option(rs.option.enable_auto_exposure, False)
camera_intrinsics = self.pipeline.get_active_profile().get_stream( camera_intrinsics = self.pipeline.get_active_profile().get_stream(
rs.stream.color).as_video_stream_profile().get_intrinsics() rs.stream.color).as_video_stream_profile().get_intrinsics()
@ -196,175 +67,144 @@ class ArucoEstimator:
self.pipeline = None self.pipeline = None
# create detector and get parameters # create detector and get parameters
self.detector = aruco.MarkerDetector()
# self.detector.setDictionary('ARUCO_MIP_36h12')
#self.detector.setDictionary('ARUCO_MIP_16h3')
# self.detector.setDictionary('ARUCO')
#self.detector.setDetectionMode(aruco.DM_NORMAL, 0.05)
self.detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.05)
self.detector_params = self.detector.getParameters()
# print detector parameters # print detector parameters
# print("detector params:") print("detector params:")
# print(self.detector_params) for val in dir(self.detector_params):
# for val in dir(self.detector_params): if not val.startswith("__"):
# if not val.startswith("__"): print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
# print("\t{} : {}".format(val, self.detector_params.__getattribute__(val)))
self.camparam = aruco.CameraParameters() self.camparam = aruco.CameraParameters()
if use_realsense: if use_realsense:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml")) self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "realsense.yml"))
else: else:
self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml")) self.camparam.readFromXMLFile(os.path.join(os.path.dirname(__file__), "dfk72_6mm_param2.yml"))
print(self.camparam)
self.drag_line_end = None def mouse_callback(self, event, px, py, flags, param):
self.drag_line_start = None """
self.previous_click = None This function is called for each mouse event inside the opencv window.
If there are reference markers available we compute the real world position corresponding to the clicked
position and put it in an event queue.
:param event: type of event (mouse movement, left click, right click, etc.)
:param px: x-position of event
:param py: y-position of event
"""
if event == cv2.EVENT_LBUTTONDOWN:
if self.all_corners_detected():
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
def set_autoexposure(self, v): x1 = self.corner_estimates['a']['x']
if self.pipeline is not None: x3 = self.corner_estimates['c']['x']
color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1] y1 = self.corner_estimates['a']['y']
color_sensor.set_option(rs.option.enable_auto_exposure, v) y3 = self.corner_estimates['c']['y']
print(color_sensor.get_option(rs.option.enable_auto_exposure))
def compute_clicked_position(self, px, py): alpha = (px - px1) / (px3 - px1)
if self.all_corners_detected(): beta = (py - py1) / (py3 - py1)
# inter/extrapolate from clicked point to marker position
px1 = self.corner_estimates['a']['pixel_coordinate'][0]
px3 = self.corner_estimates['c']['pixel_coordinate'][0]
py1 = self.corner_estimates['a']['pixel_coordinate'][1]
py3 = self.corner_estimates['c']['pixel_coordinate'][1]
x1 = self.corner_estimates['a']['x'] print(f"alpha = {alpha}, beta = {beta}")
x3 = self.corner_estimates['c']['x']
y1 = self.corner_estimates['a']['y']
y3 = self.corner_estimates['c']['y']
alpha = (px - px1) / (px3 - px1) target_x = x1 + alpha * (x3 - x1)
beta = (py - py1) / (py3 - py1) target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y, 0])
print(f"alpha = {alpha}, beta = {beta}") print(f"target = ({target_x},{target_y})")
self.event_queue.put(('click', target))
target_x = x1 + alpha * (x3 - x1)
target_y = y1 + beta * (y3 - y1)
target = np.array([target_x, target_y])
else:
print("not all markers have been detected!")
target = np.array([px, -py])
return target
def handleMouseEvent(self, event):
# get click position as distance to top-left corner of the image
px = event.pos().x()
py = self.img.height() - event.pos().y()
print(f"px = {px}, py = {py}")
if event.button() == QtCore.Qt.MouseButton.LeftButton:
# self.drag_line_start = (px, py)
# elif event == cv2.EVENT_LBUTTONUP:
self.drag_line_end = (px, py)
self.drag_line_start = (px, py)
target_pos = self.compute_clicked_position(self.drag_line_start[0], self.drag_line_start[1])
if self.drag_line_start != self.drag_line_end:
# compute target angle for clicked position
facing_pos = self.compute_clicked_position(px, py)
v = facing_pos - target_pos
target_angle = math.atan2(v[1], v[0])
else: else:
# determine angle from previously clicked pos (= self.drag_line_end) print("not all markers have been detected!")
if self.previous_click is not None:
previous_pos = self.compute_clicked_position(self.previous_click[0], self.previous_click[1]) def run_tracking(self, draw_markers=True, draw_marker_coordinate_system=False, invert_grayscale=False):
v = target_pos - previous_pos """
target_angle = math.atan2(v[1], v[0]) Run the marker tracking in a loop
"""
cv2.namedWindow('RoboRally', cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback('RoboRally', self.mouse_callback)
try:
running = True
while running:
if self.pipeline:
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
continue
# Convert images to numpy arrays
color_image = np.asanyarray(color_frame.get_data())
else: else:
target_angle = 0.0 # Capture frame-by-frame
ret, color_image = self.cv_camera.read()
t_image = time.time()
target = np.array([target_pos[0], target_pos[1], target_angle]) gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY)
print(target) if invert_grayscale:
self.previous_click = (px, py) cv2.bitwise_not(gray, gray)
self.event_queue.put(('click', {'x': target[0], 'y': target[1], 'angle': target[2]}))
self.drag_line_start = None
elif event == cv2.EVENT_MOUSEMOVE:
if self.drag_line_start is not None:
self.drag_line_end = (px, py)
def process_frame(self): # run aruco marker detection
if self.pipeline: detected_markers = self.detector.detect(gray)
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
# Convert images to numpy arrays # extract data for detected markers
color_image = np.asanyarray(color_frame.get_data()) detected_marker_data = {}
else: for marker in detected_markers:
# Capture frame-by-frame detected_marker_data[marker.id] = {'marker_center': marker.getCenter()}
ret, color_image = self.cv_camera.read() if marker.id >= 0:
t_image = time.time() if marker.id in self.corner_marker_ids.values():
marker.calculateExtrinsics(0.1, self.camparam)
else:
marker.calculateExtrinsics(0.07, self.camparam)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec
detected_marker_data[marker.id]['Tvec'] = marker.Tvec
gray = cv2.cvtColor(color_image, cv2.COLOR_BGR2GRAY) if marker.id >= 0: # draw markers onto the image
if self.invert_grayscale: if draw_markers:
cv2.bitwise_not(gray, gray) marker.draw(color_image, np.array([255, 255, 255]), 2, True)
detector = aruco.MarkerDetector() if draw_marker_coordinate_system:
detector.setDetectionMode(aruco.DM_VIDEO_FAST, 0.01) aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
#detector_params = detector.getParameters() # store data
for marker_id, data in detected_marker_data.items():
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
# run aruco marker detection # draw grid
detected_markers = detector.detect(gray) if self.draw_grid:
# detected_markers2 = detector.detect(gray) color_image = self.draw_grid_lines(color_image, detected_marker_data)
#gray = detector.getThresholdedImage() color_image = self.draw_robot_pos(color_image, detected_marker_data)
display_mode = self.params.param('Display mode').value() # Show images
cv2.imshow('RoboRally', color_image)
key = cv2.waitKey(1)
#print(f"detected_markers = {[marker.id for marker in detected_markers]}") if key > 0:
#print("threshold = ", self.threshold) self.event_queue.put(('key', key))
if key == ord('g'):
# extract data for detected markers self.draw_grid = not self.draw_grid
detected_marker_data = {} if key == ord('q'):
for marker in detected_markers: running = False
if marker.id >= 0: if key == ord('a'):
detected_marker_data[marker.id] = {'marker_center': marker.getCenter()} color_sensor = self.pipeline.get_active_profile().get_device().query_sensors()[1]
if color_sensor.get_option(rs.option.enable_auto_exposure) == 1.0:
if marker.id in self.corner_marker_ids.values(): color_sensor.set_option(rs.option.enable_auto_exposure, False)
marker.calculateExtrinsics(self.corner_marker_size, self.camparam) print("auto exposure OFF")
else: else:
marker.calculateExtrinsics(self.robot_marker_size, self.camparam) color_sensor.set_option(rs.option.enable_auto_exposure, True)
detected_marker_data[marker.id]['Rvec'] = marker.Rvec print("auto exposure ON")
detected_marker_data[marker.id]['Tvec'] = marker.Tvec finally:
cv2.destroyAllWindows()
if self.draw_markers:
marker.draw(color_image, np.array([255, 255, 255]), 2, True)
if self.draw_marker_coordinate_system:
aruco.CvDrawingUtils.draw3dAxis(color_image, self.camparam, marker.Rvec, marker.Tvec, .1)
# store data
for marker_id, data in detected_marker_data.items():
self.update_estimate(marker_id, data['marker_center'], data['Rvec'], data['Tvec'], t_image)
# draw grid
if self.draw_grid:
color_image = self.draw_grid_lines(color_image, detected_marker_data)
color_image = self.draw_robot_pos(color_image, detected_marker_data)
# draw drag
if self.drag_line_start is not None and self.drag_line_end is not None:
color_image = cv2.line(color_image, self.drag_line_start, self.drag_line_end, color=(0, 0, 255), thickness=2)
# compute frame rate
self.fps_overlay.setText(f"fps = {self.fps_counter.get_fps():.1f}")
# Show images
color_image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) # convert to RGB
if display_mode == 'color':
self.img.setImage(np.transpose(np.flipud(color_image_rgb), axes=(1, 0, 2)))
elif display_mode == 'grayscale':
self.img.setImage(np.transpose(np.flipud(gray)))
QtCore.QTimer.singleShot(1, self.process_frame)
def handleKeyPressEvent(self, ev):
key = ev.key()
self.event_queue.put(('key', key))
if key == QtCore.Qt.Key_Q:
if self.pipeline is not None: if self.pipeline is not None:
# Stop streaming # Stop streaming
self.pipeline.stop() self.pipeline.stop()
self.app.quit()
elif key == QtCore.Qt.Key_I:
self.invert_grayscale = not self.invert_grayscale
def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image): def update_estimate(self, marker_id, pixel_coord_center, rvec, tvec, t_image):
# update the marker estimate with new data # update the marker estimate with new data
@ -404,7 +244,7 @@ class ArucoEstimator:
_, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat) _, _, _, _, _, _, euler_angles = cv2.decomposeProjectionMatrix(pose_mat)
angle = -euler_angles[2][0] * np.pi / 180.0 angle = -euler_angles[2][0] * np.pi / 180.0
self.robot_marker_estimates[marker_id] = {'t': float(t_image), 'x': float(x), 'y': float(y), 'angle': float(angle)} self.robot_marker_estimates[marker_id] = {'t': t_image, 'x': x, 'y': y, 'angle': angle}
def all_corners_detected(self): def all_corners_detected(self):
# checks if all corner markers have been detected at least once # checks if all corner markers have been detected at least once
@ -414,7 +254,7 @@ class ArucoEstimator:
# checks if all robot markers have been detected at least once # checks if all robot markers have been detected at least once
return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()]) return not any([estimate['t'] is None for estimate in self.robot_marker_estimates.values()])
def get_pos_from_grid_point(self, x, y, dimx, dimy, orientation=None): def get_pos_from_grid_point(self, x, y, orientation=None):
""" """
returns the position for the given grid point based on the current corner estimates returns the position for the given grid point based on the current corner estimates
:param x: x position on the grid ( 0 &le x &lt number of grid columns) :param x: x position on the grid ( 0 &le x &lt number of grid columns)
@ -423,21 +263,17 @@ class ArucoEstimator:
:return: numpy array with corresponding real world x- and y-position :return: numpy array with corresponding real world x- and y-position
if orientation was specified the array also contains the matching angle for the orientation if orientation was specified the array also contains the matching angle for the orientation
""" """
if not self.all_corners_detected(): assert 0 <= x < self.grid_columns
#raise RuntimeError("not all corner markers have been detected yet") assert 0 <= y < self.grid_rows
a = np.array([0,1]) assert self.all_corners_detected()
b = np.array([1,1])
c = np.array([1,0])
d = np.array([0,0])
else:
# compute column line
a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
x_frac = (x + 0.5) / dimx # compute column line
y_frac = (y + 0.5) / dimy a = np.array([self.corner_estimates['a']['x'], self.corner_estimates['a']['y']])
b = np.array([self.corner_estimates['b']['x'], self.corner_estimates['b']['y']])
c = np.array([self.corner_estimates['c']['x'], self.corner_estimates['c']['y']])
d = np.array([self.corner_estimates['d']['x'], self.corner_estimates['d']['y']])
x_frac = (x + 0.5) / self.grid_columns
y_frac = (y + 0.5) / self.grid_rows
vab = b - a vab = b - a
vdc = c - d vdc = c - d
@ -449,11 +285,8 @@ class ArucoEstimator:
row_line_top = a + y_frac * vad row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc row_line_bottom = b + y_frac * vbc
column_line_orig = LineString([column_line_top, column_line_bottom]) column_line = LineString([column_line_top, column_line_bottom])
row_line_orig = LineString([row_line_top, row_line_bottom]) row_line = LineString([row_line_top, row_line_bottom])
column_line = scale(column_line_orig, 10, 10)
row_line = scale(row_line_orig, 10, 10)
int_pt = column_line.intersection(row_line) int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y]) point_of_intersection = np.array([int_pt.x, int_pt.y])
@ -495,7 +328,7 @@ class ArucoEstimator:
corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate'] corner_1_center = self.corner_estimates[corner_1]['pixel_coordinate']
corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate'] corner_2_center = self.corner_estimates[corner_2]['pixel_coordinate']
if corner_1_center is not None and corner_2_center is not None: if corner_1_center is not None and corner_2_center is not None:
frame = cv2.line(frame, tuple(corner_1_center.astype(int)), tuple(corner_2_center.astype(int)), color=(0, 0, 255), thickness=2) frame = cv2.line(frame, tuple(corner_1_center), tuple(corner_2_center), color=(0, 0, 255), thickness=2)
return frame return frame
def draw_grid_lines(self, frame, detected_marker_data): def draw_grid_lines(self, frame, detected_marker_data):
@ -518,7 +351,7 @@ class ArucoEstimator:
for x in range(1, self.grid_columns): for x in range(1, self.grid_columns):
column_line_top = a + x / self.grid_columns * vab column_line_top = a + x / self.grid_columns * vab
column_line_bottom = d + x / self.grid_columns * vdc column_line_bottom = d + x / self.grid_columns * vdc
frame = cv2.line(frame, tuple(column_line_top.astype(int)), tuple(column_line_bottom.astype(int)), color=(0, 255, 0), frame = cv2.line(frame, tuple(column_line_top), tuple(column_line_bottom), color=(0, 255, 0),
thickness=1) thickness=1)
# draw horizontal lines # draw horizontal lines
@ -527,7 +360,7 @@ class ArucoEstimator:
for y in range(1, self.grid_rows): for y in range(1, self.grid_rows):
row_line_top = a + y / self.grid_rows * vad row_line_top = a + y / self.grid_rows * vad
row_line_bottom = b + y / self.grid_rows * vbc row_line_bottom = b + y / self.grid_rows * vbc
frame = cv2.line(frame, tuple(row_line_top.astype(int)), tuple(row_line_bottom.astype(int)), color=(0, 255, 0), frame = cv2.line(frame, tuple(row_line_top), tuple(row_line_bottom), color=(0, 255, 0),
thickness=1) thickness=1)
return frame return frame
@ -564,9 +397,4 @@ class ArucoEstimator:
if __name__ == "__main__": if __name__ == "__main__":
estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14]) estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[11, 12, 13, 14])
estimator.process_frame() estimator.run_tracking(draw_markers=True, invert_grayscale=True)
import sys
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()

View File

@ -143,7 +143,7 @@ class OpenLoopSolver:
def solve(self, x0, target): def solve(self, x0, target):
target = np.asarray(target)
angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data
x0[2] = angles_unwrapped[0] x0[2] = angles_unwrapped[0]
target[2] = angles_unwrapped[1] target[2] = angles_unwrapped[1]

View File

@ -1,123 +0,0 @@
import numpy as np
import time
from robot import ControlledRobot
from pid_controller import PIDController
from mpc_controller import MPCController
from event_listener import EventListener
class ControlCommander:
valid_controller_types = {'pid': PIDController,
'mpc': MPCController}
def __init__(self, id_ip_dict, controller_type='pid'):
"""
:param id_ip_dict: dictionary containing robot marker id and ip of the robot, e.g. { 12: '10.10.11.91' }
:param controller_type: string 'pid', 'mpc'; or dictionary with robot id and controller type string, e.g. { 12: 'mpc', 13: 'pid'}
"""
self.robots = {}
for id, ip in id_ip_dict.items():
self.robots[id] = ControlledRobot(id, ip)
for id, r in self.robots.items():
r.connect()
if type(controller_type) == dict:
for id, ctype in controller_type.items():
if ctype in ControlCommander.valid_controller_types:
self.robots[id].attach_controller(ControlCommander.valid_controller_types[ctype]())
else:
raise Exception(f"invalid controller type {ctype} specified for robot {id}. "
f"valid controller types are {list(ControlCommander.valid_controller_types.keys())}")
elif controller_type in ControlCommander.valid_controller_types:
for id, r in self.robots.items():
r.attach_controller(ControlCommander.valid_controller_types[controller_type]())
else:
raise Exception(f"invalid controller type {controller_type} specified. valid controller types are "
f"{list(ControlCommander.valid_controller_types.keys())}")
self.event_listener = EventListener(event_server=('127.0.0.1', 42424))
self.current_robot_index = 0
self.controlling = False
self.running = False
def run(self):
unconnected_robots = list(filter(lambda r: not r.connected, self.robots.values()))
if len(unconnected_robots) > 0:
print(f"warning: could not connect to the following robots: {unconnected_robots}")
return
all_detected = False
while not all_detected:
undetected_robots = list(filter(lambda r: None in r.get_measurement(), self.robots.values()))
all_detected = len(undetected_robots) == 0
if not all_detected:
print(f"warning: no measurements available for the following robots: {undetected_robots}")
time.sleep(0.5)
print("all robots detected -> starting control")
self.running = True
while self.running:
while not self.event_listener.event_queue.empty():
event = self.event_listener.event_queue.get()
self.handle_event(event)
def handle_event(self, event):
# handle events from opencv window
print("event: ", event)
if event[0] == 'click': # non-blocking move to target pos
target = event[1]
target_pos = np.array([target['x'], target['y'], target['angle']])
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
self.robots[controlled_robot_id].move_to_pos(target_pos)
elif event[0] == 'move_blocking': # blocking move to specified target position
target = event[1]
target_pos = np.array([target['x'], target['y'], target['angle']])
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
# initialize move and wait for the robot to reach the target position
self.robots[controlled_robot_id].move_to_pos(target_pos, blocking=True)
#time.sleep(0.5)
# send confirmation to the server which initiated the command
self.event_listener.send_reply("ack\n".encode())
elif event[0] == 'key':
key = event[1]
if key == 16777235: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
for _, r in self.robots.items():
r.stop_control()
else:
print("enable control")
for _, r in self.robots.items():
r.start_control()
elif key == 16777236: # left
# switch controlled robot
self.current_robot_index = (self.current_robot_index + 1) % len(self.robots)
controlled_robot_id = list(self.robots.keys())[self.current_robot_index]
robot = self.robots[controlled_robot_id]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
for r in self.robots:
r.stop_control()
self.running = False
if __name__ == '__main__':
id_ip_dict = {
#11: '10.10.11.88',
#12: '192.168.1.12',
13: '192.168.1.13',
#14: '10.10.11.89',
}
# controller_type = {12: 'mpc', 13: 'pid'}
controller_type = 'mpc'
rc = ControlCommander(id_ip_dict, controller_type=controller_type)
rc.run()

View File

@ -1,61 +0,0 @@
import threading
import time
class ControllerBase:
def __init__(self, control_rate=0.1):
self.robot = None
self.controlling = False
self.target_pos = None
self.control_thread = None
self.control_rate = control_rate
def control_loop(self):
while self.controlling:
if self.target_pos is not None:
state = self.get_measured_position()
control = self.compute_control(state)
if self.controlling:
self.apply_control(control)
if self.robot is not None:
self.robot.send_cmd(u1=0, u2=0) # stop robot
def set_target_position(self, target_pos):
self.target_pos = target_pos
def get_measured_position(self):
if self.robot is not None:
return self.robot.get_measurement()
else:
raise Exception("error: controller cannot get measurement!\n"
" there is no robot attached to the controller!")
def compute_control(self, state):
return 0.0, 0.0
def apply_control(self, control):
if self.robot is not None:
self.robot.send_cmd(u1=control[0], u2=control[1])
else:
raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!")
time.sleep(self.control_rate)
def attach_robot(self, robot):
self.robot = robot
def start(self):
self.controlling = True
# start control thread
self.control_thread = threading.Thread(target=self.control_loop)
self.control_thread.start()
def stop(self):
# pause controller
self.controlling = False
if self.control_thread is not None:
self.control_thread.join()

View File

@ -1,52 +0,0 @@
import socket
import threading
import queue
import json
class EventListener:
def __init__(self, event_server):
self.event_thread = threading.Thread(target=self.receive_events)
self.event_thread.daemon = True # mark thread as daemon -> it terminates automatically when program shuts down
self.event_queue = queue.Queue()
self.receiving = False
# connect to event server
print(f"connecting to event server on {event_server} ...")
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
try:
self.event_socket.connect(event_server)
self.event_socket.sendall(f"events\n".encode())
self.event_socket.settimeout(0.1)
# check if we receive data from the event server
response = self.event_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for events")
self.event_socket.settimeout(None)
# if so we start the event thread
self.event_thread.start()
else:
print(f"error: cannot communicate with the event server.\n The response was: {response}")
except socket.timeout:
print(f"error: the event server did not respond.")
except ConnectionRefusedError:
print(f"error: could not connect to event server at {event_server}.")
def send_reply(self, data):
self.event_socket.sendall(data)
def receive_events(self):
self.receiving = True
while self.receiving:
received = str(self.event_socket.recv(1024), "utf-8")
if len(received) > 0:
events = received.split('\n')
for event_json in events:
if len(event_json) > 0:
event = json.loads(event_json)
self.event_queue.put(event)
else:
self.receiving = False
print("event server seems to have shut down -> stop listening")

View File

@ -1,62 +0,0 @@
import socket
import pygame
from argparse import ArgumentParser
class ManualController:
def __init__(self, estimator):
self.estimator = estimator
self.estimator.external_keyboard_callback = self.handle_keypress
self.controlling = False
def handle_keypress(self, key):
pass
parser = ArgumentParser()
parser.add_argument('ip', metavar='ip', type=str, help='ip address of the controlled robot')
args = parser.parse_args()
ip = args.ip
pygame.init()
pygame.display.set_mode((640, 480))
rc_socket = socket.socket()
try:
rc_socket.connect((ip, 1234)) # connect to robot
except socket.error:
print("could not connect to socket")
running = True
while running:
u1 = 0
u2 = 0
vmax = 1.0
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_LEFT:
u1 = -vmax
u2 = vmax
print(f"turn left: ({u1},{u2})")
elif event.key == pygame.K_RIGHT:
u1 = vmax
u2 = -vmax
print(f"turn right: ({u1},{u2})")
elif event.key == pygame.K_UP:
u1 = vmax
u2 = vmax
print(f"forward: ({u1},{u2})")
elif event.key == pygame.K_DOWN:
u1 = -vmax
u2 = -vmax
print(f"backward: ({u1},{u2})")
elif event.key == pygame.K_ESCAPE:
print("quit")
running = False
u1 = 0.0
u2 = 0.0
rc_socket.send(f'({u1},{u2})\n'.encode())
elif event.type == pygame.KEYUP:
print("key released, resetting: ({},{})".format(u1, u2))
rc_socket.send(f'({u1},{u2})\n'.encode())

View File

@ -2,14 +2,12 @@ import socket
HOST, PORT = "localhost", 42424 HOST, PORT = "localhost", 42424
robot_id = 12 robot_id = 15
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_DGRAM is the socket type to use for UDP sockets
sock.connect((HOST, PORT)) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
#sock.sendall(f"events\n".encode()) # request events sock.sendto(f"{robot_id}\n".encode(), (HOST, PORT))
receiving = True while True:
while receiving:
received = str(sock.recv(1024), "utf-8") received = str(sock.recv(1024), "utf-8")
print("Received: {}".format(received)) print("Received: {}".format(received))
receiving = len(received) > 0

View File

@ -1,151 +1,46 @@
import socketserver import socketserver
import threading import threading
import time import time
import json
from queue import Queue
from aruco_estimator import ArucoEstimator from aruco_estimator import ArucoEstimator
class MeasurementHandler(socketserver.BaseRequestHandler): class MeasurementHandler(socketserver.BaseRequestHandler):
def handle(self) -> None: def handle(self) -> None:
data = self.request.recv(1024).strip() data = self.request[0]
socket = self.request[1]
cur_thread = threading.current_thread() cur_thread = threading.current_thread()
print(f"start current thread {cur_thread}") print(f"current thread {cur_thread}")
try:
if 'events' in data.decode(): marker_id = int(data)
print(f"{cur_thread} subscribed to events") if marker_id in self.server.estimator.robot_marker_estimates:
self.request.sendall('subscribed to events\n'.encode()) while True:
# send any events in the event queue to the subscriber socket.sendto(f"{self.server.estimator.robot_marker_estimates[marker_id]}\n".encode(),
event_loop_running = True self.client_address)
while event_loop_running:
try:
while not self.server.estimator.event_queue.empty():
event = self.server.estimator.event_queue.get()
# we distinguish two kinds of events:
if event[0] == 'response_event':
# 1. for 'response_event' events we expect the event subscriber to give a reply which will then
# by passed on to the response queue for transmitting to the original correspondent
message = event[1]['event']
#print(f"passing command {message} on to subscriber")
self.request.sendall((json.dumps(message) + '\n').encode())
reply = self.request.recv(1024)
#print(f"putting reply {reply} in response queue")
self.server.response_queue.put(reply)
else:
# 2. for other types of events we don't expect a reply and just pass them on to the subscriber
self.request.sendall((json.dumps(event) + '\n').encode())
self.server.estimator.last_event = None
time.sleep(1.0 / self.server.max_measurements_per_second) time.sleep(1.0 / self.server.max_measurements_per_second)
except Exception as e:
print(f"exception in event loop: {e}")
event_loop_running = False
print("after event loop")
elif 'corners' in data.decode():
print(f"{cur_thread} subscribed to corners")
# send positions of the board markers
corner_estimates = self.server.estimator.corner_estimates
response = {}
for corner, data in corner_estimates.items():
response[corner] = {'x': data['x'], 'y': data['y']}
self.request.sendall((json.dumps(response) + '\n').encode())
elif 'move_grid_blocking' in data.decode():
print(f"{cur_thread} subscribed move_grid_blocking")
# if we receive a move_grid event
# ( e.g. move_grid_blocking;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^"} )
#move_grid_blocking;{"x": 1, "y": 2, "dimx": 7, "dimy": 4, "orientation": ">"}
# we compute the corresponding real-world position the robot should drive to
# and then create a new move event which is put in the event queue and will be propagated to the ControlCommander
data_decode = data.decode()
#print("data: ", data_decode)
payload = data_decode.split(';')[1]
#print("payload: ", payload)
grid_pos = json.loads(payload)
#print("grid_pos = ", grid_pos)
pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'],
grid_pos['dimy'], grid_pos['orientation'])
#print("pos = ", pos)
#print("event requiring response")
# put blocking move command in event queue
self.server.estimator.event_queue.put(('response_event',
{'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})}))
# wait for response of the move command
# TODO this assumes that we wait only for at most one response at a time
# we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for)
while self.server.response_queue.empty():
time.sleep(0.2)
self.request.sendall(b'.\n')
pass
while not self.server.response_queue.empty():
reply = self.server.response_queue.get()
# send back response to the original source
print(f"sending reply {reply} back to correspondent {self.request}")
self.request.sendall(reply)
else:
print(f"{cur_thread} subscribed to robot position")
# send robot position
try:
marker_id = int(data)
except ValueError:
marker_id = None
if marker_id is not None:
if marker_id in self.server.estimator.robot_marker_estimates:
marker_loop_running = True
while marker_loop_running:
try:
self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id])
+ '\n').encode())
time.sleep(1.0 / self.server.max_measurements_per_second)
except Exception as e:
print(f"exception in marker loop: {e}")
marker_loop_running = False
else:
self.request.sendall("error: unknown robot marker id\n".encode())
else: else:
self.request.sendall("error: data not understood. " socket.sendto("error: unknown robot marker id\n".encode(),
"expected <robot marker id (int)> or 'events'\n".encode()) self.client_address)
except ValueError:
socket.sendto("error: data not understood. expected robot marker id (int)\n".encode(), self.client_address)
print(f"end current thread {cur_thread}")
return return
class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer): class MeasurementServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
allow_reuse_address = True
def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30): def __init__(self, server_address, RequestHandlerClass, estimator, max_measurements_per_second=30):
super().__init__(server_address, RequestHandlerClass) super().__init__(server_address, RequestHandlerClass)
self.estimator = estimator self.estimator = estimator
self.max_measurements_per_second = max_measurements_per_second self.max_measurements_per_second = max_measurements_per_second
self.response_queue = Queue()
def handle_error(self, request, client_address):
print("an error occurred -> terminating connection")
if __name__ == "__main__": if __name__ == "__main__":
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13]) aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[15])
estimator_thread = threading.Thread(target=aruco_estimator.run_tracking)
estimator_thread.start()
# first we start thread for the measurement server with MeasurementServer(('127.0.0.1', 42424), MeasurementHandler, aruco_estimator,
measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator, max_measurements_per_second=30) as measurement_server:
max_measurements_per_second=30) measurement_server.serve_forever()
server_thread = threading.Thread(target=measurement_server.serve_forever)
server_thread.start()
# now we start the Aruco estimator GUI # receive with: nc 127.0.0.1 42424 -u -> 15 + Enter
aruco_estimator.process_frame()
import sys
from pyqtgraph.Qt import QtCore, QtGui
if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
QtGui.QApplication.instance().exec_()
#with MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
# max_measurements_per_second=30) as measurement_server:
# measurement_server.serve_forever()
# receive with: nc 127.0.0.1 42424 -> 12 + Enter

View File

@ -1,117 +0,0 @@
from flask import Flask, render_template, request
import socket
import threading
import time
import numpy as np
from robot import Robot
from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
from shapely.geometry import LineString
width = 600
height = 400
robot_ids = [11, 12, 13, 14]
robot_ids_control_history = [11, 12, 13, 14]
app = Flask(__name__)
def calc_target_pos(px, py):
if rc.estimator.all_corners_detected():
# compute column line
a = np.array([rc.estimator.corner_estimates['a']['x'], rc.estimator.corner_estimates['a']['y']])
b = np.array([rc.estimator.corner_estimates['b']['x'], rc.estimator.corner_estimates['b']['y']])
c = np.array([rc.estimator.corner_estimates['c']['x'], rc.estimator.corner_estimates['c']['y']])
d = np.array([rc.estimator.corner_estimates['d']['x'], rc.estimator.corner_estimates['d']['y']])
x_frac = (px + 0.5) / width
y_frac = (py + 0.5) / height
vab = b - a
vdc = c - d
column_line_top = a + x_frac * vab
column_line_bottom = d + x_frac * vdc
vad = d - a
vbc = c - b
row_line_top = a + y_frac * vad
row_line_bottom = b + y_frac * vbc
column_line = LineString([column_line_top, column_line_bottom])
row_line = LineString([row_line_top, row_line_bottom])
int_pt = column_line.intersection(row_line)
point_of_intersection = np.array([int_pt.x, int_pt.y, 0.0])
return point_of_intersection
@app.route('/', methods=('GET', 'POST'))
def presence():
global robot_ids_control_history
if request.method == 'GET':
return render_template('telepresence.html', oldest_control=robot_ids.index(robot_ids_control_history[0]))
elif request.method == 'POST':
x = int(float(request.form.get('x')))
y = int(float(request.form.get('y')))
print(x, y)
margin = 0.1
x = min(max(width * margin, x), width * (1 - margin))
y = min(max(height * margin, y), height * (1 - margin))
id = int(request.form.get('robot'))
robot_ids_control_history.append(robot_ids_control_history.pop(robot_ids_control_history.index(id)))
target_pos = calc_target_pos(x, y)
rc.estimator.event_queue.put(('robot', id, target_pos))
return 'OK'
class RemoteController:
def __init__(self):
self.robots = []
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
self.robots = [Robot(11, '10.10.11.88'), Robot(12, '10.10.11.91'),
Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
self.robot_ids = {}
for r in self.robots:
self.robot_ids[r.id] = r
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots:
robot.connect()
self.comm_socket.bind(('', 1337))
self.comm_socket.listen(5)
self.t = time.time()
# start thread for marker position detection
self.estimator = ArucoEstimator(self.robot_ids.keys())
self.estimator_thread = threading.Thread(target=self.estimator.run_tracking, kwargs={'draw_markers':False})
self.estimator_thread.start()
self.controller = MPCController(self.estimator)
def run(self):
print("waiting until all markers are detected...")
while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
pass
print("starting control")
self.controller.interactive_control(robots=self.robots)
if __name__ == "__main__":
rc = RemoteController()
rc_thread = threading.Thread(target=rc.run)
rc_thread.start()
app.run('0.0.0.0')

View File

@ -1,62 +1,175 @@
import numpy as np import numpy as np
import time import time
from controller import ControllerBase
from casadi_opt import OpenLoopSolver from casadi_opt import OpenLoopSolver
class MPCController(ControllerBase): class MPCController:
def __init__(self, N=10, T=1.0): def __init__(self, estimator):
super().__init__() self.t = time.time()
self.t = None
self.ols = OpenLoopSolver(N=N, T=T) self.estimator = estimator
self.ols.setup() self.controlling = False
self.control_rate = self.ols.T / self.ols.N
self.mstep = 2 self.mstep = 2
self.ols = OpenLoopSolver(N=20, T=1.0)
self.ols.setup()
self.dt = self.ols.T / self.ols.N
# integrator # integrator
self.omega_max = 5.0 self.omega_max = 5.0
self.control_scaling = 0.5 self.control_scaling = 0.4
def set_target_position(self, target_pos): def move_to_pos(self, target_pos, robot, near_target_counter=5):
super(MPCController, self).set_target_position(target_pos) near_target = 0
self.t = time.time() while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
def compute_control(self, state): if key == 84: # arrow up
x_pred = np.array(state[1:]) self.controlling = True
self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
error_pos = np.linalg.norm(x_pred[0:2] - self.target_pos[0:2]) x_pred = self.get_measurement(robot.id)
angles_unwrapped = np.unwrap([x_pred[2], self.target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
if error_pos > 0.05 or error_ang > 0.2: if x_pred is not None:
#if error_pos > 0.05: error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
# solve mpc open loop problem angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
res = self.ols.solve(x_pred, self.target_pos) error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
us1 = res[0] * self.control_scaling # if error_pos > 0.075 or error_ang > 0.35:
us2 = res[1] * self.control_scaling if error_pos > 0.05 or error_ang > 0.1:
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
dt_mpc = time.time() - self.t # us1 = res[0]
if dt_mpc < self.control_rate: # wait until next control can be applied # us2 = res[1]
time.sleep(self.control_rate - dt_mpc) us1 = res[0] * self.control_scaling
else: us2 = res[1] * self.control_scaling
us1 = [0] * self.mstep # print("u = {}", (us1, us2))
us2 = [0] * self.mstep
return us1, us2 # print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
time.sleep(self.dt - dt_mpc)
else:
us1 = [0] * self.mstep
us2 = [0] * self.mstep
near_target += 1
def apply_control(self, control): # send controls to the robot
if self.robot is not None: for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1 u1 = us1[i]
u1 = control[0][i] u2 = us2[i]
u2 = control[1][i] robot.send_cmd(u1, u2)
self.robot.send_cmd(u1, u2) if i < self.mstep:
if i < self.mstep: time.sleep(self.dt)
time.sleep(self.control_rate) self.t = time.time() # save time the most recent control was applied
self.t = time.time() # save time the most recent control was applied else:
else: print("robot not detected yet!")
raise Exception("error: controller cannot apply control!\n"
" there is no robot attached to the controller!") def interactive_control(self, robots):
controlled_robot_number = 0
robot = robots[controlled_robot_number]
target_pos = np.array([0.0, 0.0, 0.0])
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
else:
print("enable control")
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
# print(np.linalg.norm(x_pred-target_pos))
# solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
us1 = res[0] * self.control_scaling
us2 = res[1] * self.control_scaling
dt_mpc = time.time() - self.t
if dt_mpc < self.dt: # wait until next control can be applied
time.sleep(self.dt - dt_mpc)
# send controls to the robot
for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
u1 = us1[i]
u2 = us2[i]
robot.send_cmd(u1, u2)
if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)

View File

@ -2,98 +2,48 @@ import sys
import socket import socket
import threading import threading
import time import time
import json
from robot import Robot, ControlledRobot from robot import Robot
from mpc_controller import MPCController from mpc_controller import MPCController
from aruco_estimator import ArucoEstimator
class RemoteController: class RemoteController:
def __init__(self): def __init__(self):
self.robots = [] self.robots = []
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')] #self.robots = [Robot(11, '192.168.1.11'), Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
#self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
#self.robots = [Robot(14, '10.10.11.89')]
self.robots = [ControlledRobot(13, '192.168.1.13'), ControlledRobot(12, '192.168.1.12')]
self.robot_ids = {} self.robot_ids = {}
for r in self.robots: for r in self.robots:
self.robot_ids[r.id] = r self.robot_ids[r.id] = r
self.controlled_robot = self.robots[0].id
# socket for movement commands
self.comm_socket = socket.socket()
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
for robot in self.robots: for robot in self.robots:
robot.connect() robot.connect()
# thread for handling events received by measurement server (from GUI) self.comm_socket.bind(('', 1337))
self.event_thread = threading.Thread(target=self.handle_events) self.comm_socket.listen(5)
self.event_thread.daemon = True
# connect to socket for events from GUI
try:
HOST, PORT = "localhost", 42424
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.event_socket.connect((HOST, PORT))
self.event_socket.sendall("events\n".encode())
self.event_socket.settimeout(0.1)
# check if we receive data from the measurement server
response = self.event_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for measurements")
self.event_socket.settimeout(None)
# if so we start the measurement thread
self.event_thread.start()
else:
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
except socket.timeout:
print(f"error: the measurement server did not respond with data.")
#sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
# sock.sendall(f"events\n".encode()) # request events
# receiving = True
# while receiving:
# received = str(sock.recv(1024), "utf-8")
# print("Received: {}".format(received))
# receiving = len(received) > 0
self.t = time.time() self.t = time.time()
# start thread for marker position detection # start thread for marker position detection
self.controllers = [] self.estimator = ArucoEstimator(self.robot_ids.keys())
for r in self.robots: self.estimator_thread = threading.Thread(target=self.estimator.run_tracking)
c = MPCController() self.estimator_thread.start()
self.controllers.append(c)
r.attach_controller(c)
def handle_events(self): self.controller = MPCController(self.estimator)
receiving = True
while receiving:
received = str(self.event_socket.recv(1024), "utf-8")
if len(received) > 0:
last_received = received.split('\n')[-2]
event_type, payload = json.loads(last_received)
print(event_type, payload)
if event_type == 'click':
target_pos = (payload['x'], payload['y'], payload['angle'])
self.robot_ids[self.controlled_robot].move_to_pos(target_pos=target_pos)
elif event_type == 'controlled_robot':
self.controlled_robot = payload['robot_id']
else:
receiving = False
print(f"measurement server stopped sending event data")
def run(self): def run(self):
print("waiting until all markers are detected...")
#print("waiting until all markers are detected...") while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
#while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()): pass
# pass
print("starting control") print("starting control")
#self.controller.interactive_control(robots=self.robots) self.controller.interactive_control(robots=self.robots)
for c in self.controllers:
c.start()
pass
def main(args): def main(args):
@ -103,12 +53,3 @@ def main(args):
if __name__ == '__main__': if __name__ == '__main__':
main(sys.argv) main(sys.argv)
# to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams
# marker positions and GUI events via socket
# next, launch this program (mpc_test.py)
# you can now direct the robots via the GUI (assuming four corner markers for position reference have been detected)
# click any position on the screen and the current robot will drive there
# you can also switch which robot should be controlled in the GUI
# TODO document and improve program structure

View File

@ -2,15 +2,15 @@ import numpy as np
import math import math
import time import time
from controller import ControllerBase
class PIDController:
def __init__(self, estimator):
self.t = time.time()
class PIDController(ControllerBase): self.estimator = estimator
def __init__(self): self.controlling = False
super().__init__()
self.t = None
self.P_angle = 0.4 self.P_angle = 0.4 # PID parameters determined by Ziegler-Nichols. measured K_crit = 1.4, T_crit = 1.5
self.I_angle = 0.35 self.I_angle = 0.35
self.D_angle = 0.1 self.D_angle = 0.1
@ -18,103 +18,250 @@ class PIDController(ControllerBase):
self.I_pos = 0.3 self.I_pos = 0.3
self.D_pos = 0.1 self.D_pos = 0.1
self.mode = 'combined' self.mode = None
self.e_angle_old = 0.0 def move_to_pos(self, target_pos, robot, near_target_counter=5):
self.e_pos_old = 0.0 near_target = 0
while near_target < near_target_counter:
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
pass
elif event[0] == 'key':
key = event[1]
self.i = 0.0 if key == 84: # arrow up
self.i_angle = 0.0 self.controlling = True
self.i_pos = 0.0 self.t = time.time()
elif key == 82: # arrow down
self.controlling = False
robot.send_cmd()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0])
elif key == 43: # +
self.control_scaling += 0.1
self.control_scaling = min(self.control_scaling, 1.0)
print("control scaling = ", self.control_scaling)
elif key == 45: # -
self.control_scaling -= 0.1
self.control_scaling = max(self.control_scaling, 0.1)
print("control scaling = ", self.control_scaling)
elif key == 113:
print("quit!")
self.controlling = False
robot.send_cmd()
return
elif key == 27: # escape
print("quit!")
self.controlling = False
robot.send_cmd()
return
def set_target_position(self, target_pos): x_pred = self.get_measurement(robot.id)
super(PIDController, self).set_target_position(target_pos)
self.mode = 'combined'
self.e_angle_old = 0.0
self.e_pos_old = 0.0
self.i = 0.0 if x_pred is not None:
self.i_angle = 0.0 error_pos = np.linalg.norm(x_pred[0:2] - target_pos[0:2])
self.i_pos = 0.0 angles_unwrapped = np.unwrap([x_pred[2], target_pos[2]]) # unwrap angle to avoid jump in data
error_ang = np.abs(angles_unwrapped[0] - angles_unwrapped[1])
# print("error pos = ", error_pos)
# print("error_pos = {}, error_ang = {}".format(error_pos, error_ang))
def compute_control(self, state): # if error_pos > 0.075 or error_ang > 0.35:
# measure state if error_pos > 0.05 or error_ang > 0.1:
x_pred = state[1:] # solve mpc open loop problem
res = self.ols.solve(x_pred, target_pos)
if self.t is None: # us1 = res[0]
dt = 0.1 # us2 = res[1]
else: us1 = res[0] * self.control_scaling
dt = time.time() - self.t # time since last control was applied us2 = res[1] * self.control_scaling
# print("u = {}", (us1, us2))
if self.mode == 'angle': # print("---------------- mpc solution took {} seconds".format(tmpc_end - tmpc_start))
# compute angle such that robot faces to target point dt_mpc = time.time() - self.t
target_angle = self.target_pos[2] if dt_mpc < self.dt: # wait until next control can be applied
# print("sleeping for {} seconds...".format(self.dt - dt_mpc))
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data time.sleep(self.dt - dt_mpc)
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
p = self.P_angle * e_angle
self.i += self.I_angle * dt * (e_angle + self.e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - self.e_angle_old)/dt
u1 = p + self.i + d
u2 = - u1
self.e_angle_old = e_angle
elif self.mode == 'combined':
# compute angle such that robot faces to target point
v = self.target_pos[0:2] - x_pred[0:2]
target_angle = math.atan2(v[1], v[0])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v[0:2])
if e_pos < 0.03:
self.mode = 'angle'
self.e_angle_old = 0
self.e_pos_old = 0
self.i_angle = 0
self.i_pos = 0
u1 = 0
u2 = 0
else:
forward = abs(e_angle) < np.pi/2.0
if not forward:
if e_angle > np.pi/2.0:
e_angle -= np.pi
else:
e_angle += np.pi
p_angle = self.P_angle * e_angle
self.i_angle += self.I_angle * dt * (e_angle + self.e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - self.e_angle_old) / dt
p_pos = self.P_pos * e_pos
self.i_pos += self.I_pos * dt * (e_pos + self.e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - self.e_pos_old) / dt
if forward:
u1 = p_angle + p_pos + self.i_angle + self.i_pos + d_angle + d_pos
u2 = - p_angle + p_pos - self.i_angle + self.i_pos - d_angle + d_pos
else: else:
u1 = p_angle - p_pos + self.i_angle - self.i_pos + d_angle - d_pos us1 = [0] * self.mstep
u2 = - p_angle - p_pos - self.i_angle - self.i_pos - d_angle - d_pos us2 = [0] * self.mstep
near_target += 1
self.e_pos_old = e_pos # send controls to the robot
self.e_angle_old = e_angle for i in range(0, self.mstep): # option to use multistep mpc if len(range) > 1
else: u1 = us1[i]
u1 = 0.0 u2 = us2[i]
u2 = 0.0 robot.send_cmd(u1, u2)
return u1, u2 if i < self.mstep:
time.sleep(self.dt)
self.t = time.time() # save time the most recent control was applied
else:
print("robot not detected yet!")
def stop(self): def interactive_control(self, robots):
super(PIDController, self).stop() controlled_robot_number = 0
self.t = None robot = robots[controlled_robot_number]
def apply_control(self, control): ts = []
self.t = time.time() # save time when the most recent control was applied angles = []
super(PIDController, self).apply_control(control)
target_pos = np.array([0.0, 0.0, 0.0])
e_angle_old = 0.0
e_pos_old = 0.0
i = 0.0
i_angle = 0.0
i_pos = 0.0
t0 = time.time()
running = True
while running:
# handle events from opencv window
while not self.estimator.event_queue.empty():
event = self.estimator.event_queue.get()
print("event: ", event)
if event[0] == 'click':
target_pos = event[1]
i_angle = 0
i_pos = 0
e_pos_old = 0
e_angle_old = 0
self.mode = 'combined'
elif event[0] == 'key':
key = event[1]
if key == 32: # arrow up
self.controlling = not self.controlling
if not self.controlling:
print("disable control")
robot.send_cmd() # stop robot
self.mode = None # reset control mode
else:
print("enable control")
i = 0
self.t = time.time()
elif key == 48: # 0
target_pos = np.array([0.0, 0.0, 0.0]) # TODO: use center of board for target pos
elif key == 97: # a
self.mode = 'angle'
e_angle_old = 0
i = 0
self.t = time.time()
elif key == 99: # c
self.mode = 'combined'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
self.t = time.time()
elif key == 112: # p
self.mode = 'position'
e_pos_old = 0
i = 0
self.t = time.time()
elif key == 43: # +
self.P_pos += 0.1
print("P = ", self.P_angle)
elif key == 45: # -
self.P_pos -= 0.1
print("P = ", self.P_angle)
elif key == 9: # TAB
# switch controlled robot
robot.send_cmd() # stop current robot
controlled_robot_number = (controlled_robot_number + 1) % len(robots)
robot = robots[controlled_robot_number]
print(f"controlled robot: {robot.id}")
elif key == 113 or key == 27: # q or ESCAPE
print("quit!")
self.controlling = False
robot.send_cmd()
return
if self.controlling:
# measure state
x_pred = self.get_measurement(robot.id)
dt = self.t - time.time()
#print(f"x_pred = {x_pred}\ntarget = {target_pos}\nerror = {np.linalg.norm(target_pos - x_pred)}\n")
if self.mode == 'angle':
# compute angle such that robot faces to target point
target_angle = target_pos[2]
ts.append(time.time() - t0)
angles.append(x_pred[2])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
p = self.P_angle * e_angle
# i += self.I * dt * e # right Riemann sum
i += self.I_angle * dt * (e_angle + e_angle_old)/2.0 # trapezoidal rule
d = self.D_angle * (e_angle - e_angle_old)/dt
u1 = p - i - d
u2 = - u1
e_angle_old = e_angle
elif self.mode == 'combined':
# compute angle such that robot faces to target point
v = target_pos[0:2] - x_pred[0:2]
target_angle = math.atan2(v[1], v[0])
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v)
if e_pos < 0.05:
self.mode = 'angle'
e_angle_old = 0
e_pos_old = 0
i_angle = 0
i_pos = 0
u1 = 0
u2 = 0
else:
forward = abs(e_angle) < np.pi/2.0
if not forward:
if e_angle > np.pi/2.0:
e_angle -= np.pi
else:
e_angle += np.pi
p_angle = self.P_angle * e_angle
i_angle += self.I_angle * dt * (e_angle + e_angle_old) / 2.0 # trapezoidal rule
d_angle = self.D_angle * (e_angle - e_angle_old) / dt
p_pos = self.P_pos * e_pos
i_pos += self.I_pos * dt * (e_pos + e_pos_old) / 2.0 # trapezoidal rule
d_pos = self.D_pos * (e_pos - e_pos_old) / dt
if forward:
print("forward")
u1 = p_angle + p_pos - i_angle - i_pos - d_angle - d_pos
u2 = - p_angle + p_pos + i_angle - i_pos + d_angle - d_pos
else:
print("backward")
u1 = p_angle - p_pos - i_angle + i_pos - d_angle + d_pos
u2 = - p_angle - p_pos + i_angle + i_pos + d_angle + d_pos
e_pos_old = e_pos
e_angle_old = e_angle
else:
u1 = 0.0
u2 = 0.0
#print(f"u = ({u1}, {u2})")
robot.send_cmd(u1, u2)
self.t = time.time() # save time when the most recent control was applied
time.sleep(0.1)
def get_measurement(self, robot_id):
return self.estimator.get_robot_state_estimate(robot_id)

View File

@ -1,21 +1,12 @@
import socket import socket
import threading
import json
import numpy as np
import time
import math
class Robot: class Robot:
def __init__(self, marker_id, ip, measurement_server=('127.0.0.1', 42424)): def __init__(self, marker_id, ip):
self.id = marker_id self.id = marker_id
self.pos = None
self.t_last_measurement = None self.euler = None
self.x = None
self.y = None
self.angle = None
self.ip = ip self.ip = ip
self.port = 1234
self.socket = socket.socket() self.socket = socket.socket()
# currently active control # currently active control
@ -24,47 +15,18 @@ class Robot:
self.connected = False self.connected = False
self.measurement_server = measurement_server
self.measurement_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP socket
self.measurement_thread = threading.Thread(target=self.receive_measurements)
# mark thread as daemon -> it terminates automatically when program shuts down
self.measurement_thread.daemon = True
self.receiving = False
def connect(self): def connect(self):
# connect to robot # connect to robot
try: try:
print(f"connecting to robot {self.ip} at {self.ip}:{self.port} ...") print("connecting to robot {} with ip {} ...".format(self.id, self.ip))
self.socket.connect((self.ip, self.port)) # connect to robot self.socket.connect((self.ip, 1234)) # connect to robot
print("connected!") print("connected!")
self.connected = True self.connected = True
except socket.error: except socket.error:
print(f"error: could not connect to robot {self.id} at {self.ip}:{self.port}") print("could not connect to robot {} with ip {}".format(self.id, self.ip))
# connect to measurement server
print(f"connecting to measurement server on {self.measurement_server} ...")
try:
self.measurement_socket.connect(self.measurement_server)
self.measurement_socket.sendall(f"{self.id}\n".encode())
self.measurement_socket.settimeout(0.1)
# check if we receive data from the measurement server
response = self.measurement_socket.recv(1024)
if 'error' not in str(response):
print("... connected! -> start listening for measurements")
self.measurement_socket.settimeout(None)
# if so we start the measurement thread
self.measurement_thread.start()
else:
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
except socket.timeout:
print(f"error: the measurement server did not respond with data.")
except ConnectionRefusedError:
print(f"error: could not connect to measurement server at {self.measurement_server}.")
def send_cmd(self, u1=0.0, u2=0.0): def send_cmd(self, u1=0.0, u2=0.0):
if self.socket and self.connected: if self.socket:
try: try:
self.socket.send(f'({u1},{u2})\n'.encode()) self.socket.send(f'({u1},{u2})\n'.encode())
except BrokenPipeError: except BrokenPipeError:
@ -73,81 +35,3 @@ class Robot:
except ConnectionResetError: except ConnectionResetError:
print(f"error: connection to robot {self.id} with ip {self.ip} lost") print(f"error: connection to robot {self.id} with ip {self.ip} lost")
pass pass
else:
print(f"error: robot {self.id} is not connected to {self.ip}")
def receive_measurements(self):
self.receiving = True
while self.receiving:
received = str(self.measurement_socket.recv(1024), "utf-8")
if len(received) > 0:
last_received = received.split('\n')[-2]
measurement = json.loads(last_received)
self.t_last_measurement = measurement['t']
self.x = measurement['x']
self.y = measurement['y']
self.angle = measurement['angle']
else:
self.receiving = False
print(f"measurement server stopped sending data for robot {self.id}")
def get_measurement(self):
return self.t_last_measurement, self.x, self.y, self.angle
def __str__(self):
connection_state = '' if self.connected else ' not'
state = self.get_measurement()
last_measurement = f'last measurement = {state}' if None not in state else 'no measurements available'
return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected ) " + last_measurement
def __repr__(self):
return str(self)
class ControlledRobot(Robot):
def __init__(self, marker_id, ip):
super().__init__(marker_id, ip)
self.controller = None
def start_control(self):
if self.controller is not None:
print("starting control")
self.controller.start()
else:
raise Exception("Error: Cannot start control: there is not controller attached to the robot!")
def stop_control(self):
if self.controller is not None:
print("stopping control")
self.controller.stop()
else:
raise Exception("Error: Cannot stop control: there is not controller attached to the robot!")
def attach_controller(self, controller):
self.controller = controller
self.controller.attach_robot(self)
def move_to_pos(self, target_pos, blocking=False):
if self.controller is not None:
if not blocking:
self.controller.set_target_position(target_pos)
else: # only return after the robot has reached the target
self.stop_control()
self.controller.set_target_position(target_pos)
self.start_control()
close_to_target = False
while not close_to_target:
current_pos = np.array([self.x, self.y, self.angle])
v = target_pos - current_pos
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
e_pos = np.linalg.norm(v[0:2])
#print(f"target_pos = {target_pos}, current_pos = {current_pos}, e_pos = {e_pos}, e_ang = {e_angle}")
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
close_to_target = e_pos < 0.075 and abs(e_angle) < 0.1
time.sleep(0.1)
print("target reached!")
self.stop_control()
else:
raise Exception("Error: Cannot move to position: there is not controller attached to the robot!")

View File

@ -0,0 +1,217 @@
import socket
import pygame
import json
from math import sin,cos,atan2,pi
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument('bot', metavar='bot', type=str, help='ip address of the controlled robot')
parser.add_argument('id', metavar='id', type=str, help='id of the controlled robot')
parser.add_argument('meas', metavar='meas', type=str, help='ip address of the measurement server')
args = parser.parse_args()
bot = args.bot
meas = args.meas
w = 640
h = 480
pygame.init()
surf = pygame.display.set_mode((640, 480))
bg = pygame.Color(0,0,0)
botcol = pygame.Color(255, 0, 0)
setcol = pygame.Color(0, 255, 0)
meas_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
meas_socket.connect((meas, 42424)) # connect to robot
except socket.error:
print("could not connect to measurement socket")
rc_socket = socket.socket()
try:
rc_socket.connect((bot, 1234)) # connect to robot
except socket.error:
print("could not connect to bot socket")
meas_socket.sendall(f"{args.id}\n".encode())
class Bot(object):
FAR, CLOSE, ANGLE, REACHED = range(4)
def __init__(self, control):
self.sock = control
self.state = Bot.REACHED
self.tx = self.ty = None
self.ta = None
def move(self, x, y, angle = None):
self.tx = x
self.ty = y
self.ta = angle
self.pps = True
self.pas = True
print()
print(f"GOING TO {x},{y}")
print()
self.state = Bot.FAR
def rotate(self, angle):
self.ta = angle
self.state = Bot.ANGLE
def stop(self):
self.sock.send(f'(0,0)\n'.encode())
def step(self, x, y, angle):
umax = 1.0
if self.tx != None:
dp = ((self.tx - x)**2 + (self.ty - y)**2)**0.5
da = atan2(self.ty - y, self.tx - x) - angle
# Position state: Determine if the angle to the target flips over +-pi/2, i.e. going past the target now
def pst(angle):
while angle > pi:
angle -= 2*pi
while angle <-pi:
angle += 2*pi
return abs(angle) > pi/2
# Angle state: When 'close' to target angle (absolute value <= pi/2) use the sign, else 0. When the angle flips over 0, the difference is +- 2
def ast(angle):
while angle > pi:
angle -= 2*pi
while angle <-pi:
angle += 2*pi
return 0 if abs(angle) >= pi/2 else 1 if angle >= 0 else -1
# Move with speed and change towards angle
def control(speed, angle, backwards = True):
ca = cos(angle)
speed = speed * cos(angle)
# Move angle to [0..2*pi]
while angle < 0:
angle += 2*pi
while angle > 2*pi:
angle -= 2*pi
if backwards:
# If we can go backwards, angle should be pi or 0
if angle >= pi/2 and angle < 3*pi/2:
# pi/2 .. 3/2 pi should move towards pi
angle -= pi
print(f"Go for pi, {angle}")
else:
# Everything else should be 0. Make sure angles in 3/2 pi .. 2 pi don't lead to huge outputs.
if angle >= 3*pi/2:
angle -= 2*pi
print(f"Go for 0, {angle}")
else:
# Have to go forward, just try to get angle to 0
# Correct angle to be [-pi..pi]
while angle > pi:
angle -= 2*pi
print(f"Always go for 0, {angle}")
ul, ur = speed, speed
if speed == 0 and abs(angle) < 0.5:
angle = 0.5 if angle > 0 else -0.5 if angle < 0 else 0
ul -= angle * 0.5
ur += angle * 0.5
vd = max(1, abs(ul)/umax, abs(ur)/umax)
ul /= vd
ur /= vd
print(f"Movement: {ul},{ur}")
self.sock.send(f'({ul},{ur})\n'.encode())
self.pps = pst(angle)
self.pas = ast(angle)
if self.state == Bot.FAR:
if dp <= 0.3:
# Close to the target
self.state = Bot.CLOSE
self.pps = pst(da)
else:
# When far, just move towards the target
print(f"FAR {umax}, {da}")
control(umax, da)
if self.state == Bot.CLOSE:
if pst(da) != self.pps and dp <= 0.1:
# The angle flips over +- pi/2, go for the angle setpoint
self.state = Bot.ANGLE
self.pas = 0
else:
print(f"CLOSE {umax*dp/0.3} {da} {pst(da)} {self.pps}")
control(0.3+(umax*dp/0.3), da)
if self.state == Bot.ANGLE:
if self.ta == None:
# No angle setpoint, we're done
self.state = Bot.REACHED
bot.stop()
else:
# Use angle setpoint for angle difference
da = self.ta - angle
# Difference between angle states:
# +- 1 means flipping between 'closer than pi/2' and 'further than pi/2'
# +- 2 means abs(angle) < pi/2 and the sign changes -> In position!
if abs(ast(da) - self.pas) == 2:
# Done
self.state = Bot.REACHED
bot.stop()
else:
# Don't move, just rotate
print(f"ANGL {0} {da} {ast(da)} {self.pas}")
control(0, da, False)
meas_file = meas_socket.makefile('rw', encoding='utf8', newline='\n')
bot = Bot(rc_socket)
bot.move(0, 0, 0)
running = True
ml = 0.5
scale = 220 / 0.5
while running:
received = json.loads(meas_file.readline().strip())
received = json.loads(meas_file.readline().strip())
# Extract position, angle
x = received['x']
y = received['y']
a = received['angle']
events = pygame.event.get()
for event in events:
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_RIGHT:
bot.move(ml, 0, pi)
if event.key == pygame.K_LEFT:
bot.move(-ml, 0, 0)
if event.key == pygame.K_UP:
bot.move(0, ml, pi*3/2)
if event.key == pygame.K_DOWN:
bot.move(0, -ml, pi/2)
if event.key == pygame.K_ESCAPE:
bot.stop()
running = False
elif event.type == pygame.MOUSEBUTTONDOWN:
x,y = pygame.mouse.get_pos()
bot.move((x-w/2)/scale, (y-h/2)/scale)
surf.fill(bg)
pygame.draw.circle(surf, botcol, (w/2 + scale * x, h/2 + scale * y), 10, 2)
pygame.draw.line(surf, botcol, (w/2 + scale * x, h/2 + scale * y), (w/2 + scale * x + 15 * cos(a), h/2 + scale * y + 15 * sin(a)))
if bot.tx != None and bot.ty != None:
pygame.draw.circle(surf, setcol, (w/2 + scale * bot.tx, h/2 + scale * bot.ty), 10, 2)
if bot.ta != None:
pygame.draw.line(surf, setcol,
(w/2 + scale * bot.tx, h/2 + scale * bot.ty),
(w/2 + scale * bot.tx + 15 * cos(bot.ta), h/2 + scale * bot.ty + 15 * sin(bot.ta)))
pygame.display.flip()
if running:
bot.step(x, y, a)

View File

@ -1,85 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Mensa-Abend Telepräsenz</title>
<style>
div.meerschweinchen {
max-width: 600px;
margin: auto;
text-align: center;
}
div.arena {
height: 400px;
width: 600px;
background-image: url("https://p.0au.de/3d62ea7/foo.png");
border: none;
text-align: center;
position: relative
}
div.arena p {
margin: 0;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%)
}
</style>
<script type=text/javascript src="{{url_for('static', filename='jquery-3.5.1.min.js') }}"></script>
</head>
<body>
<h1 style="text-align: center">Welcome to Mensa-Abend digital</h1>
<div style="text-align: center">
<p>Click anywhere to control the robots in our hackerspace. You can see them in the Zoom video.<br>
<br>
Please let us know about your study background.
</p>
</div>
<div class="meerschweinchen">
<div class=arena style="font-size: large;">
</div>
<form method="post">
<p>Choose your robot:</p>
<div>
<input type="radio" id="r11" name="robot_id" value="11">
<label for="r11">Robot 11</label>
</div>
<div>
<input type="radio" id="r12" name="robot_id" value="12">
<label for="r12">Robot 12</label>
</div>
<div>
<input type="radio" id="r13" name="robot_id" value="13">
<label for="r13">Robot 13</label>
</div>
<div>
<input type="radio" id="r14" name="robot_id" value="14">
<label for="r14">Robot 14</label>
</div>
</form>
</div>
</body>
<script>
function fireThis(e) {
var parentOffset = $(this).parent().offset();
var x = e.pageX - parentOffset.left;
var y = e.pageY - parentOffset.top;
var robot = document.querySelector('input[name = "robot_id"]:checked').value;
$.post( "/", { 'x': x, 'y': y , 'robot': robot} );
}
$('.arena').on('mousedown', fireThis);
function callRandom() {
var array = document.getElementsByName('robot_id');
array[{{ oldest_control }}].checked = true;
}
callRandom();
</script>
</html>