Hưng Vũ (Hungsiro506)
// Akka Streams: emit one SomeEvent every 250 ms, then batch with groupedWithin.
import scala.concurrent.duration._
import akka.stream.scaladsl.{Flow, Source}

case class SomeEvent(value: Long)

val events = Source
  .tick(0.seconds, 250.millis, "")
  .zipWithIndex
  .map { case (_, l) => SomeEvent(l) }

// Emits a group every 500 ms or once 100 elements are buffered, whichever
// comes first — at this tick rate, roughly 2 events per group.
val group = Flow[SomeEvent].groupedWithin(100, 500.millis)
<component name="libraryTable">
  <library name="SBT: com.fasterxml.jackson.core:jackson-annotations:2.8.5:jar">
    <CLASSES>
      <root url="jar://$USER_HOME$/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/bundles/jackson-annotations-2.8.5.jar!/" />
    </CLASSES>
    <JAVADOC />
    <SOURCES>
      <root url="jar://$USER_HOME$/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/srcs/jackson-annotations-2.8.5-sources.jar!/" />
    </SOURCES>
  </library>
If you see this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

it is usually because you initialized a variable on the driver (master) but then tried to use it on one of the workers. Spark serializes the closures it ships to the workers, and fails if an object captured by a closure is not serializable. Note that a class holding references to Spark structures (i.e. SparkSession, SparkConf, etc.) as attributes cannot be Serializable at all. Consider the following code snippet:
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
// The closure below captures notSerializable, so Spark must serialize it to
// ship the task to the workers — this is what throws NotSerializableException.
// (doSomething is a hypothetical method standing in for any use of the object.)
rdd.map(s -> notSerializable.doSomething(s)).collect();
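The underlying JVM rule can be seen without Spark. This is a minimal plain-Scala sketch (the class names are ours, for illustration) showing that java.io serialization rejects any object whose class does not implement Serializable — the same check Spark runs on captured closures:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// A class that does NOT implement Serializable — like a class
// holding a SparkSession or SparkConf as an attribute.
class NotSerializable(val n: Int)

// A case class is Serializable out of the box in Scala.
case class Ok(n: Int)

// Returns true if the object survives JVM serialization.
def trySerialize(obj: AnyRef): Boolean =
  try {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(obj)
    oos.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

val okWorks  = trySerialize(Ok(1))                  // true
val badFails = trySerialize(new NotSerializable(1)) // false
```

The common fixes are the same in both settings: make the class Serializable, mark the offending field transient, or construct the object inside the closure (on the worker) instead of capturing it from the driver.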
Use

df.select( df("year").cast(IntegerType).as("year"), ... )

to cast to the requested type. As a neat side effect, values that are not castable / "convertible" in that sense become null.
In case you need this as a helper method, use:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

object DFHelper {
  // Returns a copy of df with column `cn` cast to `tpe`,
  // e.g. castColumnTo(df, "year", IntegerType).
  def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame =
    df.withColumn(cn, df(cn).cast(tpe))
}
val todayLog_DF = loadData(today, Extensions.ALL)
  .withColumn("Ref", sqlBoolFunc(col("Date")))
  .withColumn("Date", from_unixtime(unix_timestamp(col("Date"), "MMM dd yyyy HH:mm:ss"), "MM dd yyyy HH:mm:ss"))
  .selectExpr("Name", "Date", "SessID", "Input", "Output", "Ref")
todayLog_DF
Hungsiro506 / Redis TestApp
Created October 30, 2017 07:52 — forked from uromahn/Redis TestApp
Jedis test with Redis Cluster
date -d '1 hour ago' '+%Y-%m-%d'
Result: 2017-11-13
date -d '1 hour ago' '+%Y-%m-%d %H'
Result: 2017-11-13 22
date -d '1 hour ago' '+%H'
Result: 22
date '+%H'
Result: 23
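The same GNU-date trick can build an hour-bucketed directory name like the one read in the spark-shell session below. This is a sketch; the path is just an example mirroring that session, not a required layout:

```shell
#!/usr/bin/env bash
# GNU date's -d option evaluates a relative time expression,
# so this yields the bucket for the previous hour.
bucket=$(date -d '1 hour ago' '+%Y-%m-%d-%H')

# Example: point a job at the previous hour's output directory.
dir="/data/dns/dns-extracted-two-hours/${bucket}/out/"
echo "$dir"
```

Using the shifted time for the whole format string (rather than formatting fields separately) avoids a race at hour boundaries, where two separate `date` calls could straddle the rollover.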
scala> val dns = spark.sqlContext.read.parquet("/data/dns/dns-extracted-two-hours/2017-11-22-02/out/")
dns: org.apache.spark.sql.DataFrame = [value: string]
scala> val splited = dns.withColumn("temp",split(col("value"),"\\t"))
splited: org.apache.spark.sql.DataFrame = [value: string, temp: array<string>]
scala> val df = splited.select((0 until 25).map(i => col("temp").getItem(i).as(s"col$i")): _*)
df: org.apache.spark.sql.DataFrame = [col0: string, col1: string ... 23 more fields]
scala> val npic = df.where("col24 = '-1'").select("col2")
scala> val df = spark.sqlContext.read.csv("/data/dns/cached_ip/*")
df: org.apache.spark.sql.DataFrame = [_c0: string]
scala> val cached = df
cached: org.apache.spark.sql.DataFrame = [_c0: string]
scala> val npic = spark.sqlContext.read.csv("/data/dns/npic_dns/*")
npic: org.apache.spark.sql.DataFrame = [_c0: string]
scala> val allo = spark.sqlContext.read.csv("/user/hungvd8/internet_user_profile_duration/Allocated-IPs2017-11-21.csv/*")
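The split-and-project step in the transcript above — split on "\\t", then getItem(i) for 25 columns — can be sketched without Spark. A minimal plain-Scala analogue (the sample record is made up):

```scala
// A tab-delimited record like the raw dns lines (values are made up):
// 4 named fields plus 21 "-" placeholders = 25 fields total.
val value = "ts\tsrc\t1.2.3.4\tquery" + "\t-" * 21

// Analogue of split(col("value"), "\\t"); limit -1 keeps trailing empty fields.
val temp: Array[String] = value.split("\t", -1)

// Analogue of (0 until 25).map(i => col("temp").getItem(i).as(s"col$i")).
val cols: Map[String, String] =
  (0 until 25).map(i => s"col$i" -> temp(i)).toMap

// cols("col2") corresponds to select("col2") in the transcript.
```

In Spark the `(0 until 25).map(...): _*` varargs expansion does the same projection column-by-column, with `getItem(i)` returning null for rows whose array is shorter than 25.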
// Purpose:
// - Ensures that a resource is deterministically disposed of once it goes out of scope
// - Use this pattern when working with resources that should be closed or managed after use
//
// The benefit of this pattern is that it frees the developer from the responsibility of
// explicitly managing resources
import scala.io.Source
import java.io._
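A minimal sketch of the loan pattern the comments above describe (the helper name `using` is our choice — it is not a Scala 2 standard-library API):

```scala
import scala.io.Source
import java.io._

// Loan pattern: the caller "borrows" the resource inside f;
// close() runs deterministically even if f throws.
def using[A <: AutoCloseable, B](resource: A)(f: A => B): B =
  try f(resource)
  finally resource.close()

// Usage: write then read a temp file, with no manual close() calls.
val file = File.createTempFile("loan", ".txt")
using(new PrintWriter(file))(_.print("hello"))
val contents = using(Source.fromFile(file))(_.mkString)
// contents == "hello"
file.delete()
```

Because `using` takes the body as a function, the resource's lifetime is exactly the function call: it cannot leak out of scope un-closed, which is the deterministic disposal the comments call out.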