SpringBoot线程池和Java线程池怎么使用

免费教程   2024年02月11日 5:46  

这篇文章主要介绍“SpringBoot线程池和Java线程池怎么使用”,在日常操作中,相信很多人在SpringBoot线程池和Java线程池怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”SpringBoot线程池和Java线程池怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

SpringBoot线程池和Java线程池的用法和实现原理使用默认的线程池方式一:通过@Async注解调用publicclassAsyncTest{@Asyncpublicvoidasync(Stringname)throwsInterruptedException{System.out.println("async"+name+""+Thread.currentThread().getName());Thread.sleep(1000);}}

启动类上需要添加@EnableAsync注解,否则不会生效。

@SpringBootApplication//@EnableAsyncpublicclassTest1Application{publicstaticvoidmain(String[]args)throwsInterruptedException{ConfigurableApplicationContextrun=SpringApplication.run(Test1Application.class,args);AsyncTestbean=run.getBean(AsyncTest.class);for(intindex=0;index<=10;++index){bean.async(String.valueOf(index));}}}方式二:直接注入ThreadPoolTaskExecutor

此时可不加@EnableAsync注解

@SpringBootTestclassTest1ApplicationTests{@ResourceThreadPoolTaskExecutorthreadPoolTaskExecutor;@TestvoidcontextLoads(){Runnablerunnable=()->{System.out.println(Thread.currentThread().getName());};for(intindex=0;index<=10;++index){threadPoolTaskExecutor.submit(runnable);}}}线程池默认配置信息

SpringBoot线程池的常见配置:

spring:task:execution:pool:core-size:8max-size:16#默认是Integer.MAX_VALUEkeep-alive:60s#当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止allow-core-thread-timeout:true#是否允许核心线程超时,默认truequeue-capacity:100#线程队列的大小,默认Integer.MAX_VALUEshutdown:await-termination:false#线程关闭等待thread-name-prefix:task-#线程名称的前缀SpringBoot 线程池的实现原理

TaskExecutionAutoConfiguration类中定义了ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中RejectedExecutionHandler被定义为了private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

注意在TaskExecutionAutoConfiguration类中,ThreadPoolTaskExecutor类的bean的名称为:applicationTaskExecutor和taskExecutor。

//TaskExecutionAutoConfiguration#applicationTaskExecutor()@Lazy@Bean(name={APPLICATION_TASK_EXECUTOR_BEAN_NAME,AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME})@ConditionalOnMissingBean(Executor.class)publicThreadPoolTaskExecutorapplicationTaskExecutor(TaskExecutorBuilderbuilder){returnbuilder.build();}//ThreadPoolTaskExecutor#initializeExecutor()@OverrideprotectedExecutorServiceinitializeExecutor(ThreadFactorythreadFactory,RejectedExecutionHandlerrejectedExecutionHandler){BlockingQueue<Runnable>queue=createQueue(this.queueCapacity);ThreadPoolExecutorexecutor;if(this.taskDecorator!=null){executor=newThreadPoolExecutor(this.corePoolSize,this.maxPoolSize,this.keepAliveSeconds,TimeUnit.SECONDS,queue,threadFactory,rejectedExecutionHandler){@Overridepublicvoidexecute(Runnablecommand){Runnabledecorated=taskDecorator.decorate(command);if(decorated!=command){decoratedTaskMap.put(decorated,command);}super.execute(decorated);}};}else{executor=newThreadPoolExecutor(this.corePoolSize,this.maxPoolSize,this.keepAliveSeconds,TimeUnit.SECONDS,queue,threadFactory,rejectedExecutionHandler);}if(this.allowCoreThreadTimeOut){executor.allowCoreThreadTimeOut(true);}this.threadPoolExecutor=executor;returnexecutor;}//ExecutorConfigurationSupport#initialize()publicvoidinitialize(){if(logger.isInfoEnabled()){logger.info("InitializingExecutorService"+(this.beanName!=null?"'"+this.beanName+"'":""));}if(!this.threadNamePrefixSet&&this.beanName!=null){setThreadNamePrefix(this.beanName+"-");}this.executor=initializeExecutor(this.threadFactory,this.rejectedExecutionHandler);}覆盖默认的线程池

覆盖默认的taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor。

@ConfigurationpublicclassThreadPoolConfiguration{@Bean("taskExecutor")publicThreadPoolTaskExecutortaskExecutor(){ThreadPoolTaskExecutortaskExecutor=newThreadPoolTaskExecutor();//设置线程池参数信息taskExecutor.setCorePoolSize(10);taskExecutor.setMaxPoolSize(50);taskExecutor.setQueueCapacity(200);taskExecutor.setKeepAliveSeconds(60);taskExecutor.setThreadNamePrefix("myExecutor--");taskExecutor.setWaitForTasksToCompleteOnShutdown(true);taskExecutor.setAwaitTerminationSeconds(60);//修改拒绝策略为使用当前线程执行taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());//初始化线程池taskExecutor.initialize();returntaskExecutor;}}管理多个线程池

如果出现了多个线程池,例如再定义一个线程池taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

@Bean("taskExecutor2")publicThreadPoolTaskExecutortaskExecutor2(){ThreadPoolTaskExecutortaskExecutor=newThreadPoolTaskExecutor();//设置线程池参数信息taskExecutor.setCorePoolSize(10);taskExecutor.setMaxPoolSize(50);taskExecutor.setQueueCapacity(200);taskExecutor.setKeepAliveSeconds(60);taskExecutor.setThreadNamePrefix("myExecutor2--");taskExecutor.setWaitForTasksToCompleteOnShutdown(true);taskExecutor.setAwaitTerminationSeconds(60);//修改拒绝策略为使用当前线程执行taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());//初始化线程池taskExecutor.initialize();returntaskExecutor;}

引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

@ResourceThreadPoolTaskExecutortaskExecutor2;

对于使用@Async注解的多线程则在注解中指定bean的名字即可。

@Async("taskExecutor2")publicvoidasync(Stringname)throwsInterruptedException{System.out.println("async"+name+""+Thread.currentThread().getName());Thread.sleep(1000);}

线程池的四种拒绝策略

JAVA常用的四种线程池

ThreadPoolExecutor类的构造函数如下:

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue){this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,Executors.defaultThreadFactory(),defaultHandler);}newCachedThreadPool

不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

newThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,newSynchronousQueue<Runnable>());newFixedThreadPool

定长线程池,超出线程数的任务会在队列中等待。

returnnewThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>());newScheduledThreadPool

类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

publicScheduledThreadPoolExecutor(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}

周期执行:

ScheduledExecutorServicescheduledThreadPool=Executors.newScheduledThreadPool(5);scheduledThreadPool.scheduleAtFixedRate(()->{System.out.println("rate");},1,1,TimeUnit.SECONDS);

延时执行:

scheduledThreadPool.schedule(()->{System.out.println("delay3seconds");},3,TimeUnit.SECONDS);newSingleThreadExecutor

单线程线程池,可以实现线程的顺序执行。

publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService(newThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>()));} 线程池中的四种拒绝策略

CallerRunsPolicy:线程池让调用者去执行。

AbortPolicy:如果线程池拒绝了任务,直接报错。

DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

CallerRunsPolicy

直接在主线程中执行了run方法。

publicstaticclassCallerRunsPolicyimplementsRejectedExecutionHandler{publicCallerRunsPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){if(!e.isShutdown()){r.run();}}}

效果类似于:

Runnablethread=()->{System.out.println(Thread.currentThread().getName());try{Thread.sleep(0);}catch(InterruptedExceptione){thrownewRuntimeException(e);}};thread.run();AbortPolicy

直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

publicstaticclassAbortPolicyimplementsRejectedExecutionHandler{publicAbortPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){thrownewRejectedExecutionException("Task"+r.toString()+"rejectedfrom"+e.toString());}}

DiscardPolicy

什么也不做。

publicstaticclassDiscardPolicyimplementsRejectedExecutionHandler{publicDiscardPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){}}

DiscardOldestPolicy

e.getQueue().poll(): 取出队列最旧的任务。

e.execute(r): 当前任务入队。

publicstaticclassDiscardOldestPolicyimplementsRejectedExecutionHandler{publicDiscardOldestPolicy(){}publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){if(!e.isShutdown()){e.getQueue().poll();e.execute(r);}}}Java 线程复用的原理

java的线程池中保存的是java.util.concurrent.ThreadPoolExecutor.Worker对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();。workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/***Thisclasswillneverbeserialized,butweprovidea*serialVersionUIDtosuppressajavacwarning.*/privatestaticfinallongserialVersionUID=6138294804551838833L;/**Threadthisworkerisrunningin.Nulliffactoryfails.*/finalThreadthread;/**Initialtasktorun.Possiblynull.*/RunnablefirstTask;/**Per-threadtaskcounter*/volatilelongcompletedTasks;/***CreateswithgivenfirsttaskandthreadfromThreadFactory.*@paramfirstTaskthefirsttask(nullifnone)*/Worker(RunnablefirstTask){setState(-1);//inhibitinterruptsuntilrunWorkerthis.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);}/**DelegatesmainrunlooptoouterrunWorker*/publicvoidrun(){runWorker(this);}//Lockmethods////Thevalue0representstheunlockedstate.//Thevalue1representsthelockedstate.protectedbooleanisHeldExclusively(){returngetState()!=0;}protectedbooleantryAcquire(intunused){if(compareAndSetState(0,1)){setExclusiveOwnerThread(Thread.currentThread());returntrue;}returnfalse;}protectedbooleantryRelease(intunused){setExclusiveOwnerThread(null);setState(0);returntrue;}publicvoidlock(){acquire(1);}publicbooleantryLock(){returntryAcquire(1);}publicvoidunlock(){release(1);}publicbooleanisLocked(){returnisHeldExclusively();}voidinterruptIfStarted(){Threadt;if(getState()>=0&&(t=thread)!=null&&!t.isInterrupted()){try{t.interrupt();}catch(SecurityExceptionignore){}}}}

work对象的执行依赖于runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

finalvoidrunWorker(Workerw){Threadwt=Thread.currentThread();Runnabletask=w.firstTask;w.firstTask=null;w.unlock();//allowinterruptsbooleancompletedAbruptly=true;try{while(task!=null||(task=getTask())!=null){w.lock();//Ifpoolisstopping,ensurethreadisinterrupted;//ifnot,ensurethreadisnotinterrupted.This//requiresarecheckinsecondcasetodealwith//shutdownNowracewhileclearinginterruptif((runStateAtLeast(ctl.get(),STOP)||(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())wt.interrupt();try{beforeExecute(wt,task);Throwablethrown=null;try{task.run();}catch(RuntimeExceptionx){thrown=x;throwx;}catch(Errorx){thrown=x;throwx;}catch(Throwablex){thrown=x;thrownewError(x);}finally{afterExecute(task,thrown);}}finally{task=null;w.completedTasks++;w.unlock();}}completedAbruptly=false;}finally{processWorkerExit(w,completedAbruptly);}}

到此,关于“SpringBoot线程池和Java线程池怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

域名注册
购买VPS主机

您或许对下面这些文章有兴趣:                    本月吐槽辛苦排行榜

看贴要回贴有N种理由!看帖不回贴的后果你懂得的!


评论内容 (*必填):
(Ctrl + Enter提交)   

部落快速搜索栏

各类专题梳理

网站导航栏

X
返回顶部