Last active
November 18, 2022 12:40
-
-
Save robodhruv/f9e185ec14556e905373c5be72718d6d to your computer and use it in GitHub Desktop.
Script to extract messages on desired topics, and save them in a CSV file. Easy to modify as per requirements.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script collects data from all the bag files in the directory passed as the first | |
argument when launching. In particular, it collects the rostopics listed in the | |
concerned list and generates a CSV file containing the same. The readings are triggered | |
by the trigger topic. | |
Author: Dhruv Ilesh Shah | |
[email protected] | [email protected] | |
""" | |
import rosbag | |
import numpy as np | |
import sys, os | |
import csv | |
import yaml | |
rostopics = ["/polaris_h_rad", "/lilred_velocity_controller/odom"] | |
trigger = 0 # This stores the index of the trigger topic. Here, /polaris_h_rad | |
print(chr(27) + "[2J") | |
print "Entering", sys.argv[1] | |
for filename in os.listdir(sys.argv[1]): | |
print "Opening", filename | |
bag = rosbag.Bag(os.path.join(sys.argv[1], filename)) | |
bag_info = yaml.load(bag._get_yaml_info()) | |
print "Found bag with duration", bag_info['duration'], "seconds" | |
print "Parsing", filename | |
csv_file = filename + ".csv" | |
csvfile = open(csv_file, 'wb') | |
writer = csv.writer(csvfile, delimiter=',') | |
flag = 0 | |
row = [] | |
for msg in bag.read_messages(topics=rostopics): | |
if msg[0] == rostopics[trigger]: | |
# Trigger found. Get ready to accept other topics | |
flag = 1 | |
row = [] | |
# print "- - - - - - - - - - - - - - - - - - - - - - - -" | |
if flag: | |
if msg[0] in rostopics: | |
if msg[0] == "/polaris_h_rad": | |
# print "Rad: ", msg[1].multi_array.layout.dim[0].size | |
row.append(msg[1].multi_array.layout.dim[0].size) | |
elif msg[0] == "/lilred_velocity_controller/odom": | |
# print "Odom: ", msg[1].pose.pose.position | |
row.extend([msg[1].pose.pose.position.x, msg[ | |
1].pose.pose.position.y, msg[1].pose.pose.position.z]) | |
flag = 0 | |
writer.writerow(row) | |
print "Parsed data. Saved as", csv_file |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
This script saves each topic in a bagfile as a csv. | |
Accepts a filename as an optional argument. Operates on all bagfiles in current directory if no argument provided | |
Written by Nick Speal in May 2013 at McGill University's Aerospace Mechatronics Laboratory | |
www.speal.ca | |
Supervised by Professor Inna Sharf, Professor Meyer Nahon | |
''' | |
import rosbag, sys, csv | |
import time | |
import string | |
import os #for file management make directory | |
import shutil #for file management, copy file | |
#verify correct input arguments: 1 or 2 | |
if (len(sys.argv) > 2): | |
print "invalid number of arguments: " + str(len(sys.argv)) | |
print "should be 2: 'bag2csv.py' and 'bagName'" | |
print "or just 1 : 'bag2csv.py'" | |
sys.exit(1) | |
elif (len(sys.argv) == 2): | |
listOfBagFiles = [sys.argv[1]] | |
numberOfFiles = "1" | |
print "reading only 1 bagfile: " + str(listOfBagFiles[0]) | |
elif (len(sys.argv) == 1): | |
listOfBagFiles = [f for f in os.listdir(".") if f[-4:] == ".bag"] #get list of only bag files in current dir. | |
numberOfFiles = str(len(listOfBagFiles)) | |
print "reading all " + numberOfFiles + " bagfiles in current directory: \n" | |
for f in listOfBagFiles: | |
print f | |
print "\n press ctrl+c in the next 10 seconds to cancel \n" | |
time.sleep(10) | |
else: | |
print "bad argument(s): " + str(sys.argv) #shouldnt really come up | |
sys.exit(1) | |
count = 0 | |
for bagFile in listOfBagFiles: | |
count += 1 | |
print "reading file " + str(count) + " of " + numberOfFiles + ": " + bagFile | |
#access bag | |
bag = rosbag.Bag(bagFile) | |
bagContents = bag.read_messages() | |
bagName = bag.filename | |
#create a new directory | |
folder = string.rstrip(bagName, ".bag") | |
try: #else already exists | |
os.makedirs(folder) | |
except: | |
pass | |
shutil.copyfile(bagName, folder + '/' + bagName) | |
#get list of topics from the bag | |
listOfTopics = [] | |
for topic, msg, t in bagContents: | |
if topic not in listOfTopics: | |
listOfTopics.append(topic) | |
for topicName in listOfTopics: | |
#Create a new CSV file for each topic | |
filename = folder + '/' + string.replace(topicName, '/', '_slash_') + '.csv' | |
with open(filename, 'w+') as csvfile: | |
filewriter = csv.writer(csvfile, delimiter = ',') | |
firstIteration = True #allows header row | |
for subtopic, msg, t in bag.read_messages(topicName): # for each instant in time that has data for topicName | |
#parse data from this instant, which is of the form of multiple lines of "Name: value\n" | |
# - put it in the form of a list of 2-element lists | |
msgString = str(msg) | |
msgList = string.split(msgString, '\n') | |
instantaneousListOfData = [] | |
for nameValuePair in msgList: | |
splitPair = string.split(nameValuePair, ':') | |
for i in range(len(splitPair)): #should be 0 to 1 | |
splitPair[i] = string.strip(splitPair[i]) | |
instantaneousListOfData.append(splitPair) | |
#write the first row from the first element of each pair | |
if firstIteration: # header | |
headers = ["rosbagTimestamp"] #first column header | |
for pair in instantaneousListOfData: | |
headers.append(pair[0]) | |
filewriter.writerow(headers) | |
firstIteration = False | |
# write the value from each pair to the file | |
values = [str(t)] #first column will have rosbag timestamp | |
for pair in instantaneousListOfData: | |
if len(pair) > 1: | |
values.append(pair[1]) | |
filewriter.writerow(values) | |
bag.close() | |
print "Done reading all " + numberOfFiles + " bag files." |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment