pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP

Group received messages in RabbitMQ, preferably using Spring AMQP?(在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP?)
本文介紹了在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

限時送ChatGPT賬號..

我從服務 (S) 接收消息,該服務將每個單獨的屬性更改作為單獨的消息發布到實體.一個人為的例子是這樣的實體:

I'm receiving messages from a service (S) that publishes each individual property change to an entity as a separate message. A contrived example would be an entity like this:

Person {
    id: 123
    name: "Something",
    address: {...}
}

如果姓名和地址在同一事務中更新,則 (S) 將發布兩條消息,PersonNameCorrectedPersonMoved.問題出在接收端,我在其中存儲此 Person 實體的投影,并且每個屬性更改都會導致寫入數據庫.所以在這個例子中,會有兩次對數據庫的寫入,但是如果我可以在短時間內批量處理消息并按 id 分組,那么我只需要對數據庫進行一次寫入.

If name and address are updated in the same transaction then (S) will publish two messages, PersonNameCorrected and PersonMoved. The problem is on the receiving side where I'm storing a projection of this Person entity and each property change causes a write to the database. So in this example there would be two writes to the database but if I could batch messages for a short period of time and group them by id then I would only have to make a single write to the database.

在 RabbitMQ 中通常如何處理這個問題?Spring AMQP 是否提供了更簡單的抽象?

How does one typically handle this in RabbitMQ? Does Spring AMQP provide an easier abstraction?

請注意,我已經簡要查看了 prefetch 但我不確定這是否是要走的路.如果我理解正確,預取也是基于每個連接的.我試圖在 per-queue 的基礎上實現這一點,因為如果批處理(從而增加延遲)是可行的方法,我不想將此延遲添加到我使用的所有隊列中服務(但僅限于那些需要group-by-id"功能的人).

Note that I have looked briefly at prefetch but I'm not sure if this is the way to go. Also prefetch, if I understand it correctly, is per connection basis. I'm trying to achieve this on a per-queue basis, because if batching (and thus added latency) is the way to go I wouldn't like to add this latency to ALL queues consumed by my service (but only to those that need the "group-by-id" features).

推薦答案

對于這種情況,Prefetch 無濟于事.

Prefetch won't help for a case like this.

考慮使用 Spring Integration,它的適配器位于 Spring 之上AMQP;它還提供了一個聚合器,可用于在將消息發送到管道中的下一個階段之前將它們組合在一起.

Consider using Spring Integration which has adapters that sit on top of Spring AMQP; it also provides an aggregrator which can be used to group messages together before sending them on to the next stage in the pipeline.

編輯

這是一個用于演示的快速啟動應用...

Here's a quick boot app to demostrate...

@SpringBootApplication
public class So42969130Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So42969130Application.class, args)
            .close();
    }

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Handler handler;

    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("so9130", new PersonNameChanged(123));
        this.template.convertAndSend("so9130", new PersonMoved(123));
        this.handler.latch.await(10, TimeUnit.SECONDS);
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                        .messageConverter(converter()))
                .aggregate(a -> a
                        .correlationExpression("payload.id")
                        .releaseExpression("false") // open-ended release, timeout only
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(2000))
                .handle(handler())
                .get();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Handler handler() {
        return new Handler();
    }

    @Bean
    public Queue queue() {
        return new Queue("so9130", false, false, true);
    }

    public static class Handler {

        private final CountDownLatch latch = new CountDownLatch(1);

        @ServiceActivator
        public void handle(Collection<?> aggregatedData) {
            System.out.println(aggregatedData);
            this.latch.countDown();
        }

    }

    public static class PersonNameChanged {

        private int id;

        PersonNameChanged() {
        }

        PersonNameChanged(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonNameChanged [id=" + this.id + "]";
        }

    }

    public static class PersonMoved {

        private int id;

        PersonMoved() {
        }

        PersonMoved(int id) {
            this.id = id;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "PersonMoved [id=" + this.id + "]";
        }

    }

}

波姆:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so42969130</artifactId>
    <version>2.0.0-BUILD-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so42969130</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

結果:

2017-03-23 09:56:57.501  INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
    Expiring MessageGroup with correlationKey[123]
[PersonNameChanged [id=123], PersonMoved [id=123]]

這篇關于在 RabbitMQ 中分組接收消息,最好使用 Spring AMQP?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當前星期幾的值)
主站蜘蛛池模板: 新密高铝耐火砖,轻质保温砖价格,浇注料厂家直销-郑州荣盛窑炉耐火材料有限公司 | 神超官网_焊接圆锯片_高速钢锯片_硬质合金锯片_浙江神超锯业制造有限公司 | 杭州公司变更法人-代理记账收费价格-公司注销代办_杭州福道财务管理咨询有限公司 | 华中线缆有限公司-电缆厂|电缆厂家|电线电缆厂家 | 深圳货架厂_仓库货架公司_重型仓储货架_线棒货架批发-深圳市诺普泰仓储设备有限公司 | 浙江建筑资质代办_二级房建_市政_电力_安许_劳务资质办理公司 | 贴片电感_贴片功率电感_贴片绕线电感_深圳市百斯特电子有限公司 贴片电容代理-三星电容-村田电容-风华电容-国巨电容-深圳市昂洋科技有限公司 | 西安标准厂房_陕西工业厂房_西咸新区独栋厂房_长信科技产业园官方网站 | vr安全体验馆|交通安全|工地安全|禁毒|消防|安全教育体验馆|安全体验教室-贝森德(深圳)科技 | 东莞螺丝|东莞螺丝厂|东莞不锈钢螺丝|东莞组合螺丝|东莞精密螺丝厂家-东莞利浩五金专业紧固件厂家 | 宝鸡市人民医院| 东莞市踏板石餐饮管理有限公司_正宗桂林米粉_正宗桂林米粉加盟_桂林米粉加盟费-东莞市棒子桂林米粉 | 不锈钢搅拌罐_高速搅拌罐厂家-无锡市凡格德化工装备科技有限公司 | 定硫仪,量热仪,工业分析仪,马弗炉,煤炭化验设备厂家,煤质化验仪器,焦炭化验设备鹤壁大德煤质工业分析仪,氟氯测定仪 | 中天寰创-内蒙古钢结构厂家|门式刚架|钢结构桁架|钢结构框架|包头钢结构煤棚 | 大型果蔬切片机-水果冬瓜削皮机-洗菜机切菜机-肇庆市凤翔餐饮设备有限公司 | 生物颗粒燃烧机-生物质燃烧机-热风炉-生物颗粒蒸汽发生器-丽水市久凯能源设备有限公司 | 济南展厅设计施工_数字化展厅策划设计施工公司_山东锐尚文化传播有限公司 | 餐饮加盟网_特色餐饮加盟店_餐饮连锁店加盟 | 真空乳化机-灌装封尾机-首页-温州精灌 | 玉米深加工机械,玉米加工设备,玉米加工机械等玉米深加工设备制造商-河南成立粮油机械有限公司 | 海德莱电力(HYDELEY)-无功补偿元器件生产厂家-二十年专业从事电力电容器 | 浴室柜-浴室镜厂家-YINAISI · 意大利设计师品牌 | 咿耐斯 |-浙江台州市丰源卫浴有限公司 | 槽钢冲孔机,槽钢三面冲,带钢冲孔机-山东兴田阳光智能装备股份有限公司 | 精雕机-火花机-精雕机 cnc-高速精雕机-电火花机-广东鼎拓机械科技有限公司 | 120kv/2mA直流高压发生器-60kv/2mA-30kva/50kv工频耐压试验装置-旭明电工 | 透平油真空滤油机-变压器油板框滤油机-滤油车-华之源过滤设备 | 水热合成反应釜-防爆高压消解罐-西安常仪仪器设备有限公司 | ?水马注水围挡_塑料注水围挡_防撞桶-常州瑞轩水马注水围挡有限公司 | COD分析仪|氨氮分析仪|总磷分析仪|总氮分析仪-圣湖Greatlake | 数显水浴恒温振荡器-分液漏斗萃取振荡器-常州市凯航仪器有限公司 | 挤出熔体泵_高温熔体泵_熔体出料泵_郑州海科熔体泵有限公司 | 健康管理师报考条件,考试时间,报名入口—首页 | 原子吸收设备-国产分光光度计-光谱分光光度计-上海光谱仪器有限公司 | 冲击式破碎机-冲击式制砂机-移动碎石机厂家_青州市富康机械有限公司 | 外观设计_设备外观设计_外观设计公司_产品外观设计_机械设备外观设计_东莞工业设计公司-意品深蓝 | 西安微信朋友圈广告投放_微信朋友圈推广_西安度娘网络科技有限公司 | 棉柔巾代加工_洗脸巾oem_一次性毛巾_浴巾生产厂家-杭州禾壹卫品科技有限公司 | 软文发布-新闻发布推广平台-代写文章-网络广告营销-自助发稿公司媒介星 | 河南中专学校|职高|技校招生-河南中职中专网 | 电机保护器-电动机综合保护器-上海硕吉电器有限公司 |