JAVA并发-同步工具类CyclicBarrier、CountDownLatch、Semaphore

图片 1

同步器

为每种特定的同步问题提供了解决方案

1.CyclicBarrier

CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。
功能:可以使一定数量的参与方反复在指定的地方(就是调用await()的地方)汇集(阻塞自己)
,只有所有参与方都调用了await(),栅栏就会打开,所有线程阻塞才会解除

适用场景:CyclicBarrier可以用在所有子线程之间互相等待多次的情形。如并行迭代算法,这种算法通常将一个问题拆分成一系列相互独立的子问题

code

public static void main(String[] args){ //CyclicBarrier 是几个线程导了一个点后等其他线程到了一起出发  哪个地方等就在哪个地方调用await()
    ExecutorService threadpool=Executors.newCachedThreadPool();
    final CyclicBarrier cb=new CyclicBarrier(3);
    for(int i=0;i<3;i++){
        Runnable runnable=new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep((long)Math.random()*10000);
                    System.out.println("线程" + Thread.currentThread().getName() +  "即将到达地点1,当前已有"+(cb.getNumberWaiting()+1 )+"个线程");
                    //到达出发点
                    cb.await();
                    Thread.sleep((long)Math.random()*10000);

                    System.out.println("线程"+Thread.currentThread().getName()+"即将到达集合地点2,当前已有"+(cb.getNumberWaiting()+1)+"个线程");

                    cb.await();
                    Thread.sleep((long)Math.random()*10000);
                    System.out.println("线程"+Thread.currentThread().getName()+"即将到达集合地点3,当前已有"+(cb.getNumberWaiting()+1)+"个线程");
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        };
        threadpool.execute(runnable);           
    }
}

锁的机制从整体的运行转态来讲核心就是:阻塞,解除阻塞,但是如果仅仅是这点功能,那么JUC并不能称为一个优秀的线程开发框架,然而是因为在juc里面提供了大量方便的同步工具辅助类。

Semaphore

Semaphore【信号标;旗语】,通过计数器控制对共享资源的访问。

测试类:

    package concurrent;

    import concurrent.thread.SemaphoreThread;

    import java.util.concurrent.Semaphore;

    /**
    * 拿客
    * www.coderknock.com
    * QQ群:213732117
    * 创建时间:2016年08月08日
    * 描述:
    */
    public class SemaphoreTest {

       public static void main(String[] args) {
           //在Thread里声明并不是同一个对象
           Semaphore semaphore = new Semaphore(3);
           SemaphoreThread testA = new SemaphoreThread("A", semaphore);
           SemaphoreThread testB = new SemaphoreThread("B", semaphore);
           SemaphoreThread testC = new SemaphoreThread("C", semaphore);
           SemaphoreThread testD = new SemaphoreThread("D", semaphore);
           SemaphoreThread testE = new SemaphoreThread("E", semaphore);
           SemaphoreThread testF = new SemaphoreThread("F", semaphore);
           SemaphoreThread testG = new SemaphoreThread("G", semaphore);
           testA.start();
           testB.start();
           testC.start();
           testD.start();
           testE.start();
           testF.start();
           testG.start();
       }
   }

线程写法:

   package concurrent.thread;

   import org.apache.logging.log4j.LogManager;
   import org.apache.logging.log4j.Logger;

   import java.util.concurrent.Semaphore;

   /**
    * 拿客
    * www.coderknock.com
    * QQ群:213732117
    * 创建时间:2016年08月08日
    * 描述:
    */
   public class SemaphoreThread extends Thread {
       private static final Logger logger = LogManager.getLogger(SemaphoreThread.class);
       //创建有3个信号量的信号量计数器
       public Semaphore semaphore;

       public SemaphoreThread(String name, Semaphore semaphore) {
           setName(name);
           this.semaphore = semaphore;
       }

       @Override
       public void run() {
           try {
               logger.debug(getName() + " 取号等待... " + System.currentTimeMillis());
               //取出一个信号
               semaphore.acquire();
               logger.debug(getName() + " 提供服务... " + System.currentTimeMillis());
               sleep(1000);
               logger.debug(getName() + " 完成服务... " + System.currentTimeMillis());

           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           logger.debug(getName() + " 释放... " + System.currentTimeMillis());
           //释放一个信号
           semaphore.release();
       }
   }

执行结果【以下所有输出结果中[]中为线程名称- 后为输出的内容】:

    [C] - C 取号等待... 1470642024037
    [F] - F 取号等待... 1470642024036
    [E] - E 取号等待... 1470642024036
    [B] - B 取号等待... 1470642024037
    [D] - D 取号等待... 1470642024037
    [A] - A 取号等待... 1470642023965
    [D] - D 提供服务... 1470642024039
    [C] - C 提供服务... 1470642024039
    [G] - G 取号等待... 1470642024036
    [F] - F 提供服务... 1470642024040
    [D] - D 完成服务... 1470642025039
    [C] - C 完成服务... 1470642025039
    [D] - D 释放... 1470642025040
    [F] - F 完成服务... 1470642025040
    [C] - C 释放... 1470642025041
    [B] - B 提供服务... 1470642025042
    [A] - A 提供服务... 1470642025042
    [F] - F 释放... 1470642025043
    [E] - E 提供服务... 1470642025043
    [A] - A 完成服务... 1470642026043
    [B] - B 完成服务... 1470642026043
    [B] - B 释放... 1470642026043
    [A] - A 释放... 1470642026043
    [G] - G 提供服务... 1470642026044
    [E] - E 完成服务... 1470642026045
    [E] - E 释放... 1470642026045
    [G] - G 完成服务... 1470642027045
    [G] - G 释放... 1470642027046

可以看到,当3个信号量被领取完之后,之后的线程会阻塞在领取信号的位置,当有信号量释放之后才会继续执行。

CountDownLatch

CountDownLatch【倒计时锁】,线程中调用countDownLatch.await()使进程进入阻塞状态,当达成指定次数后(通过countDownLatch.countDown())继续执行每个线程中剩余的内容。

测试类:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CountDownLatch;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class package concurrent;

import concurrent.thread.CountDownLatchThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CountDownLatchTest {

    private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class);

    public static void main(String[] args) throws InterruptedException {
        //设定当达成三个计数时触发
        CountDownLatch countDownLatch = new CountDownLatch(3);
        new CountDownLatchThread("A", countDownLatch).start();
        new CountDownLatchThread("B", countDownLatch).start();
        new CountDownLatchThread("C", countDownLatch).start();
        new CountDownLatchThread("D", countDownLatch).start();
        new CountDownLatchThread("E", countDownLatch).start();
        for (int i = 3; i > 0; i--) {
            Thread.sleep(1000);
            logger.debug(i);
            countDownLatch.countDown();
        }
    }
}

线程类:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CountDownLatch;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CountDownLatchThread extends Thread {
    private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class);
    //计数器
    private CountDownLatch countDownLatch;

    public CountDownLatchThread(String name, CountDownLatch countDownLatch) {
        setName(name);
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        logger.debug("执行操作...");
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("等待计数器达到标准...");
        try {
            //让线程进入阻塞状态,等待计数达成后释放
            countDownLatch.await();
            logger.debug("计数达成,继续执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

 [E] - 执行操作...
 [B] - 执行操作...
 [A] - 执行操作...
 [C] - 执行操作...
 [D] - 执行操作...
 [main] DEBUG concurrent.CountDownLatchTest - 3
 [B] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [A] - 等待计数器达到标准...
 [main] DEBUG concurrent.CountDownLatchTest - 2
 [main] DEBUG concurrent.CountDownLatchTest - 1
 [E] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...

CyclicBarrier

CyclicBarrier【Cyclic周期,循环的
Barrier屏障,障碍】循环的等待阻塞的线程个数到达指定数量后使参与计数的线程继续执行并可执行特定线程(使用不同构造函数可以不设定到达后执行),其他线程仍处于阻塞等待再一次达成指定个数。

测试类:

package concurrent;

import concurrent.thread.CyclicBarrierThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CyclicBarrier;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CyclicBarrierTest {

    private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class);

    public static void main(String[] args) {
          //可以使用CyclicBarrier(int parties)不设定到达后执行的内容
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            logger.debug("---计数到达后执行的内容----");
        });
        new CyclicBarrierThread("A", cyclicBarrier).start();
        new CyclicBarrierThread("B", cyclicBarrier).start();
        new CyclicBarrierThread("C", cyclicBarrier).start();
        new CyclicBarrierThread("D", cyclicBarrier).start();
        new CyclicBarrierThread("E", cyclicBarrier).start();
        new CyclicBarrierThread("A2", cyclicBarrier).start();
        new CyclicBarrierThread("B2", cyclicBarrier).start();
        new CyclicBarrierThread("C2", cyclicBarrier).start();
        new CyclicBarrierThread("D2", cyclicBarrier).start();
        new CyclicBarrierThread("E2", cyclicBarrier).start();
        //需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程,
        // 那么当达到5个数量时,只会执行达到时的五个线程的内容,
        // 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束
        // new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束
    }
}

线程类:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class CyclicBarrierThread extends Thread {

    private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class);

    private CyclicBarrier cyclicBarrier;

    public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) {
        super(name);
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        logger.debug("执行操作...");
        try {
            int time = new Random().nextInt(10) * 1000;
            logger.debug("休眠" + time/1000 + "秒");
            sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.debug("等待计数器达到标准...");
        try {
            //让线程进入阻塞状态,等待计数达成后释放
            cyclicBarrier.await();
            logger.debug("计数达成,继续执行...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

 [A] - 执行操作...
 [A] - 休眠0秒
 [E2] - 执行操作...
 [E2] - 休眠5秒
 [D2] - 执行操作...
 [D2] - 休眠4秒
 [C2] - 执行操作...
 [C2] - 休眠4秒
 [B2] - 执行操作...
 [B2] - 休眠6秒
 [A2] - 执行操作...
 [A2] - 休眠8秒
 [E] - 执行操作...
 [E] - 休眠5秒
 [D] - 执行操作...
 [D] - 休眠0秒
 [C] - 执行操作...
 [C] - 休眠3秒
 [B] - 执行操作...
 [B] - 休眠7秒
 [A] - 等待计数器达到标准...
 [D] - 等待计数器达到标准...
 [C] - 等待计数器达到标准...
 [D2] - 等待计数器达到标准...
 [C2] - 等待计数器达到标准...
 [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [C2] - 计数达成,继续执行...
 [A] - 计数达成,继续执行...
 [C] - 计数达成,继续执行...
 [D2] - 计数达成,继续执行...
 [D] - 计数达成,继续执行...
 [E2] - 等待计数器达到标准...
 [E] - 等待计数器达到标准...
 [B2] - 等待计数器达到标准...
 [B] - 等待计数器达到标准...
 [A2] - 等待计数器达到标准...
 [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容----
 [E] - 计数达成,继续执行...
 [B2] - 计数达成,继续执行...
 [E2] - 计数达成,继续执行...
 [B] - 计数达成,继续执行...
 [A2] - 计数达成,继续执行...

可以想象成以前不正规的长途汽车站的模式:

不正规的长途汽车站会等待座位坐满之后才发车,到达目的地之后继续等待然后循环进行。每个人都是一个Thread,上车后触发cyclicBarrier.await();,当坐满时就是达到指定达成数的时候,车辆发车就是达成后统一执行的内容,发车后车上的人们就可以聊天之类的操作了【我们暂且理解为上车后人们就都不能动了O(∩_∩)O~】。

2.CountDownLatch(闭锁)

CountDownLatch先初始化规定一个计数器,没调用一次countDwon()计数器减一,如果数量变成0就达到结束状态,闭锁打开(await()方法阻塞解除)

闭锁的作用相当于一扇门,闭锁的状态没有结束时这扇门一直视关闭的,闭锁达到结束状态时将允许所有线程通过

适用场景:CountDownLatch可以应用于主线程等待所有子线程结束后再继续执行的情况。闭锁可以用来确定某些活动直到其他活动都完成了才继续执行
如确保某个服务在其他服务启动之后才能启动;在多玩家的游戏中确保所有玩家就绪后才执行

Code

  public static void main(String[] args){
    ExecutorService threadpool = Executors.newCachedThreadPool();
    final CountDownLatch cdorder=new CountDownLatch(1);
    final CountDownLatch cdanswer=new CountDownLatch(3);
    for(int i=0;i<3;i++){
        Runnable runnable=new Runnable() {

            @Override
            public void run() {

                try {
                    System.out.println("线程"+Thread.currentThread().getName()+"正准备接受命令");
                    cdorder.await();
                    System.out.println("线程"+Thread.currentThread().getName()+"已接受命令");

                    Thread.sleep((long)Math.random()*10000);
                    System.out.println("线程"+Thread.currentThread().getName()+"回应命令处理结果");
                    cdanswer.countDown();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        };
        threadpool.execute(runnable);
    }

    try {
        Thread.sleep((long)Math.random()*10000);
        System.out.println("线程"+Thread.currentThread().getName()+"即将发布命令");
        cdorder.countDown();
        System.out.println("线程"+Thread.currentThread().getName()+"已发送命令,正在等待结果");
        cdanswer.await();
        System.out.println("线程"+Thread.currentThread().getName()+"已收到所有响应结果");


    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    threadpool.shutdown();

}

CountDownLatch与CyclicBarrier区别

1.CountDownLatch减计数方式
CyclicBarrier 加计数方式

2.CountDownLatch计算为0时释放所有等待的线程
CyclicBarrier 计数达到指定值时释放所有等待线程

3.CountDownLatch计数为0时,无法重置
CyclicBarrier 计数达到指定值时,计数置为0重新开始

4.CountDownLatch调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响
CyclicBarrier 调用await()方法计数加1,若加1
的值不等于构造方法的值,则线程阻塞

5.CountDownLatch不可重复利用
CyclicBarrier可重复利用

6.线程在countDown()之后,会继续执行自己的任务,而CyclicBarrier会在所有线程任务结束之后,才会进行后续任务

Semaphore信号量

Semaphore通常用于限制可以访问某些资源(物理or逻辑)的线程数目。

例如,大家排队去银行办理业务,但是只有两个银行窗口提供服务,来了10个人需要办理业务,所以这10个排队的人员需要依次使用这两个业务窗口来办理业务。

 

观察Semaphore类的基本定义:

public class Semaphore extends Object implements Serializable

Semaphore类中定义的方法有如下几个:

  • 构造方法:

    public Semaphore(int premits),设置服务的信号量;
    
  • 构造方法:

    public Semaphore(int premits,boolean fair) ,是否为公平锁;
    
  • 等待执行:

    public void acquireUninterruptibly(int permits)
    
    • 设置的信号量上如果有阻塞的线程对象存在,那么将一直持续阻塞状态。 
  • 释放线程的阻塞状态:

    public void release(int permits);
    
  • 返回可用的资源个数:

    public int availablePermits();
    

 范例:实现银行排队业务办理

package so.strong.mall.concurrent;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        final Semaphore semaphore = new Semaphore(2); //现在允许操作的资源一共有2个
        final Random random = new Random(); //模拟每一个用户办理业务的时间
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() { //每一个线程就是一个要办理业务的人员
                    if (semaphore.availablePermits() > 0) { //现有空余窗口
                        System.out.println("[" + Thread.currentThread().getName() + "]进入银行,没有人办理业务");
                    } else { //没有空余位置
                        System.out.println("[" + Thread.currentThread().getName() + "]排队等候办理业务");
                    }
                    try {
                        semaphore.acquire(); //从信号量中获得操作许可
                        System.out.println("[" + Thread.currentThread().getName() + "]开始办理业务");
                        TimeUnit.SECONDS.sleep(random.nextInt(10)); //模拟办公延迟
                        System.out.println("[" + Thread.currentThread().getName() + "]结束业务办理");
                        semaphore.release(); //当前线程离开办公窗口
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "顾客-" + i).start();
        }
    }
}

[顾客-0]进入银行,没有人办理业务
[顾客-0]开始办理业务
[顾客-1]进入银行,没有人办理业务
[顾客-1]开始办理业务
[顾客-2]排队等候办理业务
[顾客-3]排队等候办理业务
[顾客-4]排队等候办理业务
[顾客-5]排队等候办理业务
[顾客-6]排队等候办理业务
[顾客-7]排队等候办理业务
[顾客-8]排队等候办理业务
[顾客-9]排队等候办理业务
[顾客-0]结束业务办理
[顾客-2]开始办理业务
[顾客-1]结束业务办理
[顾客-3]开始办理业务
[顾客-2]结束业务办理
[顾客-4]开始办理业务
[顾客-3]结束业务办理
[顾客-5]开始办理业务
[顾客-4]结束业务办理
[顾客-6]开始办理业务
[顾客-5]结束业务办理
[顾客-7]开始办理业务
[顾客-7]结束业务办理
[顾客-8]开始办理业务
[顾客-6]结束业务办理
[顾客-9]开始办理业务
[顾客-8]结束业务办理

这种信号量的处理在实际开发中有什么用呢?例如,现在对于数据库的连接一共有2个连接,那么可能有10个用户等待进行数据库操作,能够使用的连接个数为2个,这样10个用户就需要排队依次使用这两个连接来进行数据库操作。

 

CountDownLatch与CyclicBarrier区别:

CountDownLatch是一个或多个线程等待计数达成后继续执行,await()调用并没有参与计数。

CyclicBarrier则是N个线程等待彼此执行到零界点之后再继续执行,await()调用的同时参与了计数,并且CyclicBarrier支持条件达成后执行某个动作,而且这个过程是循环性的。

3.Semaphore(信号量)

计数信号量用来控制同时访问某个特定资源的操作数量,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个
acquire(),然后再获取该许可。每个 release()
添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore
只对可用许可的号码进行计数,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。初始值为1的Semaphore二值信号量可以用作互斥体

适用场景:Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控

Code

public static void main(String[] args){

    ExecutorService threadpool = Executors.newCachedThreadPool();

    final Semaphore sp=new Semaphore(3);


    for(int i=0;i<10;i++){
        Runnable runnable=new Runnable() {

            @Override
            public void run() {                 
                    try {
                        sp.acquire();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("线程" + Thread.currentThread().getName() +" 进入,当前有" + (3- sp.availablePermits())+ " 个线程并发");
                    try {
                        Thread.sleep((long)(Math.random()*10000));
                    } catch (InterruptedException e) {                          
                        e.printStackTrace();
                    }
                    System.out.println("线程"+Thread.currentThread().getName()+"即将离开");
                    sp.release();

                    System.out.println("线程"+Thread.currentThread().getName()+"已离开,当前已有"+(3-sp.availablePermits())+"个线程并发");
            }
        };

        threadpool.execute(runnable);

    }

}

控制任务提交速度

class BoundedExecutor{//使用Semaphore控制任务的提交速率
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec,Semaphore semaphore){
    this.exec=exec;
    this.semaphore=semaphore;
}
public void submitTask(final Runnable command) throws InterruptedException{
    semaphore.acquire();
    try{
        exec.execute(new Runnable() {

            @Override
            public void run() {
                try{
                    command.run();
                }finally{
                    semaphore.release();
                }

            }
        });
    }catch(RejectedExecutionException e){
        semaphore.release();
    }
    }
  }

CountDownLatch闭锁

CoundDownLatch描述的是一个计数的减少,下面首先观察一个程序的简单问题:

范例:编写一个简单的多线程开发

package so.strong.mall.concurrent;
public class CountDownDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("["+Thread.currentThread().getName()+"]线程应用执行完毕");
                }
            },"线程对象-"+i).start();
        }
        System.out.println("[***主线程***]所有的程序执行完毕");
    }
}

[***主线程***]所有的程序执行完毕
[线程对象-1]线程应用执行完毕
[线程对象-0]线程应用执行完毕  

现在可以发现,对于此时应该保证所有的线程执行完毕后在执行程序的输出计算,就好比:旅游团集合人员乘车离开。应该保证所有的线程都执行完毕了(指定个数的线程),这样的话就必须做一个计数处理。

CoundDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CoundDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

图片 1

 

CoundDownLatch类之中的常用方法有如下几个:

  • 构造方法:

    public CountDownLatch(int count);  //要设置一个等待的线程个数;
    
  • 减少等待个数:

    public void countDown();  
    
  • 等待countDownLatch为0:

    public void await() throws InterruptedException;
    

      

范例:利用CountDownLatch解决之前的设计问题

package so.strong.mall.concurrent;
import java.util.concurrent.CountDownLatch;

public class CountDownDemo {
    public static void main(String[] args) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(2); //2个线程全部执行完毕后可继续执行
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("[" + Thread.currentThread().getName() + "]线程应用执行完毕");
                    countDownLatch.countDown(); //减少等待的线程个数
                }
            }, "线程对象-" + i).start();
        }
        countDownLatch.await(); //等待计数的结束(个数为0)
        System.out.println("[***主线程***]所有的程序执行完毕");
    }
}

[线程对象-0]线程应用执行完毕
[线程对象-1]线程应用执行完毕
[***主线程***]所有的程序执行完毕

  

Exchanger

Exchanger<T> 用于线程间进行数据交换

测试类:

package concurrent;

import concurrent.pojo.ExchangerPojo;
import concurrent.thread.ExchangerThread;

import java.util.HashMap;
import java.util.concurrent.Exchanger;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerTest {

    public static void main(String[] args) {
        Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>();
        new ExchangerThread("A", exchanger).start();
        new ExchangerThread("B", exchanger).start();
    }
}

实体类:

package concurrent.pojo;

import com.alibaba.fastjson.JSON;

import java.util.Date;
import java.util.List;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerPojo {
    private int intVal;
    private String strVal;
    private List<String> strList;
    private Date date;

    public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) {
        this.intVal = intVal;
        this.strVal = strVal;
        this.strList = strList;
        this.date = date;
    }

    public int getIntVal() {
        return intVal;
    }

    public void setIntVal(int intVal) {
        this.intVal = intVal;
    }

    public String getStrVal() {
        return strVal;
    }

    public void setStrVal(String strVal) {
        this.strVal = strVal;
    }

    public List<String> getStrList() {
        return strList;
    }

    public void setStrList(List<String> strList) {
        this.strList = strList;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

线程类:

package concurrent.thread;

import concurrent.pojo.ExchangerPojo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.*;
import java.util.concurrent.Exchanger;

/**
 * 拿客
 * www.coderknock.com
 * QQ群:213732117
 * 创建时间:2016年08月08日
 * 描述:
 */
public class ExchangerThread extends Thread {
    private Exchanger<HashMap<String, ExchangerPojo>> exchanger;

    private static final Logger logger = LogManager.getLogger(ExchangerThread.class);

    public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) {
        super(name);
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        HashMap<String, ExchangerPojo> map = new HashMap<>();
        logger.debug(getName() + "提供者提供数据...");
        Random random = new Random();
        for (int i = 0; i < 3; i++) {
            int index = random.nextInt(10);
            List<String> list = new ArrayList<>();
            for (int j = 0; j < index; j++) {
                list.add("list ---> " + j);
            }
            ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据", list, new Date());
            map.put("第" + i + "个数据", pojo);
        }
        try {
            int time = random.nextInt(10);
            logger.debug(getName() + "等待" + time + "秒....");
            for (int i = time; i > 0; i--) {
                sleep(1000);
                logger.debug(getName() + "---->" + i);
            }
              //等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了
            HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map);
            time = random.nextInt(10);
            logger.debug(getName() + "接受到数据等待" + time + "秒....");
            for (int i = time; i > 0; i--) {
                sleep(1000);
                logger.debug(getName() + "---->" + i);
            }
            getMap.forEach((x, y) -> {
                logger.debug(x + " -----> " + y.toString());
            });
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:

 [B] - B提供者提供数据...
 [A] - A提供者提供数据...
 [A] - A等待2秒....
 [B] - B等待0秒....
 [A] - A---->2
 [A] - A---->1
 [B] - B接受到数据等待1秒....
 [A] - A接受到数据等待4秒....
 [B] - B---->1
 [A] - A---->4
 [B] - 第0个数据 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的数据"}
 [B] - 第1个数据 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的数据"}
 [B] - 第2个数据 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的数据"}
 [A] - A---->3
 [A] - A---->2
 [A] - A---->1
 [A] - 第0个数据 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的数据"}
 [A] - 第1个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"}
 [A] - 第2个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的数据"}

 CyclicBarrier栅栏

CyclicBarrierCountDownLatch是非常类似的,CyclicBarrier核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。

CyclicBarrier类是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(Common
Barrier Point)。

CyclicBarrier类是一种同步机制,它能够对处理一些算法的线程实现同。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

通过调用CyclicBarrier对象的await()方法,两个线程可以实现互相等待。一旦N个线程在等待CyclicBarrier达成,所有线程将被释放掉去继续执行。

 

CyclicBarrier类的主要方法如下:

  • 构造方法:

    public CyclicBarrier(int parties);//设置等待的边界;
    
  • 傻傻的等待其他线程:

    public int await() throws InterruptedException, BrokenBarrierException;
    
  • 等待其他线程:

    public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException;
    
  • 重置等待的线程个数:

    public void reset();
    

范例:观察CyclicBarrier进行等待处理

package so.strong.mall.concurrent;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo {
    public static void main(String[] args) throws Exception{
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); //当凑够2个线程金进行触发
        for (int i = 0; i < 3 ; i++) {
            final int second = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("["+Thread.currentThread().getName()+"]-等待开始");
                    try {
                        TimeUnit.SECONDS.sleep(second);
                        cyclicBarrier.await(); //等待处理
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("["+Thread.currentThread().getName()+"]-等待结束");
                }
            },"娱乐者-"+i).start();
        }
    }
}

 

[娱乐者-0]-等待开始
[娱乐者-1]-等待开始
[娱乐者-2]-等待开始
[娱乐者-1]-等待结束
[娱乐者-0]-等待结束

  如果不想一直等待则可以设置超时时间,则超过了等待时间之后将会出现”TimeoutException”。

package so.strong.mall.concurrent;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo {
    public static void main(String[] args) throws Exception{
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); //当凑够2个线程金进行触发
        for (int i = 0; i < 3 ; i++) {
            final int second = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("["+Thread.currentThread().getName()+"]-等待开始");
                    try {
                        TimeUnit.SECONDS.sleep(second);
                        cyclicBarrier.await(6,TimeUnit.SECONDS); //等待处理
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("["+Thread.currentThread().getName()+"]-等待结束");
                }
            },"娱乐者-"+i).start();
        }
    }
}

[娱乐者-0]-等待开始
[娱乐者-1]-等待开始
[娱乐者-2]-等待开始
[娱乐者-1]-等待结束
[娱乐者-0]-等待结束
Disconnected from the target VM, address: '127.0.0.1:63717', transport: 'socket'
[娱乐者-2]-等待结束
java.util.concurrent.TimeoutException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
    at so.strong.mall.concurrent.CyclicBarrierDemo$1.run(CyclicBarrierDemo.java:21)
    at java.lang.Thread.run(Thread.java:745)

 

CyclicBarrier还有一个特点是可以进行重置处理

范例:重置处理

package so.strong.mall.concurrent;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;


public class CyclicBarrierResetDemo {
    public static void main(String[] args) throws Exception {
        final CyclicBarrier cb = new CyclicBarrier(2); //当凑够2个线程就进行触发
        for (int i = 0; i < 3; i++) {
            final int second = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("[" + Thread.currentThread().getName() + "]-等待开始");
                    try {
                        if (second == 2) {
                            cb.reset(); //重置
                            System.out.println("[重置处理****]" + Thread.currentThread().getName());
                        } else {
                            TimeUnit.SECONDS.sleep(second);
                            cb.await(6,TimeUnit.SECONDS);//等待处理
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("[" + Thread.currentThread().getName() + "]-等待结束");
                }
            }, "娱乐者-" + i).start();
        }
    }
}

[娱乐者-0]-等待开始
[娱乐者-1]-等待开始
[娱乐者-2]-等待开始
[重置处理****]娱乐者-2
[娱乐者-2]-等待结束
[娱乐者-1]-等待结束
[娱乐者-0]-等待结束

 

CountDownLatch与CyclicBarrier的区别

  • CountDownLatch最大的特征是进行一个数据减法的操作等待,所有的统计操作一旦开始之中就必须执行countDown()方法,如果等待个数不是0,就被一只等待,并且无法重置。
  • CyclicBarrier设置一个等待的临界点,并且可以有多个等待线程出现,只要满足了临界点就触发了线程的执行代码后将重新开始进行计数处理操作,也可以直接利用reset()方法执行重置操作。

 

Phaser

Phaser个人感觉兼具了CountDownLatch与CyclicBarrier的功能,并提供了分阶段的能力。

实现分阶段的CyclicBarrier的功能

测试代码:

package concurrent;

import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.Phaser;

/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:25:30。
 */
public class PhaserTest {

    private static final Logger logger = LogManager.getLogger(PhaserTest.class);

    public static void main(String[] args) {
        Phaser phaser = new Phaser() {
            /**此方法有2个作用:
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
             * */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                logger.debug("阶段--->" + phase);
                logger.debug("注册的线程数量--->" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (int i = 3; i > 0; i--) {
            new PhaserThread("第" + i + "个", phaser).start();
        }
    }
}

线程代码:

package concurrent.thread;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Random;
import java.util.concurrent.Phaser;

/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:16:55。
 */
public class PhaserThread extends Thread {

    private Phaser phaser;

    private static final Logger logger = LogManager.getLogger(PhaserThread.class);

    public PhaserThread(String name, Phaser phaser) {
        super(name);
        this.phaser = phaser;
        //把当前线程注册到Phaser
        this.phaser.register();
        logger.debug("name为" + name + "的线程注册了" + this.phaser.getRegisteredParties() + "个线程");
    }

    @Override
    public void run() {
        logger.debug("进入...");
        phaser.arrive();
        for (int i = 6; i > 0; i--) {
            int time = new Random().nextInt(5);
            try {
                logger.debug("睡眠" + time + "秒");
                sleep(time * 1000);
                if (i == 1) {
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                    logger.debug("最后一次触发,并注销自身");
                    phaser.arriveAndDeregister();
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                } else {
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                    logger.debug(i + "--->触发并阻塞...");
                    phaser.arriveAndAwaitAdvance();//相当于CyclicBarrier.await();
                    logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties());
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        logger.debug("注销完成之后注册的线程数量--->" + phaser.getRegisteredParties());
    }
}

执行结果:

 [main] - name为第3个的线程注册了1个线程
 [main] - name为第2个的线程注册了2个线程
 [main] - name为第1个的线程注册了3个线程
 [第3个] - 进入...
 [第2个] - 进入...
 [第3个] - 睡眠2秒
 [第2个] - 睡眠1秒
 [第1个] - 进入...
 [第1个] - 阶段--->0
 [第1个] - 注册的线程数量--->3
 [第1个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 6--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第3个] - 6--->触发并阻塞...
 [第1个] - 未完成的线程数量:1
 [第1个] - 6--->触发并阻塞...
 [第1个] - 阶段--->1
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠1秒
 [第3个] - 睡眠0秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 5--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 5--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 5--->触发并阻塞...
 [第2个] - 阶段--->2
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠0秒
 [第3个] - 睡眠2秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第2个] - 4--->触发并阻塞...
 [第3个] - 未完成的线程数量:2
 [第1个] - 未完成的线程数量:2
 [第3个] - 4--->触发并阻塞...
 [第1个] - 4--->触发并阻塞...
 [第1个] - 阶段--->3
 [第1个] - 注册的线程数量--->3
 [第1个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第1个] - 睡眠2秒
 [第3个] - 睡眠1秒
 [第2个] - 睡眠4秒
 [第3个] - 未完成的线程数量:3
 [第3个] - 3--->触发并阻塞...
 [第1个] - 未完成的线程数量:2
 [第1个] - 3--->触发并阻塞...
 [第2个] - 未完成的线程数量:1
 [第2个] - 3--->触发并阻塞...
 [第2个] - 阶段--->4
 [第2个] - 注册的线程数量--->3
 [第2个] - 未完成的线程数量:3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 睡眠2秒
 [第1个] - 睡眠2秒
 [第3个] - 睡眠4秒
 [第2个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 2--->触发并阻塞...
 [第1个] - 2--->触发并阻塞...
 [第3个] - 未完成的线程数量:1
 [第3个] - 2--->触发并阻塞...
 [第3个] - 阶段--->5
 [第3个] - 注册的线程数量--->3
 [第3个] - 未完成的线程数量:3
 [第1个] - 未完成的线程数量:3
 [第2个] - 未完成的线程数量:3
 [第3个] - 睡眠2秒
 [第1个] - 睡眠3秒
 [第2个] - 睡眠0秒
 [第2个] - 未完成的线程数量:3
 [第2个] - 最后一次触发,并注销自身
 [第2个] - 未完成的线程数量:2
 [第2个] - 注销完成之后注册的线程数量--->2
 [第3个] - 未完成的线程数量:2
 [第3个] - 最后一次触发,并注销自身
 [第3个] - 未完成的线程数量:1
 [第3个] - 注销完成之后注册的线程数量--->1
 [第1个] - 未完成的线程数量:1
 [第1个] - 最后一次触发,并注销自身
 [第1个] - 阶段--->6
 [第1个] - 注册的线程数量--->0
 [第1个] - 未完成的线程数量:0
 [第1个] - 注销完成之后注册的线程数量--->0

上面代码中,当所有线程进行到arriveAndAwaitAdvance()时会触发计数并且将线程阻塞,等计数数量等于注册线程数量【即所有线程都执行到了约定的地方时,会放行,是所有线程得以继续执行,并触发onAction事件】。我们可以在onAction中根据不同阶段执行不同内容的操作。

实现分阶段的CountDownLatch的功能

只需将上面的测试类更改如下:

package concurrent;

import concurrent.thread.PhaserThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.Phaser;

import static jodd.util.ThreadUtil.sleep;

/**
 * 拿客
 * 网站:www.coderknock.com
 * QQ群:213732117
 * 三产 创建于 2016年08月08日 21:25:30。
 */
public class PhaserTest {

    private static final Logger logger = LogManager.getLogger(PhaserTest.class);

    public static void main(String[] args) {
        //这里其实相当于已经注册了3个线程,但是并没有实际的线程
        int coutNum=3;
        Phaser phaser = new Phaser(coutNum) {
            /**此方法有2个作用:
             * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。
             * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。
             * */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                logger.debug("阶段--->" + phase);
                logger.debug("注册的线程数量--->" + registeredParties);
                return registeredParties==coutNum;//当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser
            }
        };

        for (int i = 3; i > 0; i--) {
            new PhaserThread("第" + i + "个", phaser).start();
        }

        //当phaser未终止时循环注册这块儿可以使用实际的业务处理
        while (!phaser.isTerminated()) {
            sleep(1000);
            logger.debug("触发一次");
            phaser.arrive(); //相当于countDownLatch.countDown();
        }
    }
}
You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图