Tricks with generators
Hiding IO latency by async prefetching
It's very common for interesting problems not to fit into memory.
In cases like this, being able to process a subset of the problem at a time is necessary to solve it.
Streams present a nice way of processing a limited subset. They let you program as if you were working with the full list, but keep only a single element in memory at a time. Alex recently wrote about streams.
You may already be used to implementing streams in this fashion with generators (Python), iterators (C++), or coroutines (Lua).
I'll use the term "generator", because I'll be using Python.
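As a quick illustration (a made-up example, using a file as the stream), a generator lets you iterate over data one element at a time without ever loading the whole thing:

```python
import os
import tempfile

def lines(path):
    # yield one line at a time; the full file is never held in memory
    with open(path) as f:
        for line in f:
            yield line.rstrip("\n")

# tiny demonstration with a temporary file
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "w") as f:
    f.write("a\nb\nc\n")
assert list(lines(path)) == ["a", "b", "c"]
os.unlink(path)
```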
If the time to produce or fetch an element is high, you can easily find yourself alternating between working on an element and fetching the next one.
Fetching is often a high-latency operation, so your program ends up spending much of its time waiting on IO.
There are a number of ways of addressing this issue. Some that spring to mind are:
- For certain tasks, use a library that supports non-blocking operation, e.g. eventlet.
- Explicitly parallelize the processing code (actors, worker thread pools, multiprocessing.Pool).
- Fetch elements in the background while processing the current element.
All three techniques are suited to specific situations.
I will discuss the third one in this article.
It's possible to wrap any generator such that it prefetches from the source generator in the background.
This has the benefit of overlapping the fetching of elements with normal execution.
This works best when the cost of processing an item is high, and the cost of fetching it is not quite as high.
That way there is always an element ready for us when we finish processing the previous one.
However, this will not overlap the fetching of individual elements with each other.
It cannot do this for generators in the general case, because they are stateful.
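To see why a generic wrapper cannot fetch several elements concurrently, consider a generator whose output depends on state carried across yields (a made-up example):

```python
def running_total(values):
    total = 0  # state carried between yields
    for v in values:
        total += v
        yield total

# element i cannot be produced without first producing elements 0..i-1,
# so a wrapper can only prefetch ahead sequentially, never in parallel
assert list(running_total([1, 2, 3])) == [1, 3, 6]
```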
So this technique is not as helpful when the time it takes to process an element is tiny compared to the time to fetch it.
For example, I've seen FQL queries take over 5 seconds.
```python
import functools
import Queue
import threading

def async_prefetch_wrapper(iterable, buffer=10):
    """
    wraps an iterator such that it produces items in the background
    uses a bounded queue to limit memory consumption
    """
    done = object()

    def worker(q, it):
        for item in it:
            q.put(item)
        q.put(done)

    # launch a thread to fetch the items in the background
    queue = Queue.Queue(buffer)
    it = iter(iterable)
    thread = threading.Thread(target=worker, args=(queue, it))
    thread.daemon = True
    thread.start()

    # pull the items off the queue as requested
    while True:
        item = queue.get()
        if item is done:  # identity check, so items that overload == are safe
            return
        else:
            yield item

def async_prefetch(func):
    """
    decorator to make generator functions fetch items in the background
    """
    @functools.wraps(func)
    def wrapper(*args, **kwds):
        return async_prefetch_wrapper(func(*args, **kwds))
    return wrapper
```
Pretty straightforward.
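For readers on Python 3, here is a minimal port of the same idea (the `Queue` module is named `queue` there), with a quick sanity check that ordering is preserved:

```python
import queue
import threading

def async_prefetch_wrapper(iterable, buffer=10):
    """Yield items from iterable, fetching ahead in a background thread."""
    done = object()  # unique sentinel marking exhaustion

    def worker(q, it):
        for item in it:
            q.put(item)
        q.put(done)

    q = queue.Queue(buffer)  # bounded queue limits memory use
    t = threading.Thread(target=worker, args=(q, iter(iterable)))
    t.daemon = True
    t.start()

    while True:
        item = q.get()
        if item is done:
            return
        yield item

# order is preserved and the source is fully consumed
assert list(async_prefetch_wrapper(range(100))) == list(range(100))
```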
So does it work?
First I'll do a simple test that creates a bunch of files and md5s them.
Here's the code:
```python
import hashlib
import os
from glob import glob

def test_setup():
    files = []
    lines = 1000000
    for i in xrange(100):
        filename = "tempfile%d.txt" % i
        files.append(filename)
        with open(filename, "w") as f:
            f.write(("%d\n" % i) * lines)
    return files

def test_cleanup():
    for f in glob("tempfile*.txt"):
        os.unlink(f)

def contents(iterable):
    for filename in iterable:
        with open(filename, "rb") as f:
            contents = f.read()
        yield contents

def test():
    files = test_setup()
    for c in contents(files):
        hashlib.md5(c).digest()
    test_cleanup()

from timeit import Timer
t = Timer("test()", "import gc; from __main__ import test; gc.enable()")
print t.repeat(5, 1)
```
To use the async prefetch we change the contents generator to the following:
```python
@async_prefetch
def contents(iterable):
    for filename in iterable:
        with open(filename, "rb") as f:
            contents = f.read()
        yield contents
```
Here are the results (times in seconds):
| Test | Run 1 | Run 2 | Run 3 | Run 4 | Run 5 |
|---|---|---|---|---|---|
| without async prefetch | 11.28 | 5.43 | 3.95 | 4.13 | 4.10 |
| with async prefetch | 6.16 | 3.79 | 3.38 | 3.44 | 3.42 |
| without async prefetch and no md5 | 6.32 | 3.34 | 3.20 | 3.10 | 3.22 |
Not a spectacular improvement, but not bad.
The best speedup we can hope for is reducing the total runtime by roughly N × min(fetch time, work time).
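To make that bound concrete, here is a back-of-the-envelope model (hypothetical numbers; it ignores thread and queue overhead):

```python
def serial_time(n, fetch, work):
    # fetching and processing strictly alternate
    return n * (fetch + work)

def pipelined_time(n, fetch, work):
    # the first element must be fetched up front; after that the slower
    # of the two stages sets the pace
    return min(fetch, work) + n * max(fetch, work)

# hypothetical numbers: 100 elements, 2s to fetch, 3s to process
saved = serial_time(100, 2, 3) - pipelined_time(100, 2, 3)
print(saved)  # 198 seconds, just under 100 * min(2, 3)
```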
You can see from the third row that the total runtime is still dominated by IO, i.e. this is a case of a tiny workload against slow IO.
Let's try it by replacing md5 with zlib compression.
| Test | Run 1 | Run 2 | Run 3 | Run 4 | Run 5 |
|---|---|---|---|---|---|
| without async prefetch, zlib compress | 12.76 | 12.52 | 16.02 | 12.33 | 12.05 |
| with async prefetch, zlib compress | 10.58 | 9.16 | 12.36 | 9.07 | 8.88 |
So this technique can give you a speedup when you have producer-style generators that take time to produce each element.