歡迎您光臨本站 註冊首頁

前言

CountDownLatch和CyclicBarrier兩個同為java併發編程的重要工具類,它們在諸多多線程併發或並行場景中得到了廣泛的應用。但兩者就其內部實現和使用場景而言是各有所側重的。

內部實現差異

前者更多依賴經典的AQS機制和CAS機制來控制器內部狀態的更迭和計數器本身的變化,而後者更多依靠可重入Lock等機制來控制其內部併發安全性和一致性。

   public class {     //Synchronization control For CountDownLatch.     //Uses AQS state to represent count.    private static final class Sync extends AbstractQueuedSynchronizer {      private static final long serialVersionUID = 4982264981922014374L;        Sync(int count) {        setState(count);      }        int getCount() {        return getState();      }        protected int tryAcquireShared(int acquires) {        return (getState() == 0) ? 1 : -1;      }        protected boolean tryReleaseShared(int releases) {        // Decrement count; signal when transition to zero        for (;;) {          int c = getState();          if (c == 0)            return false;          int nextc = c-1;          if (compareAndSetState(c, nextc))            return nextc == 0;        }      }    }      private final Sync sync;    ... ...//   }

 

   public class CyclicBarrier {    /**     * Each use of the barrier is represented as a generation instance.     * The generation changes whenever the barrier is tripped, or     * is reset. There can be many generations associated with threads     * using the barrier - due to the non-deterministic way the lock     * may be allocated to waiting threads - but only one of these     * can be active at a time (the one to which {@code count} applies)     * and all the rest are either broken or tripped.     * There need not be an active generation if there has been a break     * but no subsequent reset.     */    private static class Generation {      boolean broken = false;    }      /** The lock for guarding barrier entry */    private final ReentrantLock lock = new ReentrantLock();    /** Condition to wait on until tripped */    private final Condition trip = lock.newCondition();    /** The number of parties */    private final int parties;    /* The command to run when tripped */    private final Runnable barrierCommand;    /** The current generation */    private Generation generation = new Generation();      /**     * Number of parties still waiting. Counts down from parties to 0     * on each generation. It is reset to parties on each new     * generation or when broken.     */    private int count;      /**     * Updates state on barrier trip and wakes up everyone.     * Called only while holding lock.     */    private void nextGeneration() {      // signal completion of last generation      trip.signalAll();      // set up next generation      count = parties;      generation = new Generation();    }      /**     * Sets current barrier generation as broken and wakes up everyone.     * Called only while holding lock.     */    private void breakBarrier() {      generation.broken = true;      count = parties;      trip.signalAll();    }      /**     * Main barrier code, covering the various policies.     */    private int dowait(boolean timed, long nanos)      throws InterruptedException, BrokenBarrierException,          TimeoutException {      final ReentrantLock lock = this.lock;      lock.lock();      try {        final Generation g = generation;          if (g.broken)          throw new BrokenBarrierException();          if (Thread.interrupted()) {          breakBarrier();          throw new InterruptedException();        }          int index = --count;        if (index == 0) { // tripped          boolean ranAction = false;          try {            final Runnable command = barrierCommand;            if (command != null)              command.run();            ranAction = true;            nextGeneration();            return 0;          } finally {            if (!ranAction)              breakBarrier();          }        }          // loop until tripped, broken, interrupted, or timed out        for (;;) {          try {            if (!timed)              trip.await();            else if (nanos > 0L)              nanos = trip.awaitNanos(nanos);          } catch (InterruptedException ie) {            if (g == generation && ! g.broken) {              breakBarrier();              throw ie;            } else {              // We're about to finish waiting even if we had not              // been interrupted, so this interrupt is deemed to              // "belong" to subsequent execution.              Thread.currentThread().interrupt();            }          }            if (g.broken)            throw new BrokenBarrierException();            if (g != generation)            return index;            if (timed && nanos <= 0L) {            breakBarrier();            throw new TimeoutException();          }        }      } finally {        lock.unlock();      }    }    ... ... //   }

 

實戰 - 展示各自的使用場景

  /**   *類說明:共5個初始化子線程,6個閉鎖釦除點,扣除完畢後,主線程和業務線程才能繼續執行   */  public class UseCountDownLatch {        static CountDownLatch latch = new CountDownLatch(6);      /*初始化線程*/    private static class InitThread implements Runnable{        public void run() {        System.out.println("Thread_"+Thread.currentThread().getId()           +" ready init work......");        latch.countDown();        for(int i =0;i<2;i++) {          System.out.println("Thread_"+Thread.currentThread().getId()             +" ........continue do its work");        }      }    }      /*業務線程等待latch的計數器為0完成*/    private static class BusiThread implements Runnable{        public void run() {        try {          latch.await();        } catch (InterruptedException e) {          e.printStackTrace();        }        for(int i =0;i<3;i++) {          System.out.println("BusiThread_"+Thread.currentThread().getId()             +" do business-----");        }      }    }      public static void main(String[] args) throws InterruptedException {      new Thread(new Runnable() {        public void run() {          SleepTools.ms(1);          System.out.println("Thread_"+Thread.currentThread().getId()             +" ready init work step 1st......");          latch.countDown();          System.out.println("begin step 2nd.......");          SleepTools.ms(1);          System.out.println("Thread_"+Thread.currentThread().getId()             +" ready init work step 2nd......");          latch.countDown();        }      }).start();      new Thread(new BusiThread()).start();      for(int i=0;i<=3;i++){        Thread thread = new Thread(new InitThread());        thread.start();      }      latch.await();      System.out.println("Main do ites work........");    }  }
  /**   *類說明:共4個子線程,他們全部完成工作後,交出自己結果,   *再被統一釋放去做自己的事情,而交出的結果被另外的線程拿來拼接字符串   */  class UseCyclicBarrier {    private static CyclicBarrier barrier        = new CyclicBarrier(4,new CollectThread());      //存放子線程工作結果的容器    private static ConcurrentHashMapresultMap        = new ConcurrentHashMap();      public static void main(String[] args) {      for(int i=0;i<4;i++){        Thread thread = new Thread(new SubThread());        thread.start();      }      }      /*彙總的任務*/    private static class CollectThread implements Runnable{        @Override      public void run() {        StringBuilder result = new StringBuilder();        for(Map.EntryworkResult:resultMap.entrySet()){          result.append("["+workResult.getValue()+"]");        }        System.out.println(" the result = "+ result);        System.out.println("do other business........");      }    }      /*相互等待的子線程*/    private static class SubThread implements Runnable{      @Override      public void run() {        long id = Thread.currentThread().getId();        resultMap.put(Thread.currentThread().getId()+"",id);        try {            Thread.sleep(1000+id);            System.out.println("Thread_"+id+" ....do something ");          barrier.await();          Thread.sleep(1000+id);          System.out.println("Thread_"+id+" ....do its business ");          barrier.await();        } catch (Exception e) {          e.printStackTrace();        }      }    }  }

 

 兩者總結

1. Cyclicbarrier結果彙總的Runable線程可以重複被執行,通過多次觸發await()方法,countdownlatch可以調用await()方法多次;cyclicbarrier若沒有結果彙總,則調用一次await()就夠了;

2. New cyclicbarrier(threadCount)的線程數必須與實際的用戶線程數一致;

3. 協調線程同時運行:countDownLatch協調工作線程執行,是由外面線程協調;cyclicbarrier是由工作線程之間相互協調運行;

4. 從構造函數上看出:countDownlatch控制運行的計數器數量和線程數沒有關係;cyclicbarrier構造中傳入的線程數等於實際執行線程數;

5. countDownLatch在不能基於執行子線程的運行結果做處理,而cyclicbarrier可以;

6. 就使用場景而言,countdownlatch 更適用於框架加載前的一系列初始化工作等場景; cyclicbarrier更適用於需要多個用戶線程執行後,將運行結果彙總再計算等典型場景;


[niceskyabc ] 詳解java CountDownLatch和CyclicBarrier在內部實現和場景上的區別已經有230次圍觀

http://coctec.com/docs/java/show-post-237817.html