import itertools
from streamparse.spout import Spout
from websocket import create_connection
import json
class webSocketSpout(Spout):
    """Spout that streams raw Wikipedia edit events from a websocket.

    Each websocket message is expected to be a single JSON object. The raw
    message text is emitted unchanged as a one-field tuple; the topology
    definition declares that field as "word".
    """

    # Websocket endpoint to read from; override in a subclass to point the
    # spout at a different feed without editing this class.
    url = "ws://websocket.local.local:9000/"

    def initialize(self, stormconf, context):
        """Open the websocket connection once, when the spout starts."""
        self.ws = create_connection(self.url)

    def next_tuple(self):
        """Read the next websocket message and emit it as-is."""
        result = self.ws.recv()
        # Parse only to reject malformed messages here (json.loads raises a
        # ValueError subclass on bad input). The raw string, not the parsed
        # object, is what gets emitted; the parser bolt decodes it again.
        json.loads(result)
        self.emit([result])
if __name__ == '__main__':
    # Build the spout and start the run loop it inherits from streamparse's
    # Spout base class (webSocketSpout does not define run itself).
    spout = webSocketSpout()
    spout.run()
'''
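Sample event (each websocket message is one JSON object like this; events may
also carry a "geo_ip" object, whose "country_name" field jsonParser reads):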
{
"action": "edit",
"change_size": 328,
"flags": "M",
"hashtags": [],
"is_anon": false,
"is_bot": false,
"is_minor": true,
"is_new": false,
"is_unpatrolled": false,
"mentions": [],
"ns": "Main",
"page_title": "St. Andre Bessette Catholic Secondary School",
"parent_rev_id": "663970563",
"rev_id": "659207915",
"summary": "replace with infobox school per TfD",
"url": "http://en.wikipedia.org/w/index.php?diff=663970563&oldid=659207915",
"user": "Frietjes"
}
'''
from collections import Counter
from streamparse.bolt import Bolt
from redis import StrictRedis
import json
import time
class jsonParser(Bolt):
    """Bolt that turns one raw edit event into several "prefix_value" keys.

    Input: a tuple whose first value is the JSON text produced by the
    websocket spout. Output: one single-field tuple per key, for example
    "isanon_False", "anon_loggedin", "bot_human", "action_edit".
    """

    def initialize(self, conf, ctx):
        # Kept for compatibility; not read anywhere in this class.
        self.counts = Counter()

    def process(self, tup):
        """Decode the event in ``tup`` and emit its derived keys."""
        event = json.loads(str(tup.values[0]))
        # emit_many expects a list of tuples, so wrap each key in its own list.
        self.emit_many([[key] for key in self._keys_for(event)])

    @staticmethod
    def _keys_for(event):
        """Return the "prefix_value" key strings for one decoded event.

        Missing fields no longer raise KeyError:
        - an absent "is_anon" is treated as False. The "anon_" key already
          treated it as not anonymous.
        - "action" is skipped when absent or empty.
        - the country key is skipped unless "geo_ip" has a "country_name".
        Key order matches the original: isanon, anon, bot, action, country.
        """
        is_anon = event.get("is_anon", False)
        keys = [
            "isanon_" + str(is_anon),
            "anon_anon" if is_anon else "anon_loggedin",
            "bot_bot" if event.get("is_bot") else "bot_human",
        ]
        action = event.get("action")
        if action:
            keys.append("action_" + action)
        geo = event.get("geo_ip")
        if geo and geo.get("country_name"):
            keys.append("country_" + geo["country_name"])
        return keys
class RedisBolt(Bolt):
    """Bolt that tallies "prefix_value" keys into Redis sorted sets.

    For each incoming key, ``zincrby`` is called on the sorted set named
    after the prefix with the value as member. "action_*" keys are instead
    counted in the "actions" set, and also written to the "action" set via
    ``zadd`` with the current Unix time (presumably as the member's score).

    NOTE(review): the positional argument order used here matches redis-py
    2.x StrictRedis (zincrby(name, value, amount), zadd(name, score, member)).
    redis-py >= 3.0 changed both signatures, so confirm the pinned version.
    """

    # Redis server hostname; override in a subclass to target another host.
    redis_host = "redishost"

    def initialize(self, conf, ctx):
        self.redis = StrictRedis(host=self.redis_host)
        # Kept for compatibility; not read anywhere in this class.
        self.counter = Counter()

    def process(self, tup):
        """Apply the Redis updates for the single key carried by ``tup``."""
        keys, = tup.values
        # Split on the first underscore only. Prefixes never contain "_", but
        # values may, and a plain split("_") raised "too many values to unpack"
        # for them. A key with no underscore still raises, as before.
        key, word = keys.split("_", 1)
        if key == 'action':
            self.redis.zincrby("actions", str(word), 1)
            self.redis.zadd(key, int(time.time()), word)
        else:
            self.redis.zincrby(key, str(word), 1)
(ns wikipedialogs
  "Storm topology definition for the Wikipedia edit-log pipeline."
  (:require [streamparse.specs :refer :all])
  (:gen-class))
(defn wikipedialogs
  "Topology: websocket spout -> JSON parser bolt -> Redis counting bolt."
  [options]
  [
   ;; spout configuration
   {"websocket-spout" (python-spout-spec
                       options
                       "spouts.websocket.webSocketSpout"
                       ["word"]              ; raw JSON event text
                       :p 1)}
   ;; bolt configuration
   {"parser-bolt" (python-bolt-spec
                   options
                   {"websocket-spout" :shuffle}
                   "bolts.bolts.jsonParser"
                   ["word"]                  ; one "prefix_value" key per tuple
                   :p 2)
    "redis-bolt" (python-bolt-spec
                  options
                  ;; consume the parser's output (this topology has no
                  ;; "count-bolt" component to subscribe to)
                  {"parser-bolt" :shuffle}
                  "bolts.bolts.RedisBolt"
                  ;; output-spec is a required positional argument; this bolt
                  ;; emits nothing, so declare no fields
                  []
                  :p 2)}
  ]
)
## Running and submitting the topology
# sparse run                            (run the topology locally)
# sparse submit --name wikipedialogs    (submit it to the Storm cluster)
Image source: the official documentation.
Tuple tree:
A spout tuple is not considered fully processed until every tuple in the tree derived from it has finished processing.
We can use Storm's reliability API through anchoring: each newly emitted tuple is tagged with (anchored to) the input tuple it was derived from.
Storm tracks these tuple trees with dedicated tasks called acker tasks.