Last active
May 14, 2024 02:44
-
-
Save nitisht/5f5e0875af1356795036afdb9ad56f47 to your computer and use it in GitHub Desktop.
Use Spark to read / analyse / store Rosbag file formats for MinIO server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --- Imports --------------------------------------------------------------
# NOTE(review): duplicate imports (pandas, cv_bridge, PIL.Image, BytesIO)
# collapsed to one each; all original modules are kept.
from time import time
from functools import partial
from io import BytesIO
import os
import subprocess

import boto3
import cv2
import numpy as np
import pandas as pd
import pyrosbag
import rosbag
from cv_bridge import CvBridge
from pyspark import SparkContext, SparkConf
from sensor_msgs.msg import Image  # NOTE(review): immediately shadowed below
from PIL import Image  # this binding wins; Image.fromarray() used later is PIL's

# --- Spark context configured for a MinIO server via the s3a connector ----
conf = SparkConf().setAppName("ADAS")
sc = SparkContext(conf=conf)
# BUG FIX: the original set "fs.s3a.awsAccessKeyId" /
# "fs.s3a.awsSecretAccessKeyId" — the latter is not a real Hadoop property
# (and both spellings are long deprecated).  The supported property names
# are fs.s3a.access.key and fs.s3a.secret.key.
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minio")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minio123")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://localhost:9000")
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

# Read the rosbag as Hadoop input splits; requires the ros_hadoop
# RosbagMapInputFormat jar plus a pre-built chunk index (.idx.bin file).
fin = sc.newAPIHadoopFile(
    path="s3a://spark/HMB_1.bag",
    inputFormatClass="de.valtech.foss.RosbagMapInputFormat",
    keyClass="org.apache.hadoop.io.LongWritable",
    valueClass="org.apache.hadoop.io.MapWritable",
    conf={"RosbagInputFormat.chunkIdx": "/home/nitish/dev/rosbag/data/HMB_1.bag.idx.bin"})

# boto3 resource used to upload the extracted frames back into MinIO.
s3 = boto3.resource(
    's3',
    endpoint_url='http://localhost:9000',
    aws_access_key_id='minio',
    aws_secret_access_key='minio123',
    region_name='us-east-1')
def msg_map(r, func=str, conn=None):
    """Deserialize one rosbag record and yield it mapped through *func*.

    Parameters
    ----------
    r : tuple
        ``(key, record)`` pair from the Hadoop rosbag input format; the
        record is a dict with ``'header'`` and ``'data'`` entries.
    func : callable
        Applied to the deserialized ROS message; defaults to ``str``.
    conn : dict or None
        Connection record (header op == 7) describing the topic whose
        messages should be extracted.  BUG FIX: the original used a mutable
        default argument (``conn={}``); ``None`` sentinel used instead.

    Yields
    ------
    ``func(msg)`` for records whose header op is 2 (message data) and whose
    connection id matches *conn*'s.
    """
    from collections import namedtuple
    from rosbag.bag import _get_message_type

    if conn is None:
        conn = {}
    # Only message-data records (op == 2) belonging to the requested connection.
    if r[1]['header']['op'] == 2 and r[1]['header']['conn'] == conn['header']['conn']:
        # BUG FIX: copy before augmenting — the original wrote datatype /
        # msg_def / md5sum back into the caller's shared conn['data'] dict.
        c = dict(conn['data'])
        c['datatype'] = str(c['type'])
        c['msg_def'] = str(c['message_definition'])
        c['md5sum'] = str(c['md5sum'])
        # Wrap in a namedtuple so _get_message_type can use attribute access.
        c = namedtuple('GenericDict', c.keys())(**c)
        msg_type = _get_message_type(c)
        msg = msg_type()
        msg.deserialize(r[1]['data'])
        yield func(msg)
# store conn_a for further processing: header op == 7 marks connection records
conn_a = fin.filter(lambda r: r[1]['header']['op'] == 7).map(lambda r: r[1])
# conn_a.coalesce(1).saveAsTextFile('s3a://spark/kv1.txt')

# Index the connection records by topic name for O(1) lookup per topic.
conn_d = {str(k['header']['topic']): k for k in conn_a.collect()}

# extraction of compressed images from bag
topics_compressed = ['/center_camera/image_color/compressed']
def f(msg):
    """Return the raw payload (``data`` attribute) of a deserialized message."""
    return msg.data
# Pull the first 50 compressed-image payloads for the selected topic.
res = fin.flatMap(
    partial(msg_map, func=lambda r: r.data, conn=conn_d[topics_compressed[0]])
).take(50)

## Convert compressed image bytearrays to CV2 format & upload to MinIO
for count, raw in enumerate(res):
    # BUG FIX: np.fromstring is deprecated (and the BytesIO(...).read()
    # round-trip was a no-op) — decode straight from the raw bytes.
    img = cv2.imdecode(np.frombuffer(raw, np.uint8), 1)
    # BUG FIX: cv2.imdecode returns BGR channel order; the original handed
    # it to PIL labeled 'RGB', swapping red and blue in every saved frame.
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    pil_img = Image.fromarray(img_rgb, 'RGB')
    # Encode as PNG in memory and push to the MinIO 'spark' bucket.
    memfile = BytesIO()
    pil_img.save(memfile, 'PNG')
    memfile.seek(0)
    s3.Bucket('spark').put_object(Key="image/" + str(count), Body=memfile)
Thank you for the great information!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Pre-requisites:
bag.idx.bin
files created as explained here: https://github.com/valtech/ros_hadoop. Run the file as