Skip to content

Instantly share code, notes, and snippets.

@oscarruesga
Last active January 21, 2016 21:32
Show Gist options
  • Select an option

  • Save oscarruesga/1fa90266cdbb2884545a to your computer and use it in GitHub Desktop.

Select an option

Save oscarruesga/1fa90266cdbb2884545a to your computer and use it in GitHub Desktop.
PROYECTO FIN DE PROGRAMA EXPERTO EN BIG DATA UTAD - 2016
package com.moolileo.meetup.model
/**
 * The `event` object of a Meetup RSVP message.
 * Only `event_id` is mandatory; every other field is an Option.
 */
case class MeetupEvent(
event_id: String,
event_name: Option[String],
event_url: Option[String],
// presumably epoch milliseconds (the meetup-rsvps ES mapping declares event.time as epoch_millis) — TODO confirm
time: Option[Long]
)
/**
 * One entry of a group's `group_topics` array.
 * `topic_name` is the value counted by the trending-topic streams.
 */
case class MeetupGroupTopics(
topic_name: String,
urlkey: Option[String]
)
/**
 * The `group` object of a Meetup RSVP message.
 *
 * `group_lat` and `group_lon` are kept as Strings. The streaming job concatenates them as
 * "lat,lon" to build the `coordinates` value of the aggregate documents.
 * NOTE(review): the feed sends these as JSON numbers; this relies on json4s converting them to String — verify.
 */
case class MeetupGroup(
group_id: Long,
group_name: String,
group_city: Option[String],
// used as the country aggregation key by the streaming job
group_country: String,
group_state: Option[String],
group_urlname: Option[String],
group_lat: String,
group_lon: String,
group_topics: List[MeetupGroupTopics]
)
/**
 * The `member` object of a Meetup RSVP message.
 */
case class MeetupMember(
member_id: Long,
member_name: Option[String],
// NOTE(review): typed as String; if the feed sends other_services as a JSON object, extraction may fail — verify
other_services: Option[String],
photo: Option[String]
)
/**
 * The optional `venue` object of a Meetup RSVP message. Coordinates are kept as Strings.
 */
case class MeetupVenue(
venue_id: Option[Int],
venue_name: Option[String],
lat: Option[String],
lon: Option[String]
)
/**
 * Root object of one Meetup RSVP WebSocket message.
 * Each received text frame is extracted into this class with json4s.
 *
 * - `rsvp_id` is used as the Elasticsearch document id of the raw RSVP.
 * - `response` == "yes" marks an accepted RSVP.
 * - `guests` counts additional attendees; an accepted RSVP contributes `guests + 1` attendees.
 * - `mtime` is used as the Elasticsearch timestamp and as the date of the attendance aggregates.
 */
case class MeetupRsvp(
rsvp_id: Long,
response: String,
guests: Int,
mtime: Long,
visibility : String,
event: MeetupEvent,
group: MeetupGroup,
member: MeetupMember,
venue: MeetupVenue
)
/**
 * Total attendees per country.
 * Its fields match the `meetup_total_attending` index mapping. It is not produced by the
 * processing code visible here.
 */
case class TotalCountryAttendees (
country: String,
// "lat,lon" string, mapped as geo_point in Elasticsearch
coordinates: String,
attendees: Long
)
/**
 * Accepted-RSVP attendees per country and RSVP time, upserted into `meetup_attending_by_date`.
 * The stored `attendees` value is incremented on every upsert.
 *
 * - `key`: "<mtime>-<country>", used as the Elasticsearch document id.
 * - `day`: the raw RSVP mtime in epoch millis. It is NOT truncated to the start of the day.
 * - `month`: epoch millis of the first day of the RSVP's month.
 * - `year`: epoch millis of January 1st of the RSVP's year.
 */
case class CountryAttendeesByDate (
key: String,
country: String,
// "lat,lon" string built from the group's coordinates
coordinates: String,
attendees: Long,
day: Long,
month: Long,
year: Long
)
/**
 * Windowed occurrence count of a topic across all RSVPs.
 * `date` is the streaming batch time in epoch millis.
 */
case class GlobalTrendingTopics(
topic: String,
counter: Long,
date: Long
)
/**
 * Windowed occurrence count of a topic within one country.
 * `key` is "<topic>-<country>" and is used as the Elasticsearch document id.
 * `date` is the streaming batch time in epoch millis.
 */
case class CountryTrendingTopics(
key: String,
country: String,
coordinates: String,
topic: String,
counter: Long,
date: Long
)
package com.moolileo.meetup
import _root_.com.typesafe.config.ConfigFactory
import akka.japi.Util._
/**
 * Application settings, read from the `meetup-stream-reader` section of the configuration
 * returned by `ConfigFactory.load`.
 *
 * The Spark and Elasticsearch values are `var`s so that the entry point can override them
 * (for example from command-line arguments) after loading.
 */
final class Settings {
  protected val config = ConfigFactory.load.getConfig("meetup-stream-reader")

  /** URL of the Meetup RSVP WebSocket feed (`meetup.ws.url`). */
  val MeetupRSVPWebSocketUrl: String = config.getString("meetup.ws.url")

  /** Spark master URL (`spark.master`). */
  var SparkMaster: String = config.getString("spark.master")

  /** Streaming batch interval, in seconds (`spark.streaming.batch.interval`). */
  var StreamingBatchInterval: Int = config.getInt("spark.streaming.batch.interval")

  /** Executor memory as a byte count (`spark.executor.memory`, parsed with `getBytes`). */
  var SparkExecutorMemory: java.lang.Long = config.getBytes("spark.executor.memory")

  /** Maximum number of cores for the application (`spark.cores.max`). */
  var SparkCoresMax: Int = config.getInt("spark.cores.max")

  /** Jars listed in `spark.jars`, keeping only those that exist on the local file system. */
  var DeployJars: Seq[String] = onlyExistingFiles(config.getStringList("spark.jars"))

  /** Elasticsearch node (`es.node`). */
  var ESNode: String = config.getString("es.node")

  /** Elasticsearch port (`es.port`), kept as a String. */
  var ESPort: String = config.getString("es.port")

  /** Converts the Java list to a Scala sequence and drops paths that do not exist as files. */
  private def onlyExistingFiles(paths: java.util.List[String]): Seq[String] =
    immutableSeq(paths).filter(path => new java.io.File(path).exists)
}
package com.moolileo.meetup.streaming
import com.moolileo.meetup.Settings
import com.moolileo.meetup.streaming.processes.EsPersistStreamRSVP
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Entry point of the Meetup RSVP streaming job.
 *
 * Usage: `MeetupStreaming [sparkMaster] [esNode]`.
 * Both arguments are optional. A missing, null or empty argument keeps the value loaded
 * by `Settings`.
 */
object MeetupStreaming {
  val settings = new Settings
  import settings._

  def main(args: Array[String]): Unit = {
    // Positional overrides are optional, so look each one up individually instead of
    // destructuring (destructuring would throw a MatchError on fewer than two arguments).
    argAt(args, 0).foreach(master => SparkMaster = master)
    argAt(args, 1).foreach(esNode => ESNode = esNode)

    val conf = new SparkConf(true)
      .setMaster(SparkMaster)
      .setAppName(getClass.getSimpleName)
      .setJars(DeployJars)
      // Settings holds this value as a byte count (Config.getBytes). The explicit "b" unit
      // prevents Spark from interpreting the bare number in another default unit.
      .set("spark.executor.memory", s"${SparkExecutorMemory}b")
      .set("spark.cores.max", SparkCoresMax.toString)

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(StreamingBatchInterval))
    ssc.checkpoint("/tmp/data/meetup.dat")

    // start() wires all streams, starts the context and blocks in awaitTermination.
    val stream = new EsPersistStreamRSVP
    stream.start(ssc, MeetupRSVPWebSocketUrl, ESNode, ESPort)
  }

  /** Returns the argument at `index` if it exists and is neither null nor empty. */
  private def argAt(args: Array[String], index: Int): Option[String] =
    if (index < args.length) Option(args(index)).filter(_.nonEmpty) else None
}
package com.moolileo.meetup.streaming.processes
import java.text.SimpleDateFormat
import java.util.Calendar
import com.moolileo.meetup.model._
import com.moolileo.meetup.websocket.WebSocketReader
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.elasticsearch.spark._
/**
 * Spark Streaming pipeline that reads Meetup RSVPs from a WebSocket and writes them, plus
 * several aggregates, to Elasticsearch:
 *  - every raw RSVP            -> `meetup-rsvps`
 *  - attendees by country/date -> `meetup_attending_by_date` (upsert, attendees accumulated)
 *  - global trending topics    -> `meetup_global_trending_topics` (upsert, counter accumulated)
 *  - country trending topics   -> `meetup_country_trending_topics` (upsert, counter accumulated)
 */
class EsPersistStreamRSVP extends Serializable with Logging {

  import EsPersistStreamRSVP._

  /**
   * Registers all streams on `ssc`, starts it and blocks until the context terminates.
   *
   * @param ssc       streaming context to build on; this method starts it
   * @param websocket URL of the Meetup RSVP WebSocket feed
   * @param EsNode    value of `es.nodes` for every Elasticsearch write
   * @param EsPort    value of `es.port` for every Elasticsearch write
   */
  def start(ssc: StreamingContext, websocket: String, EsNode: String, EsPort: String): Unit = {
    // Connection settings shared by every Elasticsearch write below.
    val esConnection = Map("es.nodes" -> EsNode, "es.port" -> EsPort)

    val stream = ssc.receiverStream[MeetupRsvp](
      new WebSocketReader(websocket, StorageLevel.MEMORY_ONLY_SER))
    stream.checkpoint(Seconds(60))

    // Bulk store of every raw RSVP. rsvp_id is the document id and mtime the ES timestamp.
    stream.foreachRDD(rdd =>
      rdd.saveToEs("meetup-rsvps/meetup-rsvps",
        Map("es.mapping.timestamp" -> "mtime",
          "es.mapping.id" -> "rsvp_id") ++ esConnection))

    // Accepted RSVPs only.
    val rsvpAccepted = stream.filter(_.response == "yes")

    // Attendees (the member plus their guests) per (country, "lat,lon", mtime) in each batch.
    val rsvpByCountryByDate = rsvpAccepted
      .map(rsvp =>
        ((rsvp.group.group_country, rsvp.group.group_lat + "," + rsvp.group.group_lon, rsvp.mtime),
          rsvp.guests + 1))
      .reduceByKey(_ + _)
      .map { case ((country, coords, date), attendees) =>
        CountryAttendeesByDate(
          s"$date-$country",
          country,
          coords,
          attendees,
          // `day` keeps the raw mtime: the "last hour" Kibana chart buckets this field by minute.
          date,
          startOfMonth(date),
          startOfYear(date))
      }

    // Each batch is written by a single action, so no cache/unpersist is needed.
    rsvpByCountryByDate.foreachRDD(rdd =>
      rdd.saveToEs("meetup_attending_by_date/meetup_attending_by_date",
        Map("es.write.operation" -> "upsert",
          "es.mapping.id" -> "key",
          "es.update.script.params" -> "inc:attendees",
          "es.update.script" -> "ctx._source.attendees+=inc") ++ esConnection))

    // Global trending topics: per-topic occurrences over a sliding window, kept only when the
    // count is strictly above TrendingMinCount.
    // NOTE(review): each slide re-emits the count of the whole window, and the upsert script
    // *adds* it to the stored counter. One RSVP therefore contributes to up to
    // TrendingWindow / TrendingSlide (= 30) increments. Confirm whether the counter should be
    // overwritten rather than accumulated.
    val globalTrendingTopics = stream
      .flatMap(rsvp => rsvp.group.group_topics)
      .map(topic => (topic.topic_name, 1))
      .reduceByKeyAndWindow((curr: Int, acc: Int) => curr + acc, TrendingWindow, TrendingSlide)
      .filter(t => t._2 > TrendingMinCount)
      .transform((rdd, time) => rdd.map {
        case (topic: String, count) => GlobalTrendingTopics(topic, count, time.milliseconds)
      })

    globalTrendingTopics.foreachRDD(rdd =>
      rdd.saveToEs("meetup_global_trending_topics/meetup_global_trending_topics",
        Map("es.write.operation" -> "upsert",
          "es.mapping.id" -> "topic",
          "es.mapping.timestamp" -> "date",
          "es.update.script.params" -> "inc:counter",
          "es.update.script" -> "ctx._source.counter+=inc") ++ esConnection))

    // Country trending topics: same windowing and threshold as above, keyed by (topic, country, coords).
    val countryTrendingTopics = stream
      .flatMap(rsvp => topicsWithLocation(rsvp.group))
      .map { case (topic, country, coords) => ((topic.topic_name, country, coords), 1) }
      .reduceByKeyAndWindow((curr: Int, acc: Int) => curr + acc, TrendingWindow, TrendingSlide)
      .filter(t => t._2 > TrendingMinCount)
      .transform((rdd, time) => rdd.map {
        case ((topic: String, country: String, coord: String), count) =>
          CountryTrendingTopics(s"$topic-$country", country, coord, topic, count, time.milliseconds)
      })

    countryTrendingTopics.foreachRDD(rdd =>
      rdd.saveToEs("meetup_country_trending_topics/meetup_country_trending_topics",
        Map("es.write.operation" -> "upsert",
          "es.mapping.id" -> "key",
          "es.mapping.timestamp" -> "date",
          "es.update.script.params" -> "inc:counter",
          "es.update.script" -> "ctx._source.counter+=inc") ++ esConnection))

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Constants and pure helpers used by the transformations of `EsPersistStreamRSVP`. */
object EsPersistStreamRSVP {

  /** Length of the sliding window over which trending topics are counted. */
  val TrendingWindow = Minutes(5)

  /** Slide interval of the trending-topic windows. */
  val TrendingSlide = Seconds(10)

  /** A topic is emitted only when its windowed count is strictly greater than this value. */
  val TrendingMinCount = 10

  /** Epoch millis of 00:00:00.000 on day 1 of the month containing `millis` (JVM default time zone). */
  def startOfMonth(millis: Long): Long = truncateToPeriodStart(millis, resetMonth = false)

  /** Epoch millis of 00:00:00.000 on January 1st of the year containing `millis` (JVM default time zone). */
  def startOfYear(millis: Long): Long = truncateToPeriodStart(millis, resetMonth = true)

  /** Pairs every topic of `group` with the group's country and its "lat,lon" coordinates string. */
  private[processes] def topicsWithLocation(group: MeetupGroup) =
    group.group_topics.map(topic => (topic, group.group_country, group.group_lat + "," + group.group_lon))

  /** Truncates `millis` to the start of its month, or of its year when `resetMonth` is true. */
  private def truncateToPeriodStart(millis: Long, resetMonth: Boolean): Long = {
    val cal = Calendar.getInstance()
    cal.setTimeInMillis(millis)
    // HOUR_OF_DAY rather than HOUR: HOUR is the 12-hour field, so setting it to 0 while
    // AM_PM is PM yields 12:00 (noon) instead of midnight.
    cal.set(Calendar.HOUR_OF_DAY, 0)
    cal.set(Calendar.MINUTE, 0)
    cal.set(Calendar.SECOND, 0)
    cal.set(Calendar.MILLISECOND, 0)
    cal.set(Calendar.DAY_OF_MONTH, 1)
    if (resetMonth) cal.set(Calendar.MONTH, Calendar.JANUARY)
    cal.getTimeInMillis
  }
}
package com.moolileo.meetup.websocket
import com.moolileo.meetup.model.MeetupRsvp
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scalawebsocket.WebSocket
/**
 * Spark Streaming receiver that connects to a WebSocket and stores every text message that
 * can be extracted into a `MeetupRsvp`. Messages that fail to parse or map are logged
 * (with their cause) and dropped.
 *
 * @param url          WebSocket URL to connect to
 * @param storageLevel storage level used for the received blocks
 */
class WebSocketReader(url: String, storageLevel: StorageLevel)
  extends Receiver[MeetupRsvp](storageLevel) with Logging {

  import scala.util.control.NonFatal

  // Current connection. It is only replaced, and the previous one shut down, through setWebSocket.
  @volatile private var webSocket: WebSocket = _

  /** Opens the WebSocket. On failure, asks Spark to restart the receiver. */
  def onStart(): Unit = {
    try {
      logInfo("Connecting to WebSocket: " + url)
      val newWebSocket = WebSocket().open(url).onTextMessage({ msg: String => parseJson(msg) })
      setWebSocket(newWebSocket)
      logInfo("Connected to WebSocket: " + url)
    } catch {
      case e: Exception => restart("Error starting WebSocket stream", e)
    }
  }

  /** Shuts down the current connection, if any. */
  def onStop(): Unit = {
    setWebSocket(null)
    logInfo("WebSocket receiver stopped")
  }

  /** Atomically replaces the current connection, shutting down the previous one. */
  private def setWebSocket(newWebSocket: WebSocket) = synchronized {
    if (webSocket != null) {
      webSocket.shutdown()
    }
    webSocket = newWebSocket
  }

  /** Parses one text frame into a MeetupRsvp and stores it; logs and drops anything else. */
  private def parseJson(jsonStr: String): Unit = {
    implicit val formats: Formats = DefaultFormats
    try {
      store(parse(jsonStr).extract[MeetupRsvp])
    } catch {
      case e: MappingException =>
        logError("Unable to map JSON message to MeetupRsvp object:" + e.msg, e)
      // NonFatal lets InterruptedException and fatal VM errors propagate instead of being swallowed.
      case NonFatal(e) =>
        logError("Unable to map JSON message to MeetupRsvp object", e)
    }
  }
}
[
{
"_id": "MEETUP-Trending-Topics",
"_type": "dashboard",
"_source": {
"title": "MEETUP - Trending Topics",
"hits": 0,
"description": "",
"panelsJSON": "[{\"col\":1,\"id\":\"Evolución-de-los-trending-topics-por-pais\",\"panelIndex\":1,\"row\":1,\"size_x\":6,\"size_y\":8,\"type\":\"visualization\"},{\"col\":10,\"id\":\"Trending-Topics-por-País\",\"panelIndex\":2,\"row\":3,\"size_x\":3,\"size_y\":6,\"type\":\"visualization\"},{\"col\":7,\"id\":\"Global-Trending-Topics\",\"panelIndex\":3,\"row\":1,\"size_x\":3,\"size_y\":8,\"type\":\"visualization\"},{\"col\":10,\"id\":\"Total-Topics\",\"panelIndex\":4,\"row\":1,\"size_x\":3,\"size_y\":2,\"type\":\"visualization\"},{\"col\":1,\"id\":\"Evolución-de-los-topics-por-pais-en-las-últimas-12-horas-en-intervalos-de-1-hora\",\"panelIndex\":5,\"row\":9,\"size_x\":12,\"size_y\":8,\"type\":\"visualization\"}]",
"optionsJSON": "{\"darkTheme\":true}",
"uiStateJSON": "{\"P-1\":{\"spy\":{\"mode\":{\"fill\":false,\"name\":null}}},\"P-2\":{\"spy\":{\"mode\":{\"fill\":false,\"name\":null}}},\"P-4\":{\"spy\":{\"mode\":{\"fill\":false,\"name\":null}}}}",
"version": 1,
"timeRestore": true,
"timeTo": "now",
"timeFrom": "now-2w/w",
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"filter\":[{\"meta\":{\"index\":\"meetup_country_trending_topics\",\"key\":\"topic\",\"value\":\"new\",\"disabled\":false,\"negate\":false,\"alias\":null},\"query\":{\"match\":{\"topic\":{\"query\":\"new\",\"type\":\"phrase\"}}},\"$state\":{\"store\":\"appState\"}},{\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}}}]}"
}
}
},
{
"_id": "MEETUP-Analytics",
"_type": "dashboard",
"_source": {
"title": "MEETUP Analytics",
"hits": 0,
"description": "",
"panelsJSON": "[{\"col\":1,\"id\":\"Evolución-de-los-trending-topics-por-pais\",\"panelIndex\":1,\"row\":1,\"size_x\":6,\"size_y\":8,\"type\":\"visualization\"},{\"col\":7,\"id\":\"Trending-Topics-por-País\",\"panelIndex\":2,\"row\":1,\"size_x\":3,\"size_y\":8,\"type\":\"visualization\"},{\"id\":\"Global-Trending-Topics\",\"type\":\"visualization\",\"panelIndex\":3,\"size_x\":3,\"size_y\":8,\"col\":10,\"row\":1}]",
"optionsJSON": "{\"darkTheme\":true}",
"uiStateJSON": "{\"P-1\":{\"spy\":{\"mode\":{\"fill\":false,\"name\":null}}},\"P-2\":{\"spy\":{\"mode\":{\"fill\":false,\"name\":null}}}}",
"version": 1,
"timeRestore": true,
"timeTo": "now",
"timeFrom": "now-2w/w",
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}}}]}"
}
}
},
{
"_id": "Asistencia-a-Eventos",
"_type": "dashboard",
"_source": {
"title": "Asistencia a Eventos",
"hits": 0,
"description": "",
"panelsJSON": "[{\"col\":1,\"id\":\"Distribución-de-las-asistencia-a-eventos-geográficamente\",\"panelIndex\":1,\"row\":1,\"size_x\":7,\"size_y\":8,\"type\":\"visualization\"},{\"id\":\"Distribución-de-las-confirmaciones-de-asistencia-durante-la-última-hora.\",\"type\":\"visualization\",\"panelIndex\":2,\"size_x\":4,\"size_y\":7,\"col\":8,\"row\":1}]",
"optionsJSON": "{\"darkTheme\":true}",
"uiStateJSON": "{\"P-1\":{\"spy\":{\"mode\":{\"fill\":false,\"name\":null}}}}",
"version": 1,
"timeRestore": true,
"timeTo": "now",
"timeFrom": "now-15m",
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}}}]}"
}
}
},
{
"_id": "Trending-Topics-por-País",
"_type": "visualization",
"_source": {
"title": "Trending Topics por País",
"visState": "{\"type\":\"table\",\"params\":{\"perPage\":100,\"showPartialRows\":true,\"showMeticsAtAllLevels\":true},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"country\",\"size\":10000,\"order\":\"desc\",\"orderBy\":\"1\"}},{\"id\":\"3\",\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"topic\",\"size\":10000,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"meetup_country_trending_topics\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Evolución-de-los-topics-por-pais-en-las-últimas-12-horas-en-intervalos-de-1-hora",
"_type": "visualization",
"_source": {
"title": "Evolución de los topics por pais en las últimas 12 horas en intervalos de 1 hora",
"visState": "{\"type\":\"line\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"showCircles\":true,\"smoothLines\":true,\"interpolate\":\"linear\",\"scale\":\"linear\",\"drawLinesBetweenPoints\":true,\"radiusRatio\":9,\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"sum\",\"schema\":\"metric\",\"params\":{\"field\":\"counter\"}},{\"id\":\"2\",\"type\":\"date_range\",\"schema\":\"segment\",\"params\":{\"field\":\"date\",\"ranges\":[{\"from\":\"now-1h/h\",\"to\":\"now\"},{\"from\":\"now-2h/h\",\"to\":\"now-1h/h\"},{\"to\":\"now-2h/h\",\"from\":\"now-3h/h\"},{\"to\":\"now-3h/h\",\"from\":\"now-4h/h\"},{\"to\":\"now-4h/h\",\"from\":\"now-5h/h\"},{\"to\":\"now-5h/h\",\"from\":\"now-6h/h\"},{\"to\":\"now-6h/h\",\"from\":\"now-7h/h\"},{\"to\":\"now-7h/h\",\"from\":\"now-8h/h\"},{\"to\":\"now-8h/h\",\"from\":\"now-9h/h\"},{\"to\":\"now-9h/h\",\"from\":\"now-10h/h\"},{\"to\":\"now-10h/h\",\"from\":\"now-11h/h\"},{\"to\":\"now-11h/h\",\"from\":\"now-12h/h\"}]}},{\"id\":\"3\",\"type\":\"terms\",\"schema\":\"group\",\"params\":{\"field\":\"topic\",\"size\":20,\"order\":\"desc\",\"orderBy\":\"1\"}},{\"id\":\"4\",\"type\":\"terms\",\"schema\":\"split\",\"params\":{\"field\":\"country\",\"size\":100,\"order\":\"desc\",\"orderBy\":\"_term\",\"row\":true}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"meetup_country_trending_topics\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Evolución-de-los-trending-topics-por-pais",
"_type": "visualization",
"_source": {
"title": "Evolución de los trending topics por pais",
"visState": "{\"type\":\"tile_map\",\"params\":{\"addTooltip\":true,\"heatBlur\":15,\"heatMaxZoom\":16,\"heatMinOpacity\":0.1,\"heatNormalizeData\":true,\"heatRadius\":25,\"isDesaturated\":true,\"mapType\":\"Scaled Circle Markers\",\"wms\":{\"enabled\":false,\"options\":{\"attribution\":\"Maps provided by USGS\",\"format\":\"image/png\",\"layers\":\"0\",\"styles\":\"\",\"transparent\":true,\"version\":\"1.3.0\"},\"url\":\"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\"}},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"geohash_grid\",\"schema\":\"segment\",\"params\":{\"field\":\"coordinates\",\"autoPrecision\":true,\"mapZoom\":3,\"mapCenter\":[29.305561325527698,-54.228515625],\"precision\":2}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"meetup_country_trending_topics\",\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}},\"filter\":[]}"
}
}
},
{
"_id": "Distribución-de-las-asistencia-a-eventos-geográficamente",
"_type": "visualization",
"_source": {
"title": "Distribución de las asistencia a eventos geográficamente",
"visState": "{\"type\":\"tile_map\",\"params\":{\"addTooltip\":true,\"heatBlur\":15,\"heatMaxZoom\":16,\"heatMinOpacity\":0.1,\"heatNormalizeData\":true,\"heatRadius\":25,\"isDesaturated\":true,\"mapType\":\"Heatmap\",\"wms\":{\"enabled\":false,\"options\":{\"attribution\":\"Maps provided by USGS\",\"format\":\"image/png\",\"layers\":\"0\",\"styles\":\"\",\"transparent\":true,\"version\":\"1.3.0\"},\"url\":\"https://basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer\"}},\"aggs\":[{\"id\":\"1\",\"type\":\"sum\",\"schema\":\"metric\",\"params\":{\"field\":\"attendees\"}},{\"id\":\"3\",\"type\":\"geohash_grid\",\"schema\":\"segment\",\"params\":{\"field\":\"coordinates\",\"autoPrecision\":true,\"precision\":2}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"meetup_attending_by_date\",\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}},\"filter\":[]}"
}
}
},
{
"_id": "Global-Trending-Topics",
"_type": "visualization",
"_source": {
"title": "Global Trending Topics",
"visState": "{\"type\":\"table\",\"params\":{\"perPage\":100,\"showPartialRows\":true,\"showMeticsAtAllLevels\":true},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"topic\",\"size\":10000,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"meetup_global_trending_topics\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Total-Topics",
"_type": "visualization",
"_source": {
"title": "Total Topics",
"visState": "{\"type\":\"metric\",\"params\":{\"fontSize\":\"85\"},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"meetup_global_trending_topics\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
},
{
"_id": "Distribución-de-las-confirmaciones-de-asistencia-durante-la-última-hora.",
"_type": "visualization",
"_source": {
"title": "Distribución de las confirmaciones de asistencia durante la última hora.",
"visState": "{\"type\":\"pie\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":false,\"isDonut\":true},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"date_range\",\"schema\":\"segment\",\"params\":{\"field\":\"day\",\"ranges\":[{\"from\":\"now-1m/m\",\"to\":\"now\"},{\"to\":\"now-1m/m\",\"from\":\"now-2m/m\"},{\"to\":\"now-2m/m\",\"from\":\"now-3m/m\"},{\"to\":\"now-3m/m\",\"from\":\"now-4m/m\"}]}},{\"id\":\"3\",\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"country\",\"exclude\":{\"flags\":[\"MULTILINE\",\"UNICODE_CASE\"]},\"include\":{\"flags\":[\"UNICODE_CASE\"]},\"size\":100,\"order\":\"desc\",\"orderBy\":\"_term\"}}],\"listeners\":{}}",
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": "{\"index\":\"meetup_attending_by_date\",\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}},\"filter\":[]}"
}
}
}
]
{
"venue": {
"venue_name": "The Garden",
"lon": 115.841782,
"lat": -31.937145,
"venue_id": 1538303
},
"visibility": "public",
"response": "yes",
"guests": 0,
"member": {
"member_id": 189731425,
"photo": "http://photos3.meetupstatic.com/photos/member/d/f/f/5/thumb_251337333.jpeg",
"member_name": "Lena"
},
"rsvp_id": 1580487765,
"mtime": 1447485273314,
"event": {
"event_name": "Leederville Crawl !!!",
"event_id": "225530730",
"time": 1447498800000,
"event_url": "http://www.meetup.com/Bar-Pub-Club-Crawlies/events/225530730/"
},
"group": {
"group_topics": [
{
"urlkey": "singles",
"topic_name": "Singles"
},
{
"urlkey": "wine",
"topic_name": "Wine"
},
{
"urlkey": "beer",
"topic_name": "Beer"
},
{
"urlkey": "couples",
"topic_name": "Couples"
},
{
"urlkey": "nightlife",
"topic_name": "Nightlife"
},
{
"urlkey": "socialnetwork",
"topic_name": "Social Networking"
},
{
"urlkey": "social",
"topic_name": "Social"
},
{
"urlkey": "fun-times",
"topic_name": "Fun Times"
},
{
"urlkey": "drinking",
"topic_name": "Drinking"
},
{
"urlkey": "adventure",
"topic_name": "Adventure"
},
{
"urlkey": "pubs-bars",
"topic_name": "Pubs and Bars"
},
{
"urlkey": "young",
"topic_name": "Young"
},
{
"urlkey": "food-and-drink",
"topic_name": "Food and Drink"
},
{
"urlkey": "perth",
"topic_name": "Perth"
}
],
"group_city": "Perth",
"group_country": "au",
"group_id": 18822423,
"group_name": "Bar bars, pub crawls and night clubs",
"group_lon": 115.84,
"group_urlname": "Bar-Pub-Club-Crawlies",
"group_lat": -31.96
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.dummy</groupId>
<artifactId>meetup-rsvps-spark-es-writer</artifactId>
<version>1.0</version>
<properties>
<scala.version>2.10.4</scala.version>
<spark.version>1.6.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>eu.piotrbuda</groupId>
<artifactId>scalawebsocket_2.10</artifactId>
<version>0.1.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>2.2.0-rc1</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.1</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>assembly.xml</descriptor>
</descriptors>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.moolileo.meetup.streaming.MeetupStreaming</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
#!/bin/bash
# Recreates the Elasticsearch 2.x indices (settings + mappings) that the Meetup RSVP streaming
# job writes to and the Kibana dashboards read from.
# WARNING: each index is DELETED before it is recreated, so any existing data is lost.
set -x

# Defaults match the previously hard-coded values; override them through the environment.
HOST="${ES_HOST:-http://localhost:9200}"
SHARDS="${ES_SHARDS:-3}"
REPLICAS="${ES_REPLICAS:-2}"

# recreate_index NAME MAPPING
#   Deletes index NAME, recreates it with $SHARDS shards and $REPLICAS replicas, then installs
#   MAPPING (a JSON document) for the document type NAME (index and type share the same name).
recreate_index() {
  local name="$1"
  local mapping="$2"

  curl -XDELETE "$HOST/$name"
  curl -XPUT "$HOST/$name/" -d "{
  \"settings\" : {
    \"index\" : {
      \"number_of_shards\" : $SHARDS,
      \"number_of_replicas\" : $REPLICAS
    }
  }
}"
  curl -XPUT "$HOST/$name/_mappings/$name" -d "$mapping"
}

# Raw RSVPs, one document per rsvp_id.
recreate_index "meetup-rsvps" '{
  "properties" : {
    "event" : {
      "properties" : {
        "event_id" : {
          "type" : "string"
        },
        "event_name" : {
          "type" : "string"
        },
        "event_url" : {
          "type" : "string",
          "index": "not_analyzed"
        },
        "time" : {
          "type": "date",
          "doc_values": true,
          "format": "epoch_millis"
        }
      }
    },
    "group" : {
      "properties" : {
        "group_city" : {
          "type" : "string",
          "index": "not_analyzed"
        },
        "group_country" : {
          "type" : "string",
          "index": "not_analyzed"
        },
        "group_id" : {
          "type" : "long"
        },
        "group_lat" : {
          "type" : "string"
        },
        "group_lon" : {
          "type" : "string"
        },
        "group_name" : {
          "type" : "string"
        },
        "group_state" : {
          "type" : "string",
          "index": "not_analyzed"
        },
        "group_topics" : {
          "properties" : {
            "topic_name" : {
              "type" : "string",
              "index": "not_analyzed"
            },
            "urlkey" : {
              "type" : "string",
              "index": "not_analyzed"
            }
          }
        },
        "group_urlname" : {
          "type" : "string"
        }
      }
    },
    "guests" : {
      "type" : "long"
    },
    "member" : {
      "properties" : {
        "member_id" : {
          "type" : "long"
        },
        "member_name" : {
          "type" : "string"
        },
        "photo" : {
          "type" : "string"
        }
      }
    },
    "mtime" : {
      "type": "date",
      "doc_values": true,
      "format": "epoch_millis"
    },
    "response" : {
      "type" : "string",
      "index": "not_analyzed"
    },
    "rsvp_id" : {
      "type" : "long"
    },
    "venue" : {
      "properties" : {
        "lat" : {
          "type" : "string"
        },
        "lon" : {
          "type" : "string"
        },
        "location" : {
          "type" : "geo_point"
        },
        "venue_id" : {
          "type" : "long"
        },
        "venue_name" : {
          "type" : "string"
        }
      }
    },
    "visibility" : {
      "type" : "string",
      "index": "no"
    }
  }
}'

# In the aggregate indices below, country/topic/key are exact values used in terms
# aggregations and as ids. Mapping them not_analyzed (as meetup-rsvps already does for
# topic_name and group_country) keeps multi-word topics like "Social Networking" from being
# split into separate tokens in the Kibana tables.

recreate_index "meetup_total_attending" '{
  "properties": {
    "attendees": {
      "type": "long"
    },
    "country": {
      "type": "string",
      "index": "not_analyzed"
    },
    "coordinates" : {
      "type" : "geo_point"
    }
  }
}'

recreate_index "meetup_attending_by_date" '{
  "_timestamp": {
    "enabled": true
  },
  "properties": {
    "attendees": {
      "type": "long"
    },
    "country": {
      "type": "string",
      "index": "not_analyzed"
    },
    "coordinates" : {
      "type" : "geo_point"
    },
    "day": {
      "type": "date",
      "doc_values": true,
      "format": "epoch_millis"
    },
    "month": {
      "type": "date",
      "doc_values": true,
      "format": "epoch_millis"
    },
    "year": {
      "type": "date",
      "doc_values": true,
      "format": "epoch_millis"
    },
    "key": {
      "type": "string",
      "index": "not_analyzed"
    }
  }
}'

recreate_index "meetup_global_trending_topics" '{
  "_timestamp": {
    "enabled": true
  },
  "properties": {
    "topic": {
      "type": "string",
      "index": "not_analyzed"
    },
    "counter": {
      "type": "long"
    },
    "date": {
      "type": "date",
      "doc_values": true,
      "format": "epoch_millis"
    }
  }
}'

recreate_index "meetup_country_trending_topics" '{
  "_timestamp": {
    "enabled": true
  },
  "properties": {
    "counter": {
      "type": "long"
    },
    "country": {
      "type": "string",
      "index": "not_analyzed"
    },
    "topic": {
      "type": "string",
      "index": "not_analyzed"
    },
    "coordinates" : {
      "type" : "geo_point"
    },
    "key": {
      "type": "string",
      "index": "not_analyzed"
    },
    "date": {
      "type": "date",
      "doc_values": true,
      "format": "epoch_millis"
    }
  }
}'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment