pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

Spring異步MessageListener用例發(fā)生業(yè)務(wù)異常時(shí)如何讓

How to ask RabbitMQ to retry when business Exception occurs in Spring Asynchronous MessageListener use case(Spring異步MessageListener用例發(fā)生業(yè)務(wù)異常時(shí)如何讓RabbitMQ重試)
本文介紹了Spring異步MessageListener用例發(fā)生業(yè)務(wù)異常時(shí)如何讓RabbitMQ重試的處理方法,對(duì)大家解決問題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!

問題描述

限時(shí)送ChatGPT賬號(hào)..

我正在運(yùn)行一個(gè) Spring AMQP 消息偵聽器.

I have a Spring AMQP message listener running.

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

如您所見,在處理過程中可能會(huì)出現(xiàn)異常.由于 Catch 塊中的特定錯(cuò)誤,我想重試.我無法通過 onMessage 中的異常.如何告訴 RabbitMQ 有異常并重試?

As you can see, there could be exception coming out during process. I want to retry because of a particular error in Catch block. I cannot through exception in onMessage. How to tell RabbitMQ to there is an exception and retry?

推薦答案

由于 onMessage() 不允許拋出已檢查的異常,您可以將異常包裝在 RuntimeException 然后重新扔.

Since onMessage() doesn't allow to throw checked exceptions you can wrap the exception in a RuntimeException and re-throw it.

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}

但是請(qǐng)注意,這可能會(huì)導(dǎo)致郵件無限期地重新發(fā)送.以下是它的工作原理:

Note however that this may result in the message to be re-delivered indefinitely. Here is how this works:

RabbitMQ 支持拒絕消息并要求代理重新排隊(duì).這顯示在這里.但是 RabbitMQ 本身并沒有重試策略的機(jī)制,例如設(shè)置最大重試次數(shù)、延遲等.

RabbitMQ supports rejecting a message and asking the broker to requeue it. This is shown here. But RabbitMQ doesn't natively have a mechanism for retry policy, e.g. setting max retries, delay, etc.

使用 Spring AMQP 時(shí),requeue on reject"是默認(rèn)選項(xiàng).Spring 的 SimpleMessageListenerContainer 默認(rèn)情況下會(huì)在出現(xiàn)未處理的異常時(shí)執(zhí)行此操作.所以在你的情況下,你只需要重新拋出捕獲的異常.但是請(qǐng)注意,如果您無法處理消息并且總是拋出異常,則該異常將無限期地重新傳遞并導(dǎo)致無限循環(huán).

When using Spring AMQP, "requeue on reject" is the default option. Spring's SimpleMessageListenerContainer will by default do this when there is an unhandled exception. So in your case you just need to re-throw the caught exception. Note however that if you cannot process a message and you always throw the exception this will be re-delivered indefinitely and will result in an infinite loop.

您可以通過拋出 AmqpRejectAndDontRequeueException 異常,這種情況下消息不會(huì)被重新排隊(duì).

You can override this behaviour per message by throwing a AmqpRejectAndDontRequeueException exception, in which case the message will not be requeued.

您也可以通過設(shè)置完全關(guān)閉 SimpleMessageListenerContainer 的requeue on reject"行為

You can also switch off the "requeue on reject" behavior of SimpleMessageListenerContainer entirely by setting

container.setDefaultRequeueRejected(false) 

當(dāng)一條消息被拒絕并且沒有重新排隊(duì)時(shí),如果在 RabbitMQ 中設(shè)置了一個(gè),它將丟失或轉(zhuǎn)移到 DLQ.

When a message is rejected and not requeued it will either be lost or transferred to a DLQ, if one is set in RabbitMQ.

如果您需要具有最大嘗試、延遲等的重試策略,最簡(jiǎn)單的方法是設(shè)置一個(gè)彈簧無狀態(tài)"RetryOperationsInterceptor,它將在線程內(nèi)進(jìn)行所有重試(使用 Thread.sleep()) 而不會(huì)在每次重試時(shí)拒絕消息(因此每次重試都不會(huì)返回 RabbitMQ).當(dāng)重試用盡時(shí),默認(rèn)情況下將記錄一個(gè)警告并使用該消息.如果您想發(fā)送到 DLQ,您將需要 RepublishMessageRecoverer 或自定義 MessageRecoverer 拒絕消息無需重新排隊(duì)(在后一種情況下,您還應(yīng)該在隊(duì)列中設(shè)置 RabbitMQ DLQ).使用默認(rèn)消息恢復(fù)器的示例:

If you need a retry policy with max attempts, delay, etc the easiest is to setup a spring "stateless" RetryOperationsInterceptor which will do all retries within the thread (using Thread.sleep()) without rejecting the message on each retry (so without going back to RabbitMQ for each retry). When retries are exhausted, by default a warning will be logged and the message will be consumed. If you want to send to a DLQ you will need either a RepublishMessageRecoverer or a custom MessageRecoverer that rejects the message without requeuing (in that latter case you should also setup a RabbitMQ DLQ on the queue). Example with default message recoverer:

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});

這顯然有一個(gè)缺點(diǎn),即您將在整個(gè)重試期間占用線程.您還可以選擇使用有狀態(tài)的"RetryOperationsInterceptor,它會(huì)在每次重試時(shí)將消息發(fā)送回 RabbitMQ,但延遲仍將通過 Thread.sleep() 實(shí)現(xiàn)code> 在應(yīng)用程序中,加上設(shè)置有狀態(tài)攔截器有點(diǎn)復(fù)雜.

This obviously has the drawback that you will occupy the Thread for the entire duration of the retries. You also have the option to use a "stateful" RetryOperationsInterceptor, which will send the message back to RabbitMQ for each retry, but the delay will still be implemented with Thread.sleep() within the application, plus setting up a stateful interceptor is a bit more complicated.

因此,如果您想在不占用 Thread 的情況下延遲重試,您將需要一個(gè)在 RabbitMQ 隊(duì)列上使用 TTL 的更復(fù)雜的自定義解決方案.如果您不想要指數(shù)退避(因此每次重試時(shí)延遲不會(huì)增加),它會(huì)更簡(jiǎn)單一些.要實(shí)現(xiàn)這樣的解決方案,您基本上在 rabbitMQ 上創(chuàng)建另一個(gè)帶有參數(shù)的隊(duì)列: "x-message-ttl": <delay time in milliseconds> and "x-dead-letter-exchange":"<原始隊(duì)列的名稱>".然后在主隊(duì)列上設(shè)置 "x-dead-letter-exchange":"".所以現(xiàn)在當(dāng)你拒絕并且不重新排隊(duì)消息時(shí),RabbitMQ 會(huì)將它重定向到第二個(gè)隊(duì)列.當(dāng) TTL 過期時(shí),它將被重定向到原始隊(duì)列,從而重新傳遞給應(yīng)用程序.所以現(xiàn)在你需要一個(gè)重試攔截器,它在每次失敗后拒絕發(fā)送給 RabbitMQ 的消息,并跟蹤重試計(jì)數(shù).為了避免需要在應(yīng)用程序中保持狀態(tài)(因?yàn)槿绻膽?yīng)用程序是集群的,則需要復(fù)制狀態(tài))您可以從 RabbitMQ 設(shè)置的 x-death 標(biāo)頭計(jì)算重試次數(shù).在此處查看有關(guān)此標(biāo)頭的更多信息.因此,此時(shí)實(shí)現(xiàn)自定義攔截器比使用這種行為自定義 Spring 有狀態(tài)攔截器更容易.

Therefore, if you want retries with delays without occupying a Thread you will need a much more involved custom solution using TTL on RabbitMQ queues. If you don't want exponential backoff (so delay doesn't increase on each retry) it's a bit simpler. To implement such a solution you basically create another queue on rabbitMQ with arguments: "x-message-ttl": <delay time in milliseconds> and "x-dead-letter-exchange":"<name of the original queue>". Then on the main queue you set "x-dead-letter-exchange":"<name of the queue with the TTL>". So now when you reject and don't requeue a message RabbitMQ will redirect it to the second queue. When TTL expires it will be redirected to the original queue and thus redelivered to the application. So now you need a retry interceptor that rejects the message to RabbitMQ after each failure and also keeps track of the retry count. To avoid the need to keep state in the application (because if your application is clustered you need to replicate state) you can calculate the retry count from the x-death header that RabbitMQ sets. See more info about this header here. So at that point implementing a custom interceptor is easier than customising the Spring stateful interceptor with this behaviour.

還要檢查 Spring AMQP 參考中關(guān)于重試的部分.

這篇關(guān)于Spring異步MessageListener用例發(fā)生業(yè)務(wù)異常時(shí)如何讓RabbitMQ重試的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!

【網(wǎng)站聲明】本站部分內(nèi)容來源于互聯(lián)網(wǎng),旨在幫助大家更快的解決問題,如果有圖片或者內(nèi)容侵犯了您的權(quán)益,請(qǐng)聯(lián)系我們刪除處理,感謝您的支持!

相關(guān)文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時(shí)間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉(zhuǎn)換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉(zhuǎn)換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當(dāng)前星期幾的值)
主站蜘蛛池模板: 衡阳耐适防护科技有限公司——威仕盾焊接防护用品官网/焊工手套/焊接防护服/皮革防护手套 | 电渗析,废酸回收,双极膜-山东天维膜技术有限公司 | 细砂提取机,隔膜板框泥浆污泥压滤机,螺旋洗砂机设备,轮式洗砂机械,机制砂,圆锥颚式反击式破碎机,振动筛,滚筒筛,喂料机- 上海重睿环保设备有限公司 | 保定市泰宏机械制造厂-河北铸件厂-铸造厂-铸件加工-河北大件加工 | 辐射色度计-字符亮度测试-反射式膜厚仪-苏州瑞格谱光电科技有限公司 | 硅胶管挤出机厂家_硅胶挤出机生产线_硅胶条挤出机_臣泽智能装备 贵州科比特-防雷公司厂家提供贵州防雷工程,防雷检测,防雷接地,防雷设备价格,防雷产品报价服务-贵州防雷检测公司 | 工业CT-无锡璟能智能仪器有限公司 | HV全空气系统_杭州暖通公司—杭州斯培尔冷暖设备有限公司 | 沙盘模型公司_沙盘模型制作公司_建筑模型公司_工业机械模型制作厂家 | 发电机组|柴油发电机组-批发,上柴,玉柴,潍柴,康明斯柴油发电机厂家直销 | 科昊仪器超纯水机系统-可成气相液氮罐-美菱超低温冰箱-西安昊兴生物科技有限公司 | 成都中天自动化控制技术有限公司 | 螺杆泵_中成泵业| 校车_校车价格_19座幼儿园校车_幼儿园校车_大鼻子校车 | 北京模型公司-军事模型-工业模型制作-北京百艺模型沙盘公司 | 蜘蛛车-高空作业平台-升降机-高空作业车租赁-臂式伸缩臂叉装车-登高车出租厂家 - 普雷斯特机械设备(北京)有限公司 | 密封无忧网 _ 专业的密封产品行业信息网| 防水套管厂家-柔性防水套管-不锈钢|刚性防水套管-天翔管道 | 预制围墙_工程预制围墙_天津市瑞通建筑材料有限公司 | 综合管廊模具_生态,阶梯护坡模具_检查井模具制造-致宏模具厂家 | B2B网站_B2B免费发布信息网站_B2B企业贸易平台 - 企资网 | SMN-1/SMN-A ABB抽屉开关柜触头夹紧力检测仪-SMN-B/SMN-C-上海徐吉 | LHH药品稳定性试验箱-BPS系列恒温恒湿箱-意大利超低温冰箱-上海一恒科学仪器有限公司 | 密集架-手摇-智能-移动-价格_内蒙古档案密集架生产厂家 | 水质监测站_水质在线分析仪_水质自动监测系统_多参数水质在线监测仪_水质传感器-山东万象环境科技有限公司 | 杭州高温泵_热水泵_高温油泵|昆山奥兰克泵业制造有限公司 | 污水/卧式/潜水/钻井/矿用/大型/小型/泥浆泵,价格,参数,型号,厂家 - 安平县鼎千泵业制造厂 | 科研ELISA试剂盒,酶联免疫检测试剂盒,昆虫_植物ELISA酶免试剂盒-上海仁捷生物科技有限公司 | 空气弹簧|橡胶气囊|橡胶空气弹簧-上海松夏减震器有限公司 | 齿轮减速电机一体机_蜗轮蜗杆减速马达-德国BOSERL齿轮减速机带电机生产厂家 | 紫外荧光硫分析仪-硫含量分析仪-红外光度测定仪-泰州美旭仪器 | 工业电炉,台车式电炉_厂家-淄博申华工业电炉有限公司 | 塑胶地板-商用PVC地板-pvc地板革-安耐宝pvc塑胶地板厂家 | 南京展台搭建-南京展会设计-南京展览设计公司-南京展厅展示设计-南京汇雅展览工程有限公司 | 冷水机-冰水机-冷冻机-冷风机-本森智能装备(深圳)有限公司 | 衬塑管道_衬四氟管道厂家-淄博恒固化工设备有限公司 | 油液红外光谱仪-油液监测系统-燃油嗅探仪-上海冉超光电科技有限公司 | 龙门加工中心-数控龙门加工中心厂家价格-山东海特数控机床有限公司_龙门加工中心-数控龙门加工中心厂家价格-山东海特数控机床有限公司 | 电动百叶窗,开窗器,电动遮阳百叶,电动开窗机生产厂家-徐州鑫友工控科技发展有限公司 | 船用泵,船用离心泵,船用喷射泵,泰州隆华船舶设备有限公司 | 小型气象站_车载气象站_便携气象站-山东风途物联网 |