Elasticsearch - keep your streams under control
Updated with new class version
Ingesting a stream of data from a backend into Elasticsearch is pretty simple -- the service has this really nice bulk API where you can send a bunch of indexing operations in the same request.
I've built a straightforward asynchronous consumer-producer system on top of our excellent Python client.
The producer is an iterator that returns documents to send to Elastic, and the consumer is a function that
iterates on those documents and sends them in batches using the
If your Elasticsearch service is nearby and you have a lot of network bandwidth, you can send colossal bulk requests -- sending documents in batches of 500 is pretty fast.
Of course, that's only possible if you have a lot of resources available, and that includes the RAM where you collect the data you want to send in batches.
As soon as your network gets more limited (smaller bandwidth, longer round-trips), big requests won't work anymore. Your network will get saturated and you will start to see time-outs. Sending requests of 50MB over the network doesn't make much sense in that context.
You can reduce the number of documents per batch, but the size of each document also matters, and it can vary from one document to the next.
And in async Python it's pretty easy to pile up data in memory when the producer is pumping data faster than what the consumer can grab to send to Elasticsearch.
I was trying to find the most elegant way to deal with this in various network conditions, with a few goals in mind.
- make it as fast as possible
- have a configurable maximum memory Resident Set Size (RSS) for the app
- have a configurable maximum bulk request size
- have a way to throttle calls made in the producer when limits are reached
The program initially used an
asyncio.Queue but that queue is not aware of its
size in memory -- it just allows you to define a maximum number of items before
put blocks on the next insertion.
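To make that limitation concrete, here is a small sketch using only the standard library. The document shapes and the 0.1 second timeout are arbitrary choices for the demo, not values from the app.

```python
import asyncio


async def main():
    # maxsize counts items, not bytes: a slot fits a tiny or a huge document alike.
    queue = asyncio.Queue(maxsize=2)
    await queue.put({"body": "x"})
    await queue.put({"body": "x" * 10_000_000})  # roughly 10MB, still one item
    full = queue.full()
    try:
        # put() would block here; wait_for turns that into an observable timeout.
        await asyncio.wait_for(queue.put({"body": "y"}), timeout=0.1)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    return queue.qsize(), full, blocked


print(asyncio.run(main()))
```

The queue reports itself as full after two items no matter how many bytes they hold, which is exactly what makes it unsuitable as a memory guard.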
But that class is easy to override!
You can use Pympler to get the real memory size of a
Python object -- this is different from
sys.getsizeof because Pympler recursively looks
at all objects attached to the object you measure. For simple data structures like
lists or dicts, it's pretty accurate -- and our docs are simple mappings:
    from pympler import asizeof


    def get_size(ob):
        """Returns size in bytes"""
        return asizeof.asizeof(ob)
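The difference between a shallow and a recursive measurement can be seen without installing anything. In the sketch below, `deep_size` is a hypothetical, simplified stand-in that only walks dicts, lists, tuples and sets. It is not how Pympler is implemented, and the numbers assume CPython.

```python
import sys


def deep_size(obj, seen=None):
    """Rough recursive size in bytes (hypothetical helper, not Pympler)."""
    seen = set() if seen is None else seen
    if id(obj) in seen:  # count shared objects only once
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_size(item, seen) for item in obj)
    return size


small = {"title": "a", "body": "x"}
large = {"title": "a", "body": "x" * 1_000_000}

# The shallow size only covers the dict structure, so both report the same value.
print(sys.getsizeof(small) == sys.getsizeof(large))
# The recursive size follows the values and sees the extra megabyte.
print(deep_size(large) - deep_size(small))
```

A queue that relied on `sys.getsizeof` would treat both documents as the same size, which is why a recursive measurement is needed here.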
Using that function, I've created a
MemQueue class that will block any
attempt to put new data into the queue if it reaches a specific size in memory:
    import asyncio
    import time


    class MemQueue(asyncio.Queue):
        def __init__(
            self, maxsize=0, maxmemsize=0, refresh_interval=1.0, refresh_timeout=120
        ):
            super().__init__(maxsize)
            self.maxmemsize = maxmemsize
            self.refresh_interval = refresh_interval
            self._current_memsize = 0
            self.refresh_timeout = refresh_timeout

        def _get(self):
            item_size, item = self._queue.popleft()
            self._current_memsize -= item_size
            return item_size, item

        def _put(self, item):
            # items are stored as (item_size, item) tuples, see put()
            self._current_memsize += item[0]
            self._queue.append(item)

        def mem_full(self):
            if self.maxmemsize == 0:
                return False
            return self.qmemsize() >= self.maxmemsize

        def qmemsize(self):
            return self._current_memsize

        async def _wait_for_room(self, item):
            item_size = get_size(item)
            # maxmemsize == 0 means there is no memory limit
            if self.maxmemsize == 0:
                return item_size
            start = time.time()
            # poll until the consumer has freed enough room, or give up
            while self._current_memsize + item_size > self.maxmemsize:
                if time.time() - start >= self.refresh_timeout:
                    raise asyncio.QueueFull()
                await asyncio.sleep(self.refresh_interval)
            return item_size

        async def put(self, item):
            item_size = await self._wait_for_room(item)
            return await super().put((item_size, item))
That's it for the queue! Producers can use it to put new documents for the bulk consumer.
Notice that when an item is added to the queue, its size is stored alongside the item,
so the class does not have to call
asizeof twice -- as this adds a bit of CPU overhead.
A simplified version of the producer:
    MAX_QUEUE_SIZE = 100 * 1024 * 1024  # the queue can hold 100MB
    queue = MemQueue(maxmemsize=MAX_QUEUE_SIZE)


    async def producer(queue):
        for doc in some_source:
            # put() is a coroutine: without await, nothing is queued
            await queue.put(make_it_an_operation(doc))
        await queue.put("END")
put will block if
queue has reached 100MB. The function will get unblocked
once the consumer has grabbed enough data to make room for the new item under the 100MB limit.
If it waits more than 120 seconds, it fails with asyncio.QueueFull.
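The throttling effect of an awaited put can be observed with a plain `asyncio.Queue`. This is only a sketch of the backpressure mechanism: it bounds the queue by item count rather than by bytes, and the limit of 3, the 20 fake documents and the 1ms delay are all made-up values.

```python
import asyncio


async def producer(queue, docs):
    for doc in docs:
        await queue.put(doc)  # suspends here while the queue is full
    await queue.put("END")


async def consumer(queue, seen_sizes):
    received = []
    while True:
        seen_sizes.append(queue.qsize())  # how much was waiting for us
        doc = await queue.get()
        if doc == "END":
            return received
        received.append(doc)
        await asyncio.sleep(0.001)  # pretend the bulk call is slow


async def main():
    queue = asyncio.Queue(maxsize=3)
    seen_sizes = []
    docs = [{"id": i} for i in range(20)]
    _, received = await asyncio.gather(
        producer(queue, docs), consumer(queue, seen_sizes)
    )
    return received, max(seen_sizes)


received, high_water = asyncio.run(main())
print(len(received), high_water)
```

Even though the producer could generate all 20 documents instantly, it never gets more than 3 ahead of the slow consumer, and nothing is lost.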
A simplified version of the consumer:
    MAX_OPS = 500  # 500 docs per call at the max
    MAX_REQUEST_SIZE = 5 * 1024 * 1024  # 5MB


    async def consumer(queue):
        batch = []
        batch_size = 0
        while True:
            # the queue returns (size, item) tuples
            op_size, operation = await queue.get()
            if operation == "END":
                break
            # extend() assumes each operation is a list of bulk lines
            batch.extend(operation)
            batch_size += op_size
            if len(batch) >= MAX_OPS or batch_size > MAX_REQUEST_SIZE:
                await batch_bulk(batch)
                batch.clear()
                batch_size = 0
            await asyncio.sleep(0)
        # send whatever is left once the producer is done
        if len(batch) > 0:
            await batch_bulk(batch)
The consumer will pile up to 500 operations and send them to Elasticsearch. If
batch reaches 5MB before it has a chance to aggregate 500 operations, it
will stop there and send it out.
The loop uses the stored sizes returned by
queue.get so we don't have to call
asizeof again -- which reduces the CPU overhead of calculating an object size.
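The two flush conditions are easier to follow on a tiny synchronous example. The `batches` generator below is a hypothetical helper written for this illustration, with made-up sizes and limits. It takes (size, operation) pairs like the ones the queue returns.

```python
def batches(sized_ops, max_ops, max_bytes):
    """Yield lists of operations, flushing on whichever limit is hit first."""
    batch, batch_bytes = [], 0
    for op_size, op in sized_ops:
        batch.append(op)
        batch_bytes += op_size
        if len(batch) >= max_ops or batch_bytes > max_bytes:
            yield batch
            batch, batch_bytes = [], 0
    if batch:  # leftovers once the stream ends
        yield batch


ops = [(400, "a"), (400, "b"), (400, "c"), (10, "d"), (10, "e"), (10, "f"), (10, "g")]
result = list(batches(ops, max_ops=3, max_bytes=700))
print(result)
# [['a', 'b'], ['c', 'd', 'e'], ['f', 'g']]
```

The first batch is cut by the byte limit, the second by the operation count, and the third is the final flush. Because the size check runs after an operation is added, a batch can overshoot the byte limit by up to one operation.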
That's it! With these memory guards, I know that the app will not exceed ~250MB in memory and will emit bulk requests of 5MB maximum. Python 3.10 does a pretty decent job of garbage collecting data that streams through.
MAX_REQUEST_SIZE can be used to make sure the
application stays under control whatever data flows through it.