歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Java並行編程–從並行任務集獲取反饋

在並行任務啟動後,強制性地從並行任務得到反饋。

假想有一個程序,可以發送批郵件,還使用了多線程機制。你想知道有多少郵件成功發送嗎?你想知道在實際發送過程期間,這個批處理工作的實時進展嗎?

要實現多線程的這種反饋,我們可以使用Callable接口。此接口的工作方式基本上與Runnable相同,但是執行方法(call())會返回一個值,該值反映了執行計算的結果。

  1. package com.ricardozuasti;  
  2.   
  3. import java.util.concurrent.Callable;  
  4.   
  5. public class FictionalEmailSender implements Callable<Boolean>{  
  6.     private String to;  
  7.     private String subject;  
  8.     private String body;  
  9.     public FictionalEmailSender(String to, String subject, String body){  
  10.         this.to = to;  
  11.         this.subject = subject;  
  12.         this.body = body;  
  13.     }  
  14.   
  15.     @Override  
  16.     public Boolean call() throws InterruptedException {  
  17.         // 在0~0.5秒間模擬發送郵件   
  18.         Thread.sleep(Math.round(Math.random()*0.5*1000));  
  19.         // 假設我們有80%的幾率成功發送郵件   
  20.         if(Math.random()>0.2){  
  21.             return true;  
  22.         }else{  
  23.             return false;  
  24.         }  
  25.     }  
  26.       
  27. }  

注意:Callable接口可用於返回任意數據類型,因此我們的任務可以返回我們需要的任何信息。

現在,我們使用一個線程池ExecutorService來發送郵件,由於我們的任務是以Callable接口實現的,我們提交執行的每個新任務,都會得到一個Future引用。注意我們要使用直接的構造器創建ExecutorService,而不是使用來自Executors的工具方法創建。這是因為使用指定類ThreadPoolExecutor提供了一些方法可以派上用場。

  1. package com.ricardozuasti;  
  2.   
  3. import java.util.concurrent.Future;  
  4. import java.util.concurrent.LinkedBlockingQueue;  
  5. import java.util.concurrent.ThreadPoolExecutor;  
  6. import java.util.concurrent.TimeUnit;  
  7. import java.util.ArrayList;  
  8. import java.util.List;  
  9.   
  10. public class Concurrency2 {  
  11.     public static void main(String[] args){  
  12.         try{  
  13.             ThreadPoolExecutor executor = new ThreadPoolExecutor(30301,  
  14.      TimeUnit.SECONDS, new LinkedBlockingQueue());  
  15.             List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(9000);  
  16.             // 發送垃圾郵件, 用戶名假設為4位數字   
  17.             for(int i=1000; i<10000; i++){  
  18.                 futures.add(executor.submit(new FictionalEmailSender(i+"@sina.com",  
  19.                         "Knock, knock, Neo""The Matrix has you...")));  
  20.             }  
  21.             // 提交所有的任務後,關閉executor   
  22.             System.out.println("Starting shutdown...");  
  23.             executor.shutdown();  
  24.               
  25.             // 每秒鐘打印執行進度   
  26.             while(!executor.isTerminated()){  
  27.                 executor.awaitTermination(1, TimeUnit.SECONDS);  
  28.                 int progress = Math.round((executor.getCompletedTaskCount()  
  29. *100)/executor.getTaskCount());  
  30.                 System.out.println(progress + "% done (" +   
  31. executor.getCompletedTaskCount() + " emails have been sent).");  
  32.             }  
  33.             // 現在所有郵件已發送完, 檢查futures, 看成功發送的郵件有多少   
  34.             int errorCount = 0;  
  35.             int successCount = 0;  
  36.             for(Future<Boolean> future : futures){  
  37.                 if(future.get()){  
  38.                     successCount++;  
  39.                 }else{  
  40.                     errorCount++;  
  41.                 }  
  42.             }  
  43.             System.out.println(successCount + " emails were successfully sent, but " +  
  44.                     errorCount + " failed.");  
  45.         }catch(Exception ex){  
  46.             ex.printStackTrace();  
  47.         }  
  48.     }  
  49. }  
Copyright © Linux教程網 All Rights Reserved