Springboot结合***ty对接硬件,实现主动发送报文和接受硬件报文(ModbusRTU或者TCP以及DTU)
一,认识
需要了解***ty 基本知识原理,不了解的可以查看我之前的博客,以及网上的资料,这里不在过多撰述。
二,开发思路
这里以对接硬件雷达水位计为例:
说一下思路, 这里场景各种设备连接DTU,然后通过DTU上报报文,和接收服务器下发的指令。
例如127.0.0.1:2233 就是你服务器的ip和端口,我们需要开发部署一个 java 开发的***ty 服务器来监听 2233端口, 从机配置我们的服务器ip和端口连接到***ty。
那么我们开发***ty 的思路应该是什么样子的。
- ***ty 监听端口;
- ***ty 保存通道长链接;
- 将***ty 的 里面的所有通过 存放到一个 ConcurrentHashMap 里面来进行管理;
- 通过 ***ty 监听 我们可以获取 从机上报到服务器的报文,我们进行业务处理;
- 通过Map 我们实现 定时下发报文,让从机回复响应;
三,准备工作
3.1 引入springboot依赖
springboot,依赖, 去掉tomcat ,我们这里只做服务器,并不需要tomcat,以及只用 starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
3.2 ***ty 核心包
<!-- ***ty架包 -->
<dependency>
<groupId>io.***ty</groupId>
<artifactId>***ty-all</artifactId>
</dependency>
3.2 以及hutool 这里我们会用 它的定时器
<dependency>
<groupId>***.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.6.1</version>
</dependency>
其他相关依赖 不在撰写, 数据库依赖以及 工具类依赖 ,自己按需引用
四,编写代码
4.1 编写***ty服务器
不在过多解释代码,每行都有注释
package ***.joygis.iot.***ty.server;
import io.***ty.bootstrap.ServerBootstrap;
import io.***ty.channel.ChannelFuture;
import io.***ty.channel.ChannelOption;
import io.***ty.channel.EventLoopGroup;
import io.***ty.channel.nio.NioEventLoopGroup;
import io.***ty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.***ponent;
import java.***.I***SocketAddress;
/**
* 功能描述: ***ty服务启动类
*
* @Author keLe
* @Date 2022/8/26
*/
@Slf4j
@***ponent
public class ***tyServer {
public void start(I***SocketAddress address) {
//配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 绑定线程池,编码解码
//服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(address)
//使用自定义处理类
.childHandler(new ***tyServerChannelInitializer())
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 128)
//保持长连接,2小时无数据激活心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//将小的数据包包装成更大的帧进行传送,提高网络的负载
.childOption(ChannelOption.TCP_NODELAY, true);
// 绑定端口,开始接收进来的连接
ChannelFuture future = bootstrap.bind(address).sync();
if (future.isSu***ess()) {
log.info("***ty服务器开始监听端口:{}",address.getPort());
}
//关闭channel和块,直到它被关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
4.2 编写***ty服务器 自定义处理类
package ***.joygis.iot.***ty.server;
import ***.joygis.iot.***ty.MyDecoder;
import ***.joygis.iot.***ty.MyEncoder;
import io.***ty.channel.ChannelInitializer;
import io.***ty.channel.ChannelPipeline;
import io.***ty.channel.socket.SocketChannel;
import io.***ty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
*
* @Author keLe
* @Date 2022/8/26
*/
public class ***tyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//接收消息格式,使用自定义解析数据格式
pipeline.addLast("decoder",new MyDecoder());
//发送消息格式,使用自定义解析数据格式
pipeline.addLast("encoder",new MyEncoder());
//针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开
//如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用
//pipeline.addLast(new IdleStateHandler(600,0,0, TimeUnit.SECONDS));
//自定义的空闲检测
pipeline.addLast(new ***tyServerHandler());
}
}
接收消息格式,使用自定义解析数据格式工具类(5.23更新拆包粘包处理)
package ***.joygis.iot.***ty;
import io.***ty.buffer.ByteBuf;
import io.***ty.channel.ChannelHandlerContext;
import io.***ty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 功能描述: 自定义接收消息格式
*
* @Author keLe
* @Date 2022/8/26
*/
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if (byteBuf.readableBytes() < 8) {
return;
}
//我们标记一下当前的readIndex的位置
byteBuf.markReaderIndex();
// 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
int dataLength = 8;
//读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
if (byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();
return;
}else if(byteBuf.readableBytes() == 9){
byteBuf.markReaderIndex();
byte[] data = new byte[9];
byteBuf.readBytes(data);
String msg = bytesToHexString(data);
list.add(msg);
}else {
byte[] data = new byte[dataLength];
byteBuf.readBytes(data);
String msg = bytesToHexString(data);
list.add(msg);
}
}
public String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2) {
sb.append(0);
}
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}
public static String toHexString1(byte[] b) {
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < b.length; ++i) {
buffer.append(toHexString1(b[i]));
}
return buffer.toString();
}
public static String toHexString1(byte b) {
String s = Integer.toHexString(b & 0xFF);
if (s.length() == 1) {
return "0" + s;
} else {
return s;
}
}
}
自定义发送消息格式,使用自定义解析数据格式工具类
/**
* 功能描述: 自定义发送消息格式
*
* @Author keLe
* @Date 2022/8/26
*/
public class MyEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
//将16进制字符串转为数组
byteBuf.writeBytes(hexString2Bytes(s));
}
/**
* 功能描述: 16进制字符串转字节数组
* @Author keLe
* @Date 2022/8/26
* @param src 16进制字符串
* @return byte[]
*/
public static byte[] hexString2Bytes(String src) {
int l = src.length() / 2;
byte[] ret = new byte[l];
for (int i = 0; i < l; i++) {
ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
}
return ret;
}
}
4.3 编写***ty服务器 报文处理类
package ***.joygis.iot.***ty.server;
import ***.joygis.iot.***ty.ChannelMap;
import io.***ty.channel.Channel;
import io.***ty.channel.ChannelHandlerContext;
import io.***ty.channel.ChannelId;
import io.***ty.channel.ChannelInboundHandlerAdapter;
import io.***ty.handler.timeout.IdleState;
import io.***ty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import java.***.I***SocketAddress;
/**
* 功能描述: ***ty服务端处理类
*
* @Author keLe
* @Date 2022/8/26
*/
@Slf4j
public class ***tyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 功能描述: 有客户端连接服务器会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道
* @return void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
I***SocketAddress insocket = (I***SocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
int clientPort = insocket.getPort();
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接,就保存连接
if (ChannelMap.getChannelMap().containsKey(channelId)) {
log.info("客户端:{},是连接状态,连接通道数量:{} ",channelId,ChannelMap.getChannelMap().size());
} else {
//保存连接
ChannelMap.addChannel(channelId, ctx.channel());
log.info("客户端:{},连接***ty服务器[IP:{}-->PORT:{}]",channelId, clientIp,clientPort);
log.info("连接通道数量: {}",ChannelMap.getChannelMap().size());
}
}
/**
* 功能描述: 有客户端终止连接服务器会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道处理程序上下文
* @return void
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
I***SocketAddress inSocket = (I***SocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除
if (ChannelMap.getChannelMap().containsKey(channelId)) {
//删除连接
ChannelMap.getChannelMap().remove(channelId);
log.info("客户端:{},连接***ty服务器[IP:{}-->PORT:{}]",channelId, clientIp,inSocket.getPort());
log.info("连接通道数量: " + ChannelMap.getChannelMap().size());
}
}
/**
* 功能描述: 有客户端发消息会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道处理程序上下文
* @param msg 客户端发送的消息
* @return void
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("加载客户端报文,客户端id:{},客户端消息:{}",ctx.channel().id(), msg);
String data = String.valueOf(msg);
Integer water = Integer.parseInt(data.substring(6,10),16);
log.info("当前水位:{}cm",water);
//响应客户端
//this.channelWrite(ctx.channel().id(), msg);
}
/* @Override
public void channelRead***plete(ChannelHandlerContext ctx) throws Exception {
String bytes = "01 03 00 02 00 01 25 CA";
ctx.writeAndFlush(bytes);
}*/
/**
* 功能描述: 服务端给客户端发送消息
* @Author keLe
* @Date 2022/8/26
* @param channelId 连接通道唯一id
* @param msg 需要发送的消息内容
* @return void
*/
public void channelWrite(ChannelId channelId, Object msg) throws Exception {
Channel channel = ChannelMap.getChannelMap().get(channelId);
if (channel == null) {
log.info("通道:{},不存在",channelId);
return;
}
if (msg == null || msg == "") {
log.info("服务端响应空的消息");
return;
}
//将客户端的信息直接返回写入ctx
channel.write(msg);
//刷新缓存区
channel.flush();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("Client:{},READER_IDLE 读超时",socketString);
ctx.disconnect();
Channel channel = ctx.channel();
ChannelId id = channel.id();
ChannelMap.removeChannelByName(id);
} else if (event.state() == IdleState.WRITER_IDLE) {
log.info("Client:{}, WRITER_IDLE 写超时",socketString);
ctx.disconnect();
Channel channel = ctx.channel();
ChannelId id = channel.id();
ChannelMap.removeChannelByName(id);
} else if (event.state() == IdleState.ALL_IDLE) {
log.info("Client:{},ALL_IDLE 总超时",socketString);
ctx.disconnect();
Channel channel = ctx.channel();
ChannelId id = channel.id();
ChannelMap.removeChannelByName(id);
}
}
}
/**
* 功能描述: 发生异常会触发此函数
* @Author keLe
* @Date 2022/8/26
* @param ctx 通道处理程序上下文
* @param cause 异常
* @return void
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}",ctx.channel().id(),ChannelMap.getChannelMap().size());
}
}
4.4 编写管理通道Map类
package ***.joygis.iot.***ty;
import io.***ty.channel.Channel;
import io.***ty.channel.ChannelId;
import io.***ty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.util.CollectionUtils;
import java.util.concurrent.ConcurrentHashMap;
/**
* 功能描述: 管理通道Map类
*
* @Author keLe
* @Date 2022/8/26
*/
public class ChannelMap {
/**
* 管理一个全局map,保存连接进服务端的通道数量
*/
private static final ConcurrentHashMap<ChannelId, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(128);
public static ConcurrentHashMap<ChannelId, Channel> getChannelMap() {
return CHANNEL_MAP;
}
/**
* 获取指定name的channel
*/
public static Channel getChannelByName(ChannelId channelId){
if(CollectionUtils.isEmpty(CHANNEL_MAP)){
return null;
}
return CHANNEL_MAP.get(channelId);
}
/**
* 将通道中的消息推送到每一个客户端
*/
public static boolean pushNewsToAllClient(String obj){
if(CollectionUtils.isEmpty(CHANNEL_MAP)){
return false;
}
for(ChannelId channelId: CHANNEL_MAP.keySet()) {
Channel channel = CHANNEL_MAP.get(channelId);
channel.writeAndFlush(new TextWebSocketFrame(obj));
}
return true;
}
/**
* 将channel和对应的name添加到ConcurrentHashMap
*/
public static void addChannel(ChannelId channelId,Channel channel){
CHANNEL_MAP.put(channelId,channel);
}
/**
* 移除掉name对应的channel
*/
public static boolean removeChannelByName(ChannelId channelId){
if(CHANNEL_MAP.containsKey(channelId)){
CHANNEL_MAP.remove(channelId);
return true;
}
return false;
}
}
4.5 编写配置类
package ***.joygis.iot.***ty;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.***ponent;
/**
* 功能描述: 配置类
*
* @Author keLe
* @Date 2022/8/26
*/
@Setter
@Getter
@ToString
@***ponent
@Configuration
@PropertySource("classpath:application.yml")
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {
private Integer port;
private String host;
}
appliction.yml
spring:
profiles:
active: test
resources:
cache:
period: 0
application:
name: iot-***ty
socket:
# 监听端口 8090
port: 8090
#ip地址
host: 127.0.0.1
4.6 springboot 启动,***ty也自启动,任务定时器池也启动
package ***.joygis.iot.config;
import ***.hutool.cron.CronUtil;
import ***.joygis.iot.***ty.SocketProperties;
import ***.joygis.iot.***ty.server.***tyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.***mandLineRunner;
import org.springframework.stereotype.***ponent;
import javax.annotation.Resource;
import java.***.I***SocketAddress;
/**
* 功能描述: 任务队列
* @Author keLe
* @Date 2022/7/20
*/
@***ponent
@Slf4j
public class LaunchRunner implements ***mandLineRunner {
@Resource
private ***tyServer ***tyServer;
@Resource
private SocketProperties socketProperties;
@Override
public void run(String... args) throws Exception {
TaskRunner();
I***SocketAddress address = new I***SocketAddress(socketProperties.getHost(),socketProperties.getPort());
log.info("***ty服务器启动地址:"+socketProperties.getHost());
***tyServer.start(address);
}
/**
* 执行正在运行的任务
*/
private void TaskRunner() {
/**
* 任务队列启动
*/
CronUtil.setMatchSecond(true);
CronUtil.start();
log.info("\n-----------------------任务服务启动------------------------\n\t" +
"当前正在启动的{}个任务"+
"\n-----------------------------------------------------------\n\t"
, CronUtil.getScheduler().size()
);
}
}
4.7 编写定时下发报文
两个定时器,一个定时下发报文,一个定时删除不活跃的连接
package ***.joygis.iot.manage;
import ***.joygis.iot.***ty.ChannelMap;
import io.***ty.buffer.ByteBuf;
import io.***ty.buffer.Unpooled;
import io.***ty.channel.Channel;
import io.***ty.channel.ChannelFutureListener;
import io.***ty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.***ponent;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 功能描述: 定时发送Dtu报文
*
* @Author keLe
* @Date 2022/8/29
*/
@Slf4j
@***ponent
public class DtuManage {
public void sendMsg(){
ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();
if(CollectionUtils.isEmpty(channelMap)){
return;
}
ConcurrentHashMap.KeySetView<ChannelId, Channel> channelIds = channelMap.keySet();
byte[] msgBytes = {0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, (byte) 0xCA};
for(ChannelId channelId : channelIds){
Channel channel = ChannelMap.getChannelByName(channelId);
// 判断是否活跃
if(channel==null || !channel.isActive()){
ChannelMap.getChannelMap().remove(channelId);
log.info("客户端:{},连接已经中断",channelId);
return ;
}
// 指令发送
ByteBuf buffer = Unpooled.buffer();
log.info("开始发送报文:{}",Arrays.toString(msgBytes));
buffer.writeBytes(msgBytes);
channel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> {
if (future.isSu***ess()) {
log.info("客户端:{},回写成功:{}",channelId,Arrays.toString(msgBytes));
} else {
log.info("客户端:{},回写失败:{}",channelId,Arrays.toString(msgBytes));
}
});
}
}
/**
* 功能描述: 定时删除不活跃的连接
* @Author keLe
* @Date 2022/8/26
* @return void
*/
public void deleteInactiveConnections(){
ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();
if(!CollectionUtils.isEmpty(channelMap)){
for (Map.Entry<ChannelId, Channel> next : channelMap.entrySet()) {
ChannelId channelId = next.getKey();
Channel channel = next.getValue();
if (!channel.isActive()) {
channelMap.remove(channelId);
log.info("客户端:{},连接已经中断",channelId);
}
}
}
}
}
五 ,测试
使用网络助手进行调试
百度云盘下载地址:https://pan.baidu.***/s/1dcVk9MH88RMRF9dmR3mH5g
提取码:z7h0
5.1 启动服务
5.2 通过网络助手连接发送指令
发送报文
定时发送报文
六 总结
我们有了方向,才有了思路,就会有具体落地。
如果对***ty还有不太明白的地方,可以看看我的后续博客,持续更新中。