RPC研究: 同步调用和异步调用获取结果技术实现分析

RPC研究: 同步调用和异步调用获取结果技术实现分析

技术博客 admin 230 浏览

一、问题提出

假设 consumer 为服务消费者,provider 为服务提供者。

在 consumer 发送数据请求,直到 provider 返回数据,这一段时间, consumer 该做什么?


  1. :线程同步一直等待,直到数据返回,接着处理数据
  2. :但是线程挂起的等待,等到数据来的时候再唤醒线程,接着处理数据
  3. 不等:线程在成功发送数据请求后就直接结束,不处理后续数据
  4. 不等:线程在成功发送数据请求后就直接结束,最后调用预留的函数
  5. 其他

简言之,在进行 RPC 远程调用时,客户端是如何优雅地度过从发起请求到服务端返回数据这段时间的

带着问题,去研究一下,开源的 RPC 框架。

二、XXL-RPC

xxl-rpc:分布式服务框架XXL-RPC

参考代码:com.xxl.rpc.core.remoting.invoker.reference.XxlRpcReferenceBean。

给出了四种实现模式: 分别是 SYNC、FUTURE、CALLBACK、ONEWAY

相关类位置: com.xxl.rpc.core.remoting.invoker.call.CallType

2.1 SYNC (同步)

wait、notifyAll

使用了 wait、notifyAll 阻塞等待。

主要逻辑:

  1. 发送请求成功后,调用 Object#wait 将线程阻塞
  2. 远程 rpc 成功后,执行 notifyAll 唤醒

详细代码如下:

  1. clientInstance.asyncSend(finalAddress, xxlRpcRequest); 发送 RPC 请求

  2. futureResponse.get(timeout, TimeUnit.MILLISECONDS) 阻塞等待

详细代码如下:

com.xxl.rpc.core.remoting.net.params.XxlRpcFutureResponse#get(long, java.util.concurrent.TimeUnit)

csharp
代码解读
复制代码
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (!done) { synchronized (lock) { try { if (timeout < 0) { // 阻塞等待 lock.wait(); } else { long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit); lock.wait(timeoutMillis); } } catch (InterruptedException e) { throw e; } } } if (!done) { throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString()); } // 返回结果 return response; }

3. 设置 response 并执行唤醒通知

ini
代码解读
复制代码
public void setResponse(XxlRpcResponse response) { this.response = response; synchronized (lock) { done = true; lock.notifyAll(); } }

非常经典的 wait、notifyAll。

wait、notifyAll 只解决了阻塞等待问题,没有解决线程和返回数据绑定的问题。

通过代码阅读,xxl-rpc 使用 requestId,将 requestId 和 response 进行映射,放在 ConcurentHashMap 里面。服务端返回时,通过 requestId 将返回数据填充到 response 里面。

RequestId + Response + ConcurrentHashMap 组合几乎是标配方案。其他 RPC 逻辑相似。


处理逻辑大致如下:

简单小结一下:

  1. wait、notifyAll 阻塞唤醒
  2. requestId 和 response 绑定,借助 ConcurentHashMap 作为临时存储介质

CountDownLatch 方式补充

以前公司内部的一款轻量级 RPC 中,使用了CountDownLatch,使用方式大致如下:

scss
代码解读
复制代码
// 1. 设置 CountDownLatch final CountDownLatch latch = new CountDownLatch(1); final RpcResult result = new RpcResult(); // 2.发起远程 RPC 调用。 RPC 返回结果, 触发 callback 调用,修改 CountDownLatch remoteInvoke(getHost(requestLine), request, new CallBack(timeout) { @Override public void handled(Response response) throws Exception { try { Response.Status status = response.startLine(); byte[] bytes = response.body().bytes(); ...... result.setStateCode(status.code()); } catch (Exception e) { result.setCause(t); } finally { // 必须放在 final里面 latch.countDown(); } } }); ..... // 等待RPC调用完成 latch.await(xxx,xxx); .....

实现异步等待唤醒。

2.2 FUTURE (异步)

使用 ThreadLocal 绑定绑定 Future。在需要结果的时候通过 Future#get 获取结果

绑定结果具体代码如下所示:

注意,这里的 Future 接口是 JDK 并发包中的 Future 接口,那么我们获取结果,可以像下面这样异步获取吗?

实际上 XxlRpcInvokeFuture 实现 Future 接口,并实现了 get 方法()。 没有用 JDK 默认实现类,有自己的实现方法。

XxlRpcInvokeFuture 复用了 XxlRpcResponse 的能力。而 XxlRpcResponse 的 get 刚刚我们已经分析过了,采用的是 wait、notifyll

注意一个细节,就是 done 变量。

为什么会有这个变量值的存在?

试想一下,如果我们的代码先调用了 notifyAll、然后再调用 wait(), 那么这个线程将永远阻塞!


由于是异步,我们不能保证 wait 在前,notifyAll 在后,因此通过 done 变量控制,如果 notify 在前调用了, wait 就不执行调用了。 这一点还是考虑周全!

看一下使用方式:

很显然,上面三行代码相关性比较差劲,如果不清楚底层,上面的代码会让人一头雾水。


RPC 的封装对于使用者尽可能友好,且屏蔽细节。


2.3 CALLBACK (回调)

在方案二的基础上,增加了成功和失败的回调。相关代码如下:

使用如下:

逻辑实现上与方案二 FUTURE 一致。

2.4 ONEWAY (单向调用)

这种方法就是发起一个请求即完成任务,不关注返回结果,用法也十分简单。

2.5 简单总结

  • SYNC: 比较不错,巧妙地使用了 wait、notifyAll、done变量、requestId等, 算得上好的封装!
  • FUTURE:封装稍微粗糙,使用上不友好!
  • CALLBACK:封装回调中规中矩,但和 FUTURE 一样,获取结果的用法,不友好
  • ONEWAY: 仅发送请求,简单。

个人推荐用 SYNC 比较好,FUTURE、CALLBACK 不是很推荐上层封装对于使用者和阅读者不友好!

那么看一看其他的 RPC 框架,会不会好一些

三、guide-rpc-framework

guide-rpc-framework

在 github 上搜索 RPC , guide-rpc-framework 关注度高,阅读也挺容易。 根据作者描述,是一款提升技术能力的学习框架。于是 clone 学习了一下源码。

后来发现它的处理还不错!(参考了 dubbo 实现)

3.1 CompletableFuture 介绍

它使用 CompletableFuture 接口的能力。通过手动调用 CompletableFuture#complete 完成阻塞的唤醒。 非常灵活。

通过案例来理解这个 CompletableFuture 线程工具类。具体代码如下(建议理解清楚):

手动调用 CompletableFuture#complete 之前, future.get() 一直处于线程等待。

arduino
代码解读
复制代码
public class CompletableFutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { // 创建一个CompletableFuture对象 CompletableFuture<String> future = new CompletableFuture<>(); // 异步线程执行任务 new Thread(() -> { try { // 模拟异步计算,睡眠 5 秒 TimeUnit.SECONDS.sleep(5); // 计算完成,调用complete方法 future.complete("异步计算结果"); } catch (InterruptedException e) { future.completeExceptionally(e); } }).start(); // 获取CompletableFuture的结果,这里会阻塞直到异步操作完成 String result = future.get(); // String result = future.get(3,TimeUnit.SECONDS); System.out.println(result); } }

CompletableFuture#complete 既能填充结果,也能唤醒线程,功能强大!

3.2 具体实现

github.javaguide.remoting.transport.netty.client.UnprocessedRequests#complete

在完成数据请求后,回调 complete,并填充结果。

在发送之前创建 CompletableFuture 给到发送者,使之持有 CompletableFuture 的引用

get() 阻塞获取结果

这种实现方式,和 XXL-RPC 的 SYNC 效果是一致的。不过仅仅实现了一种模式!

这个框架感觉还不错,可以研究一下!不管是学习 netty、还是学习 RPC 都是入门不错的选择!

接下来看一下业界比较流行的 RPC,会不会更加优雅一些!

四、dubbo

框架地址:异步调用 | Apache Dubbo 版本3.3

4.1 具体实现

从 dubbo 进行远程调用的入口进入分析:

关键代码: org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke

分析这两段代码:

scss
代码解读
复制代码
// do invoke rpc invocation and return async result AsyncRpcResult asyncResult = doInvokeAndReturn(invocation); // wait rpc result if sync waitForResultIfSync(asyncResult, invocation);

返回结果的核心类:org.apache.dubbo.rpc.AsyncRpcResult

AsyncRpcResult 也使用 CompletableFuture 对结果进行封装。 这个逻辑和 guide-rpc-framework 处理逻辑几乎是相似的

接下来我们看看发送远程请求后的逻辑。

具体代码位置:org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke

注意,这里使用了独立的 ExecutorService,且使用 url 做隔离。这样保证集群各个机器之间数据请求互不影响!

阻塞等待 waitForResultIfSync

4.2 小节

dubbo 考虑周全如下:

  • 数据包装丰富
  • 异常处理
  • 优雅关闭
  • 超时考虑
  • 其他

guide-rpc-framework 的 CompletableFuture 算是 dubbo 的简易版本。 dubbo 被企业广泛使用,可靠性不言而喻。建议有空深入研究一下!

最后再看看 sofa-rpc 的情况怎么样

五、sofa-rpc

框架地址:github.com/sofastack/s…

SOFARPC 框架提供了多种调用方式来满足不同的场景需求,包括同步调用(Sync)、异步调用(Future)、回调调用(Callback)以及单向调用(Oneway)

主要是通过 CountDownLatch 实现

5.1 invokeSync(同步)

核心代码实现在:com.alipay.remoting.BaseRemoting

默认实现:

最终通过 countDownLatch 获取异步结果

非常经典的 countDown() \ await()

5.2 invokeWithFuture(异步)

看一下具体实现:com.alipay.remoting.InvokeFuture

显然这个类也是通过 countDownLatch 实现阻塞等待的。

InvokeFuture 通过 CountDownLatch 实现异步调用。

5.3 invokeWithCallback 和 oneway

invokeWithCallback: 在 InvokeFuture 的基础上增加了回调。

oneway:单项调用,最简单,不介绍。

过分析,sofa-rpc 使用 CountDownLatch 实现了同步、异步结果的获取。

六、其他 rpc

以前公司内部 RPC,则使用 guava#ListenableFuture 来实现同步、异步结果。

com.google.common.util.concurrent.AbstractFuture 查看默认实现类,使用的是 LockSupport 核心接口

当然还有其他很多不错的 RPC,限于精力就不过多介绍。

七、最后总结

通过对几款 RPC 的分析。 远程 RPC 结果的调用方式,大致有四种:

  1. 同步等待获取结果
  2. 异步获取结果
  3. 获取结果时回调(同步、异步都可以)
  4. 单向调用

具体实现技术:

  • notify、wait
  • completableFuture
  • CountDownLatch
  • guava#ListenableFuture (LockSupport)
  • 其他

对于异步调用:主要通过对阻塞工具类的包装,持有其类的引用,在需要返回数据时通过工具类阻塞获取结果。

到此本文结束,感谢阅读!

源文:RPC研究: 同步调用和异步调用获取结果技术实现分析

如有侵权请联系站点删除!

技术合作服务热线,欢迎来电咨询!