歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

ThreadPoolExecutor-線程池開發的使用

好久沒有寫過筆記了,最近做的一個項目涉及打線程池和隊列的開發,覺得在這個項目中學習到的還是挺多的,對線程安全,並發的知識有加深認知;當然,現在用過的東西並不是代表以後還能娴熟的使用,做好筆記非常重要;

1:必須明白為什麼要使用線程池:(這點很重要)

  a:手上項目所需,因為項目主要的目的是實現多線程的數據推送;需要創建多線程的話,那就要處理好線程安全的問題;因為項目需要,還涉及到排隊下載的功能,所以就選擇了線程池來管理線程以及線程池裡面的任務隊列workQueue來實現項目所需的功能;

  b:在實際使用中,服務器在創建和銷毀線程上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的用戶請求的時間和資源要多的多。除了創建和銷毀線程的開銷之外,活動的線程也需要消耗系統資源。如果在一個jvm裡創建太多的線程,可能會使系統由於過度消耗內存或“切換過度”而導致系統資源不足。為了防止資源不足,服務器應用程序需要采取一些辦法來限制任何給定時刻處理的請求數目,盡可能減少創建和銷毀線程的次數,特別是一些資源耗費比較大的線程的創建和銷毀,盡量利用已有對象來進行服務,這就是“池化資源”技術產生的原因。 線程池主要用來解決線程生命周期開銷問題和資源不足問題(這段是摘自網絡)

2:如何創建一個線程池:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

這裡只是創建線程池其中的一個構造函數;其實其他的構造函數最終還是調用的這個構造函數;

說明一下這些參數的作用:

corePoolSize:核心池的大小,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;

maximumPoolSize:線程池最大線程數,它表示在線程池中最多能創建多少個線程;這個參數是跟後面的阻塞隊列聯系緊密的;只有當阻塞隊列滿了,如果還有任務添加到線程池的話,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務;如果繼續添加任務到線程池,且線程池中的線程數已經達到了maximumPoolSize,那麼線程就會就會執行reject操作(這裡後面會提及到)

keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止;默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用;即當線程池中的線程數大於corePoolSize時,如果一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法並設置了參數為true,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的阻塞隊列大小為0;(這部分通過查看ThreadPoolExecutor的源碼分析--getTask()部分);

unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性(時間單位)

workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這裡的阻塞隊列有以下幾種選擇  

  ArrayBlockingQueue;

  LinkedBlockingQueue;

  SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

threadFactory:線程工廠,主要用來創建線程:默認值 DefaultThreadFactory;

handler:表示當拒絕處理任務時的策略,就是上面提及的reject操作;有以下四種取值:

  ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。(默認handle)

  ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。

  ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)

  ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

3:對線程池的基本使用及其部分源碼的分析(注意:這裡的源碼分析是基於jdk1.6;)

a:線程池的狀態

volatile int runState;
 static final int RUNNING = 0; 運行狀態
static final int SHUTDOWN = 1; 關閉狀態;SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢
static final int STOP = 2;停止狀態;此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務
static final int TERMINATED = 3;終止狀態;當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束後,線程池被設置為TERMINATED狀態

b:參數再次說明。這是摘自網絡的解釋,我覺得他比喻的很好,所以這裡直接就用它的解釋

  這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

  corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:

  假如有一個工廠,工廠裡面有10個工人,每個工人同時只能做一件任務。

  因此只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人做;

  當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;

  如果說新任務數目增長的速度遠遠大於工人做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;

  然後就將任務也分配給這4個臨時工人做;

  如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。

  當這14個工人當中有人空閒時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

  這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。

  不過為了方便理解,在本文後面還是將corePoolSize翻譯成核心池大小。

  largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。

c:添加線程池任務的入口就是execute();

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();//任務為空時拋出異常
    //如果線程池線程大小小於核心線程,就新建一個線程加入任務並啟動線程
    //如果線程池線程大小大於核心線且且添加任務到線程失敗,就把任務添加到阻塞隊列
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//新建線程並啟動
        if (runState == RUNNING && workQueue.offer(command)) {//添加任務到隊列
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);//添加到隊列失敗或已滿,做拒接任務處理策略
        }
        //若阻塞隊列失敗或已滿;這裡新建一個線程並啟動做應急處理(這裡就是用到了maximumPoolSize參數)
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // 若線程池的線程超過了maximumPoolSize;就做拒絕處理任務策略
    }
}

-->>繼續跟蹤代碼到addIfUnderCorePoolSize(Runnable firstTask):函數名稱就可以看出來這個函數要執行的什麼;如果線程池的線程小於核心線程數corePoolSize就新建線程加入任務並啟動線程【在今後的開發中盡量把需要做的功能在函數名體現出來】

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;//獲取當前線程池的鎖
        mainLock.lock();//加鎖
        try {
            /*
            這裡線程池線程大小還需要判斷一次;前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之後,在其他線程中又向線程池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續���斷
            */
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);//新建線程
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();//若創建線程超過,就啟動線程池的線程
        return true;
    }
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);//worker:ThreadPoolExecutor的內部類;
        Thread t = threadFactory.newThread(w);//使用線程工廠創建一個線程
        if (t != null) {
            w.thread = t;
            workers.add(w);//保存線程池正在運行的線程
            int nt = ++poolSize;//線程池的線程數加1
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

-->>接下來定位worker類,看看線程池裡的線程是如何執行的

上面的addIfUnderCorePoolSize(..)已經把線程啟動了;現在就直接查看worker 的run()方法了

public void run() {
    try {
        Runnable task = firstTask;//該線程的第一個任務,執行完後就從阻塞隊列取任務執行
        firstTask = null;
        while (task != null || (task = getTask()) != null) {//getTask()從隊列去任務執行
            runTask(task);//線程執行任務
            task = null;
        }
    } finally {
        workerDone(this);//若任務全部執行完,就開始嘗試去停止線程池;這部分代碼就不再追蹤下去,有興趣的讀者可以自己打開源碼分析,不必害怕,學習大神們的編碼方式,看源碼能讓你學習到很多
    }
}
 private void runTask(Runnable task) {
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try {
        //多次檢查線程池有沒有關閉
        if (runState < STOP &&
            Thread.interrupted() &&
            runState >= STOP)
            thread.interrupt();
           
        boolean ran = false;
        //這裡就可以繼承ThreadPoolExecutor,並覆蓋beforeExecute(...)該方法,來做一些執行任務之前的統計工作或者用來保存正在執行的任務
        beforeExecute(thread, task);
        try {
            task.run();
            ran = true;
            //這裡就可以繼承ThreadPoolExecutor,並覆蓋beforeExecute(...)該方法,來做一些執行任務完成之後的統計工作或者用來保存正在執行的任務
            afterExecute(task, null);
            ++completedTasks;//統計總共執行的任務數
        } catch (RuntimeException ex) {
            if (!ran)
                afterExecute(task, ex);
            throw ex;
        }
    } finally {
        runLock.unlock();
    }
}

至此線程池基本的流程完了;

再說說我在項目中的使用:
MyExtendThreadPoolExecutor 繼承了 ThreadPoolExecutor,並覆蓋了其中的一些方法

public class MyExtendThreadPoolExecutor extends ThreadPoolExecutor{
    public static Logger logger=LoggerFactory.getLogger(MyExtendThreadPoolExecutor.class);
    /**
    * 記錄運行中任務
    */
    private LinkedBlockingQueue<Runnable> workBlockingQueue=new  LinkedBlockingQueue<Runnable>();
   
    public MyExtendThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        workBlockingQueue.add((GtdataBreakpointResumeDownloadThread)r);//保存在運行的任務
        logger.info("Before the task execution");
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        workBlockingQueue.remove((GtdataBreakpointResumeDownloadThread)r);//移除關閉的任務
        logger.info("After the task execution");
    }
    /**
    *
    * Description: 正在運行的任務
    * @return LinkedBlockingQueue<Runnable><br>
    * @author lishun
    */
    public LinkedBlockingQueue<Runnable> getWorkBlockingQueue() {
        return workBlockingQueue;
    }
}

MyExtendThreadPoolExecutor pool = new MyExtendThreadPoolExecutor(3, 3,60L,TimeUnit.SECONDS,new LinkedBlockingQueue <Runnable>()); //創建線程池

public void addToThreadPool(DownloadRecord downloadRecord){
    BlockingQueue<Runnable> waitThreadQueue = pool.getQueue();//Returns the task queue
    LinkedBlockingQueue<Runnable> workThreadQueue =pool.getWorkBlockingQueue();//Returns the running work
    GtdataBreakpointResumeDownloadThread downloadThread =
            new GtdataBreakpointResumeDownloadThread(downloadRecord);//需要執行的任務線程
   
    if (!waitThreadQueue.contains(downloadThread)&&!workThreadQueue.contains(downloadThread)) {//判斷任務是否存在正在運行的線程或存在阻塞隊列,不存在的就加入線程池(這裡的比較要重寫equals())
        Timestamp recordtime = new Timestamp(System.currentTimeMillis());
        logger.info("a_workThread:recordId="+downloadRecord.getId()+",name="+downloadRecord.getName()+" add to  workThreadQueue");
        downloadThread.setName("th_"+downloadRecord.getName());
        pool.execute(downloadThread);//添加到線程池
    }else{
        logger.info("i_workThread:recordId="+downloadRecord.getId()+",name="+downloadRecord.getName()+" in  waitThreadQueue or workThreadQueue");
    }
}

Copyright © Linux教程網 All Rights Reserved