#!/usr/bin/env python3
#-*- coding:utf-8 -*-
"""
This module shows how the GameController communication protocol can be used
in Python. It can also be adapted so that every team that uses Python to
interface with the GC can utilize the new protocol.
.. moduleauthor:: Nils Rokita <0rokita@informatik.uni-hamburg.de>
.. moduleauthor:: Robert Kessler <8kessler@informatik.uni-hamburg.de>
Modded by Egor Davydenko egordv@gmail.com for elsiros league
"""
import socket
import time
import logging
import argparse
import threading
# Requires construct==2.5.3
from construct import Container, ConstError
from gamestate import GameState, ReturnData, GAME_CONTROLLER_RESPONSE_VERSION
# Module-wide logger; messages are written to the console with a timestamp.
logger = logging.getLogger('game_controller')
logger.setLevel(logging.DEBUG)

console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(console_handler)

# Default address the receiver binds to (all interfaces) and the UDP ports
# used for listening to the GameController and for answering it.
DEFAULT_LISTENING_HOST = '0.0.0.0'
GAME_CONTROLLER_LISTEN_PORT = 3838
GAME_CONTROLLER_ANSWER_PORT = 3939

# Command-line options describing which robot this process represents.
parser = argparse.ArgumentParser()
parser.add_argument('--team', type=int, default=1, help="team ID, default is 1")
parser.add_argument('--player', type=int, default=1, help="player ID, default is 1")
parser.add_argument('--goalkeeper', action="store_true", help="if this flag is present, the player takes the role of the goalkeeper")
class GameStateReceiver(object):
    """Simple UDP server that listens for GameController packages.

    The receiver binds to *addr* and waits for packages from the
    GameController.  Every received package is parsed with the construct
    ``GameState`` structure, stored as the last known state, handed to
    :func:`on_new_gamestate`, and then answered with a return package sent
    back to the sender's host.

    Subclasses must implement :func:`on_new_gamestate`.
    """

    def __init__(self, team, player, is_goalkeeper,
                 addr=(DEFAULT_LISTENING_HOST, GAME_CONTROLLER_LISTEN_PORT),
                 answer_port=GAME_CONTROLLER_ANSWER_PORT):
        """Store the robot identity and open the listening socket.

        :param team: team number put into every answer package
        :param player: player number put into every answer package
        :param is_goalkeeper: if true, the answer message code is always 3
        :param addr: ``(host, port)`` tuple the listening socket binds to
        :param answer_port: UDP port on the sender's host that answers go to
        """
        # Information that is used when sending the answer to the game controller
        self.team = team
        self.player = player
        self.man_penalize = True
        self.is_goalkeeper = is_goalkeeper

        # The address listening on and the port for sending back the robot's meta data
        self.addr = addr
        self.answer_port = answer_port

        # The state and time we received last from the GC
        self.state = None
        self.time = None

        # The socket and whether it is still running
        self.socket = None
        self.running = True

        self._open_socket()

    def _open_socket(self):
        """Create the UDP sockets and bind the listening one to ``self.addr``."""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.addr)
        # A short timeout lets receive_forever() re-check self.running regularly.
        self.socket.settimeout(0.5)

        # NOTE(review): socket2 is created but not used anywhere in this class;
        # kept because code outside this file may rely on the attribute.
        self.socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.socket2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def receive_forever(self):
        """Receive packages in a loop until ``self.running`` is set to False."""
        while self.running:
            try:
                self.receive_once()
            except IOError as e:
                logger.debug("Fehler beim Senden des KeepAlive: " + str(e))

    def receive_once(self):
        """Receive one package, interpret it and answer the sender.

        On success the parsed state and the reception time are stored,
        :func:`on_new_gamestate` is called with the state, and an answer is
        sent via :func:`answer_to_gamecontroller`.  Every failure (timeout,
        parse error, assertion, or any other exception, including the ones
        raised by the handler) is logged and swallowed here.
        """
        try:
            data, peer = self.socket.recvfrom(GameState.sizeof())

            # Throws a ConstError if it doesn't work
            parsed_state = GameState.parse(data)

            # Assign the new package after it parsed successfully to the state
            self.state = parsed_state
            self.time = time.time()

            # Call the handler for the package
            self.on_new_gamestate(self.state)

            # Answer the GameController
            self.answer_to_gamecontroller(peer)
        except AssertionError as ae:
            # Python 3 exceptions have no ``.message`` attribute; use str().
            logger.error(str(ae))
        except socket.timeout:
            logger.warning("Socket timeout")
        except ConstError:
            logger.warning("Parse Error: Probably using an old protocol!")
        except Exception as e:
            logger.exception(e)

    def answer_to_gamecontroller(self, peer):
        """Send a life sign to the game controller.

        :param peer: ``(host, port)`` of the sender of the last package; only
            the host is used, the answer goes to ``self.answer_port``.
        """
        # Message code: 3 for a goalkeeper, otherwise 0 while the manual
        # penalty flag is set and 2 when it is not.
        # NOTE(review): the meaning of these codes is defined by the GC
        # return-data protocol, not by this file -- confirm against it.
        return_message = 0 if self.man_penalize else 2
        if self.is_goalkeeper:
            return_message = 3

        data = Container(
            header=b"RGrt",
            version=GAME_CONTROLLER_RESPONSE_VERSION,
            team=self.team,
            player=self.player,
            message=return_message)
        try:
            # Honour the answer port configured in the constructor instead of
            # always using the module-level default.
            destination = peer[0], self.answer_port
            self.socket.sendto(ReturnData.build(data), destination)
        except Exception as e:
            # Logger.log() requires a level as first argument; log as an error.
            logger.error("Network Error: %s", e)

    def on_new_gamestate(self, state):
        """Handle a freshly received game state.

        Needs to be implemented by subclasses (or set on the instance).

        :param state: the parsed game state
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def get_last_state(self):
        """Return the tuple ``(state, time)`` of the last parsed package.

        Both entries are ``None`` until the first package has been parsed.
        """
        return self.state, self.time

    def get_time_since_last_package(self):
        """Return the seconds elapsed since the last parsed package.

        :raises TypeError: if no package has been parsed yet, because
            ``self.time`` is still ``None``
        """
        return time.time() - self.time

    def stop(self):
        """Ask :func:`receive_forever` to leave its loop."""
        self.running = False

    def set_manual_penalty(self, flag):
        """Set the manual penalty flag used to choose the answer message."""
        self.man_penalize = flag
class SampleGameStateReceiver(GameStateReceiver):
    """Example receiver that prints every game state it receives."""

    def on_new_gamestate(self, state):
        """Print the whole state followed by its ``secondary_state_info``.

        :param state: the parsed game state
        """
        print(state)
        print(state.secondary_state_info)
class ThreadedGameStateReceiver(GameStateReceiver):
    """Receiver that runs its receive loop in a background daemon thread.

    After :func:`start` has been called, ``team_state`` and ``player_state``
    are updated from each received game state that contains this
    receiver's team.
    """

    def __init__(self, team, player, is_goalkeeper,
                 addr=(DEFAULT_LISTENING_HOST, GAME_CONTROLLER_LISTEN_PORT),
                 answer_port=GAME_CONTROLLER_ANSWER_PORT):
        """Set up the receiver and prepare (but do not start) the thread.

        The parameters are passed on unchanged to
        :class:`GameStateReceiver`.
        """
        super().__init__(team, player, is_goalkeeper, addr=addr, answer_port=answer_port)
        self.receive_thread = threading.Thread(target=self.receive_forever)
        # Force the thread to terminate when the main process ends
        self.receive_thread.daemon = True

        # Filled by on_new_gamestate(); None until a matching team was seen
        self.team_state = None
        self.player_state = None

    def start(self):
        """Start the background thread running :func:`receive_forever`."""
        self.receive_thread.start()

    def on_new_gamestate(self, state):
        """Remember the entries of our own team and player from *state*.

        :param state: the parsed game state; its ``'teams'`` entry is searched
            for the team whose ``team_number`` equals ``self.team``
        """
        for team in state['teams']:
            if team.team_number == self.team:
                self.team_state = team
                # GC player numbers start from 1, here we need a list index
                # which starts from 0
                self.player_state = team['players'][self.player - 1]