Ordered multi-processed generator in Keras

** UPDATE ** This post has made it into Keras as of Keras 2.0.6.

If you have used Keras extensively, you are probably aware that using model.fit_generator(..., pickle_safe=True) may not do what you expect. In fact, generators cannot be shared across processes in Python. This cause your generator to be copied instead. So your training loop will see the same data over and over again. While this is not that big of a deal in most cases, this cause the function model.predict_generator to be useless for workers > 1.

Solving the problem

To solve this problem, I have been toying around with Pytorch’s Dataset to add it into Keras code base. In this post, I’ll share my solution that I hope, will be merged into Keras soon enough.


Let’s start with some import, we’ll need a ProcessPoolExecutor to submit jobs to the different processes. A single thread-safe queue is all what we need.

import os
import time
from concurrent.futures import ProcessPoolExecutor
from itertools import cycle
from queue import Queue
from threading import Thread

As explained earlier, I used Pytorch’s Dataset object.

class Dataset():
    def __getitem__(self, index):
        raise NotImplementedError

    def __len__(self):
        raise NotImplementedError

For our tests, we shall create a fake Dataset. Here, the time.sleep(1) is representing all the preprocessing tasks, reading a file, data augmentation, resizing. For the sake of this post, the dataset is an echo dataset. It returns what you give him.

class ExampleDataset(Dataset):
    def __getitem__(self, index):
        return os.getpid(), index

    def __len__(self):
        return 100

The class MultiProcessExecutor was made to replace GeneratorEnqueuer from Keras. While the constructor lacks many arguments, this is just to showcase the power of this object. The MultiProcessExecutor will create a ProcessPoolExecutor and when there is place in the queue, it will submit a task to the executor with executor.submit.

Submitting a request will return a Future object, you can wait on that object to get the result. The Future objects are queued so that they will be read in order.

class MultiProcessExecutor():
    def __init__(self, dataset):
        self.workers = 5
        self.executor = ProcessPoolExecutor(self.workers)
        self.futures = {}
        self.dataset = dataset
        self.queue = Queue(self.workers * 2)
        self.run_thread = None

    def start(self):
        self.run_thread = Thread(target=self.run)
        self.run_thread.daemon = True

    def run(self):
        """ This will queue up 2*'workers' tasks in order """
        indexes = cycle(range(len(self.dataset)))
        for i in indexes:
            self.queue.put(self.executor.submit(self.dataset.__getitem__, [i]), block=True)

    def get_item(self):
        while True:
            yield self.queue.get(block=True).result()


To test our executor, we will ask for 100 items while timing it. executor.get_item() is returning a generator that will wait for the queue and will return the Future’s result.

dataset = ExampleDataset()
executor = MultiProcessExecutor(dataset)
getter = executor.get_item()
start = time.time()
for i in range(100):
    result = next(getter)
print("Took executor",time.time()-start)
Took executor 20.045644760131836

Pretty much what we expected. 5 workers doing a 100 second task should reduce the time by 5.

Now, let’s compare it to Keras’s GeneratorEnqueuer. We’ll create a generator that is doing exactly the same thing as our ExampleDataset.

from keras.engine.training import GeneratorEnqueuer
def keras_gen():
    while True:
        yield os.getpid()
qu = GeneratorEnqueuer(keras_gen(),pickle_safe=True)
start = time.time()
for i in range(100):
    while not qu.queue.qsize():
    result = qu.queue.get()
print("Took Keras",time.time()-start)
Took Keras 20.02438259124756

Pretty much the same as the executor. But, the data that GeneratorEnqueuer will provide won’t be in order.


In this post, we’ve shown how to use an executor. In the future, this work should be integrated with Keras model.*_generator.

UPDATE This feature has been merged into Keras 2.0.5 and it is named Sequence.

Written on June 4, 2017