Java多线程 0x03

知识点

  • 资源的并发访问控制
  • 资源的多副本的并发访问控制
  • 等待多个并发事件的完成
  • 在集合点的同步
  • 并发阶段任务的运行
  • 并发阶段任务中的阶段切换
  • 并发任务间的数据交换


介绍几个更高级的同步机制

  • 信号量(Semaphore):是一个计数器,用来保护一个或者多个共享资源的访问。
  • CountDownLatch:Java提供的同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。
  • CyclicBarrier:Java同步辅助类,它允许多个线程在某个集合点(common point)处进行互相等待。
  • Phaser:Java同步辅助类,它把任务分成多个阶段运行,在开始下一个阶段之前,当前阶段内的所有线程都必须执行完成,这是JDK7中的新特性。
  • Exchanger:Java同步辅助类,它提供两个线程之间的数据交换点。


二进制信号量(Binary Semaphore)

信号量是一个计数器,用来保护一个或者多个共享资源的访问。
线程访问一个共享资源,必须先获得信号量。若信号量大于0,信号量减1,然后允许访问这个共享资源;否则,信号量已经是0,此时会把线程置入休眠等待信号量大于0。
线程访问完一个共享资源后,信号量必须释放,即将信号量加1。
二进制信号量(Binary Semaphore)是一种特殊的信号量,用来保护对唯一共享资源的访问。
创建PrintQueue类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class PrintQueue{
private final Semaphore semaphore;

public PrintQueue(){
semaphore = new Semaphore(1); //创建二进制信号量
}

public void printJob(Object document){
try{
semaphore.acquire();

Long duration = (long)(Math.random() * 10);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(),duration);
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}finally{
semaphore.release();
}
}
}

创建Job类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Job implements Runnable{
private PrintQueue printQueue;

public Job(PrintQueue printQueue){
this.printQueue = printQueue;
}

@Override
public void run(){
System.out.printf("%s: Going to print a job\n",Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
}
}

主类

1
2
3
4
5
6
7
8
9
10
public class Main{
public static void main(String[] args){
PrintQueue printQueue = new PrintQueue();

Thread thread[] = new Thread[10];
for(int i = 0; i < 10; i++){
thread[i] = new Thread(new Job(printQueue),"Thread " + i);
}

}

获取信号量的方法acquire(),释放信号量的方法release()。
Semaphore类还有其他两种acquire()方法。

  • acquireUninterruptibly():当信号量为0时,线程会被阻塞,可能会被中断,从而导致acquire()方法抛出InterruptedException异常。而acquireUninterruptibly()方法会忽略线程的中断并且不会抛出任何异常。
  • tryAcquire():这个方法试图获取信号量,从而避开线程的阻塞和等待。
    信号量的公平性,默认都是非公平性的,Semaphore类的构造函数也提供了boolean类型的模式选择。



信号量(Semaphore)

信号量可以用来保护一个资源的多个副本,或者被多个线程同时执行的临界区。
创建PrintQueue类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class PrintQueue{
private Semaphore semaphore;

private boolean freePrinters[];

private Lock lockPrinters;

public PrintQueue(){
semaphore = new Semaphore(3);
freePrinters = new boolean[3];
for(int i = 0; i < 3; i++){
freePrinters[i] = true;
}
lockPrinters = new ReentrantLock();
}

public void printJob(Object document){
try{
semaphore.acquire();
int assignedPrinter = getPrinter();
Long duration = (long)(Math.random()*10);
System.out.printf("%s: PrintQueue: Printing a Job in Printer %d during %d seconds\n",Thread.currentThread().getName(),assignedPrinter,duration);
TimeUnit.SECONDS.sleep(duration);

freePrinters[assignedPrinter] = true;
}catch(InterruptedException e){
e.printStackTrace();
}finally{
semaphore.release();
}
}

private int getPrinter(){
int ret = -1;

try{
lockPrinters.lock();
for(int i = 0; i < freePrinters.length; i++){
if(freePrinters[i]){
ret = i;
freePrinters[i] = false;
break;
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
lockPrinters.unlock();
}
}
}

创建Job类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Job implements Runnable{
private PrintQueue printQueue;

public Job(PrintQueue printQueue){
this.printQueue = printQueue;
}

@Override
public void run(){
System.out.printf("%s: Going to print a job\n",Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s: The document has been printed\n",Thread.currentThread().getName());
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
public class Main{
public static void main(String[] args){
PrintQueue printQueue = new PrintQueue();
Thread thread[] = new Thread[12];
for(int i = 0; i < 12; i++){
thread[i] = new Thread(new Job(printQueue), "Thread " + i);
}
for(int i = 0;i < 12; i++){
thread[i].start();
}
}
}



等待多个并发事件的完成

Java并发API提供了CountDownLatch类,它是一个同步辅助类。
在完成一组正在其他线程中执行的操作之前,它允许线程一直等待。
这个类使用一个整数进行初始化,这个整数就是线程要等待完成的操作的数目。
当一个线程要等待某些操作先执行完时,需要调用await()方法,这个方法让线程进入休眠直到等待的所有操作都完成。
当某一个操作完成后,它将调用countDown()方法将CountDownLatch类的内部计数器减1。
当计数器变成0的时候,CountDownLatch类将唤醒所有调用await()方法而进入休眠的线程。

创建视频会议类Videoconference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Videoconference implements Runnable{
private final CountDownLatch controller;

public Videoconference(int number){
controller = new CountDownLatch(number);
}

public void arrive(String name){
System.out.printf("%s has arrived.\n",name);
controller.countDown();
System.out.printf("VideoConference: Waiting for %d participants.\n", controller.getCount());
}

@Override
public void run(){
System.out.printf("VideoConference: Initialization: %d participants.\n", controller.getCount());
try{
controller.await();
System.out.printf("VideoConference: All the participants have come\n");
System.out.printf("VideoConference: Let's start...\n");
}catch(InterruptedException e){
e.printStackTrace();
}
}
}

创建与会者类Participant

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Participant implements Runnable{
private Videoconference conference;
private String name;

public Participant(Videoconference conference,String name){
this.conference = conference;
this.name = name;
}

@Override
public void run(){
Long duration = (long)(Math.random()*10);
try{
TimeUnit.SECONDS.sleep(duration);
}catch(InterruptedException e){
e.printStackTrace();
}
conference.arrive(name);
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Main{
public static void main(String[] args){
Videoconference conference = new Videoconference(10);
Thread threadConference = new Thread(conference);
threadConference.start();

for(int i = 0; i < 10; i++){
Participant p = new Participant (conference, "Participant " + i);
Thread t = new Thread(p);
t.start();
}
}
}

CountDownLatch类有三个基本元素

  • 一个初始值,即定义必须等待的先行完成的操作的数目。
  • await()方法,需要等待其他事件完成的线程调用。
  • countDown()方法,每个被等待的事件在完成的时候调用。
    创建CountDownLatch对象时,使用构造其来初始化内部计数器。当countDown()方法被调用后,计数器将减1。当计数器到达0的时候,CountDownLatch对象将唤起所有在await()方法上等待的线程。
    CountDownLatch对象的内部计数器被初始化之后就不能被再次初始化或者修改。一旦计数器被初始化后,唯一能改变参数值的方法是countDown()方法。当计数器到达0时,所有因调用await()方法而等待的线程立即被唤醒,再执行countDown将不起任何作用。
    和其他同步方法相比,CountDownLatch机制有下述不同:
  • CountDownLatch机制不是用来保护共享资源或者临界区的,它是同步执行多个任务的一个或者多个线程。
  • CountDownLatch只准许进入一次。即一旦CountDownLatch的内部计数器到达0,再调用这个方法将不起作用。
    CountDownLatch类提供了另一种await()方法,即await(long time, TimeUnit unit)。这个方法被调用后,线程将休眠直到被中断,或者CountDownLatch的内部计数器达到0,或者指定的时间已经过期。
    第二个参数是TimeUnit类型,TimeUnit类是以下常量的枚举值:DAYS\HOURS\MICROSECONDS\MILLISECONDS\MINUTES\NANOSECONDS\SECONDS。



在集合点的同步

Java并发API提供了CyclicBarrier类,它也是一个同步辅助类。
它允许两个或多个线程在某个点上进行同步。
这个类和CountDownLatch类类似,但也有不同之处,使之成为更强大的类。
CyclicBarrier类有一个很有意义的改进,可以传入另一个Runnable对象作为初始化参数。
当所有线程都到达集合点后,CyclicBarrier将这个Runnable对象作为线程执行。
这个特性使得这个类在并行任务上可以媲美分治编程技术(Divide and Conquer Programming Technique)。
创建MatrixMock类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MatrixMock{
private int data[][]; //二维数组data

//矩阵的行数 每行的长度 寻找的数字
public MatrixMock(int size, int length, int number){
int counter = 0;
data = new int[size][length];
Random random = new Random();
for(int i = 0 ;i < size; i++){
for(int j = 0; j < length; j++){
data[i][j] = random.nexInt(10);
if(data[i][j] == number){
counter++;
}
}
}
System.out.printf("Mock: There are %d ocurrences of number in generated data.\n", counter, number);


public int[] getRow(int row){
if((row >= 0) && (row < data.length)){
return data[row];
}
return null;
}
}

结果类Results

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Results{
private int data[];

public Results(int size){
data = new int[size];
}

public void setData(int position,int value){
data[position] = value;
}

public int[] getData(){
return data;
}
}

查找类Searcher类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
pubilc class Searcher implements Runnable{
private int firstRow;
private int lastRow;
private MatrixMock mock;
private Results results;
private int number;
private final CyclicBarrier barrier;

public Search(int firstRow,int lastRow,MatrixMock mock,Results results, int number,CyclicBarrier barrier){
this.firstRow = firstRow;
this.lastRow = lastRow;
this.mock = mock;
this.results = results;
this.number = number;
this.barrier = barrier;
}

@Override
public void run(){
int counter;
System.out.printf("%s: Processing lines from %d to %d.\n", Thread.currentThread().getName(),firstRow, lastRow);
for(int i = firstRow; i < lastRow; i++){
int row[] = mock.getRow(i);
counter = 0;
for(int j = 0; j < row.length; j++){
if(row[j] == number){
counter++;
}
}
results.setData(i, counter);
}
System.out.printf("%s: Lines processed.\n",Thread.currentThread().getName());

try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
}
}

创建Grouper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Grouper implements Runnable{
private Results results;

public Grouper(Results results){
this.results = results;
}

@Override
public void run(){
int finalResult = 0;
System.out.println("Grouper: Processing results...\n");
int data[] = results.getData();
for(int number : data){
finalResult += number;
}
System.out.printf("Grouper: Total result: %d.\n",finalResult);
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Main{
public static void main(String[] args){
final int ROWS = 10000;
final int NUMBERS = 1000;
final int SEARCH = 5;
final int PARTICIPANTS = 5;
final int LINES_PARTICIPANT = 2000;
MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
Results results = new Results(ROWS);
Grouper grouper = new Grouper(results);
CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
Searcher searchers[] = new Searcher[PARTICIPANTS];
for(int i = 0; i < PARTICIPANTS; i++){
searchers[i] = new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, results,5, barrier);
Thread thread = new Thread(searchers[i]);
thread.start();
}
System.out.printf("Main: The main thread has finished.\n");
}
}

将矩阵分离成5个子集,并且在每个子集中使用线程进行查找,这些线程是查找类Searcher对象。
使用CyclicBarrier对象同步5个线程,执行Grouper查找任务处理结果,并且计算最终的结果。
CyclicBarrier类有一个内部计数器,可以控制指定数目的几个线程必须都到达集合点。
每一个线程到达集合点后就会调用await()方法通知CyclicBarrier对象,CyclicBarrier对象会让这个线程休眠直到其他所有的线程都到达集合点。
当所有线程都到达集合点之后,CyclicBarrier对象就唤醒所有的await()方法里等待的线程,同时,还可以以构造器传入的Runnable对象(Grouper对象)创建一个新的线程,以执行其他任务。
CyclicBarrier类还提供了getNumberWaiting()方法和getParties()方法,前者将返回在await()上阻塞的线程的数目,后者返回被CyclicBarrier对象同步的任务数。
CyclicBarrier类和CountDownLatch类不同,它可以重置回初始状态。
CyclicBarrier类提供了reset()方法完成的。当重置发生后,在await()方法中等待的线程将收到一个BrokenBarrierException异常。
CyclicBarrier对象有一种特殊的状态即损坏状态(Broken)。当很多线程在await()方法上等待的时候,如果其中一个线程被中断,这个线程将抛出InterruptedException异常,其他的等待线程将抛出BrokenBarrierException异常,于是CyclicBarrier对象就处于损坏状态了。
CyclicBarrier类提供了isBroken()方法,如果处于损坏状态就返回true,否则返回false。


并发阶段任务的运行

Java API提供了一个更复杂、更强大的同步辅助类,Phaser,它允许执行并发多阶段任务。
当有并发任务并且需要分解成几步执行时,这种机制非常适用。
Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态地增加或者减少任务数。
使用Phaser类同步三个并发任务,这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展名的.log的文件。
这3个任务分成以下三个步骤:

  • 在指定的文件夹及其子文件夹中获得扩展名为.log的文件
  • 对第一步的结果进行过滤,删除修改时间超过24小时的文件
  • 将结果打印到控制台
    在第一步和第二步结束的时候,都会检查所查找的结果列表是不是有元素存在。如果结果列表是空的 ,对应的线程将结束执行,并从phaser中删除。
    创建FileSearch类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    public class FileSearch implements Runnable{
    private String initPath;
    private String end;
    private List<String> results;
    private Phaser phaser;

    public FileSearch(String initPath, String end, Phaser phaser){
    this.initPath = initPath;
    this.end = end;
    this.phaser = phaser;
    results = new ArrayList<>();
    }

    @Override
    public void run(){
    phaser.arriveAndAwaitAdvance();

    System.out.printf("%s: Starting.\n", Thread.currentThread().getName());

    File file = new File(initPath);
    if(file.isDirectory()){
    directoryProcess(file);
    }

    if(!checkResults()){
    return;
    }

    filterResults();

    if(!checkResults()){
    return;
    }

    showInfo();
    phaser.arriveAndDeregister();
    System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
    }

    private void showInfo(){
    for(int i = 0; i < results.size(); i++){
    File file = new File(results.get(i));
    System.out.printf("%s: %s\n", Thread.currentThread().getName(), file.getAbsolutePath());
    }
    phaser.arriveAndAwaitAdvance();
    }

    //检查结果列表的长度
    private boolean checkResults(){
    if(results.isEmpty()){
    System.out.printf("%s: Phase %d: 0 results.\n", Thread.currentThread().getName(), phaser.getPhase());
    System.out.printf("%s: Phase %d: End.\n", Thread.currentThread().getName(), phaser.getPhase());
    phaser.arriveAndDeregister();//调用Phaser对象当前线程已经结束这个阶段
    return false;
    }else{
    System.out.printf("%s: Phase %d: %d results.\n", Thread.currentThread().getName(), phaser.getPhase(), results.size());
    phaser.arriveAndAwaitAdvance();
    return true;
    }
    }

    //对第一阶段查找到的文件列表进行过滤,将不是过去24小时修改的文件删除
    private void filterResults(){
    List<String> newResults = new ArrayList<>();
    long actualDate = new Date().getTime();
    for(int i = 0; i < results.size(); i++){
    File file = new File(results.get(i));
    long fileDate = file.lastModified();
    if(actualDate - fileDate < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)){
    newResults.add(results.get(i));
    }
    }
    results = newResults;
    }

    //对每个文件夹递归调用
    private void directoryProcess(File file){
    File list[] = file.listFiles();
    if(list != null){
    for(int i = 0; i < list.length; i++){
    if(list[i].isDirectory()){
    directoryProcess(list[i]);
    }else{
    fileProcess(list[i]);
    }
    }
    }
    }

    private void fileProcess(File file){
    if(file.getName().endsWith(end)){
    results.add(file.getAbsolutePath());
    }
    }
    }

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Main{
public static void main(String[] args){
Phaser phaser = new Phaser(3);

FileSearch system = new FileSearch("C:\\XXX", "log", phaser);
FileSearch apps = new FileSearch("C:\\XXXX", "log", phaser);
FileSearch documents = new FileSearch("C:\\XXXXX","log", phaser);

Thread systemThread = new Thread(system, "System");
systemThread.start();

Thread appsThread = new Thread(apps, "Apps");
appsThread.start();

Thread documentsThread = new Thread(documents, "Documents");
documentsThread.start();

try{
systemThread.join();
appsThread.join();
documentsThread.join();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Terminated: %s\n", phaser.isTerminated());
}
}

开始时候创建Phaser对象,用于在每个阶段结束时对线程同步进行控制。Phaser构造器传入了参与阶段同步的线程的个数。
这个数字通知Phaser在唤醒所有休眠线程以进行下一个阶段之前,必须执行arriveAndAwaitAdvance()方法的线程数。
在run()方法开头调用arriveAndAwaitAdvanced这个方法可以保障在所有线程创建好之前没有线程开始执行任务。(同一起跑线)
在第一阶段和第二阶段结束的时候,检查这个阶段中是不是生成了结果集以及结果集中是不是有元素。
在第一阶段,checkResults()方法里调用arriveAndAwaitAdvance()方法。
在第二阶段,如果结果集是空的,对应线程没有理由继续执行,所以返回;但必须通知phaser对象参与同步的线程少了一个,调用arriverAndDeregister()方法。
在第三阶段结束的时候,在showInfo()方法中调用了phaser对下你给的arriveAndAwaitAdvance()方法,通过这个调用,确保三个线程都已完成。
当showInfo()方法执行完成之后,还调用了phaser对象的arriveAndDeregiester()方法,通过这个调用,撤销phaser中线程的注册。
一个Phaser对象有两种状态

  • 活跃态(Active):当存在参与同步的线程时,Phaser就是活跃态,并且在每个阶段结束的时候进行同步。
  • 终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止状态,在这种状态下,Phaser没有任何参与者。
    当Phaser对象的onAdvance()方法返回true的时候,Phaser对象就处于终止态。
    当Phaser是终止态是,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步的操作。
    Phaser类的一个重大特性就是不必对它的方法进行异常处理。不像其他的同步辅助类,被Phaser类置于休眠的线程不会影响中断事件,也不会抛出InterruptException异常。


    Phaser类提供了其他改变Phaser对象的方法
  • arrive():这个方法通知phaser对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段。必须小心使用这个方法,因为它不会与其他线程同步。
  • awaitAdvance(int phaser):如果传入的阶段参数与当前阶段一直,这个方法会将当前线程置于休眠,直到这个阶段所有参与者都运行完成。如果传入的阶段参数与当前阶段不一致,这个方法将立即返回。
  • awaitAdvanceInterruptibly(int phaser):这个方法根awaitAdvance(int phase)一样,不同之处是如果在这个方法中休眠的线程被中断,它将抛出InterruptedException异常。


    创建一个Phaser对象时,需要指出有多少个参与者。Phaser类提供两种方法增加注册者的数量。
  • register():这个方法将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程。
  • bulkRegister(int Parties):这个方法将指定数目的参与者注册到Phaser中,所有这些新的参与者都将被当成没有执行完本阶段的线程。


    Phaser类提供了forceTermination()方法来强制phaser进入终止态,这个方法不管phaser中是否存在注册的参与线程。当一个参与线程产生错误时,强制phaser终止时很有意义的。
    当一个phaser处于终止态时,awaitAdvance()和arriveAndAwaitAdvance()方法立即返回一个负数,而不再是一个正值了。如果知道phaser可能会被终止,就需要验证这些方法的返回值,以确定phaser是不是被终止了。



并发阶段任务中的阶段切换

Phaser类提供了onAdvance()方法,它在phaser阶段改变的时候会被自动执行。
onAdvance()方法需要两个int型的传入参数:当前的阶段数以及注册的参与者数量。
它返回的是boolean值,true表示已经完成执行并进入终止态,false表示phaser还在继续执行。
创建MyPhaser类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class MyPhaser extends Phaser{
@Override
protected boolean onAdvance(int phase, int registeredParties){
switch(phase){
case 0:
return studentsArrived();

case 1:
return finishFirstExercise();

case 2:
return finishSecondExercise();

case 3:
return finishExam();

default:
return true;
}
}

private boolean studentsArrived(){
System.out.printf("Phaser: The exam are going to start. The students are ready.\n");
System.out.printf("Phaser: We have %d students.\n",getRegisteredParties());
return false;
}

private boolean finishFirstExercise() {
System.out.printf("Phaser: All the students has finished the first exercise.\n");
System.out.printf("Phaser: It's turn for the second one.\n");
return false;
}

private boolean finishSecondExercise() {
System.out.printf("Phaser: All the students has finished the second exercise.\n");
System.out.printf("Phaser: It's turn for the third one.\n");
return false;
}

private boolean finishExam() {
System.out.printf("Phaser: All the students has finished the exam.\n");
System.out.printf("Phaser: Thank you for your time.\n");
return true;
}

}

创建Student类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Student implements Runnable{
private Phaser phaser;

public Student(Phaser phaser){
this.phaser = phaser;
}

public void run(){
System.out.printf("%s: Has arrived to do the exam. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the first exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise1();

System.out.printf("%s: Has done the first exercise. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the second exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise2();

System.out.printf("%s: Has done the second exercise. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
System.out.printf("%s: Is going to do the third exercise. %s\n",Thread.currentThread().getName(),new Date());
doExercise3();
System.out.printf("%s: Has finished the exam. %s\n",Thread.currentThread().getName(),new Date());
phaser.arriveAndAwaitAdvance();
}

private void doExercise1() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void doExercise2() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void doExercise3() {
try {
Long duration=(long)(Math.random()*10);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Main {

public static void main(String[] args) {
MyPhaser phaser=new MyPhaser();

Student students[]=new Student[5];
for (int i=0; i<students.length; i++){
students[i]=new Student(phaser);
phaser.register();
}

Thread threads[]=new Thread[students.length];
for (int i=0; i<students.length; i++) {
threads[i]=new Thread(students[i],"Student "+i);
threads[i].start();
}

for (int i=0; i<threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.printf("Main: The phaser has finished: %s.\n",phaser.isTerminated());

}

}

模拟了有三道试题的考试过程。所有的学生做完第一道题才开始做第二道。
phaser对象进行阶段切换的时候,在所有arriveAndAwaitAdvance()方法里休眠的线程被唤醒之前,onAdvance()方法将被自动调用。



并发任务间的数据交换

Java并发API提供了Exchanger来实心并发任务之间交换数据。
Exchanger类允许在两个线程之间定义同步点(Synchronization Point)。当两个线程都到达同步点时,它们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一个线程中。
Exchanger类在生产者——消费者中很有用,但Exchanger只能同步两个线程,即一个线程生产者,一个线程消费者。
创建Producer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Producer implements Runnable{
private List<String> buffer;
private final Exchanger<List<String>> exchanger;
public Producer(List<String> buffer, Exchanger<List<String>> exchanger){
this.buffer = buffer;
this.exchanger = exchanger;
}


@Override
public void run(){
int cycle = 1;
for(int i = 0 ; i < 10 ; i++){
System.out.printf("Producer: Cycle %d\n", cycle);

for(int j = 0; j < 10; j++){
String message = "Event " + ((i*10) + j);
System.out.printf("Producer: %s\n", message);
buffer.add(message);
}

try{
buffer = exchanger.exchange(buffer);
}catch(InterruptedException e){
e.printStackTrace();
}

System.out.printf("Producer: %d\n", buffer.size());
cycle++;
}

}
}

创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Consumer implements Runnable{
private List<String> buffer;
private final Exchanger<List<String>> exchanger;

public Consumer(List<String> buffer, Exchanger<List<String>> exchanger){
this.buffer = buffer;
this.exchanger = exchanger;
}

@Override
public void run(){
int cycle = 1;
for(int i = 0; i < 10; i++){
System.out.printf("Consumer: Cycle %d\n", cycle);
try{
buffer = exchanger.exchange(buffer);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.printf("Consumer: %d\n", buffer.size());

for(int j = 0; j < 10; j++){
String message = buffer.get(0);
System.out.printf("Consumer: %s\n", message);
buffer.remove(0);
}

cycle++;
}
}
}

主类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main{
public static void main(String[] args){
List<String> buffer1 = new ArrayList();
List<String> buffer2 = new ArrayList();

Exchanger<List<String>> exchanger = new Exchanger<>();

Producer producer = new Producer(buffer1, exchanger);
Consumer consumer = new Consumer(buffer2, exchanger);

Thread threadProducer = new Thread(producer);
Thread threadConsumer = new Thread(consumer);

threadProducer.start();
threadConsumer.start();
}
}

消费者先创建一个空的缓存列表,然后通过调用Exchanger与生产者同步来获得可以消费的数据。生产者从一个空的缓存列表开始执行,它创建了10个字符串,然后存储在这个缓存中,并且使用exchanger队形与消费者同步。

Exchanger类还提供了另外的exchanger方法,即exchange(V data, long time, TimeUnit unit)方法。
V指要交换的数据结构
time指定等待的time值
TimeUnit可以是DAYS\HOURS\MICROSECONDS\MILLISECONDS\MINUTES\NANOSECONDS\SECONDS