百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

异步化,高并发大杀器 异步操作解决方案

yuyutoo 2024-10-14 16:20 6 浏览 0 评论

聊聊如何让项目异步化的一些事。

1.同步和异步,阻塞和非阻塞

同步和异步,阻塞和非阻塞, 这个几个词已经是老生常谈,当时常常还是有很多同学分不清楚,以为同步肯定就是阻塞,异步肯定就是非阻塞,其他他们不是一回事。

同步和异步关注的是结果消息的通信机制

  • 同步:同步的意思就是调用方需要主动等待结果的返回
  • 异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等。

阻塞和非阻塞主要关注的是等待结果返回调用方的状态

  • 阻塞:是指结果返回之前,当前线程被挂起,不做任何事
  • 非阻塞:是指结果在返回之前,线程可以做一些其他事,不会被挂起。

可以看见同步和异步,阻塞和非阻塞主要关注的点不同,有人会问同步还能非阻塞,异步还能阻塞?当然是可以的,下面为了更好的说明他们的组合之间的意思,用几个简单的例子说明: 1.同步阻塞:同步阻塞基本也是编程中最常见的模型,打个比方你去商店买衣服,你去了之后发现衣服卖完了,那你就在店里面一直等,期间不做任何事(包括看手机),等着商家进货,直到有货为止,这个效率很低。

2.同步非阻塞:同步非阻塞在编程中可以抽象为一个轮询模式,你去了商店之后,发现衣服卖完了,这个时候不需要傻傻的等着,你可以去其他地方比如奶茶店,买杯水,但是你还是需要时不时的去商店问老板新衣服到了吗。

3.异步阻塞:异步阻塞这个编程里面用的较少,有点类似你写了个线程池,submit然后马上future.get(),这样线程其实还是挂起的。有点像你去商店买衣服,这个时候发现衣服没有了,这个时候你就给老板留给电话,说衣服到了就给我打电话,然后你就守着这个电话,一直等着他响什么事也不做。这样感觉的确有点傻,所以这个模式用得比较少。

4.异步非阻塞:异步非阻塞这也是现在高并发编程的一个核心,也是今天主要讲的一个核心。好比你去商店买衣服,衣服没了,你只需要给老板说这是我的电话,衣服到了就打。然后你就随心所欲的去玩,也不用操心衣服什么时候到,衣服一到,电话一响就可以去买衣服了。

2.同步阻塞 PK 异步非阻塞

上面已经看到了同步阻塞的效率是多么的低,如果使用同步阻塞的方式去买衣服,你有可能一天只能买一件衣服,其他什么事都不能干,如果用异步非阻塞的方式去买,买衣服只是你一天中进行的一个小事。

我们把这个映射到我们代码中,当我们的线程发生一次rpc调用或者http调用,又或者其他的一些耗时的IO调用,发起之后,如果是同步阻塞,我们的这个线程就会被阻塞挂起,直到结果返回,试想一下如果IO调用很频繁那我们的CPU使用率其实是很低很低。正所谓是物尽其用,既然CPU的使用率被IO调用搞得很低,那我们就可以使用异步非阻塞,当发生IO调用时我并不马上关心结果,我只需要把回调函数写入这次IO调用,我这个时候线程可以继续处理新的请求,当IO调用结束结束时,会调用回调函数。而我们的线程始终处于忙碌之中,这样就能做更多的有意义的事了。

这里首先要说明的是,异步化不是万能,异步化并不能缩短你整个链路调用时间长的问题,但是他能极大的提升你的最大qps。一般我们的业务中有两处比较耗时:

  • cpu: cpu耗时指的是我们的一般的业务处理逻辑,比如一些数据的运算,对象的序列化。这些异步化是不能解决的,得需要靠一些算法的优化,或者一些高性能框架。
  • iowait: io耗时就像我们上面说的,一般发生在网络调用,文件传输中等等,这个时候线程一般会挂起阻塞。而我们的异步化通常用于解决这部分的问题。

3.哪些可以异步化?

上面说了异步化是用于解决IO阻塞的问题,而我们一般项目中可以使用异步化如下:

  • servlet异步化,springmvc异步化
  • rpc调用如(dubbo,thrift),http调用异步化
  • 数据库调用,缓存调用异步化

下面我会从上面几个方面进行异步化的介绍.

4.servlet异步化

对于Java开发程序员来说servlet并不陌生吧,在项目中不论你使用struts2,还是使用的springmvc,本质上都是封装的servlet。但是我们的一般的开发,其实都是使用的同步阻塞模式如下:

上面的模式优点在于编码简单,适合在项目启动初期,访问量较少,或者是CPU运算较多的项目

缺点在于,业务逻辑线程和servlet容器线程是同一个,一般的业务逻辑总得发生点IO,比如查询数据库,比如产生RPC调用,这个时候就会发生阻塞,而我们的servlet容器线程肯定是有限的,当servlet容器线程都被阻塞的时候我们的服务这个时候就会发生拒绝访问,线程不然我当然们可以通过增加机器的一系列手段来解决这个问题,但是俗话说得好靠人不如靠自己,靠别人替我分担请求,还不如我自己搞定。所以在servlet3.0之后支持了异步化,我们采用异步化之后就会变成如下:

在这里我们采用新的线程处理业务逻辑,IO调用的阻塞就不会影响我们的serlvet了,实现异步serlvet的代码也比较简单,如下:

 
  1. @WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true)
  2. public class WorkServlet extends HttpServlet{
  3. private static final long serialVersionUID = 1L;
  4. @Override
  5. protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
  6. this.doPost(req, resp);
  7. }
  8. @Override
  9. protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
  10. //设置ContentType,关闭缓存
  11. resp.setContentType("text/plain;charset=UTF-8");
  12. resp.setHeader("Cache-Control","private");
  13. resp.setHeader("Pragma","no-cache");
  14. final PrintWriter writer= resp.getWriter();
  15. writer.println("老师检查作业了");
  16. writer.flush();
  17. List<String> zuoyes=new ArrayList<String>();
  18. for (int i = 0; i < 10; i++) {
  19. zuoyes.add("zuoye"+i);;
  20. }
  21. //开启异步请求
  22. final AsyncContext ac=req.startAsync();
  23. doZuoye(ac, zuoyes);
  24. writer.println("老师布置作业");
  25. writer.flush();
  26. }
  27. private void doZuoye(final AsyncContext ac, final List<String> zuoyes) {
  28. ac.setTimeout(1*60*60*1000L);
  29. ac.start(new Runnable() {
  30. @Override
  31. public void run() {
  32. //通过response获得字符输出流
  33. try {
  34. PrintWriter writer=ac.getResponse().getWriter();
  35. for (String zuoye:zuoyes) {
  36. writer.println("\""+zuoye+"\"请求处理中");
  37. Thread.sleep(1*1000L);
  38. writer.flush();
  39. }
  40. ac.complete();
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. });
  46. }
  47. }

实现serlvet的关键在于http采取了长连接,也就是当请求打过来的时候就算有返回也不会关闭,因为可能还会有数据,直到返回关闭指令。 AsyncContext ac=req.startAsync(); 用于获取异步上下文,后续我们通过这个异步上下文进行回调返回数据,有点像我们买衣服的时候,给老板一个电话,而这个上下文也是一个电话,当有衣服到的时候,也就是当有数据准备好的时候就可以打电话发送数据了。 ac.complete(); 用来进行长链接的关闭。

## 4.1springmvc异步化 现在其实很少人来进行serlvet编程,都是直接采用现成的一些框架,比如struts2,springmvc。下面介绍下使用springmvc如何进行异步化:

  • 首先确认你的项目中的Servlet是3.0以上的!!,其次springMVC4.0+
 
  1. <dependency>
  2. <groupId>javax.servlet</groupId>
  3. <artifactId>javax.servlet-api</artifactId>
  4. <version>3.1.0</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework</groupId>
  9. <artifactId>spring-webmvc</artifactId>
  10. <version>4.2.3.RELEASE</version>
  11. </dependency>
  • web.xml头部声明,必须要3.0,filter和serverlet设置为异步
 
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
  5. http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
  6. <filter>
  7. <filter-name>testFilter</filter-name>
  8. <filter-class>com.TestFilter</filter-class>
  9. <async-supported>true</async-supported>
  10. </filter>
  11. <servlet>
  12. <servlet-name>mvc-dispatcher</servlet-name>
  13. <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
  14. .........
  15. <async-supported>true</async-supported>
  16. </servlet>
  • 使用springmvc封装了servlet的AsyncContext,使用起来比较简单。以前我们同步的模式的Controller是返回额ModelAndView,而异步模式直接生成一个defrredResult(支持我们超时扩展)即可保存上下文,下面给出如何和我们HttpClient搭配的简单demo
 
  1. @RequestMapping(value="/asynctask", method = RequestMethod.GET)
  2. public DeferredResult<String> asyncTask() throws IOReactorException {
  3. IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build();
  4. ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
  5. PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);
  6. conManager.setMaxTotal(100);
  7. conManager.setDefaultMaxPerRoute(100);
  8. CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();
  9. // Start the client
  10. httpclient.start();
  11. //设置超时时间200ms
  12. final DeferredResult<String> deferredResult = new DeferredResult<String>(200L);
  13. deferredResult.onTimeout(new Runnable() {
  14. @Override
  15. public void run() {
  16. System.out.println("异步调用执行超时!thread id is : " + Thread.currentThread().getId());
  17. deferredResult.setResult("超时了");
  18. }
  19. });
  20. System.out.println("/asynctask 调用!thread id is : " + Thread.currentThread().getId());
  21. final HttpGet request2 = new HttpGet("http://www.apache.org/");
  22. httpclient.execute(request2, new FutureCallback<HttpResponse>() {
  23. public void completed(final HttpResponse response2) {
  24. System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
  25. deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine());
  26. }
  27. public void failed(final Exception ex) {
  28. System.out.println(request2.getRequestLine() + "->" + ex);
  29. }
  30. public void cancelled() {
  31. System.out.println(request2.getRequestLine() + " cancelled");
  32. }
  33. });
  34. return deferredResult;
  35. }

注意: 在serlvet异步化中有个问题是filter的后置结果处理,没法使用,对于我们一些打点,结果统计直接使用serlvet异步是没法用的。在springmvc中就很好的解决了这个问题,springmvc采用了一个比较取巧的方式通过请求转发,能让请求再次过滤器。但是又引入了新的一个问题那就是过滤器会处理两次,这里可以通过SpringMVC源码中自身判断的方法,我们可以在filter中使用下面这句话来进行判断是不是属于springmvc转发过来的请求,从而不处理filter的前置事件,只处理后置事件:

 
  1. Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
  2. return asyncManagerAttr instanceof WebAsyncManager ;

5.全链路异步化

上面我们介绍了serlvet的异步化,相信细心的同学都看出来似乎并没有解决根本的问题,我的IO阻塞依然存在,只是换了个位置而已,当IO调用频繁同样会让业务线程池快速变满,虽然serlvet容器线程不被阻塞,但是这个业务依然会变得不可用。

那么怎么才能解决上面的问题呢?答案就是全链路异步化,全链路异步追求的是没有阻塞,打满你的CPU,把机器的性能压榨到极致模型图如下:

具体的NIO client到底做了什么事呢,具体如下面模型:

上面就是我们全链路异步的图了(部分线程池可以优化)。全链路的核心在于只要我们遇到IO调用的时候,我们就可以使用NIO,从而避免阻塞,也就解决了之前说的业务线程池被打满得到尴尬场景。

5.1远程调用异步化

我们一般远程调用使用rpc或者http。对于rpc来说一般thrift,http,motan等支持都异步调用,其内部原理也都是采用事件驱动的NIO模型,对于http来说一般的apachehttpclient和okhttp也都提供了异步调用。 下面简单介绍下Http异步化调用是怎么做的: 首先来看一个例子:

 
  1. public class HTTPAsyncClientDemo {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException {
  3. //具体参数含义下文会讲
  4. //apache提供了ioReactor的参数配置,这里我们配置IO 线程为1
  5. IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build();
  6. //根据这个配置创建一个ioReactor
  7. ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
  8. //asyncHttpClient使用PoolingNHttpClientConnectionManager管理我们客户端连接
  9. PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);
  10. //设置总共的连接的最大数量
  11. conManager.setMaxTotal(100);
  12. //设置每个路由的连接的最大数量
  13. conManager.setDefaultMaxPerRoute(100);
  14. //创建一个Client
  15. CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();
  16. // Start the client
  17. httpclient.start();
  18. // Execute request
  19. final HttpGet request1 = new HttpGet("http://www.apache.org/");
  20. Future<HttpResponse> future = httpclient.execute(request1, null);
  21. // and wait until a response is received
  22. HttpResponse response1 = future.get();
  23. System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine());
  24. // One most likely would want to use a callback for operation result
  25. final HttpGet request2 = new HttpGet("http://www.apache.org/");
  26. httpclient.execute(request2, new FutureCallback<HttpResponse>() {
  27. //Complete成功后会回调这个方法
  28. public void completed(final HttpResponse response2) {
  29. System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
  30. }
  31. public void failed(final Exception ex) {
  32. System.out.println(request2.getRequestLine() + "->" + ex);
  33. }
  34. public void cancelled() {
  35. System.out.println(request2.getRequestLine() + " cancelled");
  36. }
  37. });
  38. }
  39. }

下面给出httpAsync的整个类图:

对于我们的HTTPAysncClient 其实最后使用的是InternalHttpAsyncClient,在InternalHttpAsyncClient中有个ConnectionManager,这个就是我们管理连接的管理器,而在httpAsync中只有一个实现那就是PoolingNHttpClientConnectionManager,这个连接管理器中有两个我们比较关心的一个是Reactor,一个是Cpool。

  • Reactor :所有的Reactor这里都是实现了IOReactor接口。在PoolingNHttpClientConnectionManager中会有拥有一个Reactor,那就是DefaultConnectingIOReactor,这个DefaultConnectingIOReactor,负责处理Acceptor。在DefaultConnectingIOReactor有个excutor方法,生成IOReactor也就是我们图中的BaseIOReactor,进行IO的操作。这个模型就是我们上面的1.2.2的模型
  • CPool :在PoolingNHttpClientConnectionManager中有个CPool,主要是负责控制我们连接,我们上面所说的maxTotal和defaultMaxPerRoute,都是由其进行控制,如果每个路由的满了他会断开最老的一个链接,如果总共的total满了他会放入leased队列,释放空间的时候就会将其重新连接。

5.2数据库调用异步化

对于数据库调用一般的框架并没有提供异步化的方法,这里推荐自己封装或者使用网上开源的,这里我们公司有个开源的 https://github.com/ainilife/zebra-dao/blob/master/README_ZH.md 能很好的支持异步化

6.最后

异步化并不是高并发的银弹,但是有了异步化的确能提高你机器的qps,吞吐量等等。上述讲的一些模型如果能合理的做一些优化,然后进行应用,相信能对你的服务有很大的帮助的。

相关推荐

当 Linux 根分区 (/) 已满时如何释放空间?

根分区(/)是Linux文件系统的核心,包含操作系统核心文件、配置文件、日志文件、缓存和用户数据等。当根分区满载时,系统可能出现无法写入新文件、应用程序崩溃甚至无法启动的情况。常见原因包括:...

玩转 Linux 之:磁盘分区、挂载知多少?

今天来聊聊linux下磁盘分区、挂载的问题,篇幅所限,不会聊的太底层,纯当科普!!1、Linux分区简介1.1主分区vs扩展分区硬盘分区表中最多能存储四个分区,但我们实际使用时一般只分为两...

Linux 文件搜索神器 find 实战详解,建议收藏

在Linux系统使用中,作为一个管理员,我希望能查找系统中所有的大小超过200M文件,查看近7天系统中哪些文件被修改过,找出所有子目录中的可执行文件,这些任务需求...

Linux 操作系统磁盘操作(linux 磁盘命令)

一、文档介绍本文档描述Linux操作系统下多种场景下的磁盘操作情况。二、名词解释...

Win10新版19603推送:一键清理磁盘空间、首次集成Linux文件管理器

继上周四的Build19592后,微软今晨面向快速通道的Insider会员推送Windows10新预览版,操作系统版本号Build19603。除了一些常规修复,本次更新还带了不少新功能,一起来了...

Android 16允许Linux终端使用手机全部存储空间

IT之家4月20日消息,谷歌Pixel手机正朝着成为强大便携式计算设备的目标迈进。2025年3月的更新中,Linux终端应用的推出为这一转变奠定了重要基础。该应用允许兼容的安卓设备...

Linux 系统管理大容量磁盘(2TB+)操作指南

对于容量超过2TB的磁盘,传统MBR分区表的32位寻址机制存在限制(最大支持2.2TB)。需采用GPT(GUIDPartitionTable)分区方案,其支持64位寻址,理论上限为9.4ZB(9....

Linux 服务器上查看磁盘类型的方法

方法1:使用lsblk命令lsblk输出说明:TYPE列显示设备类型,如disk(物理磁盘)、part(分区)、rom(只读存储)等。...

ESXI7虚机上的Ubuntu Linux 22.04 LVM空间扩容操作记录

本人在实际的使用中经常遇到Vmware上安装的Linux虚机的LVM扩容情况,最终实现lv的扩容,大多数情况因为虚机都是有备用或者可停机的情况,一般情况下通过添加一块物理盘再加入vg,然后扩容lv来实...

5.4K Star很容易!Windows读取Linux磁盘格式工具

[开源日记],分享10k+Star的优质开源项目...

Linux 文件系统监控:用脚本自动化磁盘空间管理

在Linux系统中,文件系统监控是一项非常重要的任务,它可以帮助我们及时发现磁盘空间不足的问题,避免因磁盘满而导致的系统服务不可用。通过编写脚本自动化磁盘空间管理,我们可以更加高效地处理这一问题。下面...

Linux磁盘管理LVM实战(linux实验磁盘管理)

LVM(逻辑卷管理器,LogicalVolumeManager)是一种在Linux系统中用于灵活管理磁盘空间的技术,通过将物理磁盘抽象为逻辑卷,实现动态调整存储容量、跨磁盘扩展等功能。本章节...

Linux查看文件大小:`ls`和`du`为何结果不同?一文讲透原理!

Linux查看文件大小:ls和du为何结果不同?一文讲透原理!在Linux运维中,查看文件大小是日常高频操作。但你是否遇到过以下困惑?...

使用 df 命令检查服务器磁盘满了,但用 du 命令发现实际小于磁盘容量

在Linux系统中,管理员或开发者经常会遇到一个令人困惑的问题:使用...

Linux磁盘爆满紧急救援指南:5步清理释放50GB+小白也能轻松搞定

“服务器卡死?网站崩溃?当Linux系统弹出‘Nospaceleft’的红色警报,别慌!本文手把手教你从‘删库到跑路’进阶为‘磁盘清理大师’,5个关键步骤+30条救命命令,快速释放磁盘空间,拯救你...

取消回复欢迎 发表评论: