JVM学习笔记

整数的表达

在计算机系统中,数值一律用补码来表示和存储。
原因在于,使用补码,可以将符号位和数值域统一处理;
同时,加法和减法也可以统一处理。
此外,补码与原码相互转换,其运算过程是相同的,不需要额外的硬件电路。

  • 原码——第一位为符号位(0为整数,1为负数)
  • 反码——符号位不懂,原码取反
  • 负数补码——符号为不动,反码加1
  • 正数补码——和原码相同
    1
    2
    3
    4
    5
    6
    //打印整数的二进制表示
    int a = -6;
    for(int i = 0; i < 32; i++){
    int t = (a & 0x80000000>>>i)>>>(31-i);
    System.out.print(t);
    }




JVM需要对Java Library提供以下支持

  • 反射java.lang.reflect
  • ClassLoader
  • 初始化class和interface
  • 安全相关java.security
  • 多线程
  • 弱引用



JVM启动流程

Mark-Down




Java内存模型

Mark-Down

Java栈是线程私有的,Java堆是全局共享的。

PC寄存器

每个线程拥有一个PC寄存器
在线程创建时创建
指向下一条指令的地址
执行本地方法时,PC的值为undefined

方法区

保存装载的类信息

  • 类型的常量池
  • 字段,方法信息
  • 方法字节码
    通常和永久区(Perm)关联在一起

    Java堆

    和程序开发密切相关
    应用系统对象都保存在Java堆中
    所有线程共享Java堆
    对分代GC来说,堆也是分代的
    GC的主要工作区间
    Mark-Down

Java栈

线程私有
栈由一系列帧组成(因此Java栈也叫做帧栈)
帧保存一个方法的局部变量、操作数栈、常量池指针
每一个方法调用创建一个帧,并压栈

1
2
3
4
5
6
7
8
9
public class StackDemo{
public static int runStatic(int i, long l, float f, Object o, byte b){
return 0;
}

public int runInstance(char c, short s, boolean b){
return 0;
}
}

Mark-Down




Java栈上分配

小对象(一般几十个bytes),在没有逃逸的情况下,可以直接分配在栈上
直接分配在栈上,可以自动回收,减轻GC压力
大对象或者逃逸对象无法栈上分配



栈、堆、方法区交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AppMain{ //运行时,jvm把appmain的信息都放入方法区
public static void main(String[] args){ //main方法本身放入方法区
Sample test1 = new Sample("测试1"); //test1引用,所以放到栈区,Sample是自定义对象应该放到堆里面
Sample test2 = new Sample("测试2");

test1.printName();
test2.printName();
}
}

public class Sample{ //运行时,jvm把appmain的信息都放入方法区
private String name; //new Sample实例后,name引用放入栈区里name对象放入堆里
public Sample(String name){
this.name = name;
}

//print方法本身放入方法区里
public void printName(){
System.out.println(name);
}

}

Mark-Down




工作内存和主存

当数据从主内存复制到工作存储时,必须出现两个动作

  • 由主内存执行的读(read)操作
  • 由工作内存执行的相应的load操作


    当数据从工作内存拷贝到主内存时,也出现两个操作
  • 由工作内存执行的存储(store)操作
  • 由主内存执行的相应的写(write)操作


    每一个操作都是原子的,即执行期间不会被中断
    对于普通变量,一个线程中更新的值,不能马上反应在其他变量中
    如果需要在其他线程中立即可见,需要使用volatile关键字
    Mark-Down

有序性

  • 在本线程内,操作都是有序的
  • 在线程外观察,操作都是无序的。(指令重排或主内存同步延时)

    指令重排的基本原则

  • 程序顺序原则:一个线程内保证语义的串行性
  • volatile规则:volatile变量的写,先发生于读
  • 锁规则:解锁(unlock)必然发生在随后的加锁(lock)前
  • 传递性:A先于B,B先于C,那么A必然先于C
  • 线程的start方法先于它的每一个动作
  • 线程的所有操作先于线程的终结(Thread.join())
  • 线程的中断(interrupt())先于被中断线程的代码
  • 对象的构造函数执行结束先于finalize()方法



    几种常见的GC算法

  • 引用计数法
  • 标记-清除法
  • 标记-压缩法
  • 复制算法

    引用计数法

    老牌垃圾回收算法
    通过引用计数来回收垃圾
    使用者:COM、ActionScript3、Python
    缺陷:引用和去引用伴随加法和减法,影响性能;很难处理循环引用问题

    标记-清除

    标记阶段:通过根节点,标记所有从根节点开始的可达对象,因此未被标记的对象就是未被引用的垃圾对象。
    清除阶段:清除所有未被标记的对象。
    Mark-Down


标记-压缩

它是标记-清除的优化,先标记,但之后并不简单的清理未被标记的对象,而是将所有的存活对象压缩到内存的一端。
之后,清理边界外所有空间。这样做,减少了内存碎片。
[标记-压缩] 算法适用于存活对象较多的场合,如老年代。
Mark-Down


复制算法

与标记-清除算法相比,复制算法是一种相对高效的回收方法。
不适用于存活对象较多的场合,如老年代。
将原有的内存空间分为两块,每次只使用其中一块,在垃圾回收时,将正在使用的内存中的存活对象复制到未使用的内存块中,
之后,清除正在使用的内存块中的所有对象,交换两个内存的角色,完成垃圾回收。
Mark-Down

复制算法的最大问题是:空间浪费 整合标记清理思想
Mark-Down


分代思想

  • 依据对象的存活周期进行分类,短命对象归为新生代,长命对象归为老年代。
  • 根据不同代的特点,选取合适的收集算法:少量对象存活,适合复制算法;大量对象存活,适合标记清理或标记压缩算法。

    可触及性

  • 可触及的:从根节点可以触及到这个对象
  • 可复活的:一旦所有引用被释放,就是可复活状态,因为finalize()中可能复活该对象
  • 不可触及的:在finalize()后,可能会进入不可触及状态,不可触及的对象不可能复活,可以回收
    避免使用finalize(),操作不慎可能导致错误。可以使用try-catch-finally来替代它。

STOP_THE_WORLD

Java中一种全局暂停的现象。
全局停顿,所有Java代码停止,native可以执行,但不能和JVM交互。
多半由于GC引起。

  • Dump线程
  • 死锁检查
  • 堆Dump
    危害:长时间服务停止,没有响应;遇到HA系统,可能引起主备切换,严重危害生产环境。

类装载器

class装载验证流程

加载——>链接[验证、准备、解析]——>初始化

加载

装载类的第一个阶段
取得类的二进制流
转为方法区数据结构
在Java堆中生成对应的java.lang.Class对象

验证

目的:保证Class流的格式是正确的

  • 文件格式的验证
  • 元数据验证
  • 字节码验证(很复杂)
  • 符号引用验证

    准备

    分配内存,并为类设置初始值(方法区中)
    1
    2
    public static int v = 1; //在准备阶段找那个,v会被设置为0,在初始化的<clinit>中才会被设置为1
    public static final int v = 1; //对于static final类型,在准备阶段就会被赋上正确的值

解析

符号引用替换为直接引用。

初始化

执行类构造器,static变量赋值语句,static{}语句。
子类的调用前保证父类的被调用。

是线程安全的。

什么是ClassLoader

ClassLoader是一个抽象
ClassLoader的实例将读入Java字节码将类装载到JVM中
ClassLoader可以定制,满足不同的字节码流获取方式
ClassLoader负责类装载过程中的加载阶段
ClassLoad的重要方法

  • public Class<?> loadClass(String name)throws ClassNotFoundException 载入并返回一个Class
  • protected final Class<?> defineClass(byte[] b, int off, int len) 定义一个类,不公开调用
  • protected Class<?> findClass(String name) throws ClassNotFoundException loadClass回调方法,自定义ClassLoader的推荐做法
  • protected final Class<?> findLoadedClass(String name) 寻找已经加载的类

JDK中ClassLoader默认设计模式

BootStrap ClassLoader(启动ClassLoader)
Extension ClassLoader(扩展ClassLoader)
App ClassLoader(应用ClassLoader/系统ClassLoader)
Custom ClassLoader(自定义ClassLoader)
每个ClassLoader都有一个Parent作为父亲。
Mark-Down

双亲模式的问题:顶层ClassLoader,无法加载底层ClassLoader的类。
解决办法:Thread.setContextClassLoader(),上下文加载器,基本思想是,在顶层ClassLoader中,传入底层ClassLoader的实例。
双亲模式是默认的模式,但不是必须这么做,Tomcat的WebappClassLoader就会先加载自己的Class,找不到再委托parent;
OSGi的ClassLoader形成网状结构,根据需要自由加载Class。

Java核心技术阅读笔记

在类之间,最常见的关系有:

  • 依赖(uses-a)
  • 聚合(has-a)
  • 继承(is-a)

订单系统

  • 项目(Item)
  • 订单(Order)
  • 送货地址(Shipping address)
  • 付款(Payment)
  • 账户(Account)

表达类关系的UML符号

Mark-Down

Mark-Down

2016大计划

2016已经快要过去一半 计划永远赶不上变化

  • 掌握前端技术
  • 掌握后端技术

前端技术

  • 会熟练使用html和css
  • 掌握javascript
  • 掌握js框架(了解angular、react、vue、jquery)
  • 掌握前端自动化(glup webpack)

后端技术

  • 精通J2EE
  • 使用php搭建后台
  • 掌握php框架
  • 学会nodejs

电商项目

完成网页前端和后端,时间允许在做手机端适配

Java多线程 0x04

知识点

  • 创建线程执行器
  • 创建固定大小的线程执行器
  • 在执行器中执行任务并返回结果
  • 运行多个任务并处理第一个结果
  • 运行多个任务并处理所有结果
  • 在执行器中延时执行任务
  • 在执行器中周期性执行任务
  • 在执行器中取消任务
  • 在执行器中控制任务的完成
  • 在执行器中分离任务的启动与结果的处理
  • 处理在执行器中被拒绝的任务

通常Java开发一个简单的并发任务,会创建Runnable对象,并创建对应的Thread的对象来执行它们。
但是,如果开发一个运行大量的并发任务,这个方法将突显以下劣势

  • 必须实现所有与Thread对象管理相关的代码,如线程的创建、结束以及结果获取。
  • 需要为每个任务创建一个Thread对象。如果需要执行大量的任务,这将大大地影响应用程序的处理能力。
  • 计算机的资源需要高效地进行控制和管理,创建过多的线程,将会导致系统负荷过重。

自JDK5之后,Java并发API提供了一套执行器框架(Executor Framework),它围绕着Executor接口和它的子接口ExecutorService,以及实现这两个接口的ThreadPoolExecutor类展开。
这套机制分离了任务的创建和执行,通过使用执行器,仅需要实现Runnable接口的对象,然后将这些对象发送给执行器即可。
执行器通过创建所需的线程,来负责这些Runnable对象的创建、实例化以及运行。
执行器使用了线程池来提高应用程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务,避免了不断地创建和销毁线程而导致系统性能下降。

执行器框架另一个重要的优势是Callable接口,它类似于Runnable接口,但提供了两方面的增加:

  • 接口主方法名为call(),可以返回结果
  • 当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个Future对象控制Callable对象的状态和结果。


    创建执行器

    执行器框架的第一步是创建ThreadPoolExecutor对象。
    可以使用ThreadPoolExecutor类提供的四个构造器来创建ThreadPoolExecutor对象,也可以使用Executors工厂类来创建。
    创建Task类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class Task implements Runnable{
    //存储任务的创建时间
    private Date initDate;

    //存储任务的名称
    private String name;

    public Task(String name){
    initDate = new Date();
    this.name = name;
    }

    @Override
    public void run(){
    //创建时间
    System.out.printf("%s: Task %s: Created on: %s\n", Thread.currentThread().getName(), name, initDate);
    //开始时间
    System.out.printf("%s: Task %s: Started on: %s\n",Thread.currentThread().getName(),name,new Date());

    try{
    Long duration = (long)(Math.random() * 10);
    System.out.printf("%s: Task %s: Doing a task during %d seconds\n", Thread.currentThread().getName(), name, duration);
    TimeUnit.SECONDS.sleep(duration);
    }catch(InterruptedException e){
    e.printStackTrace();
    }
    //结束时间
    System.out.printf("%s: Task %s: Finished on: %s\n", Thread.currentThread().getName(), name, new Date());
    }

    }

创建Server类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Server{
private ThreadPoolExecutor executor;

public Server(){
executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
}

public void executeTask(Task task){
System.out.printf("Server: A new task has arrived\n");
executor.execute(task);
System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());
System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
}

public void endServer(){
executor.shutdown();
}
}

主类

1
2
3
4
5
6
7
8
9
10
public class Main{
public static void main(String[] args){
Server server = new Server();
for(int i = 0; i < 100; i++){
Task task = new Task("Task " + i);
server.executeTask(task);
}
server.endServer();
}
}

Executors工厂类的newCachedThreadPool()方法创建了一个缓存线程池。
这个方法强制转换为ThreadPoolExecutor类型,并拥有所有的方法。
如果需要执行新任务,缓存线程池就会创建新线程;如果线程所允许的任务执行完成后并且这个线程可用,那么缓存线程池将会重用这些线程。线程重用的优点是减少了创建线程所花费的时间。然而,新任务固定会依赖线程来执行,因此晕车线程池也有缺点,如果发送过多的任务给执行器,系统的负荷将会过载。

仅当线程的数量是合理的或者线程只会运行很短的时间时,适合采用Executors工厂类的newCachedThreadPool()方法来创建执行器。

一旦创建了执行器,就可以使用executed()方法来发送Runnable或Callable任务。
ThreadPoolExecutor有以下方法

  • getPoolSize() 返回执行器线程池中的实际的线程数
  • getActiveCount() 返回执行器中正在执行任务的线程数
  • getCompletedTaskCount() 返回执行器已经完成的任务数
  • getLargestPoolSize() 返回曾经同时位于线程池中的最大线程数
  • shutdownNow() 这个方法立即关闭执行器。执行器将不再执行那些正在等待执行的任务。将返回等待执行任务的列表。正在执行的任务还将继续执行,但不等待这些任务完成。
  • isTerminated() 如果调用了shutdown()或shutdownNow()方法,并且执行器完成了关闭的过程,那么返回true。
  • isShutdown() 调用了shutdown()返回true
  • awaitTermination(long timeout,TimeUnit unit) 这个方法将阻塞所调用的线程,直到执行器完成任务或者达到所指定的timeout值。如果想等待任务的结束,而不管任务的持续时间,可以使用一个大的超时时间,比如DAYS。
    ThreadPoolExecutor类还有一个重要特性,必须主动地结束它。


创建固定大小的线程执行器

Executors提供了一个方法来创建一个固定大小的线程执行器。
这个执行器有一个线程数的最大值,如果发送超过这个最大值,执行器将不再创建额外的线程,剩下的任务将被阻塞直到执行器有空闲的线程可用。这可以保证执行器不会给应用程序带来性能不佳的问题。
创建Server类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Server{
private ThreadPoolExecutor executor;

public Server(){
executor = (ThreadPoolEexcutor)Executors.newFixedThreadPool(5);
}

public void executeTask(Task task){
System.out.printf("Server: A new task has arrived\n");
executor.execute(task);
System.out.printf("Server: Pool Size: %d\n", executor.getPoolSize());
System.out.printf("Server: Active Count: %d\n", executor.getActiveCount());
System.out.printf("Server: Task Count: %d\n",executor.getTaskCount());
System.out.printf("Server: Completed Task: %d\n", executor.getCompletedTaskCount());
}

public void endService(){
executor.shutdown();
}
}

创建Task类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Task implements Runnable{
private Date initDate;
private String name;

public Task(String name){
initDate = new Date();
this.name = name;
}

@Override
public void run(){
System.out.printf("%s: Task %s : Created on: %s\n", Thread.currentThread().getName(), name, initDate);
System.out.printf("%s: Task %s: Started on: %s\n", Thread.currentThread().getName(), name, new Date());

try{
Long duration = (long)(Math.random() * 10);
System.out.printf("%s: Task %s: Doing a task during %d seconds\n", Thread.currentThread().getName(), name, duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.printf("%s: Task %s: Finished on: %s\n", Thread.currentThread().getName(), new Date(), name);
}
}

主类

1
2
3
4
5
6
7
8
9
10
public class Main{
public static void main(String[] args){
Server server = new Server();
for(int i = 0; i < 100; i++){
Task task = new Task("Task " + i);
server.executeTask(task);
}
server.endServer();
}
}

使用Executors工厂类newFiexedThreadPool()创建执行器。
Executors工厂类还提供了newSingleThreadExecutors()方法。这是一个创建固定大小线程执行器的极端情况,它将创建一个只有单个线程的执行器。因此,这个执行器只能在同一时间执行一个任务。

 

在执行器中执行任务并返回结果

执行器框架的优势之一是,可以在运行并发任务并返回结果。
Callable:这个接口声明了call()方法。可以在这个方法里实现任务的具体逻辑操作。Callable接口时一个泛型接口,这意味着必须声明call()方法返回的数据类型。
Future:这个接口声明了一些方法来获取由Callable对象产生的结果,并管理它们的状态。
创建FactorialCalculator类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class FactorialCalculator implements Callable<Integer>{
//存储任务计算的数字
private Integer number;

public FactorialCalculator(Integer number){
this.number = number;
}

@Override
public Integer call() throws Exception{
int num, result;
num = number.intValue();
result = 1;

if((num == 0 || (num == 1)){
result = 1;
}else{
for(int i = 2; i <= number; i++){
result *= i;
Thread.sleep(20);
}
}
System.out.printf("%s: %d\n", Thread.currentThread().getName(), result);
return result;
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Main{
public static void main(String[] args){
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
//创建一个Future<Integer>类型的列表对象resultList
List<Future<Integer>> resultList = new ArrayList<>();
Random random = new Random();
for(int i = 0; i < 10; i++){
Integer number = new Integer(random.nextInt(10));
FactorialCalculator calculator = new FactorialCalculator(number);
Future<Integer> result = executor.submit(calculator);
resultList.add(result);
}

do{
//任务完成的数量
System.out.printf("Main: Number of Completed Tasks: %d\n",executor.getCompletedTaskCount());
//遍历10个Future对象,通过isDone()输出任务是否完成
for(int i = 0;i < resultList.size(); i++){
Future<Integer> result = resultList.get(i);
System.out.printf("Main: Task %d: %s\n", result.isDone());
}
try{
Thread.sleep(50);
}catch(InterruptedException e){
e.printStackTrace();
}

}while(executor.getCompletedTaskCount()<resultList.size());
System.out.printf("Main: Results\n");
for(int i = 0 ; i < resultList.size(); i++){
Future<Integer> result = resultList.get(i);
Integer number = null;
try{
number = result.get();
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}
System.out.printf("Core: Task %d: %d\n",i,number);
}
executor.shutdown();
}
}

实现了带有泛型参数Integer类型的Callable接口,因此这个Integer类型将作为在调用call()方法时的返回的类型。
通过submit()发送一个Callable对象给执行器去执行,这个submit()方法接受Callable对象作为参数,并返回Future对象。
Future对象可以用于以下两个主要目的

  • 控制任务的状态:可以取消任务和检查任务是否已完成。为了达到这个目的,可使用isDone()方法来检查任务是否已完成。
  • 通过call()方法获取返回的结果。为了达到这个目的,可使用get()方法。这个方法一直等待直到Callable对象的call()方法执行并返回结果。如果get()方法在等待结果时线程中断了,则抛出一个InterruptedException异常。如果call()方法抛出异常那么get()方法将随之抛出ExecutionException异常。
    在调用Future对象的get()方法时,如果Future对象所控制的任务并未完成,那么这个方法将一直阻塞到任务完成。还有其他形式的get()
  • get(long timeout, TimeUnit unit):调用这个方法时,任务的结果并未准备好,则方法等待所指定的timeout时间。如果等待超过了指定的时间而任务的结果还没有准备好,那么这个方法将返回null。


    运行多个任务并处理第一个结果

    并发编程比较常见的一个问题,当采用多个并发任务解决一个问题时,往往只关心这些任务中的第一个结果。
    比如,对一个数组进行排序有很多算法,可以并发启动所有算法,但是对于一个给定的数字,第一个得到排序结果的算法就是最快的排序算法。
    创建UserValidator类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class UserValidator{
    private String name;
    public UserValidator(String name){
    this.name = name;
    }
    public boolean validate(String name, String password){
    Random random = new Random();
    try{
    Long duration = (long)(Math.random()*10);
    System.out.printf("Validator %s: Validating a user during %d seconds\n",this.name,duration);
    TimeUnit.SECONDS.sleep(duration);
    }catch(InterruptedException e){
    return false;
    }
    return random.nextBoolean();
    }

    public String getName(){
    return name;
    }
    }

创建TaskValidator类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TaskValidator implements Callable<String>{
private UserValidator validator;
private String user;
private String password;
public TaskValidator(UserValidator validator,String user, String password){
this.validator = validator;
this.user = user;
this.password = password;
}

@Override
public String call() throws Exception{
if(!validator.validate(user, password)){
//如果用户没有通过UserValidator验证,抛出Exception
System.out.printf("%s: The user has not been found\n",validator.getName());
throw new Exception("Error validating user");
}
System.out.printf("%s: The user has been found\n",validator.getName());
return validator.getName();
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Main{
public static void main(String[] args){
String username = "test";
String password = "test";

UserValidator ldapValidator = new UserValidator("LDAP");
UserValidator dbValidator = new UserValidator("DateBase");

TaskValidator ldapTask = new TaskValidator(ldapValidator, username, password);

TaskValidator dbTask = new TaskValidator(dbValidator, username, password);

List<TaskValidator> taskList = new ArrayList<>();
taskList.add(ldapTask);
taskList.add(dbTask);

ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
String result;
try{
result = executor.invokeAny(taskList);
System.out.printf("Main: Result: %s\n",result);
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}

executor.shutdown();
System.out.printf("Main: End of the Execution\n");

}
}

invokeAny()方法接受一个任务类别,然后运行任务,并返回第一个完成任务并且没有抛出异常的任务的执行结果。
UserValidator对象,返回随机的boolean值。
每个UserValidator对象被TaskValidator对象使用,TaskValidator对象实现了Callable接口。
如果UserValidator类的validate()方法返回false,那么TaskValidator类将抛出Exeception异常,否则,返回true。

  • 如果两个任务都返回true值,那么invokeAny()方法的结果就是首先完成任务的名称。
  • 如果第一个任务返回true值,第二个任务抛出异常,那么invokeAny()方法的结果就是第一个任务的名称。
  • 如果第一个任务抛出Exception异常,第二个任务返回true值,那么invokeAny()方法的结果就是第二个任务的名称。
  • 如果两个任务都抛出Exception异常,那么invokeAny()方法将抛出ExecutionException异常。
    ThreadPoolExecutor类还提供了invokeAny()方法的其他版本,invokeAny(Collection<? extends Callable tasks, long timeout, TimeUnit unit):这个方法执行所有的任务,如果在给定的超时期满之前某个任务已经成功完成(也就是未抛出异常),则返回其结果。



运行多个任务并处理所有结果

执行器框架(Executor Framework)允许执行并发任务不需要考略线程创建和执行。
它还提供了可以用来在先执行器中执行任务的状态和获取任务运行结果的Future类。
如想等待任务结束,可以使用如下两种方法

  • 如果任务执行结束,那么Future接口的isDone()方法返回false,那么TaskValidator类将抛出Exeception异常,否则,返回true。
  • 在调用shutdown()方法后,ThreadPoolExecutor类的awaitTermination()方法会将线程休眠,直到所有的任务执行结束。
    创建Result类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class Result{
    private String name;
    private int value;
    public String getName(){
    return name;
    }
    public void setName(String name){
    this.name = name;
    }
    public int getValue(){
    return value;
    }
    public void setValue(int value){
    this.value = value;
    }
    }

创建Task类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Task implements Callable<Result>{
private String name;
public Task(String name){
this.name = name;
}

@Override
public Result call() throws Exception{
System.out.printf("%s: Staring\n", this.name);

try{
Long duration = (long)(Math.random()*10);
System.out.printf("%s: Waiting %d seconds for results.\n", this.name,duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
int value = 0;
for(int i = 0; i < 5; i++){
value += (int)(Math.random()*100);
}

Result result = new Result();
result.setName(this.name);
result.setValue(value);
System.out.printf("%s: Ends\n", this.name);

return result;
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Main{
public static void main(String[] args){
ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
List<Task> taskList = new ArrayList();
for(int i = 0; i < 3; i++){
Task task = new Task("Task-" + i);
taskList.add(task);
}

List<Future<Result>> resultList = null;
try{
resultList = executor.invokeAll(taskList);
}catch(InterruptedException e){
e.printStackTrace();
}
executor.shutdown();

System.out.printf("Core: Printing the results\n");
for(int i = 0 ; i < resultList.size(); i++){
Future<Result> future = resultList.get(i);
try{
Result result=future.get();
System.out.printf("%s: %s\n",result.getName(),result.getValue());
}catch(InterruptedException | ExecutionException){
e.printStackTrace();
}
}
}
}

通过invokeAll()方法等待所有任务的完成。
这个方法接受一个Callable对象列表,并返回一个Future对象列表。
在这个列表中,每个任务对应一个Future对象。Future对象列表中的第一个对象控制Callable列表中第一个任务,以此类推。
在存储结构的列表声明中,用在Future接口中的泛型参数的数据类型必须与Callable接口的泛型数据类型从相兼容。
使用Future对象仅用来获取任务的结果。当所有的任务执行结束时这个方法也执行结束了,如果在返回的Future对象上调用isDone()方法,那么所有的调用将返回true值。


在执行器中延时执行任务

想让任务在过一段时间后才被执行或者任务能够被周期性执行,就得用上ScheduleThreadPoolExecutor类。
创建Task类

1
2
3
4
5
6
7
8
9
10
11
12
public class Task implements Callable<String>{
private String name;
public Task(String name){
this.name = name;
}

@Override
public String call() throws Exception{
System.out.printf("%s: Starting at : %sn", name, new Date());
return "Hello World";
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main{
public static void main(String[] args){
ScheduledExecutorService executor = (ScheduledExecutorService)Executors.newScheduledThreadPool(1);
System.out.printf("Main: Starting at: %sn", new Date());
for(int i = 0; i < 5; i++){
Task task = new Task("Task " + i);
executor.schedule(task, i + 1, TimeUnit.SECONDS);
}
executor.shutdown();
try{
executor.awaitTermination(1, TimeUnit.DAYS);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Core: Ends at : %sn", new Date());
}
}

ScheduleThreadPoolExecutor执行器,schedule()方法接收如下参数:

  • 即将执行的任务
  • 任务执行前所要等待的时间
  • 等待时间的单位,由TimeUnit类的一个常量来指定

也可以使用Runnable接口来实现任务,因为ScheduledThreadPoolExecutor类的schedule()方法可以同时接受这两种类型的任务。
Java推荐仅在开发定时任务程序时采用ScheduleThreadPoolExecutor类。
在调用shutdown()时,可以配置ScheduleThreadPoolExecutor的行为,默认是不论执行器是否结束,待处理的任务仍将被执行。
可以童工setExecuteExistingDelayedTasksAfterShutdownPolicy()来改变这个行为,传递false,执行shutdown后,待处理的任务将不会被执行。


在执行器中周期性执行任务

通过ScheduleThreadPoolExecutor类来执行周期性的任务。

1
2
3
4
5
6
7
8
9
10
public class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
@Override
public void run(){
System.out.printf("%s: Executed at: %s\n", new Date());
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Main{
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
System.out.printf("Main: Starting at: %s\n", new Date());

Task task = new Task("Task");
ScheduledFuture<?> result = executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);

for(int i = 0; i < 10; i++){
System.out.printf("Main: Delay: %d\n", result.getDelay(TimeUnit.MILLISECONDS));
try{
TimeUnit.MILLISECONDS.sleep(500);
}catch(InterruptedException e){
e.printStackTrace();
}
}

executor.shutdown();
System.out.printf("Main: No More tasks at: %s\n", new Date());
try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.printf("Main: Finished at: %s\n", new Date());
}

scheduleAtFixedRate()方法发送任务,这个方法接收4个参数

  • 被周期性执行的任务
  • 任务第一次执行后的延时时间
  • 两次执行的时间周期
  • 第2个和第3个参数的时间单位
    两次执行之间的周期是指任务在两次执行开始时的时间间隔。
    如果有一个周期性的任务需要5秒钟,但是却让它每3秒执行一次,那么,在任务执行的过程中将会有两个任务实例同时存在。

ScheduleThreadPoolExecutor类还提供了其他方法来安排周期性任务的运行。比如,scheduleWithFixedRate()方法。这个方法与scheduledAtFixedRate()方法具有相同的参数,但是略有一些不同需要引起注意。在scheduleAtFixedRate()中,第3个参数表示任务两次执行开始时间的间隔,而在scheduledWithFixedDelay()方法中,第3个参数则是表示任务上一次执行结束的时间与任务下一次开始执行的时间间隔。

也可以配置ScheduleThreadPoolExecutor实心shutdown()方法的行为,默认行为是当调用shutdown()方法后,定时任务就结束了。可以通过ScheduleThreadPoolExecutor类的setContinueExistingPeriodicTasksAfterShutdownPolicy()方法来改变这个行为,传递参数true给这个方法,这样shutdown之后,周期性任务仍将继续执行。



在执行器中取消任务

当线程不再需要时就销毁,Futurn接口的cancel()方法来执行取消操作。
创建Task类

1
2
3
4
5
6
7
8
9
public class Task implements Callable<String>{
@Override
public String call() throws Exception{
while(true){
System.out.printf("Task: Test\n");
Thread.sleep(100);
}
}
}

创建主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Main{
public static void main(String[] args){
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();

Task task = new Task();

System.out.printf("Main: Executing the Task\n");

Future<String> result = executor.submit(task);

try{
TimeUnit.SECONDS.sleep(2);
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.printf("Main: Cancelling the Task\n");
result.cancel(true);

System.out.printf("Main: Cancelled: %s\n", result.isCancelled());
System.out.printf("Main: Done: %s\n", result.isDone());

executor.shutdown();
System.out.printf("Main: The executor has finished\n");
}
}

如果想取消一个已经发送给执行器的任务,可以使用Future接口的cancel()方法。
根据调用cancel()方法时所传递的参数以及任务的状态,这个方法的行为有些不同。

  • 如果任务已经完成,或者之前已经被取消,或者由于某种原因而不能被取消,那么方法将返回false并且任务也不能取消。
  • 如果任务在执行器中等待分配Thread对象来执行它,那么任务呗取消,并且不会开始执行。如果任务已经在运行,那么它依赖于调用cancel()方法时所传递的参数。如果传递的参数是true并且任务正在运行,那么任务将被取消。如果传递的参数为false并且任务正在运行,那么任务不会被取消。



在执行器中控制任务的完成

FutureTask类提供了一个名为done()的方法,允许在执行器中的任务执行结束之后,还可以执行一些代码。
这个方法可以被用来执行一些后期处理操作,比如:产生报表,通过邮件发送结果或释放一些系统资源。
当任务执行完成受FutureTask类控制时,这个方法在内部被FutureTask类调用。在任务结果设置好后以及任务的状态已经变为isDone之后,无论任务是否被取消或者正常结束,done()方法才被调用。
默认情况下,done()方法的实现为空,即没有任何具体的代码实现。我们可以覆盖FutureTask类并实现done()方法来改变这种行为。
创建ExecutableTask类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ExecutableTask implements Callable<String>{
private String name;
public ExecutableTask(String name){
this.name = name;
}

@Override
public String call() throws Exception{
try{
Long duration = (long)(Math.random()*10);
System.out.printf("%s: Waiting %d seconds for results.n",this.name, duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){

}
return "Hello world. I'm " + name;
}

public String getName(){
return name;
}
}

创建ResultTask类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ResultTask extends FutureTask<String>{
private String name;
public ResultTask(Callable<String> callable){
super(callable);
this.name = ((ExecutableTask) callable).getName();
}

@Override
protected void done(){
if(isCancelled()){
System.out.printf("%s: Has been cancelled\n",name);
}else{
System.out.printf("%s: Has finished\n",name);
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Main{
public static void main(String[] args){
ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
ResultTask resultTasks[] = new ResultTask[5];
for(int i =0 ;i < 5; i++){
ExecutableTask executableTask = new ExecutableTask("Task " + i);
resultTasks[i] = new ResultTask(executableTask);
executor.submit(resultTasks[i]);
}

try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException e){
e.printStackTrace();
}

for(int i = 0; i < resultTasks.length; i++){
resultTasks[i].cancel(true);
}

for(int i = 0; i < resultTasks.length; i++){
try{
if(!resultTasks[i].isCancelled()){
System.out.printf("%s\n",resultTasks[i].get());
}
}catch(InterruptedException | ExecutionException e){
e.printStackTrace();
}
}

executor.shutdown();
}
}

当任务执行结束时,FutureTask类就会调用done()方法。
在创建好返回值以及改变任务 状态为isDone之后,FutureTask类就会在内部调用done()方法。
虽然我们无法改变任务的结果值,也无法改变任务的状态,但是可以通过任务来关闭系统资源、输出日志信息、发送通知等。



在执行器中分离任务的启动与结果的处理

如果需要在一个对象里发送任务给执行器,然后在另一个对象里处理结果。对于这种情况,Java并发APi提供了CompletionService类。
CompletionService类有一个方法用来发送任务给执行器,还有一个方法为下一个已经执行结束的任务获取Future对象。
从内部实现的机制来看,CompletionService类使用Executor对象来执行任务。这个行为的优势是可以共享CompletionService对象,并发送任务到执行器,然后其他的对象可以处理任务的结果。
第二个方法有一个不足之处,它只能为已经执行结束的任务获取Future对象,因此,这些Future对象只能被用来获取任务的结果。
创建ReportGenerator类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ReportGenerator implements Callable<String>{
private String sender;
private String title;
public ReportGenerator(String sender, String title){
this.sender = sender;
this.title = title;
}

@Override
public String call() throws Exception{
try{
Long duration = (long) (Math.random() * 10);
System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender, this.title, duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
String ret = sender + ": " + title;
return ret;
}
}

创建ReportProcessor类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ReportProcessor implements Runnable{
private CompletionService<String> service;
private boolean end;
public ReportProcessor(CompletionService<String> service){
this.service = service;
end = false;
}

@Override
public void run(){
while(!end){
try{
Future<String> result = service.poll(20, TimeUnit.SECONDS);
if(result){
String report = result.get();
System.out.printf("ReportReceiver: Report Recived: %s\n", report);
}
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}
}
}

public void setEnd(boolean end){
this.end = end;
}
}

创建ReportRequest类,模拟请求获取报告

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ReportRequest implements Runnable{
private String name;
private CompletionService<String> service;

public ReportRequest(String name, CompletionService<String> service){
this.name = name;
this.service = service;
}

@Override
public void run(){
ReportGenerator reportGenerator = new ReportGenerator(name, "Report");
service.submit(reportGenerator);
}
}

创建主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args){
ExecutorService executor = (ExecutorService)Executors.newCachedThreadPool();
CompletionService<String> service = new ExecutorCompletionService<>(executor);

ReportRequest faceRequest = new ReportRequest("Face", service);
ReportRequest onlineRequest = new ReportRequest("Online", service);
Thread faceThread = new Thread(faceRequest);
Thread onlineThread = new Thread(onlineRequest);

ReportProcessor processor = new ReportProcessor(service);
Thread senderThread = new Thread(processor);

System.out.printf("Main: Starting the Threads\n");
faceThread.start();
onlineThread.start();
senderThread.start();

try{
System.out.printf("Main: Waiting for the report generators.\n");
faceThread.join();
onlineThread.join();
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.printf("Main: Shuting down the executor.\n");
executor.shutdown();
try{
executor.awaitTermination(1, TimeUnit.DAYS);
}catch(InterruptedException e){
e.printStackTrace();
}

processor.setEnd(true);
System.out.printf("Main: Ends\n");
}

CompletionService类可以执行Callable或Runnable类型的任务,由于Runnable对象不能产生结果,因此CompletionService的基本原则不适用与此。
CompletionSe类提供了其他两种方法来获取任务已经完成的Future对象。这些方法如下:

  • poll() 无参数的poll()方法用于检查队列中是否有Future对象。如果队列为空,则立即返回null。否则,它将返回队列中的第一个元素,并移除这个元素。
  • take()这个方法也没有参数,它检查队列中是否有Future对象。如果队列为空,它将阻塞线程直到队列中有可用的元素。如果队列中有元素,它将返回队列中的第一个元素,并移除这个元素。



处理在执行器中被拒绝的任务

当我们想结束执行器的执行时,调用shutdown()方法来表示执行器应当结束。
但是,执行器只有等待正在运行的任务或者等待执行的任务结束后,才能真正结束。
如果在shutdown()方法与执行器结束之间发送一个任务给执行器,这个任务会被拒绝,因为这个时间段执行器已不再接受任务了。ThreadPoolExecutor类提供了一套机制,当任务呗拒绝时调用这套机制来处理它们。
创建RejectedTaskController类

1
2
3
4
5
6
7
8
9
public class RejectedTaskController implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor){
System.out.printf("RejectedTaskController: The task %s has been rejected\n",r.toString());
System.out.printf("RejectedTaskController: %s\n",executor.toString());
System.out.printf("RejectedTaskController: Terminating: %s\n",executor.isTerminating());
System.out.printf("RejectedTaksController: Terminated: %s\n",executor.isTerminated());
}
}

创建Task类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}

@Override
public void run(){
System.out.printf("Task %s : Starting\n", name);
try{
Long duration = (long)(Math.random()* 10);
System.out.printf("Task %s: ReportGenerator: Generating a report during %d seconds\n",name , duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Task %s: Ending\n", name);
}

public String toString(){
return name;
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Main{
public static void main(String[] args){
RejectedTaskController controller = new RejectedTaskController();
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
executor.setRejectedExecutionHandler(controller);

System.out.printf("Main: Starting.\n") ;
for(int i = 0; i < 3; i++){
Task task = new Task("Task" + i);
executor.submit(task);
}

System.out.printf("Main: Shuting down the Executor.\n");
executor.shutdown();

System.out.printf("Main: Sending another Task\n");
Task task = new Task("RejectedTask");
executor.submit(task);

System.out.printf("Main: End.\n");

}
}

为了处理在执行器中被拒绝的任务,需要创建一个实现RejectedExceptionHandler接口的处理类。这个接口有一个rejectedExecution()方法,其中有两个参数:

  • 一个Runnable对象,用来存储被拒绝的任务;
  • 一个Executor对象,用来存储任务被拒绝的执行器。
    被执行器拒绝的每一个任务都将调用这个方法。需要先调用Executor类的setRejectedExecutionHandle()方法来设置用于被拒绝的任务的处理程序。
    当执行器接收一个任务并开始执行时,它先检查shutdown()方法是否已经被调用了。如果是,那么执行器就拒绝这个任务。首先,执行器会寻找通过setRejectedExecutionHandler()方法设置的用于被拒绝的任务的处理程序,如果找到一个处理程序,执行器就调用其rejectedExecution()方法:否则就抛出RejectedExecutionExeception异常。这是一个运行时异常,因此并不需要catch语句来对其进行处理。

Java多线程 0x03

知识点

  • 资源的并发访问控制
  • 资源的多副本的并发访问控制
  • 等待多个并发事件的完成
  • 在集合点的同步
  • 并发阶段任务的运行
  • 并发阶段任务中的阶段切换
  • 并发任务间的数据交换


介绍几个更高级的同步机制

  • 信号量(Semaphore):是一个计数器,用来保护一个或者多个共享资源的访问。
  • CountDownLatch:Java提供的同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。
  • CyclicBarrier:Java同步辅助类,它允许多个线程在某个集合点(common point)处进行互相等待。
  • Phaser:Java同步辅助类,它把任务分成多个阶段运行,在开始下一个阶段之前,当前阶段内的所有线程都必须执行完成,这是JDK7中的新特性。
  • Exchanger:Java同步辅助类,它提供两个线程之间的数据交换点。


二进制信号量(Binary Semaphore)

信号量是一个计数器,用来保护一个或者多个共享资源的访问。
线程访问一个共享资源,必须先获得信号量。若信号量大于0,信号量减1,然后允许访问这个共享资源;否则,信号量已经是0,此时会把线程置入休眠等待信号量大于0。
线程访问完一个共享资源后,信号量必须释放,即将信号量加1。
二进制信号量(Binary Semaphore)是一种特殊的信号量,用来保护对唯一共享资源的访问。
创建PrintQueue类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class PrintQueue{
private final Semaphore semaphore;

public PrintQueue(){
semaphore = new Semaphore(1); //创建二进制信号量
}

public void printJob(Object document){
try{
semaphore.acquire();

Long duration = (long)(Math.random() * 10);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(),duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}finally{
semaphore.release();
}
}
}

创建Job类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Job implements Runnable{
private PrintQueue printQueue;

public Job(PrintQueue printQueue){
this.printQueue = printQueue;
}

@Override
public void run(){
System.out.printf("%s: Going to print a job\n",Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
}
}

主类

1
2
3
4
5
6
7
8
9
10
public class Main{
public static void main(String[] args){
PrintQueue printQueue = new PrintQueue();

Thread thread[] = new Thread[10];
for(int i = 0; i < 10; i++){
thread[i] = new Thread(new Job(printQueue),"Thread " + i);
}

}

获取信号量的方法acquire(),释放信号量的方法release()。
Semaphore类还有其他两种acquire()方法。

  • acquireUninterruptibly():当信号量为0时,线程会被阻塞,可能会被中断,从而导致acquire()方法抛出InterruptedException异常。而acquireUninterruptibly()方法会忽略线程的中断并且不会抛出任何异常。
  • tryAcquire():这个方法试图获取信号量,从而避开线程的阻塞和等待。
    信号量的公平性,默认都是非公平性的,Semaphore类的构造函数也提供了boolean类型的模式选择。



信号量(Semaphore)

信号量可以用来保护一个资源的多个副本,或者被多个线程同时执行的临界区。
创建PrintQueue类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class PrintQueue{
private Semaphore semaphore;

private boolean freePrinters[];

private Lock lockPrinters;

public PrintQueue(){
semaphore = new Semaphore(3);
freePrinters = new boolean[3];
for(int i = 0; i < 3; i++){
freePrinters[i] = true;
}
lockPrinters = new ReentrantLock();
}

public void printJob(Object document){
try{
semaphore.acquire();
int assignedPrinter = getPrinter();
Long duration = (long)(Math.random()*10);
System.out.printf("%s: PrintQueue: Printing a Job in Printer %d during %d seconds\n",Thread.currentThread().getName(),assignedPrinter,duration);
TimeUnit.SECONDS.sleep(duration);

freePrinters[assignedPrinter] = true;
}catch(InterruptedException e){
e.printStackTrace();
}finally{
semaphore.release();
}
}

private int getPrinter(){
int ret = -1;

try{
lockPrinters.lock();
for(int i = 0; i < freePrinters.length; i++){
if(freePrinters[i]){
ret = i;
freePrinters[i] = false;
break;
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
lockPrinters.unlock();
}
}
}

创建Job类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Job implements Runnable{
private PrintQueue printQueue;

public Job(PrintQueue printQueue){
this.printQueue = printQueue;
}

@Override
public void run(){
System.out.printf("%s: Going to print a job\n",Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n",Thread.currentThread().getName());
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
public class Main{
public static void main(String[] args){
PrintQueue printQueue = new PrintQueue();
Thread thread[] = new Thread[12];
for(int i = 0; i < 12; i++){
thread[i] = new Thread(new Job(printQueue), "Thread " + i);
}
for(int i = 0;i < 12; i++){
thread[i].start();
}
}
}



等待多个并发事件的完成

Java并发API提供了CountDownLatch类,它是一个同步辅助类。
在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。
这个类使用一个整数进行初始化,这个整数就是线程要等待完成的操作的数目。
当一个线程要等待某些操作先执行完时,需要调用await()方法,这个方法让线程进入休眠直到等待的所有操作都完成。
当某一个操作完成后,它将调用countDown()方法将CountDownLatch类的内部计数器减1。
当计数器变成0的时候,CountDownLatch类将唤醒所有调用await()方法而进入休眠的线程。

创建视频会议类Videoconference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Videoconference implements Runnable{
private final CountDownLatch controller;

public Videoconference(int number){
controller = new CountDownLatch(number);
}

public void arrive(String name){
System.out.printf("%s has arrived.\n",name);
controller.countDown();
System.out.printf("VideoConference: Waiting for %d participants.\n", controller.getCount());
}

@Override
public void run(){
System.out.printf("VideoConference: Initialization: %d participants.\n", controller.getCount());
try{
controller.await();
System.out.printf("VideoConference: All the participants have come\n");
System.out.printf("VideoConference: Let's start...\n");
}catch(InterruptedException e){
e.printStackTrace();
}
}
}

创建与会者类Participant

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Participant implements Runnable{
private Videoconference conference;
private String name;

public Participant(Videoconference conference,String name){
this.conference = conference;
this.name = name;
}

@Override
public void run(){
Long duration = (long)(Math.random()*10);
try{
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
conference.arrive(name);
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Main{
public static void main(String[] args){
Videoconference conference = new Videoconference(10);
Thread threadConference = new Thread(conference);
threadConference.start();

for(int i = 0; i < 10; i++){
Participant p = new Participant (conference, "Participant " + i);
Thread t = new Thread(p);
t.start();
}
}
}

CountDownLatch类有三个基本元素

  • 一个初始值,即定义必须等待的先行完成的操作的数目。
  • await()方法,需要等待其他事件完成的线程调用。
  • countDown()方法,每个被等待的事件在完成的时候调用。
    创建CountDownLatch对象时,使用构造其来初始化内部计数器。当countDown()方法被调用后,计数器将减1。当计数器到达0的时候,CountDownLatch对象将唤起所有在await()方法上等待的线程。
    CountDownLatch对象的内部计数器被初始化之后就不能被再次初始化或者修改。一旦计数器被初始化后,唯一能改变参数值的方法是countDown()方法。当计数器到达0时,所有因调用await()方法而等待的线程立即被唤醒,再执行countDown将不起任何作用。
    和其他同步方法相比,CountDownLatch机制有下述不同:
  • CountDownLatch机制不是用来保护共享资源或者临界区的,它是同步执行多个任务的一个或者多个线程。
  • CountDownLatch只准许进入一次。即一旦CountDownLatch的内部计数器到达0,再调用这个方法将不起作用。
    CountDownLatch类提供了另一种await()方法,即await(long time, TimeUnit unit)。这个方法被调用后,线程将休眠直到被中断,或者CountDownLatch的内部计数器达到0,或者指定的时间已经过期。
    第二个参数是TimeUnit类型,TimeUnit类是以下常量的枚举值:DAYS\HOURS\MICROSECONDS\MILLISECONDS\MINUTES\NANOSECONDS\SECONDS。



在集合点的同步

Java并发API提供了CyclicBarrier类,它也是一个同步辅助类。
它允许两个或多个线程在某个点上进行同步。
这个类和CountDownLatch类类似,但也有不同之处,使之成为更强大的类。
CyclicBarrier类有一个很有意义的改进,可以传入另一个Runnable对象作为初始化参数。
当所有线程都到达集合点后,CyclicBarrier将这个Runnable对象作为线程执行。
这个特性使得这个类在并行任务上可以媲美分治编程技术(Divide and Conquer Programming Technique)。
创建MatrixMock类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MatrixMock{
private int data[][]; //二维数组data

//矩阵的行数 每行的长度 寻找的数字
public MatrixMock(int size, int length, int number){
int counter = 0;
data = new int[size][length];
Random random = new Random();
for(int i = 0 ;i < size; i++){
for(int j = 0; j < length; j++){
data[i][j] = random.nexInt(10);
if(data[i][j] == number){
counter++;
}
}
}
System.out.printf("Mock: There are %d ocurrences of number in generated data.\n", counter, number);


public int[] getRow(int row){
if((row >= 0) && (row < data.length)){
return data[row];
}
return null;
}
}

结果类Results

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Results{
private int data[];

public Results(int size){
data = new int[size];
}

public void setData(int position,int value){
data[position] = value;
}

public int[] getData(){
return data;
}
}

查找类Searcher类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
pubilc class Searcher implements Runnable{
private int firstRow;
private int lastRow;
private MatrixMock mock;
private Results results;
private int number;
private final CyclicBarrier barrier;

public Search(int firstRow,int lastRow,MatrixMock mock,Results results, int number,CyclicBarrier barrier){
this.firstRow = firstRow;
this.lastRow = lastRow;
this.mock = mock;
this.results = results;
this.number = number;
this.barrier = barrier;
}

@Override
public void run(){
int counter;
System.out.printf("%s: Processing lines from %d to %d.\n", Thread.currentThread().getName(),firstRow, lastRow);
for(int i = firstRow; i < lastRow; i++){
int row[] = mock.getRow(i);
counter = 0;
for(int j = 0; j < row.length; j++){
if(row[j] == number){
counter++;
}
}
results.setData(i, counter);
}
System.out.printf("%s: Lines processed.\n",Thread.currentThread().getName());

try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
}
}

创建Grouper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Grouper implements Runnable{
private Results results;

public Grouper(Results results){
this.results = results;
}

@Override
public void run(){
int finalResult = 0;
System.out.println("Grouper: Processing results...\n");
int data[] = results.getData();
for(int number : data){
finalResult += number;
}
System.out.printf("Grouper: Total result: %d.\n",finalResult);
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Main{
public static void main(String[] args){
final int ROWS = 10000;
final int NUMBERS = 1000;
final int SEARCH = 5;
final int PARTICIPANTS = 5;
final int LINES_PARTICIPANT = 2000;
MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
Results results = new Results(ROWS);
Grouper grouper = new Grouper(results);
CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
Searcher searchers[] = new Searcher[PARTICIPANTS];
for(int i = 0; i < PARTICIPANTS; i++){
searchers[i] = new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, results,5, barrier);
Thread thread = new Thread(searchers[i]);
thread.start();
}
System.out.printf("Main: The main thread has finished.\n");
}
}

将矩阵分离成5个子集,并且在每个子集中使用线程进行查找,这些线程是查找类Searcher对象。
使用CyclicBarrier对象同步5个线程,执行Grouper查找任务处理结果,并且计算最终的结果。
CyclicBarrier类有一个内部计数器,可以控制指定数目的几个线程必须都到达集合点。
每一个线程到达集合点后就会调用await()方法通知CyclicBarrier对象,CyclicBarrier对象会让这个线程休眠直到其他所有的线程都到达集合点。
当所有线程都到达集合点之后,CyclicBarrier对象就唤醒所有的await()方法里等待的线程,同时,还可以以构造器传入的Runnable对象(Grouper对象)创建一个新的线程,以执行其他任务。
CyclicBarrier类还提供了getNumberWaiting()方法和getParties()方法,前者将返回在await()上阻塞的线程的数目,后者返回被CyclicBarrier对象同步的任务数。
CyclicBarrier类和CountDownLatch类不同,它可以重置回初始状态。
CyclicBarrier类提供了reset()方法完成的。当重置发生后,在await()方法中等待的线程将收到一个BrokenBarrierException异常。
CyclicBarrier对象有一种特殊的状态即损坏状态(Broken)。当很多线程在await()方法上等待的时候,如果其中一个线程被中断,这个线程将抛出InterruptedException异常,其他的等待线程将抛出BrokenBarrierException异常,于是CyclicBarrier对象就处于损坏状态了。
CyclicBarrier类提供了isBroken()方法,如果处于损坏状态就返回true,否则返回false。


并发阶段任务的运行

Java API提供了一个更复杂、更强大的同步辅助类,Phaser,它允许执行并发多阶段任务。
当有并发任务并且需要分解成几步执行时,这种机制非常适用。
Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态地增加或者减少任务数。
使用Phaser类同步三个并发任务,这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展名的.log的文件。
这3个任务分成以下三个步骤:

  • 在指定的文件夹及其子文件夹中获得扩展名为.log的文件
  • 对第一步的结果进行过滤,删除修改时间超过24小时的文件
  • 将结果打印到控制台
    在第一步和第二步结束的时候,都会检查所查找的结果列表是不是有元素存在。如果结果列表是空的 ,对应的线程将结束执行,并从phaser中删除。
    创建FileSearch类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    public class FileSearch implements Runnable{
    private String initPath;
    private String end;
    private List<String> results;
    private Phaser phaser;

    public FileSearch(String initPath, String end, Phaser phaser){
    this.initPath = initPath;
    this.end = end;
    this.phaser = phaser;
    results = new ArrayList<>();
    }

    @Override
    public void run(){
    phaser.arriveAndAwaitAdvance();

    System.out.printf("%s: Starting.\n", Thread.currentThread().getName());

    File file = new File(initPath);
    if(file.isDirectory()){
    directoryProcess(file);
    }

    if(!checkResults()){
    return;
    }

    filterResults();

    if(!checkResults()){
    return;
    }

    showInfo();
    phaser.arriveAndDeregister();
    System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
    }

    private void showInfo(){
    for(int i = 0; i < results.size(); i++){
    File file = new File(results.get(i));
    System.out.printf("%s: %s\n", Thread.currentThread().getName(), file.getAbsolutePath());
    }
    phaser.arriveAndAwaitAdvance();
    }

    //检查结果列表的长度
    private boolean checkResults(){
    if(results.isEmpty()){
    System.out.printf("%s: Phase %d: 0 results.\n", Thread.currentThread().getName(), phaser.getPhase());
    System.out.printf("%s: Phase %d: End.\n", Thread.currentThread().getName(), phaser.getPhase());
    phaser.arriveAndDeregister();//调用Phaser对象当前线程已经结束这个阶段
    return false;
    }else{
    System.out.printf("%s: Phase %d: %d results.\n", Thread.currentThread().getName(), phaser.getPhase(), results.size());
    phaser.arriveAndAwaitAdvance();
    return true;
    }
    }

    //对第一阶段查找到的文件列表进行过滤,将不是过去24小时修改的文件删除
    private void filterResults(){
    List<String> newResults = new ArrayList<>();
    long actualDate = new Date().getTime();
    for(int i = 0; i < results.size(); i++){
    File file = new File(results.get(i));
    long fileDate = file.lastModified();
    if(actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)){
    newResults.add(results.get(i));
    }
    }
    results = newResults;
    }

    //对每个文件夹递归调用
    private void directoryProcess(File file){
    File list[] = file.listFiles();
    if(list != null){
    for(int i = 0; i < list.length; i++){
    if(list[i].isDirectory()){
    directoryProcess(list[i]);
    }else{
    fileProcess(list[i]);
    }
    }
    }
    }

    private void fileProcess(File file){
    if(file.getName().endsWith(end)){
    results.add(file.getAbsolutePath());
    }
    }
    }

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Main{
public static void main(String[] args){
Phaser phaser = new Phaser(3);

FileSearch system = new FileSearch("C:\\XXX", "log", phaser);
FileSearch apps = new FileSearch("C:\\XXXX", "log", phaser);
FileSearch documents = new FileSearch("C:\\XXXXX","log", phaser);

Thread systemThread = new Thread(system, "System");
systemThread.start();

Thread appsThread = new Thread(apps, "Apps");
appsThread.start();

Thread documentsThread = new Thread(documents, "Documents");
documentsThread.start();

try{
systemThread.join();
appsThread.join();
documentsThread.join();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Terminated: %s\n", phaser.isTerminated());
}
}

开始时候创建Phaser对象,用于在每个阶段结束时对线程同步进行控制。Phaser构造器传入了参与阶段同步的线程的个数。
这个数字通知Phaser在唤醒所有休眠线程以进行下一个阶段之前,必须执行arriveAndAwaitAdvance()方法的线程数。
在run()方法开头调用arriveAndAwaitAdvanced这个方法可以保障在所有线程创建好之前没有线程开始执行任务。(同一起跑线)
在第一阶段和第二阶段结束的时候,检查这个阶段中是不是生成了结果集以及结果集中是不是有元素。
在第一阶段,checkResults()方法里调用arriveAndAwaitAdvance()方法。
在第二阶段,如果结果集是空的,对应线程没有理由继续执行,所以返回;但必须通知phaser对象参与同步的线程少了一个,调用arriverAndDeregister()方法。
在第三阶段结束的时候,在showInfo()方法中调用了phaser对下你给的arriveAndAwaitAdvance()方法,通过这个调用,确保三个线程都已完成。
当showInfo()方法执行完成之后,还调用了phaser对象的arriveAndDeregiester()方法,通过这个调用,撤销phaser中线程的注册。
一个Phaser对象有两种状态

  • 活跃态(Active):当存在参与同步的线程时,Phaser就是活跃态,并且在每个阶段结束的时候进行同步。
  • 终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止状态,在这种状态下,Phaser没有任何参与者。
    当Phaser对象的onAdvance()方法返回true的时候,Phaser对象就处于终止态。
    当Phaser是终止态是,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步的操作。
    Phaser类的一个重大特性就是不必对它的方法进行异常处理。不像其他的同步辅助类,被Phaser类置于休眠的线程不会影响中断事件,也不会抛出InterruptException异常。


    Phaser类提供了其他改变Phaser对象的方法
  • arrive():这个方法通知phaser对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段。必须小心使用这个方法,因为它不会与其他线程同步。
  • awaitAdvance(int phaser):如果传入的阶段参数与当前阶段一直,这个方法会将当前线程置于休眠,直到这个阶段所有参与者都运行完成。如果传入的阶段参数与当前阶段不一致,这个方法将立即返回。
  • awaitAdvanceInterruptibly(int phaser):这个方法根awaitAdvance(int phase)一样,不同之处是如果在这个方法中休眠的线程被中断,它将抛出InterruptedException异常。


    创建一个Phaser对象时,需要指出有多少个参与者。Phaser类提供两种方法增加注册者的数量。
  • register():这个方法将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程。
  • bulkRegister(int Parties):这个方法将指定数目的参与者注册到Phaser中,所有这些新的参与者都将被当成没有执行完本阶段的线程。


    Phaser类提供了forceTermination()方法来强制phaser进入终止态,这个方法不管phaser中是否存在注册的参与线程。当一个参与线程产生错误时,强制phaser终止时很有意义的。
    当一个phaser处于终止态时,awaitAdvance()和arriveAndAwaitAdvance()方法立即返回一个负数,而不再是一个正值了。如果知道phaser可能会被终止,就需要验证这些方法的返回值,以确定phaser是不是被终止了。



并发阶段任务中的阶段切换

Phaser类提供了onAdvance()方法,它在phaser阶段改变的时候会被自动执行。
onAdvance()方法需要两个int型的传入参数:当前的阶段数以及注册的参与者数量。
它返回的是boolean值,true表示已经完成执行并进入终止态,false表示phaser还在继续执行。
创建MyPhaser类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class MyPhaser extends Phaser{
@Override
protected boolean onAdvance(int phase, int registeredParties){
switch(phase){
case 0:
return studentsArrived();

case 1:
return finishFirstExercise();

case 2:
return finishSecondExercise();

case 3:
return finishExam();

default:
return true;
}
}

private boolean studentsArrived(){
System.out.printf("Phaser: The exam are going to start. The students are ready.\n");
System.out.printf("Phaser: We have %d students.\n",getRegisteredParties());
return false;
}

private boolean finishFirstExercise() {
System.out.printf("Phaser: All the students has finished the first exercise.\n");
System.out.printf("Phaser: It's turn for the second one.\n");
return false;
}

private boolean finishSecondExercise() {
System.out.printf("Phaser: All the students has finished the second exercise.\n");
System.out.printf("Phaser: It's turn for the third one.\n");
return false;
}

private boolean finishExam() {
System.out.printf("Phaser: All the students has finished the exam.\n");
System.out.printf("Phaser: Thank you for your time.\n");
return true;
}

}

创建Student类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Student implements Runnable{
private Phaser phaser;

public Student(Phaser phaser){
this.phaser = phaser;
}

public void run(){
System.out.printf("%s: Has arrived to do the exam. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the first exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise1();

System.out.printf("%s: Has done the first exercise. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the second exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise2();

System.out.printf("%s: Has done the second exercise. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the third exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise3();
System.out.printf("%s: Has finished the exam. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
}

private void doExercise1() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void doExercise2() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void doExercise3() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Main {

public static void main(String[] args) {
MyPhaser phaser=new MyPhaser();

Student students[]=new Student[5];
for (int i=0; i<students.length; i++){
students[i]=new Student(phaser);
phaser.register();
}

Thread threads[]=new Thread[students.length];
for (int i=0; i<students.length; i++) {
threads[i]=new Thread(students[i],"Student "+i);
threads[i].start();
}

for (int i=0; i<threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.printf("Main: The phaser has finished: %s.\n",phaser.isTerminated());

}

}

模拟了有三道试题的考试过程。所有的学生做完第一道题才开始做第二道。
phaser对象进行阶段切换的时候,在所有arriveAndAwaitAdvance()方法里休眠的线程被唤醒之前,onAdvance()方法将被自动调用。



并发任务间的数据交换

Java并发API提供了Exchanger来实心并发任务之间交换数据。
Exchanger类允许在两个线程之间定义同步点(Synchronization Point)。当两个线程都到达同步点时,它们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一个线程中。
Exchanger类在生产者——消费者中很有用,但Exchanger只能同步两个线程,即一个线程生产者,一个线程消费者。
创建Producer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Producer implements Runnable{
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Producer(List<String> buffer, Exchanger<List<String>> exchanger){
this.buffer = buffer;
this.exchanger = exchanger;
}


@Override
public void run(){
int cycle = 1;
for(int i = 0 ; i < 10 ; i++){
System.out.printf("Producer: Cycle %d\n", cycle);

for(int j = 0; j < 10; j++){
String message = "Event " + ((i*10) + j);
System.out.printf("Producer: %s\n", message);
buffer.add(message);
}

try{
buffer = exchanger.exchange(buffer);
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.printf("Producer: %d\n", buffer.size());
cycle++;
}

}
}

创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Consumer implements Runnable{
private List<String> buffer;
private final Exchanger<List<String>> exchanger;

public Consumer(List<String> buffer, Exchanger<List<String>> exchanger){
this.buffer = buffer;
this.exchanger = exchanger;
}

@Override
public void run(){
int cycle = 1;
for(int i = 0; i < 10; i++){
System.out.printf("Consumer: Cycle %d\n", cycle);
try{
buffer = exchanger.exchange(buffer);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Consumer: %d\n", buffer.size());

for(int j = 0; j < 10; j++){
String message = buffer.get(0);
System.out.printf("Consumer: %s\n", message);
buffer.remove(0);
}

cycle++;
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main{
public static void main(String[] args){
List<String> buffer1 = new ArrayList();
List<String> buffer2 = new ArrayList();

Exchanger<List<String>> exchanger = new Exchanger<>();

Producer producer = new Producer(buffer1, exchanger);
Consumer consumer = new Consumer(buffer2, exchanger);

Thread threadProducer = new Thread(producer);
Thread threadConsumer = new Thread(consumer);

threadProducer.start();
threadConsumer.start();
}
}

消费者先创建一个空的缓存列表,然后通过调用Exchanger与生产者同步来获得可以消费的数据。生产者从一个空的缓存列表开始执行,它创建了10个字符串,然后存储在这个缓存中,并且使用exchanger队形与消费者同步。

Exchanger类还提供了另外的exchanger方法,即exchange(V data, long time, TimeUnit unit)方法。
V指要交换的数据结构
time指定等待的time值
TimeUnit可以是DAYS\HOURS\MICROSECONDS\MILLISECONDS\MINUTES\NANOSECONDS\SECONDS

Java多线程 0x02

知识点

  • 使用synchronized实现同步方法
  • 使用非依赖属性实现同步
  • 在同步代码块中使用条件
  • 使用锁实现同步
  • 使用读写锁同步数据访问
  • 修改锁的公平性
  • 在锁中使用多条件

临界区(Critical Section)是一个用以访问共享资源的代码块,这个代码块在同一时间内只允许一个线程执行。

Java提供了两种基本的同步机制

  • synchronized关键字机制
  • Lock接口及其实现机制


使用synchronized实现同步方法

如果一个对象已用synchronized关键字声明,那么只有一个执行线程被允许访问它。每一个用synchronized关键字声明的静态方法,同时只能被一个执行线程访问,但是其他线程可以访问这个对象的非静态方法。
创建Account类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Account {
private double balance;

public double getBalance() {
return balance;
}

public void setBalance(double balance) {
this.balance = balance;
}

public void addAmount(double amount) {
double tmp=balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp+=amount;
balance=tmp;
}

public void subtractAmount(double amount) {
double tmp=balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp-=amount;
balance=tmp;
}
}

创建Bank类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Bank implements Runnable {
private Account account;

public Bank(Account account) {
this.account=account;
}

public void run() {
for (int i=0; i<100; i++){
account.subtractAmount(1000);
}
}
}

创建Company类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Company implements Runnable {
private Account account;

public Company(Account account) {
this.account=account;
}

public void run() {
for (int i=0; i<100; i++){
account.addAmount(1000);
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Main {
public static void main(String[] args) {
Account account=new Account();
account.setBalance(1000);
Company company=new Company(account);
Thread companyThread=new Thread(company);
Bank bank=new Bank(account);
Thread bankThread=new Thread(bank);

System.out.printf("Account : Initial Balance: %f\n",account.getBalance());

companyThread.start();
bankThread.start();

try {
companyThread.join();
bankThread.join();
System.out.printf("Account : Final Balance: %f\n",account.getBalance());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

以上代码演示了错误的场景
下面是改进的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Account {

private double balance;
public double getBalance() {
return balance;
}

public void setBalance(double balance) {
this.balance = balance;
}

public synchronized void addAmount(double amount) {
double tmp=balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp+=amount;
balance=tmp;
}


public synchronized void subtractAmount(double amount) {
double tmp=balance;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
tmp-=amount;
balance=tmp;
}

}

synchronized关键字机制避免了这类错误的方法。
一个对象的方法采用synchronized进行声明,只能被一个线程访问。如果线程A正在执行一个同步方法syncMethodA(),线程B要执行这个对象的其他同步方法syncMethodB(),线程B将被阻塞直达线程A访问完。但如果线程B访问的是同一个类的不同对象,那么两个线程都不会被阻塞。
synchronized关键字降低了应用的性能,因此只能在并发情景中需要修改共享数据的方法上使用它。
可以递归调用被synchronized声明的方法。当线程访问一个对象的同步方法时,它还可以用这个对象的其他的同步方法,也包含正在执行的方法,而不必再次去获取这个方法的访问权。
可以通过synchronized关键字来保护代码块(而不是整个方法)的访问。应该利用synchronized关键字:方法的其余部分保持在synchronized代码块之外,以获取更好的性能。临界区(即同一时间只能被一个线程访问的代码块)的访问应该尽可能的短。
通常来说,我们使用this关键字来引用正在执行的方法所属的对象。


使用非依赖属性实现同步

当使用synchronized来同步代码时,必须把对象引用作为参数传入。 通常情况下,使用this关键字来引用执行方法所属的对象。
下面模拟一个场景,有两个屏幕和两个售票处的电影院。
创建一个电影院类Cinema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Cinema{
private long vacanciesCinmea1;
private long vacanciesCinmea2;

private final Object controlCinema1, controlCinema2; //两个对象属性

public Cinema(){
controlCinema1 = new Object();
controlCinema2 = new Object();
vacanciesCinema1 = 20;
vacanciesCinema2 = 20;
}

//第一个电影院卖票,使用controlCinema1对象控制同步代码块
public boolean sellTicket1(int number){
synchronized(controlCineam1){
if(number < vacanciesCinema1){
vacancieCinema1 -= number;
return true;
}else{
return false;
}
}
}

public boolean sellTicket2(int number){
synchronized(controlCinema2){
if(number < vacanciesCinema2){
vacanciesCinema2 -= number;
return true;
}else{
return false;
}
}
}

//第一个电影院退票,使用controlCinema1控制同步代码块
public boolean returnTickets1(int number){
synchronized(controlCinema1){
vacanciesCinema1 += number;
return true;
}
}

public boolean returnTicket2(int number){
synchronized(controlCinema2){
vacanciesCinema2 += number;
return true;
}
}

public long getVacanciesCinema1(){
return vacanciesCinema1;
}

public long getVacanciesCinema2(){
return vacanciesCinema2;
}
}

创建售票处类TicketOffice

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TicketOffice1 implements Runnable{
private Cinema cinema;

public TicketOffice1(Cinema cinema){
this.cinema = cinema;
}

@Override
public void run(){
cinema.sellTickets1(3);
cinema.sellTickets1(2);
cinema.sellTickets1(1);
cinema.returnTicket1(3);
cinema.sellTickets1(5);
cinema.sellTickets2(2);
cinema.sellTickets2(2);
cinema.sellTickets2(2);
}
}



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TicketOffice2 implements Runnable{
private Cinema cinema;

public TicketOffice2(Cinema cinema){
this.cinema = cinema;
}

@Override
public void run(){
cinema.sellTickets2(2);
cinema.sellTickets2(4);
cinema.sellTickets1(2);
cinema.sellTickets1(1);
cinema.returnTickets2(2);
cinema.sellTicket1(3);
cinema.sellTicket2(2);
cinema.sellTicket1(2);
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Main{
public static void main(String[] args){
Cinema cinema = new Cinema();

TicketOffice1 ticketOffice1 = new TicketOffice1(cinema);
Thread thread1 = new Thread(ticketOffice1, "TicketOffice1");

TicketOffice2 ticketOffice2 = new TicketOffice2(cinema);
Thread thread2 = new Thread(ticketOffice2, "TicketOffice2");

thread1.start();
thread2.start();

try{
thread1.join();
thread2.join();
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.printf("Room 1 Vacancies: %d\n", cinema.getVacanciesCinema1());
System.out.printf("Room 2 Vacancies: %d\n", cinema.getVacanciesCinema2());
}
}

用synchronized同步代码块,JVM保证同一时间只有一个线程能够访问这个对象的代码保护块(对象,不是类)。
上面这个例子,使用一个controlCinema来控制对vacanciesCinema属性的访问,所以同一时刻只有一个线程能够修改这个属性。vacanciesCinema1和vacanciesCinema2有分别的control对象,所以允许同时运行两个线程,一个修改vacanciesCinema1,另一个修改vacanciesCinema2。


在同步代码中使用条件

在并发编程中一个典型的问题是生产者-消费者问题。
Java在Object类中提供了wait()、notify()和notifyAll()方法。 线程可以在同步代码块中调用wait()方法。如果在同步代码块外调用wait(),JVM将抛出IllegalMonitorStateException异常。
当一个线程调用wait()方法后,JVM将这个线程置入休眠,并释放控制这个同步代码块的对象,同时允许其他线程执行这个对象控制的其他同步代码块。 为了唤醒这个线程,必须在这个对象控制的某个同步代码块中调用notify()或者notifyAll()方法。
下面结合例子学习下生产者-消费者问题。
创建EventStorage类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class EventStorage{
private int maxSize;
private List<Data> storage;

public EventStorage(){
maxSize = 10;
storage = new LinkedList<>();
}

public synchronized void set(){
while(storage.size() == maxSize){
try{
wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
storage.add(new Date());
System.out.printf("Set: %d", storage.size());
notify();
}

public synchronized void get(){
while(storage.size() == 0{
try{
wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
System.out.printf("Get : %d: %s", storage.size(), ((LinkedList<?>) storage).poll());
notify();
}
}

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Producer implements Runnable{
private EventStorage storage;

public Producer(EventStorage storage){
this.storage = storage;
}

@Override
public void run(){
for(int i = 0; i < 100; i++){
storage.set();
}
}
}

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Consumer implements Runnable{
private EventStorage storage;

public Consumer(EventStorage storage){
this.storage = storage;
}

@Override
public void run(){
for(int i = 0; i < 100; i++){
storage.get();
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main{
public static void main(String[] args){
EventStorage storage = new EventStorage();

Producer producer = new Producer(storage);
Thread thread1 = new Thread(producer);

Consumer consumer = new Consumer(storage);
Thread thread2 = new Thread(consumer);

thread2.start();
thread1.start();
}
}

有了wait()和notify()机制,两个线程之间就有了通信。
其次,当其他线程调用notifyAll()方法时,挂起的线程将被唤醒并且再次检查条件。但notifyAll()并不保证哪个线程会被唤醒。



使用锁实现同步

Java提供了同步代码块的另一种机制,它比synchronized更强大也更灵活。
这种机制基于Lock接口及其实现类(例如ReentrantLock),提供了更多的好处。

  • 支持更灵活的同步代码块结构。使用synchronized只能在同一个synchronized块结构中获取和释放控制。Lock接口允许实现更复杂的临界区结构
  • 相比synchronized关键字,Lock接口提供了更多的功能。其中一个新功能是tryLock()方法的实现。这个方法试图获取锁,如果锁一杯其他线程获取,它将返回false并继续往下执行代码。使用锁的tryLock()方法, 通过返回值将得知是否有其他线程正在使用这个锁保护的代码块。
  • Lock接口允许分离读和写操作,允许多个读线程和只有一个写线程。
  • 相比synchronized,Lock接口具有更好的性能。
    好了,开始学习下ReentrantLock——Lock接口的实现类。
    创建PrintQueue类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class PrintQueue{
    private final Lock queueLock = new ReentrantLock();

    public void printJob(Object document){
    queueLock.lock();

    try{
    Long duration = (long) (Math.random() * 10000);
    System.out.printf("%s : PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), (duration/1000));
    }catch(InterruptedException e){
    e.printStackTrace();
    }finally{
    queueLock.unlock();
    }
    }
    }

创建Job类并实现Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Job implements Runnable{
private PrintQueue printQueue;

public Job(PrintQueue printQueue){
this.printQueue = printQueue;
}

@Override
public void run(){
System.out.printf("%s: Going to print a document\n",Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n",Thread.currentThread().getName());
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main(){
public static void main(String[] args){
PrintQueue printQueue = new PrintQueue();

Thread thread[] = new Thread[10];
for(int i = 0; i < 10; i++){
thread[i] = new Thread(new Job(printQueue), "Thread " + i);
}

for(int i = 0; i < 10; i++){
thread[i].start();
}
}
}

在这个临界区的开始,必须通过lock()方法获取对锁的控制。当线程A访问这个方法时,如果没有其他线程获取这个锁的控制,lock()方法将让线程A获取锁并且允许它立即执行临界区代码。否则,如果其他线程B正在执行这个锁保护的临界区代码,lock()方法将让线程A休眠直到线程B执行完临界区的代码。
在线程离开临界区的时候,必须使用unlock()方法来释放它持有的锁,以让其他线程来访问临界区。如果离开临界区的时候没有调用unlock()方法,其他线程将永久地等待,从而导致死锁(Deadlock)。
如果临界区使用了try-catch块,不要忘记将unlock()放入finally里。
Lock接口还提供了额另一个方法来获取锁,即tryLock(),跟lock()方法最大的不同是:线程使用tryLock()不能获取锁,tryLock()会立即返回,它不会讲线程置入休眠。tryLock()方法返回一个布尔值,true表示线程获取了锁,false表示没有获取锁。
ReentrantLock类也允许使用递归调用。如果一个线程获取了锁并且进行了递归调用,它将继续持有这个锁,因此调用lock()方法后也将立即返回,并且线程将继续执行递归调用。


使用读写锁实现同步数据访问

锁机制最大的改进之一就是ReadWriteLock接口和它的唯一实现类ReentrantReadWriteLock。
这个类有两个锁,一个是读操作锁,另一是写操作锁。
使用读操作锁可以允许多个线程同时访问,但是写操作锁只允许一个线程进行。
创建一PricesInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class PricesInfo{
private double price1;
private double price2;

private ReadWriteLock lock;

public PricesInfo(){
price1 = 1.0;
price2 = 2.0;
lock = new ReentrantReadWriteLock();
}

public double getPrice1(){
lock.readLock().lock();
double value = price1;
lock.readLock().unlock();
return value;
}

public double getPrice2(){
lock.readLock().lock();
double value = price2;
lock.readLock().unlock();
return value;
}

public void setPrices(double price1, double price2){
lock.writeLock().lock();
this.price1 = price1;
this.price2 = price2;
lock.writeLock().unlock();
}
}

创建Reader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Reader implements Runnable{
private PricesInfo pricesInfo;

public Reader(PricesInfo pricesInfo){
this.pricesInfo = pricesInfo;
}

@Override
public void run(){
for(int i = 0 ; i < 10 ; i++){
System.out.printf("%s: Price 1: %f\n",Thread.currentThread().getName(),pricesInfo.getPrice1());
System.out.printf("%s: Price 2: %f\n",Thread.currentThread().getName(),pricesInfo.getPrice2());
}
}
}

创建Writer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Writer implements Runnable{
private PricesInfo pricesInfo;

public Writer(PricesInfo pricesInfo){
this.pricesInfo = pricesInfo;
}

@Override
public void run(){
for(int i = 0 ; i < 3 ; i++){
System.out.printf("Writer: Attempt to modify the prices.\n");
pricesInfo.setPrices(Math.random()*10, Math.random()*8);
System.out.printf("Writer: Prices have been modified.\n");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main{
public static void main(String[] args){
PricesInfo pricesInfo = new PricesInfo();

Reader readers[] = new Reader[5];
Thread threadsReader[] = new Thread[5];

for(int i = 0; i < 5; i++){
readers[i] = new Reader(pricesInfo);
threadsReader[i] = new Thread(readers[i]);
}

Writer writer = new Writer(pricesInfo);
Thread threadWriter = new Thread(writer);

for(int i = 0 ;i < 5; i++){
threadsReader[i].start();
}
threadWriter.start();
}
}



修改锁的公平性

ReentrantLock和ReentrantReadWriteLock类的构造器都含有一个布尔参数fair,它允许控制这两个类的行为。
默认fair是false,它称为非公平模式(Non-Fair Mode)。
在非公平模式下,当有很多线程在等待锁时,锁将选择它们中的一个来访问临界区,这个选择是灭有任何约束的。
fair是true,公平模式(Fair Mode),当有很多线程在等待锁时,锁将选择它们中的一个来访问临界区,而且选择的是等待时间最长的。
这两种模式只适合于lock()和unlock方法。而Lock接口的tryLcok()方法没有将线程置于休眠,fair属性并不影响这个方法。

在锁中使用多条件(Multiple Condition)

一个锁可能关联一个或者多个条件,这些条件通过Condition接口声明。目的是允许线程获取锁并且查看等待的某一个条件是否满足,如果不满足就挂起直到某个线程唤醒它们。
Condition接口提供了挂起线程和唤起线程的机制。
还是以生产者消费者为例
创建FileMock类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class FileMock{
private String content[] ;
private int index;

public FileMock(int size, int length){
content = new String[size];
for(int i = 0 ; i< size; i++){
StringBuilder buffer = new StringBuilder(length);
for(int j = 0; j < length; j++){
int indict = (int)Math.random()*255;
buffer.append((char) indice);
}
content[i] = buffer.toString();
}
}

public boolean hasMoreLines(){
return index < content.length;
}

public String getLine(){
if(this.hasMoreLines()){
System.out.println("Mock: " + (content.length-index));
return content[index++];
}
return null;
}
}

实现数据缓冲类Buffer,它将被生产者和消费者共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class Buffer{
//存放共享数据
private LinkedList<String> buffer;

private int maxSize;

private ReentrantLock lock;

private Condition lines;
private Condition space;

//缓冲区是否还有数据
private boolean pendingLines;

public Buffer(int maxSize){
this.maxSize = maxSize;
buffer = new LinkedList();
lock = new ReentrantLock();
lines = lock.newCondition();
space = lock.newCondition();
pendingLines = true;
}

public void insert(String line){
lock.lock();
try{
//缓冲区是否还有空位
while(buffer.size() == maxSize){
space.await(); //当space的signal()或singalAll()时会被唤醒
}
buffer.offer(line);
System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread()
.getName(), buffer.size());
lines.signalAll(); //唤醒等待缓冲区中有数据的线程
}catch(InterruptedException e){
e.printStackTrace();
}finally{
lock.unlock();
}
}

public String get(){
String line = null;
lock.lock();
try{
while((buffer.size() == 0) && (hasPendingLines())){
lines.await(); //缓存区是不是有数据行
}
if(hasPendingLines()){
line = buffer.poll();
System.out.printf("%s: Line Readed: %d\n",Thread.currentThread().getName(),buffer.size());
space.signalAll();
}
}catch(InterruptedException e){
e.printStackTrace();
}finally{
lock.unlock();
}
return line;
}

public void setPendingLines(boolean pendingLines){
this.pendingLines = pendingLines;
}

public boolean hasPendingLines(){
return pendingLines || buffer.size() > 0;
}
}

消费者类Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Consumer implements Runnable{
private Buffer buffer;

public Consumer(Buffer buffer){
this.buffer = buffer;
}

@Override
public void run(){
while(buffer.hasPendingLines()){
String line = buffer.get();
processLine(line);
}
}

private void processLine(String line){
try{
Random random = new Random();
Thread.sleep(random.nextInt(100));
}catch(InterruptedException e){
e.printStackTrace();
}
}
}

生成者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Producer implements Runnable{
private FileMock mock;
private Buffer buffer;

public Producer(FileMock mock, Buffer buffer){
this.mock = mock;
this.buffer = buffer;
}

@Override
public void run(){
buffer.setPendingLines(true);
while(mock.hasMoreLines()){
String line = mock.getLine();
buffer.insert(line);
}
buffer.setPendingLines(false);
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Main{
public static void main(String[] args){
FileMock mock = new FileMock(101, 10);

Buffer buffer = new Buffer(20);

Producer producer = new Producer(mock, buffer);
Thread threadProducer = new Thread(producer, "Producer");

Consumer consumers[] = new Consumer[3];
Thread threadConsumers[] = new Thread[3];

for(int i = 0; i < 3; i++){
consumers[i] = new Consumer(buffer);
threadConsumers[i] = new Thread(consummers[i], "Consumser " + i);
}

threadProducer.start();
for(int i = 0; i < 3; i++){
threadConsumers[i].start();
}
}
}

与锁绑定的所有条件对象都是通过Lock接口声明的newCondition()方法创建的。
在使用条件的时候,必须获取这个条件绑定的锁。
当线程调用条件的await()方法时,它将自动释放这个条件绑定的锁,其他某个线程才可以获取这个锁并且执行相同的操作,或者执行这个锁保护的另一个临界区代码。
当一个线程调用了条件对象的signal()或者signalAll()方法后,一个或者多个在该条件上挂起的线程将被唤醒,但这并不能保证让它们挂起的条件已经满足,所以必须在while循环中调用await(),在条件成立之前不能离开这个循环,如果条件不成立,将再次调用await()。
因调用await()方法的线程可能会被中断,所以必须处理InterruptException异常。


Condition接口还提供了await()方法的其他形式
await(long time, TimeUnit unit):直到发生以下情况之一前,线程将一直处于休眠状态

  • 其他某个线程中断当前线程
  • 其他某个线程调用了将当前线程挂起的条件的signal()或signalAll()方法。
  • 指定的等待时间已经过去。
  • 通过TimeUnit类的常量DAYS HOURS MiCROSECONDS MILLISECONDS MINUTES ANOSECONDS SECONDS指定的等待时间已经过去


    awaitUniterruptibly():它是不能中断的。这个线程将休眠知道其他某个线程调用了将它挂起的条件的signal()或signalAll()方法。
    awaitUnitl(Date date):直到发生以下情况之一之前,线程将一直处于休眠状态
  • 其他某个线程中断当前线程
  • 其他某个线程调用了将它挂起的条件的signal()或signalAll()
  • 指定的最后期限到了

也可以将条件与读写锁ReadLock和WriteLock一起用。

Java多线程 0x01

知识点

  • 线程的创建和运行
  • 线程信息的获取和设置
  • 线程的中断
  • 线程中断的控制
  • 线程的休眠和恢复
  • 等待线程的终止
  • 守护线程的创建和运行
  • 线程中不可控异常的处理
  • 线程局部变量的使用
  • 线程的分组
  • 线程组中不可控异常的处理
  • 使用工厂类创建线程



不得不讲的并发与并行

所有的并发处理都有排队等候,唤醒,执行至少三个这样的步骤.所以并发肯定是宏观概念,在微观上他们都是序列被处理的,只不过资源不会在某一个上被阻塞(一般是通过时间片轮转),所以在宏观上看多个几乎同时到达的请求同时在被处理。
并发的实质是一个物理CPU(也可以多个物理CPU) 在若干道程序之间多路复用,并发性是对有限物理资源强制行使多用户共享以提高效率。
并行性指两个或两个以上事件或活动在同一时刻发生。在多道程序环境下,并行性使多个程序同一时刻可在不同CPU上同时执行。



线程的创建和运行

Java提供了两种方式来创建线程:
1)继承Thread类,并覆盖run()方法
2)创建一个是实现Runnable接口的类。
下面以第二种方式创建10个简单的线程,每个线程完成计算和打印乘以1-10后的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Calculator implements Runnable{
private int number;
public Calculator(int number){
this.number = number;
}

@Override
public void run(){
for(int i = 1; i <= 10; i++){
System.out.printf("%s: %d * %d = %d\n", Thread.currentThread().getName(), number, i, i*number);
}
}
}

接着编写一个主类,进行测试。

1
2
3
4
5
6
7
8
9
public class Main{
public static void main(String[] args){
for(int i = 1; i <= 10; i++){
Calculator calculator = new Calculator(i);
Thread thread = new Thread(calculator);
thread.start();
}
}
}

简单说明下,对一个实现了Runnable接口的类来说,创建Thread对象并不会创建一个新的执行线程,同样,调用它的run()方法,也不会创建一个新的执行线程。只有当Thread调用start()方法时,才会创建一个新的执行线程来调用run()方法。
当一个程序的所有线程都运行完成时,更明确的说,当所有非守护线程都运行完成的时候,这个Java程序将宣告结束。
如果main线程结束了,而其余的线程仍将继续执行它们的任务,直到运行结束。但如果某一个线程调用了System.exit()指令来结束程序的执行,则所有的线程都将结束。


线程信息的获取和设置

Thread类有一些保存信息的属性,这些属性可以用来标识线程,如显示线程的状态、控制线程的优先级。
ID:保存了线程的唯一标识。
Name:保存了线程的名称。
Priority:保存了线程的优先级。从1到10,由低优先级到高优先级。
Status:保存了线程的状态。在Java中,有6种状态,new \ runnable \ blocked \ waiting \ timewaiting \ terminated。
下面我们来打印下线程的这些标识。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Calculator implements Runnable{
private int number;

public Calculator(int number){
this.number = number;
}

@Override
public void run(){
for(int i = 1; i <= 10; i++){
System.out.println("%s: %d * %d = %d\n", Thread.currentThread().getName(), number, i, i*number);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Main{
public static void main(String[] args){
System.out.println("Minimum Priority:%s\n", Thread.MIN_PRIORITY);
System.out.println("Normal Priority:%s\n", Thread.NORM_PRIORITY);
System.out.println("Maximum Priority:%s\n", Thread.MAX_PRIORITY);

Thread threads[] = new Thread{10];
Thread.State status[] = new Thread.State[10];

for(int i = 0; i < 10; i++){
threads[i] = new Thread(new Calculator(i));
if((i%2) == 0){
threads[i].setPriority(Thread.MAX_PRIORITY);
}else{
threads[i].setPriority(Thread.MIN_PRIORITY);
}
threads[i].setName("Thread " +i);
}
//创建PrintWriter对象,用来把线程的状态演变写入到文件中
//try(){}catch{} 这个是Jdk7的新语法,可以看到这样代码简洁很多,不用在finally{}里关闭流。
try( FileWriter file = new FileWriter(".\\data\\log.txt");
PrintWriter pw = new PrintWriter(file);){

for(int i = 0; i < 10; i++){
pw.println("Main: Status of Thread " + i + " : " + threads[i].getState());
status[i] = threads[i].getState();
}

for(int i = 0 ; i < 10; i++){
threads[i].start();
}

boolean finish = false;
while(!finish){
for(int i = 0; i < 10; i++){
if(threads[i].getState() != status[i]){
writeThreadInfo(pw, threads[i], status[i]);
status[i] = threads[i].getState();
}
}

finish = true;

for(int i = 0 ; i < 10; i++){
finish = finish && (threads[i].getState() == State.TERMINATED);
}
}
}catch (IOException e){
e.printStackTrace();
}
}

private static void writeThreadInfo(PrintWriter pw, Thread thread, State state){
pw.printf("Main : Id %d - %s\n", thread.getId(), thread.getName());
pw.printf("Main : Priority: %d\n", thread.getPriority());
pw.printf("Main : Old state: %s\n", state);
pw.printf("Main : New state: %s\n", thread.getState());
pw.printf("Main : ************************************\n");
}
}

这样每个线程的状态演变都记录在log.txt里。
Thread类的属性存储了线程的所有信息,JVM使用线程的Priority来决定某一时刻由哪个线程来使用CPU,并根据线程的情景为它们设置实际的状态。
如果Thread类没有取名字,JVM会自动分配一个名字给它,如Thread-XX。
Thread的ID和状态是只读的,不能自己set。另外,setPriority()方法的值必须是1~10之内,超过这个访问会报IllegalArgumentException异常。



线程的中断

前面提到过单个线程结束并不会使进程结束,只有当所有的线程都结束了,这个进程才会结束。中途结束进程可以调用System.exit(),那么中途结束线程呢?Java也提供了中断机制。这种机制要求线程检查它是否被中断了,然后决定是不是影响这个中断请求。换句话说,线程是允许忽略中断请求并继续运行的。

接下来,还是以代码来说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class PrimeGenerator extends Thread{
@Override
public void run(){
long number = 1L;
while(true){
if(isPrime(number)){
System.out.println("Number %d is Prime\n", number);
}
if(isInterrupted()){
System.out.println("The Prime Generator has been Interrupted\n");
return;
}
number++;
}
}

/**
*是否是质数
*/

private boolean isPrime(long number){
if(number <= 2){
return true;
}
for(long i = 2; i < number; i++){
if((number % i )==0){
return false;
}
}
return true;
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Main{
public static void main(String[] args){
Thread task = new PrimeGenerator();
task.start();

try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException e){
e.printStackTrace();
}
task.interrupt();
}
}

Thread类有一个表明线程被中断与否的属性,它存放的是布尔值。线程的interrupt()方法被调用时,这个属性就会被设置为true。isInterrupted()方法只是返回这个属性的值。



线程中断的控制

已经学会中断线程,也学会了在线程对象中去控制这个中断。在实际编码中,如果线程实现了复杂的算法并且分布在几个方法中,或者线程里有递归调用等,我们就得使用一个更好的机制来控制线程的中断。Java提供了InterruptedException异常。当检查到线程中断时,就抛出这个异常,然后在run()中捕获并处理这个异常。
主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Main{
public static void main(String[] args){
FileSearch searcher = new FileSearch("C:\\", "autoexec.bat");
Thread thread = new Thread(searcher);
thread.start();
try{
TimeUnit.SECONDS.sleep(10);
}catch(InterruptedException e){
e.printStackTrace();
}

thread.interrupt();
}
}

这样不管递归调用多少次,只要抛出InterruptedException,就结束线程,释放资源。



线程的休眠和恢复

线程的休眠可以通过调用sleep()方法来实现。sleep()方法接受整型数值作为参数,以表明线程挂起执行的毫秒数。
sleep()方法的另一种使用方式是通过TimeUnit枚举类进行调用。这个方法也使用Thread类的sleep()方法来使当前线程休眠,但是它接受的参数单位是秒,最终会被转化成毫秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class FileClock implements Runnable{
@Override
public void run(){
for(int i = 0; i < 10; i++){
System.out.println("%s\n", new Date());
try{
TimeUnit.SECONDS.sleep(1);
}catch(InterruptedException e){
System.out.println("The FileClock has been interrupted");
}
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Main{
public static void main(String[] args){
FileClock clock = FileClock();
Thread thread = new Thread(clock);

thread.start();

try{
TimeUnit.SECONDS.sleep(5);
}catch(InterruptedException e){
e.printStackTrace();
}

thread.interrupt();
}
}

如果休眠中线程被中断,该方法就会立即抛出InterruptedException异常,而不需要等待到线程休眠时间结束



等待线程的终止

有时候,我们必须等待某个线程的终止,这个时候可以使用Thread类的join()方法。当一个线程对象的join()方法被调用时,调用它的线程将被挂起,直到这个线程对象完成它的任务。

1
2
3
4
5
6
7
8
9
10
11
12
public class DataSourcesLoader implements Runnable{    
@override
public void run(){
System.out.println("Begining data sources loading: %s\n", new Date());
try{
TimeUnit.SECONDS.sleep(4);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("Data sources loading has finished: %s\n", new Date());
}
}

1
2
3
4
5
6
7
8
9
10
11
12
public class NetworkConnectionsLoader implements Runnable{
@Override
public void run(){
System.out.println("Begining network connections loading:%s\n", new Date());
try{
TimeUnit.SECONDS.sleep(6);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("Network connections loading has finished: %s\n", new Date());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Main{
public static void main(String[] args){
DataSourcesLoader dsLoader = new DataSourcesLoader();
Thread thread1 = new Thread(dsLoader, "DataSourceThread");
thread1.start();

NetworkConnectionsLoader ncLoader = new NetworkConnectionsLoader();
Thread thread2 = new Thread(ncLoader, "NetworkConnectionLoader");
thread2.start();

try{
thread1.join();
thread2.join();
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.println("Main: Configuration has been loader: %s\n", new Date());
}
}

运行发现,只有当DataSourcesLoader线程运行结束,NetworkConnectionsLoader线程也运行结束的时候,主线程对象才会继续运行并且打印出最终的信息。
另外,java还提供了另外两个形式的join()方法:join(long milliseconds) join(long milliseconds, long nanos)
这两种形式的join返回比join()方法多一个条件,就是指定的时间到时,线程也将继续执行。



守护线程的创建和运行

Java里有一个特殊的线程叫做守护(Daemon)线程。这种线程的优先级很低,通常当一个程序中没有其他线程运行的时候它才运行。当守护线程是程序中唯一运行的线程时,它的结束也就意味着整个程序的结束。
因为这种特性,守护线程一般用来作为其他普通线程的服务提供者,通常都是无限循环的,以等待服务请求或者执行线程任务。一个典型的守护线程就是Java的垃圾回收器(Garbage Collector)。
下面一个例子:创建两个线程,一个用户线程,它将事件写入到一个队列中;另一个是守护线程,它将管理这个队列,如果生成的事件超过10秒钟,就会被移除。


创建Event类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Event {
private Date date;
private String event;

public Date getDate() {
return date;
}


public void setDate(Date date) {
this.date = date;
}

public String getEvent() {
return event;
}

public void setEvent(String event) {
this.event = event;
}
}

创建WriteTask类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WriterTask implements Runnable {

Deque<Event> deque; //事件队列

public WriterTask (Deque<Event> deque){
this.deque=deque;
}

@Override
public void run() {
// Writes 100 events
for (int i=1; i<100; i++) {
//每次循环中都会创建一个新的Event对象,并放入队列
Event event=new Event();
event.setDate(new Date());
event.setEvent(String.format("The thread %s has generated an event",Thread.currentThread().getId()));
deque.addFirst(event);
try {
//休眠一秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

创建CleanerTask类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CleanerTask extends Thread {
private Deque<Event> deque;

public CleanerTask(Deque<Event> deque) {
this.deque = deque;
//设置为守护线程
setDaemon(true);
}

@Override
public void run() {
while (true) {
Date date = new Date();
clean(date);
}
}


private void clean(Date date) {
long difference;
boolean delete;

if (deque.size()==0) {
return;
}

delete=false;
do {
Event e = deque.getLast();
difference = date.getTime() - e.getDate().getTime();
if (difference > 10000) {
System.out.printf("Cleaner: %s\n",e.getEvent());
deque.removeLast();
delete=true;
}
} while (difference > 10000);
if (delete){
System.out.printf("Cleaner: Size of the queue: %d\n",deque.size());
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Main {
public static void main(String[] args) {

Deque<Event> deque=new ArrayDeque<Event>();
WriterTask writer=new WriterTask(deque);
for (int i=0; i<3; i++){
Thread thread=new Thread(writer);
thread.start();
}

CleanerTask cleaner=new CleanerTask(deque);
cleaner.start();

}
}

对程序的分析发现,队列中的对象不断增长直到30个,然后到程序结束,队列的长度维持在27~30之间。
这3个WriteTask,每个线程写入一个事件,然后休眠1秒钟。在第一个10秒钟内,队列中有30个事件,直到3个WriteTask都休眠后,CleanerTask才开始执行,但由于事件都小于10秒并未删除任何事件。在接下来的运行中,CleanerTask每秒删除3个对象,同时WriterTask会写入3个对象,所以队列的长度一直介于27~30之间。
setDaemon()只能在start()之前才能被调用,一旦线程开始运行,将不能再修改守护状态。
isDaemon()用来检查一个线程是不是守护线程。



线程中不可控异常的处理

在java中有两种异常。

  • 非运行时异常。 这种异常必须在方法声明的throws语句指定,或者在方法体内捕获。例如IOException和ClassNotFoundException。
  • 运行时异常。 例如:NumberFormatException。
    因为run()方法不支持throws语句,run()中抛出非运行异常是,必须捕获并且处理它。
    创建处理运行时异常的类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class ExceptionHandler implements UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
    System.out.printf("An exception has been captured\n");
    System.out.printf("Thread: %s\n",t.getId());
    System.out.printf("Exception: %s: %s\n",e.getClass().getName(),e.getMessage());
    System.out.printf("Stack Trace: \n");
    e.printStackTrace(System.out);
    System.out.printf("Thread status: %s\n",t.getState());
    }
    }

抛出异常的线程类

1
2
3
4
5
6
public class Task implements Runnable {
@Override
public void run() {
int numero=Integer.parseInt("TTT");
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main {
public static void main(String[] args) {
Task task=new Task();
Thread thread=new Thread(task);
thread.setUncaughtExceptionHandler(new ExceptionHandler());
thread.start();

try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.printf("Thread has finished\n");

}
}

当一个线程抛出异常并且没有被捕获时(这种情况只可能是运行时异常),JVM检查这个线程是否被预置了未捕获异常处理器。如果找到,JVM将调用线程对象的这个方法,并将线程对象和异常作为传入参数。
如果线程没有被预置未捕获异常处理器,JVM将打印堆栈记录到控制台,并退出程序。
Thread类还有另一个静态方法setDefaultUncaughtExceptionHandler()。这个方法在应用程序中为所有的线程对象创建了一个异常处理器。
当线程抛出一个未捕获的异常时,JVM将为异常寻找以下3种可能的处理器。

  • 查找线程对象的未捕获异常处理器
  • (如果上面找不到)JVM将继续查找线程对象所在的线程组(ThreadGroup)的未捕获异常处理器
  • (如果上面也没找到)JVM将继续查找默认的未捕获异常处理器。
    如果没有一个处理器存在,JVM打印记录,然后退出。



    线程局部变量的使用

    共享数据是并发最核心的问题之一。
    Java并发API提供了一个干净的机制,即线程局部变量(Thread-Local Variable)。
    创建线程共享类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class UnsafeTask implements Runnable{
    private Date startDate;

    @Override
    public void run() {
    startDate=new Date();
    System.out.printf("Starting Thread: %s : %s\n",Thread.currentThread().getId(),startDate);
    try {
    TimeUnit.SECONDS.sleep((int)Math.rint(Math.random()*10));
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.printf("Thread Finished: %s : %s\n",Thread.currentThread().getId(),startDate);
    }
    }

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Main {
public static void main(String[] args) {
UnsafeTask task=new UnsafeTask();

for (int i=0; i<3; i++){
Thread thread=new Thread(task);
thread.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

这个每个线程有一个不同的开始时间,但当它们结束时,都有相同的startDate属性值。
用线程局部变量机制解决这个问题。
创建SafeTask类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SafeTask implements Runnable {
private static ThreadLocal<Date> startDate= new ThreadLocal<Date>() {
protected Date initialValue(){
return new Date();
}
};

@Override
public void run() {
System.out.printf("Starting Thread: %s : %s\n",Thread.currentThread().getId(),startDate.get());
try {
TimeUnit.SECONDS.sleep((int)Math.rint(Math.random()*10));
} catch (InterruptedException e) {
e.printStackTrace();
}
// Writes the start date
System.out.printf("Thread Finished: %s : %s\n",Thread.currentThread().getId(),startDate.get());
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public class SafeMain {

public static void main(String[] args) {
SafeTask task=new SafeTask();
for (int i=0; i<3; i++){
Thread thread=new Thread(task);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
thread.start();
}
}
}

它跟UnsafeTask类的run()方法实现了一样的功能,但是访问startDate属性的方式改变了。
现在,这3个线程对象都有它们自己的startDate属性值。
线程局部变量分别为每个线程存储了各自的属性值,并提供给每个线程使用。可以使用get()方法读取这个值,并用set()方法设置这个值。
如果线程第一次访问线程局部变量,线程局部变量可能还没有为它存储值,这时initialValue()被调用,并返回当前时间值。
线程局部变量也提供了remove()方法。
如果一个线程是从其他某个线程中创建的,这个类将提供继承的值。线程A在线程局部变量已有值,当它创建线程B,线程B的线程局部变量将跟线程A是一样的。可以覆盖childValue()方法,这个方法用来初始化子线程在线程局部变量中的值。它使用父线程在线程局部变量中的值作为传入参数。


线程的分组

Java并发API提供了把线程分组,对组内线程对象进行访问并操作它们。例如,对于一些执行同样任务的线程,不管组内多少线程在运行,只需要一个单一的调用,所有这些线程的运行都会被中断。
ThreadGroup类表示一组线程。 线程组可以包含线程对象,也可以包含其他线程组对象,它是一个树形结构。
下面创建10个线程查询相同的任务并让它们随机休眠一段时间,当其中一个线程查询到结果,将其他9个线程中断。
创建Result类

1
2
3
4
5
6
7
8
9
10
11
public class Result {
private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

创建SearchTask类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SearchTask implements Runnable {
private Result result;

public SearchTask(Result result) {
this.result=result;
}

@Override
public void run() {
String name=Thread.currentThread().getName();
System.out.printf("Thread %s: Start\n",name);
try {
doTask();
result.setName(name);
} catch (InterruptedException e) {
System.out.printf("Thread %s: Interrupted\n",name);
return;
}
System.out.printf("Thread %s: End\n",name);
}

private void doTask() throws InterruptedException {
Random random=new Random((new Date()).getTime());
int value=(int)(random.nextDouble()*100);
System.out.printf("Thread %s: %d\n",Thread.currentThread().getName(),value);
TimeUnit.SECONDS.sleep(value);
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Main {
public static void main(String[] args) {
ThreadGroup threadGroup = new ThreadGroup("Searcher");
Result result=new Result();

SearchTask searchTask=new SearchTask(result);
for (int i=0; i<10; i++) {
Thread thread=new Thread(threadGroup, searchTask);
thread.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.printf("Number of Threads: %d\n",threadGroup.activeCount());
System.out.printf("Information about the Thread Group\n");
threadGroup.list();

Thread[] threads=new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
for (int i=0; i<threadGroup.activeCount(); i++) {
System.out.printf("Thread %s: %s\n",threads[i].getName(),threads[i].getState());
}

waitFinish(threadGroup);
threadGroup.interrupt();
}

private static void waitFinish(ThreadGroup threadGroup) {
while (threadGroup.activeCount()>9) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

线程组类存储了线程对象和关联的线程组对象,并可以访问它们的信息(例如状态),将执行的操作应用到所有成员上(例如中断)。



线程组中不可控异常的处理

Java提供了捕获和处理异常的机制。
有的异常必须被捕获,或者必须使用方法的throws声明再次抛出,这类异常叫做非运行时异常。
还有一类异常叫做运行时异常,它们不需要被什么或者捕获。
建立一个方法来捕获线程组中的任何线程对象抛出的非捕获异常。
创建一个MyThreadGroup类,并继承ThreadGroup。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyThreadGroup extends ThreadGroup {
public MyThreadGroup(String name) {
super(name);
}

@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.printf("The thread %s has thrown an Exception\n",t.getId());
e.printStackTrace(System.out);
System.out.printf("Terminating the rest of the Threads\n");
interrupt();
}
}

创建Task类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Task implements Runnable {
@Override
public void run() {
int result;
Random random=new Random(Thread.currentThread().getId());
while (true) {
result=1000/((int)(random.nextDouble()*1000));
System.out.printf("%s : %f\n",Thread.currentThread().getId(),result);
if (Thread.currentThread().isInterrupted()) {
System.out.printf("%d : Interrupted\n",Thread.currentThread().getId());
return;
}
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
public class Main {
public static void main(String[] args) {
MyThreadGroup threadGroup=new MyThreadGroup("MyThreadGroup");
Task task=new Task();
for (int i=0; i<2; i++){
Thread t=new Thread(threadGroup,task);
t.start();
}
}

}

前面说过,当线程抛出非捕获异常时,JVM将为这个异常寻找3种可能的处理器。
首先,寻找抛出这个异常的线程的非捕获异常处理器,如果这个处理器不存在,JVM继续查找这个线程所在线程组的非捕获异常处理器,上面的代码就是这种情况。



使用工厂类创建线程

使用工厂类,可以将对象的创建集中化,这样做的好处:

  • 更容易修改类,或者改变创建对象的方式
  • 更容易为有限资源限制对象的数目。例如,可以限制一个类型的对象不多于n个。
  • 更容易为创建的对象生成统计数据。


    Java提供了ThreadFactory接口。Java并发API的高级工具类也使用了线程工厂创建线程。
    创建MyThreadFactory类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public class MyThreadFactory implements ThreadFactory {
    private int counter;
    private String name;
    private List<String> stats;

    public MyThreadFactory(String name){
    counter=0;
    this.name=name;
    stats=new ArrayList<String>();
    }


    @Override
    public Thread newThread(Runnable r) {
    Thread t=new Thread(r,name+"-Thread_"+counter);
    counter++;
    stats.add(String.format("Created thread %d with name %s on %s\n",t.getId(),t.getName(),new Date()));
    return t;
    }

    public String getStats(){
    StringBuffer buffer=new StringBuffer();
    Iterator<String> it=stats.iterator();
    while (it.hasNext()) {
    buffer.append(it.next());
    }
    return buffer.toString();
    }
    }

创建Task类

1
2
3
4
5
6
7
8
9
10
public class Task implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Main {
public static void main(String[] args) {
MyThreadFactory factory=new MyThreadFactory("MyThreadFactory");
Task task=new Task();
Thread thread;

System.out.printf("Starting the Threads\n");
for (int i=0; i<10; i++){
thread=factory.newThread(task);
thread.start();
}
System.out.printf("Factory stats:\n");
System.out.printf("%s\n",factory.getStats());
}
}

ThreadFactory接口只有一个方法,即newThread,它以Runnable接口对象作为传入参数并且返回一个线程对象。当实现ThreadFactory接口时,必须实现覆盖这个方法。
可以通过增加一些变化来强化实现方法覆盖。

  • 创建一个个性化线程,如使用一个特殊的格式作为线程名,或者通过继承Thread类来创建自己的线程类
  • 保存新创建的线程的统计数据
  • 限制创建的线程的数量
  • 对生成的线程进行验证


    使用工厂设计模式是一个很好的编程实践,如果通过实现ThreadFactory接口来创建线程,以保证所有的线程都是使用这个工厂创建的。