Problem description
I have an application that uses RabbitMQ as the message queue to send/receive messages between two components: a sender and a receiver. The sender publishes messages very quickly. The receiver receives a message and then performs a very time-consuming task (mainly writing a very large amount of data to the database). Since the receiver takes a long time to finish the task before retrieving the next message in the queue, the sender keeps filling up the queue quickly. So my question is: will this cause the message queue to overflow?
The message consumer looks like the following:
public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // declare a durable, non-exclusive, non-auto-delete queue named "allDataCase"
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // autoAck = true: each message is acknowledged as soon as it is delivered,
    // before the slow database write below has finished
    channel.basicConsume(queueName, true, consumer);
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        // the following takes a very long time
        dao.saveToDB(caseID);
    }
}
Each message received by the consumer contains a caseID. For each caseID, a large amount of data is saved to the database, which takes a very long time. Currently only one consumer is set up for RabbitMQ, since the producer and consumer use the same queue to publish/subscribe caseIDs. So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue? Should I use multithreading in the consumer part to speed up the consumption rate? Or should I use multiple consumers to consume the incoming messages simultaneously? Or is there any asynchronous way to let the consumer consume messages asynchronously without waiting for them to finish? Any suggestions are welcome.
"Will this cause the message queue to overflow?"
Yes. RabbitMQ will enter a state of "flow control" to prevent excessive memory consumption as the queue length increases. It will also start persisting messages to disk rather than holding them in memory.
"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue"
You have 2 options:
- Add more consumers. Bear in mind that your DB will now be manipulated by multiple concurrent processes if you choose this option. Ensure that the DB can withstand the extra pressure.
- Increase the QoS (prefetch) value of the consuming channel. This will pull more messages from the queue and buffer them on the consumer. This will increase the overall processing time; if 5 messages are buffered, the 5th message will take the processing time of messages 1...5 to complete. A minimal sketch of this option follows the list.
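For the second option, here is a minimal sketch of what the consuming side might look like, reusing the QueueingConsumer setup from the question. The prefetch value of 50 is arbitrary, and note that a prefetch limit only takes effect with manual acknowledgements, so autoAck is switched to false here:

channel.basicQos(50); // broker pushes at most 50 unacknowledged messages to this consumer
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer); // autoAck = false: acknowledge manually
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String caseID = new JSONObject(new String(delivery.getBody())).getString("caseID");
    dao.saveToDB(caseID);
    // ack only after the slow DB write, so the broker meters further deliveries
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}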
"Should I use multithreading in the consumer part to speed up the consumption rate?"
Not unless you have a well-designed solution. Adding parallelism to an application is going to add a lot of overhead on the consumer-side. You may end up exhausting the ThreadPool or throttling memory-usage.
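If you do go down that route, the safest shape is usually a small, bounded pool where each worker owns its own channel, rather than one shared channel behind an unbounded pool. Purely as an illustration (WORKER_COUNT, connection and startConsumerLoop are hypothetical names, not part of the question's code):

final int WORKER_COUNT = 4; // bounded, so the pool cannot grow without limit
ExecutorService pool = Executors.newFixedThreadPool(WORKER_COUNT);
for (int i = 0; i < WORKER_COUNT; i++) {
    pool.submit(() -> {
        try {
            Channel ch = connection.createChannel(); // Connection is thread-safe, Channels are not: one per worker
            ch.basicQos(10);
            startConsumerLoop(ch); // the same consume/ack loop sketched above
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}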
When dealing with AMQP, you really need to consider the business requirement for each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to DB ASAP, or does it matter to your users whether or not that data is available immediately?
If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Introduce a second process which then reads and processes the cached messages sequentially. This will ensure that your queue-length does not grow sufficiently to result in flow-control, while preventing your DB from being bombarded with write requests, which are typically more expensive than read requests. Your consumer(s) now simply remove messages from the queue, to be dealt with by another process later.
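A minimal sketch of that split, assuming the Jedis client and a Redis list named "caseID-buffer" (both are assumptions, not part of the original answer; dao is the same DAO as in the question):

// consumer side: park the caseID in Redis and return immediately
try (Jedis jedis = new Jedis("localhost")) {
    jedis.rpush("caseID-buffer", caseID);
}

// separate worker process: drain the buffer sequentially and do the slow DB write
try (Jedis jedis = new Jedis("localhost")) {
    while (true) {
        List<String> item = jedis.blpop(0, "caseID-buffer"); // blocks until an element is available
        dao.saveToDB(item.get(1));                           // blpop returns [key, value]
    }
}

Because the worker drains the list with a blocking pop, the database sees a single, steady stream of writes no matter how bursty the incoming messages are.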