Core Techniques used in our ETL

  • Generators
  • Partial function application
  • Batching / Chunking
  • Caching
In [1]:
import collections
import functools
import more_itertools
import json

Generators

Python generators let you create iterators concisely.

They are a highlighted technique in this workshop because they provide:

  • Concise code
  • Deferred evaluation
  • Easy chaining for composing a transformation process
In [2]:
# start with a function that produces a list of squared numbers
def squares_as_list(max_n):
    accum = []
    x = 1
    while x <= max_n:
        accum.append(x * x)
        x = x + 1
    return accum

# output the result
result = squares_as_list(10)
print('Type is: ' + str(type(result)))
for i in result:
    print(i)
Type is: <class 'list'>
1
4
9
16
25
36
49
64
81
100
In [3]:
# here is a similar function, but implemented as a generator
def squares_as_generator(max_n):
    x = 1
    while x <= max_n:
        yield x * x
        x = x + 1


result = squares_as_generator(10)
print('Type is: ' + str(type(result)))

# loop directly as an iterable
print('All 10 using a loop')
for s in result:
    print(s)
    
print('Just 5 iterations to demonstrate deferred evaluation...')
another_gen = squares_as_generator(10)
print(next(another_gen))
print(next(another_gen))
print(next(another_gen))
print(next(another_gen))
print(next(another_gen))
Type is: <class 'generator'>
All 10 using a loop
1
4
9
16
25
36
49
64
81
100
Just 5 iterations to demonstrate deferred evaluation...
1
4
9
16
25
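
Deferred evaluation also means a generator never has to finish. As a minimal sketch (the names all_squares and first_five are illustrative and not part of the workshop code), an unbounded generator can be consumed safely with itertools.islice, which pulls only as many items as requested:

```python
import itertools

def all_squares():
    # hypothetical unbounded generator: it never terminates on its own
    x = 1
    while True:
        yield x * x
        x = x + 1

# islice pulls only the first 5 items; nothing beyond them is computed
first_five = list(itertools.islice(all_squares(), 5))
print(first_five)
```

Because each value is produced only on demand, the infinite loop inside the generator is harmless as long as the consumer stops asking.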

Chaining

Generators are first-class objects in Python, so you can pass them as arguments (iterables) to other generators to chain operations.

In [4]:
#
# Generator Chaining example
#

def f_A(n):
    x = 1
    while x < n:
        yield x * x
        x = x + 1
        
def f_B(iter_a):
    for y in iter_a:
        yield y + 10000
        
def f_C(iter_b):
    for z in iter_b:
        yield "'myprefix " + str(z) + "'"
        
# chain the first two
gen_a = f_A(10)
gen_b = f_B(gen_a)
print('First two chained')
for r in gen_b:
    print(r)

print('\nAll 3 chained')
gen_a = f_A(10)
gen_b = f_B(gen_a)
gen_c = f_C(gen_b)
for r in gen_c:
    print(r)
First two chained
10001
10004
10009
10016
10025
10036
10049
10064
10081

All 3 chained
'myprefix 10001'
'myprefix 10004'
'myprefix 10009'
'myprefix 10016'
'myprefix 10025'
'myprefix 10036'
'myprefix 10049'
'myprefix 10064'
'myprefix 10081'
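
For short, single-expression steps like these, the same chain can also be sketched with generator expressions instead of named generator functions. This is an alternative spelling rather than the workshop's approach, and the variable names are illustrative:

```python
# each stage is lazy: nothing is computed until the final list() pulls items
squares = (x * x for x in range(1, 10))
shifted = (y + 10000 for y in squares)
labelled = ("'myprefix " + str(z) + "'" for z in shifted)

results = list(labelled)
for r in results:
    print(r)
```

Named generator functions tend to be the better choice once a stage needs configuration or more than one line of logic.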

Simplistic ETL

This code sample shows a very simple ETL which leverages generators and chaining.

This is somewhat contrived as it doesn't use a database. It uses a list as "source data" and a dictionary as a "destination" for inserting results. The main point is to show the separation of the three stages (extract, transform, load) and how they can be chained together as generators.

In [5]:
# source: assume this list holds the database rows
SOURCE_DATA = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

DESTINATION_DB = collections.OrderedDict()

def extractor(source_data):
    for item in source_data:
        yield item
        
def transformer(iter_extractor):
    for item in iter_extractor:
        # transform it into a tuple of (n, n^2)
        transformed_item = (item, item * item)
        yield transformed_item
        
def loader(iter_transformer, db):
    for item in iter_transformer:
        # insert each tuple as an item into the storage dictionary
        k = str(item[0])
        v = item[1]
        db[k] = v
        

# here is a simple example of chaining generators
extracted_gen = extractor(SOURCE_DATA)

transformed_gen = transformer(extracted_gen)

loader(transformed_gen, DESTINATION_DB)

# output the loaded results
print(json.dumps(DESTINATION_DB, indent=2))
{
  "1": 1,
  "2": 4,
  "3": 9,
  "4": 16,
  "5": 25,
  "6": 36,
  "7": 49,
  "8": 64,
  "9": 81,
  "10": 100
}

Partial functions

You can create partial function objects using functools.partial().

This allows you to "freeze" positional arguments (args) or keyword arguments (kwargs).

This is a quick method to implement encapsulation (bundling data with methods).

In [6]:
def add(x, y):
    return x + y

print('Simple addition')
print('1 + 2 = %d' % add(1, 2))
print('2 + 3 = %d' % add(2, 3))

print('partial add_1 function')
# NOTE: order of args matters!
add_1 = functools.partial(add, 1)

print('add_1(1) = %d' % add_1(1))
print('add_1(2) = %d' % add_1(2))

print('partial add_2 function')
add_2 = functools.partial(add, 2)

print('add_2(1) = %d' % add_2(1))
print('add_2(2) = %d' % add_2(2))
Simple addition
1 + 2 = 3
2 + 3 = 5
partial add_1 function
add_1(1) = 2
add_1(2) = 3
partial add_2 function
add_2(1) = 3
add_2(2) = 4
In [13]:
import functools

# similarly, you can freeze kwargs to avoid ordering constraints
def pow(x, n=1):
    return x ** n
    
print('regular')
print( pow(2, n=1) )

print('partial with n=2')
pow_2 = functools.partial(pow, n=2)
print(type(pow_2))
print( pow_2(2) )

print('partial with n=3')
pow_3 = functools.partial(pow, n=3)
print( pow_3(2) )
regular
2
partial with n=2
<class 'functools.partial'>
4
partial with n=3
8
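
A partial object is easy to inspect, and a frozen keyword argument acts as a default rather than a lock. The sketch below uses a hypothetical power function (named to avoid shadowing the built-in pow) to show the func, args, and keywords attributes and a call-time override:

```python
import functools

def power(x, n=1):
    # hypothetical helper, equivalent to the pow() defined in the cell above
    return x ** n

square = functools.partial(power, n=2)

# the partial object remembers what it wraps and what was frozen
print(square.func is power)
print(square.args)
print(square.keywords)

# a frozen kwarg can still be overridden at call time
print(square(2))
print(square(2, n=5))
```

This is worth keeping in mind when a partial is built from configuration: a caller that passes the same keyword again silently replaces the frozen value.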

For modules with single operations, you can quickly implement parameterization using partial functions.

In [8]:
# example: this transformer generator has multiple kwargs which serve
# as parameters controlling its behavior
def tranform_func_with_config(iter_extractor, translate=0, scale=1, type=int):
    for x in iter_extractor:
        t = x + translate
        t = scale * t
        t = type(t)
        
        yield (x,t)
        
# now we can create multiple transformer configurations via partial functions
# these configurations can be read from a JSON file
config_1 = {'translate': 1, 'scale': 2}
config_2 = {'scale': -1, 'type': str}

# create partial functions quickly by unpacking the configuration to freeze the kwargs
transform_1 = functools.partial(tranform_func_with_config, **config_1)
transform_2 = functools.partial(tranform_func_with_config, **config_2)

# let's output one of them
extracted_gen = extractor(SOURCE_DATA)
tranform_1_gen = transform_1(extracted_gen)

for t in tranform_1_gen:
    print(t)
    
# any questions?
(1, 4)
(2, 6)
(3, 8)
(4, 10)
(5, 12)
(6, 14)
(7, 16)
(8, 18)
(9, 20)
(10, 22)
In [9]:
# the real power is that the partial function _encapsulates_ the configuration so that
# other functions (like this simple process method) need not be concerned with it
def process(f_extractor, f_transformer, f_loader):
    
    # run the process
    extractor_gen = f_extractor(SOURCE_DATA)
    
    transformer_gen = f_transformer(extractor_gen)
    
    f_loader(transformer_gen, DESTINATION_DB)


DESTINATION_DB.clear()
print('configuration 1')
process(extractor, transform_1, loader)
print(json.dumps(DESTINATION_DB, indent=2))


DESTINATION_DB.clear()
print('\nconfiguration 2')
process(extractor, transform_2, loader)
print(json.dumps(DESTINATION_DB, indent=2))
configuration 1
{
  "1": 4,
  "2": 6,
  "3": 8,
  "4": 10,
  "5": 12,
  "6": 14,
  "7": 16,
  "8": 18,
  "9": 20,
  "10": 22
}

configuration 2
{
  "1": "-1",
  "2": "-2",
  "3": "-3",
  "4": "-4",
  "5": "-5",
  "6": "-6",
  "7": "-7",
  "8": "-8",
  "9": "-9",
  "10": "-10"
}

Batching

Batching, also known as "chunking", is easy with more_itertools.chunked().

It consumes any iterable and yields its items in lists of at most a given size. This greatly reduces the complexity of your code because you need not worry about how many items your input iterator produces. You also don't need any edge case logic to handle 'remainder' items.

In [10]:
# range() is a Python built-in.  since Python 3 it is a lazy sequence (an iterable, though not strictly a generator)
source_gen = range(20)

print('normal consumption')
for item in source_gen:
    print(item)
    
print('\nbatched consumption')
source_gen = range(20)
chunk_size = 3
batched_gen = more_itertools.chunked(source_gen, chunk_size)
for item in batched_gen:
    print('{} of size {}: {}'.format(type(item), len(item), item))
normal consumption
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

batched consumption
<class 'list'> of size 3: [0, 1, 2]
<class 'list'> of size 3: [3, 4, 5]
<class 'list'> of size 3: [6, 7, 8]
<class 'list'> of size 3: [9, 10, 11]
<class 'list'> of size 3: [12, 13, 14]
<class 'list'> of size 3: [15, 16, 17]
<class 'list'> of size 2: [18, 19]
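
In an ETL, batching typically sits between the transformer and the loader so that rows are written in bulk. The following is a sketch under assumptions: batched_loader and chunks are hypothetical names, the destination is a plain dictionary as in the earlier cells, and chunks is a standard-library stand-in that mimics more_itertools.chunked() so the block runs without third-party packages.

```python
import itertools

def chunks(iterable, n):
    # stdlib stand-in for more_itertools.chunked(): yields lists of up to n items
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, n))
        if not batch:
            return
        yield batch

def batched_loader(iter_transformer, db, batch_size=4):
    # hypothetical loader: writes one batch at a time, returns the batch sizes
    sizes = []
    for batch in chunks(iter_transformer, batch_size):
        # dict.update() stands in for a bulk insert into a real database
        db.update((str(k), v) for k, v in batch)
        sizes.append(len(batch))
    return sizes

db = {}
transformed = ((n, n * n) for n in range(1, 11))
batch_sizes = batched_loader(transformed, db, batch_size=4)
print(batch_sizes)
print(db)
```

The loader never needs to know the total row count; the final, shorter batch is handled by the same code path as the full ones.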

Caching

Some computations are time consuming. You can store pre-computed results in memory via a cache.

Python comes with a built-in caching function: functools.lru_cache(). You can easily wrap an "expensive" function so that it will cache a maximum number of results.

This cache uses a Least Recently Used (LRU) replacement policy: when a new item must be added to a full cache, the least recently used item is evicted first. A common implementation pairs a hash table (for quick lookup) with a doubly-linked list (for quickly locating the least recently used item to evict). Other data structures exist with different tradeoffs (e.g. data structures with age bits).
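
To make the policy concrete, here is a minimal sketch of an LRU cache built on collections.OrderedDict, which stands in for the hash table plus doubly-linked list pairing. The TinyLRU class is hypothetical and for illustration only; it is not how functools.lru_cache is actually implemented.

```python
import collections

class TinyLRU:
    # hypothetical illustration of the LRU policy, not a production cache
    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.items = collections.OrderedDict()

    def get(self, key, default=None):
        if key not in self.items:
            return default
        # a hit makes this key the most recently used
        self.items.move_to_end(key)
        return self.items[key]

    def put(self, key, value):
        if key in self.items:
            self.items.move_to_end(key)
        elif len(self.items) >= self.maxsize:
            # cache is full: evict the least recently used key (the oldest end)
            self.items.popitem(last=False)
        self.items[key] = value

cache = TinyLRU(maxsize=2)
cache.put('a', 1)
cache.put('b', 2)
cache.get('a')       # 'a' is now more recent than 'b'
cache.put('c', 3)    # cache is full, so 'b' is evicted
print(list(cache.items))
```

Note that reading an item counts as "use": the lookup of 'a' is what saves it from eviction when 'c' arrives.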

In [11]:
@functools.lru_cache(maxsize=4)
def cached_pow(x, n):
    print("-- Oh be careful... I'm expensive!")
    return x ** n

# this will run the actual method but cache the results
print('Populate cache with 2 different items')
print( cached_pow(2, 3) )
print( cached_pow(2, 4) )

# this will use cached results (notice the absence of the warning)
print('\nRe-run same requests so that it retrieves from the cache')
print( cached_pow(2, 3) )
print( cached_pow(2, 3) )
print( cached_pow(2, 4) )
print( cached_pow(2, 4) )
print( cached_pow(2, 4) )

# this will force an eviction (2+3 > 4 max items) of the first pow(2,3) result
print('\n3 more different items')
print( cached_pow(2, 5) )
print( cached_pow(2, 6) )
print( cached_pow(2, 7) )

# run the very last one along with (2,3) again to re-evaluate
print('\n(2,3) should have been evicted, will require an evaluation')
print( cached_pow(2, 7) )
print( cached_pow(2, 3) )

print('cache metrics')
cache_info = cached_pow.cache_info()
print(cache_info)
Populate cache with 2 different items
-- Oh be careful... I'm expensive!
8
-- Oh be careful... I'm expensive!
16

Re-run same requests so that it retrieves from the cache
8
8
16
16
16

3 more different items
-- Oh be careful... I'm expensive!
32
-- Oh be careful... I'm expensive!
64
-- Oh be careful... I'm expensive!
128

(2,3) should have been evicted, will require an evaluation
128
-- Oh be careful... I'm expensive!
8
cache metrics
CacheInfo(hits=6, misses=6, maxsize=4, currsize=4)

However, using this cache requires a bit of care. The documentation briefly mentions that:

...the positional and keyword arguments to the function must be hashable...

This is actually quite critical when working with the SQLAlchemy ORM, because the Session object should not be considered hashable. It is a class instance that likely has a lot of internal state that changes dynamically under the hood.

In [12]:
# a contrived session class which uses our contrived database
class CrankySession(object):
    def __init__(self, db):
        self.db = db
    
    def query(self, idx: str):
        print("-- fine fine... I'll check the database")
        return self.db[idx]
    
    def __hash__(self):
        raise RuntimeError("WATCH IT BUDDY! I'm not hashable!")
        
# let's use the lru_cache decorator disregarding the documentation regarding hashable arguments
@functools.lru_cache(maxsize=4)
def broken_session_lookup(session: CrankySession, idx: str):
    return session.query(idx)

# now try running it
session = CrankySession(DESTINATION_DB)
broken_session_lookup(session, "1")
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-12-3d8605c0ae18> in <module>()
     18 # now try running it
     19 session = CrankySession(DESTINATION_DB)
---> 20 broken_session_lookup(session, "1")

<ipython-input-12-3d8605c0ae18> in __hash__(self)
      9 
     10     def __hash__(self):
---> 11         raise RuntimeError("WATCH IT BUDDY! I'm not hashable!")
     12 
     13 # let's use the lru_cache decorator disregarding the documentation regarding hashable arguments

RuntimeError: WATCH IT BUDDY! I'm not hashable!

We can get around this using partial functions: freeze the session argument first, so that the cache only ever sees the hashable idx argument. The example below reuses the contrived CrankySession from above, which explodes if you try to hash it.

In [ ]:
# start with an unwrapped function
def raw_session_lookup(session: CrankySession, idx: str):
    return session.query(idx)

# create a new partial function to "freeze" the session argument
partial_session_lookup = functools.partial(raw_session_lookup, session)

# now you can safely wrap the partial function with the lru_cache method
# NOTE: you need to call the wrapper directly rather than using a decorator syntax
cache_wrapper = functools.lru_cache(maxsize=4)
cached_session_lookup = cache_wrapper(partial_session_lookup)

# now call it to your heart's content
print(cached_session_lookup("1"))
print(cached_session_lookup("2"))
print(cached_session_lookup("2"))
print(cached_session_lookup("1"))
print(cached_session_lookup("1"))
cache_info = cached_session_lookup.cache_info()
print(cache_info)
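
An alternative sketch, assuming you prefer decorator syntax: build the cached function inside a factory so that the session is captured by a closure instead of being passed as an argument. FakeSession and make_cached_lookup are hypothetical names used only to keep this block self-contained.

```python
import functools

class FakeSession:
    # hypothetical stand-in for a session object that must never be hashed
    def __init__(self, db):
        self.db = db

    def query(self, idx):
        return self.db[idx]

    def __hash__(self):
        raise RuntimeError("not hashable")

def make_cached_lookup(session, maxsize=4):
    # the session lives in the closure, so lru_cache only ever hashes idx
    @functools.lru_cache(maxsize=maxsize)
    def lookup(idx):
        return session.query(idx)
    return lookup

lookup = make_cached_lookup(FakeSession({"1": 1, "2": 4}))
values = [lookup("1"), lookup("2"), lookup("2"), lookup("1")]
print(values)
print(lookup.cache_info())
```

With either approach the cache belongs to one specific session: each call to the factory (or each new partial) starts with its own empty cache.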