Skip to content

Instantly share code, notes, and snippets.

@hiroto-takatoshi
Last active December 18, 2017 00:08
Show Gist options
  • Save hiroto-takatoshi/2f314931a99c04c49bb844db7be2daa8 to your computer and use it in GitHub Desktop.
Save hiroto-takatoshi/2f314931a99c04c49bb844db7be2daa8 to your computer and use it in GitHub Desktop.
main topology (not exactly)
package bigdatateam.burstyword;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Storm topology for bursty-word detection: a Mastodon streaming spout feeds a
 * MeCab-based sentence splitter, whose words are aggregated into Redis.
 * Every component is a multi-lang shell around a Python script.
 */
public class BurstyWordTopology {

    /** Shell bolt wrapping split.py: consumes sentences, emits single words. */
    public static class SplitSentenceBolt extends ShellBolt implements IRichBolt {
        public SplitSentenceBolt() {
            super("python", "split.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null; // no per-component configuration
        }
    }

    /** Shell spout wrapping spout.py: streams public toots from pawoo.net as sentences. */
    public static class PawooSentenceSpout extends ShellSpout implements IRichSpout {
        public PawooSentenceSpout() {
            super("python", "spout.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null; // no per-component configuration
        }
    }

    /** Shell bolt wrapping redisio.py: terminal bolt that writes word statistics to Redis. */
    public static class RedisIOBolt extends ShellBolt implements IRichBolt {
        public RedisIOBolt() {
            super("python", "redisio.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // NOTE(review): placeholder field name kept as-is because renaming a declared
            // stream field could break any future subscriber; nothing consumes it today.
            declarer.declare(new Fields("shit"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null; // no per-component configuration
        }
    }

    /**
     * Wires spout -> split -> count and submits the topology: to the real cluster
     * when a topology name is passed as {@code args[0]}, otherwise to an in-process
     * {@link LocalCluster} that runs for ten seconds and shuts down.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws Exception propagated from Storm submission or the local-cluster sleep
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new PawooSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentenceBolt(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new RedisIOBolt(), 1).shuffleGrouping("split");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
import storm
# main json formatter
import redis
# directly write to redis db for some data
import math
from scipy.special import comb
import sys
# redis word data entry format :
# A sorted set that stores Pj
# A sorted set that saves the word's numbers by the last update:
# - Last updated, a timestamp
# - n(i,j), real number
# - n'(i,j), generated number where N is normalized to 100, po * 100
# - po(i,j)
# - pg(i,j)
# - pb(i,j)
# A sorted set that saves Ni (updated in the upper bolt)
class RedisIOBolt(storm.BasicBolt):
    """Terminal bolt: maintains per-word burst statistics in Redis.

    For each incoming [word, time_slot] tuple it updates the keys
    lastupd_<word>, n_<word>, po_<word>, pj_<word>, pg_<word>, reading the
    per-slot document total from total_<time_slot> (written by the split bolt).
    """

    def initialize(self, stormconf, context):
        # os/logging are not imported at this file's top level, so import here.
        import os
        import logging
        self.r = redis.Redis(host='localhost', port=6379, db=0)
        pid = os.getpid()
        base_path = '/var/log/takatoshi/'
        logging.basicConfig(filename=base_path + __file__ + str(pid) + '.log',
                            level=logging.DEBUG)
        logging.debug("abs path of py file: " + os.path.abspath(__file__))

    def process(self, tup):
        word = tup.values[0]
        # Time slot: used as-is as a key suffix, and numerically for the formulas.
        ts = tup.values[1]
        ts_num = float(ts)
        # Fix: the original used a bare `r` (NameError) -- the client lives on
        # self.r -- and redis-py returns bytes, which must be converted before
        # any comparison or arithmetic.
        raw_t = self.r.get("lastupd_" + word)
        t = 0.0 if raw_t is None else float(raw_t)
        if t < ts_num:
            # First observation of `word` in this time slot: reset its counter.
            self.r.getset("lastupd_" + word, ts)
            self.r.getset("n_" + word, 1)
            newpo = float(self.r.get("n_" + word)) / float(self.r.get("total_" + ts))
            self.r.getset("po_" + word, newpo)
            # NOTE(review): `t / ts * + 1 / ts * newpo` parses as
            # t/ts * (+1)/ts * newpo, which looks unintended -- confirm the
            # recurrence against the else-branch formula; kept as written.
            newpj = t / ts_num * + 1 / ts_num * newpo
            self.r.getset("pj_" + word, newpj)
            newpg = comb(10, 3) * (newpj ** (100 * newpo)) * ((1 - newpj) ** (100 - 100 * newpo))
            self.r.getset("pg_" + word, newpg)
            # TODO: newpb
        else:
            # Word already seen in this slot: increment and update the moving stats.
            self.r.incr("n_" + word, 1)
            newpo = float(self.r.get("n_" + word)) / float(self.r.get("total_" + ts))
            raw_oldpo = self.r.getset("po_" + word, newpo)
            oldpo = 0.0 if raw_oldpo is None else float(raw_oldpo)
            oldpj = float(self.r.get("pj_" + word))
            newpj = oldpj - 1 / ts_num * oldpo + 1 / ts_num * newpo
            self.r.getset("pj_" + word, newpj)
            newpg = comb(10, 3) * (newpj ** (100 * newpo)) * ((1 - newpj) ** (100 - 100 * newpo))
            self.r.getset("pg_" + word, newpg)


# Fix: the original called RedisIOBolt.run() on the class; the multi-lang
# protocol needs an instance.
RedisIOBolt().run()
# Received a whole sentence somehow, split it by MeCab
import MeCab
# for analysis
import storm
# main json formatter
import redis
# directly write to redis db for some data
import logging
class SplitSentenceBolt(storm.BasicBolt):
    """Tokenizes incoming [sentence, time_slot] tuples with MeCab, keeps
    non-stopword nouns, emits one [word, time_slot] tuple per distinct noun,
    and increments the per-slot document total in Redis."""

    def initialize(self, stormconf, context):
        # `os` is not imported at this file's top level, so import it here.
        import os
        # Japanese stopword list, one word per line; close the handle promptly.
        with open("Japanese.txt") as f:
            self.stopwords = f.read().split('\n')
        self.m = MeCab.Tagger("-Ochasen")
        self.r = redis.Redis(host='localhost', port=6379, db=0)
        pid = os.getpid()
        base_path = '/var/log/takatoshi/'
        logging.basicConfig(filename=base_path + __file__ + str(pid) + '.log',
                            level=logging.DEBUG)
        logging.debug("abs path of py file: " + os.path.abspath(__file__))

    def process(self, tup):
        sentence = tup.values[0]
        ts = tup.values[1]
        # Fix: the original parsed `tmpstr`, which is undefined in this scope
        # (NameError) -- the incoming sentence is what must be tokenized.
        # Last line of MeCab output is the EOS marker; drop it.
        res = self.m.parse(sentence).splitlines()[:-1]
        output = set()
        for line in res:
            fa = line.split('\t')
            # Keep nouns (名詞); chasen output field 3 carries the POS tag.
            if u"名詞" in fa[3] and not fa[0] in self.stopwords:
                output.add(fa[0])
        if len(output):
            # Fix: the original used a bare `r` (NameError); the client is self.r.
            self.r.incr('total_' + ts)
        for word in output:
            storm.emit([word, ts])


SplitSentenceBolt().run()
# -*- coding: utf-8 -*-
# credit : https://qiita.com/kk6/items/8351a6541598cf7151ef
from bs4 import BeautifulSoup
import operator
from urllib.parse import urljoin
import json
import sys
import unicodedata
import time
import requests
# for http/2 streamjob
import storm
# main json formatter
import queue
# local queue
import logging
class MstdnStream:
    """Minimal client for the Mastodon streaming API.

    Opens the public streaming endpoint with a bearer token and dispatches
    every complete server-sent event to the listener's ``on_<event>`` method.

    Usage::

        >>> listener = MstdnStreamListner()
        >>> stream = MstdnStream('https://pawoo.net', 'your-access-token', listener)
        >>> stream.public()
    """

    def __init__(self, base_url, access_token, listener):
        self.base_url = base_url
        self.listener = listener
        self.session = requests.Session()
        self.session.headers.update({'Authorization': 'Bearer ' + access_token})

    def public(self):
        """Consume /api/v1/streaming/public forever, dispatching SSE events."""
        endpoint = urljoin(self.base_url, '/api/v1/streaming/public')
        resp = self.session.get(endpoint, stream=True)
        resp.raise_for_status()
        event = {}
        start_time = time.time()
        cnt = 0
        for raw in resp.iter_lines():
            decoded = raw.decode('utf-8')
            if not decoded:
                # A blank line terminates one SSE event: dispatch it to the
                # listener's on_<event-name> handler, then reset the buffer.
                dispatch = operator.methodcaller(
                    "on_{event}".format(event=event['event']), event['data'])
                dispatch(self.listener)
                event = {}
            elif decoded.startswith(':'):
                # Heartbeat/comment line -- intentionally ignored.
                pass
            else:
                # "key: value" field line; repeated keys are concatenated.
                field, payload = decoded.split(': ', 1)
                if field in event:
                    event[field] += payload
                else:
                    event[field] = payload
class MstdnStreamListner:
    """Stream listener: cleans each toot's HTML, keeps only toots containing
    kana, and pushes [text, time_slot] pairs onto the shared queue. The time
    slot advances every 300 seconds."""

    def __init__(self, theq):
        # Fix: the original assigned `self.m = tagger` and `self.r = rserver`
        # from globals that are defined nowhere in this file (NameError on
        # construction); neither attribute is used by any live code, so both
        # assignments are removed.
        self.q = theq
        self.ts = 1                    # current time-slot id
        self.start_time = time.time()  # start of the current 300s slot

    def _remove_attrs(self, soup, tag):
        # Strip every <tag> element from the soup; tolerate odd soup objects.
        try:
            for _ in soup.find_all(tag):
                _.extract()
        except AttributeError:
            pass

    def _kana(self, txt):
        # True if the text contains at least one hiragana or katakana character
        # (i.e. it is plausibly Japanese).
        hiragana = "ぁあぃいぅうぇえぉおかがきぎくぐけげこごさざしじすずせぜそぞただちぢっつづてでとどなにぬねのはばぱひびぴふぶぷへべぺほぼぽまみむめもゃやゅゆょよらりるれろゎわゐゑをん"
        katakana = "ァアィイゥウェエォオカガキギクグケゲコゴサザシジスズセゼソゾタダチヂッツヅテデトドナニヌネノハバパヒビピフブプヘベペホボポマミムメモャヤュユョヨラリルレロヮワヰヱヲンヴ"
        for _ in txt:
            if _ in hiragana or _ in katakana:
                return True
        return False

    def on_update(self, data):
        # Extract the toot body and strip links, spans and line breaks.
        k = json.loads(data)['content']
        soup = BeautifulSoup(k, "html.parser")
        self._remove_attrs(soup, "a")
        self._remove_attrs(soup, "span")
        self._remove_attrs(soup, "br")
        # prettify() wraps the fragment; slice off the outer markup, then drop
        # paragraph tags and normalize full-width characters.
        tmpstr = str(soup.prettify())[3:-5]
        tmpstr = tmpstr.replace("<p>", "").replace("</p>", "")
        tmpstr = unicodedata.normalize("NFKC", tmpstr)
        if self._kana(tmpstr):
            # Advance the time slot every 300 seconds.
            if time.time() - self.start_time > 300:
                self.ts = self.ts + 1
                self.start_time = time.time()
            # Fix: the original put `ts`, an undefined name (NameError);
            # the slot counter is self.ts.
            self.q.put([tmpstr, self.ts])

    def on_notification(self, data):
        print(data)

    def on_delete(self, data):
        # Deletions carry no text to analyze; deliberately ignored.
        pass
class PawooSentenceSpout(storm.Spout):
    """Storm spout that streams public toots from pawoo.net via MstdnStream
    and emits [sentence, time_slot] tuples pulled off an internal queue."""

    # do nothing on ack and fail signals

    def initialize(self, conf, context):
        # `os`/`threading` are not imported at this file's top level.
        import os
        import threading
        pid = os.getpid()
        base_path = '/var/log/takatoshi/'
        logging.basicConfig(filename=base_path + __file__ + str(pid) + '.log',
                            level=logging.DEBUG)
        logging.debug("abs path of py file: " + os.path.abspath(__file__))
        self.q = queue.Queue()
        listener = MstdnStreamListner(self.q)
        # SECURITY(review): access token is hard-coded; move it to config or
        # an environment variable and rotate the exposed one.
        stream = MstdnStream('https://pawoo.net',
                             '785639d26ddb1f0aa3ddac884f9cdf64c0748ae7394388a322b44fa69ec596df',
                             listener)
        # Fix: the original called stream.public() inline; that method loops
        # forever over the HTTP stream, so initialize() never returned and
        # Storm never called nextTuple(). Run it on a daemon thread instead.
        worker = threading.Thread(target=stream.public, daemon=True)
        worker.start()

    def nextTuple(self):
        # queue.Queue.get() blocks until the listener produces an item.
        item = self.q.get()
        if item is not None:
            storm.emit(item)


PawooSentenceSpout().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment