概念

该工具类可以做到让一组线程到达一个屏障点的时候被阻塞,直到最后一个线程到达才开启屏障,继续往下执行。CyclicBarrier默认的构造方法是一个CyclicBarrier(int parties)。参数parties表示屏障需要拦截的线程数量。每个线程都会去调用await()方法通知道CyclicBarrier我已经到达屏障了。然后当前这个线程被阻塞,一直到所有的线程都到达屏障。另外CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier-Action),用于在线程到达屏障时,优先执行barrierAction中的业务代码,方便处理更复杂的业务场景需求。

应用场景

比如开会,要所有人都到齐了才能开会

代码示例

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * @Author: BillYu
 * @Description:CyclicBarrier的使用
 * 让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行
 * @Date: Created in 15:11 2019-02-26.
 */
public class UseCyclicBarrier {

    private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread());

    /**
     * 存放子线程工作结果的容器
     */
    private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        for (int i=0;i<=4;i++){
            Thread thread = new Thread(new SubThread());
            thread.start();
        }
    }

    /**
     * 负责屏障开放以后的工作
     *
     * 所有线程到达await()后执行
     */
    private static class CollectThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String,Long> workResult: resultMap.entrySet()){
                result.append("["+workResult.getValue()+"]");
            }
            System.out.println("the result = "+result);
            System.out.println("do other business...");
        }
    }


    /**
     * 工作线程
     */
    private static class SubThread implements Runnable{
        @Override
        public void run() {
            //线程本身的处理结果
            long id = Thread.currentThread().getId();
            resultMap.put(String.valueOf(id),id);
            //随机决定工作线程是否睡眠
            Random r = new Random();
            try {
                if(r.nextBoolean()){
                    Thread.sleep(1000+id);
                    System.out.println("Thread_"+id+" ...do something ");
                }
                System.out.println(id+"...is await");
                //等待所有线程同时达到await的位置,才能同时往下执行
                barrier.await();
                Thread.sleep(1000+id);
                System.out.println("Thread_"+id+" ...do its business");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


}