Netty官方示例的实现

最近总是听到“RPC框架强无敌”、“到处都在用RPC”这种言论,打算了解下是怎么回事。所以捡起好久不碰的Netty用一用,根据官方示例做一遍。

参考:http://netty.io/wiki/user-guide-for-5.x.html

 

一、编写丢弃服务器

世界上最简单的协议不是“你好,世界!”,而是DISCARD。这是一种丢弃任何接收到的数据而无任何响应的协议。

(1)代码实现

要实现DISCARD协议,唯一需要做的是忽略所有接收的数据。

DiscardServerHandler.java

在Netty中,所有传递过来的msg对象在使用完毕后都应该主动释放(避免占用内存,在本例子中,所有收到的msg对象都立刻使用release()方法释放了),在一般情况下channelRead()方法中应该使用try finally进行释放。

接下来编写服务器的启动类DiscardServer。启动服务器后,传来的请求才能交给DiscardServerHandler(Handler)进行处理。

DiscardServer.java

至此,丢弃服务器的服务端已经编写完成了。运行DiscardServer之后,可以使用telnet链接8080端口,服务端不会有任何响应(所有请求都被Handler丢弃了)。

(2)简单分析

1.NioEventLoopGroup类

存在两个NioEventLoopGroup类对象:

要搞清楚这两个对象有什么作用,可以先看看group()方法:

group()方法的作用为:

Set the EventLoopGroup for the parent (acceptor) and the child (client). These EventLoopGroup’s are used to handle all the events and IO for ServerChannel and Channel’s.

结合官方文档理解,bossGroup是acceptor,负责接收传入的链接。workerGroup是client,一旦acceptor接受连接并将接受的连接注册到工作人员,client就处理接受的连接的流量。我暂时这样理解:“bossGroup负责接收传入的链接,workerGroup负责处理链接中的数据”。

2.childHandler()方法

childHandler()方法的参数为一个重写了initChannel()方法的匿名类。尝试从

进行理解,ChannelInitializer类的作用是对各种Channel进行初始化(Channel我个人理解为请求的处理类,负责对请求进行处理)。SocketChannel表示当前的Channel只处理socket链接。在initChannel()方法中,注册了我之前自定义的DiscardServerHandler对象,之后所有socket请求都由这个DiscardServerHandler对象进行处理。

二、获取收到的数据

我们已经完成了丢弃服务器,但是所有请求都石沉大海,怎么知道服务器能否正常工作呢?为了进行测试,改写一下DiscardServerHandler类(所有对请求都在自定义的Handler类中进行处理)。

DiscardServerHandler.java

这里的while循环可以简化为:

使用telnet链接8080端口,服务端可以收到输入的字符。

三、对请求做出响应

现在服务端可以接收请求了,但是并没有做出任何响应。改写一下DiscardServerHandler类,让服务器对请求做出响应。

(1)代码实现

DiscardServerHandler.java

每次在telnet中输入一个字符(即传输一个字符),服务端就会收到字符,并且返回。看上去就是telnet瞬间出现了两个相同的字符。

(2)输出自定义字符

如果想输出我们自定义的字符,就要写成这样:

把字符串response转成byte的形式,才能通过write()方法返回(需要注意的是ByteBuf 类,专门用于维护byte数组)。

四、编写时间服务器

本节中实现的TIME协议是协议。与前面的示例不同的是,它发送一个包含32位整数的消息,而不接收任何请求,并在发送消息后关闭连接。在此示例中,我们将学习如何构建和发送消息,并在完成时关闭连接。

(1)代码实现

因为我们将忽略任何收到的数据,一旦连接建立就发送消息,所以不能使用channelRead()方法(此方法有一个msg参数,是链接接收的数据。因为我们不需要接收数据,所以不使用channelRead()方法)。相反,我们应该使用(覆盖)channelActive()方法(建立链接即时响应)。

TimeServerHandler.java

写完Handler之后,别忘记在启动类DiscardServer中进行注册(不然Netty怎么知道你要使用TimeServerHandler处理请求呢?)

至此,时间服务器已经编写完成了。接下来我们可以使用linux上的rdate命令尝试对服务端进行访问。

默认情况下,rdate命令需要安装。如果在执行sudo apt-get install rdate时报出

Could not get lock /var/lib/dpkg/lock – open (11 Resource temporarily unavailable)……

错误,那么就是有别的进程占用了apt-get,导致无法获取锁。只要使用ps -aux查看当前所有用户的进程,找到并且kill掉所有_apt开头的进程,就可以进行安装。成功安装rdate后,可以执行(如果linux是虚拟机,host需要填ipv4地址,不能使127.0.0.1):

rdate -o <port> -p <host>

就可以看到服务端的响应:

11

(2)简单分析

1.ChannelFuture

ChannelFuture有点像java并发中的Future,是异步的传输任务(异步线程)。

那么问题来了,如果写成这样:

在ChannelFuture对象的writeAndFlush()方法完成异步传输之前,close()方法随时可能关闭链接,导致传输无法完成。所以我们需要一种“监听器”机制,监听到writeAndFlush()方法完成传输之后,才调用close()方法。

2.ChannelFutureListener

重点查看这段代码:

f是ChannelFuture对象,使用addListener()方法添加了一个监听器(匿名对象ChannelFutureListener),监听writeAndFlush()方法的执行状况。

一旦writeAndFlush()方法执行完毕,就会回调ChannelFutureListener对象中的operationComplete()方法,从而调用close()方法关闭链接。从而保证了“异步传输完毕后才关闭链接”。

五、编写客户端

Netty中的服务器和客户端存在差异,最大的区别是使用不同的Bootstrap和Channel。

(1)代码实现

我们先写一个客户端的Handler,用于处理从服务器传来的数据。

TimeClientHandler.java(类似之前的DiscardServerHandler.java,都是接收数据)

接下来编写客户端的启动类TimeClient。启动客户端之后,传来的请求才能交给TimeClientHandler(Handler)进行处理。

TimeClient.java

最需要注意的是,客户端使用了connect()方法(只需要请求一次),而不是bind()方法(服务端需要持续提供服务)。

至此,客户端已经编写完成了。我们首先打开时间服务器(启动DiscardServer类,如果不事先启动服务器,直接启动客户端,就会报出无法建立链接的错误),然后打开客户端(启动TimeClient.java)。

客户端会根据地址请求和服务器建立链接,一旦成功建立链接,服务器就会把请求交给服务端Handler处理,发送时间数据,并且尝试断开链接。客户端收到请求后,把请求交给客户端的Handler处理,并且尝试断开链接。链接断开后,客户端就自动关闭。

(2)可能出现的问题

客户端的实现看起来不难。但是在TimeClientHandler中,这段代码可能会报错:

错误为:IndexOutOfBoundsException。为什么会发生这种错误?我们在下一节中讨论。

六、处理基于流的传输

我个人理解为“解码”。因为此节内容有点难度,所以只能尝试去理解。

(1)流传输的问题

在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是分组的队列(不是按包进行储存),而是字节的队列(全部字节存到一起)。这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节。 因此,不能保证读的是您在远程定入的行数据。举个例子,假设操作系统的TCP/IP堆栈已收到三个数据包:

这三个包会被存到缓冲区中,变成一堆字节。应用程序也不知道如何拆分它们,可能会拆分成这样的数据段:

所以,无论是服务器侧还是客户端,都应该对接收到的数据进行碎片整理,将数据整合为“易于程序理解的一个或多个有意义的帧”(即正确拆分成数据段)。在上面的例子中,接收到的数据应该如下所示:

下面将提到两种解决方案:分别为创建内部累积缓冲区和拆分ChannelHandler。

(2)创建内部累积缓冲区

在上面的时间服务器中,我们传输的是一个int,只占用了四个字节(是非常少量的数据),不太可能被拆分到到不同的数据段内。问题是,随着流量加大,缓冲区内的字节将越来越多,拆分错误的几率也会上升。

为了解决此问题,我们可以构造一个内部累积缓冲区,每次读取4个字节(前提是我们已经知道要读取的内容是什么,这里是int,所以就读取4个字节),就能正确地(对缓冲区内的数据)分段。

Handler的功能是处理读取的数据,所以我们改写一下客户端的Handler。

TimeClientHandler.java

所有接收到的数据都会被缓存到缓冲区buf中。然后,处理程序必须检查buf是否有足够的数据(在此示例中为4个字节),当足够时就继续进行实际的业务逻辑。

否则,在有更多数据到达时Netty将再次调用channelRead()方法,直到累积4个字节,再执行下面的代码。

这里最难理解的handlerAdded()方法和handlerRemoved()方法(官方解释为生命周期的侦听器方法?),我暂时没弄清楚这两个方法的作用(只知道删掉会报错)。

至此,“创建内部累积缓冲区”的解决方案可以顺利解决问题。

(3)拆分ChannelHandler

使用“创建内部累积缓冲区”的解决方案,确实可以解决时间客户端的问题。但是我们一定要记得使用前提(协议):已知数据类型、字节长度。

最大的问题是,如果某种协议很复杂(举个例子:传输复数个可变长度的字段,这就无法约定数据类型和字节长度了),“创建内部累积缓冲区”就行不通了。对于这种复杂的情况,我们需要“拆分ChannelHandler”。

因为Netty允许增加多个ChannelHandler到ChannelPipeline,所以我们你可以把一整个ChannelHandler拆分成多个模块,从而减少应用的复杂程度。

举个例子,可以把TimeClientHandler拆分成2个处理器:TimeDecoder类负责处理数据拆分的问题、TimeClientHandler类负责原始版本的实现。

TimeDecoder.java

需要注意以下几点:

1.ByteToMessageDecoder是ChannelInboundHandler的一个实现类,可以让处理数据拆分的问题变得很简单。

2.每次接收新数据,ByteToMessageDecoder都会调用decode()方法处理内部缓冲区。

3.如果decode()方法中增加了一个对象到out列表,这就意味着解码器解码成功。ByteToMessageDecoder将会丢弃缓冲区中已经被读过的数据。

编写完解码器TimeDecoder之后,将其加入到ChannelPipeline中。修改TimeClient类:

(注意:使用“拆分ChannelHandler”的解决方案时,TimeClientHandler不需要做进行任何改动,不需要添加缓冲区)

至此,“拆分ChannelHandler”的解决方案可以顺利解决问题。

七、使用Pojo取代ByteBuf

官网的例子不太好理解,请参考另一篇文章:

使用Netty传输Pojo对象

八、关闭应用

我们可以使用shutdownGracefully()方法关闭EventLoopGroup对象:

shutdownGracefully()方法将返回一个Future对象,告诉我们“EventLoopGroup已经停止,所有属于该组的Channel都已经关闭了”。

九、总结

掌握了Netty的简单使用方法。以后给出更多的使用实例。

发表评论

电子邮件地址不会被公开。 必填项已用*标注