Created
December 19, 2015 19:19
-
-
Save Yengas/47e59f3d6f6706d0faa2 to your computer and use it in GitHub Desktop.
Twitter stream client with raw TLS socket
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package client | |
| import ( | |
| "bufio" | |
| "crypto/tls" | |
| "encoding/json" | |
| "errors" | |
| "fmt" | |
| "net/http/httputil" | |
| "net/url" | |
| "regexp" | |
| "strconv" | |
| "strings" | |
| "time" | |
| "github.com/garyburd/go-oauth/oauth" | |
| ) | |
// Sentinel errors produced while validating the HTTP response that opens a
// stream.
var (
	// ErrInvalidResponse is returned when no status code can be extracted
	// from the first line of the server's response.
	ErrInvalidResponse = errors.New("HTTP response code isn't ok")

	// ErrResponseNotOk is returned when the server answers with a status
	// code other than 200.
	ErrResponseNotOk = errors.New("Response code isn't 200")
)
// Key stores your Twitter credentials. Key and Secret are used as the OAuth
// client credentials, Token and TokenSecret as the access token credentials
// when the stream request is signed.
type Key struct {
	Key         string
	Secret      string
	Token       string
	TokenSecret string
}
| // Stream is created by Client's Track method and is an instance of an streaming api connection between you and twitter. | |
| type Stream struct { | |
| tracks []string | |
| credentials Key | |
| connection *tls.Conn | |
| killed bool | |
| responseChannel chan []byte | |
| // For each newly created tweets/statuses | |
| Tweets chan Tweet | |
| // For any encounters occured during the initialization or reading period of the stream. | |
| Err chan error | |
| // For when your current api limits can't handle the filtered stream you're following. | |
| Limits chan Limit | |
| // For when you're not processing tweets fast enough | |
| Warnings chan Stall | |
| // For each deleted statuses/tweets | |
| Deletes chan DeletedStatus | |
| } | |
// Limit notice is given when your filtered stream matched more tweets than
// your current limit allows.
type Limit struct {
	// Track holds the value of the notice's "track" field as text.
	Track string `json:"track"`
}

// UnmarshalJSON decodes a limit notice whose "track" value is either a JSON
// number or a JSON string. Twitter's documented limit notices carry a
// numeric count, which cannot be decoded straight into a string field, so the
// number is stored using its literal text. A missing or null "track" leaves
// Track untouched.
func (limit *Limit) UnmarshalJSON(data []byte) error {
	// Decode through an anonymous struct so this method is not re-entered.
	var raw struct {
		Track json.RawMessage `json:"track"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	if len(raw.Track) == 0 || string(raw.Track) == "null" {
		return nil
	}
	if raw.Track[0] == '"' {
		return json.Unmarshal(raw.Track, &limit.Track)
	}
	var number json.Number
	if err := json.Unmarshal(raw.Track, &number); err != nil {
		return err
	}
	limit.Track = number.String()
	return nil
}

// limitMessage is the envelope of a limit notice: {"limit": {...}}.
type limitMessage struct {
	Limit Limit `json:"limit"`
}
// DeletedStatus is given when a user deletes his/her status. It carries the
// ids of the status and of its author, both as strings.
type DeletedStatus struct {
	IDStr     string `json:"id_str"`
	UserIDStr string `json:"user_id_str"`
}

// deleteMessage is the envelope of a deletion notice:
// {"delete": {"status": {...}}}.
type deleteMessage struct {
	Delete struct {
		Status DeletedStatus `json:"status"`
	} `json:"delete"`
}
// Disconnect is given when your stream is closed by Twitter. It carries the
// disconnect code, the stream name and a textual reason.
type Disconnect struct {
	Code       int    `json:"code"`
	StreamName string `json:"stream_name"`
	Reason     string `json:"reason"`
}

// disconnectMessage is the envelope of a disconnect notice:
// {"disconnect": {...}}.
type disconnectMessage struct {
	Disconnect Disconnect `json:"disconnect"`
}
// Stall is given when you fall behind the Twitter queue.
type Stall struct {
	Code    string `json:"code"`
	Message string `json:"message"`
	// PercentFUll is decoded from "percent_full". The unusual capitalization
	// is part of the exported API and is kept for compatibility.
	PercentFUll int `json:"percent_full"`
}

// stallMessage is the envelope of a stall warning: {"warning": {...}}.
type stallMessage struct {
	Warning Stall `json:"warning"`
}
// HashTag represents one occurrence of a hashtag in a tweet.
type HashTag struct {
	Text string `json:"text"`
	// Indices tells where the hashtag was present in the text.
	Indices []int `json:"indices"`
}

// Mention is one occurrence of a user mention in a tweet.
type Mention struct {
	ScreenName string `json:"screen_name"`
	Name       string `json:"name"`
	IDStr      string `json:"id_str"`
	// Indicies tells where the mention was present in the text. The spelling
	// is part of the exported API and is kept for compatibility.
	Indicies []int `json:"indices"`
}

// Entity holds the hashtags and user mentions decoded from a tweet's
// "entities" object.
type Entity struct {
	HashTags []HashTag `json:"hashtags"`
	Mentions []Mention `json:"user_mentions"`
}
// RubyDate is a time.Time that is decoded from the Ruby date layout
// (time.RubyDate), the format used by Twitter on stream data.
type RubyDate time.Time

// UnmarshalJSON parses a quoted time.RubyDate string into rubyDate.
//
// A JSON null is treated as a no-op so that a null date does not fail the
// decoding of the surrounding object. An error is returned when the receiver
// is nil or when the text does not match the layout; rubyDate is left
// untouched in both cases.
func (rubyDate *RubyDate) UnmarshalJSON(data []byte) error {
	if rubyDate == nil {
		return errors.New("Cannot set an nil reference")
	}
	if string(data) == "null" {
		return nil
	}
	// The value arrives with its surrounding JSON quotes; strip them first.
	date, err := time.Parse(time.RubyDate, strings.Trim(string(data), `"`))
	if err != nil {
		return err
	}
	*rubyDate = RubyDate(date)
	return nil
}
| // Tweet is an instance of a response, returned from twitter streaming api. | |
| type Tweet struct { | |
| IDStr string `json:"id_str"` | |
| Created RubyDate `json:"created_at"` | |
| Text string `json:"text"` | |
| Source string `json:"source"` | |
| Truncated bool `json:"truncated"` | |
| ReplyStatusIDStr string `json:"in_reply_to_status_id_str"` | |
| ReplyUserIDStr string `json:"in_reply_to_user_id_str"` | |
| ReplyScreenName string `json:"in_reply_to_screen_name"` | |
| Timestamp string `json:"timestamp_ms"` | |
| Tags Entity `json:"entities"` | |
| } | |
| // Client holds the | |
| type Client struct { | |
| credentials Key | |
| connection *tls.Conn | |
| uri *url.URL | |
| } | |
| // NewClient creates a new streaming api client and returns it... | |
| func NewClient(credentials Key) *Client { | |
| client := Client{credentials: credentials} | |
| return &client | |
| } | |
| // Utility funcs START | |
| func parseTweet(line []byte) (Tweet, error) { | |
| var tweet Tweet | |
| err := json.Unmarshal(line, &tweet) | |
| return tweet, err | |
| } | |
| func parseDeletion(line []byte) (DeletedStatus, error) { | |
| var message deleteMessage | |
| err := json.Unmarshal(line, &message) | |
| if err != nil { | |
| return message.Delete.Status, err | |
| } | |
| return message.Delete.Status, nil | |
| } | |
| func parseLimit(line []byte) (Limit, error) { | |
| var message limitMessage | |
| err := json.Unmarshal(line, &message) | |
| if err != nil { | |
| return message.Limit, err | |
| } | |
| return message.Limit, nil | |
| } | |
| func parseStall(line []byte) (Stall, error) { | |
| var message stallMessage | |
| err := json.Unmarshal(line, &message) | |
| if err != nil { | |
| return message.Warning, err | |
| } | |
| return message.Warning, nil | |
| } | |
| func parseDisconnect(line []byte) (Disconnect, error) { | |
| var message disconnectMessage | |
| err := json.Unmarshal(line, &message) | |
| if err != nil { | |
| return message.Disconnect, err | |
| } | |
| return message.Disconnect, nil | |
| } | |
| func validateResponse(reader *bufio.Reader) (map[string]string, error) { | |
| line, err := reader.ReadBytes('\n') | |
| if err != nil { | |
| return nil, err | |
| } | |
| headers := map[string]string{} | |
| r, _ := regexp.Compile("1 ([0-9]+) ") | |
| match := r.FindStringSubmatch(string(line)) | |
| if len(match) < 1 { | |
| return nil, ErrInvalidResponse | |
| } | |
| if match[1] != "200" { | |
| headers["Code"] = match[1] | |
| return headers, ErrResponseNotOk | |
| } | |
| header, err := reader.ReadBytes('\n') | |
| for err == nil && strings.TrimSpace(string(header)) != "" { | |
| h := strings.Split(string(header), ":") | |
| headers[h[0]] = strings.TrimSpace(h[1]) | |
| header, err = reader.ReadBytes('\n') | |
| } | |
| return headers, nil | |
| } | |
// startsWith reports whether body begins with substr once every carriage
// return and line feed byte in body is ignored.
//
// It returns false, instead of indexing past the end of body, when body runs
// out before all of substr has been matched. An empty substr always matches.
func startsWith(body []byte, substr string) bool {
	matched := 0
	for _, b := range body {
		if matched == len(substr) {
			break
		}
		if b == '\n' || b == '\r' {
			// Stream lines may carry leftover CR/LF bytes; skip them.
			continue
		}
		if b != substr[matched] {
			return false
		}
		matched++
	}
	return matched == len(substr)
}
| // Utility funcs END | |
| // Parses the responses gotten from the twitter stream api from a line of bytes to tweet struct until the stream is closed. | |
| func (stream *Stream) parse() { | |
| for line := range stream.responseChannel { | |
| if startsWith(line, `{"created_at"`) { | |
| tweet, err := parseTweet(line) | |
| if err == nil { | |
| //select { | |
| /*case */ stream.Tweets <- tweet //: | |
| //default: | |
| //} | |
| } | |
| } else if startsWith(line, `{"delete"`) { | |
| status, err := parseDeletion(line) | |
| if err == nil { | |
| select { | |
| case stream.Deletes <- status: | |
| default: | |
| } | |
| } | |
| } else if startsWith(line, `{"disconnect"`) { | |
| disconnect, err := parseDisconnect(line) | |
| if err == nil { | |
| stream.Err <- fmt.Errorf("Disconnected from tw stream api. Code: %d, Reason: \"%s\"", disconnect.Code, disconnect.Reason) | |
| } | |
| } else if startsWith(line, `{"warning"`) { | |
| warning, err := parseStall(line) | |
| if err == nil { | |
| select { | |
| case stream.Warnings <- warning: | |
| default: | |
| } | |
| } | |
| } else if startsWith(line, `{"limit"`) { | |
| limit, err := parseLimit(line) | |
| if err == nil { | |
| select { | |
| case stream.Limits <- limit: | |
| default: | |
| fmt.Println("Didn't send limit.") | |
| } | |
| } | |
| } | |
| } | |
| } | |
| // Close method closes the channels and tcp connection between twitter. | |
| func (stream *Stream) Close() { | |
| stream.killed = true | |
| stream.connection.Close() | |
| close(stream.Tweets) | |
| close(stream.Err) | |
| } | |
| // Initializes the connection between twitter/client and then starts the routines for reading and parsing the tweets. | |
| func (stream *Stream) initialize() { | |
| uri, err := url.Parse("https://stream.twitter.com/1.1/statuses/filter.json") | |
| // Create the form body for the request | |
| query := uri.Query() | |
| query.Add("track", strings.Join(stream.tracks, ",")) | |
| config := tls.Config{InsecureSkipVerify: true} | |
| oauthClient := &oauth.Client{ | |
| Credentials: oauth.Credentials{ | |
| Token: stream.credentials.Key, | |
| Secret: stream.credentials.Secret, | |
| }, | |
| SignatureMethod: oauth.HMACSHA1, | |
| } | |
| c := &oauth.Credentials{ | |
| Token: stream.credentials.Token, | |
| Secret: stream.credentials.TokenSecret, | |
| } | |
| headers := map[string]string{ | |
| "Content-Type": "application/x-www-form-urlencoded", | |
| "Authorization": oauthClient.AuthorizationHeader(c, "POST", uri, query), | |
| "Content-Length": strconv.Itoa(len(query.Encode()))} | |
| connection, err := tls.Dial("tcp", uri.Host+":443", &config) | |
| if err != nil { | |
| stream.Err <- err | |
| return | |
| } | |
| stream.connection = connection | |
| fmt.Fprintf(connection, "POST /1.1/statuses/filter.json HTTP/1.1\r\n") | |
| fmt.Fprintf(connection, "%s: %s\r\n", "Host", uri.Host) | |
| for header, value := range headers { | |
| fmt.Fprintf(connection, "%s: %s\r\n", header, value) | |
| } | |
| // One new line before the post body | |
| fmt.Fprintf(connection, "\r\n") | |
| fmt.Fprintf(connection, "%s", query.Encode()) | |
| // Start the reading & parsing of the stream api. | |
| go stream.parse() | |
| go stream.read() | |
| } | |
| // read continously reads the twitter streaming api | |
| func (stream *Stream) read() error { | |
| reader := bufio.NewReader(stream.connection) | |
| _, err := validateResponse(reader) | |
| if err != nil { | |
| stream.Err <- err | |
| return err | |
| } | |
| /* Twitter stream api uses chunked encoding so we need an chunkedreader from httputil. | |
| We then wrap it with a bufio reader to have the utility of ReadBytes(chr) */ | |
| chunkedReader := bufio.NewReader(httputil.NewChunkedReader(reader)) | |
| for { | |
| line, err := chunkedReader.ReadBytes('\r') | |
| if stream.killed { | |
| // Make sure the routine dies after the user calls stream.Close | |
| return nil | |
| } | |
| if err != nil { | |
| stream.Err <- err | |
| return err | |
| } | |
| stream.responseChannel <- line | |
| } | |
| } | |
| // Track creates a new connection with the credentials held in Client | |
| func (client *Client) Track(tracks ...string) *Stream { | |
| stream := Stream{ | |
| credentials: client.credentials, | |
| tracks: tracks, | |
| Tweets: make(chan Tweet), | |
| Deletes: make(chan DeletedStatus), | |
| Warnings: make(chan Stall), | |
| Limits: make(chan Limit), | |
| Err: make(chan error), | |
| responseChannel: make(chan []byte), | |
| } | |
| go stream.initialize() | |
| return &stream | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "fmt" | |
| "time" | |
| "bitbucket.com/polldyme/services/tracker/client" | |
| ) | |
| func main() { | |
| fmt.Println("Hello, World!") | |
| k := client.Key{ | |
| Key: "xxxxxx", | |
| Secret: "xxxxx", | |
| Token: "xxxxx-xxx", | |
| TokenSecret: "xxxxxx", | |
| } | |
| c := client.NewClient(k) | |
| // todo: stream `type Stream struct` olsun | |
| // | |
| // note: Client'a doğrudan eklemek yerine böyle bir type yapalım istedim çünkü | |
| // bu client'a Track() dışında api methodları eklersek diğer kullanımlarımız | |
| // için böyle daha düzenli ve anlamlı olacak | |
| // | |
| // todo: client.Track() çalıştıldığında önceki track'le birlikte oluşturulan | |
| // Tweet ve Err channelları close() ile kapatılsın ki for select döngümüz | |
| // sonlanabilsin. | |
| // bu durumda Client içinde currentStream adlı unexported bir alanımız olmalı | |
| stream := c.Track("justin", "obama", "abd", "recep") | |
| for { | |
| select { | |
| case tweet := <-stream.Tweets: | |
| if tweet.IDStr == "" { | |
| fmt.Println("not a valid tweet") | |
| } | |
| fmt.Printf("Gotten tweet idstr is: %s @%s\n", tweet.IDStr, time.Time(tweet.Created).String()) | |
| time.Sleep(time.Second) | |
| case err := <-stream.Err: | |
| // err could be about network issues or invalid keys or status != 200 | |
| // or disconnect by twitter | |
| if err != nil { | |
| // time.Sleep(time.Second * 20) | |
| // bu aşamada kendimi tekrar Track()'i çalıştıracağız yani kelimelerin | |
| // client'da saklanmasına gerek yok bence app içinde tutarız onları | |
| // return | |
| } | |
| fmt.Println("Error recieved:") | |
| fmt.Println(err) | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment