Sharing large objects in a `multiprocessing.Pool`
UPDATE: The fix described in this post has made it into Keras itself as of Keras 2.0.7.
While working on Keras, I stumbled upon an issue when using multiprocessing. When you use a read-only structure like a Sequence, you expect it to be really fast. But I was getting the opposite: Sequences were 2-5 times slower than generators. In this post, I'll show what the problem is and how to resolve it.
So let's say you want to share a structure with some internal data, like a Sequence.
```python
class Sequence():
    def __init__(self, my_list):
        self.my_list = my_list

    def __getitem__(self, item):
        return self.my_list[item]

    def __len__(self):
        return len(self.my_list)


def get_item(seq, idx):
    # Allows Python 2 to pickle the Sequence
    return seq[idx]
```
The list could be quite large, with thousands of elements. To make it harder for Python to translate the list into a C array, we will make tuples of mixed types before creating a Sequence from it.
```python
files = ['test_string'] * 100000

# Make the test faster.
nb_files = min(len(files), 100)

huge_list = ((x, [1, 3, 4]) for x in files)
seq = Sequence(list(huge_list))
```
Next, we have to initialize a multiprocessing.Pool and a consumer-producer system.
We will create a Pool of 5 workers to extract data and a thread to enqueue the promises.
```python
import multiprocessing
import threading
import time
from queue import Queue

x = multiprocessing.Pool(5)
qu = Queue(10)

def run(qu):
    for i in range(nb_files):
        # Simply dequeue an item and wait for its result.
        acc = qu.get(block=True).get()

th = threading.Thread(target=run, args=(qu,))
th.daemon = True
th.start()
```
Now that everything is set, we can start doing the test. We’ll see how much time it takes to extract 100 items from the queue.
```python
st = time.time()
for i in range(nb_files):
    qu.put(x.apply_async(func=get_item, args=(seq, i,)), block=True)
th.join()
print("Took list", time.time() - st)
# Took list 38.6540949344635
```
38 seconds for such a simple task seems abnormal. The issue here is that passing seq around is expensive: every call to get_item copies the whole seq into the memory of the worker process.
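We can make that cost visible without running the Pool at all. apply_async pickles its arguments before shipping them to a worker, so measuring the pickled size of seq gives a rough idea of what every single get_item call pays. This is a minimal sketch; the exact byte count will vary by Python version and pickle protocol.

```python
import pickle


class Sequence():
    def __init__(self, my_list):
        self.my_list = my_list

    def __getitem__(self, item):
        return self.my_list[item]

    def __len__(self):
        return len(self.my_list)


# Same shape of data as above: 100000 (str, list) tuples.
files = ['test_string'] * 100000
seq = Sequence([(x, [1, 3, 4]) for x in files])

# apply_async serializes its arguments; this is roughly the payload
# shipped to a worker for one get_item call.
payload = pickle.dumps(seq)
print(len(payload))  # on the order of megabytes for this list
```

Shipping megabytes per task, a hundred times over, is where the 38 seconds go.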
Solution - Sharing!
To resolve this problem, we will share the list between every process of the Pool. To do that, we will use a Manager.
A Manager is a little server that answers requests on the objects that it holds.
```python
manager = multiprocessing.Manager()

big_list = ((x, [1, 3, 4]) for x in files)
ls = manager.list(list(big_list))
seq2 = Sequence(ls)
```
Here, we can directly use manager.list(), which creates a shared list. Please note that you may need to create your own Proxy if you need to share other types.
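For types that don't come with a ready-made proxy, the standard library lets you register your own class on a BaseManager subclass; the manager then serves proxies for it. Here is a hedged sketch of that mechanism; the Counter class and the MyManager name are made up for illustration.

```python
from multiprocessing.managers import BaseManager


class Counter:
    """A plain class we want to serve through a manager."""
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

    def get(self):
        return self.value


class MyManager(BaseManager):  # hypothetical subclass name
    pass


# Registering the class makes the manager able to create and
# serve Counter instances through auto-generated proxies.
MyManager.register('Counter', Counter)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    counter = manager.Counter()  # returns a proxy, not a Counter
    counter.increment()
    print(counter.get())  # 1
    manager.shutdown()
```

Every method call on the proxy is a round-trip to the manager process, which is exactly the trade-off this post exploits: small requests instead of big copies.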
We can now do some tests.
```python
# The consumer thread from the first test has finished, so start a new one.
th2 = threading.Thread(target=run, args=(qu,))
th2.daemon = True
th2.start()

st = time.time()
for i in range(nb_files):
    qu.put(x.apply_async(func=get_item, args=(seq2, i,)), block=True)
th2.join()
print("Took Manager list", time.time() - st)
# Took Manager list 0.111
```
Now we're talking! More than a 300x speedup for 4 lines of Python. In this post, we've shown a case where sharing an object between processes is better than just copying it. We've made a 300x improvement in about 4 lines of Python that use a Manager.
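For completeness, there is another common way to avoid re-pickling the same object on every task, not used in this post: pass it once through the Pool's initializer, so each worker receives it a single time at startup and tasks only ship an index. This is a sketch under that assumption; the names _shared, _init and get_item_global are invented for the example.

```python
import multiprocessing

_shared = None


def _init(seq):
    # Runs once in each worker; stores the data in the worker's globals.
    global _shared
    _shared = seq


def get_item_global(idx):
    # Tasks now carry only the index, not the whole sequence.
    return _shared[idx]


if __name__ == '__main__':
    data = [('test_string', [1, 3, 4])] * 1000
    with multiprocessing.Pool(2, initializer=_init, initargs=(data,)) as pool:
        print(pool.map(get_item_global, range(5)))
```

Unlike a Manager, each worker still holds its own copy, but the data is serialized once per worker instead of once per task, which is often enough when the object fits in memory.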