Note: reference article
Basic query example:
SELECT status_code,
COUNT(status_code) AS requests
FROM fastly_logs.example_com
GROUP BY status_code
ORDER BY requests DESC| ''' | |
| Example Schema Validation | |
| Assumes the DataFrame `df` is already populated with schema: | |
| {id : int, day_cd : 8-digit code representing date, category : varchar(24), type : varchar(10), ind : varchar(1), purchase_amt : decimal(18,6) } | |
| Runs various checks to ensure data is valid (e.g. no NULL id and day_cd fields) and schema is valid (e.g. [category] cannot be larger than varchar(24)) | |
| ''' |
| #!/usr/bin/python3 | |
| # -*- coding: utf-8 -*- | |
| import logging | |
| import airflow | |
| from airflow import DAG | |
| from datetime import timedelta, datetime | |
| from airflow.operators.dummy_operator import DummyOperator | |
| from airflow.operators.python_operator import PythonOperator | |
| from airflow.operators.http_operator import SimpleHttpOperator |
Note: reference article
Basic query example:
SELECT status_code,
COUNT(status_code) AS requests
FROM fastly_logs.example_com
GROUP BY status_code
ORDER BY requests DESCNote: reference article
Basic query example:
SELECT status_code,
COUNT(status_code) AS requests
FROM fastly_logs.example_com
GROUP BY status_code
ORDER BY requests DESC| CREATE EXTERNAL TABLE IF NOT EXISTS default.table | |
| ( | |
| `id` int, | |
| `name` string, | |
| `timestamp` string, | |
| `is_debug` boolean | |
| ) | |
| ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' | |
| WITH SERDEPROPERTIES ( | |
| 'escapeChar'='\\', |
| 1.mapPartition() instead of map() - when some expensive initializations like DBconnection need to be done | |
| 2.RDD Parallelism: for RDDs with no parent, for example sc.parallelize(',,,',4), unless specified, YARN will try to use as many CPU cores as available | |
| This could be tuned using spark.default.parallelism property. | |
| - to find default parallelism use sc.defaultParallelism | |
| rdd.getNumPartitions() | |
| rdd = sc.parallelize(<value>, numSlices=4) | |
| rdd.getNumPartitions() will return 4 | |
| void sixteenRandomBytes(unsigned char buf[16]) { | |
| for (int i = 0; i < 16; i++) { | |
| buf[i] = rand() & 0xff; | |
| } | |
| } | |
| // PKCS #7 padding | |
| // Do this before encrypting to get the message | |
| // up to a multiple of 16 bytes. | |
| size_t pad(unsigned char *buf, size_t messageLength) { |
| from pyspark.sql.types import StringType, FloatType, StructField, StructType | |
| from pyspark.sql import SparkSession, SQLContext, Row | |
| import pyspark | |
# Spark initialization.
# Bug fix: the original wrote `SparkSession(spark_context).builder...`.
# `.builder` is a class-level attribute, so the explicitly constructed
# SparkSession(spark_context) instance was created and immediately
# discarded; the session actually bound came from the builder chain.
# Use the builder chain directly — getOrCreate() reuses the active
# SparkContext if one exists, so spark_context keeps working for callers.
spark_context = pyspark.SparkContext.getOrCreate()
spark_session = (
    SparkSession.builder
    .enableHiveSupport()  # expose Hive tables / HiveQL to this session
    .getOrCreate()
)
| package com.test; | |
| import com.test.schema.ContactType; | |
| import org.apache.kafka.clients.consumer.ConsumerConfig; | |
| import org.apache.kafka.clients.consumer.ConsumerRecord; | |
| import org.apache.spark.SparkConf; | |
| import org.apache.spark.api.java.JavaPairRDD; | |
| import org.apache.spark.api.java.function.*; | |
| import org.apache.spark.streaming.Durations; | |
| import org.apache.spark.streaming.api.java.JavaDStream; |
| package com.experiments.calvin | |
| import org.apache.kafka.common.serialization.StringDeserializer | |
| import org.apache.spark.sql.SparkSession | |
| import org.apache.spark.streaming.kafka010._ | |
| import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent | |
| import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe | |
| import org.apache.spark.streaming.{Seconds, StreamingContext} | |
| object ExampleApp extends App { |