This notebook is an example that demonstrates how to preprocess a large dataset in the svmlight format into chunked, dense numpy arrays that are then compressed individually and stored in a cloud object store (Amazon S3 or Azure Blob Store) for later consumption by machine learning models.
import re
import bz2
import os
from os.path import expanduser, join, exists
from configparser import ConfigParser
from time import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from libcloud.storage.types import Provider
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ObjectDoesNotExistError
from libcloud.storage.providers import get_driver
# Local filesystem layout: raw download, svmlight chunks, numpy chunks.
DATA_FOLDER = expanduser('~/data/mnist8m')
SVMLIGHT_DATA_FOLDER = join(DATA_FOLDER, 'svmlight')
NUMPY_DATA_FOLDER = join(DATA_FOLDER, 'numpy')
# Upstream location of the bz2-compressed svmlight source file.
MNIST8M_SRC_URL = ('http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/'
                   'datasets/multiclass/mnist8m.bz2')
MNIST8M_SRC_FILENAME = MNIST8M_SRC_URL.rsplit('/', 1)[1]
MNIST8M_SRC_FILEPATH = join(DATA_FOLDER, MNIST8M_SRC_FILENAME)
# All chunk files share this prefix followed by a zero-padded chunk index.
CHUNK_FILENAME_PREFIX = "mnist8m-chunk-"
# Number of svmlight lines (samples) per chunk file.
CHUNK_SIZE = 100000
Download the mnist8m.bz2
source file into the data folder if not previously downloaded:
# Download the bz2-compressed source file into the data folder unless a
# previous download already put it there; wget -c resumes partial downloads.
# exist_ok=True avoids the check-then-create race of exists()+makedirs().
os.makedirs(DATA_FOLDER, exist_ok=True)
if not exists(MNIST8M_SRC_FILEPATH):
    cmd = "(cd '%s' && wget -c '%s')" % (DATA_FOLDER, MNIST8M_SRC_URL)
    print(cmd)
    os.system(cmd)
Decompress the big bz2 source file and chunk the source svmlight formatted data file to make it easier to process it in parallel:
# Make sure the svmlight chunk folder exists, then collect any chunk files
# left over from a previous run, sorted so list position matches chunk index.
if not exists(SVMLIGHT_DATA_FOLDER):
    os.makedirs(SVMLIGHT_DATA_FOLDER)
chunk_filenames = sorted(
    fn for fn in os.listdir(SVMLIGHT_DATA_FOLDER)
    if fn.startswith(CHUNK_FILENAME_PREFIX) and fn.endswith('.svmlight')
)
def get_svmlight_filename(chunk_idx):
    """Return the absolute path of svmlight chunk number *chunk_idx*."""
    basename = "%s%03d.svmlight" % (CHUNK_FILENAME_PREFIX, chunk_idx)
    return join(SVMLIGHT_DATA_FOLDER, basename)
# Decompress the bz2 stream and split it into CHUNK_SIZE-line chunk files.
# Skipped entirely when chunk files were already found on disk (cheap resume).
if not chunk_filenames:
    chunk_filenames = []
    with bz2.BZ2File(MNIST8M_SRC_FILEPATH) as source:
        # target is the currently open chunk file (None between chunks),
        # line_no counts lines within the current chunk, chunk_idx numbers chunks.
        target, line_no, chunk_idx = None, 0, 0
        for line in source:
            line_no += 1
            # Lazily open the next chunk file on the first line of each chunk.
            if target is None:
                chunk_filename = get_svmlight_filename(chunk_idx)
                target = open(chunk_filename, 'wb')
                chunk_idx += 1
                chunk_filenames.append(chunk_filename)
            target.write(line)
            # Chunk is full: close it; the next line will open a fresh one.
            if line_no >= CHUNK_SIZE:
                target.close()
                target, line_no = None, 0
        # Close the final, possibly short, chunk.
        if target is not None:
            target.close()
Parse the svmlight formatted chunks into dense numpy arrays and store the resulting chunks as compressed binary files using NumPy's own format.
# Connect to the running IPython parallel cluster and build a load-balanced
# view used to dispatch parsing tasks to the engines.
# NOTE(review): IPython.parallel was later split out into the separate
# `ipyparallel` package -- confirm against the installed IPython version.
from IPython.parallel import Client
client = Client()
lb_view = client.load_balanced_view()
len(lb_view)
4
def parse_svmlight_chunk(input_chunk_filename, output_chunk_filename,
                         output_chunk_labels_filename,
                         n_features, chunk_size=CHUNK_SIZE):
    """Parse one svmlight chunk into dense arrays saved as compressed .npz.

    Pixel values are scaled to [0, 1] by dividing by 255.  The data and
    label arrays are written to *output_chunk_filename* and
    *output_chunk_labels_filename* respectively.  Does nothing when both
    output files already exist (idempotent, resumable).
    """
    # Import ALL dependencies lazily so this function can execute on remote
    # IPython engines, where the notebook's module globals (np, exists, ...)
    # are not defined.  The original only imported sklearn lazily, so it
    # would fail with NameError on `exists`/`np` when run remotely.
    from os.path import exists
    if exists(output_chunk_filename) and exists(output_chunk_labels_filename):
        return  # already converted: skip the expensive parse
    import numpy as np
    from sklearn.datasets import load_svmlight_file
    X, y = load_svmlight_file(input_chunk_filename, n_features=n_features)
    np.savez_compressed(output_chunk_filename, X.toarray() / 255.)
    np.savez_compressed(output_chunk_labels_filename, y)
def get_numpy_filenames(i):
    """Return the (data, labels) .npz file paths for chunk number *i*.

    Bug fix: the original formatted the names with the leftover global
    ``chunk_idx`` (from the chunking loop) instead of the parameter ``i``,
    so every chunk mapped to the same pair of output filenames.
    """
    data = "%s%03d_data.npz" % (CHUNK_FILENAME_PREFIX, i)
    labels = "%s%03d_labels.npz" % (CHUNK_FILENAME_PREFIX, i)
    return (
        join(NUMPY_DATA_FOLDER, data),
        join(NUMPY_DATA_FOLDER, labels),
    )
# Dispatch one parsing task per chunk to the cluster.  lb_view.apply returns
# an AsyncResult immediately, so this loop only queues the work.
tasks = []
n_features = 28 ** 2  # hardcoded for now
for i in range(81):  # 8100000 lines // 100000 lines per chunk:
    svmlight_chunk_name = get_svmlight_filename(i)
    data_chunk_name, label_chunk_name = get_numpy_filenames(i)
    tasks.append(lb_view.apply(parse_svmlight_chunk,
                               svmlight_chunk_name,
                               data_chunk_name,
                               label_chunk_name,
                               n_features))
# Progress check: number of completed tasks vs total.
sum(t.ready() for t in tasks), len(tasks)
(0, 81)
# Path of the INI file holding the cloud storage provider credentials.
CONFIGFILE_PATH = 'cloudstorage.ini'
Let's use Apache Libcloud to upload the chunk objects to a permanent store for later usage in ephemeral VMs. We will store the credential in a configuration file named cloudstorage.ini
. Here is the expected content for the Windows Azure Cloud:
[account]
libcloud_provider = azure_blobs
account_name = myacount
account_secret = primarykey
On Amazon S3, the config file would look like:
[account]
libcloud_provider = s3
account_name = aws_key_id
account_secret = aws_secret_key
Apache Libcloud supports many more Cloud Object Store providers.
The objects will be stored in a specific container. On some providers, the container name must be globally unique (such as is the case for bucket names on S3). On others like Azure, the container names are local to the cloud storage account. In case of conflict, just change the container name:
# Target container name; on some providers (e.g. S3) this must be globally
# unique -- change it in case of conflict.
CONTAINER_NAME = "mnist8m"
The following function parses the cloudstorage.ini
file and builds a Libcloud driver instance. This instance is not thread safe, hence we wrap the driver instantiation in a function to be reused in individual threads.
def build_driver(configfile_path=CONFIGFILE_PATH, section='account'):
    """Build a fresh Libcloud storage driver from the credentials file.

    Libcloud drivers are not thread-safe, so each thread should call this
    to obtain its own instance.
    """
    parser = ConfigParser()
    parser.read(configfile_path)
    provider = parser.get(section, 'libcloud_provider')
    name = parser.get(section, 'account_name')
    secret = parser.get(section, 'account_secret')
    driver_cls = get_driver(provider)
    return driver_cls(name, secret)

driver = build_driver()
The following utility function checks that a container with a specific name exists on the Cloud Storage provider, otherwise it creates it:
def get_or_create_container(driver, container_name=CONTAINER_NAME):
    """Fetch the container named *container_name*, creating it if absent."""
    try:
        container = driver.get_container(container_name)
    except ContainerDoesNotExistError:
        container = driver.create_container(container_name)
    return container
# Ensure the target container exists before any upload is attempted.
container = get_or_create_container(driver)
We can now write a function that uploads individual local files to a target object container. As this function will be called in parallel in various threads we instantiate a dedicated driver inside.
def upload_object(local_folder, object_name, container_name=CONTAINER_NAME, skip_if_exists=True):
    """Upload ``local_folder/object_name`` to the given container.

    When *skip_if_exists* is true, an object that is already present with
    the same size as the local file is returned as-is, so partially
    uploaded objects get re-uploaded.
    """
    # Build a dedicated driver: libcloud drivers are not thread-safe.
    driver = build_driver()
    container = get_or_create_container(driver, container_name)
    filepath = os.path.join(local_folder, object_name)
    if skip_if_exists:
        try:
            existing = container.get_object(object_name)
            # A size match means the previous upload completed.
            if os.stat(filepath).st_size == existing.size:
                return existing
        except ObjectDoesNotExistError:
            pass
    extra = {'content_type': 'application/octet-stream'}
    return container.upload_object(filepath, object_name, extra=extra)
Finally let us upload all the chunks and labels from the MNIST8M dataset in parallel to speedup the upload. As IPython does not seem to be fully compatible with gevent monkeypatching we will use Python threads to upload data in parallel:
# Upload every numpy chunk file in parallel.  Threads are appropriate here:
# the work is network I/O bound, so the GIL is released while waiting.
n_workers = 10
filenames = os.listdir(NUMPY_DATA_FOLDER)
tic = time()
with ThreadPoolExecutor(max_workers=n_workers) as e:
    for f in filenames:
        # Bug fix: the original passed the undefined name `local_folder`
        # (NameError inside the worker); the chunks live in NUMPY_DATA_FOLDER.
        e.submit(upload_object, NUMPY_DATA_FOLDER, f)
print("Uploaded {} files with {} workers in {:0.3f}s".format(
    len(filenames), n_workers, time() - tic))
Uploaded 83 files with 10 workers in 281.750s