" Stream processing frameworks are not only for real-time computation but an infrastructure for never ending continuous data processing "
Conceptual view
Physical View
import itertools
from streamparse.spout import Spout
class SentenceSpout(Spout):
    """Spout that emits sentences from a small fixed corpus, cycling forever."""

    def initialize(self, stormconf, context):
        corpus = [
            "She advised him to take a long holiday, so he immediately quit work and took a trip around the world",
            "I was very glad to get a present from her",
            "He will be here in half an hour",
            "She saw him eating a sandwich",
        ]
        # Endless round-robin over the corpus so next_tuple never runs dry.
        self.sentences = itertools.cycle(corpus)

    def next_tuple(self):
        # Hand the next sentence to Storm as a single-field tuple.
        self.emit([next(self.sentences)])

    def ack(self, tup_id):
        # Tuple processed successfully: nothing to do for this demo spout.
        pass

    def fail(self, tup_id):
        # Tuple failed downstream: deliberately ignored in this demo spout.
        pass
# Entry point: Storm's multilang protocol launches this file as a
# subprocess; run() starts the spout's event loop.
if __name__ == '__main__':
    SentenceSpout().run()
import re
from streamparse.bolt import Bolt
class SentenceSplitterBolt(Bolt):
    """Bolt that splits an incoming sentence into one tuple per word."""

    def process(self, tup):
        raw = tup.values[0]  # the sentence is the tuple's only field
        # Remove punctuation before tokenising.
        cleaned = re.sub(r"[,.;!\?]", "", raw)
        tokens = []
        for piece in cleaned.split(" "):
            piece = piece.strip()
            if piece:
                tokens.append([piece])
        if not tokens:
            # Nothing usable came out of the sentence: report failure to Storm.
            self.fail(tup)
            return
        self.emit_many(tokens)
        # Acknowledge so Storm knows the tuple was fully processed.
        self.ack(tup)
# Entry point: launched as a subprocess by Storm's multilang protocol.
if __name__ == '__main__':
    SentenceSplitterBolt().run()
{"sentence-splitter" (shell-bolt-spec
;; inputs, where does this bolt recieve it's tuples from?
{"word-spout-1" :shuffle
}
;; command to run
["python" "sentence_splitter.py"]
;; output spec, what tuples does this bolt emit?
["word"]
;; configuration parameters
:p 2)
"word-counter" (shell-bolt-spec
;; recieves tuples from "sentence-splitter", grouped by word
{"sentence-splitter" ["word"]}
["python" "word_counter.py"]
["word" "count"])
"word-count-saver" (shell-bolt-spec
{"word-counter" :shuffle}
["python" "word_saver.py"]
;; does not emit any fields
[])}
** Grouping **
A grouping defines how a stream should be partitioned among the bolt’s tasks.
There are a variety of groupings you can use, but the most common ones are:
Note: For detailed list please refer to URL : http://storm.incubator.apache.org/documentation/Concepts.html#stream-groupings
** Physical View **
File/Folder | Contents |
---|---|
config.json | Configuration information for all of your topologies. |
fabfile.py | Optional custom fabric tasks. |
project.clj | leiningen project file, can be used to add external JVM dependencies |
src/ | Python source files (bolts/spouts/etc.) for topologies. |
tasks.py | Optional custom invoke tasks. |
topologies/ | Contains topology definitions written using the Clojure DSL for Storm. |
virtualenvs/ | Contains pip requirements files in order to install dependencies on remote Storm servers. |
#DSL for defining topologies
#----------------------------
;; Word-count topology definition using Storm's Clojure DSL.
(ns wordcount
  (:use [backtype.storm.clojure])
  (:gen-class))

(def wordcount
  [
   ;; spout configuration: shell out to words.py, which emits a "word" field
   {"word-spout" (shell-spout-spec
                  ["python" "words.py"]
                  ["word"]
                  )
    }
   ;; bolt configuration: count words, shuffle-grouped from the spout,
   ;; emitting ["word" "count"] with a parallelism hint of 2
   {"count-bolt" (shell-bolt-spec
                  {"word-spout" :shuffle}
                  ["python" "wordcount.py"]
                  ["word" "count"]
                  :p 2
                  )
    }
   ]
  )
** Enough talking — now let's build something. **
Sample message
--------------
{
"action": "edit",
"change_size": 98,
"flags": null,
"is_anon": false,
"is_bot": false,
"is_minor": false,
"is_new": false,
"is_unpatrolled": false,
"ns": "Main",
"page_title": "Abu Bakr al-Baghdadi",
"parent_rev_id": "615788913",
"rev_id": "615784505",
"summary": "/* As Caliph of the Islamic State */ this should have pointed out that some sources deny this was al-Baghdadi - Wikipedia cannot say in its own voice that this is genuine while it is contested",
"url": "http://en.wikipedia.org/w/index.php?diff=615788913&oldid=615784505",
"user": "Dougweller"
}
[user@localhost opt]# sparse quickstart wikipedia_editLogs_trends
%%bash
cd /opt/wikipedia_editLogs_trends/
ls
_build config.json fabfile.py log-cleaner.log project.clj README.md _resources src tasks.py topologies virtualenvs zookeeper.out
#src/KafkaSpout.py
from streamparse.spout import Spout
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
class KafkaSpout(Spout):
    """Spout that reads Wikipedia edit-log messages from a Kafka topic."""

    def initialize(self, stormconf, context):
        # Connect to the local Kafka broker and join consumer group
        # "SpoutTopologyGroup" on topic "WikiPediaLogs".
        self.kafka = KafkaClient('127.0.0.1:9092')
        self.consumer = SimpleConsumer(self.kafka,"SpoutTopologyGroup","WikiPediaLogs")

    def next_tuple(self):
        # NOTE(review): iterating the consumer blocks inside a single
        # next_tuple() call instead of emitting one message per invocation,
        # which can starve the spout's ack/fail handling — confirm intended.
        for message in self.consumer:
            # NOTE(review): `message` is the raw consumer record, not just its
            # payload; the downstream bolt does json.loads(str(...)) on it —
            # verify str(message) actually yields the original JSON text.
            self.emit([message])
# Entry point: launched as a subprocess by Storm's multilang protocol.
if __name__ == '__main__':
    KafkaSpout().run()
#src/ParseJson.py
from collections import Counter
from streamparse.bolt import Bolt
import json
import time
class ParseJson(Bolt):
    """Parse a Wikipedia edit-log JSON message and emit categorical labels.

    For each incoming JSON record, emits one single-field tuple per
    category: the edit action (prefixed "g2_"), "bot"/"human", and
    "g3_Anon"/"g3_LoggedIN".
    """

    def initialize(self, conf, ctx):
        # Running tally; not read by process(), kept for interface parity.
        self.counts = Counter()

    def process(self, tup):
        # json.loads already maps JSON null/true/false to Python
        # None/True/False, so no manual "casting" aliases are needed
        # (the original null/true/false locals were dead code).
        record = json.loads(str(tup.values[0]))

        list_words = []
        # Edit action, namespaced with a "g2_" prefix for the dashboard.
        list_words.append(["g2_" + str(record['action'])])
        # Bot or not.
        list_words.append(["bot"] if record['is_bot'] else ["human"])
        # Anonymous or logged-in editor.
        list_words.append(["g3_Anon"] if record['is_anon'] else ["g3_LoggedIN"])

        self.emit_many(list_words)
# Entry point: launched as a subprocess by Storm's multilang protocol.
if __name__ == '__main__':
    ParseJson().run()
#src/Counts
from collections import Counter
from collections import defaultdict
from streamparse.bolt import Bolt
from websocket import create_connection
import json
import time
class WordCounter(Bolt):
    """Bolt that keeps running word counts and pushes them to a dashboard."""

    def initialize(self, conf, ctx):
        # Per-word tally; defaultdict avoids explicit zero-initialisation.
        self.counts = defaultdict(int)
        # Live push channel to the websocket dashboard.
        self.ws = create_connection("ws://localhost:2546")

    def process(self, tup):
        token = tup.values[0]
        self.counts[token] += 1
        # Ship the whole tally to the dashboard on every update.
        self.ws.send(json.dumps(self.counts))
        # Also emit the updated (word, count) pair downstream.
        self.emit([token, self.counts[token]])
# Entry point: launched as a subprocess by Storm's multilang protocol.
if __name__ == '__main__':
    WordCounter().run()
#Topology
;; Wikipedia edit-log topology definition using Storm's Clojure DSL.
(ns wikipediaEditLogs
  (:use [backtype.storm.clojure])
  (:gen-class))

(def wordcount
  [
   ;; spout configuration: KafkaSpout.py reads the edit-log topic and
   ;; emits a single "word" field
   {"kafka-spout" (shell-spout-spec
                   ["python" "KafkaSpout.py"]
                   ["word"]
                   )
    }
   ;; bolt configuration: parse JSON records, shuffle-grouped from the spout
   {"parse-bolt" (shell-bolt-spec
                  {"kafka-spout" :shuffle}
                  ["python" "ParseJson.py"]
                  ["word"]
                  :p 2
                  )
    ;; bolt configuration: count labels, fields-grouped on "word" so each
    ;; word always reaches the same task
    "count-bolt" (shell-bolt-spec
                  {"parse-bolt" ["word"]}
                  ["python" "wordcount.py"]
                  ["word" "count"]
                  :p 2
                  )
    }
   ]
  )
[user@localhost opt]# sparse run
[user@localhost opt]# sparse submit --name topology_name