#!/usr/bin/env python
# coding: utf-8
# # [Image Analyzer](#section1)
#
# ## Example using PySpark and Scikit-learn
# * Follow the steps in run_helper.sh, while using the default hdfs file naming in config.yaml
# * Then run each ipython notebook cell below to look at results
# * If the notebook examples do not work, check your config.yaml against the config shown at the bottom of the notebook
# * This notebook uses the example faces image data set of Dr. Libor Spacek
# In[1]:
get_ipython().run_line_magic('env', 'JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64')
get_ipython().run_line_magic('matplotlib', 'inline')
from __future__ import print_function, division
from pyspark import SparkConf
from pyspark import SparkContext
from StringIO import StringIO
import matplotlib.pyplot as plt
import numpy as np
from pprint import pprint
conf = SparkConf()
conf.set('spark.executor.instances', 10)
sc = SparkContext()
# ### [Example of each image's output](#map_each_image)
#
# #### These measurements are done for training and candidate images
# #### On each training or candidate image, the measurements may also be applied to patches
# #### The relevant code for this is map_each_image.py. (The function "example" in map_each_image.py can do these measurements locally on an image file.)
# * Kmeans centroids
# * Histogram
# * Perceptive hashes (abbrev.)
# * Ward cluster hashes (abbrev.) (hashing the output of the function seen in this scikit demo)
# * Principal components
# In[2]:
# Pull the first (key, measures-dict) record written by map_each_image.
example = sc.pickleFile('hdfs:///t1/map_each_image/measures').take(1)[0]
measures = example[1]
print("Keys:", measures.keys())
print("Centroids in 1 image flattened:", measures['cen'])
print("Histogram flattened:", measures['histo'])
print("Perceptive hash (abbrev.):", measures['phash'][:5])
print("Ward cluster hash (abbrev.):", measures['ward'][:5])
print('PCA factors and variance', measures['pca_fac'], measures['pca_var'])
# ### Based on the data above for each image, a kmeans algorithm is run for all training images
# #### The kmeans algorithm also tracks the most common perceptive hashes and ward cluster hashes
# #### image_mapper.py has the iterative kmeans loop on all images
# * This shows cluster to hash lookups
# In[3]:
print('Kmeans cluster to perceptive hash')
pprint(sc.pickleFile("hdfs:///t1/km/cluster_to_phash").take(2))
print('Kmeans cluster to Ward cluster hash')
# BUG FIX: this previously re-read cluster_to_phash, so the table shown
# under the Ward heading was wrong.  Read the cluster_to_ward table, which
# the pipeline writes per the kmeans_output config.
pprint(sc.pickleFile("hdfs:///t1/km/cluster_to_ward").take(2))
# ### [Also we save the inverse mappings of hash to cluster](#mappings)
#
# #### This provides several ways to search for images by hash or kmeans cluster
# #### In config.yaml, see the kmeans_output dictionary that controls which lookup tables are created
# In[4]:
# Show a couple of entries from each inverse (hash -> kmeans cluster) table.
for title, table_path in [('Perceptive hash to kmeans cluster', "hdfs:///t1/km/phash_to_cluster"),
                          ("Ward cluster hash to kmeans cluster", "hdfs:///t1/km/ward_to_cluster")]:
    print(title)
    pprint(sc.pickleFile(table_path).take(2))
# ### [Hash counts in kmeans clusters](#hash_counts)
#
# #### A dictionary for each kmeans cluster counts the most common N hashes per cluster
# * This shows the top ward cluster hashes in kmeans cluster with index 0
# In[5]:
# Top ward-hash counts for the kmeans cluster at index 0.
# Path normalized from 'hdfs:////t1/...' (four slashes) to 'hdfs:///t1/...'
# for consistency with every other table path in this file.
clust0_ward = sc.pickleFile('hdfs:///t1/km/ward_unions').take(1)[0]
# Coerce to a plain dict for display (replaces an identity dict comprehension).
pprint(dict(clust0_ward))
# ### Using joins to finally get a matching image name
# #### These examples are ward hash to image key and perceptive hash to image key mappings
# In[6]:
# Peek at the hash -> image-key tables used in the final joins.
for header, table_path in [('ward_to_key\n', 'hdfs:///t1/km/ward_to_key'),
                           ('\n\nphash_to_key\n', 'hdfs:///t1/km/phash_to_key')]:
    print(header)
    pprint(sc.pickleFile(table_path).take(2))
# ### [Joins lead to a number of potentially matching images](#vote_count)
#
# * The example below shows the number of ward hash chunks matching a candidate
# * The candidate has a path name /fuzzy/
# * The others are the originals in /imgs/
# In[7]:
# Vote-count tables for candidate batch c1: how many ward-hash chunks and
# phash chunks of each candidate matched each training image.
ward_matches = sc.pickleFile('hdfs:///t1/candidates/c1/ward_to_key_counts')
phash_matches = sc.pickleFile('hdfs:///t1/candidates/c1/phash_to_key_counts')
# FIX: removed the unused `wm`/`pm` full collect()s of both RDDs -- their
# results were never referenced, and each collect() pulls an entire RDD
# into driver memory for nothing.
# In[8]:
# Full outer join keyed on candidate path, so a candidate is kept even when
# only one of the two hash types produced matches for it; the other side of
# the pair is then None.
joined = ward_matches.fullOuterJoin(phash_matches)
def best_votes(x):
    """Merge the ward-hash and phash vote dicts for one joined candidate.

    Parameters
    ----------
    x : tuple
        A fullOuterJoin record ``(candidate_key, (left, right))`` where each
        side is either None (no matches from that hash type) or a
        ``(candidate_key, {image_key: vote_count})`` pair.

    Returns
    -------
    tuple
        ``(candidate_key, (winner, merged))`` where ``merged`` sums the vote
        counts per image key across both hash types and ``winner`` is the
        ``(image_key, votes)`` item with the highest count.
    """
    left, right = x[1]
    ward_votes = left[1] if left is not None else {}
    # Copy before merging: the original version updated the right-hand dict
    # in place, mutating the data inside the joined RDD record.
    merged = dict(right[1]) if right is not None else {}
    for image_key, votes in ward_votes.items():
        merged[image_key] = merged.get(image_key, 0) + votes
    # sorted(...)[-1] preserves the original tie-breaking behavior (the last
    # entry in a stable value-sort wins, unlike max() which keeps the first).
    winner = sorted(merged.items(), key=lambda kv: kv[1])[-1]
    return x[0], (winner, merged)
# Merge the votes for every candidate and bring the results to the driver;
# each element is (candidate_path, ((best_image_key, votes), merged_votes)).
phash_and_ward = joined.map(best_votes).collect()
print("Example votes dictionary:")
pprint(phash_and_ward[0])
# #### [ Loading and comparing historical and matched images ](#matches)
#
# In[9]:
def load_image(image):
    """Load one image, where image = (key, blob).

    Decodes the raw image bytes in ``blob`` with PIL and returns
    ``(key, pixels)`` where ``pixels`` is a uint8 numpy array.
    Removed the unused local ``img_quads`` from the earlier revision.
    """
    from StringIO import StringIO
    from PIL import Image
    img = Image.open(StringIO(image[1]))
    return image[0], np.asarray(img, dtype=np.uint8)
# For each candidate, show the fuzzy input next to its best-voted original.
for cand_path, vote_info in phash_and_ward:
    # vote_info is ((best_image_key, votes), merged_votes); take the winner's key.
    winner_key = vote_info[0][0]
    cname, candidate = load_image(sc.binaryFiles(cand_path).collect()[0])
    mname, matched = load_image(sc.binaryFiles(winner_key).collect()[0])
    print("Candidate (fuzzy) ", cname)
    plt.subplot(1, 2, 1)
    plt.imshow(candidate)
    print("Matched (original) ", mname)
    plt.subplot(1, 2, 2)
    plt.imshow(matched)
    plt.show()
# #### ['config' below shows the yaml config file loaded for this example](#config)
#
# In[11]:
# Snapshot of the YAML configuration this example was run with (config.yaml).
config = dict(
    # pipeline stages to run
    actions=['map_each_image', 'kmeans', 'find_similar'],
    # candidate (query) image settings
    candidate_batch='c1',
    candidate_has_mapped=False,
    candidate_measures_spec='/t1/candidates/c1/measures',
    candidate_spec='/fuzzy/*',
    # input data locations
    example_data='/imgs/',
    fuzzy_example_data='/fuzzy/',
    input_spec='/imgs/*',
    in_memory_set_len=8000000,
    # kmeans settings and which lookup tables to emit
    kmeans_group_converge=10000,
    kmeans_output=dict(
        cluster_to_flattened=True,
        cluster_to_key=True,
        cluster_to_phash=True,
        cluster_to_ward=True,
        flattened_to_cluster=True,
        flattened_to_key=True,
        flattened_to_phash=True,
        key_to_cluster=True,
        key_to_phash=True,
        phash_to_cluster=True,
        phash_to_flattened=True,
        phash_to_key=True,
        ward_to_cluster=True,
        ward_to_key=True,
    ),
    kmeans_sample=2000,
    maxIterations=15,
    max_iter_group=10,
    n_clusters=12,
    n_clusters_group=8,
    # patch extraction settings
    patch=dict(
        max_patches=4,
        random_state=0,
        window_as_fraction=[0.5, 0.5],
    ),
    # hashing and search settings
    phash_bits=128,
    phash_chunk_len=2,
    quantiles=[5, 15, 25, 50, 75, 95],
    search_rounds=1,
    search_sample_step=100,
    test_name='t1',
    ward_clusters=8,
    ward_x_down=16,
    x_down=32,
)