Dipanjan (DJ) Sarkar (dipanjanS)

from pyspark.sql import functions as F

# Summary statistics for the content_size column via explicit aggregations
(logs_df.agg(F.min(logs_df['content_size']).alias('min_content_size'),
             F.max(logs_df['content_size']).alias('max_content_size'),
             F.mean(logs_df['content_size']).alias('mean_content_size'),
             F.stddev(logs_df['content_size']).alias('std_content_size'),
             F.count(logs_df['content_size']).alias('count_content_size'))
        .toPandas())
# The same summary statistics via DataFrame.describe()
content_size_summary_df = logs_df.describe(['content_size'])
content_size_summary_df.toPandas()
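# Not in the original snippet: a minimal sketch, assuming the same logs_df,
# for percentiles that describe() does not report. approxQuantile() returns
# approximate quantiles (here the quartiles and median of content_size).
quantiles = logs_df.approxQuantile('content_size', [0.25, 0.5, 0.75], 0.01)
print(quantiles)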
udf_parse_time = udf(parse_clf_time)

# Apply the UDF to the raw timestamp string, cast it to a proper timestamp
# column named 'time', and drop the original string column
logs_df = (logs_df.select('*', udf_parse_time(logs_df['timestamp'])
                                  .cast('timestamp')
                                  .alias('time'))
                  .drop('timestamp'))
logs_df.show(10, truncate=True)
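# Quick check (not in the original snippet): after the UDF and cast above,
# the new 'time' column should appear as a timestamp type in the schema.
logs_df.printSchema()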
from pyspark.sql.functions import udf

month_map = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
    'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(text):
    """Convert a Common Log Format time (dd/Mon/yyyy:hh:mm:ss +zzzz) into a
    string that Spark can cast to a timestamp. The time zone offset is ignored.
    """
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
        int(text[7:11]), month_map[text[3:6]], int(text[0:2]),
        int(text[12:14]), int(text[15:17]), int(text[18:20]))
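# Illustrative check of the parser on a sample Common Log Format timestamp
# (the sample value is made up, not taken from the dataset):
parse_clf_time('01/Jul/1995:00:00:01 -0400')  # -> '1995-07-01 00:00:01'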
# Replace any null content_size with 0, then re-check the per-column null counts
logs_df = logs_df.na.fill({'content_size': 0})
exprs = [count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs).show()
# Raw log lines that do not end with a numeric content size
null_content_size_df = base_df.filter(~base_df['value'].rlike(r'\s\d+$'))
null_content_size_df.count()
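# Peek at a few of these raw lines (sketch, not in the original snippet);
# in Common Log Format a missing size is recorded as '-' rather than digits.
null_content_size_df.show(5, truncate=False)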
# Keep only rows with a non-null status, then re-check the per-column null counts
logs_df = logs_df[logs_df['status'].isNotNull()]
exprs = [count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs).show()
from pyspark.sql.functions import regexp_extract

# Parse the malformed lines (null status) into structured columns
bad_status_df = null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                                      regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                                      regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                                      regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                                      regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                                      regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                                      regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
bad_status_df.show(truncate=False)
# Raw log lines that do not contain a three-digit HTTP status code
null_status_df = base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))
null_status_df.count()
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum

def count_null(col_name):
    # Sum a 0/1 flag per row: 1 when the column is null, 0 otherwise
    return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

# Build up a list of column expressions, one per column.
exprs = [count_null(col_name) for col_name in logs_df.columns]

# Run the aggregation. The *exprs converts the list of expressions into
# individual arguments to agg().
logs_df.agg(*exprs).show()
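# A related sketch (not in the original snippet): count rows where any parsed
# column is null, by OR-ing the per-column null checks together.
from functools import reduce
any_null = reduce(lambda a, b: a | b, [col(c).isNull() for c in logs_df.columns])
logs_df.filter(any_null).count()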