Created
August 7, 2017 10:45
-
-
Save AlexArcPy/78232f105ac11f6cdc5857dc6c77b14d to your computer and use it in GitHub Desktop.
Generate a video street view stream based on a route polyline feature class using Mapillary API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
* generate a list of (x, y) coordinates for points along the route polyline
  feature class at a set interval (using arcpy);
* for each point, download from the API the image closest to it that also
  looks towards the next point on the route (using requests);
* merge the images into an output AVI video file (using opencv);
'''
import os | |
import json | |
import requests | |
import arcpy | |
import cv2 | |
# Well-known ID of the spatial reference the route points are projected to
# before being sent to the API (4326 is the EPSG code of WGS 84).
WKID = 4326

# API client id that is substituted into IMAGES_URL below.
CLIENT_ID = 'YOUR_CLIENT_ID'

# Folder the downloaded images are written to and later read back from.
IMAGES_FOLDER = r'C:\GIS\temp\RouteVision'

# Feature class holding the route polyline.
ROUTE = r'C:\GIS\Temp\ArcGISHomeFolder\scratch.gdb\MapillaryRoute'

# Image search endpoint with the client id already filled in.
IMAGES_URL = (
    "https://a.mapillary.com/v3/images?client_id={client_id}"
).format(client_id=CLIENT_ID)
#---------------------------------------------------------------------- | |
def get_points_along_line(input_line_feature, interval, number_of_points=None):
    """Return points positioned along a polyline at a fixed spacing.

    Args:
        input_line_feature: polyline geometry exposing a ``length`` attribute
            and a ``positionAlongLine(distance)`` method.
        interval: distance between consecutive points, in the units of
            ``length``. Ignored when ``number_of_points`` is given.
        number_of_points: optional number of equal segments to split the line
            into; when truthy the spacing becomes ``length / number_of_points``.

    Returns:
        A list of the values returned by ``positionAlongLine``, starting at
        distance 0 and always ending with the point at the end of the line.

    Raises:
        ValueError: if the spacing is not positive for a line of non-zero
            length (the sampling loop would otherwise never terminate).
    """
    line_length = input_line_feature.length
    if number_of_points:
        # float() keeps this a true division under Python 2 as well.
        interval = line_length / float(number_of_points)

    if line_length > 0 and interval <= 0:
        raise ValueError(
            "interval must be positive, got {0!r}".format(interval))

    out_pnts = []
    step = 0
    distance = 0
    while distance < line_length:
        out_pnts.append(input_line_feature.positionAlongLine(distance))
        step += 1
        # Multiply instead of accumulating so floating point error does not
        # build up and produce a near-duplicate point just before the end.
        distance = step * interval

    # The end of the line is always included, whatever the spacing.
    out_pnts.append(input_line_feature.positionAlongLine(line_length))
    return out_pnts
#---------------------------------------------------------------------- | |
def get_coords_along_line(interval=50):
    """Return (x, y) pairs for points sampled along the route polyline.

    The geometry of the first feature in ``ROUTE`` is sampled with
    ``get_points_along_line`` and every point is projected to the spatial
    reference identified by ``WKID``.

    Args:
        interval: spacing between sampled points, passed straight through to
            ``get_points_along_line``.

    Returns:
        A list of ``(X, Y)`` tuples, one per sampled point, in route order.

    Raises:
        ValueError: if ``ROUTE`` contains no features.
    """
    line_geometry = None
    # Only the first feature is needed, so stop reading after one row and let
    # the context manager release the cursor.
    with arcpy.da.SearchCursor(ROUTE, 'SHAPE@') as cursor:
        for row in cursor:
            line_geometry = row[0]
            break

    if line_geometry is None:
        raise ValueError(
            "No features found in route feature class: {0}".format(ROUTE))

    # Build the target spatial reference once and project each point once,
    # instead of twice per point.
    target_sr = arcpy.SpatialReference(WKID)
    coords = []
    for pnt in get_points_along_line(line_geometry, interval):
        projected = pnt.projectAs(target_sr).firstPoint
        coords.append((projected.X, projected.Y))
    return coords
#---------------------------------------------------------------------- | |
def get_closest_feature(point, next_point, max_radius=1000):
    """Return the GeoJSON feature of the image closest to ``point``.

    The image search endpoint is queried with a 10 m radius; while nothing is
    found the radius is widened (by 10, then 20, then 30 ... metres) and the
    query repeated.

    Args:
        point: location to search around, sent as the ``closeto`` parameter.
        next_point: location the image should look at, sent as the ``lookat``
            parameter; may be ``None`` for the last point of the route.
        max_radius: largest search radius to try before giving up. Pass
            ``None`` to keep widening without limit.

    Returns:
        The first feature of the response, or ``None`` when the request
        fails, the response cannot be parsed, or no image is found within
        ``max_radius``.
    """
    search_radius = 10
    search_step = 10

    while True:
        try:
            # NOTE(review): requests encodes a tuple value as a repeated query
            # parameter -- confirm the API accepts coordinates in that form.
            res = requests.get(
                IMAGES_URL,
                params={
                    'closeto': point,
                    'lookat': next_point,
                    'radius': search_radius
                },
                timeout=30)
            res.raise_for_status()
            features = res.json().get('features')
        except (requests.RequestException, ValueError, AttributeError) as err:
            # Network failure, HTTP error status or a body that is not the
            # expected JSON object: report the cause and skip this point.
            print("Could not read image. Reason: {0}".format(err))
            return None

        if features:
            return features[0]

        next_radius = search_radius + search_step
        if max_radius is not None and next_radius > max_radius:
            # Without an upper bound a point in an area with no imagery would
            # keep the script querying forever.
            print("Could not find image in radius of {0}m, giving up".format(
                search_radius))
            return None

        print(
            "Could not find image in radius of {0}m, will search with radius of {1}m".
            format(search_radius, next_radius))
        search_radius = next_radius
        search_step += 10
#---------------------------------------------------------------------- | |
def get_image_from_key(key):
    """Request the 1024 px thumbnail for an image key.

    Args:
        key: image key that is substituted into the thumbnail URL.

    Returns:
        The ``requests`` response object of the GET request; the status is
        not checked here, so callers decide how to handle failures.
    """
    thumbnail_url = (
        'https://d1cuyjsrcm0gby.cloudfront.net/{key}/thumb-1024.jpg'
    ).format(key=key)
    response = requests.get(thumbnail_url)
    return response
#---------------------------------------------------------------------- | |
def save_images_from_coords(points_coords):
    """Download and save an image for every (x, y) pair in the point list.

    For each point the closest image that looks towards the following point
    is looked up and, when found and downloaded successfully, written to
    ``IMAGES_FOLDER`` as ``<n>.jpg``. Files are numbered consecutively from 1
    in route order, so points without an image do not leave gaps in the
    numbering.

    Args:
        points_coords: sequence of (x, y) pairs ordered along the route.
    """
    if not os.path.isdir(IMAGES_FOLDER):
        os.makedirs(IMAGES_FOLDER)

    total_points = len(points_coords)
    saved_count = 0
    for idx, point in enumerate(points_coords, 1):
        # idx is 1-based, so it is also the 0-based index of the next point;
        # the last point has nothing to look at.
        next_point = points_coords[idx] if idx < total_points else None

        closest_feature = get_closest_feature(point, next_point)
        if not closest_feature:
            continue

        image_data = get_image_from_key(closest_feature['properties']['key'])
        if image_data.status_code != 200:
            # Do not write an error page to disk under a .jpg name.
            print("Could not download image for point {0}, status {1}".format(
                idx, image_data.status_code))
            continue

        saved_count += 1
        image_path = os.path.join(IMAGES_FOLDER,
                                  '{0}.jpg'.format(saved_count))
        with open(image_path, 'wb') as f:
            f.write(image_data.content)
def images_folder_to_video(input_folder, output_avi_path):
    """Create a video file joining the numbered images of a folder.

    Files named ``<number>.jpg`` in ``input_folder`` are written to an MJPG
    encoded video at 2 frames per second, in ascending numeric order. Other
    files and images that cannot be read are skipped. The size of the first
    readable image defines the video size; later images of a different size
    are resized to match it.

    Args:
        input_folder: folder containing the ``<number>.jpg`` images.
        output_avi_path: path of the video file to create.

    Raises:
        ValueError: if the folder holds no readable ``<number>.jpg`` image.
    """
    # Pair each file with its number so the frames sort numerically
    # (2.jpg before 10.jpg) and gaps in the numbering do not matter.
    frames = []
    for file_name in os.listdir(input_folder):
        stem, ext = os.path.splitext(file_name)
        if ext.lower() == '.jpg' and stem.isdigit():
            frames.append((int(stem), file_name))
    frames.sort()

    fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
    out = None
    height = width = None
    try:
        for _, file_name in frames:
            img = cv2.imread(os.path.join(input_folder, file_name))
            if img is None:
                # imread returns None for missing or unreadable files.
                continue
            if out is None:
                height, width = img.shape[:2]
                out = cv2.VideoWriter(output_avi_path, fourcc, 2,
                                      (width, height))
            elif img.shape[:2] != (height, width):
                # The writer expects every frame to have the size it was
                # opened with.
                img = cv2.resize(img, (width, height))
            out.write(img)
    finally:
        if out is not None:
            out.release()

    if out is None:
        raise ValueError(
            "No readable <number>.jpg images found in {0}".format(input_folder))
if __name__ == '__main__':
    # 1. sample the route, 2. download one image per sample point,
    # 3. join the downloaded images into a video.
    route_coords = get_coords_along_line()
    save_images_from_coords(route_coords)

    video_path = r'C:\GIS\Temp\output.avi'
    images_folder_to_video(IMAGES_FOLDER, output_avi_path=video_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment