Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save javierluraschi/169589b50429ba75a91b3f8f902193ef to your computer and use it in GitHub Desktop.
Analyzing a Twitter Stream using Spark and R
https://kafka.apache.org/quickstart

# Download and unpack Kafka 2.1.0 (per https://kafka.apache.org/quickstart).
wget http://apache.claz.org/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz

# Start ZooKeeper first, then the Kafka broker (run each in its own terminal;
# both commands block).
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# Create a single-partition, unreplicated "test" topic, then list topics to
# verify it exists.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Register a Twitter app here to obtain the API keys used by rtweet below:
http://dev.twitter.com

# Create libgomp symlinks into /usr/lib64 on Amazon Linux.
# NOTE(review): presumably needed so rJava's native compilation can link
# against libgomp — confirm on a clean instance.
sudo ln -s /usr/lib/gcc/x86_64-amazon-linux/6.4.1/libgomp.spec /usr/lib64/libgomp.spec
sudo ln -s /usr/lib/gcc/x86_64-amazon-linux/6.4.1/libgomp.a /usr/lib64/libgomp.a
sudo ln -s /usr/lib64/libgomp.so.1.0.0 /usr/lib64/libgomp.so

# Reconfigure R's Java environment before installing rJava.
R CMD javareconf -e 

# Manual console consumer/producer for inspecting the "test" topic while the
# pipeline runs (each in its own terminal).
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# One-time R package setup for the producer side of the pipeline:
#   rJava  - Java bridge required by rkafka
#   rtweet - Twitter search API client
#   rkafka - Kafka producer/consumer bindings
install.packages("rJava")
install.packages("rtweet")
install.packages("rkafka")

# Register the Twitter app credentials. Fill in the four empty strings with
# the consumer key/secret and access token/secret from dev.twitter.com.
rtweet::create_token("appname", "", "", "", "")

# Synchronous Kafka producer against the local broker, followed by a
# smoke-test message on the "test" topic.
producer <- rkafka::rkafka.createProducer("localhost:9092", "sync")
rkafka::rkafka.send(producer, "test", "localhost", "hello")

# Poll Twitter for tweets matching `search` (restricted to tweets carrying
# Twitter-hosted images via "filter:twimg") and publish each tweet to a Kafka
# topic as a single-row JSON message.
#
# Args:
#   search:   search query string.
#   producer: producer created with rkafka::rkafka.createProducer().
#   topic:    Kafka topic name to publish to.
#   last:     data frame of tweets from the previous iteration; its final
#             status_id is passed as max_id so paging resumes where it
#             stopped. Defaults to an empty data frame on the first call.
#   iters:    number of polling iterations remaining; recursion stops at 0.
#
# Returns: the data frame of tweets fetched in the final iteration.
produce_tweets <- function(search, producer, topic, last = data.frame(), iters = 1) {
    if (iters <= 0) return(last)

    # status_id of the oldest tweet seen so far. NULL on the first call
    # (empty data frame has no status_id column), which search_tweets()
    # accepts as "no upper bound".
    last_id <- last[nrow(last), ]$status_id
    tweets <- rtweet::search_tweets(paste(search, "filter:twimg"), max_id = last_id)

    # search_tweets() with max_id includes the boundary tweet itself, so drop
    # rows matching last_id to avoid re-sending it. (The original used
    # !identical(tweets$status_id, last_id), which compares the entire column
    # to a scalar and therefore never removed anything.)
    if (!is.null(last_id) && nrow(tweets) > 0) {
        tweets <- tweets[tweets$status_id != last_id, ]
    }

    if (nrow(tweets) > 0) {
        # Keep only the fields the downstream Spark jobs consume. dplyr:: is
        # spelled out so this works even if dplyr is not attached yet.
        tweets_subset <- dplyr::select(
            tweets,
            created_at, screen_name, text, is_retweet,
            favorite_count, retweet_count, media_url, location
        )

        # seq_len() is safe when the subset is empty (1:0 would run twice).
        for (idx in seq_len(nrow(tweets_subset))) {
            # capture.output() suppresses rkafka's console chatter per send.
            capture.output(
                rkafka::rkafka.send(
                    producer, topic, "localhost",
                    jsonlite::toJSON(tweets_subset[idx, ])
                )
            )
            Sys.sleep(0.1)  # throttle so the topic is not flooded
        }
    }

    produce_tweets(search, producer, topic, tweets, iters - 1)
}

# Publish one batch of "sparklyr" tweets to the "test" Kafka topic.
produce_tweets("sparklyr", producer, "test")

# Google Cloud Vision client setup: googleAuthR for service-account auth plus
# a RoogleVision fork installed from GitHub.
install.packages("googleAuthR")
devtools::install_github("javierluraschi/RoogleVision")

# Authenticate with the service-account key file (cloudml.json).
googleAuthR::gar_auth_service(scope = "https://www.googleapis.com/auth/cloud-platform", json_file="cloudml.json")

# Sample Vision API call. Its response is reused later to derive the result
# schema (column names and types) passed to spark_apply().
image_response <- RoogleVision::getGoogleVisionResponse("https://media-cdn.tripadvisor.com/media/photo-s/02/6b/c2/19/filename-48842881-jpg.jpg")
library(shiny)
library(sparklyr)
library(dplyr)

# Spark connection on YARN, with the Kafka SQL connector on the classpath and
# the Vision service-account key shipped to the executors.
config <- spark_config()
config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2"
config$sparklyr.shell.files <- "cloudml.json"
sc <- spark_connect(master = "yarn", config = config)

# Subscribe to the "test" topic on the local broker.
read_options <- list(kafka.bootstrap.servers = "localhost:9092", subscribe = "test")

# Continuously copy Kafka records into an in-memory table named "stream".
stream <- stream_read_kafka(sc, options = read_options) %>%
    stream_write_memory("stream")

# Peek at incoming records: Kafka values arrive as binary, so cast to string
# first, then pull the first media URL out of the JSON payload.
# NOTE(review): the extracted column is named screen_name but actually holds
# media_url — looks like a copy/paste slip; behavior kept as-is.
tbl(sc, "stream") %>%
  transmute(value = as.character(value)) %>%
  transmute(screen_name = get_json_object(value, '$[0].media_url[0]')) %>% filter(nchar(screen_name) > 4)

# Label the first two streamed images with Google Vision via spark_apply().
# Each worker authenticates on its own, using the cloudml.json shipped via
# sparklyr.shell.files. `columns` reuses the earlier sample image_response so
# Spark knows the result schema up front; group_by = "media" keeps the URL
# alongside each label row.
tbl(sc, "stream") %>% 
    transmute(value = as.character(value)) %>%
    transmute(media = get_json_object(value, '$[0].media_url[0]')) %>% filter(nchar(media) > 5) %>%
    head(n=2) %>%
    spark_apply(function(df, media, context) {
        googleAuthR::gar_auth_service(scope = "https://www.googleapis.com/auth/cloud-platform", json_file="cloudml.json")
        RoogleVision::getGoogleVisionResponse(media)
    }, columns = c(media = "character", lapply(image_response, class)), group_by = "media")
# Top-10 word counts over the streamed tweet text: extract the text field
# from the JSON payload, tokenize it in Spark, explode into one row per word,
# and aggregate.
tbl(sc, "stream") %>%
  transmute(value = as.character(value)) %>%
  transmute(text = get_json_object(value, '$[0].text')) %>%
  filter(nchar(text) > 4) %>%
  ft_tokenizer(input_col = "text", output_col = "words") %>%
  transmute(word = explode(words)) %>%
  # The original filtered on nchar(text) here, but `text` no longer exists
  # after the transmute above (Spark would reject the column reference);
  # filter the exploded `word` instead, keeping the same length threshold.
  filter(nchar(word) > 4) %>%
  group_by(word) %>%
  summarize(total = n()) %>%
  arrange(desc(total)) %>%
  head(n = 10)
library(shiny)
library(wordcloud)
library(dplyr)

# Label the images referenced by df$image using the Google Vision API and
# return their description strings. Authentication happens inside the
# function because it is executed via spark_apply() on Spark workers, where
# no prior gar_auth_service() call exists.
image_labeler <- function(df) {
  scope_url <- "https://www.googleapis.com/auth/cloud-platform"
  googleAuthR::gar_auth_service(scope = scope_url, json_file = "cloudml.json")

  response <- RoogleVision::getGoogleVisionResponse(df$image, download = FALSE)
  response$description
}

# Static UI: page title plus two stacked plot outputs — word frequencies on
# top ("aggregatePlot"), Vision image labels below ("labelsPlot"). Both are
# rendered as word clouds by server().
ui <- fluidPage(
  titlePanel("Realtime Twitter Data with Spark and R"),
  mainPanel(
    plotOutput("aggregatePlot"),
    plotOutput("labelsPlot")
  )
)

# Shiny server: wires two Spark streaming pipelines into two word-cloud
# plots. Relies on `sc` (the Spark connection) and `image_labeler` being
# defined in the enclosing environment.
server <- function(input, output) {
  # Raw Kafka stream; values arrive as binary, so cast to character once and
  # share the result between both downstream pipelines.
  twitterStream <- stream_read_kafka(
    sc,
    options = list(
      kafka.bootstrap.servers = "localhost:9092",
      subscribe = "test")) %>%
    transmute(value = as.character(value))
  
  # Word counts over tweet text: extract the JSON text field, tokenize and
  # remove stop words in Spark, explode to one row per word, then aggregate.
  # reactiveSpark() turns the stream into a reactive that re-emits the
  # current aggregate every 5 seconds.
  twitterWords <- twitterStream %>%
    transmute(text = get_json_object(
      value, "$[0].text")) %>%
    filter(nchar(text) > 4) %>%
    ft_tokenizer("text", "split") %>%
    ft_stop_words_remover("split", "words") %>%
    transmute(word = explode(words)) %>%
    group_by(word) %>%
    summarize(total = n()) %>%
    reactiveSpark(intervalMillis = 5000)
  
  # Image labels: each media URL is labeled by Google Vision inside
  # spark_apply() (image_labeler authenticates per worker), declared to
  # return a single character column named "label".
  twitterLabels <- twitterStream %>%
    transmute(image = get_json_object(
      value, "$[0].media_url[0]")) %>%
    filter(nchar(image) > 5) %>%
    spark_apply(
      image_labeler,
      columns = list(label = "character")) %>%
    reactiveSpark(intervalMillis = 5000)
  
  # Word cloud of the most frequent tweet words.
  output$aggregatePlot <- renderPlot({
    data <- twitterWords()
    wordcloud(
      data$word, data$total, scale = c(4,0.5),
      min.freq = 1, max.words = 30,
      colors = brewer.pal(8, "Dark2"))
  })
  
  # Word cloud of the Vision labels; label frequencies are aggregated on the
  # driver since the labels arrive one row per image.
  output$labelsPlot <- renderPlot({
    data <- twitterLabels() %>%
      group_by(label) %>%
      summarize(total = n())
    
    wordcloud(
      data$label, data$total, scale = c(4,0.5),
      min.freq = 1, max.words = 30,
      colors = brewer.pal(8, "Dark2"))
  })
}

# Launch the dashboard in-process; alternatively, runApp() serves the app
# from the "realtime" directory with Shiny's showcase (code-alongside) mode.
shinyApp(ui = ui, server = server)
runApp("realtime", display.mode = "showcase")
# Reset the stream between runs: drop and recreate the "test" topic.
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment