Исходный код message_manager

Class that operates on protobuf messages; it is used to create and parse messages.

import logging
# TODO: Can't add google.protobuf to exe
# from google.protobuf import text_format

import messages_pb2

class MessageManager:
    """Create, serialize and parse the protobuf messages from ``messages_pb2``.

    Outgoing messages are ``ActuatorRequests`` instances and incoming ones
    are ``SensorMeasurements`` instances. A serialized outgoing message is
    prefixed with a 4-byte big-endian unsigned length header.
    """

    def __init__(self, logger=logging, head_buffer_size=4):
        """Store the logger and the header buffer size.

        Args:
            logger: Object used for logging. Defaults to the ``logging``
                module itself.
            head_buffer_size (int): Value of the header byte buffer.
                Defaults to 4.
        """
        self.logger = logger
        self.size = head_buffer_size
        # Created lazily by add_initial_request().
        self.init_request = None

    def get_size(self):
        """Return the header buffer size given to the constructor.

        Returns:
            int: Value of the header byte buffer.
        """
        return self.size

    @staticmethod
    def create_requests_message():
        """Create an empty protobuf instance for a request message.

        Returns:
            messages_pb2.ActuatorRequests: Empty protobuf class instance.
        """
        return messages_pb2.ActuatorRequests()

    @staticmethod
    def create_answer_message():
        """Create an empty protobuf instance for an answer message.

        Returns:
            messages_pb2.SensorMeasurements: Empty protobuf class instance.
        """
        return messages_pb2.SensorMeasurements()

    @staticmethod
    def build_request_from_file(path):
        """Build a request message that is meant to be read from a text file.

        The file is currently NOT read: the text-format parsing is disabled
        (see the TODO below), so ``path`` is ignored and an empty request is
        returned.

        Args:
            path (str): Path to the ``.txt`` file holding the message.

        Returns:
            messages_pb2.ActuatorRequests: Protobuf request instance
            (empty while parsing is disabled).
        """
        request = messages_pb2.ActuatorRequests()
        # TODO: Can't add google.protobuf to exe
        # with open(path, 'r', encoding="utf-8") as actuator_requests:
        #     text_format.Parse(actuator_requests.read(), request)
        return request

    def build_request_positions(self, positions):
        """Serialize a request that sets the given motor positions.

        Args:
            positions (dict): Maps a servo name to its target position.

        Returns:
            bytes: Length-prefixed serialized request, as produced by
            ``generate_message``.
        """
        request = messages_pb2.ActuatorRequests()
        for name, position in positions.items():
            motor = request.motor_positions.add()
            motor.name = name
            motor.position = position
        return self.generate_message(request)

    @staticmethod
    def generate_message(message):
        """Serialize a message and prepend its length header.

        Args:
            message: Filled protobuf instance; it must provide
                ``ByteSize()`` and ``SerializeToString()``.

        Returns:
            bytes: 4-byte big-endian unsigned size followed by the
            serialized message.
        """
        header = message.ByteSize().to_bytes(4, byteorder='big', signed=False)
        return header + message.SerializeToString()

    def message_from_file(self, path):
        """Serialize the request built by ``build_request_from_file``.

        Args:
            path (str): Path to the ``.txt`` file with the default message.

        Returns:
            bytes: Length-prefixed serialized request.
        """
        return self.generate_message(self.build_request_from_file(path))

    def get_answer_size(self, content_size):
        """Decode the message size from the header bytes.

        Args:
            content_size (bytes): Header bytes holding the size of the
                answer message as a big-endian unsigned integer.

        Returns:
            int: Size of the answer message.
        """
        return int.from_bytes(content_size, byteorder='big', signed=False)

    def add_initial_request(self, sensor_name, sensor_time):
        """Add a sensor time step entry to the initialization request.

        The initialization request is created on the first call and reused
        by later calls.

        Args:
            sensor_name (str): Value stored in the entry's ``name`` field.
            sensor_time: Value stored in the entry's ``timeStep`` field.
        """
        if self.init_request is None:
            self.init_request = messages_pb2.ActuatorRequests()
        sensor = self.init_request.sensor_time_steps.add()
        sensor.name = sensor_name
        sensor.timeStep = sensor_time

    def build_initial_request(self):
        """Serialize the initialization request.

        If ``add_initial_request`` was never called, an empty request is
        serialized instead of failing on ``None``.

        Returns:
            bytes: Length-prefixed serialized initialization request.
        """
        request = self.init_request
        if request is None:
            request = messages_pb2.ActuatorRequests()
        return self.generate_message(request)

    def parse_answer_message(self, data):
        """Parse a serialized answer message into a dict of measurements.

        Args:
            data (bytes): Serialized ``SensorMeasurements`` message, without
                the length header.

        Returns:
            dict: Result of ``parse_message`` for the decoded message.
        """
        message = messages_pb2.SensorMeasurements()
        message.ParseFromString(data)
        return self.parse_message(message)

    @staticmethod
    def parse_message(message) -> dict:
        """Convert a sensor measurements message into a plain dictionary.

        The result always has a ``"time"`` entry with the keys
        ``"unix time"`` (``message.real_time``) and ``"sim time"``
        (``message.time``). Every sensor reading is stored under the
        sensor's name together with ``message.time``. Text messages are
        stored under the ``"warnings"`` key.

        Args:
            message: Protobuf measurements instance, filled or not.

        Returns:
            dict: Measurements keyed by sensor name.
        """
        stamp = message.time

        def xyz(vector):
            # Three-component vector as a list.
            return [vector.X, vector.Y, vector.Z]

        def xy(vector):
            # Only the first two components are kept.
            return [vector.X, vector.Y]

        parsed = {
            "time": {"unix time": message.real_time, "sim time": stamp},
        }

        for sensor in message.accelerometers:
            parsed[sensor.name] = {"position": xyz(sensor.value), "time": stamp}

        for sensor in message.cameras:
            parsed[sensor.name] = {
                "width": sensor.width,
                "height": sensor.height,
                "quality": sensor.quality,
                "image": sensor.image,
                "time": stamp,
            }

        for sensor in message.position_sensors:
            parsed[sensor.name] = {"position": sensor.value, "time": stamp}

        for sensor in message.gyros:
            parsed[sensor.name] = {"position": xyz(sensor.value), "time": stamp}

        for sensor in message.gps:
            parsed[sensor.name] = {"position": xy(sensor.value), "time": stamp}

        # The "objects" field is read only when the message has it.
        if hasattr(message, "objects"):
            for sensor in message.objects:
                parsed[sensor.name] = {
                    "position": xy(sensor.value),
                    "time": stamp,
                }

        for sensor in message.imu:
            angles = sensor.angles
            parsed[sensor.name] = {
                "position": [angles.roll, angles.pitch, angles.yaw],
                "time": stamp,
            }

        # NOTE: every text message is written to the same "warnings" key, so
        # only the last one in the list is kept.
        for text_message in message.messages:
            parsed["warnings"] = {
                "message_type": text_message.message_type,
                "text": text_message.text,
                "time": stamp,
            }

        return parsed