This script takes as input a pointer to an S3 bucket containing a set of text files, and outputs a set of JSON files. Each JSON file maps prefix strings (e.g., 1, 2, 3, or 4 preceding words separated by spaces) to arrays of the most common next words in the sequence. The JSON files can be used in next-word prediction applications, as demonstrated in this blog post.
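For illustration, an entry in the bigram (N = 2) output maps a one-word prefix to a list of its most common next words (with counts) plus the total count for that prefix; the numbers here are made up:

{"of": [[["the", 25103], ["a", 4215], ["course", 912], ["his", 850], ["them", 644]], 61020]}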
Amazon Elastic MapReduce (EMR) is used to manage the Hadoop cluster for the calculations. Two MapReduce steps are carried out in sequence:

1. phase_1 counts the occurrences of each N-gram in the input text.
2. phase_2 regroups those counts by the (N-1)-word prefix, drops N-grams with 'occCutoff' or fewer occurrences, and keeps the 'numKeep' most common next words for each prefix.

These two steps are run for each value of N considered. (In the configuration below, N = [2, 3, 4, 5].)
The following are the main configuration parameters for this script. The variables 'vocabSize' and 'occCutoff' can be tuned to trade off model size against prediction performance.
Also note that AWS credentials are required, as described below, along with a previously computed list of 1-grams sorted by frequency, which is used to produce the vocabulary list. (That 1-gram list can be generated with this same script by pointing it at the same text input folder, setting 'Nlist' to [1], and using only the phase_1 mapper and reducer.)
inputfoldername = 's3://wordpredictor1/input/'
bucketname = 'wordpredictor2'
vocabSize = 1000
occCutoff = 10
numKeep = 5
Nlist = [2,3,4,5]
masterType = "m3.xlarge"
workerType = "m3.xlarge"
numWorkers = 2
maxTime = 5 * 60 * 60
checkTime = 30
Libraries: boto is used for AWS interaction, Paramiko for SSH, and pandas only for the nicely formatted table output in IPython (so it could be dropped if desired).
import sys, os, time
import ast, json
import pandas as pd
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.emr.connection import EmrConnection
from boto.emr.instance_group import InstanceGroup
from boto.emr.step import StreamingStep
Access keys and passwords: in external files, formatted as lines of "[name]=[value]", as per the AWS rootkey.csv download.
AWSAccessKeyId, AWSSecretKey = ( line.strip().split('=')[1] for line in open('/Users/brian/rootkey.csv','r') )
sshKeyName, instancePass, mysqlPass, myIP = ( line.strip().split('=')[1] for line in open('/Users/brian/passwords.csv','r') )
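Each of these files is expected to hold one name=value pair per line, with no spaces around the '=' (the values below are placeholders); passwords.csv follows the same pattern, with its four lines read in the order shown above:

AWSAccessKeyId=AKIAXXXXXXXXXXXXXXXX
AWSSecretKey=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX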
We will pull our vocabulary list from the 'vocabSize' number of most frequent words in a previously-computed list of 1-grams.
vocabList_df = pd.read_table("ngrams1.tsv", nrows=vocabSize, names=["word","occ"])
vocabList_df.head()
| | word | occ |
|---|---|---|
| 0 | ##s## | 9299843 |
| 1 | ##es## | 9299843 |
| 2 | the | 4751890 |
| 3 | to | 2753081 |
| 4 | and | 2411141 |
vocabList = set(vocabList_df.word.tolist())
For each N (i.e., for each N-gram level), two MapReduce steps are required, hereafter referred to as 'phase_1' and 'phase_2'. The mapper and reducer Python scripts (Hadoop Streaming is used so that Python can run on the cluster) are copied below; they should be saved as files in the working directory, from which they will be copied (with modifications as necessary) to S3 for use by EMR.
For local testing, note that you can test a mapper outside of Hadoop with the following:
head -50 text.sample.txt > testfile
cat testfile | ./phase1_mapper2.py
Or test both a mapper and a reducer:
cat testfile | ./phase1_mapper2.py | sort | ./phase1_reducer.py
phase1_mapper_template.py
#!/usr/bin/python
import sys, re
n = $n$
vocabSet = $vocabList$
for line in sys.stdin:
# tokenize string
s = re.sub("[.!?;]+", " ##s## ", line)
s = "##s## " + s
s = re.sub("##s##[\s]+##s##", "", s)
regex = "[^\s,:=<>/\\)\\(\"]+"
tokens = re.findall(regex, s.lower())
# replace non-vocab tokens with "##unkn##"
for i,t in enumerate(tokens):
if not t in vocabSet:
tokens[i] = "##unkn##"
# find n-grams
ngrams = []
for i in range(n-1,len(tokens)):
new = ""
for j in range(n-1,-1,-1):
new += tokens[i-j] + " "
ngrams.append(new.strip())
# output n-grams
for l in ngrams:
print "{0}\t{1}".format(str(l).strip(), 1)
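As a concrete example, with n = 2 and assuming 'the', 'cat', 'sat', 'on', and 'mat' are all in the vocabulary, the line "The cat sat on the mat." is tokenized to '##s## the cat sat on the mat ##s##' and the mapper emits:

##s## the	1
the cat	1
cat sat	1
sat on	1
on the	1
the mat	1
mat ##s##	1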
phase1_reducer.py
#!/usr/bin/python
import sys
oldKey = None
s = 0
for line in sys.stdin:
data_mapped = line.strip().split("\t")
if len(data_mapped) != 2:
continue
thisKey, thisVal = data_mapped
if oldKey and oldKey != thisKey:
print str(oldKey).strip(), "\t", str(s)
oldKey = thisKey
s = 0
oldKey = thisKey
s += int(thisVal)
if oldKey != None:
print str(oldKey).strip(), "\t", str(s)
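The reducer assumes its input is sorted by key (which Hadoop guarantees between the map and reduce stages) and simply sums the counts for each distinct key. A quick local check, assuming the script has been made executable:

$ printf "the cat\t1\nthe cat\t1\nthe cat\t1\nthe mat\t1\n" | ./phase1_reducer.py
the cat 	 3
the mat 	 1

(Note the extra spaces around the tab, which come from the Python 2 print statement; the phase 2 mapper's whitespace split tolerates them.)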
phase2_mapper_template.py
#!/usr/bin/python
import sys, re
n = $n$
occCutoff = $occCutoff$
for line in sys.stdin:
l = line.strip().split()
if len(l) != n + 1:
continue
    # filter lines to only include those with more than occCutoff occurrences
if int(l[n]) > occCutoff and not "#" in l[n-1]:
        # format output to be tab delimited as: preface - word - occurrences
preface = l[0]
for i in range(1,n-1):
preface += " " + l[i]
print preface + "\t" + l[n-1] + "\t" + l[n]
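For example, with n = 3 and occCutoff = 10, a phase 1 output line counting the trigram 'on the mat' 42 times would be re-emitted with the two-word prefix separated from the predicted word (assuming the generated mapper has been saved locally and made executable):

$ printf "on the mat\t42\n" | ./mappers/phase2_mapper3.py
on the	mat	42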
phase2_reducer_template.py
#!/usr/bin/python
import sys
numKeep = $numKeep$
oldKey = None
topX = []
sum = 0
for line in sys.stdin:
data_mapped = line.strip().split("\t")
if len(data_mapped) != 3:
continue
thisKey, word, occ = data_mapped
if oldKey and oldKey != thisKey:
print str(oldKey).strip() + "\t" + str(sum) + "\t" + str(topX)
topX = []
sum = 0
topX.append((word,int(occ)))
if len(topX) > numKeep:
topX = sorted(topX,key=lambda x: x[1],reverse=True)
o = topX.pop()
oldKey = thisKey
sum += int(occ)
if oldKey != None:
print str(oldKey).strip() + "\t" + str(sum) + "\t" + str(topX)
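Each line emitted by the phase 2 reducer therefore contains a prefix, the total number of occurrences observed for that prefix, and the retained (word, count) pairs. An illustrative (made-up) line for the trigram level:

on the	312	[('mat', 42), ('table', 38), ('other', 30), ('way', 25), ('floor', 21)]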
phase1_mapper_template_file = 'phase1_mapper_template.py'
phase1_reducer_file = 'phase1_reducer.py'
phase2_mapper_template_file = 'phase2_mapper_template.py'
phase2_reducer_template_file = 'phase2_reducer_template.py'
In this section, we set up the necessary files and folders on S3 that will be referenced and used by EMR in the next section. (General notes on the use of boto for S3 can be found here.)
s3_conn = S3Connection(AWSAccessKeyId, AWSSecretKey)
bucket = s3_conn.get_bucket(bucketname)
# add mappers directory to local if it does not exist
if not os.path.exists('mappers'):
os.makedirs('mappers')
# write a separate phase1 mapper function for each value of n
for n in Nlist:
mf = "mappers/phase1_mapper" + str(n) + ".py"
with open(mf,'w') as mff:
with open(phase1_mapper_template_file,'r') as mtf:
mff.write(mtf.read().replace("$n$", str(n)).replace("$vocabList$", str(vocabList)))
k = bucket.new_key("phase1_mapper" + str(n) + ".py")
o = k.set_contents_from_filename(mf)
k = bucket.new_key('phase1_reducer.py')
o = k.set_contents_from_filename(phase1_reducer_file)
# write a separate phase2 mapper function for each value of n
for n in Nlist:
mf = "mappers/phase2_mapper" + str(n) + ".py"
with open(mf,'w') as mff:
with open(phase2_mapper_template_file,'r') as mtf:
mff.write(mtf.read().replace("$n$", str(n)).replace("$occCutoff$", str(occCutoff)))
k = bucket.new_key("phase2_mapper" + str(n) + ".py")
o = k.set_contents_from_filename(mf)
with open("phase2_reducer.py",'w') as mff:
with open(phase2_reducer_template_file,'r') as mtf:
mff.write(mtf.read().replace("$numKeep$", str(numKeep)))
k = bucket.new_key("phase2_reducer.py")
o = k.set_contents_from_filename("phase2_reducer.py")
k = bucket.new_key("phase1_output/")
o = k.set_contents_from_string('')
k = bucket.new_key("phase2_output/")
o = k.set_contents_from_string('')
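As an optional sanity check before launching the cluster, the keys now present in the bucket can be listed:

# list everything uploaded to the working bucket
for key in bucket.list():
    print key.name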
This section configures and launches the computations on EMR. (See notes on using boto to configure jobs and connecting to EMR and launching jobs.)
emr_conn = EmrConnection(AWSAccessKeyId, AWSSecretKey)
instance_groups = []
instance_groups.append(InstanceGroup(
num_instances = 1,
role = "MASTER",
type = masterType,
market = "ON_DEMAND",
name = "Main node"))
instance_groups.append(InstanceGroup(
num_instances = numWorkers,
role = "CORE",
type = workerType,
market = "ON_DEMAND",
name = "Worker nodes"))
steps = []
for n in Nlist:
steps.append( StreamingStep(
name = "phase1_" + str(n),
mapper = "s3://" + bucketname + "/phase1_mapper" + str(n) + ".py",
combiner = "s3://" + bucketname + "/phase1_reducer.py",
reducer = "s3://" + bucketname + "/phase1_reducer.py",
input = inputfoldername,
output = "s3://" + bucketname + "/phase1_output/n" + str(n) + "/") )
steps.append( StreamingStep(
name = "phase2_" + str(n),
mapper = "s3://" + bucketname + "/phase2_mapper" + str(n) + ".py",
reducer = "s3://" + bucketname + "/phase2_reducer.py",
input = "s3://" + bucketname + "/phase1_output/n" + str(n) + "/",
output = "s3://" + bucketname + "/phase2_output/n" + str(n) + "/") )
cluster_id = emr_conn.run_jobflow(
name = "ngramcalc",
instance_groups = instance_groups,
log_uri = "s3://" + bucketname + "/logs/",
steps = steps,
ec2_keyname = sshKeyName,
ami_version = "latest")
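As an aside (not used in this run), worker capacity could be made cheaper by adding task nodes on the spot market; boto's InstanceGroup accepts a bid price when the market is set to "SPOT". A minimal sketch, with an example bid price:

# optional: extra spot-market task nodes (bid price is just an example)
instance_groups.append(InstanceGroup(
    num_instances = numWorkers,
    role = "TASK",
    type = workerType,
    market = "SPOT",
    name = "Spot task nodes",
    bidprice = "0.10"))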
The following periodically checks the status of the EMR job and waits for completion or failure before moving on. Note that with the current configuration of this script, the EMR job requires about 39 minutes.
count = 0
current_state = ""
while count < maxTime:
time.sleep(checkTime)
job_desc = emr_conn.describe_jobflow(cluster_id)
if job_desc.state != current_state:
current_state = job_desc.state
print current_state
if current_state == 'COMPLETED' or current_state == 'FAILED':
count = maxTime
else:
count += checkTime
STARTING
BOOTSTRAPPING
RUNNING
SHUTTING_DOWN
COMPLETED
The results are then downloaded from S3.
# make output directory if not already there
if not os.path.exists('output'):
os.makedirs('output')
# download all of the results to that directory
for n in Nlist:
outfilelist = bucket.list("phase2_output/n" + str(n) + "/")
for key in outfilelist:
key.get_contents_to_filename("output/" + key.name.replace("/","."))
These output files are then combined to produce a single file for each N-gram level.
filenames = os.listdir('output')
for n in Nlist:
catlist = "cat"
for f in filenames:
if ("output.n" + str(n) + ".part") in f:
catlist += " output/" + f
catlist += " > ngrams" + str(n) + ".tsv"
os.system(catlist)
pd.read_table("ngrams" + str(Nlist[0]) + ".tsv", nrows=10, names=["preface","sum","output"])
| | preface | sum | output |
|---|---|---|---|
| 0 | ' | 5612 | [('i', 656), ('and', 633), ('he', 464), ('said... |
| 1 | -- | 46879 | [('and', 4705), ('the', 4027), ('a', 3080), ('... |
| 2 | 000 | 20160 | [('in', 2426), ('to', 1862), ('people', 1619),... |
| 3 | 16 | 5032 | [('years', 464), ('and', 431), ('percent', 278... |
| 4 | 2010 | 6638 | [('and', 1028), ('the', 512), ('to', 334), ('w... |
| 5 | 30 | 21261 | [('p', 4837), ('a', 2476), ('minutes', 2093), ... |
| 6 | 5 | 26158 | [('million', 2562), ('percent', 2303), ('minut... |
| 7 | according | 24553 | [('a', 13), ('to', 24479), ('the', 61)] |
| 8 | added | 10294 | [('to', 1834), ('a', 1600), ('that', 1352), ('... |
| 9 | alone | 6199 | [('in', 996), ('and', 620), ('with', 370), ('i... |
The results are then output to JSON for use in this blog post or in similar applications.
for n in Nlist:
outobj = {}
with open("ngrams" + str(n) + ".tsv","r") as f:
for line in f:
outobj[line.split("\t")[0]] = [ast.literal_eval(line.split("\t")[2].strip()), int(line.split("\t")[1].strip())]
with open("ngrams" + str(n) + ".json","w") as f:
f.write(json.dumps(outobj))
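A minimal sketch of how one of these files might be consumed for next-word prediction (the file name and lookup prefix are just examples):

import json

# load one of the generated models (for N = 3, prefixes are two words long)
with open("ngrams3.json", "r") as f:
    model = json.load(f)

# each value is [list of [word, count] pairs, total count for the prefix]
entry = model.get("in the")
if entry:
    candidates, total = entry
    # suggest the most frequent next words first
    for word, count in sorted(candidates, key=lambda c: c[1], reverse=True):
        print word, count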