Springboot结合Netty对接硬件,实现主动发送报文和接受硬件报文(ModbusRTU或者TCP以及DTU)

Springboot结合***ty对接硬件,实现主动发送报文和接受硬件报文(ModbusRTU或者TCP以及DTU

一,认识

需要了解***ty 基本知识原理,不了解的可以查看我之前的博客,以及网上的资料,这里不在过多撰述。

二,开发思路

这里以对接硬件雷达水位计为例:

说一下思路, 这里场景各种设备连接DTU,然后通过DTU上报报文,和接收服务器下发的指令。

例如127.0.0.1:2233 就是你服务器的ip和端口,我们需要开发部署一个 java 开发的***ty 服务器来监听 2233端口, 从机配置我们的服务器ip和端口连接到***ty。

那么我们开发***ty 的思路应该是什么样子的。

  1. ***ty 监听端口;
  2. ***ty 保存通道长链接;
  3. 将***ty 的 里面的所有通过 存放到一个 ConcurrentHashMap 里面来进行管理;
  4. 通过 ***ty 监听 我们可以获取 从机上报到服务器的报文,我们进行业务处理;
  5. 通过Map 我们实现 定时下发报文,让从机回复响应;

三,准备工作

3.1 引入springboot依赖

springboot,依赖, 去掉tomcat ,我们这里只做服务器,并不需要tomcat,以及只用 starter

	<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>

	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
    </dependency>

3.2 ***ty 核心包

		<!-- ***ty架包 -->
        <dependency>
            <groupId>io.***ty</groupId>
            <artifactId>***ty-all</artifactId>
        </dependency>

3.2 以及hutool 这里我们会用 它的定时器

		<dependency>
            <groupId>***.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.6.1</version>
        </dependency>

其他相关依赖 不在撰写, 数据库依赖以及 工具类依赖 ,自己按需引用

四,编写代码

4.1 编写***ty服务器

不在过多解释代码,每行都有注释

package ***.joygis.iot.***ty.server;

import io.***ty.bootstrap.ServerBootstrap;
import io.***ty.channel.ChannelFuture;
import io.***ty.channel.ChannelOption;
import io.***ty.channel.EventLoopGroup;
import io.***ty.channel.nio.NioEventLoopGroup;
import io.***ty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.***ponent;

import java.***.I***SocketAddress;

/**
 * 功能描述: ***ty服务启动类
 *
 * @Author keLe
 * @Date 2022/8/26
 */
@Slf4j
@***ponent
public class ***tyServer {
    public void start(I***SocketAddress address) {
        //配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 绑定线程池,编码解码
            //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    // 指定Channel
                    .channel(NioServerSocketChannel.class)
                    //使用指定的端口设置套接字地址
                    .localAddress(address)
                    //使用自定义处理类
                    .childHandler(new ***tyServerChannelInitializer())
                    //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //保持长连接,2小时无数据激活心跳机制
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //将小的数据包包装成更大的帧进行传送,提高网络的负载
                    .childOption(ChannelOption.TCP_NODELAY, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(address).sync();
            if (future.isSu***ess()) {
                log.info("***ty服务器开始监听端口:{}",address.getPort());
            }
            //关闭channel和块,直到它被关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4.2 编写***ty服务器 自定义处理类

package ***.joygis.iot.***ty.server;

import ***.joygis.iot.***ty.MyDecoder;
import ***.joygis.iot.***ty.MyEncoder;
import io.***ty.channel.ChannelInitializer;
import io.***ty.channel.ChannelPipeline;
import io.***ty.channel.socket.SocketChannel;
import io.***ty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
 *
 * @Author keLe
 * @Date 2022/8/26
 */
public class ***tyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //接收消息格式,使用自定义解析数据格式
        pipeline.addLast("decoder",new MyDecoder());
        //发送消息格式,使用自定义解析数据格式
        pipeline.addLast("encoder",new MyEncoder());

        //针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开
        //如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用
        //pipeline.addLast(new IdleStateHandler(600,0,0, TimeUnit.SECONDS));
        //自定义的空闲检测
        pipeline.addLast(new ***tyServerHandler());
    }
}

接收消息格式,使用自定义解析数据格式工具类(5.23更新拆包粘包处理)

package ***.joygis.iot.***ty;

import io.***ty.buffer.ByteBuf;
import io.***ty.channel.ChannelHandlerContext;
import io.***ty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 功能描述: 自定义接收消息格式
 *
 * @Author keLe
 * @Date 2022/8/26
 */
public class MyDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if (byteBuf.readableBytes() < 8) {
            return;
        }
        //我们标记一下当前的readIndex的位置
        byteBuf.markReaderIndex();
        // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
        int dataLength = 8;

        //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
            return;
        }else if(byteBuf.readableBytes() == 9){
            byteBuf.markReaderIndex();
            byte[] data = new byte[9];
            byteBuf.readBytes(data);
            String msg = bytesToHexString(data);
            list.add(msg);
        }else {
            byte[] data = new byte[dataLength];
            byteBuf.readBytes(data);
            String msg = bytesToHexString(data);
            list.add(msg);
        }
    }

    public String bytesToHexString(byte[] bArray) {
        StringBuffer sb = new StringBuffer(bArray.length);
        String sTemp;
        for (int i = 0; i < bArray.length; i++) {
            sTemp = Integer.toHexString(0xFF & bArray[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }

    public static String toHexString1(byte[] b) {
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < b.length; ++i) {
            buffer.append(toHexString1(b[i]));
        }
        return buffer.toString();
    }

    public static String toHexString1(byte b) {
        String s = Integer.toHexString(b & 0xFF);
        if (s.length() == 1) {
            return "0" + s;
        } else {
            return s;
        }
    }

}

自定义发送消息格式,使用自定义解析数据格式工具类

/**
 * 功能描述: 自定义发送消息格式
 *
 * @Author keLe
 * @Date 2022/8/26
 */
public class MyEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
        //将16进制字符串转为数组
        byteBuf.writeBytes(hexString2Bytes(s));
    }

    /**
     * 功能描述: 16进制字符串转字节数组
     * @Author keLe
     * @Date 2022/8/26
     * @param src 16进制字符串
     * @return byte[]
     */
    public static byte[] hexString2Bytes(String src) {
        int l = src.length() / 2;
        byte[] ret = new byte[l];
        for (int i = 0; i < l; i++) {
            ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
        }
        return ret;
    }
}

4.3 编写***ty服务器 报文处理类

package ***.joygis.iot.***ty.server;

import ***.joygis.iot.***ty.ChannelMap;
import io.***ty.channel.Channel;
import io.***ty.channel.ChannelHandlerContext;
import io.***ty.channel.ChannelId;
import io.***ty.channel.ChannelInboundHandlerAdapter;
import io.***ty.handler.timeout.IdleState;
import io.***ty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

import java.***.I***SocketAddress;

/**
 * 功能描述: ***ty服务端处理类
 *
 * @Author keLe
 * @Date 2022/8/26
 */
@Slf4j
public class ***tyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 功能描述: 有客户端连接服务器会触发此函数
     * @Author keLe
     * @Date 2022/8/26
     * @param  ctx 通道
     * @return void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        I***SocketAddress insocket = (I***SocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        int clientPort = insocket.getPort();
        //获取连接通道唯一标识
        ChannelId channelId = ctx.channel().id();
        //如果map中不包含此连接,就保存连接
        if (ChannelMap.getChannelMap().containsKey(channelId)) {
            log.info("客户端:{},是连接状态,连接通道数量:{} ",channelId,ChannelMap.getChannelMap().size());
        } else {
            //保存连接
            ChannelMap.addChannel(channelId, ctx.channel());
            log.info("客户端:{},连接***ty服务器[IP:{}-->PORT:{}]",channelId, clientIp,clientPort);
            log.info("连接通道数量: {}",ChannelMap.getChannelMap().size());
        }
    }

    /**
     * 功能描述: 有客户端终止连接服务器会触发此函数
     * @Author keLe
     * @Date 2022/8/26
     * @param  ctx 通道处理程序上下文
     * @return void
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        I***SocketAddress inSocket = (I***SocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        ChannelId channelId = ctx.channel().id();
        //包含此客户端才去删除
        if (ChannelMap.getChannelMap().containsKey(channelId)) {
            //删除连接
            ChannelMap.getChannelMap().remove(channelId);
            log.info("客户端:{},连接***ty服务器[IP:{}-->PORT:{}]",channelId, clientIp,inSocket.getPort());
            log.info("连接通道数量: " + ChannelMap.getChannelMap().size());
        }
    }

    /**
     * 功能描述: 有客户端发消息会触发此函数
     * @Author keLe
     * @Date 2022/8/26
     * @param  ctx 通道处理程序上下文
     * @param  msg 客户端发送的消息
     * @return void
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("加载客户端报文,客户端id:{},客户端消息:{}",ctx.channel().id(), msg);
        String data = String.valueOf(msg);
        Integer water = Integer.parseInt(data.substring(6,10),16);
        log.info("当前水位:{}cm",water);
        //响应客户端
        //this.channelWrite(ctx.channel().id(), msg);
    }

   /* @Override
    public void channelRead***plete(ChannelHandlerContext ctx) throws Exception {
        String bytes = "01 03 00 02 00 01 25 CA";
        ctx.writeAndFlush(bytes);
    }*/

    /**
     * 功能描述: 服务端给客户端发送消息
     * @Author keLe
     * @Date 2022/8/26
     * @param  channelId 连接通道唯一id
     * @param  msg 需要发送的消息内容
     * @return void
     */
    public void channelWrite(ChannelId channelId, Object msg) throws Exception {
        Channel channel = ChannelMap.getChannelMap().get(channelId);
        if (channel == null) {
            log.info("通道:{},不存在",channelId);
            return;
        }
        if (msg == null || msg == "") {
            log.info("服务端响应空的消息");
            return;
        }
        //将客户端的信息直接返回写入ctx
        channel.write(msg);
        //刷新缓存区
        channel.flush();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        String socketString = ctx.channel().remoteAddress().toString();
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("Client:{},READER_IDLE 读超时",socketString);
                ctx.disconnect();
                Channel channel = ctx.channel();
                ChannelId id = channel.id();
                ChannelMap.removeChannelByName(id);
            } else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("Client:{}, WRITER_IDLE 写超时",socketString);
                ctx.disconnect();
                Channel channel = ctx.channel();
                ChannelId id = channel.id();
                ChannelMap.removeChannelByName(id);
            } else if (event.state() == IdleState.ALL_IDLE) {
                log.info("Client:{},ALL_IDLE 总超时",socketString);
                ctx.disconnect();
                Channel channel = ctx.channel();
                ChannelId id = channel.id();
                ChannelMap.removeChannelByName(id);
            }
        }
    }

    /**
     * 功能描述: 发生异常会触发此函数
     * @Author keLe
     * @Date 2022/8/26
     * @param  ctx 通道处理程序上下文
     * @param  cause 异常
     * @return void
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}",ctx.channel().id(),ChannelMap.getChannelMap().size());
    }

}

4.4 编写管理通道Map类

package ***.joygis.iot.***ty;

import io.***ty.channel.Channel;
import io.***ty.channel.ChannelId;
import io.***ty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.util.CollectionUtils;

import java.util.concurrent.ConcurrentHashMap;

/**
 * 功能描述: 管理通道Map类
 *
 * @Author keLe
 * @Date 2022/8/26
 */
public class ChannelMap {

    /**
     * 管理一个全局map,保存连接进服务端的通道数量
     */
    private static final ConcurrentHashMap<ChannelId, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(128);

    public static ConcurrentHashMap<ChannelId, Channel> getChannelMap() {
        return CHANNEL_MAP;
    }

    /**
     *  获取指定name的channel
     */
    public static Channel getChannelByName(ChannelId channelId){
        if(CollectionUtils.isEmpty(CHANNEL_MAP)){
            return null;
        }
        return CHANNEL_MAP.get(channelId);
    }

    /**
     *  将通道中的消息推送到每一个客户端
     */
    public static boolean pushNewsToAllClient(String obj){
        if(CollectionUtils.isEmpty(CHANNEL_MAP)){
            return false;
        }
        for(ChannelId channelId: CHANNEL_MAP.keySet()) {
            Channel channel = CHANNEL_MAP.get(channelId);
            channel.writeAndFlush(new TextWebSocketFrame(obj));
        }
        return true;
    }

    /**
     *  将channel和对应的name添加到ConcurrentHashMap
     */
    public static void addChannel(ChannelId channelId,Channel channel){
        CHANNEL_MAP.put(channelId,channel);
    }

    /**
     *  移除掉name对应的channel
     */
    public static boolean removeChannelByName(ChannelId channelId){
        if(CHANNEL_MAP.containsKey(channelId)){
            CHANNEL_MAP.remove(channelId);
            return true;
        }
        return false;
    }

}

4.5 编写配置类

package ***.joygis.iot.***ty;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.***ponent;

/**
 * 功能描述: 配置类
 *
 * @Author keLe
 * @Date 2022/8/26
 */
@Setter
@Getter
@ToString
@***ponent
@Configuration
@PropertySource("classpath:application.yml")
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {
    private Integer port;
    private String host;

}

appliction.yml

spring:
  profiles:
    active: test
  resources:
    cache:
      period: 0
  application:
    name: iot-***ty

socket:
  # 监听端口 8090
  port: 8090
  #ip地址
  host: 127.0.0.1

4.6 springboot 启动,***ty也自启动,任务定时器池也启动

package ***.joygis.iot.config;

import ***.hutool.cron.CronUtil;

import ***.joygis.iot.***ty.SocketProperties;
import ***.joygis.iot.***ty.server.***tyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.***mandLineRunner;
import org.springframework.stereotype.***ponent;

import javax.annotation.Resource;
import java.***.I***SocketAddress;

/**
 * 功能描述: 任务队列
 * @Author keLe
 * @Date 2022/7/20
 */
@***ponent
@Slf4j
public class LaunchRunner implements ***mandLineRunner {

    @Resource
    private ***tyServer ***tyServer;

    @Resource
    private SocketProperties socketProperties;


    @Override
    public void run(String... args) throws Exception {
        TaskRunner();
        I***SocketAddress address = new I***SocketAddress(socketProperties.getHost(),socketProperties.getPort());
        log.info("***ty服务器启动地址:"+socketProperties.getHost());
        ***tyServer.start(address);
    }
    /**
     * 执行正在运行的任务
     */
    private  void TaskRunner() {
        /**
         * 任务队列启动
         */
        CronUtil.setMatchSecond(true);
        CronUtil.start();
        log.info("\n-----------------------任务服务启动------------------------\n\t" +
                        "当前正在启动的{}个任务"+
                        "\n-----------------------------------------------------------\n\t"
                , CronUtil.getScheduler().size()

        );
    }


}

4.7 编写定时下发报文

两个定时器,一个定时下发报文,一个定时删除不活跃的连接

package ***.joygis.iot.manage;

import ***.joygis.iot.***ty.ChannelMap;
import io.***ty.buffer.ByteBuf;
import io.***ty.buffer.Unpooled;
import io.***ty.channel.Channel;
import io.***ty.channel.ChannelFutureListener;
import io.***ty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.***ponent;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 功能描述: 定时发送Dtu报文
 *
 * @Author keLe
 * @Date 2022/8/29
 */
@Slf4j
@***ponent
public class DtuManage {


    public void sendMsg(){
        ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();
        if(CollectionUtils.isEmpty(channelMap)){
            return;
        }
        ConcurrentHashMap.KeySetView<ChannelId, Channel> channelIds = channelMap.keySet();
        byte[] msgBytes = {0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, (byte) 0xCA};
        for(ChannelId channelId : channelIds){
            Channel channel = ChannelMap.getChannelByName(channelId);
            // 判断是否活跃
            if(channel==null || !channel.isActive()){
                ChannelMap.getChannelMap().remove(channelId);
                log.info("客户端:{},连接已经中断",channelId);
                return ;
            }
            // 指令发送
            ByteBuf buffer = Unpooled.buffer();
            log.info("开始发送报文:{}",Arrays.toString(msgBytes));
            buffer.writeBytes(msgBytes);
            channel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> {
                if (future.isSu***ess()) {
                    log.info("客户端:{},回写成功:{}",channelId,Arrays.toString(msgBytes));
                } else {
                    log.info("客户端:{},回写失败:{}",channelId,Arrays.toString(msgBytes));
                }
            });
        }
    }

    /**
     * 功能描述: 定时删除不活跃的连接
     * @Author keLe
     * @Date 2022/8/26
     * @return void
     */
    public void deleteInactiveConnections(){
        ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();
        if(!CollectionUtils.isEmpty(channelMap)){
            for (Map.Entry<ChannelId, Channel> next : channelMap.entrySet()) {
                ChannelId channelId = next.getKey();
                Channel channel = next.getValue();
                if (!channel.isActive()) {
                    channelMap.remove(channelId);
                    log.info("客户端:{},连接已经中断",channelId);
                }
            }
        }
    }
}

五 ,测试

使用网络助手进行调试
百度云盘下载地址:https://pan.baidu.***/s/1dcVk9MH88RMRF9dmR3mH5g
提取码:z7h0

5.1 启动服务

5.2 通过网络助手连接发送指令

发送报文

定时发送报文

六 总结

我们有了方向,才有了思路,就会有具体落地。
如果对***ty还有不太明白的地方,可以看看我的后续博客,持续更新中。

转载请说明出处内容投诉
CSS教程_站长资源网 » Springboot结合Netty对接硬件,实现主动发送报文和接受硬件报文(ModbusRTU或者TCP以及DTU)

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买