系統啟動一個新線程的成本是比較高的,因為它涉及與操作系統交互。在這種情形下,使用線程池可以很好地提高性能,尤其是當程序中需要創建大量生存期很短暫的線程時,更應該考慮使用線程池。
與數據庫連接池類似的是,線程池在系統啟動時即創建大量空閒的線程,程序將一個 Runnable 對象或 Callable 對象傳給線程池,線程池就會啟動一個線程來執行它們的 run() 或 call() 方法,當 run() 或 call() 方法執行結束後,該線程並不會死亡,而是再次返回線程池成為空閒狀態,等待執行下一個 Runnable 對象的 run() 或 call() 方法。
除此之外,使用線程池可以有效地控制系統中併發線程的數量,當系統中包含大量併發線程時,會導致系統性能劇烈下降,甚至導致 JVM 崩潰,而線程池的最大線程數參數可以控制系統中併發線程數不超過此數。
Java 8 改進的線程池
在 Java 5 以前,開發者必須手動實現自己的線程池;從 Java 5 開始, Java 內建支持線程池。 Java 5 新增了一個 Executors 工廠類來產生線程池,該工廠類包含如下幾個靜態工廠方法來創建線程池。
newCachedThreadPool():創建一個具有緩存功能的線程池,系統根據需要創建線程,這些線程將會被緩存在線程池中。
newFixedThreadPool(int nThreads):創建一個可重用的、具有固定線程數的線程池。
newSingleThreadExecutor():創建一個只有單線程的線程池,它相當於調用 newFixedThreadPool() 方法時傳入參數為1。
newScheduledThreadPool(int corePoolSize):創建具有指定線程數的線程池,它可以在指定延遲後執行線程任務。 corePoolSize 指池中所保存的線程數,即使線程是空閒的也被保存在線程池內。
newSingleThreadScheduledExecutor():創建只有一個線程的線程池,它可以在指定延遲後執行線程任務。
ExecutorService newWorkStealingPool(int parallelism):創建持有足夠的線程的線程池來支持給定的並行級別,該方法還會使用多個隊列來減少競爭。
ExecutorService newWorkStealingPool():該方法是前一個方法的簡化版本。如果當前機器有 4 個CPU, 則目標並行級別被設置為 4,也就是相當於為前一個方法傳入 4 作為參數。
上面7個方法中的前三個方法返回一個 ExecutorService 對象,該對象代表一個線程池,它可以執行 Runnable 對象或 Callable 對象所代表的線程;而中間兩個方法返回一個 ScheduledExecutorService 線程池,它是 ExecutorService 的子類,它可以在指定延遲後執行線程任務;最後兩個方法則是 Java 8 新增的,這兩個方法可充分利用多 CPU 並行的能力。這兩個方法生成的 work stealing 池,都相當於後臺線程池,如果所有的前臺線程都死亡了,work stealing 池中的線程會自動死亡。
由於目前計算機硬件的發展日新月異,即使普通用戶使用的電腦通常也都是多核 CPU,因此 Java 8 在線程支持上也增加了利用多 CPU 並行的能力,這樣可以更好地發揮底層硬件的性能。
ExecutorService 代表儘快執行線程的線程池(只要線程池中有空閒線程,就立即執行線程任務),程序只要將一個 Runnable 對象或 Callable 對象(代表線程任務)提交給該線程池,該線程池就會盡快執行該任務。 ExecutorService 裡提供瞭如下三個方法。
Future
submit(Runnable task):將一個 Runnable 對象提交給指定的線程池,線程池將在有空閒線程時執行 Runnable 對象代表的任務。其中 Future 對象代表 Runnable 任務的返回值,但 run() 方法沒有返回值,所以 Future 對象將在 run() 方法執行結束後返回 null 。但可以調用 Future 的 isDone()、 isCancelled() 方法來獲得 Runnable 對象的執行狀態。ScheduledExecutorService 代表可在指定延遲後或週期性地執行線程任務的線程池,它提供瞭如下4個方法。
ScheduledFuture
ScheduledFuture
schedule(Runnable command , long delay , TimeUnit unit):指定 command 任務將在 delay 延遲後執行。ScheduledFuture
scheduleAtFixedRate(Runnable command, long initialDelay, long period , TimeUnit unit) : 指定 command 任務將在 delay 延遲後執行,而且以設定頻率重複執行。也就是說,在 initialDelay 後開始執行,依次在 initialDelay + period 、 initialDelay +2* period …處重複執行,依此類推。ScheduledFuture
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):創建並執行一個在給定初始延遲後首次啟用的定期操作,隨後在每一次執行終止和下一次執行開始之間都存在給定的延遲。如果任務在任一次執行時遇到異常,就會取消後續執行;否則,只能通過程序來顯式取消或終止該任務。用完一個線程池後,應該調用該線程池的 shutdown() 方法,該方法將啟動線程池的關閉序列,調用 shutdown() 方法後的線程池不再接收新任務,但會將以前所有已提交任務執行完成。當線程池中的所有任務都執行完成後,池中的所有線程都會死亡;另外也可以調用線程池的 shutdownNow() 方法來關閉線程池,該方法試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。
使用線程池來執行線程任務的步驟如下。
調用 Executors 類的靜態工廠方法創建一個 ExecutorService 對象,該對象代表一個線程池。
創建 Runnable 實現類或 Callable 實現類的實例,作為線程執行任務。
調用 ExecutorService 對象的 submit() 方法來提交 Runnable 實例或 Callable 實例。
當不想提交任何任務時,調用 ExecutorService 對象的 shutdown() 方法來關閉線程池。
下面程序使用線程池來執行指定 Runnable 對象所代表的任務。
//實現Runnable接口來定義一個簡單的 class TestThread implements Runnable{ public void run(){ for (int i = 0; i < 100 ; i++ ){ System.out.println(Thread.currentThread().getName() + "的i值為:" + i); } } } public class ThreadPoolTest{ public static void main(String[] args) { //創建一個具有固定線程數(6)的線程池 ExecutorService pool = Executors.newFixedThreadPool(6); //向線程池中提交2個線程 pool.submit(new TestThread()); pool.submit(new TestThread()); //關閉線程池 pool.shutdown(); } }
上面程序中創建 Runnable 實現類與最開始創建線程池並沒有太大差別,創建了 Runnable 實現類之後程序沒有直接創建線程、啟動線程來執行該 Runnable 任務,而是通過線程池來執行該任務,使用線程池來執行 Runnable 任務的代碼如程序中粗體字代碼所示。運行上面程序,將看到兩個線程交替執行的效果,如下圖所示。
Java 8 增強的 ForkJoinPool
現在計算機大多已向多 CPU 方向發展,即使普通 PC ,甚至小型智能設備(如手機)、多核處理器也已被廣泛應用。在未來的日子裡,處理器的核心數將會發展到更多。
雖然硬件上的多核 CPU 已經十分成熟,但很多應用程序並未為這種多核 CPU 做好準備,因此並不能很好地利用多核 CPU 的性能優勢。
為了充分利用多 CPU 、多核 CPU 的性能優勢,計算機軟件系統應該可以充分“挖掘”每個 CPU 的計算能力,絕不能讓某個 CPU 處於“空閒”狀態。為了充分利用多 CPU 、多核 CPU 的優勢,可以考慮把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之後,再將這些執行結果合併起來即可。
Java 7 提供了 ForkJoinPool 來支持將一個任務拆分成多個“小任務”並行計算,再把多個“小任務”的結果合併成總的計算結果。 ForkJoinPool 是 ExecutorService 的實現類,因此是一種特殊的線程池。ForkJoinPool 提供瞭如下兩個常用的構造器。
ForkJoinPool(int parallelism):創建一個包含 parallelism 個並行線程的 ForkJoinPool 。
ForkJoinPool():以 Runtime.availableProcessors() 方法的返回值作為 parallelism 參數來創建 ForkJoinPool
Java 8 進一步擴展了 ForkJoinPool 的功能 ,Java 8 為 ForkJoinPool 增加了通用池功能。 ForkJoinPool 類通過如下兩個靜態方法提供通用池功能。
ForkJoinPool commonPool():該方法返回一個通用池,通用池的運行狀態不會受 shutdown() 或 shutdownNow() 方法的影響。當然,如果程序直接執行 System.exit(0); 來終止虛擬機,通用池以及通用池中正在執行的任務都會被自動終止。
int getCommonPoolParallelism():該方法返回通用池的並行級別。
創建了 ForkJoinPool 實例之後,就可調用 ForkJoinPool 的 submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法來執行指定任務了。其中 ForkJoinTask 代表一個可以並行、合併的任務。ForkJoinTask 是一個抽象類,它還有兩個抽象子類 : RecursiveAction 和 RecursiveTask 。其中 RecursiveTask 代表有返回值的任務,而 RecursiveAction 代表沒有返回值的任務。
下面以執行沒有返回值的“大任務”(簡單地打印0〜300的數值)為例,程序將一個“大任務”拆分成多個“小任務”,並將任務交給 ForkJoinPool 來執行。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; class PrintTask extends RecursiveAction{ // 每個“小任務”最多隻打印50個數 private static final int THRESHOLD = 50; private int start; private int end; // 打印從 start 到 end 的任務 public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 當 end 與 start 之間的差小於 THRESHOLD 時,開始打印 if(end-start<THRESHOLD) { for(int i=start;i<end;i++) { System.out.println(Thread.currentThread().getName()+"的 i 值:"+i); } }else { // 當 end 與 start 之間的差大於 THRESHOLD 時,即要打印的數超過50個時 // 將大任務分解成兩個“小任務” int middle = (start+end)/2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); // 並行執行兩個“小任務” left.fork(); right.fork(); } } } public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); // 提交可分解的 PrintTask 任務 pool.submit(new PrintTask(0, 300)); pool.awaitTermination(2, TimeUnit.SECONDS); // 關閉線程池 pool.shutdown(); } }
上面程序中的粗體字代碼實現了對指定打印任務的分解,分解後的任務分別調用 fork() 方法開始並行執行。運行上面程序,可以看到如下圖所示的結果。
從如上圖所示的執行結果來看, ForkJoinPool 啟動了 4個線程來執行這個打印任務――這是因為測試計算機的 CPU 是4核的。不僅如此,讀者可以看到程序雖然打印了 0〜299這300個數字,但並不是連續打印的,這是因為程序將這個打印任務進行了分解,分解後的任務會並行執行,所以不會按順序從0打印到299。
上面定義的任務是一個沒有返回值的打印任務,如果大任務是有返回值的任務,則可以讓任務繼承 RecursiveTask
package com.jwen.demo4; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; import java.util.function.Function; class CalTask extends RecursiveTask{ // 每個“小任務”最多隻累加20個數 private static final int THRESHOLD = 20; private int arr[]; private int start; private int end; // 累加從 start 到 end 的數組元素 public CalTask(int[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 當 end 與 start 之間的差小於 THRESHOLD 時,開始進行實際累加 if(end-start<THRESHOLD) { for(int i=start;i<end;i++) { sum+=arr[i]; } return sum; }else { // 當 end 與 start 之間的差大於 THRESHOLD 時,即要累加的數超過20個時 // 將大任務分解成兩個“小任務” int middle = (start+end)/2; CalTask left = new CalTask(arr, start, middle); CalTask right = new CalTask(arr, middle, end); // 並行執行兩個“小任務” left.fork(); right.fork(); // 把兩個“小任務”累加的結果合併起來 return left.join()+right.join(); // ① } } } public class Sum { public static void main(String[] args) throws Exception{ int[] arr = new int[100]; Random rand = new Random(); int total = 0; // 初始化100個數字元素 for(int i=0,len = arr.length;i<len;i++) { int tmp = rand.nextInt(20); // 對數組元素賦值,並將數組元素的值添加到 sum 總和中 total +=(arr[i]=tmp); } System.out.println(total); // 創建一個通用池 ForkJoinPool pool = ForkJoinPool.commonPool(); // 提交可分解的 CalTask 任務 Futurefuture = pool.submit(new CalTask(arr, 0, arr.length)); System.out.println(future.get()); // 關閉線程池 pool.shutdown(); } }
上面程序與前一個程序基本相似,同樣是將任務進行了分解,並調用分解後的任務的 fork() 方法使它們並行執行。與前一個程序不同的是,現在任務是帶返回值的,因此程序還在①號代碼處將兩個分解後的“小任務”的返回值進行了合併。
運行上面程序,將可以看到程序通過 CalTask 計算出來的總和,與初始化數組元素時統計出來的總和總是相等,這表明程序一切正常。
Java 的確是一門非常優秀的編程語言,在多 CPU、多核 CPU 時代來到時,Java 語言的多線程已經為多核 CPU 做好了準備。
[sl_ivan ] 詳細分析JAVA 線程池已經有249次圍觀