歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

Python多進程並發操作中進程池Pool的應用

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,10幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,這時候進程池Pool發揮作用的時候就到了。

Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,

那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,

那麼該請求就會等待,直到池中有進程結束,

才會創建新的進程來它。這裡有一個簡單的例子:

#!/usr/bin/env python

#coding=utf-8

from multiprocessing import Pool

from time import sleep

def f(x):

    for i in range(10):

        print '%s --- %s ' % (i, x)

        sleep(1)

def main():

    pool = Pool(processes=3)    # set the processes max number 3

    for i in range(11,20):

        result = pool.apply_async(f, (i,))

    pool.close()

    pool.join()

    if result.successful():

        print 'successful'

if __name__ == "__main__":

    main()

先創建容量為3的進程池,然後將f(i)依次傳遞給它,運行腳本後利用ps aux | grep pool.py查看進程情況,會發現最多只會有三個進程執行。pool.apply_async()用來向進程池提交目標請求,pool.join()是用來等待進程池中的worker進程執行完畢,防止主進程在worker進程結束前結束。但必pool.join()必須使用在pool.close()或者pool.terminate()之後。其中close()跟terminate()的區別在於close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。result.successful()表示整個調用執行的狀態,如果還有worker沒有執行完,則會拋出AssertionError異常。
    利用multiprocessing下的Pool可以很方便的同時自動處理幾百或者上千個並行操作,腳本的復雜性也大大降低.

python中multiprocessing.pool函數介紹

一 apply(func[, args[, kwds]])
  apply用於傳遞不定參數,同python中的apply函數一致(不過內置的apply函數從2.3以後就不建議使用了),主進程會阻塞於函數。
for x in gen_list(l):
    result = pool.apply(pool_test, (x,))
    print 'main process'
這個時候主進程的執行流程同單進程一致
二 apply_async(func[, args[, kwds[, callback]]])
  與apply用法一致,但它是非阻塞的且支持結果返回後進行回調。
for x in gen_list(l):
    result = pool.apply_async(pool_test, (x,))
    print 'main process'
  這個時候主進程循環運行過程中不等待apply_async的返回結果,在主進程結束後,即使子進程還未返回整個程序也會就退出。雖然 apply_async是非阻塞的,但其返回結果的get方法卻是阻塞的,在本例中result.get()會阻塞主進程。因此可以這樣來處理返回結果:
    [x.get() for x in [pool.apply_async(pool_test, (x,)) for x in gen_list(l)]]
如果我們對返回結果不感興趣, 那麼可以在主進程中使用pool.close與pool.join來防止主進程退出。注意join方法一定要在close或terminate之後調用。
    for x in gen_list(l):
    pool.apply_async(pool_test, (x, ))
    print 'main_process'
    pool.close()
    pool.join()
三 map(func, iterable[, chunksize])
  map方法與內置的map函數行為基本一致,在它會使進程阻塞與此直到結果返回。
  但需注意的是其第二個參數雖然描述的為iterable, 但在實際使用中發現只有在整個隊列全部就緒後,程序才會運行子進程。
四 map_async(func, iterable[, chunksize[, callback]])
  與map用法一致,但是它是非阻塞的。其有關事項見apply_async。
五 imap(func, iterable[, chunksize])
  與map不同的是, imap的返回結果為iter,需要在主進程中主動使用next來驅動子進程的調用。即使子進程沒有返回結果,主進程對於gen_list(l)的 iter還是會繼續進行, 另外根據python2.6文檔的描述,對於大數據量的iterable而言,將chunksize設置大一些比默認的1要好。
  for x in pool.imap(pool_test, gen_list(l)):
      pass
六 imap_unordered(func, iterable[, chunksize])
  同imap一致,只不過其並不保證返回結果與迭代傳入的順序一致。
七 close()
  關閉pool,使其不在接受新的任務。
八 terminate()
  結束工作進程,不在處理未處理的任務。
九 join()
  主進程阻塞等待子進程的退出, join方法要在close或terminate之後使用。

 

l = range(10)
def gen_list(l):
    for x in l:
        print 'yield', x
        yield x

 def pool_test(x):
    print 'f2', x
    time.sleep(1)

Copyright © Linux教程網 All Rights Reserved