歡迎您光臨本站 註冊首頁

詳細分析JAVA 線程池

←手機掃碼閱讀     sl_ivan @ 2020-06-21 , reply:0

系統啟動一個新線程的成本是比較高的,因為它涉及與操作系統交互。在這種情形下,使用線程池可以很好地提高性能,尤其是當程序中需要創建大量生存期很短暫的線程時,更應該考慮使用線程池。

與數據庫連接池類似的是,線程池在系統啟動時即創建大量空閒的線程,程序將一個 Runnable 對象或 Callable 對象傳給線程池,線程池就會啟動一個線程來執行它們的 run() 或 call() 方法,當 run() 或 call() 方法執行結束後,該線程並不會死亡,而是再次返回線程池成為空閒狀態,等待執行下一個 Runnable 對象的 run() 或 call() 方法。

除此之外,使用線程池可以有效地控制系統中併發線程的數量,當系統中包含大量併發線程時,會導致系統性能劇烈下降,甚至導致 JVM 崩潰,而線程池的最大線程數參數可以控制系統中併發線程數不超過此數。

Java 8 改進的線程池

在 Java 5 以前,開發者必須手動實現自己的線程池;從 Java 5 開始, Java 內建支持線程池。 Java 5 新增了一個 Executors 工廠類來產生線程池,該工廠類包含如下幾個靜態工廠方法來創建線程池。

  • newCachedThreadPool():創建一個具有緩存功能的線程池,系統根據需要創建線程,這些線程將會被緩存在線程池中。

  • newFixedThreadPool(int nThreads):創建一個可重用的、具有固定線程數的線程池。

  • newSingleThreadExecutor():創建一個只有單線程的線程池,它相當於調用 newFixedThreadPool() 方法時傳入參數為1。

  • newScheduledThreadPool(int corePoolSize):創建具有指定線程數的線程池,它可以在指定延遲後執行線程任務。 corePoolSize 指池中所保存的線程數,即使線程是空閒的也被保存在線程池內。

  • newSingleThreadScheduledExecutor():創建只有一個線程的線程池,它可以在指定延遲後執行線程任務。

  • ExecutorService newWorkStealingPool(int parallelism):創建持有足夠的線程的線程池來支持給定的並行級別,該方法還會使用多個隊列來減少競爭。

  • ExecutorService newWorkStealingPool():該方法是前一個方法的簡化版本。如果當前機器有 4 個CPU, 則目標並行級別被設置為 4,也就是相當於為前一個方法傳入 4 作為參數。

上面7個方法中的前三個方法返回一個 ExecutorService 對象,該對象代表一個線程池,它可以執行 Runnable 對象或 Callable 對象所代表的線程;而中間兩個方法返回一個 ScheduledExecutorService 線程池,它是 ExecutorService 的子類,它可以在指定延遲後執行線程任務;最後兩個方法則是 Java 8 新增的,這兩個方法可充分利用多 CPU 並行的能力。這兩個方法生成的 work stealing 池,都相當於後臺線程池,如果所有的前臺線程都死亡了,work stealing 池中的線程會自動死亡。

由於目前計算機硬件的發展日新月異,即使普通用戶使用的電腦通常也都是多核 CPU,因此 Java 8 在線程支持上也增加了利用多 CPU 並行的能力,這樣可以更好地發揮底層硬件的性能。

ExecutorService 代表儘快執行線程的線程池(只要線程池中有空閒線程,就立即執行線程任務),程序只要將一個 Runnable 對象或 Callable 對象(代表線程任務)提交給該線程池,該線程池就會盡快執行該任務。 ExecutorService 裡提供瞭如下三個方法。

  • Future

      submit(Runnable task):將一個 Runnable 對象提交給指定的線程池,線程池將在有空閒線程時執行 Runnable 對象代表的任務。其中 Future 對象代表 Runnable 任務的返回值,但 run() 方法沒有返回值,所以 Future 對象將在 run() 方法執行結束後返回 null 。但可以調用 Future 的 isDone()、 isCancelled() 方法來獲得 Runnable 對象的執行狀態。
  • Future  submit(Runnable task,  T result):將一個 Runnable 對象提交給指定的線程池,線程池將在有空閒線程時執行 Runnable 對象代表的任務。其中 result 顯式指定線程執行結束後的返回值,所以 Future 對象將在 run() 方法執行結束後返回 result 。
  • Future  submit(Callable   task):將一個 Callable 對象提交給指定的線程池,線程池將在有空閒線程時執行 Callable 對象代表的任務。其中 Future 代表 Callable 對象裡 call() 方法的返回值。
       

ScheduledExecutorService 代表可在指定延遲後或週期性地執行線程任務的線程池,它提供瞭如下4個方法。

  • ScheduledFuture

      schedule (Callable  callable,  long delay, TimeUnit unit):指定 callable 任務將在 delay 延遲後執行。
  • ScheduledFuture

      schedule(Runnable command , long delay , TimeUnit unit):指定 command 任務將在 delay 延遲後執行。
  • ScheduledFuture

      scheduleAtFixedRate(Runnable command,  long initialDelay,  long period , TimeUnit unit) : 指定 command 任務將在 delay 延遲後執行,而且以設定頻率重複執行。也就是說,在 initialDelay 後開始執行,依次在 initialDelay + period 、 initialDelay +2* period …處重複執行,依此類推。
  • ScheduledFuture

       scheduleWithFixedDelay(Runnable command,  long initialDelay,  long delay, TimeUnit unit):創建並執行一個在給定初始延遲後首次啟用的定期操作,隨後在每一次執行終止和下一次執行開始之間都存在給定的延遲。如果任務在任一次執行時遇到異常,就會取消後續執行;否則,只能通過程序來顯式取消或終止該任務。
       

用完一個線程池後,應該調用該線程池的 shutdown() 方法,該方法將啟動線程池的關閉序列,調用 shutdown() 方法後的線程池不再接收新任務,但會將以前所有已提交任務執行完成。當線程池中的所有任務都執行完成後,池中的所有線程都會死亡;另外也可以調用線程池的 shutdownNow() 方法來關閉線程池,該方法試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。

使用線程池來執行線程任務的步驟如下。

  • 調用 Executors 類的靜態工廠方法創建一個 ExecutorService 對象,該對象代表一個線程池。

  • 創建 Runnable 實現類或 Callable 實現類的實例,作為線程執行任務。

  • 調用 ExecutorService 對象的 submit() 方法來提交 Runnable 實例或 Callable 實例。

  • 當不想提交任何任務時,調用 ExecutorService 對象的 shutdown() 方法來關閉線程池。

下面程序使用線程池來執行指定 Runnable 對象所代表的任務。

  //實現Runnable接口來定義一個簡單的  class TestThread implements Runnable{   public void run(){   for (int i = 0; i < 100 ; i++ ){    System.out.println(Thread.currentThread().getName()    + "的i值為:" + i);   }   }  }    public class ThreadPoolTest{   public static void main(String[] args) {   //創建一個具有固定線程數(6)的線程池   ExecutorService pool = Executors.newFixedThreadPool(6);   //向線程池中提交2個線程   pool.submit(new TestThread());   pool.submit(new TestThread());   //關閉線程池   pool.shutdown();   }  }

 

上面程序中創建 Runnable 實現類與最開始創建線程池並沒有太大差別,創建了 Runnable 實現類之後程序沒有直接創建線程、啟動線程來執行該 Runnable 任務,而是通過線程池來執行該任務,使用線程池來執行 Runnable 任務的代碼如程序中粗體字代碼所示。運行上面程序,將看到兩個線程交替執行的效果,如下圖所示。

Java 8  增強的 ForkJoinPool

現在計算機大多已向多 CPU 方向發展,即使普通 PC ,甚至小型智能設備(如手機)、多核處理器也已被廣泛應用。在未來的日子裡,處理器的核心數將會發展到更多。

雖然硬件上的多核 CPU 已經十分成熟,但很多應用程序並未為這種多核 CPU 做好準備,因此並不能很好地利用多核 CPU 的性能優勢。

為了充分利用多 CPU 、多核 CPU 的性能優勢,計算機軟件系統應該可以充分“挖掘”每個 CPU 的計算能力,絕不能讓某個 CPU 處於“空閒”狀態。為了充分利用多 CPU 、多核 CPU 的優勢,可以考慮把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之後,再將這些執行結果合併起來即可。

Java 7 提供了 ForkJoinPool 來支持將一個任務拆分成多個“小任務”並行計算,再把多個“小任務”的結果合併成總的計算結果。 ForkJoinPool 是 ExecutorService 的實現類,因此是一種特殊的線程池。ForkJoinPool 提供瞭如下兩個常用的構造器。

  • ForkJoinPool(int parallelism):創建一個包含 parallelism 個並行線程的 ForkJoinPool 。

  • ForkJoinPool():以 Runtime.availableProcessors() 方法的返回值作為 parallelism 參數來創建 ForkJoinPool

Java 8 進一步擴展了 ForkJoinPool 的功能 ,Java 8 為 ForkJoinPool 增加了通用池功能。 ForkJoinPool 類通過如下兩個靜態方法提供通用池功能。

  • ForkJoinPool commonPool():該方法返回一個通用池,通用池的運行狀態不會受 shutdown() 或 shutdownNow() 方法的影響。當然,如果程序直接執行 System.exit(0); 來終止虛擬機,通用池以及通用池中正在執行的任務都會被自動終止。

  • int getCommonPoolParallelism():該方法返回通用池的並行級別。

創建了 ForkJoinPool 實例之後,就可調用 ForkJoinPool 的 submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法來執行指定任務了。其中 ForkJoinTask 代表一個可以並行、合併的任務。ForkJoinTask 是一個抽象類,它還有兩個抽象子類 : RecursiveAction 和 RecursiveTask 。其中 RecursiveTask 代表有返回值的任務,而 RecursiveAction 代表沒有返回值的任務。

下面以執行沒有返回值的“大任務”(簡單地打印0〜300的數值)為例,程序將一個“大任務”拆分成多個“小任務”,並將任務交給 ForkJoinPool 來執行。

  import java.util.concurrent.ForkJoinPool;  import java.util.concurrent.RecursiveAction;  import java.util.concurrent.TimeUnit;    class PrintTask extends RecursiveAction{   // 每個“小任務”最多隻打印50個數   private static final int THRESHOLD = 50;   private int start;   private int end;      // 打印從 start 到 end 的任務   public PrintTask(int start, int end) {   this.start = start;   this.end = end;   }      @Override   protected void compute() {   // 當 end 與 start 之間的差小於 THRESHOLD 時,開始打印   if(end-start<THRESHOLD) {    for(int i=start;i<end;i++) {    System.out.println(Thread.currentThread().getName()+"的 i 值:"+i);    }   }else {    // 當 end 與 start 之間的差大於 THRESHOLD 時,即要打印的數超過50個時    // 將大任務分解成兩個“小任務”    int middle = (start+end)/2;    PrintTask left = new PrintTask(start, middle);    PrintTask right = new PrintTask(middle, end);    // 並行執行兩個“小任務”    left.fork();    right.fork();   }   }  }    public class ForkJoinPoolTest {   public static void main(String[] args) throws Exception {   ForkJoinPool pool = new ForkJoinPool();   // 提交可分解的 PrintTask 任務   pool.submit(new PrintTask(0, 300));   pool.awaitTermination(2, TimeUnit.SECONDS);   // 關閉線程池   pool.shutdown();   }  }

上面程序中的粗體字代碼實現了對指定打印任務的分解,分解後的任務分別調用 fork() 方法開始並行執行。運行上面程序,可以看到如下圖所示的結果。

從如上圖所示的執行結果來看, ForkJoinPool 啟動了 4個線程來執行這個打印任務――這是因為測試計算機的 CPU 是4核的。不僅如此,讀者可以看到程序雖然打印了 0〜299這300個數字,但並不是連續打印的,這是因為程序將這個打印任務進行了分解,分解後的任務會並行執行,所以不會按順序從0打印到299。

上面定義的任務是一個沒有返回值的打印任務,如果大任務是有返回值的任務,則可以讓任務繼承 RecursiveTask,其中泛型參數 T 就代表了該任務的返回值類型。下面程序示範了使用 RecursiveTask 對一個長度為100的數組的元素值進行累加。

  package com.jwen.demo4;    import java.util.Random;  import java.util.concurrent.ForkJoinPool;  import java.util.concurrent.Future;  import java.util.concurrent.RecursiveTask;  import java.util.function.Function;    class CalTask extends RecursiveTask{        // 每個“小任務”最多隻累加20個數   private static final int THRESHOLD = 20;   private int arr[];   private int start;   private int end;   // 累加從 start 到 end 的數組元素   public CalTask(int[] arr, int start, int end) {   this.arr = arr;   this.start = start;   this.end = end;   }      @Override   protected Integer compute() {   int sum = 0;   // 當 end 與 start 之間的差小於 THRESHOLD 時,開始進行實際累加   if(end-start<THRESHOLD) {    for(int i=start;i<end;i++) {    sum+=arr[i];    }    return sum;   }else {    // 當 end 與 start 之間的差大於 THRESHOLD 時,即要累加的數超過20個時    // 將大任務分解成兩個“小任務”    int middle = (start+end)/2;    CalTask left = new CalTask(arr, start, middle);    CalTask right = new CalTask(arr, middle, end);    // 並行執行兩個“小任務”    left.fork();    right.fork();    // 把兩個“小任務”累加的結果合併起來    return left.join()+right.join(); // ①   }   }  }    public class Sum {   public static void main(String[] args) throws Exception{   int[] arr = new int[100];   Random rand = new Random();   int total = 0;   // 初始化100個數字元素   for(int i=0,len = arr.length;i<len;i++) {    int tmp = rand.nextInt(20);    // 對數組元素賦值,並將數組元素的值添加到 sum 總和中    total +=(arr[i]=tmp);   }   System.out.println(total);   // 創建一個通用池   ForkJoinPool pool = ForkJoinPool.commonPool();   // 提交可分解的 CalTask 任務   Futurefuture = pool.submit(new CalTask(arr, 0, arr.length));   System.out.println(future.get());   // 關閉線程池   pool.shutdown();   }  }

 

上面程序與前一個程序基本相似,同樣是將任務進行了分解,並調用分解後的任務的 fork() 方法使它們並行執行。與前一個程序不同的是,現在任務是帶返回值的,因此程序還在①號代碼處將兩個分解後的“小任務”的返回值進行了合併。

運行上面程序,將可以看到程序通過 CalTask 計算出來的總和,與初始化數組元素時統計出來的總和總是相等,這表明程序一切正常。

Java 的確是一門非常優秀的編程語言,在多 CPU、多核 CPU 時代來到時,Java 語言的多線程已經為多核 CPU 做好了準備。

               

   


[sl_ivan ] 詳細分析JAVA 線程池已經有249次圍觀

http://coctec.com/docs/java/show-post-239372.html