Hiding IO latency in generators by async prefetching

Overview

It's very common for interesting problems not to fit in memory.
In cases like this, being able to process a subset of the problem at a time is necessary to solving it.
Streams present a nice way of processing a limited subset. They let you program as if you were working with the full list, but only keep a single element in memory at a time. Alex recently wrote about streams.
You may already be used to implementing streams in this fashion with generators (Python), iterators (C++), or coroutines (Lua).
I'll use the term “generator”, because I'll be using Python.
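
As a refresher, here's a minimal sketch of stream-style processing with a generator (the filename "data.txt" and the handle() step are hypothetical placeholders):

def lines(filename):
	# only one line is held in memory at a time
	with open(filename) as f:
		for line in f:
			yield line.rstrip("\n")

for line in lines("data.txt"):
	handle(line)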

If the time to produce or fetch an element is high, you can easily find yourself alternating between working on an element and fetching the next.
Since fetching an element is often a high-latency operation, your program ends up waiting on IO a lot of the time.
There are a number of ways of addressing this issue. Some that spring to mind are:

  1. For certain tasks, use a library that supports non-blocking operation, e.g. eventlet
  2. Explicitly parallelize the processing code (actors, worker thread pools, multiprocessing.Pool)
  3. Fetch elements in the background while processing the current element.

All three techniques are suited to specific situations.
I will discuss the third one in this article.

It's possible to wrap any generator such that it prefetches from the source generator in the background.
This has the benefit of overlapping the fetching of elements with normal execution.
It works best when the cost of processing an item is high, and the cost of fetching is not quite as high:
that way there is always an element ready for us when we finish processing the previous one.

However, this will not overlap the fetches of individual elements with each other.
It cannot do this with generators in the general case, as they are stateful and must be driven one element at a time.
So this technique is not as helpful when the time it takes to process an element is tiny compared to the time to fetch it.
For example, I've seen FQL queries take over 5 seconds.

The Code

 
import functools
import Queue
import threading

def async_prefetch_wrapper(iterable, buffer=10):
	"""
	Wraps an iterable such that it produces items in the background.
	Uses a bounded queue to limit memory consumption.
	"""
	done = object()  # unique sentinel marking the end of the stream
	def worker(q, it):
		for item in it:
			q.put(item)
		q.put(done)
	# launch a thread to fetch the items in the background
	queue = Queue.Queue(buffer)
	it = iter(iterable)
	thread = threading.Thread(target=worker, args=(queue, it))
	thread.daemon = True
	thread.start()
	# pull the items off the queue as requested
	while True:
		item = queue.get()
		if item is done:  # identity test; "==" could be fooled by an item's custom __eq__
			return
		else:
			yield item

def async_prefetch(func):
	"""
	Decorator to make generator functions fetch items in the background.
	"""
	@functools.wraps(func)
	def wrapper(*args, **kwds):
		return async_prefetch_wrapper(func(*args, **kwds))
	return wrapper

Pretty straightforward.
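
For completeness, here's a minimal usage sketch of the wrapper applied directly to an iterable; slow_source() and process() are hypothetical stand-ins for a slow producer and the per-item work:

import time

def slow_source():
	for i in range(10):
		time.sleep(0.1)  # simulate a high-latency fetch
		yield i

# the worker thread keeps up to 4 items queued while the loop body runs
for item in async_prefetch_wrapper(slow_source(), buffer=4):
	process(item)  # hypothetical per-item work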

Results

So does it work?

First, I'll do a simple test that creates a bunch of files and md5s them.
Here's the code:

 
 
from glob import glob
import hashlib
import os

def test_setup():
	files = []
	lines = 1000000
	for i in xrange(100):
		filename = "tempfile%d.txt" % i
		files.append(filename)
		with open(filename, "w") as f:
			f.write(("%d\n" % i) * lines)
	return files

def test_cleanup():
	for f in glob("tempfile*.txt"):
		os.unlink(f)

def contents(iterable):
	for filename in iterable:
		with open(filename, "rb") as f:
			contents = f.read()
		yield contents

def test():
	files = test_setup()
	for c in contents(files):
		hashlib.md5(c).digest()
	test_cleanup()

from timeit import Timer
t = Timer("test()", "from __main__ import test; gc.enable()")
print t.repeat(5, 1)

To use the async prefetch, we change the contents generator to the following:

@async_prefetch
def contents(iterable):
	for filename in iterable:
		with open(filename, "rb") as f:
			contents = f.read()
		yield contents

Here are the results (times in seconds, five runs each):

without async prefetch:              11.28  5.43  3.95  4.13  4.10
with async prefetch:                  6.16  3.79  3.38  3.44  3.42
without async prefetch and no md5:    6.32  3.34  3.20  3.10  3.22

Not a spectacular improvement, but not bad.
The best speedup we can hope for is a reduction in total runtime of N * min(fetch time, work time): each element's fetch can overlap at most one element's worth of work.
You can see from the third row that the total runtime is still dominated by IO, i.e. this is a case of a tiny workload against slow IO.
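
As a rough sanity check on that bound, here's a back-of-the-envelope sketch; I'm eyeballing the steady-state runs above, so the numbers are approximate:

total_no_prefetch = 4.0   # row 1: fetch + md5, fully serialized
fetch_total = 3.2         # row 3: fetch only
work_total = total_no_prefetch - fetch_total  # ~0.8s of md5 over all 100 files

# perfect overlap can hide at most min(fetch, work) in total
best_possible = total_no_prefetch - min(fetch_total, work_total)
print best_possible  # ~3.2s; the measured ~3.4s with prefetch is close to this bound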

Let's try it again, replacing md5 with zlib compression:

without async prefetch, zlib compress: 12.76 12.52 16.02 12.33 12.05
with async prefetch, zlib compress:    10.58  9.16 12.36  9.07  8.88

Not bad.

So this technique can give you a speedup when you have producer-style generators that take time to produce each element.

Comments

  1. Why don't you use mmap and madvise to open the files? I think you can reach the same or better performance with it, in an easy and less IO-busy way.

  2. Good question.

    The file IO might have better performance if mmap were used.
    But IIRC mmap is not noticeably faster than fread when reading an entire file in sequentially, so it probably wouldn't be much of a speedup; mmap is most useful for random access. But of course we'd have to benchmark to be sure.
    Also, to pay the IO penalty in the other thread, you'd need to read the whole file through the mmap.
    madvise could sidestep that, but it doesn't exist in Python, or on Windows.
    And lastly: I just didn't think of it 😉

    But the post isn't really about the IO; it's about overlapping IO with processing.
    The file IO was just a convenient, simple example. You could replace the contents() function with one that uses mmap.mmap and mmap.mread (no madvise in Python), and still get the benefit of overlapping the IO with the processing.
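
    For the curious, a rough, untested sketch of what an mmap-based contents() might look like (using access=ACCESS_READ so it works on both Unix and Windows, and assuming non-empty files):

    import mmap
    import os

    def contents(iterable):
    	for filename in iterable:
    		fd = os.open(filename, os.O_RDONLY)
    		try:
    			size = os.fstat(fd).st_size  # zero-length files would need special-casing
    			m = mmap.mmap(fd, size, access=mmap.ACCESS_READ)
    			try:
    				yield m[:]  # copy the bytes out so the mapping can be closed
    			finally:
    				m.close()
    		finally:
    			os.close(fd)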

    To be completely honest, if it were just Python file IO and Python socket IO you had to worry about, eventlet probably does all you need. You could replace this contents() with one that doesn't use async_prefetch and just uses eventlet, and maybe get better performance. It could give better disk utilization, because it'll do file IO concurrently on multiple files, and you won't have the GIL issues that another thread can cause.
    But one day you might need to work with something that eventlet doesn't support.

  3. Thanks 😉 really good explanation. I had just been thinking about the disk IO problem. And with mmap, because of kernel page caches, you get the data almost instantly. You can do other jobs between madvise and read (Lighttpd uses this method). Also, I guess Python's mread doesn't do exactly the same job as madvise 🙂
