// Load today's log, derive a boolean "Ref" column from the raw Date via sqlBoolFunc,
// then re-format Date from "MMM dd yyyy HH:mm:ss" to "MM dd yyyy HH:mm:ss".
val todayLog_DF = loadData(today, Extensions.ALL)
  .withColumn("Ref", sqlBoolFunc(col("Date")))
  .withColumn("Date", from_unixtime(unix_timestamp(col("Date"), "MMM dd yyyy HH:mm:ss"), "MM dd yyyy HH:mm:ss"))
  .selectExpr("Name", "Date", "SessID", "Input", "Output", "Ref")
todayLog_DF
Use
df.select( df("year").cast(IntegerType).as("year"), ... )
to cast the column to the requested type. As a neat side effect, values that cannot be cast to the requested type become null.
In case you need this as a helper method, use:
object DFHelper {
  // Cast column `cn` of `df` to the given DataType, returning a new DataFrame.
  def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame = {
    df.withColumn(cn, df(cn).cast(tpe))
  }
}
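For example, a quick sketch of how the helper might be called, assuming a DataFrame `df` with a string "year" column (the DataFrame and column name are assumptions, not part of the original):

import org.apache.spark.sql.types.IntegerType

// Hypothetical usage: cast "year" to IntegerType;
// rows whose "year" is not a valid integer become null.
val casted = DFHelper.castColumnTo(df, "year", IntegerType)
casted.printSchema()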
Such a class cannot be Serializable, because it contains references to
Spark structures (i.e. SparkSession, SparkConf, etc.) as attributes.
If you see this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
it is typically triggered when you initialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
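One common workaround, sketched here in Scala against the hypothetical NotSerializable class from the snippet above, is to construct the non-serializable object inside the closure (per partition), so it is created on each executor instead of being captured and shipped from the driver:

// Sketch: instantiate the non-serializable helper inside mapPartitions, so it is
// built on each executor rather than serialized and sent from the driver.
val rdd = sc.textFile("/tmp/myfile")
val processed = rdd.mapPartitions { lines =>
  val notSerializable = new NotSerializable() // created on the executor, never serialized
  lines.map { line =>
    // ... use notSerializable with each line here ...
    line
  }
}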
<component name="libraryTable">
  <library name="SBT: com.fasterxml.jackson.core:jackson-annotations:2.8.5:jar">
    <CLASSES>
      <root url="jar://$USER_HOME$/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/bundles/jackson-annotations-2.8.5.jar!/" />
    </CLASSES>
    <JAVADOC />
    <SOURCES>
      <root url="jar://$USER_HOME$/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/srcs/jackson-annotations-2.8.5-sources.jar!/" />
    </SOURCES>
  </library>
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.duration._

case class SomeEvent(value: Long)

// Emit one SomeEvent roughly every 250 ms, numbered by its index in the stream.
val events = Source
  .tick(0.seconds, 250.millis, "")
  .zipWithIndex
  .map { case (_, l) => SomeEvent(l) }

// Group events into batches of up to 100, released at least every 500 ms
// (so roughly 2 events per group at the tick rate above).
val group = Flow[SomeEvent].groupedWithin(100, 500.millis)
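A minimal sketch of wiring these pieces together, assuming a recent Akka version where an implicit ActorSystem provides the materializer (the system name and the printing sink are assumptions, not part of the original):

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("grouping-demo") // assumed name

// Run the ticking source through the grouping flow and print each batch.
events
  .via(group)
  .runWith(Sink.foreach(batch => println(s"got ${batch.size} events: $batch")))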
// Take the first row of the aggregated counts and read each metric by column name.
val head: Row = brasSumCounting.head()
val signInTotalCount = head.getAs[Long]("total_signin")
val logOffTotalCount = head.getAs[Long]("total_logoff")
val signInDistinctTotalCount = head.getAs[Long]("total_signin_distinct")
val logOffDistinctTotalCount = head.getAs[Long]("total_logoff_distinct")
val timeBrasCount = head.getAs[java.sql.Timestamp]("time")
//////////////////// Upsert //////////////////////////////////////////////////////////////////
import java.sql._

// Open one JDBC connection per partition and write the rows in batches.
// numPartitions, jdbcUrl, upsertSql and batchSize are placeholders to fill in.
dataframe.coalesce(numPartitions).foreachPartition { partition =>
  val dbc: Connection = DriverManager.getConnection(jdbcUrl)
  val st: PreparedStatement = dbc.prepareStatement(upsertSql)
  partition.grouped(batchSize).foreach { batch =>
    batch.foreach { x =>
      st.setDouble(1, x.getDouble(1))
      st.addBatch()
    }
    st.executeBatch()
  }
  dbc.close()
}
import org.apache.spark.sql.SparkSession
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
      .builder
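The snippet is cut off here, but the Executors / scala.concurrent imports suggest submitting several independent Spark actions in parallel from the driver. A rough sketch of that idea follows; the pool size, table paths and timeout below are assumptions, not taken from the original:

// Sketch: a dedicated thread pool so independent Spark actions can run concurrently.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4)) // pool size is an assumption

val countA = Future { spark.read.parquet("/data/tableA").count() } // hypothetical path
val countB = Future { spark.read.parquet("/data/tableB").count() } // hypothetical path

// Block only once, after both jobs have been submitted.
val results = Await.result(Future.sequence(Seq(countA, countB)), 10.minutes)
println(results)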
### Lookup cache using mapWithState:

import org.apache.spark.streaming.{ StreamingContext, Seconds }
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
// checkpointing is mandatory
ssc.checkpoint("_checkpoints")
val rdd = sc.parallelize(0 to 9).map(n => (n, (n % 2).toString))
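The snippet stops before the mapWithState call itself; a rough sketch of how the lookup cache could continue, where the queue stream, the state function and the printing output are assumptions rather than part of the original:

import org.apache.spark.streaming.{State, StateSpec}
import scala.collection.mutable

// Feed the keyed RDD in as a stream (queueStream is just a convenient test source).
val stream = ssc.queueStream(mutable.Queue(rdd))

// State function: remember the most recent value seen for each key.
def cacheLatest(key: Int, value: Option[String], state: State[String]): Option[(Int, String)] = {
  value.foreach(v => state.update(v)) // refresh the cached value when a new one arrives
  value.map(v => (key, v))            // emit only when a new value arrived in this batch
}

val cached = stream.mapWithState(StateSpec.function(cacheLatest _))
cached.print()

ssc.start()
ssc.awaitTermination()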