知识点
- 创建线程执行器
- 创建固定大小的线程执行器
- 在执行器中执行任务并返回结果
- 运行多个任务并处理第一个结果
- 运行多个任务并处理所有结果
- 在执行器中延时执行任务
- 在执行器中周期性执行任务
- 在执行器中取消任务
- 在执行器中控制任务的完成
- 在执行器中分离任务的启动与结果的处理
- 处理在执行器中被拒绝的任务
通常Java开发一个简单的并发任务,会创建Runnable对象,并创建对应的Thread的对象来执行它们。
但是,如果开发一个运行大量的并发任务,这个方法将突显以下劣势
- 必须实现所有与Thread对象管理相关的代码,如线程的创建、结束以及结果获取。
- 需要为每个任务创建一个Thread对象。如果需要执行大量的任务,这将大大地影响应用程序的处理能力。
- 计算机的资源需要高效地进行控制和管理,创建过多的线程,将会导致系统负荷过重。
自JDK5之后,Java并发API提供了一套执行器框架(Executor Framework),它围绕着Executor接口和它的子接口ExecutorService,以及实现这两个接口的ThreadPoolExecutor类展开。
这套机制分离了任务的创建和执行,通过使用执行器,仅需要实现Runnable接口的对象,然后将这些对象发送给执行器即可。
执行器通过创建所需的线程,来负责这些Runnable对象的创建、实例化以及运行。
执行器使用了线程池来提高应用程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务,避免了不断地创建和销毁线程而导致系统性能下降。
执行器框架另一个重要的优势是Callable接口,它类似于Runnable接口,但提供了两方面的增加:
- 接口主方法名为call(),可以返回结果
- 当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个Future对象控制Callable对象的状态和结果。
创建执行器
执行器框架的第一步是创建ThreadPoolExecutor对象。
可以使用ThreadPoolExecutor类提供的四个构造器来创建ThreadPoolExecutor对象,也可以使用Executors工厂类来创建。
创建Task类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class Task implements Runnable{
//存储任务的创建时间
private Date initDate;
//存储任务的名称
private String name;
public Task(String name){
initDate = new Date();
this.name = name;
}
public void run(){
//创建时间
System.out.printf("%s: Task %s: Created on: %s\n", Thread.currentThread().getName(), name, initDate);
//开始时间
System.out.printf("%s: Task %s: Started on: %s\n",Thread.currentThread().getName(),name,new Date());
try{
Long duration = (long)(Math.random() * 10);
System.out.printf("%s: Task %s: Doing a task during %d seconds\n", Thread.currentThread().getName(), name, duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
//结束时间
System.out.printf("%s: Task %s: Finished on: %s\n", Thread.currentThread().getName(), name, new Date());
}
}
创建Server类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class Server{
private ThreadPoolExecutor executor;
public Server(){
executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
}
public void executeTask(Task task){
System.out.printf("Server: A new task has arrived\n");
executor.execute(task);
System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());
System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
}
public void endServer(){
executor.shutdown();
}
}
主类1
2
3
4
5
6
7
8
9
10public class Main{
public static void main(String[] args){
Server server = new Server();
for(int i = 0; i < 100; i++){
Task task = new Task("Task " + i);
server.executeTask(task);
}
server.endServer();
}
}
Executors工厂类的newCachedThreadPool()方法创建了一个缓存线程池。
这个方法强制转换为ThreadPoolExecutor类型,并拥有所有的方法。
如果需要执行新任务,缓存线程池就会创建新线程;如果线程所允许的任务执行完成后并且这个线程可用,那么缓存线程池将会重用这些线程。线程重用的优点是减少了创建线程所花费的时间。然而,新任务固定会依赖线程来执行,因此晕车线程池也有缺点,如果发送过多的任务给执行器,系统的负荷将会过载。
仅当线程的数量是合理的或者线程只会运行很短的时间时,适合采用Executors工厂类的newCachedThreadPool()方法来创建执行器。
一旦创建了执行器,就可以使用executed()方法来发送Runnable或Callable任务。
ThreadPoolExecutor有以下方法
- getPoolSize() 返回执行器线程池中的实际的线程数
- getActiveCount() 返回执行器中正在执行任务的线程数
- getCompletedTaskCount() 返回执行器已经完成的任务数
- getLargestPoolSize() 返回曾经同时位于线程池中的最大线程数
- shutdownNow() 这个方法立即关闭执行器。执行器将不再执行那些正在等待执行的任务。将返回等待执行任务的列表。正在执行的任务还将继续执行,但不等待这些任务完成。
- isTerminated() 如果调用了shutdown()或shutdownNow()方法,并且执行器完成了关闭的过程,那么返回true。
- isShutdown() 调用了shutdown()返回true
- awaitTermination(long timeout,TimeUnit unit) 这个方法将阻塞所调用的线程,直到执行器完成任务或者达到所指定的timeout值。如果想等待任务的结束,而不管任务的持续时间,可以使用一个大的超时时间,比如DAYS。
ThreadPoolExecutor类还有一个重要特性,必须主动地结束它。
创建固定大小的线程执行器
Executors提供了一个方法来创建一个固定大小的线程执行器。
这个执行器有一个线程数的最大值,如果发送超过这个最大值,执行器将不再创建额外的线程,剩下的任务将被阻塞直到执行器有空闲的线程可用。这可以保证执行器不会给应用程序带来性能不佳的问题。
创建Server类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Server{
private ThreadPoolExecutor executor;
public Server(){
executor = (ThreadPoolEexcutor)Executors.newFixedThreadPool(5);
}
public void executeTask(Task task){
System.out.printf("Server: A new task has arrived\n");
executor.execute(task);
System.out.printf("Server: Pool Size: %d\n", executor.getPoolSize());
System.out.printf("Server: Active Count: %d\n", executor.getActiveCount());
System.out.printf("Server: Task Count: %d\n",executor.getTaskCount());
System.out.printf("Server: Completed Task: %d\n", executor.getCompletedTaskCount());
}
public void endService(){
executor.shutdown();
}
}
创建Task类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class Task implements Runnable{
private Date initDate;
private String name;
public Task(String name){
initDate = new Date();
this.name = name;
}
public void run(){
System.out.printf("%s: Task %s : Created on: %s\n", Thread.currentThread().getName(), name, initDate);
System.out.printf("%s: Task %s: Started on: %s\n", Thread.currentThread().getName(), name, new Date());
try{
Long duration = (long)(Math.random() * 10);
System.out.printf("%s: Task %s: Doing a task during %d seconds\n", Thread.currentThread().getName(), name, duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("%s: Task %s: Finished on: %s\n", Thread.currentThread().getName(), new Date(), name);
}
}
主类1
2
3
4
5
6
7
8
9
10public class Main{
public static void main(String[] args){
Server server = new Server();
for(int i = 0; i < 100; i++){
Task task = new Task("Task " + i);
server.executeTask(task);
}
server.endServer();
}
}
使用Executors工厂类newFiexedThreadPool()创建执行器。
Executors工厂类还提供了newSingleThreadExecutors()方法。这是一个创建固定大小线程执行器的极端情况,它将创建一个只有单个线程的执行器。因此,这个执行器只能在同一时间执行一个任务。
在执行器中执行任务并返回结果
执行器框架的优势之一是,可以在运行并发任务并返回结果。
Callable:这个接口声明了call()方法。可以在这个方法里实现任务的具体逻辑操作。Callable接口时一个泛型接口,这意味着必须声明call()方法返回的数据类型。
Future:这个接口声明了一些方法来获取由Callable对象产生的结果,并管理它们的状态。
创建FactorialCalculator类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class FactorialCalculator implements Callable<Integer>{
//存储任务计算的数字
private Integer number;
public FactorialCalculator(Integer number){
this.number = number;
}
public Integer call() throws Exception{
int num, result;
num = number.intValue();
result = 1;
if((num == 0 || (num == 1)){
result = 1;
}else{
for(int i = 2; i <= number; i++){
result *= i;
Thread.sleep(20);
}
}
System.out.printf("%s: %d\n", Thread.currentThread().getName(), result);
return result;
}
}
主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class Main{
public static void main(String[] args){
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
//创建一个Future<Integer>类型的列表对象resultList
List<Future<Integer>> resultList = new ArrayList<>();
Random random = new Random();
for(int i = 0; i < 10; i++){
Integer number = new Integer(random.nextInt(10));
FactorialCalculator calculator = new FactorialCalculator(number);
Future<Integer> result = executor.submit(calculator);
resultList.add(result);
}
do{
//任务完成的数量
System.out.printf("Main: Number of Completed Tasks: %d\n",executor.getCompletedTaskCount());
//遍历10个Future对象,通过isDone()输出任务是否完成
for(int i = 0;i < resultList.size(); i++){
Future<Integer> result = resultList.get(i);
System.out.printf("Main: Task %d: %s\n", result.isDone());
}
try{
Thread.sleep(50);
}catch(InterruptedException e){
e.printStackTrace();
}
}while(executor.getCompletedTaskCount()<resultList.size());
System.out.printf("Main: Results\n");
for(int i = 0 ; i < resultList.size(); i++){
Future<Integer> result = resultList.get(i);
Integer number = null;
try{
number = result.get();
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}
System.out.printf("Core: Task %d: %d\n",i,number);
}
executor.shutdown();
}
}
实现了带有泛型参数Integer类型的Callable接口,因此这个Integer类型将作为在调用call()方法时的返回的类型。
通过submit()发送一个Callable对象给执行器去执行,这个submit()方法接受Callable对象作为参数,并返回Future对象。
Future对象可以用于以下两个主要目的
- 控制任务的状态:可以取消任务和检查任务是否已完成。为了达到这个目的,可使用isDone()方法来检查任务是否已完成。
- 通过call()方法获取返回的结果。为了达到这个目的,可使用get()方法。这个方法一直等待直到Callable对象的call()方法执行并返回结果。如果get()方法在等待结果时线程中断了,则抛出一个InterruptedException异常。如果call()方法抛出异常那么get()方法将随之抛出ExecutionException异常。
在调用Future对象的get()方法时,如果Future对象所控制的任务并未完成,那么这个方法将一直阻塞到任务完成。还有其他形式的get() - get(long timeout, TimeUnit unit):调用这个方法时,任务的结果并未准备好,则方法等待所指定的timeout时间。如果等待超过了指定的时间而任务的结果还没有准备好,那么这个方法将返回null。
运行多个任务并处理第一个结果
并发编程比较常见的一个问题,当采用多个并发任务解决一个问题时,往往只关心这些任务中的第一个结果。
比如,对一个数组进行排序有很多算法,可以并发启动所有算法,但是对于一个给定的数字,第一个得到排序结果的算法就是最快的排序算法。
创建UserValidator类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class UserValidator{
private String name;
public UserValidator(String name){
this.name = name;
}
public boolean validate(String name, String password){
Random random = new Random();
try{
Long duration = (long)(Math.random()*10);
System.out.printf("Validator %s: Validating a user during %d seconds\n",this.name,duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
return false;
}
return random.nextBoolean();
}
public String getName(){
return name;
}
}
创建TaskValidator类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class TaskValidator implements Callable<String>{
private UserValidator validator;
private String user;
private String password;
public TaskValidator(UserValidator validator,String user, String password){
this.validator = validator;
this.user = user;
this.password = password;
}
public String call() throws Exception{
if(!validator.validate(user, password)){
//如果用户没有通过UserValidator验证,抛出Exception
System.out.printf("%s: The user has not been found\n",validator.getName());
throw new Exception("Error validating user");
}
System.out.printf("%s: The user has been found\n",validator.getName());
return validator.getName();
}
}
主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class Main{
public static void main(String[] args){
String username = "test";
String password = "test";
UserValidator ldapValidator = new UserValidator("LDAP");
UserValidator dbValidator = new UserValidator("DateBase");
TaskValidator ldapTask = new TaskValidator(ldapValidator, username, password);
TaskValidator dbTask = new TaskValidator(dbValidator, username, password);
List<TaskValidator> taskList = new ArrayList<>();
taskList.add(ldapTask);
taskList.add(dbTask);
ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
String result;
try{
result = executor.invokeAny(taskList);
System.out.printf("Main: Result: %s\n",result);
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}
executor.shutdown();
System.out.printf("Main: End of the Execution\n");
}
}
invokeAny()方法接受一个任务类别,然后运行任务,并返回第一个完成任务并且没有抛出异常的任务的执行结果。
UserValidator对象,返回随机的boolean值。
每个UserValidator对象被TaskValidator对象使用,TaskValidator对象实现了Callable接口。
如果UserValidator类的validate()方法返回false,那么TaskValidator类将抛出Exeception异常,否则,返回true。
- 如果两个任务都返回true值,那么invokeAny()方法的结果就是首先完成任务的名称。
- 如果第一个任务返回true值,第二个任务抛出异常,那么invokeAny()方法的结果就是第一个任务的名称。
- 如果第一个任务抛出Exception异常,第二个任务返回true值,那么invokeAny()方法的结果就是第二个任务的名称。
- 如果两个任务都抛出Exception异常,那么invokeAny()方法将抛出ExecutionException异常。
ThreadPoolExecutor类还提供了invokeAny()方法的其他版本,invokeAny(Collection<? extends Callabletasks, long timeout, TimeUnit unit):这个方法执行所有的任务,如果在给定的超时期满之前某个任务已经成功完成(也就是未抛出异常),则返回其结果。
运行多个任务并处理所有结果
执行器框架(Executor Framework)允许执行并发任务不需要考略线程创建和执行。
它还提供了可以用来在先执行器中执行任务的状态和获取任务运行结果的Future类。
如想等待任务结束,可以使用如下两种方法
- 如果任务执行结束,那么Future接口的isDone()方法返回false,那么TaskValidator类将抛出Exeception异常,否则,返回true。
- 在调用shutdown()方法后,ThreadPoolExecutor类的awaitTermination()方法会将线程休眠,直到所有的任务执行结束。
创建Result类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class Result{
private String name;
private int value;
public String getName(){
return name;
}
public void setName(String name){
this.name = name;
}
public int getValue(){
return value;
}
public void setValue(int value){
this.value = value;
}
}
创建Task类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class Task implements Callable<Result>{
private String name;
public Task(String name){
this.name = name;
}
public Result call() throws Exception{
System.out.printf("%s: Staring\n", this.name);
try{
Long duration = (long)(Math.random()*10);
System.out.printf("%s: Waiting %d seconds for results.\n", this.name,duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
int value = 0;
for(int i = 0; i < 5; i++){
value += (int)(Math.random()*100);
}
Result result = new Result();
result.setName(this.name);
result.setValue(value);
System.out.printf("%s: Ends\n", this.name);
return result;
}
}
主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class Main{
public static void main(String[] args){
ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
List<Task> taskList = new ArrayList();
for(int i = 0; i < 3; i++){
Task task = new Task("Task-" + i);
taskList.add(task);
}
List<Future<Result>> resultList = null;
try{
resultList = executor.invokeAll(taskList);
}catch(InterruptedException e){
e.printStackTrace();
}
executor.shutdown();
System.out.printf("Core: Printing the results\n");
for(int i = 0 ; i < resultList.size(); i++){
Future<Result> future = resultList.get(i);
try{
Result result=future.get();
System.out.printf("%s: %s\n",result.getName(),result.getValue());
}catch(InterruptedException | ExecutionException){
e.printStackTrace();
}
}
}
}
通过invokeAll()方法等待所有任务的完成。
这个方法接受一个Callable对象列表,并返回一个Future对象列表。
在这个列表中,每个任务对应一个Future对象。Future对象列表中的第一个对象控制Callable列表中第一个任务,以此类推。
在存储结构的列表声明中,用在Future接口中的泛型参数的数据类型必须与Callable接口的泛型数据类型从相兼容。
使用Future对象仅用来获取任务的结果。当所有的任务执行结束时这个方法也执行结束了,如果在返回的Future对象上调用isDone()方法,那么所有的调用将返回true值。
在执行器中延时执行任务
想让任务在过一段时间后才被执行或者任务能够被周期性执行,就得用上ScheduleThreadPoolExecutor类。
创建Task类1
2
3
4
5
6
7
8
9
10
11
12public class Task implements Callable<String>{
private String name;
public Task(String name){
this.name = name;
}
public String call() throws Exception{
System.out.printf("%s: Starting at : %sn", name, new Date());
return "Hello World";
}
}
主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class Main{
public static void main(String[] args){
ScheduledExecutorService executor = (ScheduledExecutorService)Executors.newScheduledThreadPool(1);
System.out.printf("Main: Starting at: %sn", new Date());
for(int i = 0; i < 5; i++){
Task task = new Task("Task " + i);
executor.schedule(task, i + 1, TimeUnit.SECONDS);
}
executor.shutdown();
try{
executor.awaitTermination(1, TimeUnit.DAYS);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Core: Ends at : %sn", new Date());
}
}
ScheduleThreadPoolExecutor执行器,schedule()方法接收如下参数:
- 即将执行的任务
- 任务执行前所要等待的时间
- 等待时间的单位,由TimeUnit类的一个常量来指定
也可以使用Runnable接口来实现任务,因为ScheduledThreadPoolExecutor类的schedule()方法可以同时接受这两种类型的任务。
Java推荐仅在开发定时任务程序时采用ScheduleThreadPoolExecutor类。
在调用shutdown()时,可以配置ScheduleThreadPoolExecutor的行为,默认是不论执行器是否结束,待处理的任务仍将被执行。
可以童工setExecuteExistingDelayedTasksAfterShutdownPolicy()来改变这个行为,传递false,执行shutdown后,待处理的任务将不会被执行。
在执行器中周期性执行任务
通过ScheduleThreadPoolExecutor类来执行周期性的任务。1
2
3
4
5
6
7
8
9
10public class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
public void run(){
System.out.printf("%s: Executed at: %s\n", new Date());
}
}
主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class Main{
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
System.out.printf("Main: Starting at: %s\n", new Date());
Task task = new Task("Task");
ScheduledFuture<?> result = executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
for(int i = 0; i < 10; i++){
System.out.printf("Main: Delay: %d\n", result.getDelay(TimeUnit.MILLISECONDS));
try{
TimeUnit.MILLISECONDS.sleep(500);
}catch(InterruptedException e){
e.printStackTrace();
}
}
executor.shutdown();
System.out.printf("Main: No More tasks at: %s\n", new Date());
try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Main: Finished at: %s\n", new Date());
}
scheduleAtFixedRate()方法发送任务,这个方法接收4个参数
- 被周期性执行的任务
- 任务第一次执行后的延时时间
- 两次执行的时间周期
- 第2个和第3个参数的时间单位
两次执行之间的周期是指任务在两次执行开始时的时间间隔。
如果有一个周期性的任务需要5秒钟,但是却让它每3秒执行一次,那么,在任务执行的过程中将会有两个任务实例同时存在。
ScheduleThreadPoolExecutor类还提供了其他方法来安排周期性任务的运行。比如,scheduleWithFixedRate()方法。这个方法与scheduledAtFixedRate()方法具有相同的参数,但是略有一些不同需要引起注意。在scheduleAtFixedRate()中,第3个参数表示任务两次执行开始时间的间隔,而在scheduledWithFixedDelay()方法中,第3个参数则是表示任务上一次执行结束的时间与任务下一次开始执行的时间间隔。
也可以配置ScheduleThreadPoolExecutor实心shutdown()方法的行为,默认行为是当调用shutdown()方法后,定时任务就结束了。可以通过ScheduleThreadPoolExecutor类的setContinueExistingPeriodicTasksAfterShutdownPolicy()方法来改变这个行为,传递参数true给这个方法,这样shutdown之后,周期性任务仍将继续执行。
在执行器中取消任务
当线程不再需要时就销毁,Futurn接口的cancel()方法来执行取消操作。
创建Task类1
2
3
4
5
6
7
8
9public class Task implements Callable<String>{
public String call() throws Exception{
while(true){
System.out.printf("Task: Test\n");
Thread.sleep(100);
}
}
}
创建主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class Main{
public static void main(String[] args){
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
Task task = new Task();
System.out.printf("Main: Executing the Task\n");
Future<String> result = executor.submit(task);
try{
TimeUnit.SECONDS.sleep(2);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Main: Cancelling the Task\n");
result.cancel(true);
System.out.printf("Main: Cancelled: %s\n", result.isCancelled());
System.out.printf("Main: Done: %s\n", result.isDone());
executor.shutdown();
System.out.printf("Main: The executor has finished\n");
}
}
如果想取消一个已经发送给执行器的任务,可以使用Future接口的cancel()方法。
根据调用cancel()方法时所传递的参数以及任务的状态,这个方法的行为有些不同。
- 如果任务已经完成,或者之前已经被取消,或者由于某种原因而不能被取消,那么方法将返回false并且任务也不能取消。
- 如果任务在执行器中等待分配Thread对象来执行它,那么任务呗取消,并且不会开始执行。如果任务已经在运行,那么它依赖于调用cancel()方法时所传递的参数。如果传递的参数是true并且任务正在运行,那么任务将被取消。如果传递的参数为false并且任务正在运行,那么任务不会被取消。
在执行器中控制任务的完成
FutureTask类提供了一个名为done()的方法,允许在执行器中的任务执行结束之后,还可以执行一些代码。
这个方法可以被用来执行一些后期处理操作,比如:产生报表,通过邮件发送结果或释放一些系统资源。
当任务执行完成受FutureTask类控制时,这个方法在内部被FutureTask类调用。在任务结果设置好后以及任务的状态已经变为isDone之后,无论任务是否被取消或者正常结束,done()方法才被调用。
默认情况下,done()方法的实现为空,即没有任何具体的代码实现。我们可以覆盖FutureTask类并实现done()方法来改变这种行为。
创建ExecutableTask类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class ExecutableTask implements Callable<String>{
private String name;
public ExecutableTask(String name){
this.name = name;
}
public String call() throws Exception{
try{
Long duration = (long)(Math.random()*10);
System.out.printf("%s: Waiting %d seconds for results.n",this.name, duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
}
return "Hello world. I'm " + name;
}
public String getName(){
return name;
}
}
创建ResultTask类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class ResultTask extends FutureTask<String>{
private String name;
public ResultTask(Callable<String> callable){
super(callable);
this.name = ((ExecutableTask) callable).getName();
}
protected void done(){
if(isCancelled()){
System.out.printf("%s: Has been cancelled\n",name);
}else{
System.out.printf("%s: Has finished\n",name);
}
}
}
主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class Main{
public static void main(String[] args){
ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
ResultTask resultTasks[] = new ResultTask[5];
for(int i =0 ;i < 5; i++){
ExecutableTask executableTask = new ExecutableTask("Task " + i);
resultTasks[i] = new ResultTask(executableTask);
executor.submit(resultTasks[i]);
}
try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException e){
e.printStackTrace();
}
for(int i = 0; i < resultTasks.length; i++){
resultTasks[i].cancel(true);
}
for(int i = 0; i < resultTasks.length; i++){
try{
if(!resultTasks[i].isCancelled()){
System.out.printf("%s\n",resultTasks[i].get());
}
}catch(InterruptedException | ExecutionException e){
e.printStackTrace();
}
}
executor.shutdown();
}
}
当任务执行结束时,FutureTask类就会调用done()方法。
在创建好返回值以及改变任务 状态为isDone之后,FutureTask类就会在内部调用done()方法。
虽然我们无法改变任务的结果值,也无法改变任务的状态,但是可以通过任务来关闭系统资源、输出日志信息、发送通知等。
在执行器中分离任务的启动与结果的处理
如果需要在一个对象里发送任务给执行器,然后在另一个对象里处理结果。对于这种情况,Java并发APi提供了CompletionService类。
CompletionService类有一个方法用来发送任务给执行器,还有一个方法为下一个已经执行结束的任务获取Future对象。
从内部实现的机制来看,CompletionService类使用Executor对象来执行任务。这个行为的优势是可以共享CompletionService对象,并发送任务到执行器,然后其他的对象可以处理任务的结果。
第二个方法有一个不足之处,它只能为已经执行结束的任务获取Future对象,因此,这些Future对象只能被用来获取任务的结果。
创建ReportGenerator类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class ReportGenerator implements Callable<String>{
private String sender;
private String title;
public ReportGenerator(String sender, String title){
this.sender = sender;
this.title = title;
}
public String call() throws Exception{
try{
Long duration = (long) (Math.random() * 10);
System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender, this.title, duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
String ret = sender + ": " + title;
return ret;
}
}
创建ReportProcessor类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class ReportProcessor implements Runnable{
private CompletionService<String> service;
private boolean end;
public ReportProcessor(CompletionService<String> service){
this.service = service;
end = false;
}
public void run(){
while(!end){
try{
Future<String> result = service.poll(20, TimeUnit.SECONDS);
if(result){
String report = result.get();
System.out.printf("ReportReceiver: Report Recived: %s\n", report);
}
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}
}
}
public void setEnd(boolean end){
this.end = end;
}
}
创建ReportRequest类,模拟请求获取报告1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class ReportRequest implements Runnable{
private String name;
private CompletionService<String> service;
public ReportRequest(String name, CompletionService<String> service){
this.name = name;
this.service = service;
}
public void run(){
ReportGenerator reportGenerator = new ReportGenerator(name, "Report");
service.submit(reportGenerator);
}
}
创建主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public static void main(String[] args){
ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
CompletionService<String> service = new ExecutorCompletionService<>(executor);
ReportRequest faceRequest = new ReportRequest("Face", service);
ReportRequest onlineRequest = new ReportRequest("Online", service);
Thread faceThread = new Thread(faceRequest);
Thread onlineThread = new Thread(onlineRequest);
ReportProcessor processor = new ReportProcessor(service);
Thread senderThread = new Thread(processor);
System.out.printf("Main: Starting the Threads\n");
faceThread.start();
onlineThread.start();
senderThread.start();
try{
System.out.printf("Main: Waiting for the report generators.\n");
faceThread.join();
onlineThread.join();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Main: Shuting down the executor.\n");
executor.shutdown();
try{
executor.awaitTermination(1, TimeUnit.DAYS);
}catch(InterruptedException e){
e.printStackTrace();
}
processor.setEnd(true);
System.out.printf("Main: Ends\n");
}
CompletionService类可以执行Callable或Runnable类型的任务,由于Runnable对象不能产生结果,因此CompletionService的基本原则不适用与此。
CompletionSe类提供了其他两种方法来获取任务已经完成的Future对象。这些方法如下:
- poll() 无参数的poll()方法用于检查队列中是否有Future对象。如果队列为空,则立即返回null。否则,它将返回队列中的第一个元素,并移除这个元素。
- take()这个方法也没有参数,它检查队列中是否有Future对象。如果队列为空,它将阻塞线程直到队列中有可用的元素。如果队列中有元素,它将返回队列中的第一个元素,并移除这个元素。
处理在执行器中被拒绝的任务
当我们想结束执行器的执行时,调用shutdown()方法来表示执行器应当结束。
但是,执行器只有等待正在运行的任务或者等待执行的任务结束后,才能真正结束。
如果在shutdown()方法与执行器结束之间发送一个任务给执行器,这个任务会被拒绝,因为这个时间段执行器已不再接受任务了。ThreadPoolExecutor类提供了一套机制,当任务呗拒绝时调用这套机制来处理它们。
创建RejectedTaskController类1
2
3
4
5
6
7
8
9public class RejectedTaskController implements RejectedExecutionHandler{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
System.out.printf("RejectedTaskController: The task %s has been rejected\n",r.toString());
System.out.printf("RejectedTaskController: %s\n",executor.toString());
System.out.printf("RejectedTaskController: Terminating: %s\n",executor.isTerminating());
System.out.printf("RejectedTaksController: Terminated: %s\n",executor.isTerminated());
}
}
创建Task类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
public void run(){
System.out.printf("Task %s : Starting\n", name);
try{
Long duration = (long)(Math.random()* 10);
System.out.printf("Task %s: ReportGenerator: Generating a report during %d seconds\n",name , duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Task %s: Ending\n", name);
}
public String toString(){
return name;
}
}
主类1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class Main{
public static void main(String[] args){
RejectedTaskController controller = new RejectedTaskController();
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
executor.setRejectedExecutionHandler(controller);
System.out.printf("Main: Starting.\n") ;
for(int i = 0; i < 3; i++){
Task task = new Task("Task" + i);
executor.submit(task);
}
System.out.printf("Main: Shuting down the Executor.\n");
executor.shutdown();
System.out.printf("Main: Sending another Task\n");
Task task = new Task("RejectedTask");
executor.submit(task);
System.out.printf("Main: End.\n");
}
}
为了处理在执行器中被拒绝的任务,需要创建一个实现RejectedExceptionHandler接口的处理类。这个接口有一个rejectedExecution()方法,其中有两个参数:
- 一个Runnable对象,用来存储被拒绝的任务;
- 一个Executor对象,用来存储任务被拒绝的执行器。
被执行器拒绝的每一个任务都将调用这个方法。需要先调用Executor类的setRejectedExecutionHandle()方法来设置用于被拒绝的任务的处理程序。
当执行器接收一个任务并开始执行时,它先检查shutdown()方法是否已经被调用了。如果是,那么执行器就拒绝这个任务。首先,执行器会寻找通过setRejectedExecutionHandler()方法设置的用于被拒绝的任务的处理程序,如果找到一个处理程序,执行器就调用其rejectedExecution()方法:否则就抛出RejectedExecutionExeception异常。这是一个运行时异常,因此并不需要catch语句来对其进行处理。