Source code for SAMPLE_TEAM.gcreceiver

#!/usr/bin/env python3
#-*- coding:utf-8 -*-

"""
This module shows how the GameController Communication protocol can be used
in python and also allows to be changed such that every team using python to
interface with the GC can utilize the new protocol.

.. moduleauthor:: Nils Rokita <0rokita@informatik.uni-hamburg.de>
.. moduleauthor:: Robert Kessler <8kessler@informatik.uni-hamburg.de>

Modded by Egor Davydenko egordv@gmail.com for elsiros league

"""

import socket
import time
import logging
import argparse
import threading

# Requires construct==2.5.3
from construct import Container, ConstError
from gamestate import GameState, ReturnData, GAME_CONTROLLER_RESPONSE_VERSION

[docs]logger = logging.getLogger('game_controller')
logger.setLevel(logging.DEBUG)
[docs]console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) logger.addHandler(console_handler)
[docs]DEFAULT_LISTENING_HOST = '0.0.0.0'
[docs]GAME_CONTROLLER_LISTEN_PORT = 3838
[docs]GAME_CONTROLLER_ANSWER_PORT = 3939
[docs]parser = argparse.ArgumentParser()
parser.add_argument('--team', type=int, default=1, help="team ID, default is 1") parser.add_argument('--player', type=int, default=1, help="player ID, default is 1") parser.add_argument('--goalkeeper', action="store_true", help="if this flag is present, the player takes the role of the goalkeeper")
[docs]class GameStateReceiver(object): """ This class puts up a simple UDP Server which receives the *addr* parameter to listen to the packages from the game_controller. If it receives a package it will be interpreted with the construct data structure and the :func:`on_new_gamestate` will be called with the content. After this we send a package back to the GC """ def __init__(self, team, player, is_goalkeeper, addr=(DEFAULT_LISTENING_HOST, GAME_CONTROLLER_LISTEN_PORT), answer_port=GAME_CONTROLLER_ANSWER_PORT): # Information that is used when sending the answer to the game controller self.team = team self.player = player self.man_penalize = True self.is_goalkeeper = is_goalkeeper # The address listening on and the port for sending back the robots meta data self.addr = addr self.answer_port = answer_port # The state and time we received last form the GC self.state = None self.time = None # The socket and whether it is still running self.socket = None self.running = True self._open_socket()
[docs] def _open_socket(self): """ Erzeugt das Socket """ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.addr) self.socket.settimeout(0.5) self.socket2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.socket2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
[docs] def receive_forever(self): """ Waits in a loop that is terminated by setting self.running = False """ while self.running: try: self.receive_once() except IOError as e: logger.debug("Fehler beim Senden des KeepAlive: " + str(e))
[docs] def receive_once(self): """ Receives a package and interprets it. Calls :func:`on_new_gamestate` Sends an answer to the GC """ try: data, peer = self.socket.recvfrom(GameState.sizeof()) #print(len(data)) # Throws a ConstError if it doesn't work parsed_state = GameState.parse(data) # Assign the new package after it parsed successful to the state self.state = parsed_state self.time = time.time() # Call the handler for the package self.on_new_gamestate(self.state) # Answer the GameController self.answer_to_gamecontroller(peer) except AssertionError as ae: logger.error(ae.message) except socket.timeout: logger.warning("Socket timeout") except ConstError: logger.warning("Parse Error: Probably using an old protocol!") except Exception as e: logger.exception(e) pass
[docs] def answer_to_gamecontroller(self, peer): """ Sends a life sign to the game controller """ return_message = 0 if self.man_penalize else 2 if self.is_goalkeeper: return_message = 3 data = Container( header=b"RGrt", version=GAME_CONTROLLER_RESPONSE_VERSION, team=self.team, player=self.player, message=return_message) try: destination = peer[0], GAME_CONTROLLER_ANSWER_PORT self.socket.sendto(ReturnData.build(data), destination) except Exception as e: logger.log("Network Error: %s" % str(e))
[docs] def on_new_gamestate(self, state): """ Is called with the new game state after receiving a package Needs to be implemented or set :param state: Game State """ raise NotImplementedError()
[docs] def get_last_state(self): return self.state, self.time
[docs] def get_time_since_last_package(self): return time.time() - self.time
[docs] def stop(self): self.running = False
[docs] def set_manual_penalty(self, flag): self.man_penalize = flag
[docs]class SampleGameStateReceiver(GameStateReceiver):
[docs] def on_new_gamestate(self, state): print(state) print(state.secondary_state_info)
[docs]class ThreadedGameStateReceiver(GameStateReceiver): def __init__(self, team, player, is_goalkeeper, addr=(DEFAULT_LISTENING_HOST, GAME_CONTROLLER_LISTEN_PORT), answer_port=GAME_CONTROLLER_ANSWER_PORT): super().__init__(team, player, is_goalkeeper, addr=addr, answer_port=answer_port) self.receive_thread = threading.Thread(target=self.receive_forever) self.receive_thread.daemon=True # force the thread to terminate when the main process ends self.team_state = None self.player_state = None
[docs] def start(self): self.receive_thread.start()
[docs] def on_new_gamestate(self, state): for team in state['teams']: if team.team_number == self.team: self.team_state = team self.player_state = team['players'][self.player-1] # GC player number starts from 1, here we ned an list index which starts from 0