本文共 6785 字,大约阅读时间需要 22 分钟。
一,首先引入依赖
<dependencies>
<!-- Netty依赖包 https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency><dependency>
<groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> </dependencies><build>
<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> <useSystemClassLoader>false</useSystemClassLoader> </configuration> </plugin> </plugins> </build>二,编写服务器端
/**
* 配置服务器的启动代码。最少需要设置服务器绑定的端口,用来监听连接请求。 * */ public class LiuServer {private final int port;
public LiuServer(int port) {
this.port = port; }public static void main(String[] args) throws Exception {
//设置端口值 int port = 8888; //呼叫服务器的 start() 方法 new LiuServer(port).start(); }public void start() throws Exception {
//Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期。 //NioEventLoopGroup是一个处理I/O操作的多线程事件循环 //bossGroup作为boss,接收传入连接 //因为bossGroup仅接收客户端连接,不做复杂的逻辑处理,为了尽可能减少资源的占用,取值越小越好. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); //workerGroup作为worker,处理boss接收的连接的流量和将接收的连接注册进入这个worke NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //ServerBootstrap负责建立服务端 //你可以直接使用Channel去建立服务端,但是大多数情况下你无需做这种乏味的事情 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) //指定使用NioServerSocketChannel产生一个Channel用来接收连接 指定NIO的模式 NioServerSocketChannel对应TCP, NioDatagramChannel对应UDP .channel(NioServerSocketChannel.class) //设置 socket 地址使用所选的端口 .localAddress(new InetSocketAddress(port)) //ChannelInitializer用于配置一个新的Channel //用于向你的Channel当中添加ChannelInboundHandler的实现 //添加 EchoServerHandler 到 Channel 的 ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { //ChannelPipeline用于存放管理ChannelHandel //ChannelHandler用于处理请求响应的业务逻辑相关代码 // //配置通信数据的处理逻辑, 可以addLast多个 ch.pipeline().addLast( new LiuServerHandler()); } })//对Channel进行一些配置 //注意以下是socket的标准参数 //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。 //Option是为了NioServerSocketChannel设置的,用来接收传入连接的 设置TCP缓冲区 .option(ChannelOption.SO_BACKLOG, 128) //保持连接 //是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。 //childOption是用来给父级ServerChannel之下的Channels设置参数的 .childOption(ChannelOption.SO_KEEPALIVE, true); //绑定的服务器;sync 等待服务器关闭 ,也可以在该处再绑定端口。 bind返回future(异步), 加上sync阻塞在获取连接处 ChannelFuture f = b.bind().sync(); System.out.println(LiuServer.class.getName() + " 服务启动开始监听端口:" + f.channel().localAddress()); //sync()会同步等待连接操作结果,用户线程将在此wait(),直到连接操作完成之后,线程被notify(),用户代码继续执行 //closeFuture()当Channel关闭时返回一个ChannelFuture,用于链路检测 f.channel().closeFuture().sync(); } finally { //释放 channel 和 块,直到它被关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
编写服务器端处理类
/**
* 处理服务器端通道 * ChannelInboundHandlerAdapter继承自ChannelHandlerAdapter,实现了ChannelInboundHandler接口 * ChannelInboundHandler接口提供了不同的事件处理方法,可进行重写 * 实现了服务器的业务逻辑,决定了连接创建后和接收到信息后该如何处理 * * @author chenhx * @version DiscardServerHandler.java, v 0.1 2018-07-13 下午 4:05 *///Sharable注解 标识这类的实例之间可以在 channel 里面共享
@ChannelHandler.Sharable public class LiuServerHandler extends ChannelInboundHandlerAdapter { /** * 每个信息入站都会调用 * 接收数据进行处理 * 事件处理程序方法。每当从客户端接收到新数据时,使用该方法来接收客户端的消息。 在此示例中,接收到的消息的类型为ByteBuf。 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("客户端发来消息: " + in.toString(CharsetUtil.UTF_8)); //ChannelHandlerContext提供各种不同的操作用于触发不同的I/O时间和操作 //调用write方法来逐字返回接收到的信息 //这里我们不需要调用释放,因为Netty会在写的时候自动释放 //只调用write是不会释放的,它会缓存,直到调用flush //将所接收的消息返回给发送者。注意,这还没有冲刷数据 ctx.write(in); }/**
* 当前读操作读取的最后一个消息被channelRead()方法消费时调用. 如果ChannelOption.AUTO_READ 属性被设置为off, * 不会再尝试从当前channel中读取inbound数据, 直到ChannelHandlerContext.read()方法被调用. * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { System.out.println("channel 通道读取完成"); //冲刷所有待审消息到远程节点。关闭通道后,操作完成 //第一种写法 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); //ctx.flush(); // 第二种方法:在client端关闭channel连接,这样的话,会触发两次channelReadComplete方法。 //ctx.flush().close().sync(); // 第三种:改成这种写法也可以,但是这中写法,没有第一种方法的好。 }/**
* 读操作时捕获到异常时调用 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印异常堆栈跟踪 cause.printStackTrace(); //关闭通道 ctx.close(); } }三,编写客户端类
/**
* 通过host和port连接服务器。 * */ public class LiuClient {private final String host;
private final int port;public LiuClient(String host, int port) {
this.host = host; this.port = port; }public static void main(String[] args) throws Exception {
final String host = "127.0.0.1"; //端口与服务器端匹配 final int port = 8888;new LiuClient(host, port).start();
}public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); //指定 EventLoopGroup 来处理客户端事件。由于我们使用 NIO 传输,所以用到了 NioEventLoopGroup 的实现 b.group(group) .channel(NioSocketChannel.class) //设置服务器的地址和端口 .remoteAddress(new InetSocketAddress(host, port)) //当建立一个连接和一个新的通道时,创建添加到 EchoClientHandler 实例 到 channel pipeline .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new LiuClientHandler()); } });//连接到远程;等待连接完成 也可以在这里设置服务器地址和端口
ChannelFuture f = b.connect().sync(); //阻塞直到 Channel 关闭 f.channel().closeFuture().sync(); } finally { //调用 shutdownGracefully() 来关闭线程池和释放所有资源 group.shutdownGracefully().sync(); } } }客户端处理类
@ChannelHandler.Sharable
public class LiuClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 服务器的连接被建立后调用 * 建立连接后该 channelActive() 方法被调用一次 * * @param ctx */ @Override public void channelActive(ChannelHandlerContext ctx) { //当被通知该 channel 是活动的时候就发送信息 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty! " + Long.toString(System.currentTimeMillis()), CharsetUtil.UTF_8)); }/**
* 从服务器接收到数据调用 * * @param ctx * @param in */ @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("服务器发来消息: " + in.toString(CharsetUtil.UTF_8)); }/**
* 捕获异常时调用 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //记录错误日志并关闭 channel cause.printStackTrace(); ctx.close(); } }转载地址:http://mton.baihongyu.com/