Compare commits
9 Commits
d0c17c0b91
...
4c3c3f973e
Author | SHA1 | Date | |
---|---|---|---|
4c3c3f973e | |||
57cd60de8c | |||
8c544f8fcc | |||
ff6caa457f | |||
9eea9bd245 | |||
945833c8ac | |||
e931085ca8 | |||
439cf44d70 | |||
c5f3f3babb |
464
gauss-turing/webserver/gauss_turing.py
Normal file
464
gauss-turing/webserver/gauss_turing.py
Normal file
|
@ -0,0 +1,464 @@
|
|||
import numpy as np
|
||||
import random
|
||||
import pygame
|
||||
import os
|
||||
|
||||
BLACK = np.array([0, 0, 0], dtype=np.uint8)
|
||||
WHITE = np.array([255, 255, 255], dtype=np.uint8)
|
||||
RED = np.array([255, 0, 0], dtype=np.uint8)
|
||||
BLUE = np.array([0, 0, 255], dtype=np.uint8)
|
||||
YELLOW = np.array([255, 255, 0], dtype=np.uint8)
|
||||
GREEN = np.array([0, 255, 0], dtype=np.uint8)
|
||||
|
||||
pygame.init()
|
||||
|
||||
pygame.font.init() # you have to call this at the start,
|
||||
# if you want to use this module.
|
||||
myfont = pygame.font.SysFont('Comic Sans MS', 55)
|
||||
myfont_small = pygame.font.SysFont('Comic Sans MS', 45)
|
||||
|
||||
P0_text = myfont.render('P0', False, tuple(BLACK))
|
||||
|
||||
tiledt = np.dtype([('x', np.uint8), ('y', np.uint8), ('color', np.uint8, 3), ('star', np.bool)])
|
||||
|
||||
|
||||
class Board:
|
||||
valid_colors = [WHITE, RED, BLUE]
|
||||
|
||||
def __init__(self, dim_x, dim_y):
|
||||
self.tiles = np.zeros((dim_y, dim_x), dtype=tiledt)
|
||||
for x in range(dim_x):
|
||||
for y in range(dim_y):
|
||||
self.tiles[y, x]['x'] = x
|
||||
self.tiles[y, x]['y'] = y
|
||||
self.tiles[y, x]['color'] = random.choice(Board.valid_colors)
|
||||
|
||||
def render(self, scale_fac):
|
||||
dimy, dimx = self.tiles.shape
|
||||
board_surf = pygame.Surface((dimx * scale_fac, dimy * scale_fac))
|
||||
|
||||
star_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
|
||||
pygame.draw.circle(star_surf, YELLOW, (int(0.5 * scale_fac), int(0.5 * scale_fac)), int(0.25 * scale_fac))
|
||||
|
||||
for y in range(self.tiles.shape[0]):
|
||||
for x in range(self.tiles.shape[1]):
|
||||
pygame.draw.rect(board_surf, tuple(self.tiles[y, x]['color']),
|
||||
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 0)
|
||||
pygame.draw.rect(board_surf, (0, 0, 0),
|
||||
(x * scale_fac, y * scale_fac, scale_fac, scale_fac), 1)
|
||||
if self.tiles[y, x]['star']:
|
||||
board_surf.blit(star_surf, (x * scale_fac, y * scale_fac, scale_fac, scale_fac))
|
||||
|
||||
return board_surf
|
||||
|
||||
# def __repr__(self):
|
||||
# s = ''
|
||||
# for y in range(self.tiles.shape[0]):
|
||||
# for x in range(self.tiles.shape[1]):
|
||||
# if (x,y) == (self.robot.x, self.robot.y):
|
||||
# s += self.robot.orientation
|
||||
# else:
|
||||
# s += '.'
|
||||
# s += '\n'
|
||||
# return s
|
||||
|
||||
|
||||
class Robot:
|
||||
orientations = ['^', 'left', 'down', 'right']
|
||||
resulting_orientation = {
|
||||
'^': {'left': '<', 'right': '>'},
|
||||
'>': {'left': '^', 'right': 'v'},
|
||||
'v': {'left': '>', 'right': '<'},
|
||||
'<': {'left': 'v', 'right': '^'},
|
||||
}
|
||||
|
||||
|
||||
def __init__(self, x, y, orientation):
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.orientation = orientation
|
||||
|
||||
def get_forward_coordinates(self):
|
||||
# get the coordinates of the neighboring tile in the given direction
|
||||
if self.orientation == '^':
|
||||
return self.y - 1, self.x
|
||||
elif self.orientation == '>':
|
||||
return self.y, self.x + 1
|
||||
elif self.orientation == 'v':
|
||||
return self.y + 1, self.x
|
||||
elif self.orientation == '<':
|
||||
return self.y, self.x - 1
|
||||
else:
|
||||
raise Exception("error: undefined direction")
|
||||
|
||||
def get_angle(self):
|
||||
angle = {'>': 0, '^': np.pi/2, '<': np.pi, 'v': 3*np.pi/2}[self.orientation]
|
||||
return np.rad2deg(angle)
|
||||
|
||||
def render(self, scale_fac):
|
||||
robot_surf = pygame.Surface((scale_fac, scale_fac), pygame.SRCALPHA)
|
||||
pygame.draw.lines(robot_surf, (0, 0, 0), True, [(0.75 * scale_fac, 0.5 * scale_fac),
|
||||
(0.25 * scale_fac, 0.25 * scale_fac),
|
||||
(0.25 * scale_fac, 0.75 * scale_fac)], 3)
|
||||
robot_surf = pygame.transform.rotate(robot_surf, self.get_angle())
|
||||
return robot_surf
|
||||
|
||||
def __repr__(self):
|
||||
return f"({self.y}, {self.x}) - {self.orientation}"
|
||||
|
||||
|
||||
class Command:
|
||||
valid_actions = {'forward', 'left', 'right', 'P0', '-'}
|
||||
|
||||
def __init__(self, action=None, color=WHITE):
|
||||
if not (action in Command.valid_actions and any([np.all(color == c) for c in Board.valid_colors])):
|
||||
raise ValueError("invalid values for command")
|
||||
self.action = action
|
||||
self.color = color
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.action}: {self.color}"
|
||||
|
||||
def render(self, scale_fac):
|
||||
cmd_surf = pygame.Surface((scale_fac, scale_fac))
|
||||
cmd_surf.fill(tuple(self.color))
|
||||
|
||||
arrow_surf = pygame.Surface((300, 300), pygame.SRCALPHA)
|
||||
|
||||
pygame.draw.polygon(arrow_surf, (0, 0, 0),
|
||||
((0, 100), (0, 200), (200, 200), (200, 300), (300, 150), (200, 0), (200, 100)))
|
||||
arrow_surf = pygame.transform.scale(arrow_surf, (int(0.9*scale_fac), int(0.9*scale_fac)))
|
||||
|
||||
if self.action == 'forward':
|
||||
arrow_surf = pygame.transform.rotate(arrow_surf, 90)
|
||||
elif self.action == 'left':
|
||||
arrow_surf = pygame.transform.rotate(arrow_surf, 180)
|
||||
|
||||
if self.action in {'left', 'forward', 'right'}:
|
||||
cmd_surf.blit(arrow_surf, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
|
||||
elif self.action == 'P0':
|
||||
cmd_surf.blit(P0_text, (0.05*scale_fac,0.05*scale_fac,0.95*scale_fac,0.95*scale_fac))
|
||||
|
||||
return cmd_surf
|
||||
|
||||
class Programmer:
|
||||
def __init__(self, prg):
|
||||
self.prg = prg
|
||||
self.available_inputs = [Command('forward'), Command('left'), Command('right'), Command('P0'),
|
||||
Command('-', color=RED), Command('-', color=BLUE), Command('-', color=WHITE)]
|
||||
self.command_to_edit = 0
|
||||
self.screen_rect = None
|
||||
|
||||
def render(self, scale_fac):
|
||||
"""Render surface with possible inputs for the robot.
|
||||
|
||||
:return: surface of the input commands
|
||||
"""
|
||||
inp_surf = pygame.Surface((len(self.available_inputs) * scale_fac, 1 * scale_fac))
|
||||
for i, inp in enumerate(self.available_inputs):
|
||||
cmd_surf = inp.render(scale_fac)
|
||||
inp_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
|
||||
return inp_surf
|
||||
|
||||
def update_selected_command(self, pos):
|
||||
print(f"clicked at pos = {pos}")
|
||||
xoffset = pos[0] - self.screen_rect.x
|
||||
selected_input_index = xoffset * len(self.available_inputs) // self.screen_rect.width
|
||||
selected_input = self.available_inputs[selected_input_index]
|
||||
|
||||
edited_command = self.prg.cmds[self.command_to_edit]
|
||||
|
||||
print("command before edit = ", edited_command)
|
||||
if selected_input.action != '-':
|
||||
edited_command.action = selected_input.action
|
||||
else:
|
||||
edited_command.color = selected_input.color
|
||||
print("command after edit = ", edited_command)
|
||||
|
||||
class Program:
|
||||
def __init__(self, robot, board, cmds):
|
||||
self.cmds = cmds
|
||||
self.robot = robot
|
||||
self.board = board
|
||||
self.prg_counter = 0
|
||||
self.screen_rect = None
|
||||
|
||||
def step(self):
|
||||
cmd = self.cmds[self.prg_counter]
|
||||
self.prg_counter += 1
|
||||
|
||||
# current position
|
||||
x = self.robot.x
|
||||
y = self.robot.y
|
||||
|
||||
# current tile the robot is on
|
||||
tile = self.board.tiles[y, x]
|
||||
|
||||
# apply next instruction of the program
|
||||
if np.all(cmd.color == WHITE) or np.all(cmd.color == tile['color']):
|
||||
# matching color -> execute command
|
||||
if cmd.action == 'forward':
|
||||
ynew, xnew = self.robot.get_forward_coordinates()
|
||||
self.robot.x = xnew
|
||||
self.robot.y = ynew
|
||||
elif cmd.action in {'left', 'right'}:
|
||||
self.robot.orientation = Robot.resulting_orientation[self.robot.orientation][cmd.action]
|
||||
elif cmd.action == 'P0':
|
||||
self.prg_counter = 0
|
||||
else:
|
||||
print("color not matching -> skipping command")
|
||||
|
||||
# get all events
|
||||
ev = pygame.event.get()
|
||||
# proceed events
|
||||
for event in ev:
|
||||
# handle MOUSEBUTTONUP
|
||||
if event.type == pygame.MOUSEBUTTONUP:
|
||||
pos = pygame.mouse.get_pos()
|
||||
if pos[0] >= 325 and pos[0] <= 400 and pos[1] >= 600 and pos[1] <= 650:
|
||||
print(f"clicked at pos = {pos}")
|
||||
self.state = 'input'
|
||||
|
||||
# update state for new robot position
|
||||
if (not (0 <= self.robot.x < self.board.tiles.shape[1])) or not (0 <= self.robot.y < self.board.tiles.shape[0]):
|
||||
# robot leaves the board -> GAME OVER
|
||||
print("GAME OVER")
|
||||
return 'game_over'
|
||||
|
||||
# robot collects star on new tile
|
||||
tile = self.board.tiles[self.robot.y, self.robot.x]
|
||||
if tile['star']:
|
||||
tile['star'] = False
|
||||
|
||||
if all([not t['star'] for t in self.board.tiles.flatten()]):
|
||||
print("YOU WON")
|
||||
return 'won'
|
||||
|
||||
# by default we continue in the running state
|
||||
return 'running'
|
||||
|
||||
def render(self, scale_fac, prg_counter_override=None):
|
||||
"""Render the current program. This will render all commands and highlight the next command to execute
|
||||
(determined by self.prg_counter).
|
||||
|
||||
The prg_counter_override can be used to highlight a different command instead. This is used during input mode
|
||||
to highlight the command the user will edit.
|
||||
|
||||
:param scale_fac:
|
||||
:param prg_counter_override:
|
||||
:return:
|
||||
"""
|
||||
prg_counter = self.prg_counter
|
||||
if prg_counter_override is not None:
|
||||
prg_counter = prg_counter_override
|
||||
prg_surf = pygame.Surface((5 * scale_fac, 1 * scale_fac))
|
||||
for i in range(5):
|
||||
if i < len(self.cmds):
|
||||
cmd = self.cmds[i]
|
||||
cmd_surf = cmd.render(scale_fac)
|
||||
else:
|
||||
cmd_surf = pygame.Surface((scale_fac,scale_fac))
|
||||
cmd_surf.fill(WHITE)
|
||||
if prg_counter is not None and i == prg_counter:
|
||||
pygame.draw.rect(cmd_surf, tuple(GREEN), (0, 0, scale_fac, scale_fac), 5)
|
||||
prg_surf.blit(cmd_surf, (i * scale_fac, 0, scale_fac, scale_fac))
|
||||
|
||||
return prg_surf
|
||||
|
||||
def get_clicked_command(self, pos):
|
||||
print(f"clicked at pos = {pos}")
|
||||
xoffset = pos[0] - self.screen_rect.x
|
||||
clicked_cmd = xoffset * len(self.cmds) // self.screen_rect.width
|
||||
print("clicked command = ", clicked_cmd)
|
||||
return clicked_cmd
|
||||
|
||||
|
||||
class Game:
|
||||
def __init__(self, dimx, dimy, robotx, roboty):
|
||||
self.robot = Robot(x=robotx, y=roboty, orientation='v')
|
||||
self.board = Board(dimx, dimy)
|
||||
self.board.tiles[3,3]['star'] = True
|
||||
self.board.tiles[3,2]['star'] = True
|
||||
|
||||
# TODO fix number of commands at 5
|
||||
self.cmds = [Command('forward'), Command('left', color=RED), Command('left', color=BLUE), Command('P0'), Command('-')]
|
||||
self.state = 'input'
|
||||
|
||||
self.prg = Program(self.robot, self.board, self.cmds)
|
||||
|
||||
self.programmer = Programmer(self.prg)
|
||||
|
||||
#self.scale_fac = 180
|
||||
self.scale_fac = 125
|
||||
self.beamer_mode = False
|
||||
self.screen = pygame.display.set_mode((int(self.board.tiles.shape[1] * self.scale_fac * 1.1),
|
||||
int((self.board.tiles.shape[0] + 2) * self.scale_fac * 1.2)))
|
||||
|
||||
self.game_over_text = myfont.render('GAME OVER', False, RED)
|
||||
self.won_text = myfont.render('YOU WON', False, GREEN)
|
||||
self.run_text = myfont.render('RUN', False, tuple(BLACK))
|
||||
self.stop_text = myfont_small.render('STOP', False, tuple(BLACK))
|
||||
|
||||
# save initial state
|
||||
self.initial_pos = (self.robot.x, self.robot.y, self.robot.orientation)
|
||||
self.inital_board_tiles = self.board.tiles.copy()
|
||||
|
||||
def render(self):
|
||||
"""Render the game screen.
|
||||
|
||||
This will render the board and the robot.
|
||||
Depending on the display mode it will also render the current program and the input commands for the robot.
|
||||
|
||||
:param prg_counter:
|
||||
:return:
|
||||
"""
|
||||
if self.beamer_mode:
|
||||
dx = 0
|
||||
dy = 0
|
||||
else:
|
||||
dx = int(0.05 * self.screen.get_width())
|
||||
dy = int(0.05 * self.screen.get_height())
|
||||
self.screen.fill(tuple(BLACK))
|
||||
|
||||
# render the board
|
||||
board_surf = self.board.render(self.scale_fac)
|
||||
|
||||
# render robot onto the board surface
|
||||
robot_surf = self.robot.render(self.scale_fac)
|
||||
board_surf.blit(robot_surf, (self.robot.x * self.scale_fac, self.robot.y * self.scale_fac, self.scale_fac, self.scale_fac))
|
||||
self.screen.blit(board_surf, (dx, dy, dx + self.board.tiles.shape[1] * self.scale_fac, dy + self.board.tiles.shape[0] * self.scale_fac))
|
||||
|
||||
if not self.beamer_mode:
|
||||
# if we are not in beamer mode we also show the current program / inputs
|
||||
|
||||
# render program
|
||||
if self.state == 'input':
|
||||
# in input mode we highlight the command which is selected for edit
|
||||
prg_surf = self.prg.render(self.scale_fac, prg_counter_override=self.programmer.command_to_edit)
|
||||
else:
|
||||
# in other modes we render the current program counter
|
||||
prg_surf = self.prg.render(self.scale_fac)
|
||||
prg_surf = pygame.transform.scale(prg_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
|
||||
self.prg.screen_rect = pygame.Rect((dx, board_surf.get_height() + 2 * dy, prg_surf.get_width(), prg_surf.get_height()))
|
||||
self.screen.blit(prg_surf, self.prg.screen_rect)
|
||||
|
||||
# render input fields and buttons
|
||||
inp_surf = self.programmer.render(self.scale_fac)
|
||||
inp_surf = pygame.transform.scale(inp_surf, (self.screen.get_width() * 2 // 3, self.scale_fac * 2 // 3))
|
||||
self.programmer.screen_rect = pygame.Rect((dx, board_surf.get_height() + prg_surf.get_height() + 3 * dy, inp_surf.get_width(), inp_surf.get_height()))
|
||||
self.screen.blit(inp_surf, self.programmer.screen_rect)
|
||||
|
||||
btn_surf = pygame.Surface((3 * self.scale_fac//2, self.scale_fac))
|
||||
self.btn_rect = pygame.Rect((2 * dx + prg_surf.get_width(), board_surf.get_height() + 2 * dy,
|
||||
btn_surf.get_height(), btn_surf.get_width()))
|
||||
if self.state == 'input':
|
||||
btn_surf.fill(tuple(GREEN))
|
||||
btn_surf.blit(self.run_text, (0, 10))
|
||||
elif self.state == 'running':
|
||||
btn_surf.fill(tuple(RED))
|
||||
btn_surf.blit(self.stop_text, (0, 10))
|
||||
self.screen.blit(btn_surf, self.btn_rect)
|
||||
|
||||
# render messages
|
||||
if self.state == 'game_over':
|
||||
self.screen.blit(self.game_over_text, (50, 00))
|
||||
pygame.display.update()
|
||||
pygame.time.wait(1500)
|
||||
self.state = 'reset'
|
||||
elif self.state == 'won':
|
||||
self.screen.blit(self.won_text, (50, 00))
|
||||
pygame.display.update()
|
||||
pygame.time.wait(1500)
|
||||
self.state = 'reset'
|
||||
|
||||
pygame.display.flip()
|
||||
|
||||
def process_inputs(self):
|
||||
# proceed events
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.QUIT:
|
||||
self.state = 'quit'
|
||||
# handle MOUSEBUTTONUP
|
||||
elif event.type == pygame.MOUSEBUTTONUP:
|
||||
pos = pygame.mouse.get_pos()
|
||||
|
||||
# select command to edit by the programmer
|
||||
if self.prg.screen_rect is not None:
|
||||
if self.prg.screen_rect.collidepoint(pos):
|
||||
print(f"clicked at pos = {pos}")
|
||||
self.programmer.command_to_edit = self.prg.get_clicked_command(pos)
|
||||
|
||||
# set the selected command to a new value
|
||||
if self.programmer.screen_rect is not None:
|
||||
if self.programmer.screen_rect.collidepoint(pos):
|
||||
self.programmer.update_selected_command(pos)
|
||||
|
||||
# clicked RUN/STOP button
|
||||
if self.btn_rect is not None and self.btn_rect.collidepoint(*pos):
|
||||
print(f"clicked at pos = {pos}")
|
||||
if self.state == 'running':
|
||||
self.state = 'input'
|
||||
elif self.state == 'input':
|
||||
self.state = 'running'
|
||||
elif event.type == pygame.KEYUP:
|
||||
if event.key == pygame.K_x:
|
||||
if not self.beamer_mode:
|
||||
# switch to beamer mode
|
||||
os.environ['SDL_VIDEO_WINDOW_POS'] = '1920, 280'
|
||||
self.scale_fac = 180
|
||||
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
|
||||
self.board.tiles.shape[0] * self.scale_fac),
|
||||
pygame.NOFRAME)
|
||||
self.beamer_mode = True
|
||||
else:
|
||||
# switch to normal mode
|
||||
os.environ['SDL_VIDEO_WINDOW_POS'] = '0, 0'
|
||||
self.scale_fac = 125
|
||||
self.screen = pygame.display.set_mode((self.board.tiles.shape[1] * self.scale_fac,
|
||||
self.board.tiles.shape[0] * self.scale_fac + 5 * self.scale_fac))
|
||||
self.beamer_mode = False
|
||||
elif event.key == pygame.K_r:
|
||||
# run program
|
||||
self.state = 'running'
|
||||
elif event.key == pygame.K_s:
|
||||
self.state = 'manual'
|
||||
self.prg.step()
|
||||
return self.state
|
||||
|
||||
def reset(self):
|
||||
self.prg.prg_counter = 0
|
||||
self.robot.x = self.initial_pos[0]
|
||||
self.robot.y = self.initial_pos[1]
|
||||
self.robot.orientation = self.initial_pos[2]
|
||||
self.board.tiles = self.inital_board_tiles.copy()
|
||||
return 'input'
|
||||
|
||||
def run(self):
|
||||
running = True
|
||||
while running:
|
||||
self.state = self.process_inputs()
|
||||
if self.state == 'input':
|
||||
pass
|
||||
elif self.state == 'running':
|
||||
self.state = self.prg.step()
|
||||
elif self.state == 'reset':
|
||||
self.state = self.reset()
|
||||
elif self.state == 'quit':
|
||||
running = False
|
||||
elif self.state == 'manual':
|
||||
pass
|
||||
else:
|
||||
print("unknown state")
|
||||
return
|
||||
|
||||
self.render()
|
||||
pygame.time.wait(100)
|
||||
|
||||
if __name__ == "__main__":
|
||||
random.seed(42)
|
||||
game = Game(dimx=7, dimy=4, robotx=3, roboty=1)
|
||||
game.run()
|
||||
|
||||
# TODOs
|
||||
# - in stepping mode (s) it is possible to drive outside of the map
|
||||
# - when no P0 command is present the program counter will overflow
|
|
@ -141,6 +141,7 @@ class ArucoEstimator:
|
|||
{'name': 'Grid rows', 'type': 'int', 'value': self.grid_rows, 'tip': "Number of rows for the grid"},
|
||||
{'name': 'Display mode', 'type': 'list', 'values': ['color', 'grayscale'], 'value': 'color', 'tip': "Display mode for the video"},
|
||||
{'name': 'Autoexposure', 'type': 'bool', 'value': True},
|
||||
{'name': 'Controlled robot', 'type': 'list', 'values': self.robot_marker_ids, 'tip': 'Robot to control'},
|
||||
RobotMarkerGroup(name="Robot markers", children=robot_marker_group),
|
||||
CornerMarkerGroup(name="Corner markers", children=corner_marker_group),
|
||||
]
|
||||
|
@ -156,6 +157,8 @@ class ArucoEstimator:
|
|||
self.params.param('Robot marker size').sigValueChanged.connect(lambda _, v: self.__setattr__('robot_marker_size', v))
|
||||
self.params.param('Show FPS').sigValueChanged.connect(lambda _, v: self.fps_overlay.show() if v else self.fps_overlay.hide())
|
||||
self.params.param('Autoexposure').sigValueChanged.connect(lambda _, v: self.set_autoexposure(v))
|
||||
self.params.param('Controlled robot').sigValueChanged.connect(lambda _, v: self.event_queue.put(('controlled_robot',
|
||||
{'robot_id': v})))
|
||||
|
||||
self.paramtree = ParameterTree()
|
||||
self.paramtree.setParameters(self.params, showTop=False)
|
||||
|
|
|
@ -143,7 +143,7 @@ class OpenLoopSolver:
|
|||
|
||||
|
||||
def solve(self, x0, target):
|
||||
|
||||
target = np.asarray(target)
|
||||
angles_unwrapped = np.unwrap([x0[2], target[2]]) # unwrap angle to avoid jump in data
|
||||
x0[2] = angles_unwrapped[0]
|
||||
target[2] = angles_unwrapped[1]
|
||||
|
|
|
@ -2,25 +2,12 @@ import sys
|
|||
import socket
|
||||
import threading
|
||||
import time
|
||||
import json
|
||||
|
||||
from robot import Robot, ControlledRobot
|
||||
|
||||
from mpc_controller import MPCController
|
||||
|
||||
# HOST, PORT = "localhost", 42424
|
||||
#
|
||||
# robot_id = 12
|
||||
#
|
||||
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# sock.connect((HOST, PORT))
|
||||
# sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
|
||||
# #sock.sendall(f"events\n".encode()) # request events
|
||||
# receiving = True
|
||||
# while receiving:
|
||||
# received = str(sock.recv(1024), "utf-8")
|
||||
# print("Received: {}".format(received))
|
||||
# receiving = len(received) > 0
|
||||
|
||||
|
||||
class RemoteController:
|
||||
def __init__(self):
|
||||
|
@ -28,35 +15,84 @@ class RemoteController:
|
|||
#self.robots = [Robot(11, '192.168.1.11')] #, Robot(13, '192.168.1.13'), Robot(14, '192.168.1.14')]
|
||||
#self.robots = [Robot(12, '10.10.11.91'), Robot(13, '10.10.11.90'), Robot(14, '10.10.11.89')]
|
||||
#self.robots = [Robot(14, '10.10.11.89')]
|
||||
self.robots = [ControlledRobot(13, '192.168.1.13')]
|
||||
self.robots = [ControlledRobot(13, '192.168.1.13'), ControlledRobot(12, '192.168.1.12')]
|
||||
|
||||
self.robot_ids = {}
|
||||
for r in self.robots:
|
||||
self.robot_ids[r.id] = r
|
||||
|
||||
# socket for movement commands
|
||||
self.comm_socket = socket.socket()
|
||||
self.comm_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.controlled_robot = self.robots[0].id
|
||||
|
||||
for robot in self.robots:
|
||||
robot.connect()
|
||||
|
||||
self.comm_socket.bind(('', 1337))
|
||||
self.comm_socket.listen(5)
|
||||
# thread for handling events received by measurement server (from GUI)
|
||||
self.event_thread = threading.Thread(target=self.handle_events)
|
||||
self.event_thread.daemon = True
|
||||
|
||||
# connect to socket for events from GUI
|
||||
try:
|
||||
HOST, PORT = "localhost", 42424
|
||||
self.event_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.event_socket.connect((HOST, PORT))
|
||||
self.event_socket.sendall("events\n".encode())
|
||||
self.event_socket.settimeout(0.1)
|
||||
# check if we receive data from the measurement server
|
||||
response = self.event_socket.recv(1024)
|
||||
if 'error' not in str(response):
|
||||
print("... connected! -> start listening for measurements")
|
||||
self.event_socket.settimeout(None)
|
||||
# if so we start the measurement thread
|
||||
self.event_thread.start()
|
||||
else:
|
||||
print(f"error: cannot communicate with the measurement server.\n The response was: {response}")
|
||||
except socket.timeout:
|
||||
print(f"error: the measurement server did not respond with data.")
|
||||
|
||||
|
||||
#sock.sendall(f"{robot_id}\n".encode()) # request data for robot with given id
|
||||
# sock.sendall(f"events\n".encode()) # request events
|
||||
# receiving = True
|
||||
# while receiving:
|
||||
# received = str(sock.recv(1024), "utf-8")
|
||||
# print("Received: {}".format(received))
|
||||
# receiving = len(received) > 0
|
||||
|
||||
self.t = time.time()
|
||||
|
||||
# start thread for marker position detection
|
||||
self.controller = MPCController()
|
||||
self.robots[0].attach_controller(self.controller)
|
||||
self.controllers = []
|
||||
for r in self.robots:
|
||||
c = MPCController()
|
||||
self.controllers.append(c)
|
||||
r.attach_controller(c)
|
||||
|
||||
def handle_events(self):
|
||||
receiving = True
|
||||
while receiving:
|
||||
received = str(self.event_socket.recv(1024), "utf-8")
|
||||
if len(received) > 0:
|
||||
last_received = received.split('\n')[-2]
|
||||
event_type, payload = json.loads(last_received)
|
||||
print(event_type, payload)
|
||||
|
||||
if event_type == 'click':
|
||||
target_pos = (payload['x'], payload['y'], payload['angle'])
|
||||
self.robot_ids[self.controlled_robot].move_to_pos(target_pos=target_pos)
|
||||
elif event_type == 'controlled_robot':
|
||||
self.controlled_robot = payload['robot_id']
|
||||
else:
|
||||
receiving = False
|
||||
print(f"measurement server stopped sending event data")
|
||||
|
||||
def run(self):
|
||||
|
||||
#print("waiting until all markers are detected...")
|
||||
#while not (self.estimator.all_corners_detected() and self.estimator.all_robots_detected()):
|
||||
# pass
|
||||
print("starting control")
|
||||
#self.controller.interactive_control(robots=self.robots)
|
||||
self.controller.start()
|
||||
for c in self.controllers:
|
||||
c.start()
|
||||
pass
|
||||
|
||||
|
||||
|
@ -70,10 +106,9 @@ if __name__ == '__main__':
|
|||
|
||||
# to use the program, first start measurement_server.py, which launches the Aruco marker detection GUI and streams
|
||||
# marker positions and GUI events via socket
|
||||
# next, launch this program (mpc_test.py) in debug mode and set a breakpoint after self.controller.start()
|
||||
# you can now set the target position for the controlled robot via move_to_pos(), e.g.
|
||||
# self.robots[0].move_to_pos([0.0,0,0], True)
|
||||
# next, launch this program (mpc_test.py)
|
||||
# you can now direct the robots via the GUI (assuming four corner markers for position reference have been detected)
|
||||
# click any position on the screen and the current robot will drive there
|
||||
# you can also switch which robot should be controlled in the GUI
|
||||
|
||||
# TODO
|
||||
# figure out how to get clicked positions from measurement server and drive robot there
|
||||
# also document and improve program structure
|
||||
# TODO document and improve program structure
|
|
@ -95,10 +95,10 @@ class Robot:
|
|||
return self.t_last_measurement, self.x, self.y, self.angle
|
||||
|
||||
def __str__(self):
|
||||
connection_state = '' if self.connected else 'not'
|
||||
connection_state = '' if self.connected else ' not'
|
||||
state = self.get_measurement()
|
||||
last_measurement = f'last measurement = {state}' if None not in state else 'no measurements available'
|
||||
return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected) " + last_measurement
|
||||
return f"Robot {self.id}: ip = {self.ip}:{self.port} ({connection_state} connected ) " + last_measurement
|
||||
|
||||
def __repr__(self):
|
||||
return str(self)
|
||||
|
|
Loading…
Reference in New Issue
Block a user