Simple Python script for testing the read speed of ROS bag files using multiple threads/processes.
import glob
import os
import rosbag
import concurrent.futures
import multiprocessing
import time

USE_MULTIPROCESSING = True  # Flag to switch between concurrent/multiprocessing libraries
NUM_WORKERS = 8             # Number of worker threads/processes
ROS_TOPIC = ""              # Set ROS topic to read from bag files here. Code currently assumes message type is nav_msgs/Odometry
DATA_DIR = ""               # Set directory containing ROS bag files here.

def read_datafile(file_path):
    # Read some data from the bag file and store it in a Python list
    bag = rosbag.Bag(file_path)
    data = []
    for topic, msg, timestamp in bag.read_messages(ROS_TOPIC):
        data.append(msg.pose.pose.position.x)
    bag.close()
    return data
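# Note on threads vs. processes: reading and deserializing rosbag messages is largely
# CPU-bound Python work, so the ThreadPoolExecutor path is typically limited by the GIL,
# while the multiprocessing.Pool path can use multiple cores at the cost of pickling the
# returned lists between processes. This is a general expectation, not a measured result.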

if __name__ == "__main__":
    # Find all bag files in DATA_DIR
    data_file_names = glob.glob(os.path.join(DATA_DIR, "*.bag"), recursive=True)

    # Get file sizes and compute average
    file_sizes = []
    for file in data_file_names:
        file_sizes.append(os.path.getsize(file))
    total_file_size_mb = sum(file_sizes) / float(1024 * 1024)
    avg_file_size_mb = total_file_size_mb / len(file_sizes)

    # Start time
    start_time = time.clock_gettime(time.CLOCK_MONOTONIC)
    # Read Datafiles either using multiple threads or multiple processes
    if USE_MULTIPROCESSING:
        with multiprocessing.Pool(NUM_WORKERS) as pool:
            # Start the read operations asynchronously
            completion_count = 0
            dataset = []
            for data in pool.imap_unordered(read_datafile, data_file_names):
                # Store data to dataset
                if data is not None:
                    dataset.append(data)

                # Print update every 10 bag files
                completion_count += 1
                if completion_count % 10 == 0:
                    print("Processed %d/%d bag files..." % (completion_count, len(data_file_names)))
            print("Finished processing bag files.")
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
            # Start the read operations and mark each future with its filename
            future_to_file = {executor.submit(read_datafile, file): file for file in data_file_names}

            # As each thread completes, store the resulting data and report progress
            completion_count = 0
            dataset = []
            for future in concurrent.futures.as_completed(future_to_file):
                file = future_to_file[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (file, exc))
                else:
                    if data is not None:
                        dataset.append(data)

                # Print update every 10 bag files
                completion_count += 1
                if completion_count % 10 == 0:
                    print("Processed %d/%d bag files..." % (completion_count, len(future_to_file)))
            print("Finished processing bag files.")
    # Compute time statistics
    end_time = time.clock_gettime(time.CLOCK_MONOTONIC)
    dt = end_time - start_time
    print("Number of workers: %d" % NUM_WORKERS)
    print("Bag file count, total size (MB), avg size (MB): %d, %.1f, %.1f" % (len(data_file_names), total_file_size_mb, avg_file_size_mb))
    print("Total time (s): %.3f" % dt)
    print("Time per file (s): %.3f" % (dt / float(len(data_file_names))))
    print("Throughput (MB/s): %.1f" % (total_file_size_mb / dt))