Last active
March 24, 2019 11:24
-
-
Save kriskbx/bc6ab4ef938fe458e834 to your computer and use it in GitHub Desktop.
Ruby POS58 Tweetprinter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# -*- encoding : utf-8 -*- | |
# A simple script to search for tweets that contain certain hashtags and print them on a POS58 printer. | |
# Make sure the printer is connected and you can print by writing to /dev/usb/lp0 (or similar) | |
# | |
# Run `ruby ruby_pos58_tweetprinter.rb` to use the streaming API: tweets are printed as soon as they arrive.
# Run `ruby ruby_pos58_tweetprinter.rb get` to use the REST search API instead; this mode is suitable for a cron job.
# | |
# Copyright (c) 2015 http://github.com/kriskbx
# Licensed under the MIT License (MIT) http://opensource.org/licenses/MIT | |
require 'rubygems' | |
require 'sqlite3' | |
require 'twitter' | |
require 'date' | |
require 'time' | |
require 'timeout' | |
require 'RMagick' | |
require 'escper' | |
include Magick | |
## | |
## POS Printer | |
## | |
# Minimal driver for a POS58 receipt printer that is reachable as a writable
# device file. Every command is sent by opening the port, writing the bytes
# and closing it again.
class POSPrinter
  # Escape sequences sent by #b and #be to switch bold printing on and off.
  BOLD_ON  = "\x1b\x45\x01".freeze
  BOLD_OFF = "\x1b\x45\x00".freeze

  # @param port [String] path of the printer device file
  def initialize(port = '/dev/usb/lp0')
    @port = port
    @asciify = Escper::Asciifier.new
  end

  # Writes +s+ to the printer port exactly as given.
  #
  # @param s [String] raw bytes / text to send
  # @return whatever the underlying write returns
  def out(s)
    File.open(@port, 'w') { |f| f.write s }
  end

  # Prints a string after passing it through the Escper asciifier.
  # NOTE: this intentionally shadows Kernel#print inside the class.
  #
  # @param s [String] text to print
  def print(s)
    out(@asciify.process(s))
  end

  # Bold start
  def b
    out(BOLD_ON)
  end

  # Bold end
  def be
    out(BOLD_OFF)
  end

  # New line
  def nl
    out("\n")
  end

  # Prints an image followed by two line feeds.
  #
  # The image is scaled in place to fit a 380x500 box, reduced to 256 gray
  # levels and handed to Escper for conversion into printer data.
  #
  # @param path [String] location of the image as accepted by Image.read
  def img(path)
    image = Image.read(path).first
    image.change_geometry!('380x500') do |cols, rows, scaled|
      scaled.resize!(cols, rows)
    end
    # Image#quantize returns a new image instead of modifying the receiver,
    # so the result has to be kept; otherwise the grayscale step is a no-op.
    image = image.quantize(256, Magick::GRAYColorspace)
    out(Escper::Img.new(image, :obj).to_s)
    nl
    nl
  end
end
## | |
## Print my Tweets | |
## | |
# Collects tweets that contain all configured hashtags, prints each one on a
# POSPrinter and records it in a local SQLite table so that a tweet is never
# printed twice.
#
# Two modes exist: streaming (default) and a one-shot REST search, selected
# when any command-line argument contains "get".
#
# NOTE: constructing an instance immediately opens the database, creates the
# Twitter client and starts fetching tweets.
class POSTwitter
  DBFILE = 'database.sqlite'
  DBNAME = 'twitter'

  # @param consumer_key [String] Twitter API consumer key
  # @param consumer_secret [String] Twitter API consumer secret
  # @param access_token [String] Twitter API access token (streaming mode only)
  # @param access_token_secret [String] Twitter API access token secret (streaming mode only)
  # @param hashtags [Array<String>] terms a tweet must contain to be printed
  def initialize(consumer_key, consumer_secret, access_token, access_token_secret, hashtags)
    @consumer_key = consumer_key
    @consumer_secret = consumer_secret
    @access_token = access_token
    @access_token_secret = access_token_secret
    @hashtags = hashtags
    @PR = POSPrinter.new
    # Any argument containing "get" switches from streaming to REST search.
    @streaming = ARGV.none? { |arg| arg.include?('get') }
    initDB
    initTwitter
    getTweets
  end

  ##
  ## DB Stuff
  ##

  # Opens the SQLite database file and makes sure the table exists.
  def initDB
    @DB = SQLite3::Database.new(DBFILE)
    createDB
  end

  # Creates the tweet table unless it is already present.
  def createDB
    @DB.execute("CREATE TABLE IF NOT EXISTS #{DBNAME} (id INTEGER PRIMARY KEY,username TEXT,message TEXT,datetime DATETIME,picture TEXT)")
  end

  # Reads the most recent entry.
  #
  # @return the raw result of the query: a list with at most one row holding
  #   the id of the entry with the greatest datetime
  def last
    @DB.execute("SELECT id FROM #{DBNAME} ORDER BY datetime DESC LIMIT 1")
  end

  # Stores one tweet.
  #
  # @return the raw result of the INSERT statement
  def insert(entry_id, username, message, datetime, picture = '')
    # Bind values are passed as a single array, the form of Database#execute
    # that does not rely on the deprecated trailing-varargs signature.
    @DB.execute(
      "INSERT INTO #{DBNAME} (id,username,message,datetime,picture) VALUES (?,?,?,?,?)",
      [entry_id, username, message, datetime, picture]
    )
  end

  # @return [Boolean] true when a tweet with this id is already stored
  def exists(entry_id)
    rows = @DB.execute("SELECT id FROM #{DBNAME} WHERE id = ? LIMIT 1", [entry_id])
    !rows.empty?
  end

  ##
  ## Twitter Stuff
  ##

  # Builds the Twitter client for the selected mode. The streaming client gets
  # all four credentials, the REST client only the consumer key and secret.
  def initTwitter
    @TW =
      if @streaming
        Twitter::Streaming::Client.new do |config|
          config.consumer_key = @consumer_key
          config.consumer_secret = @consumer_secret
          config.access_token = @access_token
          config.access_token_secret = @access_token_secret
        end
      else
        Twitter::REST::Client.new do |config|
          config.consumer_key = @consumer_key
          config.consumer_secret = @consumer_secret
        end
      end
  end

  # Fetches tweets and hands every one of them to #processTweet.
  #
  # Streaming mode tracks the comma-joined hashtags and ignores stream objects
  # that are not tweets. Search mode asks for up to 20 recent results for the
  # space-joined hashtags, restricted to tweets newer than the last stored one
  # when there is one.
  def getTweets
    if @streaming
      log 'STREAMING TWEETS ...'
      @TW.filter(track: @hashtags.join(',')) do |tweet|
        processTweet(tweet) if tweet.is_a?(Twitter::Tweet)
      end
    else
      log 'SEARCHING TWEETS ...'
      options = { result_type: 'recent', count: 20 }
      # Only send since_id when an id is actually known; #last returns rows,
      # not an id, so it cannot be passed to the API directly.
      newest = last_id
      options[:since_id] = newest if newest
      @TW.search(@hashtags.join(' '), options).each { |tweet| processTweet(tweet) }
    end
  end

  # Prints and stores a tweet unless it is already stored or does not contain
  # every configured hashtag.
  #
  # @param tweet an object responding to id, user.screen_name, text,
  #   created_at and media, as used below
  def processTweet(tweet)
    return if exists(tweet.id)

    media = tweet.media.first
    n_tweet = {
      id: tweet.id,
      username: tweet.user.screen_name,
      message: tweet.text,
      datetime: tweet.created_at.to_s,
      picture: media ? media.media_url.to_s : ''
    }
    return unless all_hashtags?(n_tweet[:message])

    puts '--------------------------------------------------------------'
    puts "##{n_tweet[:datetime]} - #{n_tweet[:username]}: #{n_tweet[:message]}"
    log 'PRINTING ...'
    printTweet(n_tweet)
    log 'SAVING ...'
    insert(n_tweet[:id], n_tweet[:username], n_tweet[:message], n_tweet[:datetime], n_tweet[:picture])
    puts '--------------------------------------------------------------'
  end

  # Sends one tweet to the printer: bold user name, message text, the picture
  # when the picture field contains "http", then two blank lines.
  #
  # @param tweet [Hash] hash with :username, :message and :picture
  def printTweet(tweet)
    @PR.b
    @PR.print("#{tweet[:username]}\n")
    @PR.be
    @PR.print("#{tweet[:message]}\n")
    if tweet[:picture].include?('http')
      log 'IMAGE RESAMPLING ...'
      @PR.img(tweet[:picture])
    end
    @PR.nl
    @PR.nl
  end

  private

  # Writes a status line prefixed with "#" and the current date and time.
  def log(message)
    puts "##{DateTime.now} - #{message}"
  end

  # Id of the most recent stored tweet, or nil when the table is empty.
  def last_id
    row = last.first
    row && row.first
  end

  # True when the text contains every configured hashtag. The comparison
  # ignores case because hashtags are matched case-insensitively by Twitter,
  # so "#PrintMyTweet" must satisfy a configured "printmytweet".
  def all_hashtags?(text)
    haystack = text.downcase
    @hashtags.all? { |tag| haystack.include?(tag.downcase) }
  end
end
# Entry point: constructing the object starts the whole fetch-and-print run.
# Arguments are the four Twitter credentials followed by the list of hashtags.
POSTwitter.new(
  "comsumer_key",
  "consumer_secret",
  "access_token",
  "access_token_secret",
  ['printmytweet']
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
That looks pretty awesome. Do you think you could replicate it for Instagram hashtags?