Let's start by locating the entry point of the communication protocol. Searching for callers of the Protocol interface's export method leads us to the entry point: the doExportUrlsFor1Protocol method of ServiceConfig.

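Stepping into protocol.export(invoker), we find that it has many implementation classes. Following the SPI mechanism (if SPI is unfamiliar, see the earlier post on the container), the configuration files list the following: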

    registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
    dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol   // this is the default; see the @SPI annotation on the Protocol interface
    filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
    listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
    mock=com.alibaba.dubbo.rpc.support.MockProtocol
    injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
    rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
    hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
    com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
    com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
    thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
    memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
    redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
    rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
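To make the name-to-class mapping concrete, here is a minimal sketch of how such a file could be parsed. It is not Dubbo's ExtensionLoader; the class SpiConfigSketch and its parse method are hypothetical. The rule used for lines without a key (strip the interface name from the simple class name and lowercase the rest) is an assumption about how entries such as HttpProtocol end up with a name.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class SpiConfigSketch {

    /**
     * Parses "name=class" lines into a name -> class-name map.
     * For a line without a key, the name is derived from the simple class
     * name (assumed rule, e.g. HttpProtocol -> "http").
     */
    static Map<String, String> parse(String text, String interfaceSimpleName) {
        Map<String, String> extensions = new LinkedHashMap<>();
        for (String raw : text.split("\n")) {
            // Drop anything after '#', then surrounding whitespace.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            String name;
            String className;
            int eq = line.indexOf('=');
            if (eq > 0) {
                name = line.substring(0, eq).trim();
                className = line.substring(eq + 1).trim();
            } else {
                className = line;
                String simple = className.substring(className.lastIndexOf('.') + 1);
                if (simple.endsWith(interfaceSimpleName)) {
                    simple = simple.substring(0, simple.length() - interfaceSimpleName.length());
                }
                name = simple.toLowerCase(Locale.ROOT);
            }
            extensions.put(name, className);
        }
        return extensions;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol",
                "injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol",
                "com.alibaba.dubbo.rpc.protocol.http.HttpProtocol");
        Map<String, String> extensions = parse(config, "Protocol");
        System.out.println(extensions.get("dubbo"));
        System.out.println(extensions.get("http"));
    }
}
```

With a map like this, choosing a protocol by name (for example the default "dubbo") is a plain lookup followed by instantiating the class.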

Inside the DubboProtocol.export(Invoker<T> invoker) method there is a call to openServer(url);

Code:

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        // a client can also export a service that only the server can invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url)); // createServer creates the server
            } else {
                // the server supports reset, used together with the override feature
                server.reset(url);
            }
        }
    }
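The point of openServer is that there is one server per address: the first export on an address creates the server, and later exports on the same address only reset it. Below is a minimal sketch of that pattern. ServerCacheSketch and FakeServer are hypothetical stand-ins, not Dubbo classes, and the address string replaces the real URL type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServerCacheSketch {

    /** Hypothetical stand-in for ExchangeServer; it only counts reset calls. */
    static class FakeServer {
        final String address;
        int resetCount = 0;

        FakeServer(String address) {
            this.address = address;
        }

        void reset() {
            resetCount++;
        }
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    int created = 0;

    /** Creates a server for the address on first use, resets it afterwards. */
    void openServer(String address, boolean isServer) {
        if (!isServer) {
            return; // nothing to open when the URL is not a server
        }
        FakeServer server = serverMap.get(address);
        if (server == null) {
            serverMap.put(address, new FakeServer(address));
            created++;
        } else {
            server.reset();
        }
    }

    FakeServer get(String address) {
        return serverMap.get(address);
    }

    public static void main(String[] args) {
        ServerCacheSketch cache = new ServerCacheSketch();
        cache.openServer("0.0.0.0:20880", true); // first export: creates the server
        cache.openServer("0.0.0.0:20880", true); // second export: reuses and resets it
        System.out.println("created=" + cache.created
                + ", resets=" + cache.get("0.0.0.0:20880").resetCount);
    }
}
```

Like the original method, the sketch does a separate get and put, so it keeps the same check-then-act shape and is not meant as a lesson in thread safety.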

Continuing into createServer, here is the source:

    private ExchangeServer createServer(URL url) {
        // by default, send a readonly event when the server shuts down
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // heartbeat is enabled by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
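createServer mixes two ways of setting URL parameters: addParameterIfAbsent supplies a default that the user's configuration can override, while addParameter always overwrites. The sketch below illustrates that difference with a hypothetical immutable parameter holder. UrlParamsSketch is not Dubbo's URL class, and the keys and values are illustrative only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UrlParamsSketch {

    private final Map<String, String> params;

    UrlParamsSketch(Map<String, String> params) {
        // Defensive copy so each instance is immutable.
        this.params = Collections.unmodifiableMap(new HashMap<>(params));
    }

    /** Returns a copy with the key set to the value, overwriting any old value. */
    UrlParamsSketch addParameter(String key, String value) {
        Map<String, String> copy = new HashMap<>(params);
        copy.put(key, value);
        return new UrlParamsSketch(copy);
    }

    /** Returns this instance unchanged when the key is already present. */
    UrlParamsSketch addParameterIfAbsent(String key, String value) {
        return params.containsKey(key) ? this : addParameter(key, value);
    }

    String getParameter(String key) {
        return params.get(key);
    }

    public static void main(String[] args) {
        // The user configured a heartbeat but no codec (illustrative values).
        UrlParamsSketch url = new UrlParamsSketch(Collections.singletonMap("heartbeat", "5000"));
        url = url.addParameterIfAbsent("heartbeat", "60000"); // keeps the user's 5000
        url = url.addParameter("codec", "dubbo");             // always set
        System.out.println(url.getParameter("heartbeat") + " " + url.getParameter("codec"));
    }
}
```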

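Dubbo reads the relevant configuration (host, port, and so on) from the URL of the service being exported and uses it to create the server. The call server = Exchangers.bind(url, requestHandler) shown above is where the server is actually created.

So the basic creation sequence is:

export() --> openServer() --> createServer() --> server = Exchangers.bind(url, requestHandler);

Next, let's look at Exchangers.bind(url, requestHandler).

Source: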

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

The bind call in getExchanger(url).bind(url, handler) then takes us into the HeaderExchanger class:

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
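The nested constructors in HeaderExchanger.bind form a decorator chain: the outermost handler sees an incoming message first and passes it inward until it reaches the handler supplied by the protocol. Here is a minimal sketch of that ordering. The Handler interface and Wrapper class are hypothetical; the labels "decode" and "exchange" only mirror the positions of DecodeHandler and HeaderExchangeHandler and do none of their real work.

```java
import java.util.ArrayList;
import java.util.List;

public class HandlerChainSketch {

    /** Hypothetical, simplified stand-in for a channel handler. */
    interface Handler {
        void received(String message, List<String> trace);
    }

    /** A decorator that records its name, then delegates to the wrapped handler. */
    static class Wrapper implements Handler {
        private final String name;
        private final Handler next;

        Wrapper(String name, Handler next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void received(String message, List<String> trace) {
            trace.add(name);
            next.received(message, trace);
        }
    }

    /** Builds decode(exchange(business)) and returns the order in which layers ran. */
    static List<String> deliver(String message) {
        List<String> trace = new ArrayList<>();
        Handler business = (msg, t) -> t.add("business");
        Handler chain = new Wrapper("decode", new Wrapper("exchange", business));
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(deliver("ping")); // prints [decode, exchange, business]
    }
}
```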

Next we step into the bind method of the Transporters class:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }
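Transporters.bind accepts several handlers but the transporter takes only one, so a single handler is passed through as is and multiple handlers are wrapped in a dispatcher. A minimal sketch of that idea follows. Handler, Dispatcher, and combine are hypothetical names, and the sketch assumes the dispatcher simply forwards each event to every handler in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DispatcherSketch {

    /** Hypothetical, simplified stand-in for ChannelHandler. */
    interface Handler {
        void received(String message);
    }

    /** Forwards each event to every wrapped handler, in order. */
    static class Dispatcher implements Handler {
        private final List<Handler> handlers;

        Dispatcher(Handler... handlers) {
            this.handlers = new ArrayList<>(Arrays.asList(handlers));
        }

        @Override
        public void received(String message) {
            for (Handler handler : handlers) {
                handler.received(message);
            }
        }
    }

    /** Returns the single handler as is, or a dispatcher over all of them. */
    static Handler combine(Handler... handlers) {
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        return handlers.length == 1 ? handlers[0] : new Dispatcher(handlers);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Handler first = message -> log.add("first:" + message);
        Handler second = message -> log.add("second:" + message);
        combine(first, second).received("ping");
        System.out.println(log); // prints [first:ping, second:ping]
    }
}
```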

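From this bind we can see that it may call GrizzlyTransporter, MinaTransporter, or NettyTransporter; through SPI, the default is NettyTransporter.

At this point it is basically clear that Dubbo hands communication to Netty by default.

Now let's look at the doOpen method of NettyServer: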

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder()); // decoding
                pipeline.addLast("encoder", adapter.getEncoder()); // encoding
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

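Anyone familiar with Netty will recognize this pattern right away: it simply creates a Netty server. At this point the Dubbo server has been created, and the console prints:

[DUBBO] Start NettyServer bind /0.0.0.0:20880, export /192.168.4.241:20880, dubbo version: 2.8.4, current host: 127.0.0.1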