import csv

with open("sample.txt", 'rb') as f1:
    reader = csv.reader(f1)
    for line in reader:
        print line

!cat sample.txt

%%writefile max_temperature_map.py
#!/usr/bin/env python
# Mapper for the Hadoop Streaming job: emit (year, temperature) for every
# valid NCDC record read from stdin.
import re
import sys

for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        print "%s\t%s" % (year, temp)

%%writefile max_temperature_reduce.py
#!/usr/bin/env python
# Reducer: stdin arrives sorted by key, so keep a running maximum for the
# current year and emit it whenever the key changes.
import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
    (key, val) = line.strip().split("\t")
    if last_key and last_key != key:
        print "%s\t%s" % (last_key, max_val)
        (last_key, max_val) = (key, int(val))
    else:
        (last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
    print "%s\t%s" % (last_key, max_val)

!cat sample.txt | python max_temperature_map.py | sort | python max_temperature_reduce.py

%%bash
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input sample.txt \
  -output output \
  -mapper max_temperature_map.py \
  -reducer max_temperature_reduce.py

Great, but that process was a little time-consuming. Let's try this out in Pig.

%%writefile max_temp.pig
-- max_temp.pig: Finds the maximum temperature by year
records = LOAD 'input/ncdc/micro-tab/sample.txt'
    AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
    (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
    MAX(filtered_records.temperature);
DUMP max_temp;

!pig -x local max_temp.pig

%cd C:\Users\Andrew\Documents\Hadoop\Karung
!git add Karung.ipynb
!git commit -m "Add max temperature Streaming and Pig examples"
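A note on the streaming invocation above: when the job runs on a real cluster rather than in local mode, the mapper and reducer scripts also have to be shipped to the task nodes. A minimal sketch, assuming a streaming jar recent enough to accept the generic -files option (older releases use the streaming-specific -file flag for the same purpose):

%%bash
# Hypothetical cluster run: -files copies the two scripts to every task node.
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -files max_temperature_map.py,max_temperature_reduce.py \
  -input sample.txt \
  -output output \
  -mapper max_temperature_map.py \
  -reducer max_temperature_reduce.py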
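After the streaming job finishes, its result lands in the output directory given above. A quick way to inspect it from the notebook, assuming a local/standalone run so the files sit on the local filesystem (for a cluster run, hadoop fs -cat output/part-* reads the same files out of HDFS):

!cat output/part-*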