import bz2
import os
from os.path import expanduser, join, exists
from configparser import ConfigParser
from time import time

import numpy as np

from concurrent.futures import ThreadPoolExecutor

from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ObjectDoesNotExistError
from libcloud.storage.providers import get_driver

DATA_FOLDER = expanduser('~/data/mnist8m')
SVMLIGHT_DATA_FOLDER = join(DATA_FOLDER, 'svmlight')
NUMPY_DATA_FOLDER = join(DATA_FOLDER, 'numpy')

MNIST8M_SRC_URL = ('http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/'
                   'datasets/multiclass/mnist8m.bz2')
MNIST8M_SRC_FILENAME = MNIST8M_SRC_URL.rsplit('/', 1)[1]
MNIST8M_SRC_FILEPATH = join(DATA_FOLDER, MNIST8M_SRC_FILENAME)

CHUNK_FILENAME_PREFIX = "mnist8m-chunk-"
CHUNK_SIZE = 100000

if not exists(DATA_FOLDER):
    os.makedirs(DATA_FOLDER)

# Download the bzip2-compressed source file if needed (wget -c resumes
# partial downloads).
if not exists(MNIST8M_SRC_FILEPATH):
    cmd = "(cd '%s' && wget -c '%s')" % (DATA_FOLDER, MNIST8M_SRC_URL)
    print(cmd)
    os.system(cmd)

if not exists(SVMLIGHT_DATA_FOLDER):
    os.makedirs(SVMLIGHT_DATA_FOLDER)

if not exists(NUMPY_DATA_FOLDER):
    os.makedirs(NUMPY_DATA_FOLDER)

# Collect any svmlight chunks left over from a previous run.
chunk_filenames = [fn for fn in os.listdir(SVMLIGHT_DATA_FOLDER)
                   if (fn.startswith(CHUNK_FILENAME_PREFIX)
                       and fn.endswith('.svmlight'))]
chunk_filenames.sort()


def get_svmlight_filename(chunk_idx):
    chunk_filename = "%s%03d.svmlight" % (CHUNK_FILENAME_PREFIX, chunk_idx)
    return join(SVMLIGHT_DATA_FOLDER, chunk_filename)


# Split the decompressed source into chunks of CHUNK_SIZE lines each,
# unless the chunks are already there from a previous run.
if not chunk_filenames:
    chunk_filenames = []
    with bz2.BZ2File(MNIST8M_SRC_FILEPATH) as source:
        target, line_no, chunk_idx = None, 0, 0
        for line in source:
            line_no += 1
            if target is None:
                chunk_filename = get_svmlight_filename(chunk_idx)
                target = open(chunk_filename, 'wb')
                chunk_idx += 1
                chunk_filenames.append(chunk_filename)
            target.write(line)
            if line_no >= CHUNK_SIZE:
                target.close()
                target, line_no = None, 0
        if target is not None:
            target.close()


from IPython.parallel import Client  # shipped as ipyparallel in newer IPython

client = Client()
lb_view = client.load_balanced_view()
print(len(lb_view))  # number of engines available in the cluster


def parse_svmlight_chunk(input_chunk_filename, output_chunk_filename,
                         output_chunk_labels_filename, n_features,
                         chunk_size=CHUNK_SIZE):
    # Import dependencies lazily to be able to run this function
    # on remote nodes of the cluster in parallel with IPython
    from os.path import exists
    from sklearn.datasets import load_svmlight_file
    import numpy as np

    if (not exists(output_chunk_filename)
            or not exists(output_chunk_labels_filename)):
        X, y = load_svmlight_file(input_chunk_filename, n_features=n_features)
        # Scale the pixel values to [0, 1] and store as compressed arrays
        np.savez_compressed(output_chunk_filename, X.toarray() / 255.)
        np.savez_compressed(output_chunk_labels_filename, y)
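# A quick synchronous smoke test before farming the work out to the
# cluster (this check is an addition, not part of the original flow):
# parse the first chunk locally and verify the resulting array shapes.
# np.savez_compressed stores an unnamed array under the key 'arr_0'.
_data_path = join(NUMPY_DATA_FOLDER, CHUNK_FILENAME_PREFIX + '000_data.npz')
_labels_path = join(NUMPY_DATA_FOLDER, CHUNK_FILENAME_PREFIX + '000_labels.npz')
parse_svmlight_chunk(get_svmlight_filename(0), _data_path, _labels_path,
                     n_features=28 ** 2)
print(np.load(_data_path)['arr_0'].shape)    # expected: (100000, 784)
print(np.load(_labels_path)['arr_0'].shape)  # expected: (100000,)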
def get_numpy_filenames(i):
    data = "%s%03d_data.npz" % (CHUNK_FILENAME_PREFIX, i)
    labels = "%s%03d_labels.npz" % (CHUNK_FILENAME_PREFIX, i)
    return (
        join(NUMPY_DATA_FOLDER, data),
        join(NUMPY_DATA_FOLDER, labels),
    )


tasks = []
n_features = 28 ** 2  # hardcoded for now

for i in range(81):  # 8100000 lines // 100000 lines per chunk
    svmlight_chunk_name = get_svmlight_filename(i)
    data_chunk_name, label_chunk_name = get_numpy_filenames(i)
    tasks.append(lb_view.apply(parse_svmlight_chunk,
                               svmlight_chunk_name,
                               data_chunk_name,
                               label_chunk_name,
                               n_features))

# Monitor progress: completed tasks out of the total (re-evaluate until
# all tasks are done before uploading).
print(sum(t.ready() for t in tasks), len(tasks))

CONFIGFILE_PATH = 'cloudstorage.ini'
CONTAINER_NAME = "mnist8m"


def build_driver(configfile_path=CONFIGFILE_PATH, section='account'):
    config = ConfigParser()
    config.read(configfile_path)
    provider_name = config.get(section, 'libcloud_provider')
    driver_type = get_driver(provider_name)
    account_name = config.get(section, 'account_name')
    account_secret = config.get(section, 'account_secret')
    return driver_type(account_name, account_secret)


driver = build_driver()


def get_or_create_container(driver, container_name=CONTAINER_NAME):
    try:
        return driver.get_container(container_name)
    except ContainerDoesNotExistError:
        return driver.create_container(container_name)


container = get_or_create_container(driver)


def upload_object(local_folder, object_name, container_name=CONTAINER_NAME,
                  skip_if_exists=True):
    driver = build_driver()  # libcloud drivers are not thread-safe
    container = get_or_create_container(driver, container_name)
    filepath = os.path.join(local_folder, object_name)
    if skip_if_exists:
        try:
            # Check the size to deal with partially uploaded files
            ob = container.get_object(object_name)
            if ob.size == os.stat(filepath).st_size:
                return ob
        except ObjectDoesNotExistError:
            pass
    return container.upload_object(
        filepath, object_name,
        extra={'content_type': 'application/octet-stream'})


n_workers = 10
filenames = os.listdir(NUMPY_DATA_FOLDER)

tic = time()
with ThreadPoolExecutor(max_workers=n_workers) as e:
    for f in filenames:
        e.submit(upload_object, NUMPY_DATA_FOLDER, f)

print("Uploaded {} files with {} workers in {:0.3f}s".format(
    len(filenames), n_workers, time() - tic))
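# The reverse direction, fetching the chunks back from the container on
# another machine, follows the same pattern. This helper is a sketch added
# here, not part of the upload script above; it reuses the
# one-driver-per-thread convention and libcloud's download_object.
def download_object(object_name, local_folder=NUMPY_DATA_FOLDER,
                    container_name=CONTAINER_NAME, skip_if_exists=True):
    driver = build_driver()  # again: one driver per thread
    container = get_or_create_container(driver, container_name)
    filepath = os.path.join(local_folder, object_name)
    if skip_if_exists and exists(filepath):
        return filepath
    obj = container.get_object(object_name)
    driver.download_object(obj, filepath, overwrite_existing=True)
    return filepath


# Usage sketch: fetch every object in the container with the same thread
# pool pattern (a near no-op here since the files already exist locally).
with ThreadPoolExecutor(max_workers=n_workers) as e:
    for obj in container.list_objects():
        e.submit(download_object, obj.name)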