2010年11月1日 星期一

Multithread framework 2/2

http://www.zhuaxia.com/item/590227619/

java.util.concurrent 多線程框架 (下)


有時候在實際應用中,某些操作很耗時,但又不是不可或缺的步驟。 比如用網頁瀏覽器瀏覽新聞時,最重要的是要顯示文字內容,至於與新聞相匹配的圖片就沒有那麼重要的,所以此時首先保證文字信息先顯示,而圖片信息會後顯示,但又不能不顯示,由於下載圖片是一個耗時的操作,所以必須一開始就得下載。 


Java的並發庫的Future類就可以滿足這個要求。 Future的重要方法包括get()和cancel(),get()獲取數據對象,如果數據沒有加載,就會阻塞直到取到數據,而cancel()是取消數據加載。 另外一個get(timeout)操作,表示如果在timeout時間內沒有取到就失敗返回,而不再阻塞。 

下面的Demo簡單的說明了Future的使用方法:一個非常耗時的操作必須一開始啟動,但又不能一直等待;其他重要的事情又必須做,等完成後,就可以做不重要的事情。 

Java代碼 
  1. package concurrent;  
  2.   
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.ExecutionException;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7. import java.util.concurrent.Future;  
  8.   
  9. public class TestFutureTask {  
  10. public static void main(String[] args)throws InterruptedException,  
  11. ExecutionException {  
  12. final ExecutorService exec = Executors.newFixedThreadPool(5);  
  13. Callable call = new Callable() {  
  14. public String call() throws Exception {  
  15. Thread.sleep(1000 * 5);  
  16. return “Other less important but longtime things.”;  
  17. }  
  18. };  
  19. Future task = exec.submit(call);  
  20. // 重要的事情  
  21. Thread.sleep(1000 * 3);  
  22. System.out.println(“Let's do important things.”);  
  23. // 其他不重要的事情  
  24. String obj = task.get();  
  25. System.out.println(obj);  
  26. // 關閉線程池  
  27. exec.shutdown();  
  28. }  
  29. }  

運行結果: 
Let's do important things. 
Other less important but longtime things. 

考慮以下場景:瀏覽網頁時,瀏覽器了5個線程下載網頁中的圖片文件,由於圖片大小、網站訪問速度等諸多因素的影響,完成圖片下載的時間就會有很大的不同。 如果先下載完成的圖片就會被先顯示到界面上,反之,後下載的圖片就後顯示。 


Java的並發庫的CompletionService可以滿足這種場景要求。 該接口有兩個重要方法:submit()和take()。 submit用於提交一個runnable或者callable,一般會提交給一個線程池處理;而take就是取出已經執行完畢runnable或者callable實例的Future對象,如果沒有滿足要求的,就等待了。 CompletionService還有一個對應的方法poll,該方法與take類似,只是不會等待,如果沒有滿足要求,就返回null對象。 

Java代碼 
  1. package concurrent;  
  2.   
  3. import java.util.concurrent.Callable;  
  4. import java.util.concurrent.CompletionService;  
  5. import java.util.concurrent.ExecutionException;  
  6. import java.util.concurrent.ExecutorCompletionService;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10.   
  11. public class TestCompletionService {  
  12. public static void main(String[] args) throws InterruptedException,  
  13. ExecutionException {  
  14. ExecutorService exec = Executors.newFixedThreadPool(10);  
  15. CompletionService serv =  
  16. new ExecutorCompletionService(exec);  
  17.   
  18. for (int index = 0; index < 5; index++) {  
  19. final int NO = index;  
  20. Callable downImg = new Callable() {  
  21. public String call() throws Exception {  
  22. Thread.sleep((long) (Math.random() * 10000));  
  23. return “Downloaded Image ” + NO;  
  24. }  
  25. };  
  26. serv.submit(downImg);  
  27. }  
  28.   
  29. Thread.sleep(1000 * 2);  
  30. System.out.println(“Show web content”);  
  31. for (int index = 0; index < 5; index++) {  
  32. Future task = serv.take();  
  33. String img = task.get();  
  34. System.out.println(img);  
  35. }  
  36. System.out.println(“End”);  
  37. // 關閉線程池  
  38. exec.shutdown();  
  39. }  
  40. }  

運行結果: 
Show web content 
Downloaded Image 1 
Downloaded Image 2 
Downloaded Image 4 
Downloaded Image 0 
Downloaded Image 3 
End 

操作系統的信號量是個很重要的概念,在進程控制方面都有應用。 Java並發庫的Semaphore可以很輕鬆完成信號量控制,Semaphore可以控制某個資源可被同時訪問的個數,acquire()獲取一個許可,如果沒有就等待,而release()釋放一個許可。 比如在Windows下可以設置共享文件的最大客戶端訪問個數。 

Semaphore維護了當前訪問的個數,提供同步機制,控制同時訪問的個數。 在數據結構中鍊錶可以保存“無限”的節點,用Semaphore可以實現有限大小的鍊錶。 另外重入鎖ReentrantLock也可以實現該功能,但實現上要負責些,代碼也要復雜些。 

下面的Demo中申明了一個只有5個許可的Semaphore,而有20個線程要訪問這個資源,通過acquire()和release()獲取和釋放訪問許可。 

Java代碼 
  1. package concurrent;  
  2.   
  3. import java.util.concurrent.ExecutorService;  
  4. import java.util.concurrent.Executors;  
  5. import java.util.concurrent.Semaphore;  
  6.   
  7. public class TestSemaphore {  
  8. public static void main(String[] args) {  
  9. // 線程池  
  10. ExecutorService exec = Executors.newCachedThreadPool();  
  11. // 只能5個線程同時訪問  
  12. final Semaphore semp = new Semaphore(5);  
  13. // 模擬20個客戶端訪問  
  14. for (int index = 0; index < 20; index++) {  
  15. final int NO = index;  
  16. Runnable run = new Runnable() {  
  17. public void run() {  
  18. try {  
  19. // 獲取許可  
  20. semp.acquire();  
  21. System.out.println(“Accessing: ” + NO);  
  22. Thread.sleep((long) (Math.random() * 10000));  
  23. // 訪問完後,釋放  
  24. semp.release();  
  25. catch (InterruptedException e) {  
  26. }  
  27. }  
  28. };  
  29. exec.execute(run);  
  30. }  
  31. // 退出線程池  
  32. exec.shutdown();  
  33. }  
  34. }  

運行結果: 
Accessing: 0 
Accessing: 1 
Accessing: 2 
Accessing: 3 
Accessing: 4 
Accessing: 5 
Accessing: 6 
Accessing: 7 
Accessing: 8 
Accessing: 9 
Accessing: 10 
Accessing: 11 
Accessing: 12 
Accessing: 13 
Accessing: 14 
Accessing: 15 
Accessing: 16 
Accessing: 17 
Accessing: 18 
Accessing: 19 

沒有留言:

張貼留言