Hungsiro506 / Redis TestApp
Created October 30, 2017 07:52 — forked from uromahn/Redis TestApp
Jedis test with Redis Cluster
val todayLog_DF = loadData(today, Extensions.ALL)
  .withColumn("Ref", sqlBoolFunc(col("Date")))
  .withColumn("Date", from_unixtime(unix_timestamp(col("Date"), "MMM dd yyyy HH:mm:ss"), "MM dd yyyy HH:mm:ss"))
  .selectExpr("Name", "Date", "SessID", "Input", "Output", "Ref")
todayLog_DF
df.select(df("year").cast(IntegerType).as("year"), ...)
casts the column to the requested type. As a neat side effect, values that are not castable ("convertible" in that sense) become null.
If you need this as a helper method, use:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

object DFHelper {
  // Cast column `cn` of `df` to the requested type and return the resulting DataFrame.
  def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame = {
    df.withColumn(cn, df(cn).cast(tpe))
  }
}
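For example, a minimal usage sketch (reusing the df and "year" column from the snippet above):

import org.apache.spark.sql.types.IntegerType

// Cast the "year" column to integer; values that cannot be cast become null.
val casted = DFHelper.castColumnTo(df, "year", IntegerType)
casted.printSchema()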
The offending class often cannot be made Serializable, because it contains references to Spark structures (e.g. SparkSession, SparkConf) as attributes.

If you see this error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
it can be triggered when you initialize a variable on the driver (master) but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and will fail if the object is not serializable. Consider the following code snippet:
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
// Referencing the driver-side object inside the closure forces Spark to serialize it
// and triggers the exception (doSomething is a placeholder method on that object).
rdd.map(s -> notSerializable.doSomething(s)).collect();
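A common workaround (a sketch, not the only fix) is to construct the non-serializable object on the executors instead of the driver, for example inside mapPartitions, so it never has to be shipped over the wire. NotSerializable and doSomething are the hypothetical class/method from the snippet above, and rdd is assumed here to be a Scala RDD[String]:

// Build the object once per partition on the executor instead of on the driver.
val result = rdd.mapPartitions { lines =>
  val notSerializable = new NotSerializable() // created on the executor, never serialized
  lines.map(line => notSerializable.doSomething(line))
}.collect()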
<component name="libraryTable">
  <library name="SBT: com.fasterxml.jackson.core:jackson-annotations:2.8.5:jar">
    <CLASSES>
      <root url="jar://$USER_HOME$/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/bundles/jackson-annotations-2.8.5.jar!/" />
    </CLASSES>
    <JAVADOC />
    <SOURCES>
      <root url="jar://$USER_HOME$/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/srcs/jackson-annotations-2.8.5-sources.jar!/" />
    </SOURCES>
  </library>
</component>
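For reference, the build.sbt dependency behind this IntelliJ library entry would be along these lines (a sketch; the original project's build file is not shown):

// build.sbt (sketch)
libraryDependencies += "com.fasterxml.jackson.core" % "jackson-annotations" % "2.8.5"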
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.duration._

case class SomeEvent(value: Long)

// Emit an event every 250 ms, numbering them via zipWithIndex.
val events = Source
  .tick(0.seconds, 250.millis, "")
  .zipWithIndex
  .map { case (_, l) => SomeEvent(l) }

// Batch up to 100 events or whatever arrives within 500 ms (+/- 2 events per group).
val group = Flow[SomeEvent].groupedWithin(100, 500.millis)
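A minimal way to run this and observe the batching (the ActorSystem name is arbitrary; on older Akka versions an explicit ActorMaterializer is needed, newer ones derive it from the system):

implicit val system = akka.actor.ActorSystem("grouping-demo")
implicit val materializer = akka.stream.ActorMaterializer() // only needed on older Akka versions

// Each printed batch should contain roughly two events, matching the comment above.
events
  .via(group)
  .runForeach(batch => println(s"group of ${batch.size}: $batch"))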
// First (and only) row of the aggregated counters.
val head: Row = brasSumCounting.head(1).toList(0)
val signInTotalCount = head.getAs[Long]("total_signin")
val logOffTotalCount = head.getAs[Long]("total_logoff")
val signInDistinctTotalCount = head.getAs[Long]("total_signin_distinct")
val logOffDistinctTotalCount = head.getAs[Long]("total_logoff_distinct")
val timeBrasCount = head.getAs[java.sql.Timestamp]("time")
//////////////////// Upsert //////////////////////////////////////////////////////////////////
import java.sql._

// Open one JDBC connection per partition and write rows in batches.
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
  val dbc: Connection = DriverManager.getConnection("JDBCURL")
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
  batch.grouped("# Of Rows you want per batch").foreach { session =>
    session.foreach { x =>
      st.setDouble(1, x.getDouble(1))
      st.addBatch()
    }
    st.executeBatch()
  }
  dbc.close()
}
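For an actual upsert, "YOUR PREPARED STATEMENT" needs database-specific syntax. A sketch assuming PostgreSQL and a hypothetical table metrics(id, value):

// Hypothetical upsert statement (PostgreSQL "INSERT ... ON CONFLICT"); the table
// and columns are illustrative, not taken from the original snippet.
val upsertSql =
  """INSERT INTO metrics (id, value)
    |VALUES (?, ?)
    |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin
val st: PreparedStatement = dbc.prepareStatement(upsertSql)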
import org.apache.spark.sql.SparkSession
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
      .builder
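The snippet is cut off here. A sketch of where the Executors/Future imports are typically headed: running independent Spark actions concurrently from a fixed driver-side thread pool (the pool size and the two jobs are assumptions, not part of the original):

// Submit two independent actions from separate driver threads so Spark can
// schedule their stages concurrently. Pool size and datasets are illustrative.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

val dfA = spark.range(0, 1000000)
val dfB = spark.range(0, 2000000)

val countA = Future { dfA.count() }
val countB = Future { dfB.count() }

val (a, b) = Await.result(countA.zip(countB), 10.minutes)
println(s"countA = $a, countB = $b")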
### Lookup cache using mapWithState:
import org.apache.spark.streaming.{ StreamingContext, Seconds }

val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
// checkpointing is mandatory for mapWithState
ssc.checkpoint("_checkpoints")
val rdd = sc.parallelize(0 to 9).map(n => (n, (n % 2).toString))
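The gist stops at the setup. A sketch of how mapWithState would typically maintain the lookup cache on top of it (the constant input stream and the count-keeping state function are assumptions, not part of the original):

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.ConstantInputDStream

// Keep a running count per key as the cached state; the mapping function is illustrative.
def trackState(key: Int, value: Option[String], state: State[Long]): (Int, Long) = {
  val newCount = state.getOption().getOrElse(0L) + 1L
  state.update(newCount)
  (key, newCount)
}

val stream = new ConstantInputDStream(ssc, rdd) // replays rdd on every batch
val cached = stream.mapWithState(StateSpec.function(trackState _))
cached.print()

ssc.start()
ssc.awaitTerminationOrTimeout(30000)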