# Kafka quickstart reference:
https://kafka.apache.org/quickstart
# Download and unpack Kafka 2.1.0 (Scala 2.12 build).
wget http://apache.claz.org/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
# Start ZooKeeper, then the Kafka broker (run each in its own terminal).
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
# Create a single-partition, unreplicated "test" topic and verify it exists.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
# Register a Twitter app here to obtain API credentials:
http://dev.twitter.com
# Symlink libgomp into /usr/lib64 — presumably to satisfy linking when
# compiling rJava/rkafka. NOTE(review): paths are specific to this
# machine's gcc 6.4.1 install on Amazon Linux; verify before reuse.
sudo ln -s /usr/lib/gcc/x86_64-amazon-linux/6.4.1/libgomp.spec /usr/lib64/libgomp.spec
sudo ln -s /usr/lib/gcc/x86_64-amazon-linux/6.4.1/libgomp.a /usr/lib64/libgomp.a
sudo ln -s /usr/lib64/libgomp.so.1.0.0 /usr/lib64/libgomp.so
# Reconfigure R's Java environment (required for rJava to load).
R CMD javareconf -e
# Smoke-test the topic with the console consumer and producer.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# Install the packages used below: rJava (Java bridge required by rkafka),
# rtweet (Twitter API client), rkafka (Kafka producer/consumer for R).
install.packages("rJava")
install.packages("rtweet")
install.packages("rkafka")
# Authenticate against the Twitter API. The key/secret and access
# token/secret arguments are intentionally blank here — fill them in
# with the credentials from the app registered at dev.twitter.com.
rtweet::create_token("appname", "", "", "", "")
# Create a synchronous Kafka producer pointed at the local broker and
# send one test message to the "test" topic.
producer <- rkafka::rkafka.createProducer("localhost:9092", "sync")
rkafka::rkafka.send(producer, "test", "localhost", "hello")
# Poll Twitter for tweets matching `search` and publish each one to the
# given Kafka topic as a one-row JSON message, recursing `iters` times.
#
# search:   search query; "filter:twimg" is appended so results have images.
# producer: producer created with rkafka::rkafka.createProducer().
# topic:    Kafka topic name to publish to.
# last:     tweets from the previous iteration; the status_id of its last
#           row is passed as max_id so paging continues where it left off.
# iters:    number of polling iterations remaining.
#
# Returns the final batch of tweets retrieved.
produce_tweets <- function(search, producer, topic, last = data.frame(), iters = 1) {
  if (iters <= 0) return(last)
  # status_id of the last tweet already published; NULL on the first call
  # (an empty data.frame() has no status_id column).
  last_id <- last[nrow(last), ]$status_id
  tweets <- rtweet::search_tweets(paste(search, "filter:twimg"), max_id = last_id)
  # max_id is inclusive, so drop the tweet we already processed. The
  # original used identical(column, scalar), which is always FALSE for a
  # multi-element column and therefore never filtered anything; compare
  # element-wise instead.
  if (!is.null(last_id) && nrow(tweets) > 0) {
    tweets <- tweets[tweets$status_id != last_id, ]
  }
  if (nrow(tweets) > 0) {
    # Namespace select() explicitly: dplyr is not attached at this point
    # in the script (library(dplyr) only appears further down).
    tweets_subset <- dplyr::select(
      tweets,
      created_at, screen_name, text, is_retweet, favorite_count,
      retweet_count, media_url, location
    )
    for (idx in seq_len(nrow(tweets_subset))) {
      # capture.output() silences rkafka's console chatter; the short
      # sleep throttles sends so the broker is not flooded.
      capture.output(rkafka::rkafka.send(
        producer, topic, "localhost",
        jsonlite::toJSON(tweets_subset[idx, ])
      ))
      Sys.sleep(0.1)
    }
  }
  produce_tweets(search, producer, topic, tweets, iters - 1)
}
# Stream tweets mentioning "sparklyr" into the "test" topic (one iteration).
produce_tweets("sparklyr", producer, "test")
# Google Cloud Vision setup: googleAuthR for service-account auth and the
# RoogleVision client (installed from GitHub).
install.packages("googleAuthR")
devtools::install_github("javierluraschi/RoogleVision")
# Authenticate with the service-account key file and label a sample image.
googleAuthR::gar_auth_service(scope = "https://www.googleapis.com/auth/cloud-platform", json_file="cloudml.json")
# image_response is reused later to derive the spark_apply() column schema.
image_response <- RoogleVision::getGoogleVisionResponse("https://media-cdn.tripadvisor.com/media/photo-s/02/6b/c2/19/filename-48842881-jpg.jpg")
library(shiny)
library(sparklyr)
library(dplyr)
# Spark connection configured with the Kafka SQL connector package and the
# Google service-account key file shipped to the workers.
config <- spark_config()
config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2"
config$sparklyr.shell.files <- "cloudml.json"
sc <- spark_connect(master = "yarn", config = config)
# Subscribe to the "test" topic and materialize the stream into an
# in-memory table named "stream".
read_options <- list(kafka.bootstrap.servers = "localhost:9092", subscribe = "test")
stream <- stream_read_kafka(sc, options = read_options) %>%
stream_write_memory("stream")
# Inspect the stream: decode the Kafka payload to a string and extract a
# field from the JSON tweet.
# NOTE(review): the column is named screen_name but the JSON path extracts
# media_url[0] — likely a copy/paste slip; confirm the intended field.
tbl(sc, "stream") %>%
transmute(value = as.character(value)) %>%
transmute(screen_name = get_json_object(value, '$[0].media_url[0]')) %>% filter(nchar(screen_name) > 4)
# Label the first two streamed images with Google Vision inside Spark.
# The anonymous function runs on the workers via spark_apply(), so each
# group re-authenticates with the cloudml.json key that was distributed
# through sparklyr.shell.files above.
tbl(sc, "stream") %>%
transmute(value = as.character(value)) %>%
transmute(media = get_json_object(value, '$[0].media_url[0]')) %>% filter(nchar(media) > 5) %>%
head(n=2) %>%
spark_apply(function(df, media, context) {
googleAuthR::gar_auth_service(scope = "https://www.googleapis.com/auth/cloud-platform", json_file="cloudml.json")
RoogleVision::getGoogleVisionResponse(media)
# Output schema: the grouping column plus the columns/classes of the
# sample image_response captured earlier in the script.
}, columns = c(media = "character", lapply(image_response, class)), group_by = "media")
# Word-count query over the streamed tweets: decode the Kafka payload,
# pull the tweet text out of the JSON, tokenize it, and show the ten most
# frequent words.
tbl(sc, "stream") %>%
  transmute(value = as.character(value)) %>%
  transmute(text = get_json_object(value, '$[0].text')) %>%
  filter(nchar(text) > 4) %>%
  ft_tokenizer(input_col = "text", output_col = "words") %>%
  transmute(word = explode(words)) %>%
  # After the transmute() above only `word` exists; the original filtered
  # on the dropped `text` column, which fails. Filter short tokens instead.
  filter(nchar(word) > 4) %>%
  group_by(word) %>%
  summarize(total = n()) %>%
  arrange(desc(total)) %>%
  head(n = 10)
library(shiny)
library(wordcloud)
library(dplyr)
# Label images with the Google Vision API.
#
# df: a data frame (one spark_apply partition) whose `image` column holds
#     image URLs.
# Returns the `description` field of the Vision API response for df$image.
image_labeler <- function(df) {
  # Each Spark worker authenticates on its own, so auth happens inside
  # the function rather than once on the driver.
  key_file <- "cloudml.json"
  googleAuthR::gar_auth_service(
    scope = "https://www.googleapis.com/auth/cloud-platform",
    json_file = key_file)
  labels <- RoogleVision::getGoogleVisionResponse(
    df$image, download = FALSE)
  labels$description
}
# Static UI: two plot slots for the word clouds (word totals and image
# labels), rendered by the server below.
plots_panel <- mainPanel(
  plotOutput("aggregatePlot"),
  plotOutput("labelsPlot")
)
ui <- fluidPage(
  titlePanel("Realtime Twitter Data with Spark and R"),
  plots_panel
)
# Shiny server: builds two reactive Spark streams over the Kafka "test"
# topic and renders each one as a word cloud.
# NOTE(review): relies on `sc`, image_labeler(), sparklyr's stream helpers,
# and wordcloud/brewer.pal being available in the enclosing environment.
server <- function(input, output) {
# Raw Kafka stream with the binary payload decoded to a character column.
twitterStream <- stream_read_kafka(
sc,
options = list(
kafka.bootstrap.servers = "localhost:9092",
subscribe = "test")) %>%
transmute(value = as.character(value))
# Tokenize tweet text, remove stop words, and count word totals.
# reactiveSpark() turns the aggregate into a reactive that re-emits
# every 5 seconds.
twitterWords <- twitterStream %>%
transmute(text = get_json_object(
value, "$[0].text")) %>%
filter(nchar(text) > 4) %>%
ft_tokenizer("text", "split") %>%
ft_stop_words_remover("split", "words") %>%
transmute(word = explode(words)) %>%
group_by(word) %>%
summarize(total = n()) %>%
reactiveSpark(intervalMillis = 5000)
# Extract each tweet's first image URL and label it on the workers with
# image_labeler(); also refreshed every 5 seconds.
twitterLabels <- twitterStream %>%
transmute(image = get_json_object(
value, "$[0].media_url[0]")) %>%
filter(nchar(image) > 5) %>%
spark_apply(
image_labeler,
columns = list(label = "character")) %>%
reactiveSpark(intervalMillis = 5000)
# Word cloud of the most frequent tweet words.
output$aggregatePlot <- renderPlot({
data <- twitterWords()
wordcloud(
data$word, data$total, scale = c(4,0.5),
min.freq = 1, max.words = 30,
colors = brewer.pal(8, "Dark2"))
})
# Word cloud of the Vision API labels, aggregated locally since the
# labels arrive unaggregated from spark_apply().
output$labelsPlot <- renderPlot({
data <- twitterLabels() %>%
group_by(label) %>%
summarize(total = n())
wordcloud(
data$label, data$total, scale = c(4,0.5),
min.freq = 1, max.words = 30,
colors = brewer.pal(8, "Dark2"))
})
}
# Launch the app defined above; alternatively run it from the "realtime"
# app directory in showcase mode (source shown alongside the running app).
shinyApp(ui = ui, server = server)
runApp("realtime", display.mode = "showcase")
# Reset the demo: drop and recreate the "test" topic.
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test