前言

在dubbo接口方法重载且入参未显式指定序列化id导致ClassCastException分析时候用到了dubbo的通信层和编解码,dubbo有个transport层,默认使用netty4进行网络通信,写的非常好,dubbo的netty4可以直接作为基础模块作为我们项目的通信框架。但是由于dubbo要兼容mina、graazz、netty5等网络通信,因此自定定义了一套Channel、ChannelHandler来适配不同的通信框架,我们看到时候容易搞混乱,实际我们只需要netty4而已,因此记录下dubbo provider和consumer的tcp建立监听和连接,方便排除问题,也用于以后使用到netty4的时候,直接可以迁移过去。

dubbo服务端和客户端建立连接记录

此xmind是针对dubbo系列一、dubbo启动流程的补充,增加了dubbo tcp的监听和连接,如下图 dubbo netty客户端&服务端启动流程,有了此图,以后遗忘了,也很容易回顾起来。

xmind下载地址

dubbo服务端

功能:监听tcp端口,创建NettyServer对象,此对象表示tcp 服务端,持有dubbo channelhandler chain、客户端连接的dubbo channel、监听端口绑定的netty channel。NettyServer也是个ChannelHandler。

服务端netty pipeline 【HeadContext InternalDecoder InternalEncoder NettyServerHandler TailContext】

接收客户端请求,触发pipeline的channelRead事件,执行顺序 HeadContext->InternalDecoder->InternalEncoder->NettyServerHandler->TailContext,仅执行netty pipeline inboud事件

响应客户端请求,触发pipeline的write事件,执行顺序 TailContext->NettyServerHandler->InternalEncoder->InternalDecoder->HeadContext,仅执行netty pipeline outboud事件

其中InternalDecoder/InternalEncoder分别用作dubbo协议解码/编码

重要的是NettyServerHandler,是个inboud&outbound,持有dubbo channelhandler chain(即NettyServer,而NettyServer又持有dubbo channelhandler chain),维护的客户端连接channel集合,而dubbo channel(即NettyChannel)又持有netty channel,在读取请求时候,经过解码后,把netty channel封装为dubbo channel(即NettyChannel),然后由dubbo channelhandler chain链式处理dubbo channel,这样针对不同的通信进行了统一封装,最后由dubbo channel chain调用最终的目标对象。那么我们扩展的话,也只是扩展dubbo channel chain而已。

这里的dubbo channelhandler chain是[NettyServer->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1],由NettyServerHandler触发,每个channehandler职责不同,最终由 channelhandler DubboProtocol$1进行调用目标方法,执行业务逻辑。

dubbo客户端

即netty client,连接服务端,和服务端保持长连接,对象是NettyClient,同时也是个ChannelHandler,持有netty channel,持有dubbo channelhandler chain【MultiMessageHandler->HeartbeatHandler->AllDispatcher->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1】。

设计方式和nett server基本相同,也是通过netty pipeline触发dubbo channel chain。

client 端netty pipeline 【HeadContext->InternalDecoder->InternalEncoder->NettyClientHandler->TailContext】。

请求服务端,触发pipeline的write事件,执行顺序 TailContext->NettyClientHandler->InternalEncoder->InternalDecoder->HeadContext,仅执行netty pipeline outboud事件。

接收服务端响应,触发pipeline的channelRead事件,执行顺序 HeadContext->InternalDecoder->InternalEncoder->NettyClientHandler->TailContext,仅执行netty pipeline inboud事件。

重要的是NettyClientHandler,是个inboud&outbound,持有dubbo channelhandler,即NettyClient(持有dubbo channelhandler chain,netty channel),那么也就持有dubbo channelhandler chain和netty channel,NettyClientHandler.handler即NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1

维护的客户端连接channel集合,而dubbo channel(即NettyChannel)又持有netty channel,在读取请求时候,经过解码后,把netty channel封装为dubbo channel(即NettyChannel),然后由dubbo channelhandler chain链式处理dubbo channel,这样针对不同的通信进行了统一封装,最后由dubbo channel chain调用最终的目标对象。那么我们扩展的话,也只是扩展dubbo channel chain而已。

这里的dubbo channelhandler chain是[NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1],由NettyClientHandler触发,每个channehandler职责不同,最终由 channelhandler DubboProtocol$1进行调用目标方法,执行业务逻辑。

dubbo请求通信层完整流程

总结了下dubbo从客户端到服务端的整个流程,包含IO线程和业务线程的切换,netty的执行,见下图

图中的是IO线程和业务线程切换处,发生在netty pipeline的执行中,下面详细说下

:客户端发送数据,这里触发执行netty pipeline的outbound事件,执行顺序TailContext->NettyClientHandler->InternalEncoder->InternalDecoder->HeadContext,其中在TailContext#writeAndFlush,封装余下pipeline及构造WriteAndFlushTask并添加到IO线程(netty work线程)的队列taskQueue,然后业务线程发送执行完毕。同时由于IO线程对象NioEventLoop自旋,执行runAllTasks操作从队列taskQueue取出WriteAndFlushTask任务执行,先执行write,再执行flush,因此要对应pipeline执行write & flush操作

即NettyClientHandler->InternalEncoder->InternalDecoder->HeadContext 先执行write操作,再执行flush操作

最后在HeadContext的write和flush操作内调用unsafe对象即NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel.AbstractUnsafe).write(Object msg, ChannelPromise promise) & flush(),其中write是把数据写到缓冲区,flush是发送数据到网卡

其中在执行NettyClientHandler的write操作时候,还会执行Dubbo channelHandler chain的sent操作,即NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1的sent操作,钩子扩展,这里并没有实际功能。

NettyClientHandler的个netty channelhandler,持有了dubbo channelhandler chain【NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1】,即钩子作用。

:服务端接收到客户端数据,IO线程对象NioEventLoop自旋,执行processSelectedKeys操作,触发netty pipeline的channelRead事件,执行顺序HeadContext->InternalDecoder->InternalEncoder->NettyServerHandler->TailContext,先解码,再通过NettyServerHandler把解码结果提交给业务线程池处理,提交任务ChannelEventRunnable到业务线程池是由AllChannelHandler实现。

NettyServerHandler是netty channelhandler,持有了dubbo channelhandler chain【NettyServer->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1】,

:服务端响应客户端,和基本相同,也是在TailContext#writeAndFlush进行线程切换,由业务线程切换到IO线程。这里触发的netty pipeline outbound事件,执行顺序【TailContext->NettyServerHandler->InternalEncoder->InternalDecoder->HeadContext】,其中在NettyServerHandler内触发dubbo channelhandler chain的sent操作,实际并无具体功能。

:客户端接收服务端响应,和基本相同。netty pipeline执行inboud事件,执行顺【HeadContext->InternalDecoder->InternalEncoder->NettyClientHandler->TailContext】,先解码,再通过NettyClientHandler把解码结果提交给业务线程池处理,提交任务ChannelEventRunnable到业务线程池是由AllChannelHandler实现。

注意:从上面看出,dubbo channelhandler chain是一部分在IO线程执行,一部分是被封装到ChannelEventRunnable在业务线程执行。dubbo channelhandler是职责链,不同handler功能不同。

总结这个流程图是为了更好的体会dubbo的整个请求流程,遇到问题参考此图就很容易找到问题,还有就是理解dubbo对于netty的使用,从这个总结中也看出了netty通信开发的固定的套路,1.增加netty channelhandler进行编解码,2.增加自定义netty channelhandler进行IO线程和业务线程的channelhandler就可以了。通常编解码在IO线程执行,业务在业务线程池执行,通信上需要注意到粘包和拆包的处理。

dubbo动态代理类实际例子,方便遇到问题有实例参考分析

//com.alibaba.dubbo.common.bytecode.Wrapper2
package com.alibaba.dubbo.common.bytecode; import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.NoSuchMethodException;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.pangu.api.ProductService;
import org.pangu.dto.ProductDTO; public class Wrapper2 extends Wrapper implements ClassGenerator.DC {
public static String[] pns;//PropertyNames
public static Map pts;//Propertys
public static String[] mns;//MethodNames
public static String[] dmns;//DeclaredMethodNames
public static Class[] mts0;
public static Class[] mts1;
public static Class[] mts2; @Override
public String[] getPropertyNames() {
return pns;
} @Override
public boolean hasProperty(String name) {
return pts.containsKey(name);
} public Class getPropertyType(String name) {
return (Class) pts.get(name);
} @Override
public String[] getMethodNames() {
return mns;
} @Override
public String[] getDeclaredMethodNames() {
return dmns;
} @Override
public void setPropertyValue(Object object, String name, Object object2) {
try {
ProductService productService = (ProductService) object;
} catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(name)
.append("\" filed or setter method in class org.pangu.api.ProductService.").toString());
} @Override
public Object getPropertyValue(Object object, String name) {
try {
ProductService productService = (ProductService) object;
} catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(name)
.append("\" filed or setter method in class org.pangu.api.ProductService.").toString());
} public Object invokeMethod(Object object, String name, Class[] classArray, Object[] objectArray)
throws InvocationTargetException {
ProductService productService;
try {
productService = (ProductService) object;
} catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("findProduct".equals(name) && classArray.length == 1
&& classArray[0].getName().equals("java.lang.String")) {
return productService.findProduct((String) objectArray[0]);
}
if ("findProduct".equals(name) && classArray.length == 1
&& classArray[0].getName().equals("org.pangu.dto.ProductDTO")) {
return productService.findProduct((ProductDTO) objectArray[0]);
}
if ("selectProduct".equals(name) && classArray.length == 1) {
return productService.selectProduct((ProductDTO) objectArray[0]);
}
} catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(name)
.append("\" in class org.pangu.api.ProductService.").toString());
}
}

dubbo通信重要接口ChannelHandler设计说明

从前面分析可以看出,dubbo也有一套Channel、ChannelHandler,netty也有Channel、ChannelHandler,而且和netty定义的同名。两者区别是什么呢?

netty Channel是对网络通道的抽象,netty ChannelHandler是对Channel的处理器。

我的理解dubbo Channel也是对网络通道的的抽象,持有netty Channel,两者功能在抽象上是一样的,但是dubbo为了兼容mina、Grizzly,因此dubbo抽象的Channel是针对这几种通信框架的通道的抽象,做成一个统一模式,这些具体框架的通信变化并不会影响dubbo通信设计。

dubbo ChannelHandler是针对dubbo Channel的处理器,功能等同netty ChannelHandler。在dubbo中NettyClientHandler、NettyServerHandler分别是客户端和服务端连接netty pipeline和dubbo ChannelHandler chain的桥梁。

dubbo ChannelHandler

这里重要的是InternalEncoder/InternalDecoder、NettyServerHandler/NettyClientHandler

InternalEncoder/InternalDecoder:用于dubbo报文编解码

NettyServerHandler/NettyClientHandler:连接netty和dubbo ChannelHandler的桥梁(分别作用在服务端和客户端),这里dubbo为了灵活扩展(比如支持mina等),增加了和netty同名的 ChannelHandler、Channel,因此看代码的时候显得有些混乱,容易搞蒙圈。

dubbo ChannelHandler接口定义处理5个事件,分别是连接、断开、接收、发送、异常捕捉,是针对Channel的抽象。为什么要这么抽象呢?因为针对dubbo报文解码后有许多灵活变化,比如心跳处理、派发(dispatcher)处理、处理Request/Response、线程池调用业务方法、处理NettyServer/NettyClient事件,这样做是的目标是把netty ChannelHandler解耦,不需要实现大量的netty ChannelHandler来完成工作。只需要一个netty ChannelHandler(比如NettyServerHandler)就可以和dubbo ChannelHandler chain关联起来,方便了dubbo的扩展。从我们上图中分析,涉及到的一些dubbo ChannelHandler 作用如下:

dubbo ChannelHandler 作用
NettyServer 封装netty服务端事件,处理连接、断开、接收、发送、异常等事件
NettyClient 封装netty客户端事件,处理连接、断开、接收、发送、异常等事件
MultiMessageHandler 支持流中多消息报文批处理
HeartbeatHandler 支持心跳处理
AllChannelHandler 支持dubbo线程池调用业务方法
DecodeHandler 支持在dubbo线程池中解码
HeaderExchangeHandler 封装处理Request/Response的调用能力
ExchangeHandlerAdapter 用于查找服务方法并调用

既然有了dubbo ChannelHandler,即Channel处理器,那么自然也要有要处理的对象dubbo Channel

dubbo Channel

在 Dubbo 中会抽象出一个“端点(Endpoint)”的概念,我们可以通过一个 ip 和 port 唯一确定一个端点,两个端点之间会创建 TCP 连接,可以双向传输数据。Dubbo 将 Endpoint 之间的 TCP 连接抽象为通道(Channel),将发起请求的 Endpoint 抽象为客户端(Client),将接收请求的 Endpoint 抽象为服务端(Server)。这些抽象出来的概念,也是整个 dubbo-remoting-api 模块的基础。

Channel 是对两个 Endpoint 连接的抽象,好比连接两个位置的传送带,两个 Endpoint 传输的消息就好比传送带上的货物,消息发送端会往 Channel 写入消息,而接收端会从 Channel 读取消息。

dubbo Channel 我们就可以等同为netty 的Channel ,是个网络通道。那既然有了netty Channel ,为什么还要有个dubbo Channel 呢?由于我们使用dubbo都是使用的netty4,但是dubbo还支持mina、grizzly通信,他们这两种有没有Channel我就不知道了,这样做是为了抽象,做成统一模式,这些通信变化,但是dubbo设计的通信并不会变化。

dubbo Channel 从接口定义来看,具有收发数据能力和附加 KV 属性(即向通道增加一些属性),比如NettyChannel、NettyClient就封装了io.netty.channel.Channel

既然说到这里就得总结dubbo remoting层的重要接口

dubbo remoting层的重要接口

com.alibaba.dubbo.remoting.Endpoint:在 Dubbo 中会抽象出一个“端点(Endpoint)”的概念,我们可以通过一个 ip 和 port 唯一确定一个端点,两个端点之间会创建 TCP 连接,可以双向传输数据。

com.alibaba.dubbo.remoting.Channel:Dubbo 将 Endpoint 之间的 TCP 连接抽象为通道(Channel),Channel有收发消息能力,可以认为等同netty Channel。

com.alibaba.dubbo.remoting.Client:将发起请求的 Endpoint 抽象为客户端(Client)

com.alibaba.dubbo.remoting.Server:将接收请求的 Endpoint 抽象为服务端(Server)

Client、Server分别抽象了客户端和服务端,两者都继承了 Channel、Resetable 等接口,也就是说两者都具备了读写数据能力。Client 和 Server 本身都是 Endpoint,只不过在语义上区分了请求和响应的职责,两者都具备发送的能力,所以都继承了 Endpoint 接口。Client 和 Server 的主要区别是 Client 只能关联一个 Channel,而 Server 可以接收多个 Client 发起的 Channel 连接。所以在 Server 接口中定义了查询 Channel 的相关方法getChannels()/getChannel(InetSocketAddress remoteAddress)

com.alibaba.dubbo.remoting.ChannelHandler:ChannelHandler 是注册在 Channel 上的消息处理器,在 Netty 中也有类似的抽象。有ChannelHandler chain分别承担不同职责,功能灵活强大

com.alibaba.dubbo.remoting.Codec2:编解码的定义,被 @SPI 接口修饰了,表示该接口是一个扩展接口,同时其 encode() 方法和 decode() 方法都被 @Adaptive 注解修饰,也就会生成适配器类,其中会根据 URL 中的 codec 值确定具体的扩展实现类。

com.alibaba.dubbo.remoting.Transporter:Dubbo 在 Client 和 Server 之上又封装了一层Transporter 接口,Transporter 接口上有 @SPI 注解,它是一个扩展接口,默认使用“netty”这个扩展名,@Adaptive 注解的出现表示动态生成适配器类,会先后根据“server”“transporter”的值确定 RemotingServer 的扩展实现类,先后根据“client”“transporter”的值确定 Client 接口的扩展实现。具体有netty、mina、grizzly等不同通信实现。

Transporter 这一层抽象出来的接口,与 Netty 的核心接口是非常相似的。那为什么要单独抽象出 Transporter层,而不是像简易版 RPC 框架那样,直接让上层使用 Netty 呢?

其实这个问题的答案也呼之欲出了,Netty、Mina、Grizzly 这个 NIO 库对外接口和使用方式不一样,如果在上层直接依赖了 Netty 或是 Grizzly,就依赖了具体的 NIO 库实现,而不是依赖一个有传输能力的抽象,后续要切换实现的话,就需要修改依赖和接入的相关代码,非常容易改出 Bug。这也不符合设计模式中的开放-封闭原则。

有了 Transporter 层之后,我们可以通过 Dubbo SPI 修改使用的具体 Transporter 扩展实现,从而切换到不同的 Client 和 RemotingServer 实现,达到底层 NIO 库切换的目的,而且无须修改任何代码。即使有更先进的 NIO 库出现,我们也只需要开发相应的 dubbo-remoting-* 实现模块提供 Transporter、Client、Server 等核心接口的实现,即可接入,完全符合开放-封闭原则。

com.alibaba.dubbo.remoting.Transporters:它不是一个接口,而是门面类,其中封装了 Transporter 对象的创建(通过 Dubbo SPI)以及 ChannelHandler 的处理。在创建 Client 和 Server 的时候,可以指定多个 ChannelHandler 绑定到 Channel 来处理其中传输的数据。Transporters.connect() 方法和 bind() 方法中,会将多个 ChannelHandler 封装成一个 ChannelHandlerDispatcher 对象。

ChannelHandlerDispatcher 也是 ChannelHandler 接口的实现类之一,维护了一个 CopyOnWriteArraySet 集合,它所有的 ChannelHandler 接口实现都会调用其中每个 ChannelHandler 元素的相应方法。另外,ChannelHandlerDispatcher 还提供了增删该 ChannelHandler 集合的相关方法。

到此为止,Dubbo Transport 层的核心接口就介绍完了,这里简单总结一下:

简单总结一下:

  • Endpoint 接口抽象了“端点”的概念,这是所有抽象接口的基础。

  • 上层使用方会通过 Transporters 门面类获取到 Transporter 的具体扩展实现,然后通过 Transporter 拿到相应的 Client 和 Server 实现,就可以建立(或接收)Channel 与远端进行交互了。

  • 无论是 Client 还是 RemotingServer,都会使用 ChannelHandler 处理 Channel 中传输的数据,其中负责编解码的 ChannelHandler 被抽象出为 Codec2 接口。

重要接口的说明参考,写的很棒,直接拿来了

dubbo系列十一、dubbo transport层记录的


扫描二维码,在手机上阅读!