Skip to content

Instantly share code, notes, and snippets.

/**
 * Merges two GroupByValue aggregates by mutating one of them in place.
 *
 * The counts are summed, and the `days` of one value are added one by one into
 * the `days` of the other, so only the smaller collection is iterated.
 * When value2 has strictly more days, value2 is mutated and returned; otherwise
 * (value1 larger, or a tie) value1 is mutated and returned.
 *
 * NOTE(review): this excerpt is truncated. The closing braces of the `else`
 * branch and of the method are not shown.
 */
private def mergeValues(value1: GroupByValue, value2: GroupByValue): GroupByValue = {
if (value2.days.size > value1.days.size) {
// value2 holds more days: fold value1 into it.
value2.count = value1.count + value2.count
value1.days.foreach(d => value2.days.add(d))
value2
}
else {
// value1 holds at least as many days: fold value2 into it.
value1.count = value1.count + value2.count
value2.days.foreach(d => value1.days.add(d))
value1
/**
 * Merges two `(count, days)` partial aggregates.
 *
 * The counts are summed and the two day sets are unioned. The smaller set's
 * elements are added into the larger set, which is mutated in place and
 * returned. This keeps the copy cost proportional to the smaller set. On a
 * tie, value2's set is the one reused, as before.
 *
 * @param value1 first partial aggregate; its set may be mutated
 * @param value2 second partial aggregate; its set may be mutated
 * @return the summed count paired with the union of both day sets
 */
private def mergeValues(value1: (Int, mutable.Set[Int]), value2: (Int, mutable.Set[Int])): (Int, mutable.Set[Int]) = {
  val (count1, days1) = value1
  val (count2, days2) = value2
  // Grow the larger set so only min(|days1|, |days2|) elements are copied.
  val (smaller, larger) = if (days1.size > days2.size) (days2, days1) else (days1, days2)
  larger ++= smaller
  (count1 + count2, larger)
}
// Parse this row's issue date. If no "issue date" offset is configured, fail
// with a descriptive NoSuchElementException rather than the opaque "None.get".
val issueDate = LocalDate.parse(
  row(aggFieldsOffset.get("issue date").getOrElse(
    throw new NoSuchElementException("no field offset configured for \"issue date\""))),
  ISSUE_DATE_FORMAT)
// Seed the day set with this row's epoch day.
// NOTE(review): toEpochDay returns a Long; narrowing to Int assumes the date is
// within Int range of 1970-01-01 — confirm for the input data.
val issueDateValues = mutable.Set[Int](issueDate.toEpochDay.toInt)
// Key: the values at this row's `fieldOffset` positions joined with ",";
// value: (count = 1, the set holding this row's epoch day).
result = (fieldOffset.map(fieldInfo => row(fieldInfo._2)).mkString(","), (1, issueDateValues))
// Serialise each (key, value) pair as "key<TAB>count<TAB>day1,day2,..." and
// write the lines as gzip-compressed text under /data/output/<now>.
aggValue
  .map { case (key, value) =>
    val joinedDays = value._2.mkString(",")
    s"$key\t${value._1}\t$joinedDays"
  }
  .saveAsTextFile(s"/data/output/$now", classOf[GzipCodec])
// Serialise each (key, value) pair as "key<TAB>count<TAB>day1,day2,..." and
// write the lines as uncompressed text under /data/output/<now>.
aggValue
  .map { case (key, value) =>
    val joinedDays = value._2.mkString(",")
    s"$key\t${value._1}\t$joinedDays"
  }
  .saveAsTextFile(s"/data/output/$now")
INFO: ssl initialised in ${jetty.base}/start.d/ssl.ini (created)
INFO: ssl enabled in /data/segmentation/segplat-deployments/app/application_secure/bin/${jetty.base}/start.d/ssl.ini
INFO: server initialised in ${jetty.base}/start.ini
INFO: server enabled in ${jetty.base}/start.ini
INFO: server enabled in <transitive>
// Sums the occurrences of each value above 1000, sorts by that sum (descending),
// and counts the resulting per-value rows. Stage comments mark where each
// transformation runs; reduceByKey and sortBy introduce the shuffles.
// reduceByKey pre-sums on the map side before the shuffle, instead of shipping
// every (value, 1) pair like groupByKey followed by a sum does; the sums are identical.
// NOTE(review): despite its name, this holds a Long count. sortBy does not change
// the number of elements, so it has no effect on count().
val topXNumbers = randomNumbers
  .filter(_ > 1000) //Stage 1
  .map(value => (value, 1)) // Stage 1
  .reduceByKey(_ + _) //Stage 2 (partial sums are combined map-side in Stage 1)
  .sortBy(_._2, ascending = false) //Stage 3
  .count() // Stage 3
/*
Equivalent Java code for the Scala case class:
case class Trade(symbol: String, exchange: String, qty: Int, price: Double)
*/
/**
 * Java-side shape of the Scala case class
 * {@code Trade(symbol: String, exchange: String, qty: Int, price: Double)}.
 * It implements {@code Product} and {@code Serializable}. Fields are stored as
 * private final members (e.g. {@code symbol}) and read through same-named
 * accessor methods (e.g. {@code price()}).
 * NOTE(review): this excerpt is truncated. The {@code exchange}, {@code qty} and
 * {@code price} field declarations, the other accessors, and the closing braces
 * are not shown.
 */
public class Trade implements Product, Serializable
{
private final String symbol;
// Zero-argument accessor: Scala exposes case-class fields as methods.
public double price(){
return this.price;
/*
Equivalent Java code for the Scala tuple type (String, String, Int, Double), i.e. Tuple4:
*/
/**
 * Java-side shape of Scala's {@code Tuple4}: generic over its four element
 * types and implementing {@code Product4} and {@code Serializable}.
 * Elements are stored in private final fields (e.g. {@code _1}) and read via
 * same-named accessor methods.
 * NOTE(review): this excerpt is truncated. The {@code _2}..{@code _4} field
 * declarations, the remaining accessors, and the closing braces are not shown.
 */
public class Tuple4<T1, T2, T3, T4> implements Product4<T1, T2, T3, T4>, Serializable
{
private final T1 _1;
// The (T1) cast is redundant because _1 is already declared as T1.
public T1 _1(){return (T1)this._1;}
public T2 _2(){
return (T2)this._2;
<configuration>
<!-- Logback configuration: declares a console appender named STDOUT.
     NOTE(review): this excerpt is truncated; the closing </configuration> tag
     (and anything after the appender) is not shown. -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<!-- Line format: time (HH:mm:ss.SSS), [thread], level left-justified to 5 chars,
     logger name shortened to about 36 chars, message, newline. -->
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>