Described in detail here: http://www.eyeshalfclosed.com/blog/2016/07/22/spark-streaming-statistics/
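Everything the script prints comes from a single Spark REST API endpoint, exposed through the YARN ResourceManager proxy on port 8088. With the example arguments from the docstring below, the URL it builds looks like:

http://ec2-52-40-144-150.us-west-2.compute.amazonaws.com:8088/proxy/application_1469205272660_0006/api/v1/applications/application_1469205272660_0006/jobs/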
#!/usr/bin/env python
# Copyright 2016 Emaad Ahmed Manzoor
# License: Apache License, Version 2.0
# http://www.eyeshalfclosed.com/blog/2016/07/22/spark-streaming-statistics/
"""
Get Spark Streaming microbatch statistics:
  - Batch start time
  - Scheduling delay (in seconds) for each microbatch
  - Processing time (in seconds) for each microbatch

Tested on Spark 2.0.0 running on YARN 2.7.2.

Time deltas are naive; do not run this close to midnight (yet!).

Example usage:

  python get_spark_streaming_batch_statistics.py \
    --master ec2-52-40-144-150.us-west-2.compute.amazonaws.com \
    --applicationId application_1469205272660_0006

Output (batch start time, processing time, scheduling delay):

  18:36:55 3.991 3783.837
  18:36:56 4.001 3786.832
  18:36:57 3.949 3789.862
  ...
"""
import argparse
import datetime
import json
import re
import sys
import urllib2

parser = argparse.ArgumentParser()
parser.add_argument('--master', help='YARN ResourceManager URL', required=True)
parser.add_argument('--applicationId', help='YARN application ID', required=True)
args = vars(parser.parse_args())

master_url = args['master']
application_id = args['applicationId']

# The Spark REST API is reached through the YARN ResourceManager
# proxy (port 8088) for the given application.
stats_url = 'http://' + master_url + ':8088' + '/proxy/' + application_id +\
            '/api/v1/applications/' + application_id + '/jobs/'

try:
    response = urllib2.urlopen(stats_url)
except urllib2.URLError:
    print 'Could not access URL:', stats_url
    sys.exit(-1)

batch_regex = re.compile(r'.*id=(\d+).*batch time (\d\d:\d\d:\d\d).*')
stats_json = json.loads(response.read())

batch_stats = {}
for job in stats_json:
    status = job['status']
    if status != 'SUCCEEDED':
        continue  # only stats for finished jobs

    if 'description' not in job:
        continue  # job needs a batch start time

    description = job['description']
    match = batch_regex.match(description)
    if match is None:
        continue  # not a streaming microbatch job

    batch_id = int(match.group(1))
    batch_time = datetime.datetime.strptime(match.group(2), '%H:%M:%S').time()
    if batch_id not in batch_stats:
        batch_stats[batch_id] = {'timestamp': batch_time, 'jobs': []}

    # Spark reports timestamps like '2016-07-22T18:36:55.123GMT';
    # strptime's %Z consumes the trailing timezone abbreviation.
    job_stats = {'submissionTime':
                     datetime.datetime.strptime(job['submissionTime'],
                                                '%Y-%m-%dT%H:%M:%S.%f%Z').time(),
                 'completionTime':
                     datetime.datetime.strptime(job['completionTime'],
                                                '%Y-%m-%dT%H:%M:%S.%f%Z').time()}
    batch_stats[batch_id]['jobs'].append(job_stats)

for batch_id, stats in sorted(batch_stats.iteritems()):
    batch_start_time = stats['timestamp']
    jobs = sorted(stats['jobs'], key=lambda x: x['submissionTime'])

    # Lift bare times onto an arbitrary common date so they can be
    # subtracted; this naive step is what breaks across midnight.
    anchor = datetime.date(1, 1, 1)
    scheduling_delay = datetime.datetime.combine(anchor, jobs[0]['submissionTime']) -\
                       datetime.datetime.combine(anchor, batch_start_time)
    processing_time = datetime.datetime.combine(anchor, jobs[-1]['completionTime']) -\
                      datetime.datetime.combine(anchor, jobs[0]['submissionTime'])

    print batch_start_time,
    print processing_time.total_seconds(),
    print scheduling_delay.total_seconds()
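
The midnight caveat in the docstring stems from this last step: bare times are compared on a single fictional date, so a job submitted at 23:59:58 and completed at 00:00:02 yields a negative delta. A minimal wraparound-safe difference could look like the sketch below (elapsed_seconds is a hypothetical helper, not part of the original script; it assumes no single delay or batch ever spans more than 24 hours):

def elapsed_seconds(start_time, end_time):
    # Seconds from start_time to end_time; a negative naive delta is
    # taken to mean end_time rolled past midnight (intervals < 24h).
    anchor = datetime.date(1, 1, 1)
    delta = datetime.datetime.combine(anchor, end_time) -\
            datetime.datetime.combine(anchor, start_time)
    if delta.total_seconds() < 0:
        delta += datetime.timedelta(days=1)
    return delta.total_seconds()

With this helper, processing_time would become elapsed_seconds(jobs[0]['submissionTime'], jobs[-1]['completionTime']) and scheduling_delay would become elapsed_seconds(batch_start_time, jobs[0]['submissionTime']).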