@cahartsell
Created August 6, 2021 15:33
Simple Python script for testing the read speed of ROS bag files using multiple threads/processes.
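A minimal quick-start sketch, assuming a sourced ROS 1 environment so the rosbag Python package is importable (the filename below is hypothetical):

# Quick start (hypothetical filename; assumes a sourced ROS 1 setup, e.g. /opt/ros/noetic/setup.bash):
#   1. Set ROS_TOPIC and DATA_DIR below.
#   2. Run:  python3 bag_read_speed_test.py
#   3. Toggle USE_MULTIPROCESSING to compare a process pool against a thread pool of NUM_WORKERS workers.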
import glob
import os
import rosbag
import concurrent.futures
import multiprocessing
import time
USE_MULTIPROCESSING = True # Flag to switch between concurrent/multiprocessing libraries
NUM_WORKERS = 8 # Number of worker threads/processes
ROS_TOPIC = "" # Set ROS topic to read from bagfiles here. Code currently assumes message type is nav_msgs/Odometry
DATA_DIR = "" # Set directory containing ROS bag files here.

def read_datafile(file_path):
    # Read some data from the bag file and store it in a Python list
    bag = rosbag.Bag(file_path)
    data = []
    for topic, msg, timestamp in bag.read_messages(ROS_TOPIC):
        # Assumes nav_msgs/Odometry messages; position.x is arbitrary example data
        data.append(msg.pose.pose.position.x)
    bag.close()
    return data

if __name__ == "__main__":
    # Find all bag files in DATA_DIR
    data_file_names = glob.glob(os.path.join(DATA_DIR, "*.bag"), recursive=True)

    # Get file sizes and compute average
    file_sizes = []
    for file in data_file_names:
        file_sizes.append(os.path.getsize(file))
    total_file_size_mb = sum(file_sizes) / float(1024 * 1024)
    avg_file_size_mb = total_file_size_mb / len(file_sizes)
    # Start time
    start_time = time.clock_gettime(time.CLOCK_MONOTONIC)

    # Read data files using either multiple processes or multiple threads
    if USE_MULTIPROCESSING:
        with multiprocessing.Pool(NUM_WORKERS) as pool:
            # Start the read operations asynchronously
            completion_count = 0
            dataset = []
            for data in pool.imap_unordered(read_datafile, data_file_names):
                # Store data to dataset
                if data is not None:
                    dataset.append(data)

                # Print update every 10 bag files
                completion_count += 1
                if completion_count % 10 == 0:
                    print("Processed %d/%d bag files..." % (completion_count, len(data_file_names)))

        print("Finished processing bag files.")
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
            # Start the read operations and mark each future with its filename
            future_to_file = {executor.submit(read_datafile, file): file for file in data_file_names}

            # As each thread completes, store the resulting data and report progress
            completion_count = 0
            dataset = []
            for future in concurrent.futures.as_completed(future_to_file):
                file = future_to_file[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (file, exc))
                else:
                    if data is not None:
                        dataset.append(data)

                # Print update every 10 bag files
                completion_count += 1
                if completion_count % 10 == 0:
                    print("Processed %d/%d bag files..." % (completion_count, len(future_to_file)))

        print("Finished processing bag files.")

    # Compute time statistics
    end_time = time.clock_gettime(time.CLOCK_MONOTONIC)
    dt = end_time - start_time
    print("Number of workers: %d" % NUM_WORKERS)
    print("Bag file count, total size (MB), avg size (MB): %d, %.1f, %.1f" % (len(data_file_names), total_file_size_mb, avg_file_size_mb))
    print("Total time (s): %.3f" % dt)
    print("Time per file (s): %.3f" % (dt / float(len(data_file_names))))
    print("Throughput (MB/s): %.1f" % (total_file_size_mb / dt))