pyrcf.utils.data_io_utils

Submodules

Attributes

DEFAULT_PLOTJUGGLER_PUBLISH_PORT

DEFAULT_ZMQ_PUBLISH_PORT

Classes

PyRCFPublisherZMQ

A zero MQ publisher for publishing json serialised data to specified port.

PlotJugglerPublisher

PlotJugglerPublisher is used to stream data to PlotJuggler and plot them in real time.

PyRCFSubscriberBase

Base subscriber class.

PyRCFSubscriberZMQ

A zmq subscriber to read data published using PyRCFPublisherZMQ.

ComponentDataRecorderDataParser

A data parser for reading file created using ComponentDataRecorderDebugger.

Package Contents

class pyrcf.utils.data_io_utils.PyRCFPublisherZMQ(port: int = DEFAULT_ZMQ_PUBLISH_PORT)

Bases: PyRCFPublisherBase

A zero MQ publisher for publishing json serialised data to specified port.

socket
publish(data: dict)

Publish the provided dictionary data as a JSON string to the pre-defined port.

Parameters:

data (dict, optional) – Data as a dictionary.

close()

Close publisher cleanly.

__del__()
class pyrcf.utils.data_io_utils.PlotJugglerPublisher

Bases: PyRCFPublisherZMQ

PlotJugglerPublisher is used to stream data to PlotJuggler and plot them in real time. On PlotJuggler, choose ZMQ Subscriber for data streaming, the port is used to identify the data channel. Publishes data to tcp port 9872 (default for plotjuggler).

pyrcf.utils.data_io_utils.DEFAULT_PLOTJUGGLER_PUBLISH_PORT: int = 9872
pyrcf.utils.data_io_utils.DEFAULT_ZMQ_PUBLISH_PORT: int = 5001
class pyrcf.utils.data_io_utils.PyRCFSubscriberBase

Bases: abc.ABC

Base subscriber class.

abstract read_raw() str

Return raw string/bytes.

read_json() Mapping[str, Any]

Return as python dict (json).

close()

Close subscriber cleanly.

class pyrcf.utils.data_io_utils.PyRCFSubscriberZMQ(port: int = DEFAULT_ZMQ_PUBLISH_PORT, topic: str = '')

Bases: PyRCFSubscriberBase

A zmq subscriber to read data published using PyRCFPublisherZMQ.

context
socket
read_raw() str

Return raw string/bytes.

close()

Close subscriber cleanly.

__del__()
class pyrcf.utils.data_io_utils.ComponentDataRecorderDataParser(file_name: str, load_on_init: bool = True)

A data parser for reading file created using ComponentDataRecorderDebugger.

_filename
_data = None
_keys = None
_data_length = None
load_data()
property num_datapoints
Length of data points in the loaded data.
property key_names
get_all_data() Mapping[str, List[Any]]

Get all data as dictionary of lists.

Available keys: [“t”, “dt”, “robot_state”, “global_plan”, “agent_outputs”, “robot_cmd”, “debug_data”]

e.g. data[‘t’] will be a list of timesteps from all the control loop iterations that were recorded.

Returns:

Output data.

Available keys: [“t”, “dt”, “robot_state”, “global_plan”, “agent_outputs”, “robot_cmd”, “debug_data”]

Return type:

Mapping[str, List[Any]]

get_all_data_for_key(key_name: Literal['t', 'dt', 'robot_state', 'global_plan', 'agent_outputs', 'robot_cmd', 'debug_data'], field_name: str = None, as_ndarray_if_possible: bool = True) List[Any] | numpy.ndarray

Get all the data for a specified field for all the objects of a key in the data dictionary.

E.g. get_all_data_for_key(“robot_state”,”state_estimates.pose.position”) will return a numpy array of all the state_estimate.pose.position values from the data. If objects are not numbers or numpy arrays, they are returned as a list. So, get_all_data_for_key(“robot_state”, “state_estimates.pose”) will return a list of Pose3D objects.

Also allows index and key access for valid attributes. e.g.: get_all_data_for_key(“robot_state”, “state_estimates.end_effector_states.ee_poses[0].position[0]”) is a valid call to get the x values of the end-effector pose of the first end-effector in the end-effector state object’s ee_poses attribute.

Also handles [:] situations. E.g. “state_estiamates.end_effector_states.ee_poses[0][:].position” will return positions of first end-effector (parsing position attribute of Pose3D across all time stamps).

Parameters:
  • "t" (key_name (Literal[) – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.

  • "dt" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.

  • "robot_state" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.

  • "global_plan" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.

  • "agent_outputs" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.

:param : “robot_cmd”, “debug_data”]): The key to look for in the dictionary. :param field_name: Nested attribute string to retrieve for the data in the

specified key value in the dictionary. Defaults to None.

Parameters:

as_ndarray_if_possible (bool, optional) – If the retrieved objects is a number or numpy array, this option will allow returning a numpy array of the combined values. Defaults to True.

Returns:

List of retrieved attributes from all objects of the specified

key from the loaded data.

Return type:

List[Any] | np.ndarray