For this homework we'll explore data analytics with a distributed computing stack, namely Spark + Hadoop's HDFS. You'll configure your own HDFS and Spark "cluster" for distributed operation. The master and slave services will actually all run on the same instance, but the process of configuring and running them is identical. You'll be able to view cluster diagnostics on your system as though it were a distributed system.
You would apply the same steps to deploy and run a production cluster with HDFS and Spark. The process is really quite simple, but make sure you do each step carefully and verify that you have each stage working before proceeding to the next.
In this lab we'll be viewing the Spark consoles from your EC2 instance. The Spark consoles are by default on ports 8080, 8081 and 4040, which you can't access directly through your instance's firewall. Therefore you'll have to:
Edit your ssh connection script (or PuTTY config) on your local machine to add a tunnel from port 8080 on your client to port 8080 on your EC2 instance. The format is exactly the same as the tunnel you already have from port 8888 to the matching port on the instance for IPython use.
Edit your ssh connection script or PuTTY config on your client machine to add tunnels from ports 4040 and 8081 on localhost to ports 4040 and 8081 respectively on the EC2 instance. These ports carry additional Spark job information.
Edit ~/.bashrc on the EC2 instance and copy in the "PATH=" and "export PATH" lines from ~/.bash_profile. This is so that the executor, which actually runs on the same machine, gets the right path and runs the right version of Python.
Do this before connecting to your instance to run IPython.
This notebook is designed to run on your EC2 instance. Open an ssh connection to your instance, create a directory for this HW, say "hw7", and cd into it. Right click on the download link at the top right of this page, copy the link address, and then do
wget <paste>
to download this notebook onto your instance. Then start ipython as normal:
ipython notebook
and open this notebook. Later we will connect this running notebook to Spark.
As before, it's more convenient to access this notebook from a local (laptop) browser. Open a browser and point it at http://localhost:8888, or whatever address ipython printed when it started the notebook.
Hadoop's distributed file system comprises a name node and various data nodes; see e.g. this description
Your EC2 instance has a Hadoop installation in /opt/hadoop. Notice also that if you type the following from a bash shell:
echo $HADOOP_PREFIX
/opt/hadoop
i.e. the variable $HADOOP_PREFIX has already been configured in your ~/.bash_profile to point to the Hadoop install directory. (If this variable isn't set for some reason, please modify your .bash_profile so that it is.)
The Hadoop configuration files are in $HADOOP_PREFIX/etc/hadoop. Change to that directory now and check the contents of each of these configuration files:
core-site.xml contains the specification of the HDFS service:
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
You use the prefix "hdfs://localhost:9000/" in filenames to access HDFS files from this machine. From another machine you should replace "localhost" with the IP address of your instance.
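For example, once the Spark context sc has been created later in this HW, a file stored in HDFS is referenced with a full URI like this (the path here is just a placeholder):
lines = sc.textFile("hdfs://localhost:9000/mydata/somefile.txt")   # placeholder path; any HDFS path works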
hdfs-site.xml contains details of the HDFS service:
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>/data/hdfs/name</value>
  </property>
  <property>
    <name>dfs.data.dir</name>
    <value>/data/hdfs/data</value>
  </property>
</configuration>
This configuration defines a replication factor of 1 for files (i.e. only one copy rather than multiple copies is kept), and names the directories on the machine where HDFS data is kept. The "name" directory is used by the namenode to keep track of file blocks and other metadata. The "data" directory contains the contents of those blocks. In a fully distributed implementation, the namenode and name directory would be on one machine while the datanode service and data directories would be on multiple other machines.
Make sure that the directories specified above, i.e. /data/hdfs/name and /data/hdfs/data, exist and are writable.
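If you want a quick way to verify this from Python, here is a minimal sketch (it only reports; if anything is missing, create or chown the directories from the shell):
# check that the HDFS name and data directories exist and are writable by this user
import os
for d in ["/data/hdfs/name", "/data/hdfs/data"]:
    print(d, "exists:", os.path.isdir(d), "writable:", os.access(d, os.W_OK))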
Although we won't be using MapReduce this time, it's useful to do this configuration.
In mapred-site.xml
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>hdfs://localhost:9001</value>
  </property>
</configuration>
which specifies that the job tracker service should run on port 9001 if you start mapreduce.
Hadoop uses ssh for inter-server communication, even on the same machine. Try doing:
ssh localhost
It will probably throw an error because the ssh key for ec2-user has been changed. That's OK. Go ahead and remove the "known_hosts" file
rm ~/.ssh/known_hosts
and try again. It should work this time. You will be logged into your machine (recursively) twice, so just do:
exit
to return to a logged-in-once prompt.
Just like a normal file system, you have to format HDFS before its first use. Make sure that the Hadoop binary directories (/opt/hadoop/bin and /opt/hadoop/sbin) are in your path; if not, edit your .bash_profile so that they are.
To format hdfs, do
hadoop namenode -format
You should only need to do this once, unless your HDFS gets corrupted somehow. If your file system does get corrupted, remove everything in the data directories before reformatting.
Just like formatting a disk, this step will destroy any data you have saved in HDFS.
To start the hdfs service do
cd $HADOOP_PREFIX
sbin/start-dfs.sh
and say "yes" if it prompts you to add the host "0.0.0.0" (localhost by another name) to the list of known hosts.
Now you can explore the hadoop filesystem. The list of file system commands is available here
Initially there will be nothing there, so let's create a directory:
hadoop fs -mkdir /mydata
and then, to look at the results, do
hadoop fs -ls /
From the documentation you can see that most Unix file system commands are available, except those that relate to a working directory (like "cd"). The HDFS shell doesn't persist any state between commands, so there is no notion of a working directory, and you always need to give absolute paths to files and directories.
Next let's add a large data file to HDFS. cd to the directory /data/rcv1. This directory contains the RCV1 news article data, including raw text files. There are four files:
lyrl2004_tokens_test_pt0.dat.gz
lyrl2004_tokens_test_pt1.dat.gz
lyrl2004_tokens_test_pt2.dat.gz
lyrl2004_tokens_test_pt3.dat.gz
Unzip these files, and concatenate them into a single text file called all.txt. Upload this file to hdfs with:
hadoop fs -put all.txt /mydata/rcv1_raw.txt
Then doing
hadoop fs -ls /mydata
should produce an output like
Found 1 items
-rw-r--r--   1 ec2-user supergroup  608585485 2015-11-06 16:33 /mydata/rcv1_raw.txt
which shows a file of 608 MB. Check that your own file's size matches.
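Incidentally, if you prefer to do the unzip-and-concatenate step above in Python rather than with gunzip and cat, here is a minimal sketch (run it from /data/rcv1, with the four file names as listed above):
# decompress the four gzipped parts and append them, in order, to all.txt
import gzip, shutil
parts = ["lyrl2004_tokens_test_pt%d.dat.gz" % i for i in range(4)]
with open("all.txt", "wb") as out:
    for name in parts:
        with gzip.open(name, "rb") as f:
            shutil.copyfileobj(f, out)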
Remember that you specified namenode and data directories in /data/hdfs/{name,data}. The data directory does indeed contain your data. Browse down the directory hierarchy until you find a directory containing large blocks of data. You may find it helpful to use the "du" command to find which directories have large contents under them.
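If you'd rather do this from Python than with du, here is a rough sketch that walks the data directory (as configured in hdfs-site.xml above) and prints the largest files it finds:
# walk the HDFS data directory and list the ten largest files (the block files)
import os
sizes = []
for root, dirs, files in os.walk("/data/hdfs/data"):
    for name in files:
        path = os.path.join(root, name)
        sizes.append((os.path.getsize(path), path))
for size, path in sorted(sizes, reverse=True)[:10]:
    print(size, path)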
What size are the file blocks in your HDFS?
Configuring Spark for distributed computing is similar. First, you need to make sure that the shell variable SPARK_HOME points to /opt/spark. Edit your .bash_profile so this is true and call
source ~/.bash_profile
in your EC2 connection to make sure that it gets set in the current environment.
The subdirectory $SPARK_HOME/conf contains the configuration files. Change to this directory. There are templates for the configuration files which only need a few modifications, so copy them to the actual config file names:
cp slaves.template slaves
cp spark-env.sh.template spark-env.sh
cp spark-defaults.conf.template spark-defaults.conf
To start Spark, cd to the main Spark directory /opt/spark and do
sbin/start-all.sh
You should now be able to view the Spark Master console by pointing your local (laptop) browser at
http://localhost:8080
If you have trouble, make sure you created the SSH tunnel mentioned in "Setup" at the start of this HW.
The Spark console on http://localhost:8080 will tell you where the Spark service is running (normally port 7077) and various other useful information. With this info, we can connect to the Spark service from IPython.
The Master console also contains links to a console for the main executor and to the Spark application console. These links point directly to the corresponding ports on your EC2 instance, but since those ports are behind the firewall you can't access them through the links. However, since we made tunnels earlier, you can access them through the tunnels:
The Spark Executor console is at http://localhost:8081. It contains information about the executors, like how much memory is allocated to each one. Open a browser window to this URL (you can click on the link in this notebook) and take a look.
NOTE: We will connect to Spark directly from this running IPython notebook. You don't have to copy cells into PySpark. So execute the following cell directly to set the Python paths to the PySpark libraries:
import sys
import os
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/pyspark'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))  # match this to the py4j zip shipped in $SPARK_HOME/python/lib
Then we create a Spark context that wraps the Spark server. DO THIS ONLY ONCE PER SESSION. The server won't allow you to have multiple Spark contexts. The Spark application server is on port 7077, and we get the local host name with a call to platform.node().
import pyspark
from pyspark.context import SparkContext
import platform
import atexit
nodename = platform.node()
sc = SparkContext(master="spark://"+nodename+":7077", appName="hw7")
atexit.register(lambda: sc.stop())
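As a quick, optional sanity check (a sketch using attributes the PySpark SparkContext exposes), you can confirm that the context really attached to the standalone master rather than falling back to local mode:
# these should show the spark://<nodename>:7077 master URL and the Spark version the cluster is running
print(sc.master)
print(sc.version)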
You could also run your queries in an IPython shell running on another machine. You would change the URL to
master="spark://<machinename>:7077"
after making sure that this port was open between the two machines.
You can also create a special ipython profile containing the above initialization. Some instructions for doing that are given here
Let's check that things are working before going further. Let's load a local file (make sure you fixed permissions as in Lab 9).
a = sc.textFile("/data/MovieLens/movies.dat")
a.count() # There should be 10681 movies
Now let's see if we can import a large file from HDFS:
raw = sc.textFile("hdfs://localhost:9000/mydata/rcv1_raw.txt")
This runs, as usual, at lightning speed. But that's because the read didn't actually happen yet (Spark evaluates lazily). To make Spark really read the data, we count the lines:
raw.count()
Although we could have used any HDFS URI in the call to "textFile" above, Spark is optimized to run with its executors on the slave nodes of an HDFS cluster. When used in that mode, Spark will pull data primarily from local files in HDFS, reducing the network load.
Now look at the Spark application console, which is at http://localhost:4040. (Note: there is a link to this URL from the Spark master console, but it won't work because it's on the wrong side of the tunnels that we made.)
Click on the line for the job you just ran (it should be the top line).
Look at the stages of the job. You will see some summary metrics for it.
Click also on the "Storage" tab. What do you see?
You'll probably be surprised, but that's because "storage" here refers to Spark's in-memory cached storage. There isn't any yet, because we haven't asked Spark to cache anything. Let's do that now:
raw.cache()
raw.take(10)
Now go back to the application console, click on the "storage" tab if you're not there already and refresh the browser window. What do you see this time?
Now that Spark and HDFS are up and running, your task should be quite easy. Write a series of Spark commands to do the following:
Filter the "raw" RDD to remove lines starting with ".I" or ".W"
Split each line at whitespace and concatenate all the words into a single RDD of Strings, each of which is a single word.
Make an RDD of (word, 1) pairs from the last RDD of Strings.
Reduce the last RDD into (word, count) pairs such that each word is unique.
Count the number of unique words.
Sort the last RDD by count in descending order.
Make a loglog plot of word frequency vs. rank for the last RDD. You'll need to convert it to an np.array (see the plotting sketch just after this list).
Save the last word/count RDD to HDFS as "hdfs://localhost:9000/mydata/rcv1_counts"
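For the plotting step, here is a minimal sketch, assuming sorted_counts is your (word, count) RDD from the previous step, already sorted by count in descending order (the variable name is just a placeholder for whatever you called it):
# pull the counts back to the driver as a NumPy array and plot frequency vs. rank on log-log axes
import numpy as np
import matplotlib.pyplot as plt
counts = np.array(sorted_counts.map(lambda wc: wc[1]).collect())
ranks = np.arange(1, len(counts) + 1)
plt.loglog(ranks, counts)
plt.xlabel("rank")
plt.ylabel("frequency")
plt.show()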
Include code cells below this one containing all your code. Execute the cells, and include the output.
TIPS: Make sure you check your intermediate results after each step, e.g. use the "take()" method to look at the first few rows of each RDD. This also forces execution of any pending Spark queries, so that running time is spread out over your session.
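For example, a check of the first (filtering) step might look like the sketch below; the variable name is just illustrative, and you should adapt the pattern to each of your own intermediate RDDs:
# peek at a few lines that survive the filter; take() also forces the pending computation
no_headers = raw.filter(lambda line: not (line.startswith(".I") or line.startswith(".W")))
no_headers.take(5)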
This time the application console will contain information about every stage in the calculation you ran.
Take another look. The Event Timeline is particularly helpful for analyzing performance.
Submit this notebook to the HW7 submission link.
You can type control-C in the terminal window where you started ipython to stop the notebook server and the Spark context it contains. To stop Spark proper, cd to $SPARK_HOME
and do
sbin/stop-all.sh
Change to the $HADOOP_PREFIX
directory and do:
sbin/stop-dfs.sh
Note that in theory you don't need to do this if you are going to use HDFS next time: HDFS runs as a service and should start automatically the next time your instance starts.
You can do this exercise in Scala from a spark-shell prompt. To do this, shut down any running IPython notebooks.
Make sure Spark and HDFS have been started as per the instructions above.
Start Spark shell and point it at the master service like this:
cd $SPARK_HOME
bin/spark-shell --master spark://${HOSTNAME}:7077
From there, follow the sequence of steps in "Your Mission" above. You may find it convenient to develop your queries in a script (a text file of Scala lines) that you load with the :load filename command from the Spark prompt.
Make a note of the time it takes to perform each step in the process using the Spark Application Console, which is once again on http://localhost:4040
Rerun the PySpark notebook and note the times for each step. Do you see a difference?
You should have enough information to debug most problems that might happen.