What I find really interesting is their decision-making process. The article, at least as written, suggests they thought of five possible solutions. However, they decided to go all-in on a single solution without doing any significant investigation or actually trying the others.
Option 1: Redis broker. Did they ever set up a test environment with a Redis broker to see how it compares to a RabbitMQ broker? Seems like they didn’t.
Option 2: Kafka broker. Same as option 1: did they actually try to set up a Kafka broker for Celery to test it out and see how it goes?
Option 3: Multiple brokers. They could have attempted a very simple setup starting with just two RabbitMQs and dividing the tasks roughly in half between them. This would allow scaling of RabbitMQ horizontally. At least try it and see.
Option 4: Upgrade versions. Did they even try setting up an environment with upgraded versions? They say there’s no guarantee it fixes their observed bugs. How about trying it out and doing some more observation?
Option 5: We decided to go all-in on this one without even trying the other 4 and doing any sort of comparison with actual tests or benchmarks. Maybe this was the best solution for them, and they chose wisely, but how do they know?
The big thing that gets me is they talk about the observed issue of celery workers that stop processing tasks as well as the limited observability of celery workers and RabbitMQ. These aren’t black boxes. They are open source. You can debug them yourselves, report the bugs, fix the bugs and submit patches, add features, fork it yourself if necessary, etc. The fact that they don’t know if new versions will fix their observed bugs makes it clear they never identified those bugs. What’s the point of running on an open source stack if you’re going to treat things like a black box anyway?
Quite a few of these issues too strike me as Celery problems rather than RabbitMQ. I’ve run into many of these similar issues and in every case it was due to Celery’s implementation not using RabbitMQ properly and was fixed with an internal patch to Celery.
The most blatant example is the countdown tasks. Celery has a very strange implementation of these (meant to be broker agnostic) where it consumes the task from queue, sees in the task custom headers (which is meaningless to RabbitMQ) that it should be delayed and then sits on the task and takes a new task. That results in heavy memory load on your celery client holding all these tasks in memory, and if you have acks_late set, RabbitMQ will be sitting on many tasks that are claimed by a client but not acked and _also_ have to sit in memory. But that is 100% a celery problem, not Rabbit, and we solved it by overriding countdowns to use DLX queues instead so that we could use Rabbit-native features. Not surprisingly, Rabbit performs a lot better when you’re using native built-in features.
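To make the DLX idea above concrete, here is a minimal sketch of the queue arguments such a setup could use. The function names and the queue naming scheme are hypothetical; only the three `x-` argument keys are RabbitMQ's own. The assumption is one consumer-less "parking" queue per distinct delay: messages sit in the broker until the TTL expires and are then dead-lettered to the real work exchange.

```python
def delay_queue_arguments(delay_seconds, target_exchange, target_routing_key):
    """Build declare-time arguments for a parking queue that has no consumers.

    Messages published to such a queue expire after the TTL, and RabbitMQ
    dead-letters them to the target exchange, so no worker has to hold
    them in memory while they wait.
    """
    if delay_seconds < 0:
        raise ValueError("delay must be non-negative")
    return {
        "x-message-ttl": int(delay_seconds * 1000),  # RabbitMQ expects milliseconds
        "x-dead-letter-exchange": target_exchange,
        "x-dead-letter-routing-key": target_routing_key,
    }


def delay_queue_name(base, delay_seconds):
    # Hypothetical naming scheme: one queue per distinct delay, so every
    # message in a given queue shares the same TTL and expires in order.
    return f"{base}.delay.{int(delay_seconds * 1000)}ms"
```

The returned dict would be passed as the `arguments` of a queue declaration in whatever AMQP client is in use; that wiring is left out here.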
> The fact that they don’t know if new versions will fix their observed bugs makes it clear they never identified those bugs. What’s the point of running on an open source stack if you’re going to treat things like a black box anyway?
This will probably not be a popular opinion here but the business of DoorDash is not to fix open source software.
I get your point. I use a lot of OSS in my business and we do contribute back. But not every business has the opportunity to observe the stack for days or weeks while the real clients suffer due to outages. Each client who is affected by an outage is most likely churning immediately and you're not going to win them back easily.
In essence, the migration to Kafka will only benefit them long term and they have ultimately made the right business choice.
A lot of people at DoorDash are from Uber. Uber is one of the largest installations of Kafka in the world, so a lot of key processes there are based on Kafka. There was probably in-house expertise from ex-Uber engineers that made it easier to transition.
Redis is good only for a limited set of use cases.
1. As a cache it's amazing... unless you need something more complex than plain key-value, e.g. multiple keys, invalidation by pattern, etc. So... not so good actually.
2. Pub/sub doesn't scale very well, and that's a real limitation.
3. The new pub/sub is purely a copy of Kafka (they admit that in the docs) but in a much less supported and less feature-rich version.
So although I'm still using Redis day to day, as of 2020 I don't see Redis in any new setup.
> limited observability of celery workers and RabbitMQ
RabbitMQ has very good observability. It has built-in web dashboards as well as an API. Every day that I had to bang my head against the opacity of an org-wide Kafka or ActiveMQ setup at one job, I missed the simplicity and transparency of RabbitMQ from a previous job. Somehow Kafka earned the "trendy" badge though, so everyone just uses it by default.
One of the nice things of Celery is how it abstracts away the queues.
This is unfortunate if you want to see the queues from up close.
I don't have the full picture, but I'd move to multiple brokers first, which seems like a low effort move, then away from Celery, in order to split worker code from the rest of the application. In the meantime, I'd instrument the hell out of Celery to see what was going on and push those changes back into upstream.
Pardon my ignorance here, I have not worked with these kinds of large scale systems.
But the first stack I would reach for to handle a queue would be Erlang/OTP (or maybe Elixir). Is that not designed to handle large queues without failure?
Again I am absolutely no expert but I'd like to hear the views on this.
> The Kafka-consumer process is responsible for fetching messages from Kafka, and placing them on a local queue that is read by the task-execution processes
Ah, yep, as usual you end up with a queue for your queue because Kafka isn't actually a queue in the first place. This is why I recommend Kafka for _data_, rabbit for _tasks_. The article doesn't really explain how this works, but I'm guessing that one service holds commands in memory, then sends http messages to the workers. As they noted, you end up with lost tasks on restart, and that's the big gaping hole that a lot of people would not want to fall into - your mileage may vary. I assume the kafka reader service has to also track how busy each worker is, also lost on restart.
If you have some mapping from data to units of work, can't you just use callbacks to make sure the data isn't acked until the units of work complete successfully? I use kafka at work and so far haven't really run into this as an issue.
In the bit you quoted, I'd assume the local work queue has callback objects.
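A rough sketch of the "don't ack until the work completes" idea, assuming tasks can finish out of order. `OffsetTracker` is a hypothetical helper, not part of any Kafka client: it only tracks which offset would be safe to commit, following Kafka's convention that the committed offset is the next offset to read.

```python
class OffsetTracker:
    """Tracks the highest offset that is safe to commit for one partition."""

    def __init__(self, next_offset=0):
        self._next = next_offset  # lowest offset not yet known to be finished
        self._done = set()        # finished offsets above self._next

    def mark_done(self, offset):
        """Record a finished task and return the offset that is safe to commit."""
        self._done.add(offset)
        # Advance only across a contiguous run of finished offsets, so a
        # slow task holds back the commit for everything after it.
        while self._next in self._done:
            self._done.remove(self._next)
            self._next += 1
        return self._next

    @property
    def committable(self):
        return self._next
```

With this shape, a crash replays every message after the last contiguous completed one, so the tasks need to tolerate being run twice.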
I enjoyed the read, but it glossed over some really important details:
> Failovers took more than 20 minutes to complete and would often get stuck requiring manual intervention. Messages were often lost in the process as well.
Data loss while RabbitMQ is down is certainly a problem.
> The Kafka-consumer process is responsible for fetching messages from Kafka, and placing them on a local queue that is read by the task-execution processes.
This is a bit confusing, so now instead of a centralized outage, grinding everything to a halt, the risk is distributed data loss as local queues go down? As long as they stay down, it should be limited data loss to the size of the queues times the number of consumers, but if an instance is flagging it could eat through a whole lot of messages before someone notices the problem. I guess it’s really dependent on the types of messages being processed and if they are idempotent enough to be replayed without consequence.
Yes, this makes little sense to me. It seems that a slow task-executor can still cause the local queue to fill up. Each task-executor should have its own queue.
It's a great writeup. But I'm personally curious about them hitting the limits of vertical scalability. They said they used the biggest node 'available to them' without clarifying how big that was.
If the issue was RabbitMQ hitting resource limits then upgrading its hardware (or swapping it out for some other similarly featureful broker but which is faster) might have fixed some of the problems too, albeit not things like insufficient observability.
I also wonder to what extent RabbitMQ suffers from being written in Erlang. Switching to a featureful broker with sharding and replication that isn't Kafka e.g. Artemis might have allowed them to avoid rewriting code to not use helpful features.
This performance blog suggests RabbitMQ may top out at ~4000 messages/sec when making things durable, which isn't especially good performance:
There are ways to scale rabbitmq past 1 node without giving up too much throughput.
- use multiple queues as each queue is single threaded
- use sharding plugins which present a single queue externally but use multiple queues across a cluster internally
- use lazy queues to get messages straight to disk, lower throughput but higher message count
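As a small illustration of the "multiple queues" point, this is one way a publisher might spread messages across N queues by hashing a key. The `base_name.index` naming is an assumption made for the sketch, not something RabbitMQ or its sharding plugin prescribes.

```python
import zlib


def shard_queue(base_name, routing_key, shard_count):
    """Pick one of `shard_count` queues for a message, deterministically."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    # crc32 is stable across processes and restarts, unlike Python's
    # built-in hash() on strings, which is randomized per process.
    index = zlib.crc32(routing_key.encode("utf-8")) % shard_count
    return f"{base_name}.{index}"
```

Messages with the same key always land in the same queue, which keeps per-key ordering while letting different queues be served in parallel.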
It doesn’t really sound like the other options were seriously considered and it was more about moving away from celery. That’s fine - just be honest about it.
Per your tweet, yes, running without publish confirmations is a major unpleasant surprise that folks eventually run into.
However, that's less Celery's fault and more RabbitMQ's: pub confirms are a Rabbit-specific extension to their protocol, and are left off in most example implementations. MongoDB, also, chooses horrible default settings; however, I wouldn't blame a Mongo-backed product entirely for being hackable.
Additionally (I mentioned this in a sibling thread), Kafka's publish behavior is even worse. The vast majority of Kafka publishers' default settings don't even send the data to the broker without waiting for a response (like RabbitMQ without publish confirms); they send the data to a local buffer which is asynchronously flushed periodically/volumetrically. If your process crashes, you lose the buffer. Unpleasantly, if you force a buffer flush on every publish, Kafka's ingest rate goes from 10-100x RabbitMQ's to 0.01-0.1x RabbitMQ's.
I know this isn't a stylish opinion, but I've been using beanstalkd (https://beanstalkd.github.io/) in production for years now and never had a problem. Once in a while the server it's hosted on will have troubles, but the process itself is bulletproof and handles thousands of jobs/s without breaking a sweat. It has compacting binlogs, so for mission critical jobs that cannot be recreated you can save them to disk. For jobs that can be re-created, you can run an in-memory instance that can easily handle more jobs than most people here will ever need.
If you want redundancy or write-scaling, you can run 2+ of them side by side, round-robin your jobs to the available instances, and have your workers read from them (RR as well).
Beanstalkd is a lifesaver for job queues. It's the first thing I reach for when I need a queue. I'm excited to see how Redis' Disque turns out, but until then I'm happy with bean.
I've evaluated replacing bean with something like Kafka, but it just doesn't make any sense at all. A distributed log is NOT a queue, and even using something like RabbitMQ gets tricky if you want to pull jobs instead of push them. For dedicated worker queues at small-to-medium scale, I cannot recommend beanstalkd enough. No, it doesn't have native sharding or multi-region replication, but like I said, I run millions of jobs/hour through it and never needed any of that crap.
Beanstalkd is probably the only beautiful queue I instantly fell in love with. It shares its DNA with memcached, and makes life so much simpler! I know people have the tendency to write complex abstractions and questionable choices around simple things. But sometimes simplicity and capability to reason well is the answer to everything.
We have a dedicated RabbitMQ engineering team here at Erlang Solutions. If you need assistance with scaling RabbitMQ in distributed environments, check out one of our videos https://youtu.be/MFH-GDYdxwQ or reach out, our experts would be happy to assist.
honestly, a single rabbitmq node on a really big machine can handle a ton of messages without problems.
now that rabbitmq clustering is actually working (~7y ago it was basically impossible to set up clustering without having huge headaches), you can easily set up 3~5 machines in a cluster and never have a single issue.
> Celery allows users to schedule tasks in the future with a countdown or ETA. Our heavy use of these countdowns resulted in noticeable load increases on the broker. Some of our outages were directly related to an increase in tasks with countdowns. We ultimately decided to restrict the use of countdowns in favor of another system we had in place
How was this problem of scheduled/delayed tasks solved after moving to Kafka? Is the other system they mentioned something else entirely from the solution proposed in the post?
This is fascinating. Doordash seems to have encountered many of the same problems we (Klaviyo) did using a RabbitMQ/Celery stack, but arrived at totally different answers. Klaviyo doubled down on making the tools work for us, rather than jumping to a new and untested (for us) broker/job framework technology and arrived at some pretty nice solutions:
Problems with observability? We added lots of custom StatsD and text logging instrumentation (Celery "signal" middleware), so that we could get e.g. accurate "how long do tasks like this spend waiting in the queue?" answers. Other than the inherent limitation of RabbitMQ being a strict queue (unlike Kafka, you can't "peek" at things in the middle of a topic without consuming that topic--we could do something hyper-complicated with exchanges and deliberate duplication of messages to address this limitation, but that doesn't sound worth it to me at all), observability of the brokers themselves seems pretty good. Coupled with Sentry reporting issues that occur during task processing, and some custom decorators to retry/delay/store tasks affected by common classes of systems issues, our visibility tends to be better than I've seen in any other asynchronous queue/message-bus system I've worked on.
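For the "how long do tasks spend waiting in the queue?" question, a minimal sketch of the measurement itself. The `published_at` header name and both helpers are hypothetical; in a Celery setup, functions like these could be called from handlers on the `before_task_publish` and `task_prerun` signals, with the result sent to StatsD.

```python
import time


def stamp_headers(headers, now=None):
    """Return a copy of the message headers with a publish timestamp added."""
    stamped = dict(headers)
    stamped["published_at"] = time.time() if now is None else now
    return stamped


def queue_wait_seconds(headers, now=None):
    """Seconds between publish and now, or None if the message was not stamped."""
    published = headers.get("published_at")
    if published is None:
        return None
    current = time.time() if now is None else now
    # Clamp at zero: publisher and worker clocks may be slightly out of sync.
    return max(0.0, current - published)
```

The `now` parameter is only there to make the functions easy to test; real callers would leave it out.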
Sneaky task disappearances turned out to be mostly bugs in the way we were starting/stopping workers, related to signal handling and rabbitmq "redelivery". By really understanding the Celery worker start/stop lifecycle and coupling that with how Systemd chooses to kill processes, we were able to reduce those to zero. Celery also had a couple of "bugs" (questionable behavior choices) in this area which were resolved in 4.4.
Celery ETA/countdown task induced RabbitMQ load turned out to be because Celery made the questionable decision to queue ETA tasks on every single (eventual executor destination) worker node. We customized the celery task-dispatching code to route all ETA tasks to a set of workers which only buffer tasks, and re-deliver them to the actual work queues when their time is up. As a result, the domain of cases in which RabbitMQ had to "take back" (unack -> ready) large amounts of tasks went from "every time every worker restarts (and we deploy to them a lot!)" to "every time a very specific, single-purpose worker crashes", which reduced issues with that system to zero.
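The buffering-worker idea can be sketched with a plain heap. This is an illustration of the general technique, not Klaviyo's code: `EtaBuffer` is a hypothetical class that holds tasks until their ETA and hands back the ones that are due, which the caller would then republish to the real work queues.

```python
import heapq
import itertools


class EtaBuffer:
    """Holds tasks until their ETA, releasing them in ETA order."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so two tasks with the same ETA never compare payloads.
        self._seq = itertools.count()

    def hold(self, eta, task):
        heapq.heappush(self._heap, (eta, next(self._seq), task))

    def release_due(self, now):
        """Pop and return every task whose ETA is at or before `now`."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            _, _, task = heapq.heappop(self._heap)
            due.append(task)
        return due

    def __len__(self):
        return len(self._heap)
```

Only this one kind of worker ever holds unacked ETA tasks, so ordinary worker restarts no longer force the broker to requeue them.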
A lack of scale-out in RabbitMQ was addressed by adding additional separate brokers (and therefore celery "apps") along two axes: sometimes we peel off specific task workloads to their own broker, and in other cases we run "fleets" of identical brokers that tasks round-robin across, with a (currently manual, moving in the direction of automatic) circuit breaker to take a broker out of rotation if it ever has issues. We wrote a whole blog post[1] about scaling that specific set of RabbitMQs and workers. Totally agree with Doordash that rabbit's "HA" generally isn't, and that scale-out needs to happen across brokers.
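A "fleet" of identical brokers with a manual circuit breaker might look roughly like this on the publisher side. `BrokerPool` and the broker URLs are made up for the sketch; the real thing would also need health checks and per-broker connections.

```python
class BrokerPool:
    """Round-robins publishes across brokers, skipping ones taken out of rotation."""

    def __init__(self, urls):
        self._urls = list(urls)
        self._disabled = set()
        self._cursor = 0

    def trip(self, url):
        """Take a broker out of rotation (the manual circuit breaker)."""
        self._disabled.add(url)

    def reset(self, url):
        """Put a broker back into rotation."""
        self._disabled.discard(url)

    def next_broker(self):
        # Try each broker at most once per call, starting where we left off.
        for _ in range(len(self._urls)):
            url = self._urls[self._cursor % len(self._urls)]
            self._cursor += 1
            if url not in self._disabled:
                return url
        raise RuntimeError("no healthy brokers in rotation")
```

For example, `BrokerPool(["amqp://broker-a", "amqp://broker-b"])` alternates between the two until one of them is tripped, after which every publish goes to the other.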
Connection churn-related broker instability was addressed partially by scaling the number of brokers, but also on the consumer side, by carefully tuning per-worker concurrency to minimize connection counts while doing as much work as possible on a given piece of hardware, and by disabling the Celery remote control channel (pidbox queues). While that means that nice tools like e.g. Flower aren't as useful to us, it also means that the cost to a RabbitMQ broker of losing a whole bunch of consumer connections is much lower. When it comes to the "harakiri" churn of publisher connections discussed in the article, we haven't encountered connection issues due to our publisher tier. Doordash's web tier is almost certainly bigger than ours, but I'd make a deeply uneducated guess that we're at most an order of magnitude apart. I'd be curious to learn more about the story there, since, even at a reduced size, we regularly run 10ks of connections on a single broker with a pretty high flap-rate due to e.g. recycling webserver processes or restarting consumers due to code releases.
In general, I agree with the article and yesterday's Celery 5.0 release post comments, that Celery is a quirky piece of tech, and that RabbitMQ is far from simple to run. However, I'm generally pretty pleased with Klaviyo's approach to go "through" the problem by diving deep on issues we had and fixing them in the stack we chose, rather than tossing large parts of it and re-learning the foibles of some other piece of software. At present, we run dozens of brokers and process 100ks of tasks per second at peak volume. While nobody considers our setup simple or issue-free, it's one of the most fully understood pieces of technology we run.
While it's not out of the question for us to adopt it at some point in the future, Kafka was discouraging to us when hardening our RabbitMQ/Celery setup for a few reasons (though we do use it for some other pieces of our infrastructure which require it):
As the Doordash folks indicated in the article, Kafka is really not well-integrated with the Celery stack at all, so building in things like front-vs-back-of-queue retries (both of which are extremely useful in different situations), deferred delivery, and the ability to rapidly change the number of consumers on a topic all take effort. Each of those problems has a solution, or at least a response, in the Kafka ecosystem, but Python task-processing frameworks which integrate those behaviors are both unfamiliar to us, and significantly younger than Celery.
As with any clustered (rather than sharded) system, we lack expertise in understanding why publishes fail when Kafka is in a partially degraded state. With our existing Kafka workloads, many failure waves (consume or publish) happen without a full grasp of what's wrong/how to fix it. That's most definitely an "us" problem, and we are learning, but it's likely going to be quite awhile before we're at the comfort level that we currently have with, say, recovering message data from a data volume in the aftermath of a massive AWS-induced broker crash, or replacing RabbitMQ nodes that are experiencing elevated latency or flow control.
Lastly, we were unpleasantly surprised by Kafka publishers' habit of lying to their clients and saying a message was published when it was in fact buffered in-memory, pending a periodic (or volume-initiated) flush operation. Our processes crash a lot, usually when we least want them to, and having those crashes cause the data loss of an entire pre-publish Kafka buffer has been extremely unpleasant for us. When we reduced "batch.size" to 1, we discovered, to our dismay, that Kafka's vaunted "way better than RabbitMQ" publish time and volume numbers were entirely dependent on batch-wise optimizations, and that publishes were tens of times slower with batch.size=1 Kafka than they were with pub-confirms-enabled RabbitMQ (RabbitMQ also has batch-wise optimizations with publish confirms, which I'd argue have vastly better reliability semantics than Kafka, but that's another story and we're not using those...yet; ask me if interested). Again, that's partly an "us" problem (our crash rate is high, and dropped Kafka-destined batches could be recovered in other ways if we spent the time), but one that we don't have to worry about in the Celery/RabbitMQ setup.
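The trade-off described above can be shown with a toy model. This is not a Kafka client and makes no claim about real throughput numbers; `BufferedPublisher` is a hypothetical class that only counts how many messages are lost on a crash and how many round trips a given batch size costs.

```python
class BufferedPublisher:
    """Toy model of a publisher that acknowledges before flushing."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.pending = []    # lives only in this process's memory
        self.delivered = []  # stands in for what the broker has received
        self.flushes = 0     # each flush models one network round trip

    def publish(self, message):
        # The caller is told "success" here, before anything leaves the process.
        self.pending.append(message)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.pending:
            self.delivered.extend(self.pending)
            self.pending.clear()
            self.flushes += 1

    def crash(self):
        """Simulate the process dying; return how many messages were lost."""
        lost = len(self.pending)
        self.pending.clear()
        return lost
```

Publishing ten messages with a batch size of 4 costs two round trips and loses two messages on a crash; with a batch size of 1 nothing is lost, at the price of ten round trips.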
Edits: a few for clarity and removed a few things that the Doordash folks already covered and that my initial less-than-careful read didn't catch. The substance of my post didn't change.
Wow, great comment. I ran into similar issues that DoorDash had with Celery/RabbitMQ a while back (~2013-2014) so I'm surprised the plague still continues. It's for that reason that I pushed for going with SQS after that point.
The way I saw it, a message queue is a really fundamental piece of reliable distributed systems. It's fundamentally stateful and I would like it to never go down and certainly never result in dropped messages/tasks. Like with an RDBMS, if I can pay someone else to solve that problem for a reasonable fee, I will rush to give them my money. The next time I built a queued task management system, I used SQS. In the past 7 years I've used it, it's generally worked like a dream.
I can understand if for performance, cost optimization, transparency or portability reasons you still want to run your own RabbitMQ cluster. But my default bias is against that. So with that said, I'd like to ask a follow up question to this excellent comment: did you consider a managed queue like SQS? If so, why did you elect not to go with it? SQS even has a celery broker now (although certainly not as cleanly integrated as Redis or RabbitMQ).
> Kafka publishers' habit of lying to their clients
I have no Kafka expertise but it feels like there ought to be a Kafka configuration option somewhere that tells it not to do that? (And in fact changing the batch size to 1 won't help if it continues ack'ing messages without syncing to disk)
Your rabbitmq connection churn should be at zero pretty much all the time. If you're seeing connection churn increase with volume you need to fix your client code.
I wonder how they handle retries with backoff? It's something that Redis backend is pretty good at with sorted maps but kinda hard to implement in Kafka.
I understand DoorDash/Uber to have Kafka in-house experience making this migration a no-brainer, but another option is to use RabbitMQ's AMQP protocol option and migrate to another AMQP-supporting product/project such as Apache's. Advantage of standardizing on protocols rather than products/projects and all. I wonder if anybody has done that.
AFAIK there's no RabbitMQ-compatible broker, unless you use AMQP 1.0 in RabbitMQ right now. Their AMQP 0.9 implementation has a lot of RMQ-specific stuff and there's no equivalent.
[+] [-] quazar987|5 years ago|reply
In the absence of any significant stats, it's safe to assume their decision was driven by these two statements from the article:
"There were no in-house Celery or RabbitMQ experts at DoorDash who we could lean on to help devise a scaling strategy for this technology."
"DoorDash had in-house Kafka expertise"
[+] [-] Fizzadar|5 years ago|reply
> This problem was never root caused, though we suspect an issue in the Celery workers themselves and not RabbitMQ.
So the problem wasn't Rabbit itself, but the usage/lack of understanding.
[+] [-] thu2111|5 years ago|reply
https://softwaremill.com/mqperf/
[+] [-] jsmeaton|5 years ago|reply
I actually tweeted out some similar issues a week ago: https://twitter.com/jarshwah/status/1310820638655877120
[+] [-] jojo2000|5 years ago|reply
We also evaluated kafka vs. rabbitmq, and chose rabbitmq.
For people hitting performance degradation with rabbitmq this is a good reference [0]
RabbitMQ should be used as-is without a middleman as much as possible, to make the best possible use of all of its concepts.
[0] https://www.cloudamqp.com/blog/2018-01-19-part4-rabbitmq-13-...
[+] [-] orthecreedence|5 years ago|reply
If you want redundancy or write-scaling, you can run 2+ of them side by side, round-robin your jobs to the available instances, and have your workers read from them (RR as well).
Beanstalkd is a lifesaver for job queues. It's the first thing I reach for when I need a queue. I'm excited to see how Redis' Disque turns out, but until then I'm happy with bean.
I've evaluated replacing bean with something like Kafka, but it just doesn't make any sense at all. A distributed log is NOT a queue, and even using something like RabbitMQ gets tricky if you want to pull jobs instead of push them. For dedicated worker queues at small-to-medium scale, I cannot recommend beanstalkd enough. No, it doesn't have native sharding or multi-region replication, but like I said, I run millions of jobs/hour through it and never needed any of that crap.
[+] [-] maxpert|5 years ago|reply
[+] [-] ErlangSolutions|5 years ago|reply
[+] [-] michaelgiba|5 years ago|reply
[+] [-] fernandotakai|5 years ago|reply
now that rabbitmq clustering is actually working (~7y ago it was basically impossible to set up clustering without having huge headaches), you can easily set up 3~5 machines in a cluster and never have a single issue.
[+] [-] avinassh|5 years ago|reply
How was the problem of scheduled/delayed tasks solved after moving to Kafka? Is the other system they mentioned something entirely different from the solution proposed in the post?
[+] [-] zbentley|5 years ago|reply
Problems with observability? We added lots of custom StatsD and text logging instrumentation (Celery "signal" middleware), so that we could get e.g. accurate "how long do tasks like this spend waiting in the queue?" answers. Other than the inherent limitation of RabbitMQ being a strict queue (unlike Kafka, you can't "peek" at things in the middle of a topic without consuming that topic--we could do something hyper-complicated with exchanges and deliberate duplication of messages to address this limitation, but that doesn't sound worth it to me at all), observability of the brokers themselves seems pretty good. Coupled with Sentry reporting issues that occur during task processing, and some custom decorators to retry/delay/store tasks affected by common classes of systems issues, our visibility tends to be better than I've seen in any other asynchronous queue/message-bus system I've worked on.
Sneaky task disappearances turned out to be mostly bugs in the way we were starting/stopping workers, related to signal handling and rabbitmq "redelivery". By really understanding the Celery worker start/stop lifecycle and coupling that with how Systemd chooses to kill processes, we were able to reduce those to zero. Celery also had a couple of "bugs" (questionable behavior choices) in this area which were resolved in 4.4.
Celery ETA/countdown task-induced RabbitMQ load turned out to be because Celery made the questionable decision to queue ETA tasks on every single (eventual executor destination) worker node. We customized the celery task-dispatching code to route all ETA tasks to a set of workers which only buffer tasks, and re-deliver them to the actual work queues when their time is up. As a result, the domain of cases in which RabbitMQ had to "take back" (unack -> ready) large amounts of tasks went from "every time every worker restarts (and we deploy to them a lot!)" to "every time a very specific, single-purpose worker crashes", which reduced issues with that system to zero.
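A rough sketch of the "buffer-only worker" idea, assuming nothing about the actual customized Celery code: tasks are held in a min-heap keyed by ETA and released once they are due. The `EtaBuffer` class and its method names are hypothetical, and the step that re-publishes released tasks to the real work queues is left out.

```python
import heapq
import itertools


class EtaBuffer:
    """Holds delayed tasks and hands them back once their ETA has passed."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so two tasks with the same ETA are never compared directly.
        self._counter = itertools.count()

    def hold(self, eta, task):
        """Store a task until the given ETA (any comparable timestamp)."""
        heapq.heappush(self._heap, (eta, next(self._counter), task))

    def release_due(self, now):
        """Pop and return every task whose ETA is at or before `now`, earliest first."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            _, _, task = heapq.heappop(self._heap)
            due.append(task)
        return due

    def __len__(self):
        return len(self._heap)
```

Only the process running this buffer holds unacknowledged delayed tasks, so restarting the ordinary workers no longer forces the broker to take back a large backlog of ETA messages.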
A lack of scale-out in RabbitMQ was addressed by adding additional separate brokers (and therefore celery "apps") along two axes: sometimes we peel off specific task workloads to their own broker, and in other cases we run "fleets" of identical brokers that tasks round-robin across, with a (currently manual, moving in the direction of automatic) circuit breaker to take a broker out of rotation if it ever has issues. We wrote a whole blog post[1] about scaling that specific set of RabbitMQs and workers. Totally agree with Doordash that rabbit's "HA" generally isn't, and that scale-out needs to happen across brokers.
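The "fleet of identical brokers with a manual circuit breaker" arrangement might look roughly like the sketch below. It is plain Python for illustration only; `BrokerFleet`, `trip`, and `reset` are hypothetical names, and the broker URLs are placeholders rather than anything from the setup described.

```python
class BrokerFleet:
    """Round-robins across brokers, skipping any taken out of rotation."""

    def __init__(self, broker_urls):
        self._brokers = list(broker_urls)
        self._disabled = set()
        self._next = 0

    def trip(self, url):
        """Manually take a broker out of rotation."""
        self._disabled.add(url)

    def reset(self, url):
        """Put a broker back into rotation."""
        self._disabled.discard(url)

    def next_broker(self):
        """Return the next healthy broker in round-robin order."""
        for _ in range(len(self._brokers)):
            url = self._brokers[self._next]
            self._next = (self._next + 1) % len(self._brokers)
            if url not in self._disabled:
                return url
        raise RuntimeError("no healthy brokers in rotation")
```

A publisher would call `next_broker()` for each task and an operator (or eventually an automated health check) would call `trip()` when a broker misbehaves.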
Connection churn-related broker instability was addressed partially by scaling the number of brokers, but also on the consumer side, by carefully tuning per-worker concurrency to minimize connection counts while doing as much work as possible on a given piece of hardware, and by disabling the Celery remote control channel (pidbox queues). While that means that nice tools like e.g. Flower aren't as useful to us, it also means that the cost to a RabbitMQ broker of losing a whole bunch of consumer connections is much lower. When it comes to the "harakiri" churn of publisher connections discussed in the article, we haven't encountered connection issues due to our publisher tier. Doordash's web tier is almost certainly bigger than ours, but I'd make a deeply uneducated guess that we're at most an order of magnitude apart. I'd be curious to learn more about the story there, since, even at a reduced size, we regularly run 10ks of connections on a single broker with a pretty high flap-rate due to e.g. recycling webserver processes or restarting consumers due to code releases.
In general, I agree with the article and yesterday's Celery 5.0 release post comments, that Celery is a quirky piece of tech, and that RabbitMQ is far from simple to run. However, I'm generally pretty pleased with Klaviyo's approach to go "through" the problem by diving deep on issues we had and fixing them in the stack we chose, rather than tossing large parts of it and re-learning the foibles of some other piece of software. At present, we run dozens of brokers and process 100ks of tasks per second at peak volume. While nobody considers our setup simple or issue-free, it's one of the most fully understood pieces of technology we run.
While it's not out of the question for us to adopt it at some point in the future, Kafka was discouraging to us when hardening our RabbitMQ/Celery setup for a few reasons (though we do use it for some other pieces of our infrastructure which require it):
As the Doordash folks indicated in the article, Kafka is really not well-integrated with the Celery stack at all, so building in things like front-vs-back-of-queue retries (both of which are extremely useful in different situations), deferred delivery, and the ability to rapidly change the number of consumers on a topic all take effort. Each of those problems has a solution, or at least a response, in the Kafka ecosystem, but Python task-processing frameworks which integrate those behaviors are both unfamiliar to us, and significantly younger than Celery.
As with any clustered (rather than sharded) system, we lack expertise in understanding why publishes fail when Kafka is in a partially degraded state. With our existing Kafka workloads, many failure waves (consume or publish) happen without a full grasp of what's wrong/how to fix it. That's most definitely an "us" problem, and we are learning, but it's likely going to be quite a while before we're at the comfort level that we currently have with, say, recovering message data from a data volume in the aftermath of a massive AWS-induced broker crash, or replacing RabbitMQ nodes that are experiencing elevated latency or flow control.
Lastly, we were unpleasantly surprised by Kafka publishers' habit of lying to their clients and saying a message was published when it was in fact buffered in-memory, pending a periodic (or volume-initiated) flush operation. Our processes crash a lot, usually when we least want them to, and having those crashes cause the data loss of an entire pre-publish Kafka buffer has been extremely unpleasant for us. When we reduced "batch.size" to 1, we discovered, to our dismay, that Kafka's vaunted "way better than RabbitMQ" publish time and volume numbers were entirely dependent on batch-wise optimizations, and that publishes were tens of times slower with batch.size=1 Kafka than they were with pub-confirms-enabled RabbitMQ (RabbitMQ also has batch-wise optimizations with publish confirms, which I'd argue have vastly better reliability semantics than Kafka, but that's another story and we're not using those...yet; ask me if interested). Again, that's partly an "us" problem (our crash rate is high, and dropped Kafka-destined batches could be recovered in other ways if we spent the time), but one that we don't have to worry about in the Celery/RabbitMQ setup.
1. https://klaviyo.tech/load-testing-our-event-pipeline-2019-42...
Edits: a few for clarity and removed a few things that the Doordash folks already covered and that my initial less-than-careful read didn't catch. The substance of my post didn't change.
[+] [-] yowlingcat|5 years ago|reply
The way I saw it, a message queue is a really fundamental piece of reliable distributed systems. It's fundamentally stateful and I would like it to never go down and certainly never result in dropped messages/tasks. Like with an RDBMS, if I can pay someone else to solve that problem for a reasonable fee, I will rush to give them my money. The next time I built a queued task management system, I used SQS. In the past 7 years I've used it, it's generally worked like a dream.
I can understand if, for performance, cost optimization, transparency or portability reasons, you still want to run your own RabbitMQ cluster. But my default bias is against that. So with that said, I'd like to ask a follow-up question to this excellent comment: did you consider a managed queue like SQS? If so, why did you elect not to go with it? SQS even has a celery broker now (although certainly not as cleanly integrated as Redis or RabbitMQ).
[+] [-] soamv|5 years ago|reply
> Kafka publishers habit of lying to their clients
I have no Kafka expertise but it feels like there ought to be a Kafka configuration option somewhere that tells it not to do that? (And in fact changing the batch size to 1 won't help if it continues ack'ing messages without syncing to disk)
[+] [-] throwingtt|5 years ago|reply
[+] [-] bydlocoder|5 years ago|reply
[+] [-] shanemhansen|5 years ago|reply
If you can relax your constraints, then something like NSQ can be a great option that I heartily recommend.
[+] [-] tannhaeuser|5 years ago|reply
[+] [-] Plugawy|5 years ago|reply
[+] [-] 02020202|5 years ago|reply
[+] [-] kyuudou|5 years ago|reply
edit: in this sense - https://en.wiktionary.org/wiki/Kafkaesque