Skip to content

Instantly share code, notes, and snippets.

@Yengas
Created December 19, 2015 19:19
Show Gist options
  • Select an option

  • Save Yengas/47e59f3d6f6706d0faa2 to your computer and use it in GitHub Desktop.

Select an option

Save Yengas/47e59f3d6f6706d0faa2 to your computer and use it in GitHub Desktop.
Twitter stream client with raw TLS socket
package client
import (
"bufio"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http/httputil"
"net/url"
"regexp"
"strconv"
"strings"
"time"
"github.com/garyburd/go-oauth/oauth"
)
// ErrInvalidResponse is returned when the first line of the server's reply
// (the HTTP status line) does not contain a recognizable status code.
var ErrInvalidResponse = errors.New("HTTP response code isn't ok")

// ErrResponseNotOk is returned when the server replies with a status code
// other than 200.
var ErrResponseNotOk = errors.New("Response code isn't 200")
// Key holds the OAuth 1.0a credentials used to sign streaming requests:
// the application (consumer) key/secret pair and the user's access
// token/secret pair.
type Key struct {
	Key         string // consumer key
	Secret      string // consumer secret
	Token       string // access token
	TokenSecret string // access token secret
}
// Stream is one streaming-API connection, created by Client.Track. Decoded
// messages are delivered on the exported channels.
type Stream struct {
	// tracks are the keywords sent as the "track" form parameter.
	tracks []string
	// credentials are copied from the Client that created the stream.
	credentials Key
	// connection is the TLS socket; it stays nil until initialize has dialed.
	connection *tls.Conn
	// killed is set by Close so the reader goroutine knows to stop.
	// NOTE(review): it is written and read from different goroutines
	// without synchronization.
	killed bool
	// responseChannel carries raw lines from read to parse.
	responseChannel chan []byte

	// Tweets receives every newly created status.
	Tweets chan Tweet
	// Err receives errors hit while connecting to or reading the stream,
	// including disconnect notices sent by Twitter.
	Err chan error
	// Limits receives limit notices, sent when the filter matched more
	// statuses than your current API limits allow to be delivered.
	Limits chan Limit
	// Warnings receives stall warnings, sent when statuses are not being
	// consumed fast enough.
	Warnings chan Stall
	// Deletes receives notices for deleted statuses/tweets.
	Deletes chan DeletedStatus
}
// Limit is a limit notice: the filtered stream matched more statuses than
// your current limit allows to be delivered.
//
// Track holds the notice's "track" value as text. Twitter's streaming
// documentation sends it as a JSON number (a count of undelivered statuses),
// which a plain string field cannot decode. UnmarshalJSON therefore accepts
// either a number (stored as its literal text, e.g. "1234") or a string.
type Limit struct {
	Track string `json:"track"`
}

// UnmarshalJSON decodes the body of a limit notice. It accepts "track" as a
// JSON number or a JSON string. An absent or null "track" leaves Track
// unchanged, matching how encoding/json treats a plain string field.
func (limit *Limit) UnmarshalJSON(data []byte) error {
	var raw struct {
		Track json.RawMessage `json:"track"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	if len(raw.Track) == 0 || string(raw.Track) == "null" {
		return nil
	}
	if raw.Track[0] == '"' {
		return json.Unmarshal(raw.Track, &limit.Track)
	}
	// json.Number rejects anything that is not a valid JSON number
	// (bools, objects, arrays), so malformed notices still fail to decode.
	var count json.Number
	if err := json.Unmarshal(raw.Track, &count); err != nil {
		return err
	}
	limit.Track = count.String()
	return nil
}

// limitMessage is the envelope of a limit notice: {"limit": {...}}.
type limitMessage struct {
	Limit Limit `json:"limit"`
}
type (
	// DeletedStatus identifies a status that a user deleted.
	DeletedStatus struct {
		IDStr     string `json:"id_str"`      // the deleted status' ID, as a string
		UserIDStr string `json:"user_id_str"` // the author's user ID, as a string
	}

	// deleteMessage is the envelope of a deletion notice:
	// {"delete": {"status": {...}}}.
	deleteMessage struct {
		Delete struct {
			Status DeletedStatus `json:"status"`
		} `json:"delete"`
	}
)
type (
	// Disconnect is the notice Twitter sends when it closes your stream.
	Disconnect struct {
		Code       int    `json:"code"`        // numeric disconnect code
		StreamName string `json:"stream_name"` // name of the stream being closed
		Reason     string `json:"reason"`      // human-readable reason
	}

	// disconnectMessage is the envelope of a disconnect notice:
	// {"disconnect": {...}}.
	disconnectMessage struct {
		Disconnect Disconnect `json:"disconnect"`
	}
)
type (
	// Stall is a stall warning, sent when you fall behind Twitter's queue.
	Stall struct {
		Code    string `json:"code"`
		Message string `json:"message"`
		// PercentFUll is the value of "percent_full". The misspelled name is
		// part of the exported API and is kept for compatibility.
		PercentFUll int `json:"percent_full"`
	}

	// stallMessage is the envelope of a stall warning: {"warning": {...}}.
	stallMessage struct {
		Warning Stall `json:"warning"`
	}
)
type (
	// HashTag is one hashtag occurring in a tweet.
	HashTag struct {
		Text string `json:"text"`
		// Indices locate the hashtag within the tweet text.
		Indices []int `json:"indices"`
	}

	// Mention is one user mention occurring in a tweet.
	Mention struct {
		ScreenName string `json:"screen_name"`
		Name       string `json:"name"`
		IDStr      string `json:"id_str"`
		// Indicies locates the mention within the tweet text. The misspelled
		// name is part of the exported API and is kept for compatibility.
		Indicies []int `json:"indices"`
	}

	// Entity groups the hashtags and user mentions extracted from a tweet.
	Entity struct {
		HashTags []HashTag `json:"hashtags"`
		Mentions []Mention `json:"user_mentions"`
	}
)
// RubyDate is a time.Time decoded from the Ruby-style timestamps
// (time.RubyDate layout, "Mon Jan 02 15:04:05 -0700 2006") that Twitter uses
// in stream data, e.g. a tweet's "created_at".
type RubyDate time.Time

// UnmarshalJSON parses a quoted time.RubyDate string into rubyDate.
// Following the encoding/json Unmarshaler convention, a JSON null is a no-op
// and leaves the current value unchanged.
func (rubyDate *RubyDate) UnmarshalJSON(data []byte) error {
	if rubyDate == nil {
		return errors.New("Cannot set an nil reference")
	}
	if string(data) == "null" {
		return nil
	}
	date, err := time.Parse(time.RubyDate, strings.Trim(string(data), `"`))
	if err != nil {
		return err
	}
	*rubyDate = RubyDate(date)
	return nil
}
// Tweet is a single status decoded from the Twitter streaming API. ID fields
// are taken from Twitter's string-typed "*_str" variants.
type Tweet struct {
	IDStr     string   `json:"id_str"`
	Created   RubyDate `json:"created_at"`
	Text      string   `json:"text"`
	Source    string   `json:"source"`
	Truncated bool     `json:"truncated"`

	// Reply metadata; empty when the tweet is not a reply.
	ReplyStatusIDStr string `json:"in_reply_to_status_id_str"`
	ReplyUserIDStr   string `json:"in_reply_to_user_id_str"`
	ReplyScreenName  string `json:"in_reply_to_screen_name"`

	// Timestamp is the "timestamp_ms" value, kept as the raw string.
	Timestamp string `json:"timestamp_ms"`
	// Tags holds the hashtags and mentions from "entities".
	Tags Entity `json:"entities"`
}
// Client holds the credentials used to open streaming connections. Create
// one with NewClient and open streams with Track.
type Client struct {
	credentials Key
	// NOTE(review): connection and uri are not referenced anywhere in this
	// file; kept in case other code in the package uses them.
	connection *tls.Conn
	uri        *url.URL
}

// NewClient returns a streaming API client that authenticates with the
// given credentials.
func NewClient(credentials Key) *Client {
	return &Client{credentials: credentials}
}
// Utility funcs START
// parseTweet decodes one raw status line from the stream into a Tweet.
func parseTweet(line []byte) (Tweet, error) {
	tweet := Tweet{}
	if err := json.Unmarshal(line, &tweet); err != nil {
		return tweet, err
	}
	return tweet, nil
}
// parseDeletion decodes a {"delete": {"status": ...}} line and returns the
// embedded DeletedStatus, along with any decoding error.
func parseDeletion(line []byte) (DeletedStatus, error) {
	message := deleteMessage{}
	err := json.Unmarshal(line, &message)
	return message.Delete.Status, err
}
// parseLimit decodes a {"limit": ...} line and returns the embedded Limit,
// along with any decoding error.
func parseLimit(line []byte) (Limit, error) {
	message := limitMessage{}
	err := json.Unmarshal(line, &message)
	return message.Limit, err
}
// parseStall decodes a {"warning": ...} line and returns the embedded Stall,
// along with any decoding error.
func parseStall(line []byte) (Stall, error) {
	message := stallMessage{}
	err := json.Unmarshal(line, &message)
	return message.Warning, err
}
// parseDisconnect decodes a {"disconnect": ...} line and returns the embedded
// Disconnect, along with any decoding error.
func parseDisconnect(line []byte) (Disconnect, error) {
	message := disconnectMessage{}
	err := json.Unmarshal(line, &message)
	return message.Disconnect, err
}
// statusCodePattern captures the status code from an HTTP/1.x status line
// such as "HTTP/1.1 200 OK". It is compiled once rather than on every call.
var statusCodePattern = regexp.MustCompile("1 ([0-9]+) ")

// validateResponse reads the HTTP status line and header block from reader.
//
// Errors:
//   - ErrInvalidResponse when the status line has no recognizable code.
//   - ErrResponseNotOk when the code is not 200; the returned map then holds
//     the code under the "Code" key.
//   - The underlying read error if the stream ends before the blank line that
//     terminates the headers. The headers parsed so far are returned with it.
//
// On success it returns the headers keyed by name and leaves reader
// positioned at the start of the body. Header values may contain ':'
// themselves (e.g. Date). Lines without any ':' are skipped.
func validateResponse(reader *bufio.Reader) (map[string]string, error) {
	line, err := reader.ReadBytes('\n')
	if err != nil {
		return nil, err
	}
	match := statusCodePattern.FindStringSubmatch(string(line))
	if match == nil {
		return nil, ErrInvalidResponse
	}
	headers := map[string]string{}
	if match[1] != "200" {
		headers["Code"] = match[1]
		return headers, ErrResponseNotOk
	}
	for {
		line, err = reader.ReadBytes('\n')
		if err != nil {
			return headers, err
		}
		text := strings.TrimSpace(string(line))
		if text == "" {
			// Blank line: end of headers, body follows.
			return headers, nil
		}
		// Split only on the first ':' so values containing colons survive.
		parts := strings.SplitN(text, ":", 2)
		if len(parts) != 2 {
			continue // malformed header line without a colon
		}
		headers[parts[0]] = strings.TrimSpace(parts[1])
	}
}
// startsWith reports whether body begins with substr, ignoring any CR ('\r')
// or LF ('\n') bytes in body while matching. read splits stream lines on
// '\r', so a line can begin with the '\n' left over from the previous
// terminator (presumably "\r\n").
//
// Each index into body is bounds-checked. Skipping CR/LF bytes must never
// read past the end of body, even when body is short.
func startsWith(body []byte, substr string) bool {
	pos := 0
	for i := 0; i < len(substr); i++ {
		for pos < len(body) && (body[pos] == '\n' || body[pos] == '\r') {
			pos++
		}
		if pos >= len(body) || body[pos] != substr[i] {
			return false
		}
		pos++
	}
	return true
}
// Utility funcs END
// parse consumes raw lines from responseChannel until that channel is closed.
// It decodes each line according to its leading JSON key and forwards the
// result to the matching public channel:
//
//   - {"created_at" → Tweets (blocking send; statuses are never dropped)
//   - {"delete"     → Deletes (dropped if no receiver is ready)
//   - {"disconnect" → Err as a formatted error (blocking send)
//   - {"warning"    → Warnings (dropped if no receiver is ready)
//   - {"limit"      → Limits (dropped if no receiver is ready)
//
// Lines that fail to decode, or that match none of these prefixes (e.g.
// keep-alive newlines), are ignored.
func (stream *Stream) parse() {
	for line := range stream.responseChannel {
		switch {
		case startsWith(line, `{"created_at"`):
			if tweet, err := parseTweet(line); err == nil {
				stream.Tweets <- tweet
			}
		case startsWith(line, `{"delete"`):
			if status, err := parseDeletion(line); err == nil {
				select {
				case stream.Deletes <- status:
				default:
				}
			}
		case startsWith(line, `{"disconnect"`):
			if disconnect, err := parseDisconnect(line); err == nil {
				stream.Err <- fmt.Errorf("Disconnected from tw stream api. Code: %d, Reason: \"%s\"", disconnect.Code, disconnect.Reason)
			}
		case startsWith(line, `{"warning"`):
			if warning, err := parseStall(line); err == nil {
				select {
				case stream.Warnings <- warning:
				default:
				}
			}
		case startsWith(line, `{"limit"`):
			if limit, err := parseLimit(line); err == nil {
				select {
				case stream.Limits <- limit:
				default:
					// No receiver ready: drop the notice. Library code should
					// not write debug output to the caller's stdout.
				}
			}
		}
	}
}
// Close stops the stream. It:
//
//   - marks the stream as killed so the reader goroutine exits;
//   - closes the TLS connection, if one has been established;
//   - closes the Tweets and Err channels.
//
// Calling Close more than once is a no-op. It is also safe to call right
// after Track returns, before the background dial has finished.
//
// NOTE(review): killed and connection are shared with the initialize/read
// goroutines without synchronization. parse may also be blocked sending on
// Tweets when it is closed, which panics. Making shutdown fully race-free
// needs a done channel or mutex on Stream.
func (stream *Stream) Close() {
	if stream.killed {
		return
	}
	stream.killed = true
	// connection stays nil until initialize has dialed.
	if stream.connection != nil {
		_ = stream.connection.Close() // best effort; the stream is being torn down anyway
	}
	close(stream.Tweets)
	close(stream.Err)
}
// initialize dials the streaming endpoint over TLS, sends the OAuth-signed
// statuses/filter POST request for stream.tracks, and then starts the parse
// and read goroutines. Failures are reported on stream.Err, unless Close has
// already been called.
func (stream *Stream) initialize() {
	uri, err := url.Parse("https://stream.twitter.com/1.1/statuses/filter.json")
	if err != nil {
		stream.Err <- err
		return
	}
	// Form-encoded body of the request: the comma-separated keywords to track.
	query := uri.Query()
	query.Add("track", strings.Join(stream.tracks, ","))
	body := query.Encode()

	oauthClient := &oauth.Client{
		Credentials: oauth.Credentials{
			Token:  stream.credentials.Key,
			Secret: stream.credentials.Secret,
		},
		SignatureMethod: oauth.HMACSHA1,
	}
	c := &oauth.Credentials{
		Token:  stream.credentials.Token,
		Secret: stream.credentials.TokenSecret,
	}
	headers := map[string]string{
		"Content-Type":   "application/x-www-form-urlencoded",
		"Authorization":  oauthClient.AuthorizationHeader(c, "POST", uri, query),
		"Content-Length": strconv.Itoa(len(body)),
	}

	// Verify the server certificate against the host name. The request
	// carries OAuth credentials, so skipping verification would expose them
	// to any man-in-the-middle.
	config := &tls.Config{ServerName: uri.Host}
	connection, err := tls.Dial("tcp", uri.Host+":443", config)
	if stream.killed {
		// Close ran while we were dialing; Err may already be closed.
		if connection != nil {
			connection.Close()
		}
		return
	}
	if err != nil {
		stream.Err <- err
		return
	}
	stream.connection = connection

	// Buffer the whole request so it goes out in one write, and so a write
	// failure is detected (bufio.Writer keeps the first error for Flush).
	w := bufio.NewWriter(connection)
	fmt.Fprintf(w, "POST %s HTTP/1.1\r\n", uri.RequestURI())
	fmt.Fprintf(w, "%s: %s\r\n", "Host", uri.Host)
	for header, value := range headers {
		fmt.Fprintf(w, "%s: %s\r\n", header, value)
	}
	// A blank line separates the headers from the POST body.
	w.WriteString("\r\n")
	w.WriteString(body)
	if err := w.Flush(); err != nil {
		connection.Close()
		stream.Err <- err
		return
	}

	// Start parsing & reading the stream.
	go stream.parse()
	go stream.read()
}
// read validates the HTTP response on stream.connection and then reads the
// chunked body. It splits the body into lines at '\r' and hands each line to
// parse via responseChannel.
//
// It runs until a read error occurs or Close is called. Errors are reported
// on stream.Err (and returned) unless the stream has been closed, in which
// case read exits quietly and returns nil. On exit it closes responseChannel,
// which ends parse's loop so that goroutine does not leak.
func (stream *Stream) read() error {
	// read is the only sender on responseChannel, so it owns closing it.
	defer close(stream.responseChannel)

	reader := bufio.NewReader(stream.connection)
	if _, err := validateResponse(reader); err != nil {
		if stream.killed {
			// Close has run and closed Err; sending would panic.
			return nil
		}
		stream.Err <- err
		return err
	}
	// The streaming API uses chunked transfer encoding, so decode it with
	// httputil's chunked reader. Wrapping that in a bufio.Reader provides
	// ReadBytes.
	chunkedReader := bufio.NewReader(httputil.NewChunkedReader(reader))
	for {
		line, err := chunkedReader.ReadBytes('\r')
		if stream.killed {
			// Make sure the goroutine dies after the user calls stream.Close.
			return nil
		}
		if err != nil {
			stream.Err <- err
			return err
		}
		stream.responseChannel <- line
	}
}
// Track opens a new filtered stream for the given keywords, using the
// credentials held by client. The connection is set up in a background
// goroutine, so Track returns immediately. Connection failures are reported
// on the returned Stream's Err channel.
func (client *Client) Track(tracks ...string) *Stream {
	stream := &Stream{
		credentials:     client.credentials,
		tracks:          tracks,
		responseChannel: make(chan []byte),
		Tweets:          make(chan Tweet),
		Err:             make(chan error),
		Limits:          make(chan Limit),
		Warnings:        make(chan Stall),
		Deletes:         make(chan DeletedStatus),
	}
	go stream.initialize()
	return stream
}
package main
import (
"fmt"
"time"
"bitbucket.com/polldyme/services/tracker/client"
)
// main tracks a few keywords and prints each incoming tweet. It exits once
// the stream reports an error or its channels are closed.
func main() {
	fmt.Println("Hello, World!")
	k := client.Key{
		Key:         "xxxxxx",
		Secret:      "xxxxx",
		Token:       "xxxxx-xxx",
		TokenSecret: "xxxxxx",
	}
	c := client.NewClient(k)
	// Design notes (translated from the original Turkish):
	//
	// Stream is its own type rather than being folded into Client. If API
	// methods other than Track() are added to the client later, this keeps
	// our other uses more organized and meaningful.
	//
	// TODO: when client.Track() is called again, close() the Tweet and Err
	// channels of the previous stream so our for/select loop can end. That
	// needs an unexported currentStream field inside Client.
	stream := c.Track("justin", "obama", "abd", "recep")
	for {
		select {
		case tweet, ok := <-stream.Tweets:
			if !ok {
				return // stream was closed
			}
			if tweet.IDStr == "" {
				fmt.Println("not a valid tweet")
				continue
			}
			fmt.Printf("Gotten tweet idstr is: %s @%s\n", tweet.IDStr, time.Time(tweet.Created).String())
			// NOTE(review): presumably throttles consumption deliberately
			// (e.g. to exercise stall warnings); confirm before removing.
			time.Sleep(time.Second)
		case err, ok := <-stream.Err:
			if !ok {
				return // stream was closed
			}
			// err can be a network problem, invalid keys, a status other than
			// 200, or a disconnect by Twitter. In each case the client stops
			// reading the stream, so waiting here would block forever.
			// The plan is to call Track() again at this point instead. The
			// keywords then don't need to live in the client; the app can
			// keep them.
			fmt.Println("Error recieved:")
			fmt.Println(err)
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment