pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

RabbitMQ:快速生產(chǎn)者和慢消費(fèi)者

RabbitMQ: fast producer and slow consumer(RabbitMQ:快速生產(chǎn)者和慢消費(fèi)者)
本文介紹了RabbitMQ:快速生產(chǎn)者和慢消費(fèi)者的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

限時(shí)送ChatGPT賬號(hào)..

我有一個(gè)應(yīng)用程序,它使用 RabbitMQ 作為消息隊(duì)列在兩個(gè)組件之間發(fā)送/接收消息:發(fā)送者和接收者.發(fā)件人以非常快的方式發(fā)送消息.接收方接收到消息,然后執(zhí)行一些非常耗時(shí)的任務(wù)(主要是為非常大的數(shù)據(jù)量寫(xiě)入數(shù)據(jù)庫(kù)).由于接收者需要很長(zhǎng)時(shí)間才能完成任務(wù)然后檢索隊(duì)列中的下一條消息,因此發(fā)送者將繼續(xù)快速填滿隊(duì)列.所以我的問(wèn)題是:這會(huì)導(dǎo)致消息隊(duì)列溢出嗎?

消息消費(fèi)者如下所示:

public void onMessage() throws IOException, InterruptedException {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");QueueingConsumer 消費(fèi)者 = 新 QueueingConsumer(channel);channel.basicConsume(queueName, true, consumer);而(真){QueueingConsumer.Delivery 交付 = consumer.nextDelivery();字符串消息 = new String(delivery.getBody());System.out.println(" [x] 收到'" + 消息 + "'");JSONObject json = new JSONObject(message);字符串 caseID = json.getString("caseID");//跟隨需要很長(zhǎng)時(shí)間dao.saveToDB(caseID);}}

消費(fèi)者收到的每條消息都包含一個(gè) caseID.對(duì)于每個(gè)caseID,都會(huì)將大量的數(shù)據(jù)保存到數(shù)據(jù)庫(kù)中,這需要很長(zhǎng)時(shí)間.目前只為 RabbitMQ 設(shè)置了一個(gè)消費(fèi)者,因?yàn)樯a(chǎn)者/消費(fèi)者使用相同的隊(duì)列來(lái)發(fā)布/訂閱 caseID.那么如何加快消費(fèi)者的吞吐量,讓消費(fèi)者趕上生產(chǎn)者,避免隊(duì)列中的消息溢出呢?是否應(yīng)該在消費(fèi)者部分使用多線程來(lái)加快消費(fèi)速度?或者我應(yīng)該使用多個(gè)消費(fèi)者同時(shí)消費(fèi)傳入的消息?或者有什么異步方式讓消費(fèi)者異步消費(fèi)消息而不等待它完成?歡迎任何建議.

解決方案

這會(huì)導(dǎo)致消息隊(duì)列溢出嗎?"

是的.RabbitMQ 會(huì)進(jìn)入流控"狀態(tài),以防止隨著隊(duì)列長(zhǎng)度的增加而過(guò)度消耗內(nèi)存.它還將開(kāi)始將消息持久化到磁盤(pán),而不是將它們保存在內(nèi)存中.

<塊引用>

"那么如何才能加快消費(fèi)者的吞吐量,讓消費(fèi)者可以趕上生產(chǎn)者,避免消息溢出排隊(duì)"

你有兩個(gè)選擇:

  1. 添加更多消費(fèi)者.請(qǐng)記住,如果您選擇此選項(xiàng),您的數(shù)據(jù)庫(kù)現(xiàn)在將由多個(gè)并發(fā)進(jìn)程操作.確保數(shù)據(jù)庫(kù)能夠承受額外的壓力.
  2. 提高消費(fèi)渠道的QOS值.這將從隊(duì)列中提取更多消息并將它們緩沖在消費(fèi)者上.這將增加整體處理時(shí)間;如果緩沖了 5 條消息,則第 5 條消息將花費(fèi)消息 1...5 的處理時(shí)間來(lái)完成.

<塊引用>

我應(yīng)該在消費(fèi)者部分使用多線程來(lái)加速消費(fèi)率?"

除非您有精心設(shè)計(jì)的解決方案,否則不會(huì).向應(yīng)用程序添加并行性將在消費(fèi)者端增加大量開(kāi)銷.您最終可能會(huì)耗盡 ThreadPool 或限制內(nèi)存使用.

在處理 AMQP 時(shí),您確實(shí)需要考慮每個(gè)流程的業(yè)務(wù)需求,以便設(shè)計(jì)出最優(yōu)的解決方案.您收到的消息對(duì)時(shí)間有多敏感?它們是否需要盡快持久化到數(shù)據(jù)庫(kù)中,或者這些數(shù)據(jù)是否立即可用對(duì)您的用戶來(lái)說(shuō)很重要?

如果數(shù)據(jù)不需要立即持久化,您可以修改您的應(yīng)用程序,以便消費(fèi)者只需從隊(duì)列中刪除消息并將它們保存到緩存集合中,例如在 Redis 中.引入第二個(gè)進(jìn)程,然后依次讀取和處理緩存的消息.這將確保您的隊(duì)列長(zhǎng)度不會(huì)增長(zhǎng)到足以導(dǎo)致流控制,同時(shí)防止您的數(shù)據(jù)庫(kù)被寫(xiě)入請(qǐng)求轟炸,這些寫(xiě)入請(qǐng)求通常比讀取請(qǐng)求更昂貴.您的消費(fèi)者現(xiàn)在只需從隊(duì)列中刪除消息,稍后由另一個(gè)進(jìn)程處理.

I have an application that uses RabbitMQ as the message queue to send/receive message between two components: sender and receiver. The sender sends message in a very fast way. The receiver receives the message and then does some very time-consuming task (mainly database writing for very large data size). Since the receiver takes a very long time to finish the task and then retrieve the next message in the queue, the sender will keep filling up the queue quickly. So my question is: Will this cause the message queue to overflow?

The message consumer looks like the following:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}

Each message received by the consumer contains a caseID. For each caseID, it will save large amount of data to the database, which takes very long time. Currently only one consumer is set up for the RabbitMQ since producer/consumer use the same queue for the publish/subscribe of caseID. So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue? Should I use multithreading in the consumer part to speed up the consumption rate? Or should I use multiple consumers to consume the incoming message simutaneously? Or is there any asynchronous way to let the consumer consume the message asynchronously without waiting it to finish? Any suggestions are welcome.

解決方案

"Will this cause the message queue to overflow?"

Yes. RabbitMQ will enter a state of "flow control" to prevent excessive memory consumption as the queue length increases. It will also start persisting messages to disk, rather than hold them in memory.

"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue"

You have 2 options:

  1. Add more consumers. Bear in mind that your DB will now be manipulated by multiple concurrent processes if you choose this option. Ensure that the DB can withstand the extra pressure.
  2. Increase the QOS value of the consuming channel. This will pull more messages from the queue and buffer them on the consumer. This will increase the overall processing time; if 5 messages are buffered, the 5th message will take the processing time of messages 1...5 to complete.

"Should I use multithreading in the consumer part to speed up the consumption rate?"

Not unless you have a well-designed solution. Adding parallelism to an application is going to add a lot of overhead on the consumer-side. You may end up exhausting the ThreadPool or throttling memory-usage.

When dealing with AMQP, you really need to consider the business requirement for each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to DB ASAP, or does it matter to your users whether or not that data is available immediately?

If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Introduce a second process which then reads and processes the cached messages sequentially. This will ensure that your queue-length does not grow sufficiently to result in flow-control, while preventing your DB from being bombarded with write requests, which are typically more expensive than read requests. Your consumer(s) now simply remove messages from the queue, to be dealt with by another process later.

這篇關(guān)于RabbitMQ:快速生產(chǎn)者和慢消費(fèi)者的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!

【網(wǎng)站聲明】本站部分內(nèi)容來(lái)源于互聯(lián)網(wǎng),旨在幫助大家更快的解決問(wèn)題,如果有圖片或者內(nèi)容侵犯了您的權(quán)益,請(qǐng)聯(lián)系我們刪除處理,感謝您的支持!

相關(guān)文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時(shí)間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉(zhuǎn)換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉(zhuǎn)換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當(dāng)前星期幾的值)
主站蜘蛛池模板: 不锈钢复合板|钛复合板|金属复合板|南钢集团安徽金元素复合材料有限公司-官网 | 档案密集柜_手动密集柜_智能密集柜_内蒙古档案密集柜-盛隆柜业内蒙古密集柜直销中心 | 硫化罐-胶管硫化罐-山东鑫泰鑫智能装备有限公司 | 高防护蠕动泵-多通道灌装系统-高防护蠕动泵-www.bjhuiyufluid.com慧宇伟业(北京)流体设备有限公司 | 天津货架厂_穿梭车货架_重型仓储货架_阁楼货架定制-天津钢力仓储货架生产厂家_天津钢力智能仓储装备 | 东亚液氮罐-液氮生物容器-乐山市东亚机电工贸有限公司 | 滚筒烘干机_转筒烘干机_滚筒干燥机_转筒干燥机_回转烘干机_回转干燥机-设备生产厂家 | 天一线缆邯郸有限公司_煤矿用电缆厂家_矿用光缆厂家_矿用控制电缆_矿用通信电缆-天一线缆邯郸有限公司 | 焊管生产线_焊管机组_轧辊模具_焊管设备_焊管设备厂家_石家庄翔昱机械 | 神超官网_焊接圆锯片_高速钢锯片_硬质合金锯片_浙江神超锯业制造有限公司 | 钢格板|热镀锌钢格板|钢格栅板|钢格栅|格栅板-安平县昊泽丝网制品有限公司 | 斗式提升机_链式斗提机_带式斗提机厂家无锡市鸿诚输送机械有限公司 | 校服厂家,英伦校服定做工厂,园服生产定制厂商-东莞市艾咪天使校服 | 继电器模组-IO端子台-plc连接线-省配线模组厂家-世麦德 | 镀锌角钢_槽钢_扁钢_圆钢_方矩管厂家_镀锌花纹板-海邦钢铁(天津)有限公司 | 论文查重_免费论文查重_知网学术不端论文查重检测系统入口_论文查重软件 | 杜康白酒加盟_杜康酒代理_杜康酒招商加盟官网_杜康酒厂加盟总代理—杜康酒神全国运营中心 | 折弯机-刨槽机-数控折弯机-数控刨槽机-数控折弯机厂家-深圳豐科机械有限公司 | H型钢切割机,相贯线切割机,数控钻床,数控平面钻,钢结构设备,槽钢切割机,角钢切割机,翻转机,拼焊矫一体机 | 塑料撕碎机_编织袋撕碎机_废纸撕碎机_生活垃圾撕碎机_废铁破碎机_河南鑫世昌机械制造有限公司 | 电磁流量计厂家_涡街流量计厂家_热式气体流量计-青天伟业仪器仪表有限公司 | 香港新时代国际美容美发化妆美甲培训学校-26年培训经验,值得信赖! | 排烟防火阀-消防排烟风机-正压送风口-厂家-价格-哪家好-德州鑫港旺通风设备有限公司 | 污泥烘干机-低温干化机-工业污泥烘干设备厂家-焦作市真节能环保设备科技有限公司 | 南京展台搭建-南京展会设计-南京展览设计公司-南京展厅展示设计-南京汇雅展览工程有限公司 | 上海风淋室_上海风淋室厂家_上海风淋室价格_上海伯淋 | 电缆隧道在线监测-智慧配电站房-升压站在线监测-江苏久创电气科技有限公司 | led全彩屏-室内|学校|展厅|p3|户外|会议室|圆柱|p2.5LED显示屏-LED显示屏价格-LED互动地砖屏_蕙宇屏科技 | 不锈钢复合板厂家_钛钢复合板批发_铜铝复合板供应-威海泓方金属复合材料股份有限公司 | 武汉高低温试验箱_恒温恒湿试验箱厂家-武汉蓝锐环境科技有限公司 | 吲哚菁绿衍生物-酶底物法大肠菌群检测试剂-北京和信同通科技发展有限公司 | 存包柜厂家_电子存包柜_超市存包柜_超市电子存包柜_自动存包柜-洛阳中星 | 环氧乙烷灭菌器_压力蒸汽灭菌器_低温等离子过氧化氢灭菌器 _低温蒸汽甲醛灭菌器_清洗工作站_医用干燥柜_灭菌耗材-环氧乙烷灭菌器_脉动真空压力蒸汽灭菌器_低温等离子灭菌设备_河南省三强医疗器械有限责任公司 | 磁力加热搅拌器-多工位|大功率|数显恒温磁力搅拌器-司乐仪器官网 | 珠海网站建设_响应网站建设_珠海建站公司_珠海网站设计与制作_珠海网讯互联 | 哈尔滨京科脑康神经内科医院-哈尔滨治疗头痛医院-哈尔滨治疗癫痫康复医院 | 砖机托板价格|免烧砖托板|空心砖托板厂家_山东宏升砖机托板厂 | 高防护蠕动泵-多通道灌装系统-高防护蠕动泵-www.bjhuiyufluid.com慧宇伟业(北京)流体设备有限公司 | PU树脂_水性聚氨酯树脂_聚氨酯固化剂_聚氨酯树脂厂家_宝景化工 | 安德建奇火花机-阿奇夏米尔慢走丝|高维|发那科-北京杰森柏汇 | 世界箱包品牌十大排名,女包小众轻奢品牌推荐200元左右,男包十大奢侈品牌排行榜双肩,学生拉杆箱什么品牌好质量好 - Gouwu3.com |