一起学RPC(一)

2018-08-08
一起读源码

在上一篇中废话了很多“大概”不相关的东西。而这篇就要认认真真的开始讲干货了(也有可能是水货,谁知道呢)。

上一篇文章主要介绍了与rpc中间件相关的但联系不是很大的spring xml标签的自定义实现。可以说是没有太多核心的东西,全文中的关键字就是“抄”。没错,只要有官方文档,什么都能照着抄。实在不行,对着源码的实现也能抄一把。联系到目前的工作中,也是复制粘贴一把梭。不得不说现在的编码要求是越来越低了。

总所周知,rpc顾名思义是远程过程调用,所谓的远程就是不在一个机器上。因此机器与机器之间的可靠通信可以说是rpc的基础设施了。那么本文的重点就是深入剖析这个基础设施的具体实现(的其中一部分,其余的还没认真看)。

在jupiter中,对这些基础设施的设计可算是下了一番功夫的。至少我看明白花了一点时间的。在jupiter的代码组织中,将网络传输这一块单独整成一个模块。很多开源项目也是这样做的,算是中规中矩了。

image

同时,为了以后的拓展,传输层还定义了一个高层次的抽象模块api。然后根据自己的喜好可以自由去切换传输层实现。这里默认只有基于Netty的实现。如果想添加Mina的实现也很容易,添加Mina依赖然后实现api中的接口就行了。

接下来就仔细探索一下基于Netty的服务端的实现细节。

抽象接口

jupiter的服务端层次结构十分简单。继承关系也很清晰。这张图很清晰的描述了继承关系:
image

通过命名可以体现出来这些抽象类或接口的含义。我想写代码的最高境界就是能做到变量名能恰如其分的表达其功用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Transporter {
/**
* Returns the transport protocol
*/
Protocol protocol();
/**
* 传输层协议.
*/
enum Protocol {
TCP,
DOMAIN // Unix domain socket
}
}

最高层次的接口仅仅只定义了一个方法,返回到底使用的是什么协议。这里可选的只有TCP或者DOMAIN。关于tcp无需多言,但是这个unix domain socket就不是那么常见了。简单来讲就是用于机器内的通信,不是机器间的通信。具体使用场景我问了一下作者feng.jc,他回复了一个词:service mesh.然后就没有然后了。对此咱暂且不管。

接下来就是比较细化的一个接口了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public interface JAcceptor extends Transporter {
/**
* 绑定的地址
*/
SocketAddress localAddress();
/**
* 绑定的端口.
*/
int boundPort();
/**
* Acceptor options [parent, child].
*/
JConfigGroup configGroup();
/**
* 返回rpc处理器
*/
ProviderProcessor processor();
/**
* 设置ProviderProcessor 也就是实际的业务逻辑全部由这个东西处理
*/
void withProcessor(ProviderProcessor processor);
/**
* Start the server and wait until the server socket is closed.
* 默认调用start(true)
*/
void start() throws InterruptedException;
/**
* Start the server.
*/
void start(boolean sync) throws InterruptedException;
/**
* Shutdown the server gracefully.
*/
void shutdownGracefully();
}

这个接口也很清晰简单。符合一般的思路。接下来就是这些接口的抽象实现。

抽象实现

在走读抽象实现逻辑之前,有必要看看如果要直接启动这个transporter该怎么做。

1
2
3
4
public static void main(String[] args) throws InterruptedException {
JAcceptor acceptor = new JNettyTcpAcceptor(9999);
acceptor.start();
}

不得不说是非常简单。但是背后的工作可谓是非常多。

JNettyTcpAcceptor是最底层的实现类。在实例化的时候会传入参数端口号,这点无可厚非毫无争议。不传也是可以的,因为构造器有重载,会传入默认端口号18090。而实际上是去调用的父类的构造器。父类构造器的重载方法很多,就贴出一个全参数的重载实现,其余的请自行脑补。

1
2
3
4
5
public NettyTcpAcceptor(SocketAddress localAddress, int nBosses, int nWorkers, boolean isNative) {
super(Protocol.TCP, localAddress, nBosses, nWorkers);
this.isNative = isNative;
init();
}

然而恶心心的是这个构造器也去调用父类的构造函数。对于聪明的人来说这都不是事儿。

1
2
3
4
5
6
public NettyAcceptor(Protocol protocol, SocketAddress localAddress, int nBosses, int nWorkers) {
this.protocol = protocol;
this.localAddress = localAddress;
this.nBosses = nBosses;
this.nWorkers = nWorkers;
}

值得一提的仅仅只有后面两个参数。顾名思义代表的是boss的线程数和worker的线程数。如果对netty很熟悉这点就不需要解释太多。然后就是init()方法了。这个init方法的核心实现实际上是在顶层父类中完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void init() {
ThreadFactory bossFactory = bossThreadFactory("jupiter.acceptor.boss");
ThreadFactory workerFactory = workerThreadFactory("jupiter.acceptor.worker");
boss = initEventLoopGroup(nBosses, bossFactory);
worker = initEventLoopGroup(nWorkers, workerFactory);
bootstrap = new ServerBootstrap().group(boss, worker);
// parent options
JConfig parent = configGroup().parent();
parent.setOption(JOption.IO_RATIO, 100);
// child options
JConfig child = configGroup().child();
child.setOption(JOption.IO_RATIO, 100);
}

这段代码做了3件事。创建了boss和worker;实例化了ServerBootstrap;把参数配置起来了。仅仅只做了这些事情,很符合抽象类的风格。需要细化的操作请继承,然后自定义实现,爱咋咋地。反正最后肯定会去调用的子类实现,前提是别把我全部覆盖掉,增量去拓展就行。

说了这么多,实际上抽象实现就是对server的“大致”实现。具体的定制得交给子类完成。

具体实现

在上面的demo中实例化的一定是一个具体子类。子类通过一系列父类中的初始化方法完成了前期的准备工作:tcp参数设置、boss和worker的设置等。而正真开启一个server的方法是start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void start() throws InterruptedException {
start(true);
}
@Override
public void start(boolean sync) throws InterruptedException {
// wait until the server socket is bind succeed.
ChannelFuture future = bind(localAddress).sync();
if (logger.isInfoEnabled()) {
logger.info("Jupiter TCP server start" + (sync ? ", and waits until the server socket closed." : ".")
+ JConstants.NEWLINE + " {}.", toString());
}
if (sync) {
// wait until the server socket is closed.
future.channel().closeFuture().sync();
}
}

start()方法只是入口,核心是bind().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public ChannelFuture bind(SocketAddress localAddress) {
ServerBootstrap boot = bootstrap();
initChannelFactory();
boot.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new IdleStateChecker(timer, JConstants.READER_IDLE_TIME_SECONDS, 0, 0),
idleStateTrigger,
CodecConfig.isCodecLowCopy() ? new LowCopyProtocolDecoder() : new ProtocolDecoder(),
encoder,
handler);
}
});
setOptions();
return boot.bind(localAddress);
}
protected void initChannelFactory() {
SocketChannelProvider.SocketType socketType = socketType();
switch (socketType) {
case NATIVE_EPOLL:
bootstrap().channelFactory(SocketChannelProvider.NATIVE_EPOLL_ACCEPTOR);
break;
case NATIVE_KQUEUE:
bootstrap().channelFactory(SocketChannelProvider.NATIVE_KQUEUE_ACCEPTOR);
break;
case JAVA_NIO:
bootstrap().channelFactory(SocketChannelProvider.JAVA_NIO_ACCEPTOR);
break;
default:
throw new IllegalStateException("Invalid socket type: " + socketType);
}
}

不得不说,bind方法层次也很清晰。其中调用了一个initChannelFactory()方法,其实没有什么高深莫测的地方。简单理解为和下面的代码类似:

1
2
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

到此为止,整个server的启动流程就结束了。整个流程十分干净,没有任何涉及到业务的地方。可能稍微有一点和业务沾边的地方就是编解码器。这个的确是完全耦合到这个acceptor中去的,也就是说如果你想单纯的去用这个acceptor是不行的。因为只能针对特定的网络数据格式进行处理。但是针对这个项目而言是没有任何问题的,我想也没有人会仅仅去用其中的acceptor,再说也不是提供给开发者用的,这是给自己用的。

当然,其中的比较核心的东西没有去分析。因为实在是很复杂。我打算采用抽丝剥茧的方式将其逐步细化,毕竟害怕贪多嚼不烂。接下来要讨论的是jupiter的业务编解码器的实现。


留言: