Sharing large objects in a `multiprocessing.Pool`

UPDATE: The fix described in this post has made it into Keras itself as of Keras 2.0.7.

While developing Sequence for Keras, I stumbled upon an issue when using multiprocessing.Pool.

When you use a read-only structure like Sequence, you expect it to be really fast. Instead, I was getting the opposite: Sequences were 2-5 times slower than generators. In this post, I'll show what the problem is and how to resolve it.

Example

So let’s say you want to share a structure with some internal data like a Sequence.

import multiprocessing
import threading
import time
from queue import Queue  # Python 3; on Python 2 use `from Queue import Queue`


class Sequence():
    def __init__(self, my_list):
        self.my_list = my_list

    def __getitem__(self, item):
        return self.my_list[item]

    def __len__(self):
        return len(self.my_list)

def get_item(seq, idx):
    # Allows Python2 to pickle the Sequence
    return seq[idx]

The list could be quite large, thousands of elements. To make it harder for Python to translate the list into a compact C array, we will fill it with tuples of mixed types before building a Sequence from it.

files = ['test_string'] * 100000
# Make the test faster.
nb_files = min(len(files), 100)

huge_list = ((x, [1, 3, 4]) for x in files)
seq = Sequence(list(huge_list))

Next, we have to initialize a multiprocessing.Pool and a producer-consumer system. We will create a Pool of 5 workers to extract data and a thread that dequeues the promises and waits for their results.

x = multiprocessing.Pool(5)
qu = Queue(10)

def run(qu):
    for i in range(nb_files):
        # Simply dequeue an item and wait for its result.
        acc = qu.get(block=True).get()

th = threading.Thread(target=run, args=(qu,))
th.daemon = True
th.start()

Now that everything is set, we can start doing the test. We’ll see how much time it takes to extract 100 items from the queue.

st = time.time()
for i in range(nb_files):
    qu.put(x.apply_async(func=get_item, args=(seq, i,)), block=True)

th.join()
print("Took list", time.time() - st)
# Took list 38.6540949344635

38 seconds for such a simple task seems abnormal. The issue here is that passing seq around is expensive: every call to get_item pickles seq and copies it into the memory of the worker process.
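
To get a rough feel for that cost, here is a minimal sketch (using the seq built above; exact numbers will vary) that measures how many bytes a single task has to serialize:

import pickle

# Every apply_async call pickles its arguments, so roughly this many bytes
# are serialized and shipped to a worker for every single item lookup.
payload = pickle.dumps((seq, 0))
print("Bytes pickled per task:", len(payload))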

Solution - Sharing!

To resolve this problem, we will share the list between every process of the Pool. To do that, we will use a Manager.

A Manager is a little server process that answers requests on the objects that it holds; the other processes only receive lightweight proxies instead of full copies.
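
As a small illustration of that idea (this snippet is an aside, not part of the benchmark), the object a Manager hands back is a proxy, and every operation on it is forwarded to the manager process:

m = multiprocessing.Manager()
proxy = m.list([1, 2, 3])
print(type(proxy))  # <class 'multiprocessing.managers.ListProxy'>
proxy.append(4)     # forwarded to the manager process
print(list(proxy))  # [1, 2, 3, 4]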

manager = multiprocessing.Manager()
big_list = ((x, [1, 3, 4]) for x in files)
ls = manager.list(list(big_list))
seq2 = Sequence(ls)

Here, we can directly use manager.list(), which creates a shared list. Please note that you may need to register your own proxy type if you want to share other kinds of objects; a sketch of that is shown below.
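
As a hedged sketch of that last point (the Counter class and names here are hypothetical, not taken from Keras), you can register any class with a multiprocessing.managers.BaseManager and get back proxies to instances that live in the manager process:

from multiprocessing.managers import BaseManager

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1

    def get(self):
        return self.value

class MyManager(BaseManager):
    pass

# Register the type; by default, all public methods are exposed through the proxy.
MyManager.register('Counter', Counter)

m = MyManager()
m.start()
counter = m.Counter()  # a proxy; the real Counter lives in the manager process
counter.increment()
print(counter.get())   # 1
m.shutdown()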

Back to our Sequence, we can now run the same test with seq2.

st = time.time()
for i in range(nb_files):
    qu.put(x.apply_async(func=get_item, args=(seq2, i,)), block=True)

th.join()
print("Took Manager list", time.time() - st)
# Took Manager list 0.111

Now we’re talking! More than a 300x speedup for 4 lines of Python.

Conclusion

In this post, we’ve shown a case where sharing an object between processes is better than copying it. We got a 300x improvement with about 4 lines of Python using multiprocessing.Manager.

Written on July 8, 2017