Running applications and data processing in Python

This tutorial presents useful Python tools for script/code generation, running applications and using operating-system-dependent functionality, and working with files: reading, parsing, and writing.

The tutorial consists of three parts:

  • Introducing relevant Python modules and functions that are useful in the context of this tutorial
  • A complete example
  • Exercise

Dealing with the file system and path

The glob, os, and shutil modules provide important cross-platform functions for working with the file system. For the full documentation, refer to these links:

http://docs.python.org/2/library/glob.html

http://docs.python.org/2/library/os.html

http://docs.python.org/2/library/shutil.html
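
As a quick orientation before the help output, here is a minimal sketch of how the three modules fit together. It works in a temporary directory, and the directory and file names (demo, raw, a.txt, and so on) are made up for illustration only.

```python
import glob
import os
import shutil
import tempfile

# Work inside a throwaway directory so nothing outside it is touched.
base_dir = tempfile.mkdtemp()

# os.path.join builds the path with the separator of the current platform.
demo_dir = os.path.join(base_dir, "demo", "raw")
os.makedirs(demo_dir)

# Create three small files: two .txt files and one .log file.
for file_name in ("a.txt", "b.txt", "c.log"):
    with open(os.path.join(demo_dir, file_name), "w") as f:
        f.write("sample\n")

# os.listdir returns every entry; glob.glob filters by a shell-style pattern.
all_entries = sorted(os.listdir(demo_dir))
txt_files = sorted(os.path.basename(p)
                   for p in glob.glob(os.path.join(demo_dir, "*.txt")))
print(all_entries)
print(txt_files)

# os.remove deletes one file; shutil.rmtree deletes a whole directory tree.
os.remove(os.path.join(demo_dir, "c.log"))
shutil.rmtree(base_dir)
tree_removed = not os.path.exists(base_dir)
print(tree_removed)
```

Sorting the listings makes the output predictable, because neither os.listdir nor glob.glob guarantees an order.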

In [ ]:
import os
from os.path import join as join_path
help(os.getcwd)
print("----------------------------------------------------------")
help(os.chdir)
print("----------------------------------------------------------")
help(os.makedirs)
print("----------------------------------------------------------")
help(os.listdir)
print("----------------------------------------------------------")
help(os.remove)
print("----------------------------------------------------------")
help(join_path)
print("----------------------------------------------------------")
help(os.path.abspath)
In [ ]:
# Change the current directory to the location of the "Special_topics_part_1" directory
os.chdir("/Users/malastm/new_ACM_tutorials/ACM-Python-Tutorials-KAUST-2014/Special_topics_part_1/")

print("The current directory was       : " + os.getcwd())

tutorial_example_dir = os.path.join(os.getcwd(), "example")
os.chdir(tutorial_example_dir)
print("The current directory changed to: " + os.getcwd())

print("Files in this directory: ", os.listdir('.'))
In [ ]:
import glob

help(glob.glob)
In [ ]:
import shutil

help(shutil.rmtree)
In [ ]:
def ensure_dir(d):
    """Create a directory and ignore the error if the directory already exists"""
    import os, errno
    try:
        os.makedirs(d)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            pass
        else: raise

def clean_all():
    """Remove any results previously generated by this tutorial's example"""
    import glob, shutil, os
    for f in glob.glob('results/*.*'):
        os.remove(f)
    shutil.rmtree('results/patients', ignore_errors=True)

clean_all()

Working with files: reading and writing

In [ ]:
help(open)
print("----------------------------------------------------------")
print("----------------------------------------------------------")
print(dir(file))
print("----------------------------------------------------------")
help(file.close)
print("----------------------------------------------------------")
help(file.read)
print("----------------------------------------------------------")
help(file.write)
print("----------------------------------------------------------")
help(file.readline)

A useful function to print the contents of a file given its name

In [ ]:
def print_file(f_name):
    """Print the contents of a file, like the shell command 'cat'"""
    # The with statement closes the file even if reading fails
    with open(f_name, 'r') as f:
        print(f.read())

Complete example

In this example we will:

  • generate a script/code from a template,
  • use Python to run the script over several inputs to generate the raw results files,
  • extract/parse the desired information from the raw files and place them in a dictionary,
  • sort the data,
  • write the results table in a CSV file, and finally
  • archive the code file and information for reproducibility

It is good practice to test the base script before using it as a template.

The following script takes a person's information and writes it to a file as human-readable text.

In [ ]:
print_file(join_path("scripts", "format_info.py") )

To run an application, a Python script in our example, we use the subprocess module. It contains functions to spawn new processes, work with their input/output/error pipes, and read their return codes.

Here we use the check_call function of this module. It raises an exception (CalledProcessError) if the process does not terminate normally, that is, if the return code is not zero.

For more information about the subprocess module see: http://docs.python.org/2/library/subprocess.html
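
The behavior of check_call can be seen in isolation with a minimal sketch. It assumes nothing about the tutorial's scripts: the two child commands are made-up one-liners, and sys.executable is used so that the example does not depend on which interpreter name is on the PATH.

```python
import subprocess
import sys

# sys.executable is the interpreter running this code, so the example does not
# depend on a "python" command being available on the PATH.
ok_command = [sys.executable, "-c", "print('hello from the child process')"]
failing_command = [sys.executable, "-c", "import sys; sys.exit(3)"]

# check_call returns 0 when the child process exits normally.
status = subprocess.check_call(ok_command)
print("Return status: " + str(status))

# A non-zero return code raises CalledProcessError, which carries the code.
try:
    subprocess.check_call(failing_command)
    failed_code = None
except subprocess.CalledProcessError as exc:
    failed_code = exc.returncode
print("Failing command returned: " + str(failed_code))
```

Passing the command as a list avoids going through the shell, so arguments that contain spaces need no extra quoting.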

In [ ]:
import subprocess

python_script = join_path("scripts", "format_info.py")
command = "python " + python_script  # lowercase: command names are case-sensitive on most systems
arguments = " Ahmad 1.77 80 25"

ensure_dir('results')

print("We will run a process using this command: " + command + arguments)
sts = subprocess.check_call(command + arguments, shell=True)
print ("The return status of the process: " + str(sts))

Now read the resulting output file, then remove it

In [ ]:
print(os.listdir('results'))
In [ ]:
print_file(join_path("results", "patient_Ahmad_25.txt"))
os.remove(join_path("results", "patient_Ahmad_25.txt"))

Using the Template class for code generation

The following example uses the Template class from the string module to generate a Python script from a template string.

The following code cell contains the template string of our target script. The string contains the constant parts that will remain the same and the template placeholders that will be replaced with the desired values. The $ is used to indicate template placeholders that will be replaced with the desired string. In this string we have two placeholders: formula and name.

The goal of this code generation is to generate a script that will compute additional information and include it in the output file.

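Note: special characters require care. Because the template is an ordinary (non-raw) Python string, \n must be written as \\n so that the generated script contains the \n escape sequence rather than an actual line break. Also, if the desired script contains a $ character, it must be written as $$ in the template string.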
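
The escaping rules can be checked on a tiny template first. The following is only a sketch with a made-up one-line template; the placeholder names item and price are hypothetical and not part of the tutorial's script.

```python
from string import Template

# Hypothetical one-line template: $item and $price are placeholders,
# "$$" becomes a literal "$", and "\\n" becomes the two characters "\n"
# in the generated source code.
line_template = Template('print("$item costs $$$price\\n")')

generated = line_template.substitute(item="tea", price="3")
print(generated)

# substitute() raises KeyError when a placeholder has no value;
# safe_substitute() leaves the unknown placeholder in place instead.
partial = line_template.safe_substitute(item="tea")
print(partial)
```

In the first result, the sequence $$$price reads as an escaped dollar sign followed by the price placeholder, so the generated line contains $3.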

In [ ]:
script_template_txt = """#!/usr/bin/env python

import sys

name = str(sys.argv[1])
height = float(sys.argv[2])
weight = float(sys.argv[3])
age = int(sys.argv[4])

val = $formula

f = open('results/patients/patient_' + name + '_' + str(age) + '.txt', 'w')

f.write( "patient's name: " + name + "\\n")
f.write( "patient's age: " + str(age) + " Years\\n")
f.write( "patient's Weight: " + str(weight) + " kgs\\n")
f.write( "patient's height: " + str(height) + " Meters\\n")

f.write( "patient's $name: %f \\n" % val)

f.close()
"""

Now we create a template object named script_template. Then the template placeholders are substituted with the Body Mass Index (BMI) formula and its name. The resulting string is written to a script file named calc_bmi.py.

In [ ]:
from string import Template

script_template = Template(script_template_txt)
formula = "weight/height**2"
name = "BMI"
script_txt = script_template.substitute(formula=formula, name=name)

with open(join_path("results","calc_bmi.py"), 'w') as f:
    f.write(script_txt)

print_file(join_path("results","calc_bmi.py"))

Now create a directory for the results.

In [ ]:
ensure_dir(join_path("results", "patients"))

Here we test our generated script.

In [ ]:
python_script = join_path("results","calc_bmi.py")
command = "python " + python_script  # lowercase: command names are case-sensitive on most systems
arguments = " Ahmad 1.77 80 25"

print("Execute: " + command + arguments)
subprocess.check_call(command + arguments, shell=True)

print_file(join_path("results", "patients", "patient_Ahmad_25.txt"))
os.remove(join_path("results", "patients", "patient_Ahmad_25.txt"))

Run the new script over several examples to generate the raw files in the patients directory

In [ ]:
patients = [('Williams',19,84,1.74), ('Johnson',23,82, 1.65), 
            ('Jones', 25, 70, 1.8), ('Jones', 29, 85, 1.66),
            ('Smith', 30, 120, 1.9), ('ahmad', 35, 50.5, 1.5)]

for n, a, w, h in patients:
    arguments = " %s %f %f %d" % (n, h, w, a)
    subprocess.check_call(command + arguments, shell=True)
In [ ]:
os.listdir(join_path('results', 'patients'))

Parsing the results

The following function takes an open file object for a raw results file, extracts/parses the desired values, and returns an ordered dictionary of the entries.
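
The string handling at the heart of this parsing can be tried on single lines. This is a minimal sketch: parse_line is a hypothetical helper introduced only for illustration, and the sample lines follow the format written by the generated script.

```python
def parse_line(line, label, convert):
    """Return the converted value that follows 'label:' in a line of text."""
    if label not in line:
        return None
    value_text = line.split(":")[1]          # text after the colon
    value_text = value_text.strip()          # drop spaces and the newline
    value_text = value_text.split(" ")[0]    # drop a trailing unit such as "Years"
    return convert(value_text)

age = parse_line("patient's age: 25 Years\n", "patient's age", int)
weight = parse_line("patient's Weight: 80.0 kgs\n", "patient's Weight", float)
missing = parse_line("patient's name: Ahmad\n", "patient's age", int)
print(age, weight, missing)
```

This approach is naive on purpose: it assumes exactly one colon before the value and a space between the value and its unit.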

In [ ]:
from collections import OrderedDict
def parse_bmi(fp):
    fields = [("patient's name", "name", str),("patient's age", "age", int),
              ("patient's Weight", "weight", float),("patient's height", "height", float),("patient's BMI", "bmi", float)]
    record = OrderedDict()
    for f in fields:
        record[f[1]] = 0
    for ln in fp:
        for f in fields:
            if f[0] in ln:
                # Using naive string parsing:
                #val = ln.split(":")[1].strip().split(' ')[0]
                val = ln.split(":")[1]
                val = val.strip()
                val = val.split(' ')[0]
                val = f[2](val)  # convert the text to the field's type
                
                record[f[1]] = val

    return record

In the following two code blocks, the list of available raw files is prepared, then the files are parsed and collected in a list of dictionaries.

In [ ]:
files_list = glob.glob(join_path('results', 'patients', '*.txt') )
print (files_list)
In [ ]:
entries =[]
for rf in files_list:    
    with open(rf, 'r') as fp: 
        record = parse_bmi(fp)
        entries.append(record)

for i in entries: print(i)

Sorting the results

We use Python's built-in sorted function. It returns a sorted list built from the given iterable, which in our example is a list of dictionaries. For a flat list such as [1,4,2], the sorting function knows how to compare the items. Because each item of our list is a dictionary, we use the optional key argument to specify which values of the dictionary to compare. The key argument accepts a single-argument function that returns the comparison value for an item.

For more information see: http://docs.python.org/2/library/functions.html#sorted
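
A small sketch of the key argument, using made-up records rather than the tutorial's data. It shows that itemgetter with two field names gives the same ordering as an explicit function that returns a tuple.

```python
from operator import itemgetter

# Made-up records, used only to illustrate the key argument.
records = [
    {"name": "Smith", "age": 30},
    {"name": "Jones", "age": 25},
    {"name": "Adams", "age": 30},
]

# Sort by age first; records with the same age are ordered by name.
by_age_then_name = sorted(records, key=itemgetter("age", "name"))

# The same ordering written with an explicit single-argument function.
same_order = sorted(records, key=lambda r: (r["age"], r["name"]))

names = [r["name"] for r in by_age_then_name]
print(names)
```

Tuples are compared element by element, which is why the second field only matters when the first one is equal.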

In [ ]:
# We will use the itemgetter function to specify our sorting key from the dictionary
from operator import itemgetter

d0 = entries[0]
print(d0)

ig = itemgetter('age', 'name')
print (ig(d0))
# This is equivalent to:
print(d0['age'], d0['name'])

Now we sort the records by age, then by name.

In [ ]:
from operator import itemgetter

entries = sorted(entries, key=itemgetter('age', 'name'))
In [ ]:
for i in entries: print(i)

Writing to a CSV file

In the following code block, the list of dictionaries is stored in a CSV file. We use the csv module for this purpose. It has many powerful features for writing and reading CSV files. Here we use the DictWriter/DictReader classes to write/read data to/from CSV files. These two classes are specialized to deal with dictionary data types in Python.

For more information see http://docs.python.org/2/library/csv.html.
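
The write/read round trip can be sketched without touching the disk. This example assumes Python 3, where the csv module works with text; the rows are made up, and an in-memory buffer stands in for the file.

```python
import csv
import io

# Made-up rows, used only for illustration.
rows = [
    {"name": "Jones", "age": 25, "bmi": 21.6},
    {"name": "Smith", "age": 30, "bmi": 33.2},
]

# An in-memory text buffer stands in for a file opened in text mode.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["name", "age", "bmi"])
writer.writeheader()
writer.writerows(rows)

csv_text = buffer.getvalue()
print(csv_text)

# Reading back: each row is a dictionary keyed by the header names.
loaded = list(csv.DictReader(io.StringIO(csv_text)))
print(loaded[0]["name"], loaded[1]["name"])
```

The fieldnames list fixes the column order of the output, independently of the order of the keys in each dictionary.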

In [ ]:
from csv import DictWriter
fields = entries[0].keys()

with open( join_path('results', 'records.csv'), 'w' ) as output_file:
        r = DictWriter(output_file,fieldnames=fields)
        r.writeheader()
        for k in entries:
            r.writerow(k)
In [ ]:
print_file( join_path('results', 'records.csv') )

Loading data from a CSV file and extracting data from the table

In [ ]:
from csv import DictReader
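# 'rb' is the mode the Python 2 csv module expects;
# in Python 3, open the file with mode 'r' and newline='' instead
with open(join_path('results', 'records.csv'), 'rb') as input_file:
    data = DictReader(input_file)
    data = [k for k in data]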
print(data)

Now extract the age and bmi values from the data

In [ ]:
points = []
for d in data:
    points.append((d['age'], d['bmi']))
print(points)
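
DictReader returns every field as a string, so the points above hold text rather than numbers. A minimal sketch of the conversion step, using made-up rows in the same shape:

```python
# Made-up rows in the shape that DictReader returns: all values are strings.
data_rows = [
    {"name": "Jones", "age": "25", "bmi": "21.604938"},
    {"name": "Smith", "age": "30", "bmi": "33.240997"},
]

# Convert each field to the numeric type needed for computation or plotting.
numeric_points = [(int(row["age"]), float(row["bmi"])) for row in data_rows]
print(numeric_points)

# With numbers, comparisons behave numerically; as strings, "9" > "10".
oldest = max(age for age, _ in numeric_points)
print(oldest)
```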

Archiving the source files

A good way to improve the reproducibility of an experiment is to create a tarball of all the source code and setup files for each experiment. In a typical project, the tarball would contain all the source files, makefiles, the build log file, and possibly the executable itself.

Here most of the source code is in the notebook, so we do not have much to place in the tarball. The results are also stored in the tarball here, although that is not a good idea in a real-life project.
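
Before archiving the real files, the tarfile calls can be tried on a throwaway project. This is only a sketch: the file calc.py and the archive name project.tar.gz are made up for illustration.

```python
import os
import shutil
import tarfile
import tempfile

# Build a small throwaway project so the example is self-contained.
work_dir = tempfile.mkdtemp()
source_path = os.path.join(work_dir, "calc.py")
with open(source_path, "w") as f:
    f.write("print('hello')\n")

# "w:gz" creates a gzip-compressed archive; arcname sets the name stored
# inside the archive, so the temporary directory path is not recorded.
archive_path = os.path.join(work_dir, "project.tar.gz")
with tarfile.open(archive_path, "w:gz") as tar:
    tar.add(source_path, arcname="calc.py")

# Reopen the archive to confirm what was stored.
with tarfile.open(archive_path, "r:gz") as tar:
    members = tar.getnames()
print(members)

# Remove the throwaway project.
shutil.rmtree(work_dir)
```

Listing the members after writing is a cheap check that the archive contains what the experiment needs.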

In [ ]:
import tarfile
# records.csv was written to the results directory, not to results/patients
f_list = [join_path("results", "patients", "*.txt"), join_path("results", "*.py"), 
          join_path("results", "*.csv"), join_path("..", "*.ipynb")]
f_list = [glob.glob(n) for n in f_list]
f_list = [n for nn in f_list for n in nn]

f_name = join_path("results", "bmi.tar.gz")
print("Writing project files to: " + f_name)
with tarfile.open(f_name, "w:gz") as tar:
    for n in f_list:
        print("Adding to the tar file: " + n)
        tar.add(n)

Exercise

In this exercise you are asked to modify the example of this tutorial and generate an additional script.

The newly generated script will accept a Cholesterol level measurement and save the patient's information in a file, as we did in the BMI example. The line containing the Cholesterol value should also include the risk level as a string. For example, a Cholesterol value of 300 will be written to the file as follows:

patient's Cholesterol level: 300 mg/dL (High risk)

We have three categories of risk for the Cholesterol level:

Level (mg/dL)    Interpretation
< 200            Low risk
200-240          Borderline high risk
> 240            High risk

The following is required in this exercise:

  • Modify the script template to be able to generate both the BMI and Cholesterol level scripts. Because the script template will be different, you will have to change the arguments you pass to the template to generate the BMI script.
  • To further generalize the generated scripts (for both BMI and Cholesterol), the template will accept and use an argument containing the absolute path to the destination directory of the results file.
  • The BMI results will be written to the results/bmi directory, instead of directly to the results directory. Cholesterol results will similarly be written to the results/cholesterol directory.
  • For each of BMI and Cholesterol, generate the script and run it to produce the results files.
  • Create a CSV file for each of the BMI and Cholesterol experiments.
  • Load the data from each of the CSV files, merge the data into one table, and write it to a single CSV file.

Solution

In [ ]:
# Repeat some function definitions in case only the exercise is executed
def ensure_dir(d):
    """Create a directory and ignore the error if the directory already exists"""
    import os, errno
    try:
        os.makedirs(d)
    except OSError as exc:
        if exc.errno == errno.EEXIST:
            pass
        else: raise
def print_file(f_name):
    """Print the contents of a file, like the shell command 'cat'"""
    # The with statement closes the file even if reading fails
    with open(f_name, 'r') as f:
        print(f.read())
In [ ]:
import os
from os.path import join as join_path
from string import Template

# Full path to the special topics part 1 directory
os.chdir("/Users/malastm/new_ACM_tutorials/ACM-Python-Tutorials-KAUST-2014/Special_topics_part_1/")

os.chdir(join_path(os.getcwd(), "exercise"))
print("The current directory changed to: " + os.getcwd())
print("Files in this directory: ", os.listdir('.'))

script_template_txt = """#!/usr/bin/env python

import sys

name = str(sys.argv[1])
height = float(sys.argv[2])
weight = float(sys.argv[3])
age = int(sys.argv[4])
path = str(sys.argv[5])

$comp

f = open( path + 'patient_' + name + '_' + str(age) + '.txt', 'w')

f.write( "patient's name: " + name + "\\n")
f.write( "patient's age: " + str(age) + " Years\\n")
f.write( "patient's Weight: " + str(weight) + " kgs\\n")
f.write( "patient's height: " + str(height) + " Meters\\n")

$file_write

f.close()
"""
In [ ]:
# First we generate the script for the BMI and test it
script_template = Template(script_template_txt)
comp = "val = weight/height**2"
file_write = 'f.write( "patient\'s BMI: " + str(val) + "\\n")'
script_txt = script_template.substitute(comp=comp, file_write=file_write)

ensure_dir(join_path('results', 'bmi'))
with open(join_path("results", "bmi","calc_bmi.py"), 'w') as f:
    f.write(script_txt)

print_file(join_path("results", "bmi","calc_bmi.py"))
In [ ]:
# Test the BMI script

import subprocess
bmi_results_path = join_path(os.getcwd(), "results", 'bmi', 'patients', '') 
command = "python " + join_path("results", "bmi","calc_bmi.py")
arguments = " Ahmad 1.77 80 25 " + bmi_results_path
ensure_dir(join_path("results", 'bmi', 'patients'))


print("Execute: " + command + arguments)
subprocess.check_call(command + arguments, shell=True)

print_file(join_path("results", "bmi", "patients", "patient_Ahmad_25.txt"))
os.remove(join_path("results", "bmi", "patients", "patient_Ahmad_25.txt"))
In [ ]:
# Generate the script for the Cholesterol and test it
comp = """cholesterol = float(sys.argv[6])
if cholesterol < 200:
    risk = 'Low risk'
elif cholesterol <= 240:
    risk = 'Borderline high risk'
else:
    risk = 'High risk'
"""
file_write = 'f.write( "patient\'s Cholesterol: " + str(cholesterol) + " (" + risk + ")\\n")'

script_txt = script_template.substitute(comp=comp, file_write=file_write)

ensure_dir(join_path("results", 'cholesterol'))
with open(join_path("results", "cholesterol","calc_cholesterol.py"), 'w') as f:
    f.write(script_txt)

print_file(join_path("results", "cholesterol","calc_cholesterol.py"))
In [ ]:
# Test the Cholesterol script
chol_results_path = join_path(os.getcwd(), "results", 'cholesterol', 'patients', '') 

command = "python " + join_path("results", "cholesterol","calc_cholesterol.py")
arguments = " Ahmad 1.77 80 25 " + chol_results_path + " 100"
ensure_dir(join_path("results", 'cholesterol', 'patients'))


print("Execute: " + command + arguments)
subprocess.check_call(command + arguments, shell=True)

print_file(join_path("results", "cholesterol", "patients", "patient_Ahmad_25.txt"))
os.remove(join_path("results", "cholesterol", "patients", "patient_Ahmad_25.txt"))
In [ ]:
# Generate the BMI results

patients = [('Williams',19,84,1.74), ('Johnson',23,82, 1.65), 
            ('Jones', 25, 70, 1.8), ('Jones', 29, 85, 1.66),
            ('Smith', 30, 120, 1.9), ('ahmad', 35, 50.5, 1.5)]

command = "python " + join_path("results", "bmi","calc_bmi.py")

for n, a, w, h in patients:
    arguments = " %s %f %f %d " % (n, h, w, a) + bmi_results_path
    subprocess.check_call(command + arguments, shell=True)

os.listdir(bmi_results_path)
In [ ]:
# Generate the Cholesterol results

patients = [('Williams',19,84,1.74, 150), ('Johnson',23,82, 1.65, 220), 
            ('Jones', 25, 70, 1.8, 200), ('Jones', 29, 85, 1.66, 250),
            ('Smith', 30, 120, 1.9, 210), ('ahmad', 35, 50.5, 1.5, 260)]

command = "python " + join_path("results", "cholesterol","calc_cholesterol.py")

for n, a, w, h, c in patients:
    arguments = " %s %f %f %d " % (n, h, w, a) + chol_results_path + " %f" % (c)
    subprocess.check_call(command + arguments, shell=True)

os.listdir(chol_results_path)
In [ ]:
def write_to_csv(data, f_name):
    from csv import DictWriter
    fields = data[0].keys()
    
    with open( f_name, 'w' ) as output_file:
            r = DictWriter(output_file,fieldnames=fields)
            r.writeheader()
            for k in data:
                r.writerow(k)
In [ ]:
# Repeating this function from the example
from collections import OrderedDict
def parse_bmi(fp):
    fields = [("patient's name", "name", str),("patient's age", "age", int),
              ("patient's Weight", "weight", float),("patient's height", "height", float),("patient's BMI", "bmi", float)]
    record = OrderedDict()
    for f in fields:
        record[f[1]] = 0
    for ln in fp:
        for f in fields:
            if f[0] in ln:
                # Using naive string parsing:
                #val = ln.split(":")[1].strip().split(' ')[0]
                val = ln.split(":")[1]
                val = val.strip()
                val = val.split(' ')[0]
                val = f[2](val)  # convert the text to the field's type
                
                record[f[1]] = val

    return record
In [ ]:
# Parse the BMI results using the BMI parsing function, then store them in the CSV file
import glob
files_list = glob.glob(join_path("results", 'bmi', 'patients', '*.txt') )
print (files_list)

entries =[]
for rf in files_list:    
    with open(rf, 'r') as fp: 
        record = parse_bmi(fp)
        entries.append(record)

for i in entries: print(i)
    
write_to_csv(entries, join_path("results", 'bmi', 'patients', 'records.csv'))
print_file(join_path("results", 'bmi', 'patients', 'records.csv'))
In [ ]:
# Create a parsing function for the cholesterol results
from collections import OrderedDict
def parse_cholesterol(fp):
    fields = [("patient's name", "name", str),("patient's age", "age", int),
              ("patient's Weight", "weight", float),("patient's height", "height", float),("patient's Cholesterol", "chol", float)]
    record = OrderedDict()
    for f in fields:
        record[f[1]] = 0
    for ln in fp:
        for f in fields:
            if f[0] in ln:
                # Using naive string parsing:
                #val = ln.split(":")[1].strip().split(' ')[0]
                val = ln.split(":")[1]
                val = val.strip()
                val = val.split(' ')[0]
                val = f[2](val)  # convert the text to the field's type
                record[f[1]] = val

        # Handle the special case: extract the risk string from the cholesterol line
        if "patient's Cholesterol" in ln:
            val = ln.split(":")[1]
            val = val.strip()
            # Take the text after the number and drop the surrounding parentheses
            val = val.split(' ', 1)[1][1:-1]
            record['chol risk'] = val

    return record
In [ ]:
# Parse the Cholesterol results using the Cholesterol parsing function
import glob
files_list = glob.glob(join_path("results", 'cholesterol', 'patients', '*.txt') )
print (files_list)

entries =[]
for rf in files_list:    
    with open(rf, 'r') as fp: 
        record = parse_cholesterol(fp)
        entries.append(record)

for i in entries: print(i)

write_to_csv(entries, join_path("results", 'cholesterol', 'patients', 'records.csv'))
print_file(join_path("results", 'cholesterol', 'patients', 'records.csv'))
In [ ]:
# Read both CSV files, merge the tables, and store all the data in a single CSV file
from csv import DictReader
from operator import itemgetter

with open(join_path('results', 'bmi', 'patients', 'records.csv'), 'rb') as input_file:
    data = DictReader(input_file)
    bmi_data = [k for k in data]

with open(join_path('results', 'cholesterol', 'patients', 'records.csv'), 'rb') as input_file:
    data = DictReader(input_file)
    chol_data = [k for k in data]

d0 = entries[0]
print(d0)

commons = itemgetter('age', 'name' ,'weight' , 'height')
# for each record in the cholesterol results add the BMI field to it from the bmi record of the same patient
all_data = []
for dc in chol_data:
    for db in bmi_data:
        if commons(db) == commons(dc):
            dc['bmi'] = db['bmi']
    all_data.append(dc)

write_to_csv(all_data, join_path("results", 'records_all_data.csv'))

print_file("results/records_all_data.csv")
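
The nested loops above compare every cholesterol record with every BMI record. As an alternative sketch, the BMI rows can be indexed once in a dictionary keyed by the fields that identify a patient. The rows below are made up, and merge_key is a hypothetical helper; for brevity it uses only the name and age as the key.

```python
# Made-up rows in the shape read from the two CSV files (all strings).
bmi_rows = [
    {"name": "Jones", "age": "25", "bmi": "21.6"},
    {"name": "Smith", "age": "30", "bmi": "33.2"},
]
chol_rows = [
    {"name": "Smith", "age": "30", "chol": "210.0"},
    {"name": "Jones", "age": "25", "chol": "200.0"},
]

def merge_key(row):
    """Fields that identify the same patient in both tables."""
    return (row["name"], row["age"])

# Index the BMI rows once, so each lookup does not rescan the whole list.
bmi_by_patient = {merge_key(row): row["bmi"] for row in bmi_rows}

merged = []
for row in chol_rows:
    combined = dict(row)  # copy, so the input rows stay unchanged
    combined["bmi"] = bmi_by_patient.get(merge_key(row))  # None if no match
    merged.append(combined)

print(merged)
```

For six patients the difference is negligible; the dictionary lookup mainly pays off when the tables grow large.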

Copyright 2014, Tareq Malas, ACM Student Member.