Sharing large objects in a `multiprocessing.Pool`
UPDATE: The solution described in this post has made it into Keras itself as of Keras 2.0.7.
While developing `Sequence` for Keras, I stumbled upon an issue when using `multiprocessing.Pool`. When you use a read-only structure like `Sequence`, you expect it to be really fast. But I was getting the opposite: Sequences were 2-5 times slower than generators. In this post, I’ll show what the problem is and how to resolve it.
Example
So let’s say you want to share a structure with some internal data, like a `Sequence`.
```python
class Sequence:
    def __init__(self, my_list):
        self.my_list = my_list

    def __getitem__(self, item):
        return self.my_list[item]

    def __len__(self):
        return len(self.my_list)


def get_item(seq, idx):
    # Allows Python 2 to pickle the Sequence (instance methods are not picklable).
    return seq[idx]
```
The list could be quite large, thousands of elements. To make it harder for Python to translate the list into a compact C array, we will fill it with tuples of mixed types before creating a `Sequence` from it.
```python
files = ['test_string'] * 100000

# Make the test faster.
nb_files = min(len(files), 100)

huge_list = ((x, [1, 3, 4]) for x in files)
seq = Sequence(list(huge_list))
```
Next, we have to initialize a `multiprocessing.Pool` and a producer-consumer system. We will create a `Pool` of 5 workers to extract data, plus a thread that dequeues the promises and waits on their results.
```python
import multiprocessing
import threading
import time
from queue import Queue  # on Python 2: from Queue import Queue

pool = multiprocessing.Pool(5)
qu = Queue(10)

def run(qu):
    for i in range(nb_files):
        # Simply dequeue an item and wait for its result.
        acc = qu.get(block=True).get()

th = threading.Thread(target=run, args=(qu,))
th.daemon = True
th.start()
```
Now that everything is set up, we can run the test. We’ll see how much time it takes to extract 100 items through the queue.
```python
st = time.time()
for i in range(nb_files):
    qu.put(pool.apply_async(func=get_item, args=(seq, i)), block=True)
th.join()
print("Took list", time.time() - st)
# Took list 38.6540949344635
```
38 seconds for such a simple task seems abnormal. The issue is that passing `seq` around is expensive: every call to `get_item` pickles `seq` and copies it into the memory of the worker process.
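To see where those 38 seconds go, here is a minimal sketch (my own check, not part of the original benchmark) that measures what a single `apply_async` call has to serialize:

```python
import pickle
import time

# Every pool.apply_async(get_item, (seq, i)) pickles `seq` in its
# entirety and ships it to a worker, so this cost is paid per task.
st = time.time()
payload = pickle.dumps(seq)
print(len(payload), "bytes pickled in", time.time() - st, "seconds")
```

Multiply that per-task cost by `nb_files` and the timing above stops being surprising.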
Solution - Sharing!
To resolve this problem, we will share the list between every process of the `Pool`. To do that, we will use a `Manager`. A `Manager` is a little server that answers requests about the objects it holds.
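As a minimal illustration of that idea (a standalone sketch, with names of my choosing), a list created by a `Manager` lives in the manager’s server process, and changes made in one process are visible in the others:

```python
import multiprocessing

def append_one(shared):
    # Runs in a child process; the append is forwarded to the
    # Manager's server, so the parent sees the change too.
    shared.append(1)

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared = manager.list()
    p = multiprocessing.Process(target=append_one, args=(shared,))
    p.start()
    p.join()
    print(list(shared))  # [1]
```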
```python
manager = multiprocessing.Manager()
big_list = ((x, [1, 3, 4]) for x in files)
ls = manager.list(list(big_list))
seq2 = Sequence(ls)
```
Here, we can directly use `manager.list()`, which creates a shared list. Please note that you may need to create your own proxy if you need other types to be shared.
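For other types, `multiprocessing.managers.BaseManager` lets you register your own. Here is a rough sketch of how that looks (the `Counter` and `MyManager` names are hypothetical, not from the post):

```python
from multiprocessing.managers import BaseManager

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

    def get(self):
        return self.value

class MyManager(BaseManager):
    pass

# After registering, MyManager().Counter() returns a proxy to a
# Counter living inside the manager's server process.
MyManager.register('Counter', Counter)

if __name__ == '__main__':
    with MyManager() as m:
        counter = m.Counter()
        counter.increment()
        print(counter.get())  # 1
```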
We can now rerun the test. Since the consumer thread from the first run has already finished, we recreate the queue and the thread before timing.
```python
qu = Queue(10)
th = threading.Thread(target=run, args=(qu,))
th.daemon = True
th.start()

st = time.time()
for i in range(nb_files):
    qu.put(pool.apply_async(func=get_item, args=(seq2, i)), block=True)
th.join()
print("Took Manager list", time.time() - st)
# Took Manager list 0.111
```
Now we’re talking! More than a 300x speedup for 4 lines of Python.
Conclusion
In this post, we’ve shown a case where sharing an object between processes is better than just copying it. We’ve made a 300x improvement with about 4 lines of Python that use `multiprocessing.Manager`.