我們先來找到通訊協議的入口點吧。透過Protocol介面查詢通訊協議入口點,我們根據介面的export方法搜尋發現入口了,在ServiceConfig的doExportUrlsFor1Protocol方法,如下圖:
然後我們進入 protocol。export(invoker)方法發現有很多實現類,根據spi(不懂的請看之前寫的容器篇)檢視配置檔案能找到如下
registry=com。alibaba。dubbo。registry。integration。RegistryProtocol
dubbo=com。alibaba。dubbo。rpc。protocol。dubbo。DubboProtocol //這個是預設的,我們在Protocol介面上可以看到spi的註解
filter=com。alibaba。dubbo。rpc。protocol。ProtocolFilterWrapper
listener=com。alibaba。dubbo。rpc。protocol。ProtocolListenerWrapper
mock=com。alibaba。dubbo。rpc。support。MockProtocol
injvm=com。alibaba。dubbo。rpc。protocol。injvm。InjvmProtocol
rmi=com。alibaba。dubbo。rpc。protocol。rmi。RmiProtocol
hessian=com。alibaba。dubbo。rpc。protocol。hessian。HessianProtocol
com。alibaba。dubbo。rpc。protocol。http。HttpProtocol
com。alibaba。dubbo。rpc。protocol。webservice。WebServiceProtocol
thrift=com。alibaba。dubbo。rpc。protocol。thrift。ThriftProtocol
memcached=com。alibaba。dubbo。rpc。protocol。memcached。MemcachedProtocol
redis=com。alibaba。dubbo。rpc。protocol。redis。RedisProtocol
rest=com。alibaba。dubbo。rpc。protocol。rest。RestProtocol
進入DubboProtocol。export(Invoker
程式碼:
private void openServer(URL url) {
// find server。
String key = url。getAddress();
//client 也可以暴露一個只有server可以呼叫的服務。
boolean isServer = url。getParameter(Constants。IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap。get(key);
if (server == null) {
serverMap。put(key, createServer(url)); //createServer是建立服務
} else {
//server支援reset,配合override功能使用
server。reset(url);
}
}
}
繼續進入createServer,上原始碼
private ExchangeServer createServer(URL url) {
//預設開啟server關閉時傳送readonly事件
url = url。addParameterIfAbsent(Constants。CHANNEL_READONLYEVENT_SENT_KEY, Boolean。TRUE。toString());
//預設開啟heartbeat
url = url。addParameterIfAbsent(Constants。HEARTBEAT_KEY, String。valueOf(Constants。DEFAULT_HEARTBEAT));
String str = url。getParameter(Constants。SERVER_KEY, Constants。DEFAULT_REMOTING_SERVER);
if (str != null && str。length() > 0 && ! ExtensionLoader。getExtensionLoader(Transporter。class)。hasExtension(str))
throw new RpcException(“Unsupported server type: ” + str + “, url: ” + url);
url = url。addParameter(Constants。CODEC_KEY, Version。isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec。NAME);
ExchangeServer server;
try {
server = Exchangers。bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException(“Fail to start server(url: ” + url + “) ” + e。getMessage(), e);
}
str = url。getParameter(Constants。CLIENT_KEY);
if (str != null && str。length() > 0) {
Set
if (!supportedTypes。contains(str)) {
throw new RpcException(“Unsupported client type: ” + str);
}
}
return server;
}
dubbo從要暴漏的服務的URL中取得相關的配置(host,port等)進行服務端server的建立,同上面的server = Exchangers。bind(url, requestHandler) 正式建立服務。
所以基本的建立步驟是
export() ——> openServer() ——> createServer() ——> server = Exchangers。bind(url, requestHandler);
我們進行來看 Exchangers。bind(url, requestHandler)
原始碼:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException(“url == null”);
}
if (handler == null) {
throw new IllegalArgumentException(“handler == null”);
}
url = url。addParameterIfAbsent(Constants。CODEC_KEY, “exchange”);
return getExchanger(url)。bind(url, handler);
}
然後透過getExchanger(url)。bind(url, handler)的bing進入 HeaderExchanger類
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters。bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
在進入Transporters類的bing的
public static Server bind(URL url, ChannelHandler。。。 handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException(“url == null”);
}
if (handlers == null || handlers。length == 0) {
throw new IllegalArgumentException(“handlers == null”);
}
ChannelHandler handler;
if (handlers。length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter()。bind(url, handler);
}
透過bing可以知道他講呼叫:GrizzlyTransporter,MinaTransporter,NettyTransporter 透過spi預設是呼叫NettyTransporter
到這裡我們基本明白dubbo的通訊預設是交給了netty來處理,
我們在看下doOPen方法
@Override
protected void doOpen() throws Throwable {
NettyHelper。setNettyLoggerFactory();
ExecutorService boss = Executors。newCachedThreadPool(new NamedThreadFactory(“NettyServerBoss”, true));
ExecutorService worker = Executors。newCachedThreadPool(new NamedThreadFactory(“NettyServerWorker”, true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl()。getPositiveParameter(Constants。IO_THREADS_KEY, Constants。DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler。getChannels();
// https://issues。jboss。org/browse/NETTY-365
// https://issues。jboss。org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory(“NettyIdleTimer”, true));
bootstrap。setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer。this);
ChannelPipeline pipeline = Channels。pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline。addLast(“timer”, new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline。addLast(“decoder”, adapter。getDecoder());//解碼
pipeline。addLast(“encoder”, adapter。getEncoder());//編碼
pipeline。addLast(“handler”, nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap。bind(getBindAddress());
}
瞭解netty的同學,肯定早已習慣這個方法的寫法,就是建立了netty的server嘛,到這裡dubbo的服務建立完畢了,這個時候控制檯見列印:
[DUBBO] Start NettyServer bind /0。0。0。0:20880, export /192。168。4。241:20880, dubbo version: 2。8。4, current host: 127。0。0。1