网关绑定MQTT实现发布订阅 202003

股票资讯

原标题:网关绑定MQTT实现发布订阅

作者:isWulongbo

来源:分段故障不考虑社区

前言

实现MQTT协议的中间件很多,本文使用的是企业级的EMQX EnterPrise,不了解的可以看看前面的博客。这里主要介绍SpringBoot2.0集成MQTT实现消息推送的功能。

创建项目

创建父项目

打开想法,然后单击文件>:新建>;项目选择Spring Initializer >: JDK版本>:下一步,创建如下所示的项目

单击下一步,我们检查前两个开发人员工具,我们检查第一个网络工具,我们暂时不需要在这里检查安全框架和SQL,我们只检查第一个消息中间件,我们不需要检查云组件。

依次单击“下一步完成”创建一个好项目

删除src,。gitignore、HELP.md、mvnw和mvnw.cmd目录。本文采用网关绑定,需要引入以下依赖关系:

& lt依赖性>。

& ltgroupId>。org . spring framework . integration & lt;/groupId>。

& ltartifactId>。spring集成流<。/artifactId >

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org . spring framework . integration & lt;/groupId>。

& ltartifactId>。spring-integration-mqtt<。/artifactId >

& lt/dependency>。

父项目pom文件:

& lt?xml version= "1.0 "编码= "UTF-8 "?>。

& lt项目xmlns = " http://maven . Apache . org/POM/4 . 0 . 0 " xmlns:xsi = " http://www . w3 . org/2001/XMLSchema-instance "

https://maven.apache.org/xsd/maven-4.0.0.xsd"

& lt模型版本>4.0.0<。/modelVersion >

& lt包装>。pom<。/packaging>。

& lt模块>。

& lt模块>回弹_ emqx _ common & lt/module>。

& lt模块>回弹_emqx_publish<。/module>。

& lt模块>回弹_emqx_subscribe<。/module>。

& lt/modules>。

& lt父级>。

& ltgroupId>。org . spring framework . boot & lt;/groupId>。

& ltartifactId>。spring-boot-starter-parent<。/artifactId >

& lt版本>。2.4.1<。/version>。

& ltrelativePath/>& lt!-从存储库中查找父级->;

& lt/parent>。

& ltgroupId>。com . Baba . wlb & lt;/groupId>。

& ltartifactId>。回弹_emqx<。/artifactId >

& lt版本>。1.0-快照<。/version>。

& lt名称>。回弹_emqx<。/name>。

& ltdeion>。弹簧靴演示项目& lt/deion>。

& lt属性>。

& ltjava.version>。1.8<。/java.version>。

& lt/properties>。

& lt依赖关系>。

& lt依赖性>。

& ltgroupId>。org . spring framework . boot & lt;/groupId>。

& ltartifactId>。spring-boot-starter-integration & lt。/artifactId >

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org . spring framework . integration & lt;/groupId>。

& ltartifactId>。spring集成流<。/artifactId >

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org . spring framework . integration & lt;/groupId>。

& ltartifactId>。spring-integration-mqtt<。/artifactId >

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org . spring framework . boot & lt;/groupId>。

& ltartifactId>。spring-boot-starter-web<。/artifactId >

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org . spring framework . boot & lt;/groupId>。

& ltartifactId>。spring-boot-devtools<。/artifactId >

& lt范围>。运行时<。/scope>。

& lt可选>。true<。/optional>。

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org.projectlombok<。/groupId>。

& ltartifactId>。lombok<。/artifactId >

& lt可选>。true<。/optional>。

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org . spring framework . boot & lt;/groupId>。

& ltartifactId>。弹簧-启动-启动-测试<。/artifactId >

& lt范围>。测试<。/scope>。

& lt/dependency>。

& lt依赖性>。

& ltgroupId>。org . spring framework . integration & lt;/groupId>。

& ltartifactId>。spring集成测试<。/artifactId >

& lt范围>。测试<。/scope>。

& lt/dependency>。

& lt/dependencies>。

& ltbuild>。

& lt插件>。

& ltplugin>。

& ltgroupId>。org . spring framework . boot & lt;/groupId>。

& ltartifactId>。spring-boot-maven-plug & lt;/artifactId >

& lt!-& lt;配置>。->;

& lt!-& lt;排除>。->;

& lt!-& lt;排除>。->;

& lt!-& lt;groupId>。org.projectlombok<。/groupId>。->;

& lt!-& lt;artifactId>。lombok<。/artifactId >->;

& lt!-& lt;/exclude>。->;

& lt!-& lt;/excludes>。->;

& lt!-& lt;/configuration>。->;

& lt配置>。

& ltmainClass>。com . Baba . wlb . publish . PublishAPPLication & lt;/mainClass >

& lt/configuration>。

& lt/plugin>。

& lt/plugins>。

& lt/build>。

& lt/project>。

创建子项目

在父项目中单击“新建”。>。模块>接下来分别创建三个子项目:

回弹_ emqx _公共

回弹_ emqx _发布

回弹_emqx_subscribe

回弹_ emqx _公共

在此模块下,创建以下软件包

注意:(配置包中暂时没有公共配置,因为我尝试了很久,发现只有main class‘main class’可以加载到dropped配置中,其他模块不能加载通用配置。不知道是不是漏了什么评论。希望懂这部分的人多多指教!所以我必须将配置分成不同的模块)

系统常数:Constants.java

package com . Baba . wlb . share . common;

/**

* @作者乌龙波

* @日期2020/12/29 13:50

* @ 1.0版

*/

/**

*系统常数

*/

公共类常量{

公共静态最终字符串MQTT _ PUBLISH _ CHannel = " MQTTPUBLISH CHannel ";

公共静态最终字符串MQTT _ SUBSCRIBE _ CHannel = " MQTTTSSUBSCRIBEChannel ";

}

Emqx配置类别:EmqxMqttProperties.java

package com . Baba . wlb . share . properties;

导入lombok。数据;

import org . spring framework . boot . context . properties . configuration properties;

import org . spring framework . stereotype . component;

/**

* @作者乌龙波

* @日期2020/12/29 11:33

* @ 1.0版

*/

/**

*配置文件

*/

@数据

@组件

@ ConfigurationPropertieS(" wulongbo . mqtt . emqx ")

公共类EmqxMqttProperties {

私有字符串用户名;

私有字符串密码;

私有字符串主机URl;

私有字符串客户端标识;

私有字符串默认主题;

私有整数超时;

私有整数保持活动;

整数私有qos

私有整数版本;

}

在资源资源目录下,创建一个新的应用程序-公共. yml的yml文件

注意:方法1:以application-*.yml的形式命名方法2:模块之间不需要写依赖配置。只需在公共模块的资源目录中添加一个配置文件夹,并在其中创建application.yml文件

这是官网介绍的(附官网地址)

https://docs . spring . io/spring-boot/docs/current/reference/html single/# boot-features-external-config-application-property-files

在这里选择第一种方法。

Yml配置文件:application-common.yml

乌龙波:

mqtt:

emqx:

用户名:admin

密码:公共

#tcp://ip:端口

主机-url: tcp://39.102.56.91:1883

client-id:wulongbo $ { random . value }

默认-主题:乌龙波_主题

#默认主题:$SYS/brokers/+/clients/#

超时:60

保持活力:60

#服务质量:{0:最多传输一次/1:至少分发一次,可重复/2:仅分发一次,可重复}

qos: 1

版本:4

注意:我自己的EMQX已经启用了Mysql身份验证登录和关闭匿名登录,所以我需要正确的用户名和密码

回弹_ emqx _发布

在此模块下,创建以下软件包

配置类:EmqxMqttConfig.java

package com . Baba . wlb . publish . config;

/**

* @作者乌龙波

* @日期2020/12/29 11:38

* @ 1.0版

*/

import com . Baba . wlb . share . common . constants;

import . com . Baba . wlb . share . properties . emqxmqtproperties;

import lombok . extern . SLF 4j . SLF 4j;

import org . eclipse . PAHO . client . mqtt v3 . MQTTConnectOptions;

import org . spring framework . context . annotation . Bean;

导入org . spring framework . context . annotation . configuration;

import org . spring framework . integration . annotation . integrationcomponentscan;

import org . spring framework . integration . annotation . ServiCe activator;

import org . spring framework . integration . channel . DirectChannel;

import org . spring framework . integration . mqtt . core . DefaultmqttpahoclientFactory;

import org . spring framework . integration . mqtt . core . mqttpahoclientfactory;

import org . spring framework . integration . mqtt . outbound . mqttpahomessagehandler;

import org . spring framework . messaging . message channel;

import org . spring framework . messaging . MessageHandler;

导入javax . annotation . resource;

/**

* EMQX配置工具类

*/

@配置

@IntegrationComponentScan //消息扫描

@Slf4j

公共类EmqxMqttConfig {

@资源

私有EmqxMqttProperties EmqxMqttProperties;

/**

* MQTT连接

*/

@豆

公共MQTTConnectOptions GetMQTTConnectOptions {

//设置相关属性

MQTTConnectOptions MQTTConnectOptions =新的MQTTConnectOptions;

mqtconnections . set username(emqxmqtproperties . getusername);

mqttconnectoptions . set password(emqxmqtproperties . getpassword . tochararray);

设置服务器uri(新字符串

配置类:EmqxMqttConfig.java

package com . Baba . wlb . subscribe . config;

/**

* @作者乌龙波

* @日期2020/12/29 11:38

* @ 1.0版

*/

import com . Baba . wlb . share . common . constants;

import . com . Baba . wlb . share . properties . emqxmqtproperties;

import lombok . extern . SLF 4j . SLF 4j;

import org . eclipse . PAHO . client . mqtt v3 . MQTTConnectOptions;

import org . spring framework . context . annotation . Bean;

导入org . spring framework . context . annotation . configuration;

import org . spring framework . integration . annotation . integrationcomponentscan;

import org . spring framework . integration . annotation . ServiCe activator;

import org . spring framework . integration . channel . DirectChannel;

import org . spring framework . integration . core . MessageProductor;

import org . spring framework . integration . mqtt . core . DefaultmqttpahoclientFactory;

import org . spring framework . integration . mqtt . core . mqttpahoclientfactory;

import org . spring framework . integration . mqtt . inbound . mqttpahomessagedrivenchanneladapter;

import org . spring framework . integration . mqtt . outbound . mqttpahomessagehandler;

import org . spring framework . integration . mqtt . support . DefaultPahomessageConverter;

import org . spring framework . messaging . message channel;

import org . spring framework . messaging . MessageHandler;

导入javax . annotation . resource;

/**

* EMQX配置工具类

*/

@配置

@IntegrationComponentScan //消息扫描

@Slf4j

公共类EmqxMqttConfig {

@资源

私有EmqxMqttProperties EmqxMqttProperties;

/**

* MQTT连接

*/

@豆

公共MQTTConnectOptions GetMQTTConnectOptions {

//设置相关属性

MQTTConnectOptions MQTTConnectOptions =新的MQTTConnectOptions;

mqtconnections . set username(emqxmqtproperties . getusername);

mqttconnectoptions . set password(emqxmqtproperties . getpassword . tochararray);

设置服务器uri(新字符串

并按照下图依次添加模块之间的依赖关系

最后,我们可以分别在发布和订阅模块的pom文件中引入公共依赖

& lt依赖关系>。

& lt依赖性>。

& ltgroupId>。com . Baba . wlb & lt;/groupId>。

& ltartifactId>。回弹_ emqx _ common & lt/artifactId >

& lt版本>。1.0-快照<。/version>。

& lt/dependency>。

& lt/dependencies>。

启动项目

分别启动发布应用程序和订阅应用程序

端口是:1001,1002

邮递员测试

打开邮递员:启动获取请求

localhost:1001/publish/emqxppublish?topic =乌龙波_ topic & amp数据=我是一条信息

至于服务业务模块对消息的处理,是根据主题过滤还是根据播放负载区分,取决于具体的业务场景和设计需求。当然EMQX有一个更解耦的方式,就是规则引擎响应每个事件,还有一个HTTP API供我们调用,读者可以灵活使用。

SegmentFault认为社区应该与文章作者有更多的互动和交流。回搜狐多看

负责编辑:


以上就是网关绑定MQTT实现发布订阅202003的全部内容了,喜欢我们网站的可以继续关注弘蓓股票网其他的资讯!