pyrcf.utils.data_io_utils
Submodules
Attributes
Classes
A zero MQ publisher for publishing json serialised data to specified port. |
|
PlotJugglerPublisher is used to stream data to PlotJuggler and plot them in real time. |
|
Base subscriber class. |
|
A zmq subscriber to read data published using PyRCFPublisherZMQ. |
|
A data parser for reading file created using ComponentDataRecorderDebugger. |
Package Contents
- class pyrcf.utils.data_io_utils.PyRCFPublisherZMQ(port: int = DEFAULT_ZMQ_PUBLISH_PORT)
Bases:
PyRCFPublisherBaseA zero MQ publisher for publishing json serialised data to specified port.
- socket
- publish(data: dict)
Publish the provided dictionary data as a JSON string to the pre-defined port.
- Parameters:
data (dict, optional) – Data as a dictionary.
- close()
Close publisher cleanly.
- __del__()
- class pyrcf.utils.data_io_utils.PlotJugglerPublisher
Bases:
PyRCFPublisherZMQPlotJugglerPublisher is used to stream data to PlotJuggler and plot them in real time. On PlotJuggler, choose ZMQ Subscriber for data streaming, the port is used to identify the data channel. Publishes data to tcp port 9872 (default for plotjuggler).
- pyrcf.utils.data_io_utils.DEFAULT_PLOTJUGGLER_PUBLISH_PORT: int = 9872
- pyrcf.utils.data_io_utils.DEFAULT_ZMQ_PUBLISH_PORT: int = 5001
- class pyrcf.utils.data_io_utils.PyRCFSubscriberBase
Bases:
abc.ABCBase subscriber class.
- abstract read_raw() str
Return raw string/bytes.
- read_json() Mapping[str, Any]
Return as python dict (json).
- close()
Close subscriber cleanly.
- class pyrcf.utils.data_io_utils.PyRCFSubscriberZMQ(port: int = DEFAULT_ZMQ_PUBLISH_PORT, topic: str = '')
Bases:
PyRCFSubscriberBaseA zmq subscriber to read data published using PyRCFPublisherZMQ.
- socket
- read_raw() str
Return raw string/bytes.
- close()
Close subscriber cleanly.
- __del__()
- class pyrcf.utils.data_io_utils.ComponentDataRecorderDataParser(file_name: str, load_on_init: bool = True)
A data parser for reading file created using ComponentDataRecorderDebugger.
- _filename
- _data = None
- _keys = None
- _data_length = None
- load_data()
- property num_datapoints
Length of data points in the loaded data.
- property key_names
- get_all_data() Mapping[str, List[Any]]
Get all data as dictionary of lists.
Available keys: [“t”, “dt”, “robot_state”, “global_plan”, “agent_outputs”, “robot_cmd”, “debug_data”]
e.g. data[‘t’] will be a list of timesteps from all the control loop iterations that were recorded.
- Returns:
- Output data.
Available keys: [“t”, “dt”, “robot_state”, “global_plan”, “agent_outputs”, “robot_cmd”, “debug_data”]
- Return type:
Mapping[str, List[Any]]
- get_all_data_for_key(key_name: Literal['t', 'dt', 'robot_state', 'global_plan', 'agent_outputs', 'robot_cmd', 'debug_data'], field_name: str = None, as_ndarray_if_possible: bool = True) List[Any] | numpy.ndarray
Get all the data for a specified field for all the objects of a key in the data dictionary.
E.g. get_all_data_for_key(“robot_state”,”state_estimates.pose.position”) will return a numpy array of all the state_estimate.pose.position values from the data. If objects are not numbers or numpy arrays, they are returned as a list. So, get_all_data_for_key(“robot_state”, “state_estimates.pose”) will return a list of Pose3D objects.
Also allows index and key access for valid attributes. e.g.: get_all_data_for_key(“robot_state”, “state_estimates.end_effector_states.ee_poses[0].position[0]”) is a valid call to get the x values of the end-effector pose of the first end-effector in the end-effector state object’s ee_poses attribute.
Also handles [:] situations. E.g. “state_estiamates.end_effector_states.ee_poses[0][:].position” will return positions of first end-effector (parsing position attribute of Pose3D across all time stamps).
- Parameters:
"t" (key_name (Literal[) – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.
"dt" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.
"robot_state" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.
"global_plan" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.
"agent_outputs" – “robot_cmd”, “debug_data”]): The key to look for in the dictionary.
:param : “robot_cmd”, “debug_data”]): The key to look for in the dictionary. :param field_name: Nested attribute string to retrieve for the data in the
specified key value in the dictionary. Defaults to None.
- Parameters:
as_ndarray_if_possible (bool, optional) – If the retrieved objects is a number or numpy array, this option will allow returning a numpy array of the combined values. Defaults to True.
- Returns:
- List of retrieved attributes from all objects of the specified
key from the loaded data.
- Return type:
List[Any] | np.ndarray