Last active
December 18, 2017 00:08
-
-
Save hiroto-takatoshi/2f314931a99c04c49bb844db7be2daa8 to your computer and use it in GitHub Desktop.
main topology(not exactly)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bigdatateam.burstyword; | |
import java.util.HashMap; | |
import java.util.Map; | |
import org.apache.storm.Config; | |
import org.apache.storm.LocalCluster; | |
import org.apache.storm.StormSubmitter; | |
import org.apache.storm.task.ShellBolt; | |
import org.apache.storm.spout.ShellSpout; | |
import org.apache.storm.topology.BasicOutputCollector; | |
import org.apache.storm.topology.IRichBolt; | |
import org.apache.storm.topology.IRichSpout; | |
import org.apache.storm.topology.OutputFieldsDeclarer; | |
import org.apache.storm.topology.TopologyBuilder; | |
import org.apache.storm.topology.base.BaseBasicBolt; | |
import org.apache.storm.tuple.Fields; | |
import org.apache.storm.tuple.Tuple; | |
import org.apache.storm.tuple.Values; | |
public class BurstyWordTopology { | |
public static class SplitSentenceBolt extends ShellBolt implements IRichBolt { | |
public SplitSentenceBolt() { | |
super("python", "split.py"); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("word")); | |
} | |
@Override | |
public Map<String, Object> getComponentConfiguration() { | |
return null; | |
} | |
} | |
public static class PawooSentenceSpout extends ShellSpout implements IRichSpout { | |
public PawooSentenceSpout() { | |
super("python", "spout.py"); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("sentence")); | |
} | |
@Override | |
public Map<String, Object> getComponentConfiguration() { | |
return null; | |
} | |
} | |
public static class RedisIOBolt extends ShellBolt implements IRichBolt { | |
public RedisIOBolt() { | |
super("python", "redisio.py"); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("shit")); | |
} | |
@Override | |
public Map<String, Object> getComponentConfiguration() { | |
return null; | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("spout", new PawooSentenceSpout(), 1); | |
builder.setBolt("split", new SplitSentenceBolt(), 1).shuffleGrouping("spout"); | |
builder.setBolt("count", new RedisIOBolt(), 1).shuffleGrouping("split"); | |
//conf.setDebug(true); | |
//String topologyName = "word-count"; | |
//conf.setNumWorkers(3); | |
Config conf = new Config(); | |
conf.setDebug(true); | |
if (args != null && args.length > 0) { | |
conf.setNumWorkers(3); | |
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); | |
} | |
else { | |
conf.setMaxTaskParallelism(3); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("word-count", conf, builder.createTopology()); | |
Thread.sleep(10000); | |
cluster.shutdown(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import storm | |
# main json formatter | |
import redis | |
# directly write to redis db for some data | |
import math | |
from scipy.special import comb | |
import sys | |
# Redis data layout used by this bolt (plain string keys, one set of keys per word):
#   lastupd_<word> : time slot (ts) in which the word was last updated
#   n_<word>       : n(i,j), the real count of the word in that time slot
#   po_<word>      : po(i,j) = n(i,j) / N(i), observed probability in that time slot
#   pj_<word>      : Pj, the running value derived from po over the time slots seen so far
#   pg_<word>      : pg(i,j), computed from n'(i,j) = po(i,j) * 100 (N normalized to 100)
#   pb_<word>      : pb(i,j), not computed yet
#   total_<ts>     : N(i) for time slot ts (updated in the upstream bolt)
class RedisIOBolt(storm.BasicBolt):
    """Storm bolt that maintains per-word burstiness statistics in Redis.

    Each incoming tuple is ``[word, ts]`` where ``ts`` is the current time-slot
    number. For every tuple the bolt updates the ``lastupd_``, ``n_``, ``po_``,
    ``pj_`` and ``pg_`` keys of that word. It emits nothing.
    """

    # Word counts are normalized to this many trials when evaluating the
    # binomial term (the file's layout comment states N is normalized to 100).
    NORMALIZED_N = 100

    def initialize(self, stormconf, context):
        """Open the Redis connection and set up per-process file logging."""
        # Imported locally because the module header does not import them.
        import logging
        import os

        self.r = redis.Redis(host='localhost', port=6379, db=0)
        pid = os.getpid()
        base_path = '/var/log/takatoshi/'
        logging.basicConfig(filename=base_path+__file__+str(pid)+'.log', level=logging.DEBUG)
        logging.debug("abs path of py file: " + os.path.abspath(__file__))

    def _get_number(self, key, default=0.0):
        """Read ``key`` from Redis as a float; a missing key yields ``default``."""
        raw = self.r.get(key)
        # Redis replies are bytes/str, never numbers, so convert explicitly.
        return default if raw is None else float(raw)

    def _binomial(self, pj, po):
        """Return C(N, n') * pj**n' * (1 - pj)**(N - n') with n' = N * po."""
        # Clamp so a slightly out-of-range probability cannot produce a
        # negative base under a fractional exponent (a complex result).
        pj = min(max(pj, 0.0), 1.0)
        po = min(max(po, 0.0), 1.0)
        n_norm = self.NORMALIZED_N * po
        # NOTE(review): the original used the constant comb(10,3) here, which
        # does not match the N=100 normalization used by the exponents;
        # comb(N, n') is assumed to be the intent - confirm.
        value = comb(self.NORMALIZED_N, n_norm) * (pj ** n_norm) * ((1 - pj) ** (self.NORMALIZED_N - n_norm))
        return float(value)

    def process(self, tup):
        """Update the Redis statistics of one ``[word, ts]`` tuple."""
        import logging

        word = tup.values[0]
        ts = int(tup.values[1])

        last_ts = int(self._get_number("lastupd_" + word))
        total = self._get_number("total_" + str(ts))
        if ts <= 0 or total <= 0:
            # Without a positive slot number and slot total the ratios below
            # are undefined; skip rather than divide by zero.
            logging.debug("skipping %r: ts=%r total=%r", word, ts, total)
            return

        old_pj = self._get_number("pj_" + word)
        if last_ts < ts:
            # First occurrence of the word in this time slot.
            self.r.set("lastupd_" + word, ts)
            self.r.set("n_" + word, 1)
            new_po = 1 / total
            # NOTE(review): the original expression was `t / ts * + 1 / ts * newpo`,
            # which drops the previous pj. Rescaling the old value from last_ts
            # slots to ts slots is assumed here, mirroring the else branch - confirm.
            new_pj = old_pj * last_ts / ts + new_po / ts
        else:
            count = self.r.incr("n_" + word, 1)
            new_po = count / total
            old_po = self._get_number("po_" + word)
            # Replace this slot's previous contribution with the new one.
            new_pj = old_pj - old_po / ts + new_po / ts

        self.r.set("po_" + word, new_po)
        self.r.set("pj_" + word, new_pj)
        self.r.set("pg_" + word, self._binomial(new_pj, new_po))
        # TODO: newpb


# run() is an instance method; the bolt must be instantiated first.
RedisIOBolt().run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Received a whole sentence somehow, split it by MeCab | |
import MeCab | |
# for analysis | |
import storm | |
# main json formatter | |
import redis | |
# directly write to redis db for some data | |
import logging | |
class SplitSentenceBolt(storm.BasicBolt):
    """Storm bolt that splits a sentence into its nouns with MeCab.

    Input tuples are ``[sentence, ts]``. For each distinct noun that is not a
    stopword the bolt emits ``[noun, ts]``. When at least one noun is found it
    also increments the Redis counter ``total_<ts>``.
    """

    def initialize(self, stormconf, context):
        """Load stopwords, create the MeCab tagger, connect to Redis, set up logging."""
        # Imported locally because the module header does not import it.
        import os

        # Explicit encoding: the stopword list is Japanese text and the worker's
        # default locale may not be UTF-8 (assumes the file is UTF-8 - confirm).
        with open("Japanese.txt", encoding="utf-8") as stopword_file:
            # A set gives constant-time membership tests in process().
            self.stopwords = set(stopword_file.read().split('\n'))
        self.m = MeCab.Tagger("-Ochasen")
        self.r = redis.Redis(host='localhost', port=6379, db=0)
        pid = os.getpid()
        base_path = '/var/log/takatoshi/'
        logging.basicConfig(filename=base_path+__file__+str(pid)+'.log', level=logging.DEBUG)
        logging.debug("abs path of py file: " + os.path.abspath(__file__))

    def process(self, tup):
        """Emit one ``[noun, ts]`` tuple per distinct non-stopword noun in the sentence."""
        sentence = tup.values[0]
        ts = tup.values[1]

        # The last line of the tagger output is dropped (presumably the "EOS"
        # marker - confirm against the MeCab output format in use).
        rows = self.m.parse(sentence).splitlines()[:-1]

        nouns = set()
        for row in rows:
            cols = row.split('\t')
            # Column 3 is tested for the noun tag; skip rows that are too short
            # instead of raising IndexError.
            if len(cols) > 3 and u"名詞" in cols[3] and cols[0] not in self.stopwords:
                nouns.add(cols[0])

        if nouns:
            # ts arrives as a number, so convert before building the key.
            self.r.incr('total_' + str(ts))
        for noun in nouns:
            storm.emit([noun, ts])


SplitSentenceBolt().run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# credit : https://qiita.com/kk6/items/8351a6541598cf7151ef | |
from bs4 import BeautifulSoup | |
import operator | |
from urllib.parse import urljoin | |
import json | |
import sys | |
import unicodedata | |
import time | |
import requests | |
# for http/2 streamjob | |
import storm | |
# main json formatter | |
import queue | |
# local queue | |
import logging | |
class MstdnStream:
    """Mastodon streaming API client.

    Reads the server-sent event stream of a Mastodon instance and forwards
    each complete event to ``listener.on_<event>(data)``.

    Usage::

        >>> from mstdn import MstdnStream, MstdnStreamListner
        >>> listener = MstdnStreamListner()
        >>> stream = MstdnStream('https://pawoo.net', 'your-access-token', listener)
        >>> stream.public()
    """

    def __init__(self, base_url, access_token, listener):
        """Create a session that sends ``access_token`` as a bearer token.

        :param base_url: root URL of the Mastodon instance
        :param access_token: API access token
        :param listener: object providing ``on_<event>(data)`` handler methods
        """
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({'Authorization': 'Bearer ' + access_token})
        self.listener = listener

    def public(self):
        """Consume the public timeline stream until the connection ends.

        Blocks for as long as the server keeps the connection open.
        Raises whatever ``raise_for_status()`` raises on an HTTP error.
        """
        url = urljoin(self.base_url, '/api/v1/streaming/public')
        resp = self.session.get(url, stream=True)
        try:
            resp.raise_for_status()
            event = {}
            for raw_line in resp.iter_lines():
                line = raw_line.decode('utf-8')
                if not line:
                    # A blank line terminates the current event.
                    self._dispatch(event)
                    event = {}
                    continue
                if line.startswith(':'):
                    # Comment line (used by the server as a heartbeat); ignore.
                    continue
                key, _sep, value = line.partition(':')
                # A single space after the colon is part of the framing, not the value.
                if value.startswith(' '):
                    value = value[1:]
                if key in event:
                    event[key] += value
                else:
                    event[key] = value
        finally:
            # Release the streaming connection even when a handler raises.
            resp.close()

    def _dispatch(self, event):
        """Call ``listener.on_<event>(data)`` for one complete event, if possible."""
        name = event.get('event')
        if name is None or 'data' not in event:
            # Nothing buffered (e.g. a blank line right after a heartbeat) or an
            # incomplete event: there is nothing to deliver.
            return
        handler = getattr(self.listener, "on_{event}".format(event=name), None)
        if handler is None:
            # The listener does not implement this event type; skip it instead of
            # killing the stream with an AttributeError.
            logging.debug("ignoring unhandled stream event %r", name)
            return
        handler(event['data'])
class MstdnStreamListner:
    """Stream listener that queues Japanese toots as ``[text, ts]`` pairs.

    ``ts`` is a time-slot counter that starts at 1 and is advanced when more
    than 300 seconds have passed since the slot started.
    """

    _HIRAGANA = "ぁあぃいぅうぇえぉおかがきぎくぐけげこごさざしじすずせぜそぞただちぢっつづてでとどなにぬねのはばぱひびぴふぶぷへべぺほぼぽまみむめもゃやゅゆょよらりるれろゎわゐゑをん"
    _KATAKANA = "ァアィイゥウェエォオカガキギクグケゲコゴサザシジスズセゼソゾタダチヂッツヅテデトドナニヌネノハバパヒビピフブプヘベペホボポマミムメモャヤュユョヨラリルレロヮワヰヱヲンヴ"
    _KANA = frozenset(_HIRAGANA + _KATAKANA)

    # Length of one time slot, in seconds.
    _SLOT_SECONDS = 300

    def __init__(self, theq):
        """
        :param theq: queue-like object; ``put([text, ts])`` is called for each
            accepted toot.
        """
        # The original also assigned the undefined globals `tagger` and
        # `rserver` here, which made construction fail with NameError; nothing
        # in this class uses them.
        self.q = theq
        self.ts = 1
        self.start_time = time.time()

    def _remove_attrs(self, soup, tag):
        """Remove every ``tag`` element (with its contents) from ``soup``."""
        try:
            for element in soup.find_all(tag):
                element.extract()
        except AttributeError:
            # Best effort, kept from the original: objects that lack these
            # methods are left untouched.
            pass

    def _kana(self, txt):
        """Return True if ``txt`` contains at least one hiragana or katakana character."""
        return any(ch in self._KANA for ch in txt)

    def on_update(self, data):
        """Handle a new status: strip markup, normalize, and queue it if it is Japanese.

        :param data: JSON document of the status; its ``content`` field holds HTML.
        """
        content = json.loads(data)['content']
        soup = BeautifulSoup(content, "html.parser")
        for tag in ("a", "span", "br"):
            self._remove_attrs(soup, tag)

        # The slice drops the first 3 and last 5 characters of the prettified
        # markup (presumably the outer "<p>" and "</p>\n" - confirm); remaining
        # paragraph tags are removed by the replace calls.
        text = str(soup.prettify())[3:-5]
        text = text.replace("<p>","").replace("</p>","")
        text = unicodedata.normalize("NFKC", text)

        # NOTE(review): the source lost its indentation; the slot rollover and
        # the enqueue are assumed to belong inside the kana check - confirm.
        if not self._kana(text):
            return

        now = time.time()
        if now - self.start_time > self._SLOT_SECONDS:
            self.ts = self.ts + 1
            self.start_time = now
        self.q.put([text, self.ts])

    def on_notification(self, data):
        """Log a notification event."""
        # Logged instead of printed: this listener runs inside a Storm multilang
        # spout, whose stdout carries the protocol messages.
        logging.debug(data)

    def on_delete(self, data):
        """Ignore status deletions."""
        pass
class PawooSentenceSpout(storm.Spout):
    """Storm spout that emits ``[sentence, ts]`` tuples read from the Pawoo public stream.

    The streaming connection runs on a background thread and hands items over
    through a thread-safe queue; ``nextTuple`` drains that queue. Ack and fail
    signals are ignored.
    """

    def initialize(self, conf, context):
        """Set up logging and start the background streaming thread."""
        # Imported locally because the module header does not import them.
        import os
        import threading

        # Configure logging first so failures of the stream thread are recorded.
        pid = os.getpid()
        base_path = '/var/log/takatoshi/'
        logging.basicConfig(filename=base_path+__file__+str(pid)+'.log', level=logging.DEBUG)
        logging.debug("abs path of py file: " + os.path.abspath(__file__))

        self.q = queue.Queue()
        listener = MstdnStreamListner(self.q)
        # SECURITY(review): this access token is committed in source and should be
        # rotated. PAWOO_ACCESS_TOKEN overrides it; the literal remains only as a
        # backward-compatible default.
        access_token = os.environ.get(
            'PAWOO_ACCESS_TOKEN',
            '785639d26ddb1f0aa3ddac884f9cdf64c0748ae7394388a322b44fa69ec596df')
        stream = MstdnStream('https://pawoo.net', access_token, listener)

        # stream.public() blocks for the lifetime of the connection; calling it
        # inline would keep initialize() from ever returning, so no tuple would
        # ever be emitted. Run it on a daemon thread instead.
        self._stream_thread = threading.Thread(
            target=self._run_stream, args=(stream,), name='pawoo-stream')
        self._stream_thread.daemon = True
        self._stream_thread.start()

    def _run_stream(self, stream):
        """Thread body: consume the stream and log the reason if it stops."""
        try:
            stream.public()
        except Exception:
            # Thread boundary: record the failure instead of dying silently.
            logging.exception("Pawoo streaming connection terminated")

    def nextTuple(self):
        """Emit at most one queued item; return immediately when none is waiting."""
        try:
            item = self.q.get_nowait()
        except queue.Empty:
            # Do not block here: the spout must return so the multilang loop
            # can keep answering the parent process.
            return
        if item is not None:
            storm.emit(item)


PawooSentenceSpout().run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment