序
MQTT(Message Queuing Telemetry Transport)是一种基于二进制消息、采用发布/订阅模式的消息协议,非常适合低功耗、网络带宽有限的 IoT 场景。这里简单介绍一下如何在 Spring Boot 中集成。
maven
<!-- Spring Boot starter for Spring Integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Character-stream (stdin/stdout) support, used by the optional console-input flow -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- MQTT channel adapters built on the Eclipse Paho client -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置client factory
@bean public mqttpahoclientfactory mqttclientfactory() { defaultmqttpahoclientfactory factory = new defaultmqttpahoclientfactory(); factory.setserveruris("tcp://demo:1883"); // factory.setusername("guest"); // factory.setpassword("guest"); return factory; }
配置consumer
/**
 * Inbound flow: takes every message produced by {@link #mqttinbound()}, appends a
 * marker suffix to the payload and passes the result to the logging handler.
 *
 * @return the assembled inbound integration flow
 */
@Bean
public IntegrationFlow mqttinflow() {
    return IntegrationFlows.from(mqttinbound())
            .transform(p -> p + ", received from mqtt")
            .handle(logger())
            .get();
}

/**
 * Builds the handler that logs received messages under the logger name
 * {@code sisample}, using the level given by the string {@code "info"}.
 *
 * @return the configured logging handler
 */
private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("info");
    loggingHandler.setLoggerName("sisample");
    return loggingHandler;
}

/**
 * Message-driven adapter that subscribes to the topic {@code sisampletopic} using
 * the client id {@code sisampleconsumer} and the shared client factory.
 *
 * @return the inbound MQTT adapter, configured with QoS 1
 */
@Bean
public MessageProducerSupport mqttinbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            "sisampleconsumer", mqttclientfactory(), "sisampletopic");
    // Completion timeout for client operations; presumably milliseconds, confirm against the adapter's API docs.
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}
配置producer
@bean public integrationflow mqttoutflow() { //console input // return integrationflows.from(characterstreamreadingmessagesource.stdin(), // e -> e.poller(pollers.fixeddelay(1000))) // .transform(p -> p + " sent to mqtt") // .handle(mqttoutbound()) // .get(); return integrationflows.from(outchannel()) .handle(mqttoutbound()) .get(); } @bean public messagechannel outchannel() { return new directchannel(); } @bean public messagehandler mqttoutbound() { mqttpahomessagehandler messagehandler = new mqttpahomessagehandler("sisamplepublisher", mqttclientfactory()); messagehandler.setasync(true); messagehandler.setdefaulttopic("sisampletopic"); return messagehandler; }
配置 MessagingGateway
@messaginggateway(defaultrequestchannel = "outchannel") public interface msgwriter { void write(string note); }
这样就大功告成了
doc
spring-integration-samples-mqtt
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。
如对本文有疑问, 点击进行留言回复!!
跟我学Springboot开发后端管理系统6:缓存框架Caffeine
《Oracle Java EE编程自学与面试指南》01-02、Web应用类型
Error: Avoided redundant navigation to current location: “/XXX“.的问题
Avoided redundant navigation to current location:
荐 四十一、Vue项目上手 | 用户管理系统 实现用户修改和删除功能(完成篇)
网友评论