This tutorial provides useful tools in Python for script/code generation, running applications and operating-system-dependent functionality, and working with files: reading, parsing, and writing.
The tutorial consists of three parts:
The glob, os, and shutil modules provide important cross-platform functionality for dealing with the file system. To see the full documentation you can refer to these links:
http://docs.python.org/2/library/glob.html
import os
from os.path import join as join_path
# Show the built-in help of every file-system helper used in this tutorial,
# printing a dashed rule between two consecutive help pages.
separator = "----------------------------------------------------------"
documented = (os.getcwd, os.chdir, os.makedirs, os.listdir, os.remove,
              join_path, os.path.abspath)
for position, func in enumerate(documented):
    if position:
        print(separator)
    help(func)
# Move to the "Special_topics_part_1" directory, then into its "example"
# sub-directory, reporting the working directory after each move.
os.chdir("/Users/malastm/new_ACM_tutorials/ACM-Python-Tutorials-KAUST-2014/Special_topics_part_1/")
starting_dir = os.getcwd()
print("The current directory was : " + starting_dir)
tutorial_example_dir = os.path.join(starting_dir, "example")
os.chdir(tutorial_example_dir)
print("The current directory changed to: " + os.getcwd())
print("Files in this directory: ", os.listdir('.'))
# glob.glob is used below to expand wildcard patterns such as 'results/*.*'
import glob
help(glob.glob)
# shutil.rmtree is used below to remove the whole 'results/patients' tree
import shutil
help(shutil.rmtree)
def ensure_dir(d):
    """Create the directory ``d`` (and any missing parents) if needed.

    An already existing directory is not an error.  Every other failure is
    re-raised, including the case where ``d`` exists but is not a directory
    (for example a regular file), which would otherwise go unnoticed.
    """
    import errno
    import os
    try:
        os.makedirs(d)
    except OSError as exc:
        # EEXIST is only harmless when what exists really is a directory.
        if exc.errno != errno.EEXIST or not os.path.isdir(d):
            raise
def clean_all():
    """Remove the results generated by previous runs of this tutorial's example.

    Deletes every file matching ``results/*.*`` and the whole
    ``results/patients`` tree, both relative to the current working
    directory.  Real directories whose name matches the pattern are left in
    place, because ``os.remove`` cannot delete a directory and would abort
    the clean-up.
    """
    import glob
    import os
    import shutil
    for path in glob.glob('results/*.*'):
        if os.path.isdir(path) and not os.path.islink(path):
            continue
        os.remove(path)
    shutil.rmtree('results/patients', ignore_errors=True)
# Start from a clean state: drop results left over from earlier runs
clean_all()
# Show the help of open() and of the main methods of file objects
help(open)
print("----------------------------------------------------------")
print("----------------------------------------------------------")
# NOTE(review): `file` is the Python 2 built-in file type; the name does not
# exist on Python 3, so the calls below only work on Python 2.
print(dir(file))
print("----------------------------------------------------------")
help(file.close)
print("----------------------------------------------------------")
help(file.read)
print("----------------------------------------------------------")
help(file.write)
print("----------------------------------------------------------")
help(file.readline)
def print_file(f_name):
    """Print the contents of the file ``f_name`` (like the shell's ``cat``).

    The file is opened in a ``with`` block so that it is closed even when
    reading or printing fails.
    """
    with open(f_name, 'r') as f:
        print(f.read())
In this example we will:
Before generating a script, it is a good practice to test the base script before using it as a template.
The following script takes a person's information and writes it to a file as human-readable text.
# Display the source of the base script scripts/format_info.py
print_file(join_path("scripts", "format_info.py") )
To run an application (a Python script in our example) we use the subprocess module. It contains functions to spawn new processes, work with their input/output/error pipes, and read their return codes.
Here we use the check_call function of that module. It raises an exception if the process does not terminate normally, that is, if the return code is not zero.
For more information about the subprocess
module see: http://docs.python.org/2/library/subprocess.html
import subprocess
import sys

python_script = join_path("scripts", "format_info.py")
# Use the interpreter that runs this notebook: a bare "Python" command is not
# found on case-sensitive file systems.  `command` stays a string so it can
# still be displayed and concatenated.
command = sys.executable + " " + python_script
arguments = " Ahmad 1.77 80 25"
ensure_dir('results')
print("We will run a process using this command: " + command + arguments)
# An argument list (no shell) needs no quoting and avoids shell injection.
sts = subprocess.check_call([sys.executable, python_script] + arguments.split())
print ("The return status of the process: " + str(sts))
Now read the resulting output file, then remove it
# List what the run produced, print the generated report, then delete it.
report_path = join_path("results", "patient_Ahmad_25.txt")
print(os.listdir('results'))
print_file(report_path)
os.remove(report_path)
The following example uses the Template class to generate a python script from a template string.
The following code cell contains the template string of our target script. The string contains the constant parts that will remain the same and the template placeholders that will be replaced with the desired values. The $
is used to indicate template placeholders that will be replaced with the desired string. In this string we have two placeholders: formula
and name
.
The goal of this code generation is to generate a script that will compute additional information and include it in the output file.
Note: special care is required for special characters. For example the \n
is replaced with \\n
. Also, if the desired script contains $
, it should be replaced with $$
in the template string.
# Source of the generated script, used as a string.Template.  "$formula" and
# "$name" are the two placeholders; "\\n" is written so that the generated
# source contains the two-character escape "\n".
script_template_txt = """#!/usr/bin/env python
import sys
name = str(sys.argv[1])
height = float(sys.argv[2])
weight = float(sys.argv[3])
age = int(sys.argv[4])
val = $formula
f = open('results/patients/patient_' + name + '_' + str(age) + '.txt', 'w')
f.write( "patient's name: " + name + "\\n")
f.write( "patient's age: " + str(age) + " Years\\n")
f.write( "patient's Weight: " + str(weight) + " kgs\\n")
f.write( "patient's height: " + str(height) + " Meters\\n")
f.write( "patient's $name: %f \\n" % val)
f.close()
"""
Now we create a template object, named script_template
. Then, the template variables are substituted with the Body Mass Index (BMI) formula. The output string is written out to a script file named calc_bmi.py
from string import Template

# Fill the two placeholders with the Body Mass Index formula and its label,
# save the generated source as results/calc_bmi.py and display it.
script_template = Template(script_template_txt)
formula = "weight/height**2"
name = "BMI"
script_txt = script_template.substitute({"formula": formula, "name": name})
generated_script = join_path("results", "calc_bmi.py")
with open(generated_script, 'w') as f:
    f.write(script_txt)
print_file(generated_script)
Now create a directory for the results.
# The generated script writes its output under results/patients
ensure_dir(join_path("results", "patients"))
Here we test our generated script.
import sys

python_script = join_path("results", "calc_bmi.py")
# Use the running interpreter: a bare "Python" command is not found on
# case-sensitive file systems.  `command` stays a string for display.
command = sys.executable + " " + python_script
arguments = " Ahmad 1.77 80 25"
print("Execute: " + command + arguments)
# An argument list (no shell) needs no quoting.
subprocess.check_call([sys.executable, python_script] + arguments.split())
# Show the report produced by the generated script, then delete it.
report_path = join_path("results", "patients", "patient_Ahmad_25.txt")
print_file(report_path)
os.remove(report_path)
Run the new script over several examples to generate the raw files in the patients
directory
import sys

# Each tuple is (name, age, weight, height).
patients = [('Williams', 19, 84, 1.74), ('Johnson', 23, 82, 1.65),
            ('Jones', 25, 70, 1.8), ('Jones', 29, 85, 1.66),
            ('Smith', 30, 120, 1.9), ('ahmad', 35, 50.5, 1.5)]
for n, a, w, h in patients:
    # The generated script reads argv as: name, height, weight, age.
    arguments = [n, "%f" % h, "%f" % w, "%d" % a]
    # Run with the current interpreter and an argument list: a bare "Python"
    # command is not found on case-sensitive file systems.
    subprocess.check_call([sys.executable, python_script] + arguments)
os.listdir(join_path('results', 'patients'))
The following function takes an object of a raw file, extracts/parses the desired values, and returns an ordered dictionary of the entries.
from collections import OrderedDict
def parse_bmi(fp):
    """Parse a patient report into an ordered record.

    ``fp`` is any iterable of text lines (for example an open file), each of
    the form ``"<label>: <value> [unit]"``.  Returns an ``OrderedDict`` with
    the keys ``name``, ``age``, ``weight``, ``height`` and ``bmi`` in that
    order.  A field whose label never appears keeps the default value ``0``.
    """
    # (label searched in the line, key in the record, converter for the value)
    fields = [("patient's name", "name", str), ("patient's age", "age", int),
              ("patient's Weight", "weight", float),
              ("patient's height", "height", float),
              ("patient's BMI", "bmi", float)]
    record = OrderedDict((key, 0) for _, key, _ in fields)
    for ln in fp:
        for label, key, convert in fields:
            if label in ln:
                # Naive parsing: first space-delimited token after the colon.
                raw = ln.split(":")[1].strip().split(' ')[0]
                # Call the converter directly; indexing the result of map()
                # only works on Python 2.
                record[key] = convert(raw)
    return record
In the following two code blocks the list of available raw files is prepared. Then, the files are parsed and placed in a list of dictionaries
# Collect the raw report files, parse each one and keep the records in a list.
raw_pattern = join_path('results', 'patients', '*.txt')
files_list = glob.glob(raw_pattern)
print(files_list)
entries = []
for raw_path in files_list:
    with open(raw_path, 'r') as raw_file:
        entries.append(parse_bmi(raw_file))
for entry in entries:
    print(entry)
We use the sorted
built-in function in Python. It returns a sorted list of the given iterable, a list of dictionaries in our example. In a flat list, for example [1,4,2]
, it is clear for sorting function what is greater/smaller to perform the sorting operation. Because we have a dictionary in each item of our list, the optional argument key
is used to specify the value we would like to compare in the dictionary. key
argument accepts a single argument function that returns the comparison value.
For more information see: http://docs.python.org/2/library/functions.html#sorted
# We will use the itemgetter function to specify our sorting key from the dictionary
from operator import itemgetter
d0 = entries[0]
print(d0)
# itemgetter('age', 'name') builds a callable returning the tuple (d['age'], d['name'])
ig = itemgetter('age', 'name')
print (ig(d0))
# This is equivalent to:
print(d0['age'], d0['name'])
Now we sort the records according to the age then the name
from operator import itemgetter

# Order the records by age first and by name second, then show them.
by_age_then_name = itemgetter('age', 'name')
entries = sorted(entries, key=by_age_then_name)
for entry in entries:
    print(entry)
In the following code block, the list of dictionaries is stored in a CSV file. We use the csv
module for this purpose. It has many powerful functionalities to write/read CSV files. Here we use the DictWriter/DictReader classes to write/read data to/from CSV files. These two classes are specialized to deal with dictionary data types in python.
For more information see http://docs.python.org/2/library/csv.html.
from csv import DictWriter

# Store the records in results/records.csv; the columns follow the key order
# of the first record.  The file is displayed once it has been closed.
fields = entries[0].keys()
csv_path = join_path('results', 'records.csv')
with open(csv_path, 'w') as output_file:
    r = DictWriter(output_file, fieldnames=fields)
    r.writeheader()
    r.writerows(entries)
print_file(csv_path)
import sys
from csv import DictReader

csv_path = join_path('results', 'records.csv')
# The csv module needs a binary file on Python 2 but a text file with
# untranslated newlines on Python 3, where 'rb' makes DictReader fail.
if sys.version_info[0] < 3:
    output_file = open(csv_path, 'rb')
else:
    output_file = open(csv_path, 'r', newline='')
with output_file:
    # Read every row while the file is still open.
    data = list(DictReader(output_file))
print(data)
Now extract the age
and bmi
values from the data
# Pair the age and bmi columns of every row read from the CSV file.
points = [(row['age'], row['bmi']) for row in data]
print(points)
A good way to improve the reproducibility of an experiment is to create a tarball of all the source code and setup for each experiment. In a typical project, the tarball would contain all the source files, makefiles, the build log file, and possibly the executable itself.
Here we have most of the source files in the notebook, so we do not have much to place in the tarball. The results here are also stored in the tarball, although it is not a good idea in a real life example.
import tarfile

# Wildcard patterns of everything that goes into the tarball.  The CSV file of
# this example is written to results/records.csv, so results/*.csv is listed
# in addition to the original results/patients/*.csv pattern.
patterns = [join_path("results", "patients", "*.txt"), join_path("results", "*.py"),
            join_path("results", "patients", "*.csv"), join_path("results", "*.csv"),
            join_path("..", "*.ipynb")]
# Expand every pattern and flatten the matches into a single list of paths.
f_list = [n for pattern in patterns for n in glob.glob(pattern)]
f_name = join_path("results", "bmi.tar.gz")
# print(...) with a single argument behaves the same on Python 2 and 3.
print("Writing project files to:" + f_name)
with tarfile.open(f_name, "w:gz") as tar:
    for n in f_list:
        print("Adding to the tar file: " + n)
        tar.add(n)
In this exercise you are asked to modify the example of this tutorial and generate an additional script.
The new generated script will accept Cholesterol level measurements. The output of this script will save the patient's information in a file as we did at the BMI example. The line containing the Cholesterol value should also include the risk level as a string. For example, a Cholesterol value = 300 will be written to the file as follows:
patient's Cholesterol level: 300 mg/dL (High risk)
We have three categories of risk for the Cholesterol level:
Level mg/dL | Interpretation |
---|---|
< 200 | Low risk |
200-240 | Borderline high risk |
> 240 | High risk |
The following is required in this exercise:
results/bmi
directory, instead of writing directly to results
directory. Cholesterol results will similarly be written to results/cholesterol
directory.
# Repeating some function definitions in case only the exercise is executed
def ensure_dir(d):
    """Create the directory ``d`` (and any missing parents) if needed.

    An already existing directory is not an error.  Every other failure is
    re-raised, including the case where ``d`` exists but is not a directory
    (for example a regular file), which would otherwise go unnoticed.
    """
    import errno
    import os
    try:
        os.makedirs(d)
    except OSError as exc:
        # EEXIST is only harmless when what exists really is a directory.
        if exc.errno != errno.EEXIST or not os.path.isdir(d):
            raise
def print_file(f_name):
    """Print the contents of the file ``f_name`` (like the shell's ``cat``).

    The file is opened in a ``with`` block so that it is closed even when
    reading or printing fails.
    """
    with open(f_name, 'r') as f:
        print(f.read())
import os
from os.path import join as join_path
from string import Template
# Full path to the special topics part 1 directory; the exercise runs from
# its "exercise" sub-directory.
special_topics_dir = "/Users/malastm/new_ACM_tutorials/ACM-Python-Tutorials-KAUST-2014/Special_topics_part_1/"
os.chdir(special_topics_dir)
exercise_dir = join_path(os.getcwd(), "exercise")
os.chdir(exercise_dir)
print("The current directory changed to: " + os.getcwd())
print("Files in this directory: ", os.listdir('.'))
# Source of the generated scripts, used as a string.Template.  "$comp" is
# replaced by the computation code and "$file_write" by the statement that
# writes its result; the output directory is read from sys.argv[5].
script_template_txt = """#!/usr/bin/env python
import sys
name = str(sys.argv[1])
height = float(sys.argv[2])
weight = float(sys.argv[3])
age = int(sys.argv[4])
path = str(sys.argv[5])
$comp
f = open( path + 'patient_' + name + '_' + str(age) + '.txt', 'w')
f.write( "patient's name: " + name + "\\n")
f.write( "patient's age: " + str(age) + " Years\\n")
f.write( "patient's Weight: " + str(weight) + " kgs\\n")
f.write( "patient's height: " + str(height) + " Meters\\n")
$file_write
f.close()
"""
# First we generate the script for the BMI: fill both placeholders, save the
# result as results/bmi/calc_bmi.py and display it.
script_template = Template(script_template_txt)
comp = "val = weight/height**2"
file_write = 'f.write( "patient\'s BMI: " + str(val) + "\\n")'
script_txt = script_template.substitute({"comp": comp, "file_write": file_write})
bmi_script_path = join_path("results", "bmi", "calc_bmi.py")
ensure_dir(join_path('results', 'bmi'))
with open(bmi_script_path, 'w') as f:
    f.write(script_txt)
print_file(bmi_script_path)
# Test the BMI script
import subprocess
import sys

# The trailing '' makes the path end with a separator, as the generated
# script appends the file name directly to it.
bmi_results_path = join_path(os.getcwd(), "results", 'bmi', 'patients', '')
bmi_script = join_path("results", "bmi", "calc_bmi.py")
# Use the running interpreter: a bare "Python" command is not found on
# case-sensitive file systems.  `command`/`arguments` stay strings for display.
command = sys.executable + " " + bmi_script
arguments = " Ahmad 1.77 80 25 " + bmi_results_path
ensure_dir(join_path("results", 'bmi', 'patients'))
print("Execute: " + command + arguments)
# An argument list (no shell) keeps a results path containing spaces intact.
subprocess.check_call([sys.executable, bmi_script,
                       "Ahmad", "1.77", "80", "25", bmi_results_path])
report_path = join_path("results", "bmi", "patients", "patient_Ahmad_25.txt")
print_file(report_path)
os.remove(report_path)
# Generate the script for the Cholesterol and test it
# The snippet is pasted verbatim into the generated script, so the bodies of
# the if/elif/else branches must be indented inside the string.  Risk levels:
# below 200 is low, 200-240 (inclusive) is borderline, above 240 is high.
comp = """cholesterol = float(sys.argv[6])
if cholesterol < 200:
    risk = 'Low risk'
elif cholesterol <= 240:
    risk = 'Borderline high risk'
else:
    risk = 'High risk'
"""
file_write = 'f.write( "patient\'s Cholesterol: " + str(cholesterol) + " (" + risk + ")\\n")'
script_txt = script_template.substitute(comp=comp, file_write=file_write)
ensure_dir(join_path("results", 'cholesterol'))
chol_script_path = join_path("results", "cholesterol", "calc_cholesterol.py")
with open(chol_script_path, 'w') as f:
    f.write(script_txt)
print_file(chol_script_path)
# Test the Cholesterol script
import sys

# The trailing '' makes the path end with a separator, as the generated
# script appends the file name directly to it.
chol_results_path = join_path(os.getcwd(), "results", 'cholesterol', 'patients', '')
chol_script = join_path("results", "cholesterol", "calc_cholesterol.py")
# Use the running interpreter: a bare "Python" command is not found on
# case-sensitive file systems.  `command`/`arguments` stay strings for display.
command = sys.executable + " " + chol_script
arguments = " Ahmad 1.77 80 25 " + chol_results_path + " 100"
ensure_dir(join_path("results", 'cholesterol', 'patients'))
print("Execute: " + command + arguments)
# An argument list (no shell) keeps a results path containing spaces intact.
subprocess.check_call([sys.executable, chol_script,
                       "Ahmad", "1.77", "80", "25", chol_results_path, "100"])
report_path = join_path("results", "cholesterol", "patients", "patient_Ahmad_25.txt")
print_file(report_path)
os.remove(report_path)
# Generate the BMI results
import sys

# Each tuple is (name, age, weight, height).
patients = [('Williams', 19, 84, 1.74), ('Johnson', 23, 82, 1.65),
            ('Jones', 25, 70, 1.8), ('Jones', 29, 85, 1.66),
            ('Smith', 30, 120, 1.9), ('ahmad', 35, 50.5, 1.5)]
bmi_script = join_path("results", "bmi", "calc_bmi.py")
# Use the running interpreter: a bare "Python" command is not found on
# case-sensitive file systems.
command = sys.executable + " " + bmi_script
for n, a, w, h in patients:
    # The generated script reads argv as: name, height, weight, age, path.
    arguments = [n, "%f" % h, "%f" % w, "%d" % a, bmi_results_path]
    subprocess.check_call([sys.executable, bmi_script] + arguments)
os.listdir(bmi_results_path)
# Generate the Cholesterol results
import sys

# Each tuple is (name, age, weight, height, cholesterol).
patients = [('Williams', 19, 84, 1.74, 150), ('Johnson', 23, 82, 1.65, 220),
            ('Jones', 25, 70, 1.8, 200), ('Jones', 29, 85, 1.66, 250),
            ('Smith', 30, 120, 1.9, 210), ('ahmad', 35, 50.5, 1.5, 260)]
chol_script = join_path("results", "cholesterol", "calc_cholesterol.py")
# Use the running interpreter: a bare "Python" command is not found on
# case-sensitive file systems.
command = sys.executable + " " + chol_script
for n, a, w, h, c in patients:
    # The generated script reads argv as:
    # name, height, weight, age, path, cholesterol.
    arguments = [n, "%f" % h, "%f" % w, "%d" % a, chol_results_path, "%f" % c]
    subprocess.check_call([sys.executable, chol_script] + arguments)
os.listdir(chol_results_path)
def write_to_csv(data, f_name):
    """Write a list of dictionaries to the CSV file ``f_name``.

    The columns and their order are taken from the keys of the first record.
    An empty ``data`` produces an empty file instead of failing on
    ``data[0]``.
    """
    import sys
    from csv import DictWriter
    # The csv module does its own newline handling: it needs a binary file on
    # Python 2 and a text file opened with newline='' on Python 3.
    if sys.version_info[0] < 3:
        output_file = open(f_name, 'wb')
    else:
        output_file = open(f_name, 'w', newline='')
    with output_file:
        if not data:
            return
        writer = DictWriter(output_file, fieldnames=list(data[0].keys()))
        writer.writeheader()
        writer.writerows(data)
# Repeating this function from the example
from collections import OrderedDict
def parse_bmi(fp):
    """Parse a patient report into an ordered record.

    ``fp`` is any iterable of text lines (for example an open file), each of
    the form ``"<label>: <value> [unit]"``.  Returns an ``OrderedDict`` with
    the keys ``name``, ``age``, ``weight``, ``height`` and ``bmi`` in that
    order.  A field whose label never appears keeps the default value ``0``.
    """
    # (label searched in the line, key in the record, converter for the value)
    fields = [("patient's name", "name", str), ("patient's age", "age", int),
              ("patient's Weight", "weight", float),
              ("patient's height", "height", float),
              ("patient's BMI", "bmi", float)]
    record = OrderedDict((key, 0) for _, key, _ in fields)
    for ln in fp:
        for label, key, convert in fields:
            if label in ln:
                # Naive parsing: first space-delimited token after the colon.
                raw = ln.split(":")[1].strip().split(' ')[0]
                # Call the converter directly; indexing the result of map()
                # only works on Python 2.
                record[key] = convert(raw)
    return record
# Parse the BMI results using the BMI parsing function, then store them in
# a CSV file next to the raw reports and display that file.
import glob

bmi_patients_dir = join_path("results", 'bmi', 'patients')
files_list = glob.glob(join_path(bmi_patients_dir, '*.txt'))
print(files_list)
entries = []
for raw_path in files_list:
    with open(raw_path, 'r') as raw_file:
        entries.append(parse_bmi(raw_file))
for entry in entries:
    print(entry)
bmi_csv_path = join_path(bmi_patients_dir, 'records.csv')
write_to_csv(entries, bmi_csv_path)
print_file(bmi_csv_path)
# Create a parsing function for the cholesterol results
from collections import OrderedDict
def parse_cholesterol(fp):
    """Parse a cholesterol report into an ordered record.

    ``fp`` is any iterable of text lines (for example an open file), each of
    the form ``"<label>: <value> [unit]"``.  Returns an ``OrderedDict`` with
    the keys ``name``, ``age``, ``weight``, ``height``, ``chol`` and
    ``chol risk`` in that order.  Numeric fields whose label never appears
    keep the default ``0``; ``chol risk`` defaults to ``''`` so that every
    record has the same keys.
    """
    # (label searched in the line, key in the record, converter for the value)
    fields = [("patient's name", "name", str), ("patient's age", "age", int),
              ("patient's Weight", "weight", float),
              ("patient's height", "height", float),
              ("patient's Cholesterol", "chol", float)]
    record = OrderedDict((key, 0) for _, key, _ in fields)
    record['chol risk'] = ''
    for ln in fp:
        for label, key, convert in fields:
            if label in ln:
                # Naive parsing: first space-delimited token after the colon.
                raw = ln.split(":")[1].strip().split(' ')[0]
                # Call the converter directly; indexing the result of map()
                # only works on Python 2.
                record[key] = convert(raw)
        # Special case: extract the risk string from the cholesterol line,
        # e.g. "250.0 (High risk)" -> "High risk" (parentheses dropped).
        if "patient's Cholesterol" in ln:
            text = ln.split(":")[1].strip()
            record['chol risk'] = text.split(' ', 1)[1][1:-1]
    return record
# Parse the Cholesterol results using the Cholesterol parsing function, then
# store them in a CSV file next to the raw reports and display that file.
import glob

chol_patients_dir = join_path("results", 'cholesterol', 'patients')
files_list = glob.glob(join_path(chol_patients_dir, '*.txt'))
print(files_list)
entries = []
for raw_path in files_list:
    with open(raw_path, 'r') as raw_file:
        entries.append(parse_cholesterol(raw_file))
for entry in entries:
    print(entry)
chol_csv_path = join_path(chol_patients_dir, 'records.csv')
write_to_csv(entries, chol_csv_path)
print_file(chol_csv_path)
# Read both CSV files, merge the tables, and store all the data in a single CSV file
import sys
from csv import DictReader
from operator import itemgetter


def _read_records(csv_path):
    """Return the rows of the CSV file ``csv_path`` as a list of dictionaries."""
    # The csv module needs a binary file on Python 2 but a text file with
    # untranslated newlines on Python 3, where 'rb' makes DictReader fail.
    if sys.version_info[0] < 3:
        csv_file = open(csv_path, 'rb')
    else:
        csv_file = open(csv_path, 'r', newline='')
    with csv_file:
        return list(DictReader(csv_file))


bmi_data = _read_records(join_path('results', 'bmi', 'patients', 'records.csv'))
chol_data = _read_records(join_path('results', 'cholesterol', 'patients', 'records.csv'))
d0 = entries[0]
print(d0)
# The fields shared by both tables identify a patient.
commons = itemgetter('age', 'name' ,'weight' , 'height')
# Index the BMI values by patient once instead of rescanning the BMI table for
# every cholesterol record; on duplicate keys the last BMI row wins.
bmi_by_patient = {}
for db in bmi_data:
    bmi_by_patient[commons(db)] = db['bmi']
# For each record in the cholesterol results add the BMI field from the BMI
# record of the same patient.  A record without a BMI match gets an empty
# value so that every row has the same columns in the merged CSV file.
all_data = []
for dc in chol_data:
    dc['bmi'] = bmi_by_patient.get(commons(dc), '')
    all_data.append(dc)
write_to_csv(all_data, join_path("results", 'records_all_data.csv'))
print_file("results/records_all_data.csv")
Copyright 2014, Tareq Malas, ACM Student Member.