1. 为什么要写这篇文章

创新互联是一家专注于成都网站设计、成都网站建设与策划设计,青原网站建设哪家好?创新互联做网站,专注于网站建设10年,网设计领域的专业建站公司;建站业务涵盖:青原等地区。青原做网站价格咨询:18982081108
几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库。但是,当深入实现细节时,我们想起了一位智者曾经说过的话:“细节决定成败”。最终我们意识到 NoSQL 不是解决所有问题的银弹,而 NoSQL vs RDMS 的答案是:“视情况而定”。类似地,去年RxJava 和 Spring Reactor 这样的并发库加入了让人充满激情的语句,如异步非阻塞方法等。为了避免再犯同样的错误,我们尝试评估诸如 ExecutorService、 RxJava、Disruptor 和 Akka 这些并发框架彼此之间的差异,以及如何确定各自框架的正确用法。
本文中用到的术语在这里有更详细的描述。
2. 分析并发框架的示例用例
3. 快速更新线程配置
在开始比较并发框架的之前,让我们快速复习一下如何配置最优线程数以提高并行任务的性能。这个理论适用于所有框架,并且在所有框架中使用相同的线程配置来度量性能。
参考: http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/
4. 性能测试结果
性能测试配置 GCP -> 处理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架构:x86_64;CPU 内核:8个(注意:这些结果仅对该配置有意义,并不表示一个框架比另一个框架更好)。
5. 使用执行器服务并行化 IO 任务
5.1 何时使用?
如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 小于可用的核心数量,那么 ExecutorService 可用于并行化任务,更快地执行代码。
5.2 什么时候适用?
如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 远远高于可用的核心数量,那么使用 ExecutorService 进一步并行化只会使情况变得更糟。
当外部服务延迟增加到 400ms 时,性能测试结果如下(请求速率 @50 req/sec,8核)。
5.3 所有任务按顺序执行示例
- // I/O 任务:调用外部服务
 - String posts = JsonService.getPosts();
 - String comments = JsonService.getComments();
 - String albums = JsonService.getAlbums();
 - String photos = JsonService.getPhotos();
 - // 合并来自外部服务的响应
 - // (内存中的任务将作为此操作的一部分执行)
 - int userId = new Random().nextInt(10) + 1;
 - String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
 - String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
 - // 构建最终响应并将其发送回客户端
 - String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
 - return response;
 
5.4 I/O 任务与 ExecutorService 并行执行代码示例
- // 添加 I/O 任务
 - List
 > ioCallableTasks = new ArrayList<>(); - ioCallableTasks.add(JsonService::getPosts);
 - ioCallableTasks.add(JsonService::getComments);
 - ioCallableTasks.add(JsonService::getAlbums);
 - ioCallableTasks.add(JsonService::getPhotos);
 - // 调用所有并行任务
 - ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
 - List
 > futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks); - // 获取 I/O 操作(阻塞调用)结果
 - String posts = futuresOfIOTasks.get(0).get();
 - String comments = futuresOfIOTasks.get(1).get();
 - String albums = futuresOfIOTasks.get(2).get();
 - String photos = futuresOfIOTasks.get(3).get();
 - // 合并响应(内存中的任务是此操作的一部分)
 - String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
 - String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
 - // 构建最终响应并将其发送回客户端
 - return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
 
6. 使用执行器服务并行化 IO 任务(CompletableFuture)
与上述情况类似:处理传入请求的 HTTP 线程被阻塞,而 CompletableFuture 用于处理并行任务。
6.1 何时使用?
如果没有 AsyncResponse,性能与 ExecutorService 相同。如果多个 API 调用必须异步并且链接起来,那么这种方法更好(类似 Node 中的 Promises)。
- ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
 - // I/O 任务
 - CompletableFuture
 postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); - CompletableFuture
 commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, - ioExecutorService);
 - CompletableFuture
 albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, - ioExecutorService);
 - CompletableFuture
 photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, - ioExecutorService);
 - CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();
 - // 从 I/O 任务(阻塞调用)获得响应
 - String posts = postsFuture.get();
 - String comments = commentsFuture.get();
 - String albums = albumsFuture.get();
 - String photos = photosFuture.get();
 - // 合并响应(内存中的任务将是此操作的一部分)
 - String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
 - String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
 - // 构建最终响应并将其发送回客户端
 - return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
 
7. 使用 ExecutorService 并行处理所有任务
使用 ExecutorService 并行处理所有任务,并使用 @suspended AsyncResponse response 以非阻塞方式发送响应。
图片来自 http://tutorials.jenkov.com/java-nio/nio-vs-io.html
7.1 何时使用?
如果用例类似于服务器端聊天应用程序,在客户端响应之前,线程不需要保持连接,那么异步、非阻塞方法比同步通信更受欢迎。在这些用例中,系统资源可以通过异步、非阻塞方法得到更好的利用,而不仅仅是等待。
- // 为异步执行提交并行任务
 - ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
 - CompletableFuture
 postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); - CompletableFuture
 commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, - ioExecutorService);
 - CompletableFuture
 albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, - ioExecutorService);
 - CompletableFuture
 photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, - ioExecutorService);
 - // 当 /posts API 返回响应时,它将与来自 /comments API 的响应结合在一起
 - // 作为这个操作的一部分,将执行内存中的一些任务
 - CompletableFuture
 postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture, - (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
 - ioExecutorService);
 - // 当 /albums API 返回响应时,它将与来自 /photos API 的响应结合在一起
 - // 作为这个操作的一部分,将执行内存中的一些任务
 - CompletableFuture
 albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture, - (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
 - ioExecutorService);
 - // 构建最终响应并恢复 http 连接,把响应发送回客户端
 - postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
 - LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
 - String response = s1 + s2;
 - asyncHttpResponse.resume(response);
 - }, ioExecutorService);
 
8. RxJava
8.1 何时使用?
如果编码的场景适合异步非阻塞方式,那么可以*** RxJava 或任何响应式开发库。还具有诸如 back-pressure 之类的附加功能,可以在生产者和消费者之间平衡负载。
- int userId = new Random().nextInt(10) + 1;
 - ExecutorService executor = CustomThreads.getExecutorService(8);
 - // I/O 任务
 - Observable
 postsObservable = Observable.just(userId).map(o -> JsonService.getPosts()) - .subscribeOn(Schedulers.from(executor));
 - Observable
 commentsObservable = Observable.just(userId).map(o -> JsonService.getComments()) - .subscribeOn(Schedulers.from(executor));
 - Observable
 albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums()) - .subscribeOn(Schedulers.from(executor));
 - Observable
 photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos()) - .subscribeOn(Schedulers.from(executor));
 - // 合并来自 /posts 和 /comments API 的响应
 - // 作为这个操作的一部分,将执行内存中的一些任务
 - Observable
 postsAndCommentsObservable = Observable - .zip(postsObservable, commentsObservable,
 - (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
 - .subscribeOn(Schedulers.from(executor));
 - // 合并来自 /albums 和 /photos API 的响应
 - // 作为这个操作的一部分,将执行内存中的一些任务
 - Observable
 albumsAndPhotosObservable = Observable - .zip(albumsObservable, photosObservable,
 - (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
 - .subscribeOn(Schedulers.from(executor));
 - // 构建最终响应
 - Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
 - .subscribeOn(Schedulers.from(executor))
 - .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));
 
9. Disruptor
[Queue vs RingBuffer]
图片1:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
图片2:https://www.baeldung.com/lmax-disruptor-concurrency
9.1 何时使用?
Disruptor 框架在下列场合性能更好:与事件驱动的体系结构一起使用,或主要关注内存任务的单个生产者和多个消费者。
- static {
 - int userId = new Random().nextInt(10) + 1;
 - // 示例 Event-Handler; count down latch 用于使线程与 http 线程同步
 - EventHandler
 postsApiHandler = (event, sequence, endOfBatch) -> { - event.posts = JsonService.getPosts();
 - event.countDownLatch.countDown();
 - };
 - // 配置 Disputor 用于处理事件
 - DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
 - .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
 - .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
 - .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
 - DISRUPTOR.start();
 - }
 - // 对于每个请求,在 RingBuffer 中发布一个事件:
 - Event event = null;
 - RingBuffer
 ringBuffer = DISRUPTOR.getRingBuffer(); - long sequence = ringBuffer.next();
 - CountDownLatch countDownLatch = new CountDownLatch(6);
 - try {
 - event = ringBuffer.get(sequence);
 - event.countDownLatch = countDownLatch;
 - event.startTime = System.currentTimeMillis();
 - } finally {
 - ringBuffer.publish(sequence);
 - }
 - try {
 - event.countDownLatch.await();
 - } catch (InterruptedException e) {
 - e.printStackTrace();
 - }
 
10. Akka
图片来自:https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/
10.1 示例代码
- // 来自 controller :
 - Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());
 - // handler :
 - public Receive createReceive() {
 - return receiveBuilder().match(Request.class, request -> {
 - Event event = request.event; // Ideally, immutable data structures should be used here.
 - request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
 - request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
 - request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
 - request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
 - }).match(Event.class, e -> {
 - if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {
 - int userId = new Random().nextInt(10) + 1;
 - String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
 - e.comments);
 - String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
 - e.photos);
 - String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
 - e.response = response;
 - e.countDownLatch.countDown();
 - }
 - }).build();
 - }
 
11. 总结
                分享文章:鸟瞰Java并发框架
                
                网页路径:http://www.csdahua.cn/qtweb/news23/35823.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网