使用多线程提高REST服务性能

Java基础

浏览数:157

2019-8-22

AD:资源代下载服务

目录

使用Runnable异步处理rest服务

使用DeferredResult异步处理rest服务

异步处理配置

一、前言

先来说一下为什么需要异步处理rest服务?
传统的同步处理:http请求进来,tomcat或者其他的容器会有一个相应的线程去处理http请求,所有的业务逻辑都会在这个线程中执行,最后会给出一个http响应。但是一般对于tomcat这种容器,它可以管理的线程是有数量的,当数量达到一定程度之后,再有请求进来,Tomcat就已经没办法处理了(因为所有的线程都已经在工作了)。

同步处理http请求

所谓的异步处理是什么?
异步处理指的是,当一个http请求进来之后Tomcat的主线程去调起一个副线程来执行业务逻辑,当副线程处理逻辑完成之后,主线程再将执行结果返回回去,在副线程处理业务逻辑的过程中,主线程是可以空闲出来去处理其他请求的。如果采用这种模式去处理的话,对于我们的服务器的吞吐量会有一个明显的提升

异步处理http请求

二、同步的处理方式

首先,为了效果明显,我先需要一个打印日志的对象logger

private Logger logger  = LoggerFactory.getLogger(getClass());

然后我去定义一个controller,模拟一个下订单的一个请求,其中的sleep就相当于下单的业务逻辑

 @RequestMapping("/order")
    public String order() throws InterruptedException {
        logger.info("主线程开始");
        Thread.sleep(1000);
        logger.info("主线程返回");
        return "success";
    }

最后访问这个接口,可以看到打印的输出内容:

2019-01-02 11:26:07.877  INFO 12364 --- [nio-8060-exec-1] com.tinner.web.async.AsyncController     : 主线程开始
2019-01-02 11:26:08.877  INFO 12364 --- [nio-8060-exec-1] com.tinner.web.async.AsyncController     : 主线程返回

可以看到都是一个线程[nio-8060-exec-1] 打印出来的

三、异步处理—使用Runnable

首先定义一个controller

@RequestMapping("/callable")
    public Callable<String> callable() throws InterruptedException {
        logger.info("主线程开始");
        //单开一个线程
        Callable<String> result = new Callable<String>() {
            @Override
            public String call() throws Exception {
                logger.info("副线程开始");
                Thread.sleep(1000);
                logger.info("副线程返回");
                return "success";
            }
        };
        logger.info("主线程返回");
        return result;
    }

当我们去访问的时候,可以看到打印的日志:

2019-01-02 11:37:21.098  INFO 13908 --- [nio-8060-exec-4] com.tinner.web.async.AsyncController     : 主线程开始
2019-01-02 11:37:21.099  INFO 13908 --- [nio-8060-exec-4] com.tinner.web.async.AsyncController     : 主线程返回
2019-01-02 11:37:21.108  INFO 13908 --- [      MvcAsync1] com.tinner.web.async.AsyncController     : 副线程开始
2019-01-02 11:37:22.108  INFO 13908 --- [      MvcAsync1] com.tinner.web.async.AsyncController     : 副线程返回

可以看到 主线程[nio-8060-exec-4]是在21秒开始的,几乎是在同时就返回了,副线程[MvcAsync1]也是在21秒开始,然后去睡了1秒,在22秒的时候返回了。主线程基本上没有任何的停顿,而是主线程在唤醒了副线程之后立刻就返回了。也就是说,副线程在处理业务的时间里面,主线程可以空闲出来去处理其他的业务请求。以此来提升服务器的吞吐量。

四、异步处理—使用DeferredResult

我已经知道了使用runnable去实现异步处理,为什么还需要使用DeferredResult去处理呢?是因为当我们使用runnable来异步处理的时候,副线程必须是由主线程来调起的,在真正的企业级开发里面有的时候场景是要比这个复杂的,我们还是来用下单这个例子来说明一下:


使用DeferredResult来进行异步处理

在图中可以看到,真正处理业务逻辑应用和接受下单请求的应用并不是一台服务器,是两台服务器,当应用1接受到下单请求之后,它会把这个请求放到一个消息队列mq里面,然后另一个服务器去监听这个消息队列,当它知道消息队列里面有下单的请求之后,应用2便会去处理下单的逻辑,当它将下单的业务处理完成之后,它会把处理结果放到这个消息队列中,同时在应用1里面有另外一个线程2去监听这个消息队列,当它发现这个消息队列中有处理下单的结果的时候,它会根据这个结果去返回一个http响应。
在这个场景里面,线程1和线程2完全是隔离的,它们俩谁也不知道对方的存在http请求是由线程1来处理的,而最终的处理结果是放在消息队列里面由线程2去监听的。
在这个场景下,实现Runnable是满足不了这个需求的,这时就需要用到DeferredResult

代码

我不会去开发应用2,我也不会去搭建这个消息队列,具体的做法:
1.我会用对象来模拟这个消息队列,在接受到下单请求之后会延迟一秒,处理完之后会在对象中放一个“处理完成”这样一个消息

package com.tinner.web.async;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class MockQueue {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 下单的消息
     * 当这个字符串有值的时候就认为接到了一个下单的消息
     */
    private String placeOrder;
    /**
     * 订单完成的消息
     * 当这个字符串有值的时候就认为订单处理完成
     */
    private String completeOrder;

    public String getPlaceOrder() {
        return placeOrder;
    }

    /**
     * 在收到下单请求之后睡一秒,然后相当于处理完成
     * @param placeOrder
     * @throws InterruptedException
     */
    public void setPlaceOrder(String placeOrder) throws InterruptedException {
        new Thread(() -> {
            logger.info("接到下单请求,"+placeOrder);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //订单处理完成
            this.completeOrder = placeOrder;
            logger.info("下单请求处理完成,"+placeOrder);
        }).start();
    }

    public String getCompleteOrder() {
        return completeOrder;
    }

    public void setCompleteOrder(String completeOrder) {
        this.completeOrder = completeOrder;
    }
}

2.开发线程1的处理

@Autowired
private MockQueue mockQueue;

@Autowired
private DeferredResultHolder deferredResultHolder;

@RequestMapping("/deferred")
    public DeferredResult<String> deferred() throws InterruptedException {
        logger.info("主线程开始");
        //生成一个随机的订单号
        String orderNum = RandomStringUtils.randomNumeric(8);
        //放到消息队列里面去
        mockQueue.setPlaceOrder(orderNum);

        DeferredResult<String> result = new DeferredResult();

        deferredResultHolder.getMap().put(orderNum,result);

        logger.info("主线程返回");
        return result;
    }

3.监听器(线程2)的代码,当监听到“处理完成”这个消息的时候它会把结果响应回去

package com.tinner.web.async;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * 队列的监听器
 * ContextRefreshedEvent这个事件就是整个spring初始化完毕的一个事件
 * 监听这个事件就相当于“当系统整个启动起来之后我要做什么事情(监听消息队列里面的completeOrder中的值)”
 */
@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private MockQueue mockQueue;
    @Autowired
    private DeferredResultHolder deferredResultHolder;


    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {

        //因为是一个无限循环,所以需要单开一个线程
        new Thread(() -> {
            while (true){
                //当模拟的这个队列中订单完成的这个字段有值了,不为空
                if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())){
                    String orderNum = mockQueue.getCompleteOrder();
                    logger.info("返回订单处理结果:"+orderNum);
                    //当调用setResult方法的时候就意味着整个订单处理的业务完成了,该去返回结果了
                    deferredResultHolder.getMap().get(orderNum).setResult("订单处理完成");
                    mockQueue.setCompleteOrder(null);
                }else{
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

4.开发DeferredResultHolder,因为我要在线程1、线程2这两个线程之间去传递DeferredResult对象,相当于是让他俩建立一定的联系

@Component
public class DeferredResultHolder {

    /**
     * key代表订单号,DeferredResult放的是处理结果
     */
    private Map<String, DeferredResult<String>> map  = new HashMap<String, DeferredResult<String>>() ;

    public Map<String, DeferredResult<String>> getMap() {
        return map;
    }

    public void setMap(Map<String, DeferredResult<String>> map) {
        this.map = map;
    }
}

运行

可以看到控制台中打印的结果:

2019-01-02 12:25:54.968  INFO 19356 --- [nio-8060-exec-1] com.tinner.web.async.AsyncController     : 主线程开始
2019-01-02 12:25:54.970  INFO 19356 --- [nio-8060-exec-1] com.tinner.web.async.AsyncController     : 主线程返回
2019-01-02 12:25:54.970  INFO 19356 --- [      Thread-37] com.tinner.web.async.MockQueue           : 接到下单请求,42147337
2019-01-02 12:25:55.970  INFO 19356 --- [      Thread-37] com.tinner.web.async.MockQueue           : 下单请求处理完成,42147337
2019-01-02 12:25:55.984  INFO 19356 --- [      Thread-24] com.tinner.web.async.QueueListener       : 返回订单处理结果:42147337

可以看到有三个线程去进行下单的这个业务逻辑:
1、主线程[nio-8060-exec-1]
2、[ Thread-37]为应用2的线程,接到下单请求然后去进行处理,
3、[ Thread-24]是应用1中的线程2监听到消息处理完毕,进行返回
这三个线程是相互隔离的,谁都不知道谁的存在,互相通过消息队列进行通讯。

五、相关异步配置

我们都知道拦截器,在webConfig中继承了WebMvcConfigurerAdapter类,在这个类中重写了addInterceptor方法去自定义拦截器的,但是在异步的情况下跟同步的处理是不一样的,里面有个configureAsyncSupport方法,用来配置异步支持的。其中的configurer有四个方法:


configurer中的方法

其中,registerCallableInterceptors和registerDeferredResultInterceptors可以针对Callable和DeferredResult两种异步方式去注册拦截器,里面有特定的异步拦截方法(比如handleTimeout异步请求如果超时了怎么处理)。
第三种方法setDefaultTimeout用来设置异步请求的超时时间,因为是开了异步线程去处理业务逻辑,那么那些线程有可能阻塞或者死掉没有响应,在多长的时间内,http就响应回去释放掉,需要用这个来设置。
第四种方法SetTaskExecutor,在默认的情况下,比如用runnable去执行的时候,Spring其实是用一个简单的异步线程池去处理的,它不是一个真正的一个线程池,而是每次都会创建一个新的线程,我们可以自定义设置一些可重用的线程池来替代Spring默认的不支持重用的线程池。

作者:Tinner丶