Skip to content

Instantly share code, notes, and snippets.

@alea12
Last active December 27, 2015 05:19
Show Gist options
  • Save alea12/6a2d4062800a251f9f2b to your computer and use it in GitHub Desktop.
Save alea12/6a2d4062800a251f9f2b to your computer and use it in GitHub Desktop.
twitter_to_bq.rb
require 'google/api_client'
require 'twitter'
require 'pp'
# Thin wrapper around the BigQuery v2 API (via Google::APIClient) that can
# create the tweet table and stream rows into it.
class BigQuery
  APP_NAME = 'https://twitter.com/9m'
  TABLE_ID = 'timeline'

  # Column definitions sent as the table schema by #create_table.
  SCHEMA_FIELDS = [
    { name: 'status_id',   type: 'INTEGER' },
    { name: 'screen_name', type: 'STRING' },
    { name: 'text',        type: 'STRING' },
    { name: 'created_at',  type: 'TIMESTAMP' },
    { name: 'json',        type: 'STRING' }
  ].freeze

  # Authorizes a service account and discovers the BigQuery v2 API.
  #
  # @param opts [Hash] connection settings
  # @option opts [String] :key path to the PKCS12 private key file
  # @option opts [String] :service_email service-account e-mail address
  # @option opts [String] :project_id project that owns the dataset
  # @option opts [String] :dataset dataset holding the table
  def initialize(opts = {})
    @client = Google::APIClient.new(
      application_name: APP_NAME,
      application_version: '0.1'
    )

    # Block form so the key file handle is closed once the key is loaded.
    key = File.open(opts[:key], mode: 'rb') do |key_file|
      Google::APIClient::PKCS12.load_key(key_file, "notasecret")
    end

    @asserter = Google::APIClient::JWTAsserter.new(
      opts[:service_email],
      "https://www.googleapis.com/auth/bigquery",
      key
    )
    @client.authorization = @asserter.authorize
    @bq = @client.discovered_api("bigquery", "v2")
    @project_id = opts[:project_id]
    @dataset = opts[:dataset]
    @table_id = TABLE_ID
  end

  # Requests creation of the table using SCHEMA_FIELDS.
  #
  # @return [Object] the parsed JSON response body, returned as-is
  def create_table
    response = @client.execute(
      api_method: @bq.tables.insert,
      parameters: {
        'projectId' => @project_id,
        'datasetId' => @dataset
      },
      body_object: {
        'tableReference' => {
          'projectId' => @project_id,
          'datasetId' => @dataset,
          'tableId' => @table_id
        },
        'schema' => {
          'fields' => SCHEMA_FIELDS
        }
      }
    )
    JSON.parse(response.body)
  end

  # Sends rows to the table through tabledata.insert_all.
  #
  # The given hashes are not modified: each row is copied with its
  # :created_at value replaced by a formatted timestamp string, so the same
  # rows can safely be passed again (e.g. on a retry).
  #
  # @param rows [Array<Hash>] rows keyed by symbol; each needs :created_at
  # @return [Object] the parsed JSON response body, returned as-is
  def insert(rows)
    payload = rows.map do |row|
      { 'json' => row.merge(created_at: format_timestamp(row[:created_at])) }
    end

    response = @client.execute(
      api_method: @bq.tabledata.insert_all,
      parameters: {
        'projectId' => @project_id,
        'datasetId' => @dataset,
        'tableId' => @table_id
      },
      body_object: {
        "rows" => payload
      }
    )
    JSON.parse(response.body)
  end

  private

  # Formats a time as "YYYY-MM-DD HH:MM:SS".
  #
  # The output carries no zone, so the value is converted to UTC first when
  # the object supports it; otherwise a time with a non-zero offset would be
  # written with its local wall-clock digits.
  #
  # @param time [#strftime] the time to format
  # @return [String]
  def format_timestamp(time)
    time = time.getutc if time.respond_to?(:getutc)
    time.strftime("%Y-%m-%d %H:%M:%S")
  end
end
# Reads the authenticated user's Twitter stream and forwards tweets to a
# BigQuery writer in batches.
class Timeline
  # A batch is written once the buffer holds more than this many rows.
  MAX_BUFFERED_ROWS = 10

  # @param bq [#insert] writer that receives arrays of row hashes
  def initialize(bq)
    @bq = bq
    @rows = []
  end

  # Lazily builds the streaming client. The credentials are left blank here
  # and must be filled in before running.
  #
  # @return [Twitter::Streaming::Client]
  def client
    @client ||= Twitter::Streaming::Client.new do |config|
      config.consumer_key = ''
      config.consumer_secret = ''
      config.access_token = ''
      config.access_token_secret = ''
    end
  end

  # Builds the row hash for one tweet; the keys match the table's columns.
  #
  # @param object [Twitter::Tweet]
  # @return [Hash]
  def row_object(object)
    {
      status_id: object.id,
      screen_name: object.user.screen_name,
      text: fulltext_object(object),
      created_at: object.created_at,
      json: object.to_h.to_json
    }
  end

  # Returns the tweet text with every shortened URL (from both the URL and
  # the media entities) replaced by its expanded form.
  #
  # The original implementation reassigned a block-local variable inside
  # `tap`, so the substitutions were thrown away; this version edits a
  # private copy in place and returns it.
  #
  # @param object [#text, #entities?, #uris, #media]
  # @return [String] a new string; the tweet's own text is left untouched
  def fulltext_object(object)
    text = object.text.to_s.dup
    return text unless object.entities?

    [object.uris, object.media].each do |entities|
      entities.each do |entity|
        short = entity.url.to_s
        # An empty pattern would match between every character.
        next if short.empty?

        expanded = entity.expanded_url.to_s
        # Block form inserts the replacement literally (no backslash escapes).
        text.gsub!(short) { expanded }
      end
    end
    text
  end

  # Consumes the user stream, buffering tweets and writing them whenever the
  # buffer exceeds MAX_BUFFERED_ROWS. If the stream fails, the remaining
  # buffered rows are written on a best-effort basis and the original error
  # is re-raised.
  #
  # @raise [StandardError] whatever error interrupted the stream
  def run
    client.user do |object|
      next unless object.is_a?(Twitter::Tweet)

      @rows << row_object(object)
      next unless @rows.size > MAX_BUFFERED_ROWS

      pp @bq.insert(@rows)
      @rows = []
    end
  rescue StandardError => e
    flush_after_failure
    raise e
  end

  private

  # Writes whatever is still buffered. Skips the request when there is
  # nothing to send, and only reports a failed flush so that it cannot
  # replace the error that stopped the stream.
  def flush_after_failure
    return if @rows.empty?

    @bq.insert(@rows)
    @rows = []
  rescue StandardError => e
    warn "failed to flush #{@rows.size} buffered rows: #{e.class}: #{e.message}"
  end
end
# Connection settings for the BigQuery writer; every value is blank here
# and has to be filled in before the script is run.
bigquery_settings = {
  client_id: '',
  service_email: '',
  key: '',
  project_id: '',
  dataset: ''
}
writer = BigQuery.new(bigquery_settings)

# Create the schema (if it already exists, an error message is simply returned).
creation_result = writer.create_table
pp creation_result

# Start streaming the timeline into the table.
timeline = Timeline.new(writer)
timeline.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment