pyrcf.utils.data_io_utils ========================= .. py:module:: pyrcf.utils.data_io_utils Submodules ---------- .. toctree:: :maxdepth: 1 /autoapi/pyrcf/utils/data_io_utils/pyrcf_publisher/index /autoapi/pyrcf/utils/data_io_utils/pyrcf_subscriber/index /autoapi/pyrcf/utils/data_io_utils/recorded_data_parser/index Attributes ---------- .. autoapisummary:: pyrcf.utils.data_io_utils.DEFAULT_PLOTJUGGLER_PUBLISH_PORT pyrcf.utils.data_io_utils.DEFAULT_ZMQ_PUBLISH_PORT Classes ------- .. autoapisummary:: pyrcf.utils.data_io_utils.PyRCFPublisherZMQ pyrcf.utils.data_io_utils.PlotJugglerPublisher pyrcf.utils.data_io_utils.PyRCFSubscriberBase pyrcf.utils.data_io_utils.PyRCFSubscriberZMQ pyrcf.utils.data_io_utils.ComponentDataRecorderDataParser Package Contents ---------------- .. py:class:: PyRCFPublisherZMQ(port: int = DEFAULT_ZMQ_PUBLISH_PORT) Bases: :py:obj:`PyRCFPublisherBase` A zero MQ publisher for publishing json serialised data to specified port. .. py:attribute:: socket .. py:method:: publish(data: dict) Publish the provided dictionary data as a JSON string to the pre-defined port. :param data: Data as a dictionary. :type data: dict, optional .. py:method:: close() Close publisher cleanly. .. py:method:: __del__() .. py:class:: PlotJugglerPublisher Bases: :py:obj:`PyRCFPublisherZMQ` PlotJugglerPublisher is used to stream data to PlotJuggler and plot them in real time. On PlotJuggler, choose ZMQ Subscriber for data streaming, the port is used to identify the data channel. Publishes data to tcp port 9872 (default for plotjuggler). .. py:data:: DEFAULT_PLOTJUGGLER_PUBLISH_PORT :type: int :value: 9872 .. py:data:: DEFAULT_ZMQ_PUBLISH_PORT :type: int :value: 5001 .. py:class:: PyRCFSubscriberBase Bases: :py:obj:`abc.ABC` Base subscriber class. .. py:method:: read_raw() -> str :abstractmethod: Return raw string/bytes. .. py:method:: read_json() -> Mapping[str, Any] Return as python dict (json). .. py:method:: close() Close subscriber cleanly. .. py:class:: PyRCFSubscriberZMQ(port: int = DEFAULT_ZMQ_PUBLISH_PORT, topic: str = '') Bases: :py:obj:`PyRCFSubscriberBase` A zmq subscriber to read data published using `PyRCFPublisherZMQ`. .. py:attribute:: context .. py:attribute:: socket .. py:method:: read_raw() -> str Return raw string/bytes. .. py:method:: close() Close subscriber cleanly. .. py:method:: __del__() .. py:class:: ComponentDataRecorderDataParser(file_name: str, load_on_init: bool = True) A data parser for reading file created using `ComponentDataRecorderDebugger`. .. py:attribute:: _filename .. py:attribute:: _data :value: None .. py:attribute:: _keys :value: None .. py:attribute:: _data_length :value: None .. py:method:: load_data() .. py:property:: num_datapoints Length of data points in the loaded data. .. py:property:: key_names .. py:method:: get_all_data() -> Mapping[str, List[Any]] Get all data as dictionary of lists. Available keys: ["t", "dt", "robot_state", "global_plan", "agent_outputs", "robot_cmd", "debug_data"] e.g. `data['t']` will be a list of timesteps from all the control loop iterations that were recorded. :returns: Output data. Available keys: ["t", "dt", "robot_state", "global_plan", "agent_outputs", "robot_cmd", "debug_data"] :rtype: Mapping[str, List[Any]] .. py:method:: get_all_data_for_key(key_name: Literal['t', 'dt', 'robot_state', 'global_plan', 'agent_outputs', 'robot_cmd', 'debug_data'], field_name: str = None, as_ndarray_if_possible: bool = True) -> List[Any] | numpy.ndarray Get all the data for a specified field for all the objects of a key in the data dictionary. E.g. get_all_data_for_key("robot_state","state_estimates.pose.position") will return a numpy array of all the state_estimate.pose.position values from the data. If objects are not numbers or numpy arrays, they are returned as a list. So, `get_all_data_for_key("robot_state", "state_estimates.pose")` will return a list of Pose3D objects. Also allows index and key access for valid attributes. e.g.: get_all_data_for_key("robot_state", "state_estimates.end_effector_states.ee_poses[0].position[0]") is a valid call to get the x values of the end-effector pose of the first end-effector in the end-effector state object's `ee_poses` attribute. Also handles [:] situations. E.g. "state_estiamates.end_effector_states.ee_poses[0][:].position" will return positions of first end-effector (parsing `position` attribute of `Pose3D` across all time stamps). :param key_name (Literal[ "t": "robot_cmd", "debug_data"]): The key to look for in the dictionary. :param "dt": "robot_cmd", "debug_data"]): The key to look for in the dictionary. :param "robot_state": "robot_cmd", "debug_data"]): The key to look for in the dictionary. :param "global_plan": "robot_cmd", "debug_data"]): The key to look for in the dictionary. :param "agent_outputs": "robot_cmd", "debug_data"]): The key to look for in the dictionary. :param : "robot_cmd", "debug_data"]): The key to look for in the dictionary. :param field_name: Nested attribute string to retrieve for the data in the specified key value in the dictionary. Defaults to None. :type field_name: str, optional :param as_ndarray_if_possible: If the retrieved objects is a number or numpy array, this option will allow returning a numpy array of the combined values. Defaults to True. :type as_ndarray_if_possible: bool, optional :returns: List of retrieved attributes from all objects of the specified key from the loaded data. :rtype: List[Any] | np.ndarray