Thread-Per-Core Buffer Management for a modern storage system (vectorized.io)
118 points by arjunnarayan on Dec 12, 2020 | 36 comments



More threads (i.e. shared state) is a huge mistake if you are trying to maintain a storage subsystem with synchronous access semantics.

I am starting to think you can handle all storage requests for a single logical node on just one core/thread. I have been pushing 5~10 million JSON-serialized entities to disk per second with a single managed thread in .NET Core (using a Samsung 970 Pro for testing). This includes indexing and sequential integer key assignment. This testing will completely saturate the drive (over 1 gigabyte per second steady-state). Just getting an increment of a 64 bit integer over a million times per second across an arbitrary number of threads is a big ask. This is the difference you can see when you double down on single threaded ideology for this type of problem domain.

The technical trick to my success is to run all of the database operations in micro batches (10~1000 microseconds per). I use LMAX Disruptor, so the batches are formed naturally based on throughput conditions. Selecting data structures and algorithms that work well in this type of setup is critical. Append-only is a must with flash and makes orders of magnitude difference in performance. Everything else (b-tree algorithms, etc) follows from this realization.
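A minimal sketch of the micro-batch idea, written in C++ rather than .NET and with a plain vector standing in for the Disruptor ring buffer. Every name here (`AppendOnlyLog`, `SingleWriter`, `write_batch`) is hypothetical and not taken from any real implementation. The single writer drains whatever has accumulated, assigns sequential keys with an ordinary integer, and issues one flush per batch:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical append-only log: records are only ever added at the tail.
struct AppendOnlyLog {
    std::vector<std::pair<std::uint64_t, std::string>> records;
    std::uint64_t flushes = 0; // number of batch writes issued
};

// Single writer: it owns the key counter, so no atomics or locks are needed.
// Assumes the log is empty when the writer is created.
class SingleWriter {
public:
    explicit SingleWriter(AppendOnlyLog& log) : log_(log) {}

    // Drain everything that is pending as one batch.
    // Returns the number of entities written.
    std::size_t write_batch(std::vector<std::string>& pending) {
        if (pending.empty()) {
            return 0;
        }
        const std::size_t n = pending.size();
        for (auto& payload : pending) {
            log_.records.emplace_back(next_key_++, std::move(payload));
        }
        pending.clear();
        ++log_.flushes; // one flush per batch, not one per entity
        assert(log_.records.size() == next_key_);
        return n;
    }

private:
    AppendOnlyLog& log_;
    std::uint64_t next_key_ = 0; // plain integer: only one thread touches it
};
```

The batch size is not configured anywhere: it is simply however much work arrived while the previous batch was being written, which is the "formed naturally based on throughput conditions" part.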

Put another way, if you find yourself using Task or async/await primitives when trying to talk to something as fast as NVMe flash, you need to rethink your approach. The overhead of multiple threads, task-parallel abstractions, et al. is going to cripple any notion of high throughput in a synchronous storage domain.


Indeed. I think you have different saturation points the wider the use cases you hit. One example w/ a single core (which btw, agreed wholeheartedly for io) is checksumming + decoding.

For kafka, we have multiple indexes - a time index and an offset index, which are simple metadata. the trouble becomes how you handle decompression+checksumming+compression for supporting compacted topics. ( https://github.com/vectorizedio/redpanda/blob/dev/src/v/stor... )

So a single core starts to get saturated while doing both foreground and background requests.

.....

Now assume that you handle that with correct priorities for IO and CPU scheduling.... the next bottleneck will be keeping up w/ background tasks.

So then you start to add more threads. But as you mentioned, and as I tried to highlight in that article, implicit or simple synchronization is very expensive (as your intuition noted).

The thread-per-core buffer management with defer destructors is really handy at doing 3 things explicitly

1. your cross core communication is explicit - that is you give it shares as part of a quota so that you understand how your system priorities are working across the system for any kind of workload. This is helpful to prioritize foreground and background work.

2. there are effectively const memory addresses once you parse it - so you treat it as largely immutable and you can add hooks (say crash if modified on a remote core)

3. makes memory accounting fast. i.e.: instead of pushing a global barrier for the allocator you simply send a message back to the originating core for allocator accounting. This becomes hugely important as you start to increase the number of cores.
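A rough sketch of point 3, simulated on one thread so it stays deterministic. The types `Core`, `Buffer` and `Cores` are made up for illustration and are not Redpanda's or Seastar's API; a real system would presumably use a lock-free queue per pair of cores instead of a `std::deque`. The point is only that a remote free never touches the owner's counter directly, it posts a message that the owning core applies later:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical per-core state: only the owning core mutates bytes_in_use.
struct Core {
    std::size_t bytes_in_use = 0;
    std::deque<std::size_t> inbox; // "free N bytes" messages from other cores
};

struct Buffer {
    std::size_t origin; // core that allocated the buffer
    std::size_t size;
};

class Cores {
public:
    explicit Cores(std::size_t n) : cores_(n) {}

    Buffer allocate(std::size_t core, std::size_t size) {
        assert(core < cores_.size());
        cores_[core].bytes_in_use += size;
        return Buffer{core, size};
    }

    // Release on any core: a local free updates the counter directly,
    // a remote free only posts a message back to the originating core.
    void release(std::size_t current_core, const Buffer& b) {
        assert(current_core < cores_.size());
        if (current_core == b.origin) {
            cores_[b.origin].bytes_in_use -= b.size;
        } else {
            cores_[b.origin].inbox.push_back(b.size);
        }
    }

    // Run by the owning core as part of its own event loop.
    void drain_inbox(std::size_t core) {
        assert(core < cores_.size());
        Core& c = cores_[core];
        while (!c.inbox.empty()) {
            c.bytes_in_use -= c.inbox.front();
            c.inbox.pop_front();
        }
    }

    std::size_t bytes_in_use(std::size_t core) const {
        return cores_[core].bytes_in_use;
    }

private:
    std::vector<Core> cores_;
};
```

Until the owner drains its inbox its counter is slightly stale, which is the price paid for not needing a global barrier.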


>>> the trouble becomes on how you handle decompression+checksumming+compression

gzip will cap at 1 MB/s with the strongest compression setting and 50 MB/s with the fastest setting, which is really slow.

The first step to improve kafka is for kafka to adopt zstd compression.

Another thing that really hurts is SSL. Desktop CPU with AES instructions can push 1 GB/s so it's not too bad, but that may not be the CPU you have or the default algorithm used by the software.


Kafka has `zstd` encoding.

Here is our version of the streaming decoder i wrote a while ago https://github.com/vectorizedio/redpanda/blob/dev/src/v/comp...

that's our default for our internal RPC as well.

in fact the kafka protocol supports lz4, zstd, snappy, gzip - all of them. and you can change them per batch. compression is good w/ kafka.


lz4 is a good option for really high-performance compression as well. (Zstd is my general recommendation, and both beat the pants off of gzip, but for very high throughput applications lz4 still beats zstd. Both are designs from Yann Collet.)


indeed. though, the recent zstd changes w/ different levels of compression sort of close the gap in perf that lz4 had over zstd. (if interested in this kind of detail for a new streaming storage engine, i gave a talk last week at the facebook performance summit - https://twitter.com/perfsummit1/status/1337603028677902336)


They're using shared-nothing between threads so shared state isn't a problem. It sounds like the Redpanda architecture is almost the same as what you're talking about.


Yes I am seeing some similarities in threading model for sure.

That said, there are a lot of other simultaneous considerations at play when we are talking about punching through business entity storage rates that exceed the rated IOPS capacity of the underlying storage medium. My NVMe test drive can only push ~500k write IOPS in the most ideal case, but I am able to write several million logical entities across those operations due to batching/single-writer effects.
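The arithmetic behind that can be sketched as below. The function names are hypothetical, and the result is an upper bound that assumes every physical write is packed full of entities:

```cpp
#include <cassert>
#include <cstdint>

// How many whole entities fit into one physical write.
inline std::uint64_t entities_per_write(std::uint64_t write_bytes,
                                        std::uint64_t entity_bytes) {
    assert(entity_bytes > 0 && entity_bytes <= write_bytes);
    return write_bytes / entity_bytes;
}

// Upper bound on logical entities per second when every write is full.
inline std::uint64_t max_entities_per_second(std::uint64_t write_iops,
                                             std::uint64_t write_bytes,
                                             std::uint64_t entity_bytes) {
    return write_iops * entities_per_write(write_bytes, entity_bytes);
}
```

Taking the ~500k write IOPS figure and, purely as illustrative assumptions, 4096-byte writes and 256-byte entities, `max_entities_per_second(500000, 4096, 256)` comes out at 8,000,000: sixteen entities ride on each write operation.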


So depending on the disks, we found that different IO sizes will yield optimal settings for saturating disks. As an anecdote: in clouds, IOPS is the key limit, and often you can drive higher throughput depending on your DMA block size (i.e.: 128KB vs 16KB etc)... obviously there is a tradeoff with memory pressure. but you can test them all easily w/ `fio`


I learned the same thing while writing a log structured merge tree. Single threaded writes are a must - not only for performance but also simplicity of implementation.

I'm curious what about your use case required implementing your own storage subsystem rather than using an embedded key value store like RocksDB.


Came to a similar conclusion back in the days when writing a raytracer, and it stopped scaling past 8 or so cores.

Ended up with a system where each thread accumulated results in small buffers, appended pointers to those buffers to a shared "buffer list" which was very fast due to low contention using typical spinlock+mutex combo.

The thread that overflowed the buffer list would then become the single writer by taking on the responsibility to accumulate the results to the shared output image. It would start by swapping in a fresh list, so the other threads could carry on.

The system would self-tune by regulating the size of the shared buffer list so that the other threads could keep working while the one "writer thread" accumulated.
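A simplified reconstruction of that handoff, not the original code: it uses a plain `std::mutex` instead of the spinlock+mutex combo, a fixed `list_limit` instead of the self-tuning size, and a second mutex around the image in case two threads overflow back to back. All names are made up for the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

struct Sample { std::size_t pixel; float value; };
using SampleBuffer = std::vector<Sample>;
using BufferList = std::vector<std::unique_ptr<SampleBuffer>>;

class Accumulator {
public:
    Accumulator(std::size_t pixels, std::size_t list_limit)
        : image_(pixels, 0.0f), list_limit_(list_limit) {
        assert(list_limit_ > 0);
    }

    // Called by a render thread when its small local buffer is full.
    void submit(std::unique_ptr<SampleBuffer> buf) {
        BufferList full;
        {
            std::lock_guard<std::mutex> lock(list_mutex_);
            list_.push_back(std::move(buf));
            if (list_.size() < list_limit_) {
                return; // common case: cheap pointer append, back to rendering
            }
            full.swap(list_); // swap in a fresh list so other threads carry on
        }
        flush(full); // this thread overflowed the list, so it becomes the writer
    }

    // Accumulate whatever is still queued once rendering has finished.
    void finish() {
        BufferList rest;
        {
            std::lock_guard<std::mutex> lock(list_mutex_);
            rest.swap(list_);
        }
        flush(rest);
    }

    // Only meaningful while no thread is flushing.
    float pixel(std::size_t i) const { return image_[i]; }

private:
    void flush(const BufferList& buffers) {
        std::lock_guard<std::mutex> lock(image_mutex_);
        for (const auto& buf : buffers) {
            for (const Sample& s : *buf) {
                assert(s.pixel < image_.size());
                image_[s.pixel] += s.value;
            }
        }
    }

    std::vector<float> image_;
    std::size_t list_limit_;
    std::mutex list_mutex_;  // protects list_ only; held very briefly
    std::mutex image_mutex_; // serializes writers to the output image
    BufferList list_;
};
```

The list lock is held only for a pointer push or a swap, so contention stays low; the expensive accumulation happens outside it.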

Probably had room for improvement, but after this change it scaled almost linearly to at least 32 cores, which was the largest system available for testing at the time.

The reason for not simply allocating a full output image per thread and accumulate post-render was mainly due to the memory requirements for large output images.


RocksDB has two big limitations that preclude its use for many types of high-performance data infrastructure (which it sounds like the OP's use case was). First, its throughput performance is much worse (integer factor) than what can be achieved with a different design for some applications. Second, it isn't designed to work well for very large storage volumes. Again, easy to remedy if you design your own storage engine or use an alternative one. There are storage engines that will happily drive a petabyte of storage across a large array of NVMe devices at the theoretical limits of the hardware, though not so much in open source.

Another thing to consider is that you lose significant performance in a few different dimensions if your storage I/O scheduler design is not tightly coupled to your execution scheduler design. While it requires writing more code it also eliminates a bunch of rough edges. This alone is the reason many database-y applications write their own storage engines. For people that do it for a living, writing an excellent custom storage engine isn't that onerous.

RocksDB is a fine choice for applications where performance and scale are not paramount or your hardware is limited. On large servers with hefty workloads, you'll probably want to use something else.


agreed w/ andrew. rocksdb is pretty heavy. for streaming logs, something much much simpler yields significant performance improvements, especially when tied to the IO+CPU priority scheduling.


What would be some of your choices?


This is a hard question because everything is really dependent on your threading model. One has to start w/ the threading model. At the time I wrote the first line of code in Jan 2019, there wasn't anything that was really amenable to the seastar::future<> / task-based scheduler with truly async IO (enforced by a reactor stall if greater than 500 micros).... so we wrote our own from scratch .... in fact we wrote it many times over, the first version attempted to use flatbuffers atop my old project - https://github.com/smfrpc/smf but the linearization of buffers proved too costly for long running processes which led to the fragmented buffer approach in the blog post mentioned.


Extreme performance (namely, low latency under heavy load) is the principal requirement. I have yet to see anything that can touch my approach, especially under mixed read/write workloads. I am able to write (small, <4k) business entities to disk at a rate higher than the drives themselves are able to write blocks. This is not something that is feasible in any multithreaded storage architecture.

A secondary requirement is extreme simplicity and safety. My entire implementation is written in managed code and can be understood by a junior developer in one weekend. There is not a single line of code in support of a database feature that we aren't actually going to use.

The final requirement is zero external cost to employ this code. If I own my database implementation, Oracle cannot bill me.

The nice-to-have is being able to follow a breakpoint all the way from user tapping a button down into the b-tree rotation condition logic in the database engine. It also makes profiling performance issues a trivial affair. I like being able to see the actual code in my database engine that is causing a hotpath. This visibility is where additional innovation is possible over time.


If anybody's interested, there's a Seastar inspired library for Rust that is being developed https://github.com/DataDog/glommio


Disappointed to see that you spent 25% of article space to describe in detail all the ways in which computer hardware got faster, then you promised to show how your project is taking advantage of this, but you are not showing any performance measurements at all. Just a very fancy architecture.

Correct me if I’m wrong, but the only number that I can find is a guarantee that you do not exceed 500 us of latency when handling a request. And it’s not clear if this is a guarantee at all, since you say just that the system will throw a traceback in case of latency spikes.

I would have liked to see how latency varies under load, how much throughput you can achieve, what the latency long tail looks like on a long-running production load, and comparisons with reasonably tuned off-the-shelf systems.


Hi there! Don't be disappointed :) perf benchmarks coming. The article was about architecture. As a new project we have a lot of content to put out... Stay tuned. Very likely early next month.


Looking forward to it!


Noah here, developer at Vectorized. Happy to answer any questions.


What kind of gaps are there currently between Kafka and RedPanda? Particularly in the "Enterprise" world (e.g. security, etc).


In progress right now are transactions and ACLs. A preliminary form of ACLs with SCRAM will be available later this month. Transactions will come early next year. Those are probably the most visible differences.


That's great! Would it be correct to assume that KSQL works?


Having not tried, I would expect ksql to not work until transaction support lands. That said, perhaps there are some ways to configure it to avoid dependencies on those underlying APIs.


The article called out 500usec as an upper bound for compute. How do you handle heavier compute operations (TLS, encoding / decoding, ...)?


on seastar, you yield a lot. so loops go from

    for( ... : collection) {}

to

    return seastar::do_for_each(collection, callback);
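
To illustrate the idea without pulling in Seastar, here is a plain C++ sketch of a loop that gives control back after a fixed amount of work. This is not how Seastar is implemented: Seastar makes its own preemption decision, and the per-call iteration budget and the `ResumableLoop` name are assumptions standing in for it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Resumable loop: runs at most `budget` iterations per call, then returns
// so a scheduler could run other tasks before resuming it.
// The caller must keep `items` alive for as long as the loop exists.
class ResumableLoop {
public:
    ResumableLoop(const std::vector<int>& items, std::function<void(int)> fn)
        : items_(items), fn_(std::move(fn)) {}

    // Returns true once the whole collection has been processed.
    bool run_slice(std::size_t budget) {
        assert(budget > 0);
        std::size_t done = 0;
        while (next_ < items_.size() && done < budget) {
            fn_(items_[next_++]);
            ++done;
        }
        return next_ == items_.size();
    }

private:
    const std::vector<int>& items_;
    std::function<void(int)> fn_;
    std::size_t next_ = 0; // where to resume on the next slice
};
```

A scheduler would keep calling `run_slice` in between other tasks until it returns true, so one long loop can never monopolize the core.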


I'd be interested in the write amplification since Redpanda went pretty low level in the IO layer. How do you guarantee atomic writes when virtually no disk provides guarantees other than on a page level which could result in destroying already written data if a write to the same page fails - at least in theory - and so one has to resort to writing data multiple times.


The write is a 2 stage function which works quite well for the kafka layer since it's all batch. Let me explain.

1. First, we have a core-local shared chunk cache w/ DMA aligned buffers. https://github.com/vectorizedio/redpanda/blob/dev/src/v/stor...

2. Second, we eagerly dispatch IO blocks w/ manual accounting of which offset on the DMA section it is at https://github.com/vectorizedio/redpanda/blob/dev/src/v/stor...

3. We adaptively fallocate data to prevent metadata contention on the file handle itself.

4. We issue an fdatasync() - since we are interested in the data being safe (data corruption w/ checksums, etc it's too long to type, but i can expand on a blog post if interested)

5. so imagine a big write (for simplicity) say 1MB. This will get broken up into 16-128KB DMA writes. The last step is an fdatasync for acks=-1 (raft needs this)

There are nuances between each line, but hopefully this gives you a hint on how it's done.
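
The chunking in step 5 can be sketched roughly as follows. `split_write` and `Chunk` are hypothetical names, not Redpanda's code, and the sketch ignores the padding a real DMA write needs to keep offsets and lengths aligned; it only shows how one large write turns into a run of fixed-size IOs that a single fdatasync would follow.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct Chunk {
    std::size_t offset; // file offset of this IO
    std::size_t length; // bytes in this IO
};

// Split `total` bytes starting at `file_offset` into IOs of at most
// `chunk_size` bytes. The last chunk may be shorter.
std::vector<Chunk> split_write(std::size_t file_offset,
                               std::size_t total,
                               std::size_t chunk_size) {
    assert(chunk_size > 0);
    std::vector<Chunk> chunks;
    std::size_t pos = 0;
    while (pos < total) {
        const std::size_t len = std::min(chunk_size, total - pos);
        chunks.push_back(Chunk{file_offset + pos, len});
        pos += len;
    }
    // In a real write path, each chunk would be dispatched as it fills and
    // one fdatasync() would be issued after the last one.
    return chunks;
}
```

For the 1MB example, `split_write(0, 1024 * 1024, 128 * 1024)` yields 8 chunks, and a 16KB chunk size yields 64.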


Thanks for your explanation. I'll dive into the source to check it in a bit more detail but your reply gave me a good overview to get me started :)


Cool. If you want help hacking on it, there is a community slack at vectorized.io/slack, or github discussions work too


What is the point of talking performance-by-thread-per-core if raft sits in front of it, ie. only one will do the work at any time anyway?


so redpanda partitions 'raft' groups per kafka partition. so in the `topic/partition` model every partition is its own raft group (similar to multi raft in cockroachdb). So it is in fact even more important due to the replication cost and therefore the additional work of checksumming, compression, etc.

Last, a coordinator core for one of the TCP connections from a client will likely make requests to remote cores (say you receive a request on core 44, but the destination is core 66), so having a thread per core with explicit message passing is pretty fundamental.

    ss::future<std::vector<append_entries_reply>>
    dispatch_hbeats_to_core(ss::shard_id shard, hbeats_ptr requests) {
        return with_scheduling_group(
          get_scheduling_group(),
          [this, shard, r = std::move(requests)]() mutable {
              return _group_manager.invoke_on(
                shard,
                get_smp_service_group(),
                [this, r = std::move(r)](ConsensusManager& m) mutable {
                    return dispatch_hbeats_to_groups(m, std::move(r));
                });
          });
    }

This code shows the importance of accounting for the x-core comms explicitly


Ok, thanks. Does redpanda do some kind of auto anti-affinity on hosts for partition group to spread across remote cores?

ps. redpanda link from article is broken, goes to https://vectorized.io/blog/tpc-buffers/vectorized.io/redpand... 404


Oh shoot! thank you... fixing the link, give me 5 mins.

So currently the partition allocator - https://github.com/vectorizedio/redpanda/blob/dev/src/v/clus... - is primitive.

But we have a working-not-yet-exposed HTTP admin api on the controller that allows for Out Of Band placement.

so the mechanics are there, but not yet integrated w/ the partition allocator.

Thinking that we integrate w/ k8s more deeply next year.

The thinking at least is that at install we generate some machine labels say in /etc/redpanda/labels.json or smth like that and then the partition allocator can take simple constraints.

I worked on a few schedulers for www.concord.io with Fenzo on top of mesos 6 years ago and this worked nicely for both 'affinity', 'soft affinity' and anti-affinity constraints.

Do you have any thoughts on how you'd like this exposed?


@arjunnarayan Have you evaluated the performance against vanilla Kafka / Confluent Cloud? Where can I see the results?



