pbootcms网站模板|日韩1区2区|织梦模板||网站源码|日韩1区2区|jquery建站特效-html5模板网

如何通過 MQTT 傳輸并使用 RabbitMQ 和 Spring-AMQP 在

How can I transmit via MQTT and Receive on AMQP with RabbitMQ and Spring-AMQP(如何通過 MQTT 傳輸并使用 RabbitMQ 和 Spring-AMQP 在 AMQP 上接收)
本文介紹了如何通過 MQTT 傳輸并使用 RabbitMQ 和 Spring-AMQP 在 AMQP 上接收的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

限時送ChatGPT賬號..

所以我已經讓 MQTT -> MQTT 和 AMQP -> AMQP 工作了;MQTT -> AMQP 的翻譯似乎并沒有在某處工作.這是我的測試,如果我的監聽器"也在使用 paho 的 MQTT 中,則它通過了,但是這個 rabbitmq 實現沒有.

So I've gotten MQTT -> MQTT and AMQP -> AMQP to work; the translation of MQTT -> AMQP doesn't seem to be working somewhere though. Here's my test, it passes if my "listener" is also in MQTT using paho, but this rabbitmq implementation doesn't.

@SpringBootTest
@SpringJUnitConfig
internal open class ProvisioningTest @Autowired constructor(
    private val mqtt: IMqttAsyncClient,
    private val mapper: ObjectMapper
) {

    @Test
    fun provision() {
        val entity = Foley(
            rfid = UUID.randomUUID().toString(),
        )

        val called = AtomicBoolean(false)
        mqtt.subscribe("foley/created", 1) { _, _ -> called.set(true) }

        mqtt.publish("foley/new", MqttMessage(mapper.writeValueAsBytes(entity)))

        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilTrue(called)
    }
}

這是將保存的實體發布到另一個隊列的偵聽器;當我發布到 MQTT 時,它永遠不會被調用.

this is the listener that publishes the saved entity to the other queue; it never gets called when I publish to MQTT.

@Service
open class Provisioning(private val repo: FoleyRepo) {
    private val log: Logger = LogManager.getLogger(this::class.java)

    @SendTo("foley.created")
    @RabbitListener(queuesToDeclare = [Queue("foley.new")] )
    open fun listen(entity: Foley): Foley {
        log.trace("saving: {}", entity)
        val save = repo.save(entity)
        log.debug("saved: {}", save)
        return save
    }

}

我的全部消息配置

@Configuration
open class MessagingConfig {

    @Bean
    open fun client(
        @Value("tcp://${mqtt.client.host:localhost}:${mqtt.client.port:1883}") uri: String,
        @Value("${mqtt.client.user:#{null}}") user: String?,
        @Value("${mqtt.client.pass:#{null}}") pass: CharArray?
    ): IMqttAsyncClient {

        val connOpt = MqttConnectOptions()
        user?.let { connOpt.userName = it }
        pass?.let { connOpt.password = it }
        connOpt.isCleanSession = false
        connOpt.isAutomaticReconnect = true
        val client = MqttAsyncClient(uri, MqttAsyncClient.generateClientId(), MemoryPersistence())
        client.connect(connOpt)
        return client
    }

    @Bean
    open fun messageConverter( om: ObjectMapper): MessageConverter {
        return Jackson2JsonMessageConverter(om)
    }

    @Bean
    open fun builder(): Jackson2ObjectMapperBuilderCustomizer {
        return Jackson2ObjectMapperBuilderCustomizer {
            it.modules(JavaTimeModule(), KotlinModule())
        }
    }
}

使用啟用了 mqtt 的 官方 docker rabbitmq 映像.

using the official docker rabbitmq image with mqtt enabled.

我需要糾正什么才能完成這項工作?

What do I need to correct to make this work?

推薦答案

MQTT插件發布到amq.topic,以mqtt主題名作為路由key.

The MQTT plugin publishes to the amq.topic with the mqtt topic name as the routing key.

在消費者端,它使用路由鍵將自動刪除隊列綁定到該交換;在以下示例中,隊列名為 mqtt-subscription-mqttConsumerqos1.

On the consumer side, it binds an auto-delete queue to that exchange, with the routing key; in the following example, the queue is named mqtt-subscription-mqttConsumerqos1.

為了通過 AMQP 接收 MQTT 消息,您需要將自己的隊列綁定到交換器.這是一個例子:

In order to receive MQTT messages over AMQP, you need to bind your own queue to the exchange. Here is an example:

@SpringBootApplication
public class So54995261Application {

    public static void main(String[] args) {
        SpringApplication.run(So54995261Application.class, args);
    }

    @Bean
    @ServiceActivator(inputChannel = "toMQTT")
    public MqttPahoMessageHandler sendIt(MqttPahoClientFactory clientFactory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", clientFactory);
        handler.setAsync(true);
        handler.setDefaultTopic("so54995261");
        return handler;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        options.setUserName("guest");
        options.setPassword("guest".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("mqttConsumer",
                mqttClientFactory(), "so54995261");
        adapter.setCompletionTimeout(5000);
        return adapter;
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(mqttInbound())
                .handle(System.out::println)
                .get();
    }

    @RabbitListener(queues = "so54995261")
    public void listen(byte[] in) {
        System.out.println(new String(in));
    }

    @Bean
    public Queue queue() {
        return new Queue("so54995261");
    }

    @Bean
    public Binding binding() {
        return new Binding("so54995261", DestinationType.QUEUE, "amq.topic", "so54995261", null);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel toMQTT) {
        return args -> toMQTT.send(new GenericMessage<>("foo"));
    }

}

這篇關于如何通過 MQTT 傳輸并使用 RabbitMQ 和 Spring-AMQP 在 AMQP 上接收的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當前星期幾的值)
主站蜘蛛池模板: 天津热油泵_管道泵_天津高温热油泵-天津市金丰泰机械泵业有限公司【官方网站】 | 机房监控|动环监控|动力环境监控系统方案产品定制厂家 - 迈世OMARA | 锂离子电池厂家-山东中信迪生电源 | 预制舱-电力集装箱预制舱-模块化预制舱生产厂家-腾达电器设备 | 圆盘鞋底注塑机_连帮鞋底成型注塑机-温州天钢机械有限公司 | 选宝石船-陆地水上开采「精选」色选机械设备-青州冠诚重工机械有限公司 | 大米加工设备|大米加工机械|碾米成套设备|大米加工成套设备-河南成立粮油机械有限公司 | 亿立分板机_曲线_锯片式_走刀_在线式全自动_铣刀_在线V槽分板机-杭州亿协智能装备有限公司 | 上海律师咨询_上海法律在线咨询免费_找对口律师上策法网-策法网 广东高华家具-公寓床|学生宿舍双层铁床厂家【质保十年】 | 德国GMN轴承,GMN角接触球轴承,GMN单向轴承,GMN油封,GMN非接触式密封 | 通风气楼_通风天窗_屋顶风机-山东美创通风设备有限公司 | 山东商品混凝土搅拌楼-环保型搅拌站-拌合站-分体仓-搅拌机厂家-天宇 | 一氧化氮泄露报警器,二甲苯浓度超标报警器-郑州汇瑞埔电子技术有限公司 | 柴油发电机组_柴油发电机_发电机组价格-江苏凯晨电力设备有限公司 | 无刷电机_直流无刷电机_行星减速机-佛山市藤尺机电设备有限公司 无菌检查集菌仪,微生物限度仪器-苏州长留仪器百科 | 知企服务-企业综合服务(ZiKeys.com)-品优低价、种类齐全、过程管理透明、速度快捷高效、放心服务,知企专家! | 企典软件一站式企业管理平台,可私有、本地化部署!在线CRM客户关系管理系统|移动办公OA管理系统|HR人事管理系统|人力 | 广东燎了网络科技有限公司官网-网站建设-珠海网络推广-高端营销型外贸网站建设-珠海专业h5建站公司「了了网」 | 硅PU球场、篮球场地面施工「水性、环保、弹性」硅PU材料生产厂家-广东中星体育公司 | 泥浆在线密度计厂家-防爆数字压力表-膜盒-远传压力表厂家-江苏大亚自控设备有限公司 | 工控机-工业平板电脑-研华工控机-研越无风扇嵌入式box工控机 | PO膜_灌浆膜及地膜供应厂家 - 青州市鲁谊塑料厂 | 浙江浩盛阀门有限公司| 注浆压力变送器-高温熔体传感器-矿用压力传感器|ZHYQ朝辉 | 无痕胶_可移胶_无痕双面胶带_可移无痕胶厂家-东莞凯峰 | 青岛空压机,青岛空压机维修/保养,青岛空压机销售/出租公司,青岛空压机厂家电话 | 环氧乙烷灭菌器_压力蒸汽灭菌器_低温等离子过氧化氢灭菌器 _低温蒸汽甲醛灭菌器_清洗工作站_医用干燥柜_灭菌耗材-环氧乙烷灭菌器_脉动真空压力蒸汽灭菌器_低温等离子灭菌设备_河南省三强医疗器械有限责任公司 | 九州网址_专注于提供网址大全分享推广中文网站导航服务 | 隔爆型防爆端子分线箱_防爆空气开关箱|依客思 | 纸箱网 -纸箱机械|设备|包装纸盒|包装印刷行业门户网站 | 槽钢冲孔机,槽钢三面冲,带钢冲孔机-山东兴田阳光智能装备股份有限公司 | 石家庄救护车出租_重症转院_跨省跨境医疗转送_活动赛事医疗保障_康复出院_放弃治疗_腾康26年医疗护送转诊团队 | 深圳市万色印象美业有限公司 | 自动气象站_农业气象站_超声波气象站_防爆气象站-山东万象环境科技有限公司 | 杭州荣奥家具有限公司-浙江办公家具,杭州办公家具厂 | 衬塑管道_衬四氟管道厂家-淄博恒固化工设备有限公司 | 智能交通网_智能交通系统_ITS_交通监控_卫星导航_智能交通行业 | 蓝莓施肥机,智能施肥机,自动施肥机,水肥一体化项目,水肥一体机厂家,小型施肥机,圣大节水,滴灌施工方案,山东圣大节水科技有限公司官网17864474793 | 高压微雾加湿器_工业加湿器_温室喷雾-昌润空气净化设备 | 智能监控-安防监控-监控系统安装-弱电工程公司_成都万全电子 | sfp光模块,高速万兆光模块工厂-性价比更高的光纤模块制造商-武汉恒泰通 |