我们使用的框架几乎都有网络通信的模块,比如常见的Dubbo、RocketMQ、ElasticSearch等。它们的网络通信模块使用Netty实现,之所以选择Netty,有两个主要原因:

本文以入门实践为主,通过原理+代码的方式,实现一个简易IM聊天功能。分为两个部分:Netty的核心概念、IM聊天简易实现。
既然是网络通信,那肯定有服务端和客户端。在客户端-A和客户端-B通信的过程中,实际上是利用服务端作为消息中转站,来实现A-B通信的。
不管是点-点通信,还是群通信,都可以认为是客户端-服务端之间的通信,有了这一点,许多设计方案都可以轻松理解。
(1) Boss线程:Boss线程负责监听端口,接受新的连接,监听连接的数据读写变化。
(2) Worker线程:Worker线程负责处理具体的业务逻辑,Boss线程接收到连接的读写变化后,然后交给Worker处理具体业务逻辑。
(3) 服务端的IO模型:Netty支持使用NIO和BIO进行通信,可以自行设置。一般使用NioServerSocketChannel来指定NIO模型。
(4) 服务端引导类:服务端通过引导类 ServerBootstrap来启动一系列的工作。
(1) Worker线程:客户端只有工作线程的概念,负责连接到服务端,监听数据读写变化。
(2) 客户端的IO模型:一般使用NioSocketChannel指定客户端的NIO模型
(3) 客户端引导类:客户端通过引导类Bootstrap来启动一些列工作。
(1) Handler:负责处理接受到的消息,大部分的业务逻辑都是放在Handler里处理。自定义的Handler一般继承于SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter。
(2) ByteBuf和编码、解码:数据的载体,Java对象编码成字节码,存放于ByteBuf,然后发送出去。服务端接收到消息后,从ByteBuf中取出数据,解码成Java对象。
(3) 通讯协议:许多框架都会自定义一套自己的协议,这样比较符合业务。比如dubbo协议、hessian协议。
一般的协议包括如下部分:魔数、版本号、序列化算法、指令、数据长度、数据内容,其余的都是为了适配自身业务而定的。
(4) 粘包拆包
Netty属于上层应用,在发送消息时,还是通过底层操作系统将数据发送出去,操作系统在发送数据时,不会按照我们设想的消息长度去发送内容。这就需要我们在接收到内容时,自行做好内容的分割和等待。
比如有一条消息1024字节,如果接受的内容没这么长就需要继续等待,等这条消息的内容完整后,在处理。如果接受的内容包含了1条完整消息和1条不完整的消息,那么就需要拆分内容,将完整的消息先传递到后面处理,剩下不完整的消息则继续等待下一个内容。
Netty自带了几种拆包器:固定长度的拆包器 FixedLengthFrameDecoder、行拆包器 LineBasedFrameDecoder、分隔符拆包器 DelimiterBasedFrameDecoder、长度域拆包器LengthFieldBasedFrameDecoder。
一般在使用自定义协议时,会使用:长度域拆包器 LengthFieldBasedFrameDecoder。
(5) 空闲检测和定时心跳
在服务端和客户端的通信过程中,有时候会出现假死连接,或者长时间没有消息传递需要释放连接。对于这些连接,我们需要及时释放,毕竟每条连接都占用着CPU和内存资源。大量这种连接如果不及时释放,服务器资源迟早会耗尽,最终崩溃。
应对这种问题的解决方式是:Netty提供了IdleStateHandler做空闲检测,用来检测连接是否活跃,如果再指定的时间内,没有活跃,那么就关闭连接。然后就是客户端定时发送心跳请求,服务器响应心跳请求。
介绍完Netty的核心概念,接下来以一个简易的点对点IM聊天,将核心概念融入到案例中。IM聊天的核心模块大致是如下几个:
通信主体流程就是搭建好:服务端、客户端、两端正常建立连接进行通信。
服务端代码:
public static void main(String[] args) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    serverBootstrap
            .group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer() {
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                            System.out.println("server accept: " + msg);
                        }
                    });
                }
            });
    serverBootstrap.bind(9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("端口9000绑定成功");
                } else {
                    System.err.println("端口9000绑定失败");
                }
            });
}  客户端代码:
public static void main(String[] args) throws InterruptedException {
    Bootstrap bootstrap = new Bootstrap();
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(Channel ch) {
                    ch.pipeline().addLast(new StringEncoder());
                }
            });
    bootstrap.connect("127.0.0.1", 9000)
            .addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("链接服务端成功");
                    Channel channel = ((ChannelFuture) future).channel();
                    channel.writeAndFlush("我是客户端A");
                } else {
                    System.err.println("连接服务端失败");
                }
            });
} 定义数据包的抽象类,后续的各种类型的数据包都继承此类。数据包中定义通讯协议的各种字段。
@Data
public abstract class Packet {
    /**
     * 协议版本
     */
    private Byte version = 1;
    /**
     * 指令,此处有多种实现:比如登录、登出、单聊、建群等等
     *
     * @return
     */
    public abstract Byte getCommand();
    /**
     * 获取算法,默认使用JSON,如果使用其余算法,子类重写此方法
     *
     * @return
     */
    public Byte getSerializeAlgorithm() {
        return SerializerAlgorithm.JSON;
    }
}
public class LoginRequestPacket extends Packet {
    private String userName;
    private String password;
    @Override
    public Byte getCommand() {
        return Command.LOGIN_REQUEST;
    }
}定义序列化器,功能包括:序列化、反序列化。可以定义多种序列化算法,文中以JSON为例。
public interface Serializer {
    /**
     * 序列化算法
     *
     * @return
     */
    byte getSerializerAlgorithm();
    /**
     * java 对象转换成二进制
     */
    byte[] serialize(Object object);
    /**
     * 二进制转换成 java 对象
     */
     T deserialize(Class clazz, byte[] bytes);
}
public class JSONSerializer implements Serializer {
    @Override
    public byte getSerializerAlgorithm() {
        return SerializerAlgorithm.JSON;
    }
    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }
    @Override
    public  T deserialize(Class clazz, byte[] bytes) {
        return JSON.parseObject(bytes, clazz);
    }
}    有了通讯协议、有了序列化协议,接下来就是对数据的编码和解码了。
public void encode(ByteBuf byteBuf, Packet packet) {
    Serializer serializer = getSerializer(packet.getSerializeAlgorithm());
    // 1. 序列化 java 对象
    byte[] bytes = serializer.serialize(packet);
    // 2. 实际编码过程
    byteBuf.writeInt(MAGIC_NUMBER);
    byteBuf.writeByte(packet.getVersion());
    byteBuf.writeByte(packet.getSerializeAlgorithm());
    byteBuf.writeByte(packet.getCommand());
    byteBuf.writeInt(bytes.length);
    byteBuf.writeBytes(bytes);
}
public Packet decode(ByteBuf byteBuf) {
    // 跳过 magic number
    byteBuf.skipBytes(4);
    // 跳过版本号
    byteBuf.skipBytes(1);
    // 读取序列化算法
    byte serializeAlgorithm = byteBuf.readByte();
    // 读取指令
    byte command = byteBuf.readByte();
    // 读取数据包长度
    int length = byteBuf.readInt();
    // 读取数据
    byte[] bytes = new byte[length];
    byteBuf.readBytes(bytes);
    Class extends Packet> requestType = getRequestType(command);
    Serializer serializer = getSerializer(serializeAlgorithm);
    if (requestType != null && serializer != null) {
        return serializer.deserialize(requestType, bytes);
    }
    return null;
}以上把通讯的基本架子和收发消息的数据包、协议、编解码器等基础工具已经做完,接下来就是编写Handler实现具体的业务逻辑了。
这里以客户端发起登录功能为例,分3步,消息收发也是类似:
效果如下:
核心代码如下:
bootstrap.connect("127.0.0.1", 9000)
                .addListener(future -> {
                    if (future.isSuccess()) {
                        System.out.println("连接服务端成功");
                        Channel channel = ((ChannelFuture) future).channel();
                        // 连接之后,假设再这里发起各种操作指令,采用异步线程开始发送各种指令,发送数据用到的的channel是必不可少的
                        sendActionCommand(channel);
                    } else {
                        System.err.println("连接服务端失败");
                    }
                });
private static void sendActionCommand(Channel channel) {
        // 直接采用控制台输入的方式,模拟操作指令
        Scanner scanner = new Scanner(System.in);
        LoginActionCommand loginActionCommand = new LoginActionCommand();
        new Thread(() -> {
            loginActionCommand.exec(scanner, channel);
        }).start();
    }protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
    LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
    loginResponsePacket.setVersion(loginRequestPacket.getVersion());
    loginResponsePacket.setUserName(loginRequestPacket.getUserName());
    if (valid(loginRequestPacket)) {
        loginResponsePacket.setSuccess(true);
        String userId = IDUtil.randomId();
        loginResponsePacket.setUserId(userId);
        System.out.println("[" + loginRequestPacket.getUserName() + "]登录成功");
        SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());
    } else {
        loginResponsePacket.setReason("校验失败");
        loginResponsePacket.setSuccess(false);
        System.out.println("登录失败!");
    }
    // 登录响应
    ctx.writeAndFlush(loginResponsePacket);
}
private boolean valid(LoginRequestPacket loginRequestPacket) {
    System.out.println("服务端LoginRequestHandler,正在校验客户端登录请求");
    return true;
}public class LoginResponseHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
        String userId = loginResponsePacket.getUserId();
        String userName = loginResponsePacket.getUserName();
        if (loginResponsePacket.isSuccess()) {
            System.out.println("[" + userName + "]登录成功,userId为: " + loginResponsePacket.getUserId());
            SessionUtil.bindSession(new Session(userId, userName), ctx.channel());
        } else {
            System.out.println("[" + userName + "]登录失败,原因为:" + loginResponsePacket.getReason());
        }
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("客户端连接被关闭!");
    }
} 主流程和主要功能已经实现,还剩最后一个空闲检测和定时心跳。
实现步骤:
核心代码:
/**
 * IM聊天空闲检测器
 * 比如:20秒内没有数据,则关闭通道
 */
public class ImIdleStateHandler extends IdleStateHandler {
    private static final int READER_IDLE_TIME = 20;
    public ImIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接!");
        ctx.channel().close();
    }
}public void channelActive(ChannelHandlerContext ctx) throws Exception {
        scheduleSendHeartBeat(ctx);
        super.channelActive(ctx);
    }
    private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {
        // 此处无需使用scheduleAtFixedRate,因为如果通道失效后,就无需在发起心跳了,按照目前的方式是最好的:成功一次安排一次
        ctx.executor().schedule(() -> {
            if (ctx.channel().isActive()) {
                System.out.println("定时任务发送心跳!");
                ctx.writeAndFlush(new HeartBeatRequestPacket());
                scheduleSendHeartBeat(ctx);
            }
        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }public class ImIdleStateHandler extends IdleStateHandler {
    private static final int READER_IDLE_TIME = 20;
    public ImIdleStateHandler() {
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接!");
        ctx.channel().close();
    }
}本文介绍了Netty的核心概念,以及基本使用方法,希望能够帮到你。本文核心词:
本文完整代码:https://github.com/yclxiao/netty-demo.git
                网站题目:Netty入门实践:模拟IM聊天
                
                URL分享:http://www.csdahua.cn/qtweb/news0/478150.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网