Compare commits
10 Commits
019c4590aa
...
ca8e5aac9b
Author | SHA1 | Date | |
---|---|---|---|
ca8e5aac9b | |||
6aef0660a3 | |||
885a516fe3 | |||
ca1fe99ff2 | |||
a69d45f97e | |||
eea1cee1ec | |||
1c8d4a1d57 | |||
029068b259 | |||
7962199c3a | |||
f3babdcf0a |
27
gauss-turing/game/event_server_comm.py
Normal file
27
gauss-turing/game/event_server_comm.py
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
import socket
|
||||||
|
import json
|
||||||
|
|
||||||
|
HOST, PORT = "localhost", 42424
|
||||||
|
|
||||||
|
def move_grid(x, y, orientation, dimx, dimy):
|
||||||
|
print("move grid")
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.connect((HOST, PORT))
|
||||||
|
payload = json.dumps({"x": x, "y": y, "dimx": dimx, "dimy": dimy, "orientation": orientation})
|
||||||
|
sock.sendall(f"move_grid_blocking;{payload}\n".encode())
|
||||||
|
target_reached = False
|
||||||
|
try:
|
||||||
|
while not target_reached:
|
||||||
|
reply = sock.recv(1024)
|
||||||
|
if reply == b'':
|
||||||
|
raise RuntimeError
|
||||||
|
#print("got reply: ", reply)
|
||||||
|
target_reached = reply.decode() == 'ack\n'
|
||||||
|
print("target_reached = ", target_reached)
|
||||||
|
except RuntimeError:
|
||||||
|
# try again
|
||||||
|
print("controlling failed -> trying again")
|
||||||
|
move_grid(x, y, orientation, dimx, dimy)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
move_grid(3,2,'v', 7, 4)
|
499
gauss-turing/game/gauss_turing.py
Normal file
499
gauss-turing/game/gauss_turing.py
Normal file
|
@ -0,0 +1,499 @@
|
||||||
|
import numpy as np
|
||||||
|
import random
|
||||||
|
import pygame
|
||||||
|
import os
|
||||||
|
|
||||||
|
from event_server_comm import move_grid
|
||||||
|
|
||||||
|
BLACK = np.array([0, 0, 0], dtype=np.uint8)
|
||||||
|
WHITE = np.array([255, 255, 255], dtype=np.uint8)
|
||||||
|
RED = np.array([255, 0, 0], dtype=np.uint8)
|
||||||
|
BLUE = np.array([0, 0, 255], dtype=np.uint8)
|
||||||
|
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
|
||||||
|
GREEN = np.array([0, 255, 0], dtype=np.uint8)
|
||||||
|
|
||||||
|
pygame.init()
|
||||||
|
|
||||||
|
pygame.font.init() # you have to call this at the start,
|
||||||
|
# if you want to use this module.
|
||||||
|
myfont = pygame.font.SysFont('Comic Sans MS', 55)
|
||||||
|
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
|
||||||
|
|
||||||
|
P0_text = myfont.render('P0', False, tuple(BLACK))
|
||||||
|
|
||||||
|
tiledt = np.dtype([('x', np.uint8), ('y', np.uint8), ('color', np.uint8, 3), ('star', np.bool)])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Board:
|
||||||
|
valid_colors = [WHITE, RED, BLUE]
|
||||||
|
|
||||||
|
def __init__(self, dim_x, dim_y):
|
||||||
|
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
|
||||||
|
for x in range(dim_x):
|
||||||
|
for y in range(dim_y):
|
||||||
|
self.tiles[y, x]['color'] = random.choice(Board.valid_colors)
|
||||||
|
|
||||||
|
def render(self, scale_fac):
|
||||||
|
dimy, dimx = self.tiles.shape
|
||||||
|
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
|
||||||
|
|
||||||
|
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
|
||||||
|
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
|
||||||
|
|
||||||
|
for y in range(self.tiles.shape[0]):
|
||||||
|
for x in range(self.tiles.shape[1]):
|
||||||
|
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
|
||||||
|
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
|
||||||
|
pygame.draw.rect(board_surf, (0, 0, 0),
|
||||||
|
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
|
||||||
|
if self.tiles[y, x]['star']:
|
||||||
|
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
|
||||||
|
|
||||||
|
return board_surf
|
||||||
|
|
||||||
|
def get_xdims(self):
|
||||||
|
return self.tiles.shape[1]
|
||||||
|
|
||||||
|
def get_ydims(self):
|
||||||
|
return self.tiles.shape[0]
|
||||||
|
|
||||||
|
# def __repr__(self):
|
||||||
|
# s = ''
|
||||||
|
# for y in range(self.tiles.shape[0]):
|
||||||
|
# for x in range(self.tiles.shape[1]):
|
||||||
|
# if (x,y) == (self.robot.x, self.robot.y):
|
||||||
|
# s += self.robot.orientation
|
||||||
|
# else:
|
||||||
|
# s += '.'
|
||||||
|
# s += '\n'
|
||||||
|
# return s
|
||||||
|
|
||||||
|
|
||||||
|
class Robot:
|
||||||
|
orientations = ['^', 'left', 'down', 'right']
|
||||||
|
resulting_orientation = {
|
||||||
|
'^': {'left': '<', 'right': '>'},
|
||||||
|
'>': {'left': '^', 'right': 'v'},
|
||||||
|
'v': {'left': '>', 'right': '<'},
|
||||||
|
'<': {'left': 'v', 'right': '^'},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def __init__(self, x, y, orientation):
|
||||||
|
self.x = x
|
||||||
|
self.y = y
|
||||||
|
self.orientation = orientation
|
||||||
|
self.position_changed = False
|
||||||
|
|
||||||
|
def get_forward_coordinates(self):
|
||||||
|
# get the coordinates of the neighboring tile in the given direction
|
||||||
|
if self.orientation == '^':
|
||||||
|
return self.y - 1, self.x
|
||||||
|
elif self.orientation == '>':
|
||||||
|
return self.y, self.x + 1
|
||||||
|
elif self.orientation == 'v':
|
||||||
|
return self.y + 1, self.x
|
||||||
|
elif self.orientation == '<':
|
||||||
|
return self.y, self.x - 1
|
||||||
|
else:
|
||||||
|
raise Exception("error: undefined direction")
|
||||||
|
|
||||||
|
def get_angle(self):
|
||||||
|
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
|
||||||
|
return np.rad2deg(angle)
|
||||||
|
|
||||||
|
def render(self, scale_fac):
|
||||||
|
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
|
||||||
|
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
|
||||||
|
(0.25 * scale_fac, 0.25 * scale_fac),
|
||||||
|
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
|
||||||
|
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
|
||||||
|
return robot_surf
|
||||||
|
|
||||||
|
def update_pos(self, dimx, dimy):
|
||||||
|
move_grid(self.x, self.y, self.orientation, dimx, dimy)
|
||||||
|
self.position_changed = False
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"({self.y}, {self.x}) - {self.orientation}"
|
||||||
|
|
||||||
|
|
||||||
|
class Command:
|
||||||
|
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
|
||||||
|
|
||||||
|
def __init__(self, action=None, color=WHITE):
|
||||||
|
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
|
||||||
|
raise ValueError("invalid values for command")
|
||||||
|
self.action = action
|
||||||
|
self.color = color
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"{self.action}: {self.color}"
|
||||||
|
|
||||||
|
def render(self, scale_fac):
|
||||||
|
cmd_surf = pygame.Surface((scale_fac, scale_fac))
|
||||||
|
cmd_surf.fill(tuple(self.color))
|
||||||
|
|
||||||
|
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
|
||||||
|
|
||||||
|
pygame.draw.polygon(arrow_surf, (0, 0, 0),
|
||||||
|
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
|
||||||
|
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
|
||||||
|
|
||||||
|
if self.action == 'forward':
|
||||||
|
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
|
||||||
|
elif self.action == 'left':
|
||||||
|
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
|
||||||
|
|
||||||
|
if self.action in {'left', 'forward', 'right'}:
|
||||||
|
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
|
||||||
|
elif self.action == 'P0':
|
||||||
|
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
|
||||||
|
|
||||||
|
return cmd_surf
|
||||||
|
|
||||||
|
class Programmer:
|
||||||
|
def __init__(self, prg):
|
||||||
|
self.prg = prg
|
||||||
|
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
|
||||||
|
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=WHITE)]
|
||||||
|
self.command_to_edit = 0
|
||||||
|
self.screen_rect = None
|
||||||
|
|
||||||
|
def render(self, scale_fac):
|
||||||
|
"""Render surface with possible inputs for the robot.
|
||||||
|
|
||||||
|
:return: surface of the input commands
|
||||||
|
"""
|
||||||
|
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
|
||||||
|
for i, inp in enumerate(self.available_inputs):
|
||||||
|
cmd_surf = inp.render(scale_fac)
|
||||||
|
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
|
||||||
|
return inp_surf
|
||||||
|
|
||||||
|
def update_selected_command(self, pos):
|
||||||
|
print(f"clicked at pos = {pos}")
|
||||||
|
xoffset = pos[0] - self.screen_rect.x
|
||||||
|
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
|
||||||
|
selected_input = self.available_inputs[selected_input_index]
|
||||||
|
|
||||||
|
edited_command = self.prg.cmds[self.command_to_edit]
|
||||||
|
|
||||||
|
print("command before edit = ", edited_command)
|
||||||
|
if selected_input.action != '-':
|
||||||
|
edited_command.action = selected_input.action
|
||||||
|
else:
|
||||||
|
edited_command.color = selected_input.color
|
||||||
|
print("command after edit = ", edited_command)
|
||||||
|
|
||||||
|
class Program:
|
||||||
|
def __init__(self, robot, board, cmds):
|
||||||
|
self.cmds = cmds
|
||||||
|
self.robot = robot
|
||||||
|
self.board = board
|
||||||
|
self.prg_counter = 0
|
||||||
|
self.screen_rect = None
|
||||||
|
|
||||||
|
def step(self, state='running'):
|
||||||
|
cmd = self.cmds[self.prg_counter]
|
||||||
|
self.prg_counter += 1
|
||||||
|
|
||||||
|
# current position
|
||||||
|
x = self.robot.x
|
||||||
|
y = self.robot.y
|
||||||
|
|
||||||
|
# current tile the robot is on
|
||||||
|
tile = self.board.tiles[y, x]
|
||||||
|
|
||||||
|
# apply next instruction of the program
|
||||||
|
if np.all(cmd.color == WHITE) or np.all(cmd.color == tile['color']):
|
||||||
|
# matching color -> execute command
|
||||||
|
if cmd.action == 'forward':
|
||||||
|
ynew, xnew = self.robot.get_forward_coordinates()
|
||||||
|
self.robot.x = xnew
|
||||||
|
self.robot.y = ynew
|
||||||
|
self.robot.position_changed = True
|
||||||
|
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
|
||||||
|
elif cmd.action in {'left', 'right'}:
|
||||||
|
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
|
||||||
|
#self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
|
||||||
|
self.robot.position_changed = True
|
||||||
|
elif cmd.action == 'P0':
|
||||||
|
self.prg_counter = 0
|
||||||
|
else:
|
||||||
|
print("color not matching -> skipping command")
|
||||||
|
|
||||||
|
# update state for new robot position
|
||||||
|
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
|
||||||
|
# robot leaves the board -> GAME OVER
|
||||||
|
print("GAME OVER")
|
||||||
|
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
|
||||||
|
return 'game_over'
|
||||||
|
|
||||||
|
# robot collects star on new tile
|
||||||
|
tile = self.board.tiles[self.robot.y, self.robot.x]
|
||||||
|
if tile['star']:
|
||||||
|
tile['star'] = False
|
||||||
|
|
||||||
|
if all([not t['star'] for t in self.board.tiles.flatten()]):
|
||||||
|
print("YOU WON")
|
||||||
|
return 'won'
|
||||||
|
|
||||||
|
# by default we continue in the running state
|
||||||
|
return state
|
||||||
|
|
||||||
|
def render(self, scale_fac, prg_counter_override=None):
|
||||||
|
"""Render the current program. This will render all commands and highlight the next command to execute
|
||||||
|
(determined by self.prg_counter).
|
||||||
|
|
||||||
|
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
|
||||||
|
to highlight the command the user will edit.
|
||||||
|
|
||||||
|
:param scale_fac:
|
||||||
|
:param prg_counter_override:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
prg_counter = self.prg_counter
|
||||||
|
if prg_counter_override is not None:
|
||||||
|
prg_counter = prg_counter_override
|
||||||
|
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
|
||||||
|
for i in range(5):
|
||||||
|
if i < len(self.cmds):
|
||||||
|
cmd = self.cmds[i]
|
||||||
|
cmd_surf = cmd.render(scale_fac)
|
||||||
|
else:
|
||||||
|
cmd_surf = pygame.Surface((scale_fac,scale_fac))
|
||||||
|
cmd_surf.fill(WHITE)
|
||||||
|
if prg_counter is not None and i == prg_counter:
|
||||||
|
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
|
||||||
|
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
|
||||||
|
|
||||||
|
return prg_surf
|
||||||
|
|
||||||
|
def get_clicked_command(self, pos):
|
||||||
|
print(f"clicked at pos = {pos}")
|
||||||
|
xoffset = pos[0] - self.screen_rect.x
|
||||||
|
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
|
||||||
|
print("clicked command = ", clicked_cmd)
|
||||||
|
return clicked_cmd
|
||||||
|
|
||||||
|
|
||||||
|
class Game:
|
||||||
|
def __init__(self, dimx, dimy, robotx, roboty):
|
||||||
|
self.robot = Robot(x=robotx, y=roboty, orientation='v')
|
||||||
|
self.board = Board(dimx, dimy)
|
||||||
|
coin1x = np.random.randint(0, dimx)
|
||||||
|
coin1y = np.random.randint(0, dimy)
|
||||||
|
self.board.tiles[coin1y,coin1x]['star'] = True
|
||||||
|
self.board.tiles[3,2]['star'] = True
|
||||||
|
|
||||||
|
# TODO fix number of commands at 5
|
||||||
|
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
|
||||||
|
self.state = 'won'
|
||||||
|
|
||||||
|
self.prg = Program(self.robot, self.board, self.cmds)
|
||||||
|
|
||||||
|
self.programmer = Programmer(self.prg)
|
||||||
|
|
||||||
|
#self.scale_fac = 180
|
||||||
|
self.scale_fac = 125
|
||||||
|
self.beamer_mode = False
|
||||||
|
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
|
||||||
|
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
|
||||||
|
|
||||||
|
self.game_over_text = myfont.render('GAME OVER', False, BLACK)
|
||||||
|
self.won_text = myfont.render('YOU WON', False, BLACK)
|
||||||
|
self.run_text = myfont.render('RUN', False, tuple(BLACK))
|
||||||
|
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
|
||||||
|
self.step_text = myfont_small.render('STEP', False, tuple(BLACK))
|
||||||
|
|
||||||
|
# save initial state
|
||||||
|
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
|
||||||
|
self.inital_board_tiles = self.board.tiles.copy()
|
||||||
|
|
||||||
|
def render(self):
|
||||||
|
"""Render the game screen.
|
||||||
|
|
||||||
|
This will render the board and the robot.
|
||||||
|
Depending on the display mode it will also render the current program and the input commands for the robot.
|
||||||
|
|
||||||
|
:param prg_counter:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if self.beamer_mode:
|
||||||
|
dx = 0
|
||||||
|
dy = 0
|
||||||
|
else:
|
||||||
|
dx = int(0.05 * self.screen.get_width())
|
||||||
|
dy = int(0.05 * self.screen.get_height())
|
||||||
|
self.screen.fill(tuple(BLACK))
|
||||||
|
|
||||||
|
# render the board
|
||||||
|
board_surf = self.board.render(self.scale_fac)
|
||||||
|
|
||||||
|
# render robot onto the board surface
|
||||||
|
robot_surf = self.robot.render(self.scale_fac)
|
||||||
|
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
|
||||||
|
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
|
||||||
|
|
||||||
|
if not self.beamer_mode:
|
||||||
|
# if we are not in beamer mode we also show the current program / inputs
|
||||||
|
|
||||||
|
# render program
|
||||||
|
if self.state == 'input':
|
||||||
|
# in input mode we highlight the command which is selected for edit
|
||||||
|
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
|
||||||
|
else:
|
||||||
|
# in other modes we render the current program counter
|
||||||
|
prg_surf = self.prg.render(self.scale_fac)
|
||||||
|
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
|
||||||
|
self.prg.screen_rect = pygame.Rect((dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
|
||||||
|
self.screen.blit(prg_surf, self.prg.screen_rect)
|
||||||
|
|
||||||
|
# render input fields and buttons
|
||||||
|
inp_surf = self.programmer.render(self.scale_fac)
|
||||||
|
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
|
||||||
|
self.programmer.screen_rect = pygame.Rect((dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
|
||||||
|
self.screen.blit(inp_surf, self.programmer.screen_rect)
|
||||||
|
|
||||||
|
btn_surf = pygame.Surface((3 * self.scale_fac//2, self.scale_fac))
|
||||||
|
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
|
||||||
|
btn_surf.get_height(), btn_surf.get_width()))
|
||||||
|
if self.state == 'input':
|
||||||
|
btn_surf.fill(tuple(GREEN))
|
||||||
|
btn_surf.blit(self.run_text, (0, 10))
|
||||||
|
elif self.state == 'running':
|
||||||
|
btn_surf.fill(tuple(RED))
|
||||||
|
btn_surf.blit(self.stop_text, (0, 10))
|
||||||
|
elif self.state == 'stepping':
|
||||||
|
btn_surf.fill(tuple(YELLOW))
|
||||||
|
btn_surf.blit(self.step_text, (0, 10))
|
||||||
|
self.screen.blit(btn_surf, self.btn_rect)
|
||||||
|
|
||||||
|
# render messages
|
||||||
|
if self.state == 'game_over':
|
||||||
|
game_over_surf = pygame.Surface((self.screen.get_width() // 2, self.screen.get_height() // 2))
|
||||||
|
game_over_surf.fill(tuple(GREEN))
|
||||||
|
game_over_surf.blit(self.game_over_text, ((game_over_surf.get_width() - self.game_over_text.get_width()) // 2,
|
||||||
|
(game_over_surf.get_height() - self.game_over_text.get_height()) // 2))
|
||||||
|
self.screen.blit(game_over_surf, (self.screen.get_width() // 4, self.screen.get_height() // 4))
|
||||||
|
pygame.display.update()
|
||||||
|
pygame.time.wait(1500)
|
||||||
|
self.state = 'reset'
|
||||||
|
elif self.state == 'won':
|
||||||
|
won_surf = pygame.Surface((self.screen.get_width() // 2, self.screen.get_height() // 2))
|
||||||
|
won_surf.fill(tuple(GREEN))
|
||||||
|
won_surf.blit(self.won_text,
|
||||||
|
((won_surf.get_width() - self.won_text.get_width()) // 2,
|
||||||
|
(won_surf.get_height() - self.won_text.get_height()) // 2))
|
||||||
|
self.screen.blit(won_surf, (self.screen.get_width() // 4, self.screen.get_height() // 4))
|
||||||
|
pygame.display.update()
|
||||||
|
pygame.time.wait(1500)
|
||||||
|
self.state = 'reset'
|
||||||
|
|
||||||
|
pygame.display.flip()
|
||||||
|
|
||||||
|
def process_inputs(self):
|
||||||
|
# proceed events
|
||||||
|
for event in pygame.event.get():
|
||||||
|
if event.type == pygame.QUIT:
|
||||||
|
self.state = 'quit'
|
||||||
|
# handle MOUSEBUTTONUP
|
||||||
|
elif event.type == pygame.MOUSEBUTTONUP:
|
||||||
|
pos = pygame.mouse.get_pos()
|
||||||
|
|
||||||
|
# select command to edit by the programmer
|
||||||
|
if self.prg.screen_rect is not None:
|
||||||
|
if self.prg.screen_rect.collidepoint(pos):
|
||||||
|
print(f"clicked at pos = {pos}")
|
||||||
|
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
|
||||||
|
|
||||||
|
# set the selected command to a new value
|
||||||
|
if self.programmer.screen_rect is not None:
|
||||||
|
if self.programmer.screen_rect.collidepoint(pos):
|
||||||
|
self.programmer.update_selected_command(pos)
|
||||||
|
|
||||||
|
# clicked RUN/STOP button
|
||||||
|
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
|
||||||
|
print(f"clicked at pos = {pos}")
|
||||||
|
if self.state == 'running':
|
||||||
|
self.state = 'input'
|
||||||
|
elif self.state == 'input':
|
||||||
|
self.state = 'running'
|
||||||
|
elif self.state == 'stepping':
|
||||||
|
self.state = self.prg.step(self.state)
|
||||||
|
elif event.type == pygame.KEYUP:
|
||||||
|
if event.key == pygame.K_x:
|
||||||
|
if not self.beamer_mode:
|
||||||
|
# switch to beamer mode
|
||||||
|
os.environ['SDL_VIDEO_WINDOW_POS'] = '1920, 280'
|
||||||
|
self.scale_fac = 180
|
||||||
|
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
|
||||||
|
self.board.tiles.shape[0] * self.scale_fac),
|
||||||
|
pygame.NOFRAME)
|
||||||
|
self.beamer_mode = True
|
||||||
|
else:
|
||||||
|
# switch to normal mode
|
||||||
|
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
|
||||||
|
self.scale_fac = 125
|
||||||
|
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
|
||||||
|
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
|
||||||
|
self.beamer_mode = False
|
||||||
|
elif event.key == pygame.K_s:
|
||||||
|
# run program
|
||||||
|
self.state = 'running'
|
||||||
|
elif event.key == pygame.K_SPACE:
|
||||||
|
if self.state != 'stepping':
|
||||||
|
self.state = 'stepping'
|
||||||
|
else:
|
||||||
|
self.state = self.prg.step(self.state)
|
||||||
|
elif event.key == pygame.K_r:
|
||||||
|
self.state = 'reset'
|
||||||
|
return self.state
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self.prg.prg_counter = 0
|
||||||
|
self.robot.x = self.initial_pos[0]
|
||||||
|
self.robot.y = self.initial_pos[1]
|
||||||
|
self.robot.orientation = self.initial_pos[2]
|
||||||
|
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
|
||||||
|
self.board.tiles = self.inital_board_tiles.copy()
|
||||||
|
return 'input'
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
running = True
|
||||||
|
while running:
|
||||||
|
self.state = self.process_inputs()
|
||||||
|
if self.state == 'input':
|
||||||
|
pass
|
||||||
|
elif self.state == 'running':
|
||||||
|
self.state = self.prg.step()
|
||||||
|
elif self.state == 'reset':
|
||||||
|
self.state = self.reset()
|
||||||
|
elif self.state == 'quit':
|
||||||
|
running = False
|
||||||
|
elif self.state == 'stepping':
|
||||||
|
pass
|
||||||
|
elif self.state == 'game_over' or self.state == 'won':
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
print("unknown state")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.render()
|
||||||
|
if self.robot.position_changed:
|
||||||
|
self.robot.update_pos(self.board.get_xdims(), self.board.get_ydims())
|
||||||
|
|
||||||
|
pygame.time.wait(100)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
seed = 2
|
||||||
|
random.seed(seed)
|
||||||
|
np.random.seed(seed)
|
||||||
|
game = Game(dimx=7, dimy=4, robotx=3, roboty=1)
|
||||||
|
game.run()
|
||||||
|
|
||||||
|
# TODOs
|
||||||
|
# - in stepping mode (s) it is possible to drive outside of the map
|
||||||
|
# - when no P0 command is present the program counter will overflow
|
|
@ -10,6 +10,7 @@ import pyqtgraph as pg
|
||||||
|
|
||||||
|
|
||||||
from shapely.geometry import LineString
|
from shapely.geometry import LineString
|
||||||
|
from shapely.affinity import scale
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
|
||||||
import aruco
|
import aruco
|
||||||
|
@ -448,8 +449,11 @@ class ArucoEstimator:
|
||||||
row_line_top = a + y_frac * vad
|
row_line_top = a + y_frac * vad
|
||||||
row_line_bottom = b + y_frac * vbc
|
row_line_bottom = b + y_frac * vbc
|
||||||
|
|
||||||
column_line = LineString([column_line_top, column_line_bottom])
|
column_line_orig = LineString([column_line_top, column_line_bottom])
|
||||||
row_line = LineString([row_line_top, row_line_bottom])
|
row_line_orig = LineString([row_line_top, row_line_bottom])
|
||||||
|
|
||||||
|
column_line = scale(column_line_orig, 10, 10)
|
||||||
|
row_line = scale(row_line_orig, 10, 10)
|
||||||
|
|
||||||
int_pt = column_line.intersection(row_line)
|
int_pt = column_line.intersection(row_line)
|
||||||
point_of_intersection = np.array([int_pt.x, int_pt.y])
|
point_of_intersection = np.array([int_pt.x, int_pt.y])
|
||||||
|
|
|
@ -57,8 +57,8 @@ class ControlCommander:
|
||||||
if not all_detected:
|
if not all_detected:
|
||||||
print(f"warning: no measurements available for the following robots: {undetected_robots}")
|
print(f"warning: no measurements available for the following robots: {undetected_robots}")
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
print("all robots detected -> starting control")
|
||||||
|
|
||||||
print("starting control")
|
|
||||||
self.running = True
|
self.running = True
|
||||||
while self.running:
|
while self.running:
|
||||||
while not self.event_listener.event_queue.empty():
|
while not self.event_listener.event_queue.empty():
|
||||||
|
@ -111,13 +111,13 @@ class ControlCommander:
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
id_ip_dict = {
|
id_ip_dict = {
|
||||||
#11: '10.10.11.88',
|
#11: '10.10.11.88',
|
||||||
12: '192.168.1.12',
|
#12: '192.168.1.12',
|
||||||
13: '192.168.1.13',
|
13: '192.168.1.13',
|
||||||
#14: '10.10.11.89',
|
#14: '10.10.11.89',
|
||||||
}
|
}
|
||||||
|
|
||||||
# controller_type = {12: 'mpc', 13: 'pid'}
|
# controller_type = {12: 'mpc', 13: 'pid'}
|
||||||
controller_type = 'pid'
|
controller_type = 'mpc'
|
||||||
|
|
||||||
rc = ControlCommander(id_ip_dict, controller_type=controller_type)
|
rc = ControlCommander(id_ip_dict, controller_type=controller_type)
|
||||||
rc.run()
|
rc.run()
|
||||||
|
|
|
@ -12,30 +12,38 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
|
||||||
data = self.request.recv(1024).strip()
|
data = self.request.recv(1024).strip()
|
||||||
|
|
||||||
cur_thread = threading.current_thread()
|
cur_thread = threading.current_thread()
|
||||||
print(f"current thread {cur_thread}")
|
print(f"start current thread {cur_thread}")
|
||||||
|
|
||||||
if 'events' in data.decode():
|
if 'events' in data.decode():
|
||||||
|
print(f"{cur_thread} subscribed to events")
|
||||||
self.request.sendall('subscribed to events\n'.encode())
|
self.request.sendall('subscribed to events\n'.encode())
|
||||||
# send any events in the event queue to the subscriber
|
# send any events in the event queue to the subscriber
|
||||||
while True:
|
event_loop_running = True
|
||||||
while not self.server.estimator.event_queue.empty():
|
while event_loop_running:
|
||||||
event = self.server.estimator.event_queue.get()
|
try:
|
||||||
# we distinguish two kinds of events:
|
while not self.server.estimator.event_queue.empty():
|
||||||
if event[0] == 'response_event':
|
event = self.server.estimator.event_queue.get()
|
||||||
# 1. for 'response_event' events we expect the event subscriber to give a reply which will then
|
# we distinguish two kinds of events:
|
||||||
# by passed on to the response queue for transmitting to the original correspondent
|
if event[0] == 'response_event':
|
||||||
message = event[1]['event']
|
# 1. for 'response_event' events we expect the event subscriber to give a reply which will then
|
||||||
#print(f"passing command {message} on to subscriber")
|
# by passed on to the response queue for transmitting to the original correspondent
|
||||||
self.request.sendall((json.dumps(message) + '\n').encode())
|
message = event[1]['event']
|
||||||
reply = self.request.recv(1024)
|
#print(f"passing command {message} on to subscriber")
|
||||||
#print(f"putting reply {reply} in response queue")
|
self.request.sendall((json.dumps(message) + '\n').encode())
|
||||||
self.server.response_queue.put(reply)
|
reply = self.request.recv(1024)
|
||||||
else:
|
#print(f"putting reply {reply} in response queue")
|
||||||
# 2. for other types of events we don't expect a reply and just pass them on to the subscriber
|
self.server.response_queue.put(reply)
|
||||||
self.request.sendall((json.dumps(event) + '\n').encode())
|
else:
|
||||||
self.server.estimator.last_event = None
|
# 2. for other types of events we don't expect a reply and just pass them on to the subscriber
|
||||||
time.sleep(1.0 / self.server.max_measurements_per_second)
|
self.request.sendall((json.dumps(event) + '\n').encode())
|
||||||
|
self.server.estimator.last_event = None
|
||||||
|
time.sleep(1.0 / self.server.max_measurements_per_second)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"exception in event loop: {e}")
|
||||||
|
event_loop_running = False
|
||||||
|
print("after event loop")
|
||||||
elif 'corners' in data.decode():
|
elif 'corners' in data.decode():
|
||||||
|
print(f"{cur_thread} subscribed to corners")
|
||||||
# send positions of the board markers
|
# send positions of the board markers
|
||||||
corner_estimates = self.server.estimator.corner_estimates
|
corner_estimates = self.server.estimator.corner_estimates
|
||||||
response = {}
|
response = {}
|
||||||
|
@ -43,19 +51,23 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
|
||||||
response[corner] = {'x': data['x'], 'y': data['y']}
|
response[corner] = {'x': data['x'], 'y': data['y']}
|
||||||
self.request.sendall((json.dumps(response) + '\n').encode())
|
self.request.sendall((json.dumps(response) + '\n').encode())
|
||||||
elif 'move_grid_blocking' in data.decode():
|
elif 'move_grid_blocking' in data.decode():
|
||||||
|
print(f"{cur_thread} subscribed move_grid_blocking")
|
||||||
|
|
||||||
# if we receive a move_grid event
|
# if we receive a move_grid event
|
||||||
# ( e.g. move_grid;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^","require_response":True} )
|
# ( e.g. move_grid_blocking;{"x":1,"y":1,"dimx":10,"dimy":10,"orientation":"^"} )
|
||||||
|
#move_grid_blocking;{"x": 1, "y": 2, "dimx": 7, "dimy": 4, "orientation": ">"}
|
||||||
# we compute the corresponding real-world position the robot should drive to
|
# we compute the corresponding real-world position the robot should drive to
|
||||||
# and then create a new move event which is put in the event queue and will be propagated to the ControlCommander
|
# and then create a new move event which is put in the event queue and will be propagated to the ControlCommander
|
||||||
data_decode = data.decode()
|
data_decode = data.decode()
|
||||||
# print("data: ", data_decode)
|
#print("data: ", data_decode)
|
||||||
payload = data.decode().split(';')[1]
|
payload = data_decode.split(';')[1]
|
||||||
|
#print("payload: ", payload)
|
||||||
grid_pos = json.loads(payload)
|
grid_pos = json.loads(payload)
|
||||||
# print("grid_pos = ", grid_pos)
|
#print("grid_pos = ", grid_pos)
|
||||||
pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'],
|
pos = self.server.estimator.get_pos_from_grid_point(grid_pos['x'], grid_pos['y'], grid_pos['dimx'],
|
||||||
grid_pos['dimy'], grid_pos['orientation'])
|
grid_pos['dimy'], grid_pos['orientation'])
|
||||||
# print("pos = ", pos)
|
#print("pos = ", pos)
|
||||||
# print("event requiring response")
|
#print("event requiring response")
|
||||||
# put blocking move command in event queue
|
# put blocking move command in event queue
|
||||||
self.server.estimator.event_queue.put(('response_event',
|
self.server.estimator.event_queue.put(('response_event',
|
||||||
{'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})}))
|
{'event': ('move_blocking', {'x': pos[0], 'y': pos[1], 'angle': pos[2]})}))
|
||||||
|
@ -64,13 +76,17 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
|
||||||
# TODO this assumes that we wait only for at most one response at a time
|
# TODO this assumes that we wait only for at most one response at a time
|
||||||
# we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for)
|
# we could add some kind of reference here to handle multiple responses (e.g. id of the response to wait for)
|
||||||
while self.server.response_queue.empty():
|
while self.server.response_queue.empty():
|
||||||
|
time.sleep(0.2)
|
||||||
|
self.request.sendall(b'.\n')
|
||||||
pass
|
pass
|
||||||
reply = self.server.response_queue.get()
|
while not self.server.response_queue.empty():
|
||||||
|
reply = self.server.response_queue.get()
|
||||||
|
|
||||||
# send back response to the original source
|
# send back response to the original source
|
||||||
#print(f"sending reply {reply} back to correspondent {self.request}")
|
print(f"sending reply {reply} back to correspondent {self.request}")
|
||||||
self.request.sendall(reply)
|
self.request.sendall(reply)
|
||||||
else:
|
else:
|
||||||
|
print(f"{cur_thread} subscribed to robot position")
|
||||||
# send robot position
|
# send robot position
|
||||||
try:
|
try:
|
||||||
marker_id = int(data)
|
marker_id = int(data)
|
||||||
|
@ -79,15 +95,22 @@ class MeasurementHandler(socketserver.BaseRequestHandler):
|
||||||
|
|
||||||
if marker_id is not None:
|
if marker_id is not None:
|
||||||
if marker_id in self.server.estimator.robot_marker_estimates:
|
if marker_id in self.server.estimator.robot_marker_estimates:
|
||||||
while True:
|
marker_loop_running = True
|
||||||
self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id])
|
while marker_loop_running:
|
||||||
|
try:
|
||||||
|
self.request.sendall((json.dumps(self.server.estimator.robot_marker_estimates[marker_id])
|
||||||
+ '\n').encode())
|
+ '\n').encode())
|
||||||
time.sleep(1.0 / self.server.max_measurements_per_second)
|
time.sleep(1.0 / self.server.max_measurements_per_second)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"exception in marker loop: {e}")
|
||||||
|
marker_loop_running = False
|
||||||
else:
|
else:
|
||||||
self.request.sendall("error: unknown robot marker id\n".encode())
|
self.request.sendall("error: unknown robot marker id\n".encode())
|
||||||
else:
|
else:
|
||||||
self.request.sendall("error: data not understood. "
|
self.request.sendall("error: data not understood. "
|
||||||
"expected <robot marker id (int)> or 'events'\n".encode())
|
"expected <robot marker id (int)> or 'events'\n".encode())
|
||||||
|
|
||||||
|
print(f"end current thread {cur_thread}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@ -105,7 +128,7 @@ class MeasurementServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
aruco_estimator = ArucoEstimator(use_realsense=False, robot_marker_ids=[12, 13])
|
aruco_estimator = ArucoEstimator(use_realsense=True, robot_marker_ids=[12, 13])
|
||||||
|
|
||||||
# first we start thread for the measurement server
|
# first we start thread for the measurement server
|
||||||
measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
|
measurement_server = MeasurementServer(('0.0.0.0', 42424), MeasurementHandler, aruco_estimator,
|
||||||
|
|
|
@ -6,7 +6,7 @@ from casadi_opt import OpenLoopSolver
|
||||||
|
|
||||||
|
|
||||||
class MPCController(ControllerBase):
|
class MPCController(ControllerBase):
|
||||||
def __init__(self, N=20, T=1.0):
|
def __init__(self, N=10, T=1.0):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.t = None
|
self.t = None
|
||||||
|
|
||||||
|
|
|
@ -70,9 +70,9 @@ class PIDController(ControllerBase):
|
||||||
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
|
angles_unwrapped = np.unwrap([x_pred[2], target_angle]) # unwrap angle to avoid jump in data
|
||||||
|
|
||||||
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
|
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
|
||||||
e_pos = np.linalg.norm(v)
|
e_pos = np.linalg.norm(v[0:2])
|
||||||
|
|
||||||
if e_pos < 0.05:
|
if e_pos < 0.03:
|
||||||
self.mode = 'angle'
|
self.mode = 'angle'
|
||||||
self.e_angle_old = 0
|
self.e_angle_old = 0
|
||||||
self.e_pos_old = 0
|
self.e_pos_old = 0
|
||||||
|
@ -111,6 +111,10 @@ class PIDController(ControllerBase):
|
||||||
u2 = 0.0
|
u2 = 0.0
|
||||||
return u1, u2
|
return u1, u2
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super(PIDController, self).stop()
|
||||||
|
self.t = None
|
||||||
|
|
||||||
def apply_control(self, control):
|
def apply_control(self, control):
|
||||||
self.t = time.time() # save time when the most recent control was applied
|
self.t = time.time() # save time when the most recent control was applied
|
||||||
super(PIDController, self).apply_control(control)
|
super(PIDController, self).apply_control(control)
|
||||||
|
|
|
@ -111,12 +111,14 @@ class ControlledRobot(Robot):
|
||||||
|
|
||||||
def start_control(self):
|
def start_control(self):
|
||||||
if self.controller is not None:
|
if self.controller is not None:
|
||||||
|
print("starting control")
|
||||||
self.controller.start()
|
self.controller.start()
|
||||||
else:
|
else:
|
||||||
raise Exception("Error: Cannot start control: there is not controller attached to the robot!")
|
raise Exception("Error: Cannot start control: there is not controller attached to the robot!")
|
||||||
|
|
||||||
def stop_control(self):
|
def stop_control(self):
|
||||||
if self.controller is not None:
|
if self.controller is not None:
|
||||||
|
print("stopping control")
|
||||||
self.controller.stop()
|
self.controller.stop()
|
||||||
else:
|
else:
|
||||||
raise Exception("Error: Cannot stop control: there is not controller attached to the robot!")
|
raise Exception("Error: Cannot stop control: there is not controller attached to the robot!")
|
||||||
|
@ -141,8 +143,9 @@ class ControlledRobot(Robot):
|
||||||
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
|
angles_unwrapped = np.unwrap([current_pos[2], target_pos[2]]) # unwrap angle to avoid jump in data
|
||||||
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
|
e_angle = angles_unwrapped[0] - angles_unwrapped[1] # angle difference
|
||||||
e_pos = np.linalg.norm(v[0:2])
|
e_pos = np.linalg.norm(v[0:2])
|
||||||
|
#print(f"target_pos = {target_pos}, current_pos = {current_pos}, e_pos = {e_pos}, e_ang = {e_angle}")
|
||||||
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
|
print(f"e_pos = {e_pos}, e_ang = {e_angle}")
|
||||||
close_to_target = e_pos < 0.05 and abs(e_angle) < 0.1
|
close_to_target = e_pos < 0.075 and abs(e_angle) < 0.1
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
print("target reached!")
|
print("target reached!")
|
||||||
self.stop_control()
|
self.stop_control()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user