Last active
April 25, 2022 03:22
-
-
Save weimzh/4e97b35bcf70ba63cc650cbd54816207 to your computer and use it in GitHub Desktop.
Multi-bag rosbag reader in Python (merges messages from several bags by timestamp)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2022, Wei Mingzhi.
# Released under BSD License.
# https://opensource.org/licenses/BSD-3-Clause
import rosbag | |
class _PriorityQueue(object): | |
def __init__(self): | |
self.queue = [] | |
def is_empty(self): | |
return len(self.queue) == 0 | |
def insert(self, data): | |
self.queue.append(data) | |
def delete(self): | |
max = 0 | |
for i in range(len(self.queue)): | |
if self.queue[i] > self.queue[max]: | |
max = i | |
item = self.queue[max] | |
del self.queue[max] | |
return item | |
class _BagMsgEntry(object): | |
def __init__(self, bag_it, msg, topic, ts): | |
self.bag_it = bag_it | |
self.msg = msg | |
self.topic = topic | |
self.ts = ts | |
def __gt__(self, other): | |
return self.ts < other.ts | |
class MultiBagReader(object):
    """Read messages from several bags as a single merged stream.

    One pending message per bag is buffered in a priority queue.  Each time
    the earliest buffered message is handed out, the next message from the
    same bag is pulled in to take its place.

    NOTE: the merged stream is only globally time-ordered if every bag
    yields its own messages in ascending timestamp order.

    Args:
        bags: iterable of bag objects exposing
            ``read_messages(topics=..., start_time=..., end_time=...)``
            that returns an iterator of ``(topic, msg, t)`` triples.
        topics: forwarded unchanged to each bag's ``read_messages``.
        start_time: forwarded unchanged to each bag's ``read_messages``.
        end_time: forwarded unchanged to each bag's ``read_messages``.
    """

    def __init__(self, bags, topics=None, start_time=None, end_time=None):
        self._pq = _PriorityQueue()
        for bag in bags:
            bag_it = bag.read_messages(topics=topics, start_time=start_time, end_time=end_time)
            # Prime the queue with the first message of every bag.
            self._put_next_msg(bag_it)

    def _put_next_msg(self, bag_it):
        """Queue the next message from ``bag_it``; do nothing if exhausted."""
        try:
            # Use the builtin instead of calling ``__next__`` directly so
            # this works for any iterator (Python 2 iterators only define
            # ``next``).
            topic, msg, t = next(bag_it)
        except StopIteration:
            # This bag has no more messages; it simply drops out of the merge.
            return
        self._pq.insert(_BagMsgEntry(bag_it, msg, topic, t))

    def read_messages(self):
        """Yield ``(topic, msg, ts)`` tuples, earliest timestamp first.

        The internal queue is drained as messages are yielded, so the
        stream can be consumed only once per reader instance.
        """
        while not self._pq.is_empty():
            try:
                m = self._pq.delete()
            except IndexError:
                # Only guards the queue pop.  An IndexError raised while
                # reading from a bag below is no longer swallowed here.
                return
            # Refill from the same bag before yielding, so the queue always
            # holds the next candidate of every non-exhausted bag.
            self._put_next_msg(m.bag_it)
            yield (m.topic, m.msg, m.ts)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment