springboot多线程TaskExecutor的使用,以及使用@Async实现异步调用

springboot多线程TaskExecutor的使用,以及使用@Async实现异步调用

目录@Async实现异步调用pom.xml启动类定义controller定义接口实现类将isDone换程CountDownLatch来判断线程是否执行完实例化CountDownLatch并且制定线程个数,线程个数就是从本地异步调用的方法个输,并且传入线程任务中,每个线程执行完毕就调用countDown()方法。最后在调用await()方法。这样在线程计数为零之前,线程就会一直等待。将CountDownLatch传给方法方法TaskExecutor的使用注册TaskExecutor使用TaskExecutor

@Async实现异步调用

pom.xml

org.springframework.boot

spring-boot-starter-web

启动类

@EnableAsync

@SpringBootApplication

public class LearnutilsApplication {

public static void main(String[] args) {

SpringApplication.run(LearnutilsApplication.class, args);

}

/**

* 核心线程数10:线程池创建时初始化的线程数

* 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程

* 缓冲队列200:用来缓冲执行任务的队列

* 允许线程的空闲时间60秒:超过了核心线程数之外的线程,在空闲时间到达之后会被销毁

* 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池

* 线程池对拒绝任务的处理策略:此处采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务

* 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean

* 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住

*/

@EnableAsync

@Configuration

class TaskPoolConfig{

@Bean("taskExecutor")

public Executor taskExecutor(){

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(10);

executor.setMaxPoolSize(20);

executor.setQueueCapacity(200);

executor.setKeepAliveSeconds(60);

executor.setThreadNamePrefix("TaskExecutor-");

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.setWaitForTasksToCompleteOnShutdown(true);

executor.setAwaitTerminationSeconds(60);

return executor;

}

}

}

定义controller

@RequestMapping(value = "/AsyncController")

@RestController

public class AsyncController {

@Autowired

private AsyncService asyncService;

@Autowired

private AsyncService2 asyncService2;

@Autowired

private AsyncService3 asyncService3;

@GetMapping(value = "/sendSms")

public String sendSms() throws Exception{

Future sms = asyncService.sendSms();

Future sms2 = asyncService2.sendSms();

Future sms3 = asyncService3.sendSms();

int i = 0;

for (;;) {

//如果都执行完就跳出循环,isDone方法,如果此线程执行完,true

if (sms.isDone() && sms2.isDone() && sms3.isDone()) {

break;

}

}

//get是获取结果集

return sms.get()+sms2.get()+sms3.get();

}

}

定义接口

public interface AsyncService {

Future sendSms();

}

实现类

@Service

public class AsyncServiceImpl implements AsyncService {

//Future 返回结果 AsyncResult

@Async("taskExecutor")

@Override

public Future sendSms() {

return new AsyncResult<>("000000");

}

}

将isDone换程CountDownLatch来判断线程是否执行完实例化CountDownLatch并且制定线程个数,线程个数就是从本地异步调用的方法个输,并且传入线程任务中,每个线程执行完毕就调用countDown()方法。最后在调用await()方法。这样在线程计数为零之前,线程就会一直等待。

AsyncResult用来封装结果集,否则结果集无法返回

@GetMapping(value = "/sendSms2")

public String sendSms2() throws Exception{

CountDownLatch downLatch = new CountDownLatch(3);

Future s = asyncService4.sendSms(downLatch);

Future s1 = asyncService5.sendSms(downLatch);

Future s2 = asyncService6.sendSms(downLatch);

downLatch.await();

return s.get()+s1.get()+s2.get();

}

将CountDownLatch传给方法

public interface AsyncService4 {

Future sendSms(CountDownLatch downLatch);

}

方法

@Service

public class AsyncService4Impl implements AsyncService4 {

@Async("taskExecutor")

@Override

public Future sendSms(CountDownLatch downLatch) {

downLatch.countDown();

return new AsyncResult<>("11111");

}

}

TaskExecutor的使用

注册TaskExecutor

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.task.TaskExecutor;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**

* @author yanjun

* @date 2019/8/1 16:04

**/

@Configuration

public class MainConfiguration {

@Bean

public TaskExecutor getTaskExecutor(){

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// 设置核心线程数

executor.setCorePoolSize(5);

// 设置最大线程数

executor.setMaxPoolSize(10);

// 设置队列容量

executor.setQueueCapacity(20);

// 设置线程活跃时间(秒)

executor.setKeepAliveSeconds(60);

// 设置默认线程名称

executor.setThreadNamePrefix("post-lending-");

// 设置拒绝策略

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 等待所有任务结束后再关闭线程池

executor.setWaitForTasksToCompleteOnShutdown(true);

return executor;

}

}

使用TaskExecutor

@Autowired

private TaskExecutor taskExecutor;

public ResultVO findHandlingRecordByAssociationId(Integer associationId) throws InterruptedException{

Map map = new HashMap<>(2);

//线程计数器(等待所有线程执行完统一返回)

CountDownLatch countDownLatch = new CountDownLatch(10);

taskExecutor.execute(() -> {

try {

//service调用

map.put("HandlingRecord", legalLitigationService.findHandlingRecordByAssociationId(associationId));

}finally {

countDownLatch.countDown();

}

});

taskExecutor.execute(() -> {

try {

map.put("CaseBasic", legalLitigationService.findCaseDetailsById(associationId));

}finally {

countDownLatch.countDown();

}

});

countDownLatch.await();

return ResultVO.putSuccess(map);

}

相关推荐

三深化三提升是什么,三深化三提升内容是什么
日博365官网网址多少

三深化三提升是什么,三深化三提升内容是什么

📅 07-03 👁️ 5781
除螨仪滤芯能用多久 除螨仪的过滤网要多久更换?
日博365官网网址多少

除螨仪滤芯能用多久 除螨仪的过滤网要多久更换?

📅 07-22 👁️ 8183
银行卡被锁多久能自动解锁?
日博365官网网址多少

银行卡被锁多久能自动解锁?

📅 07-03 👁️ 7406