歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

php多線程pthreads的安裝與使用

安裝Pthreads 基本上需要重新編譯PHP,加上 --enable-maintainer-zts 參數,但是用這個文檔很少;bug會很多很有很多意想不到的問題,生成環境上只能呵呵了,所以這個東西玩玩就算了,真正多線程還是用Python、C等等

以下代碼大部分來自網絡

一、安裝

這裡使用的是 php-7.0.2

./configure \

--prefix=/usr/local/php7 \

--with-config-file-path=/etc \

--with-config-file-scan-dir=/etc/php.d \

--enable-debug \

--enable-maintainer-zts \

--enable-pcntl \

--enable-fpm \

--enable-opcache \

--enable-embed=shared \

--enable-json=shared \

--enable-phpdbg \

--with-curl=shared \

--with-mysql=/usr/local/mysql \

--with-mysqli=/usr/local/mysql/bin/mysql_config \

--with-pdo-mysql

make && make install

安裝pthreads

pecl install pthreads

二、Thread

<?php

#1

$thread = new class extends Thread {

    public function run() {

        echo "Hello World {$this->getThreadId()}\n";                                                                                 

    } 

};

 

$thread->start() && $thread->join();

 

 

 

#2

 

class workerThread extends Thread {

    public function __construct($i){

        $this->i=$i;

    }

 

    public function run(){

        while(true){

            echo $this->i."\n";

            sleep(1);

        }

    }

}

 

for($i=0;$i<50;$i++){

    $workers[$i]=new workerThread($i);

    $workers[$i]->start();

}

 

?>

三、 Worker 與 Stackable

Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.

<?php

class SQLQuery extends Stackable {

 

    public function __construct($sql) {

        $this->sql = $sql;

    }

 

    public function run() {

        $dbh  = $this->worker->getConnection();

        $row = $dbh->query($this->sql);

        while($member = $row->fetch(PDO::FETCH_ASSOC)){

            print_r($member);

        }

    }

 

}

 

class ExampleWorker extends Worker {

    public static $dbh;

    public function __construct($name) {

    }

 

    public function run(){

        self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456');

    }

    public function getConnection(){

        return self::$dbh;

    }

}

 

$worker = new ExampleWorker("My Worker Thread");

 

$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');

$worker->stack($sql1);

 

$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');

$worker->stack($sql2);

 

$worker->start();

$worker->shutdown();

?>

四、 互斥鎖

什麼情況下會用到互斥鎖?在你需要控制多個線程同一時刻只能有一個線程工作的情況下可以使用。一個簡單的計數器程序,說明有無互斥鎖情況下的不同

<?php

$counter = 0;

$handle=fopen("/tmp/counter.txt", "w");

fwrite($handle, $counter );

fclose($handle);

 

class CounterThread extends Thread {

    public function __construct($mutex = null){

        $this->mutex = $mutex;

        $this->handle = fopen("/tmp/counter.txt", "w+");

    }

    public function __destruct(){

        fclose($this->handle);

    }

    public function run() {

        if($this->mutex)

            $locked=Mutex::lock($this->mutex);

 

        $counter = intval(fgets($this->handle));

        $counter++;

        rewind($this->handle);

        fputs($this->handle, $counter );

        printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

 

        if($this->mutex)

            Mutex::unlock($this->mutex);

    }

}

 

//沒有互斥鎖

for ($i=0;$i<50;$i++){

    $threads[$i] = new CounterThread();

    $threads[$i]->start();

 

}

 

//加入互斥鎖

$mutex = Mutex::create(true);

for ($i=0;$i<50;$i++){

    $threads[$i] = new CounterThread($mutex);

    $threads[$i]->start();

 

}

 

Mutex::unlock($mutex);

for ($i=0;$i<50;$i++){

    $threads[$i]->join();

}

Mutex::destroy($mutex);

 

?>

多線程與共享內存

在共享內存的例子中,沒有使用任何鎖,仍然可能正常工作,可能工作內存操作本身具備鎖的功能

<?php

$tmp = tempnam(__FILE__, 'PHP');

$key = ftok($tmp, 'a');

 

$shmid = shm_attach($key);

$counter = 0;

shm_put_var( $shmid, 1, $counter );

 

class CounterThread extends Thread {

    public function __construct($shmid){

        $this->shmid = $shmid;

    }

    public function run() {

 

        $counter = shm_get_var( $this->shmid, 1 );

        $counter++;

        shm_put_var( $this->shmid, 1, $counter );

 

        printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

    }

}

 

for ($i=0;$i<100;$i++){

    $threads[] = new CounterThread($shmid);

}

for ($i=0;$i<100;$i++){

    $threads[$i]->start();

 

}

 

for ($i=0;$i<100;$i++){

    $threads[$i]->join();

}

shm_remove( $shmid );

shm_detach( $shmid );

?>

五、 線程同步

有些場景我們不希望 thread->start() 就開始運行程序,而是希望線程等待我們的命令。thread−>wait();測作用是thread−>start()後線程並不會立即運行,只有收到 thread->notify(); 發出的信號後才運行

<?php

$tmp = tempnam(__FILE__, 'PHP');

$key = ftok($tmp, 'a');

 

$shmid = shm_attach($key);

$counter = 0;

shm_put_var( $shmid, 1, $counter );

 

class CounterThread extends Thread {

    public function __construct($shmid){

        $this->shmid = $shmid;

    }

    public function run() {

 

        $this->synchronized(function($thread){

                $thread->wait();

                }, $this);

 

        $counter = shm_get_var( $this->shmid, 1 );

        $counter++;

        shm_put_var( $this->shmid, 1, $counter );

 

        printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

    }

}

 

for ($i=0;$i<100;$i++){

    $threads[] = new CounterThread($shmid);

}

for ($i=0;$i<100;$i++){

    $threads[$i]->start();

 

}

 

for ($i=0;$i<100;$i++){

    $threads[$i]->synchronized(function($thread){

            $thread->notify();

            }, $threads[$i]);

}

 

for ($i=0;$i<100;$i++){

    $threads[$i]->join();

}

shm_remove( $shmid );

shm_detach( $shmid );

?>       

 

六、線程池

一個Pool類

<?php

class Update extends Thread {

 

    public $running = false;

    public $row = array();

    public function __construct($row) {

 

    $this->row = $row;

        $this->sql = null;

    }

 

    public function run() {

 

    if(strlen($this->row['bankno']) > 100 ){

        $bankno = safenet_decrypt($this->row['bankno']);

    }else{

        $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);

        file_put_contents("bankno_error.log", $error, FILE_APPEND);

    }

 

    if( strlen($bankno) > 7 ){

        $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

 

        $this->sql = $sql;

    }

 

    printf("%s\n",$this->sql);

    }

 

}

 

class Pool {

    public $pool = array();

    public function __construct($count) {

        $this->count = $count;

    }

    public function push($row){

        if(count($this->pool) < $this->count){

            $this->pool[] = new Update($row);

            return true;

        }else{

            return false;

        }

    }

    public function start(){

        foreach ( $this->pool as $id => $worker){

            $this->pool[$id]->start();

        }

    }

    public function join(){

        foreach ( $this->pool as $id => $worker){

              $this->pool[$id]->join();

        }

    }

    public function clean(){

        foreach ( $this->pool as $id => $worker){

            if(! $worker->isRunning()){

                unset($this->pool[$id]);

            }

        }

    }

}

 

try {

    $dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(

        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

        PDO::MYSQL_ATTR_COMPRESS => true

        )

    );

 

    $sql  = "select id,bankno from members order by id desc";

    $row = $dbh->query($sql);

    $pool = new Pool(5);

    while($member = $row->fetch(PDO::FETCH_ASSOC))

    {

 

        while(true){

            if($pool->push($member)){ //壓入任務到池中

                break;

            }else{ //如果池已經滿,就開始啟動線程

                $pool->start();

                $pool->join();

                $pool->clean();

            }

        }

    }

    $pool->start();

    $pool->join();

 

    $dbh = null;

 

} catch (Exception $e) {

    echo '[' , date('H:i:s') , ']', '系統錯誤', $e->getMessage(), "\n";

}

?>

動態隊列線程池

上面的例子是當線程池滿後執行start統一啟動,下面的例子是只要線程池中有空閒便立即創建新線程。

<?php

class Update extends Thread {

 

    public $running = false;

    public $row = array();

    public function __construct($row) {

 

        $this->row = $row;

        $this->sql = null;

        //print_r($this->row);

    }

 

    public function run() {

 

        if(strlen($this->row['bankno']) > 100 ){

            $bankno = safenet_decrypt($this->row['bankno']);

        }else{

            $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);

            file_put_contents("bankno_error.log", $error, FILE_APPEND);

        }

 

        if( strlen($bankno) > 7 ){

            $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

 

            $this->sql = $sql;

        }

 

        printf("%s\n",$this->sql);

    }

 

}

 

try {

    $dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(

                PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

                PDO::MYSQL_ATTR_COMPRESS => true

                )

            );

 

    $sql    = "select id,bankno from members order by id desc limit 50";

 

    $row = $dbh->query($sql);

    $pool = array();

    while($member = $row->fetch(PDO::FETCH_ASSOC))

    {

        $id    = $member['id'];

        while (true){

            if(count($pool) < 5){

                $pool[$id] = new Update($member);

                $pool[$id]->start();

                break;

            }else{

                foreach ( $pool as $name => $worker){

                    if(! $worker->isRunning()){

                        unset($pool[$name]);

                    }

                }

            }

        }

 

    }

 

    $dbh = null;

 

} catch (Exception $e) {

    echo '【' , date('H:i:s') , '】', '【系統錯誤】', $e->getMessage(), "\n";

}

?> 

 

pthreads Pool類

<php

 

class WebWorker extends Worker {

 

    public function __construct(SafeLog $logger) {

        $this->logger = $logger;

    }

 

    protected $loger;

}

 

class WebWork extends Stackable {

 

    public function isComplete() {

        return $this->complete;

    }

 

    public function run() {

        $this->worker

            ->logger

            ->log("%s executing in Thread #%lu",

                    __CLASS__, $this->worker->getThreadId());

        $this->complete = true;

    }

 

    protected $complete;

}

 

class SafeLog extends Stackable {

 

    protected function log($message, $args = []) {

        $args = func_get_args();

 

        if (($message = array_shift($args))) {

            echo vsprintf(

                    "{$message}\n", $args);

        }

    }

}

 

 

$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

 

$pool->submit($w=new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->submit(new WebWork());

$pool->shutdown();

 

$pool->collect(function($work){

        return $work->isComplete();

        });

 

var_dump($pool);           

 

七、多線程文件安全讀寫

  • LOCK_SH 取得共享鎖定(讀取的程序)
  • LOCK_EX 取得獨占鎖定(寫入的程序
  • LOCK_UN 釋放鎖定(無論共享或獨占)
  • LOCK_NB 如果不希望 flock() 在鎖定時堵塞

<?php

$fp = fopen("/tmp/lock.txt", "r+");

if (flock($fp, LOCK_EX)) {  // 進行排它型鎖定

    ftruncate($fp, 0);      // truncate file

    fwrite($fp, "Write something here\n");

    fflush($fp);            // flush output before releasing the lock

    flock($fp, LOCK_UN);    // 釋放鎖定

} else {

    echo "Couldn't get the lock!";

}

fclose($fp);

 

 

 

$fp = fopen('/tmp/lock.txt', 'r+');

if(!flock($fp, LOCK_EX | LOCK_NB)) {

    echo 'Unable to obtain lock';

    exit(-1);

}

fclose($fp);

?>

八、多線程與數據連接

pthreads 與 pdo 同時使用是,需要注意一點,需要靜態聲明public static $dbh;並且通過單例模式訪問數據庫連接。

Worker 與 PDO

<?php

class Work extends Stackable {

 

        public function __construct() {

        }

 

        public function run() {

                $dbh  = $this->worker->getConnection();

                $sql    = "select id,name from members order by id desc limit 50";

                $row = $dbh->query($sql);

                while($member = $row->fetch(PDO::FETCH_ASSOC)){

                        print_r($member);

                }

        }

 

}

 

class ExampleWorker extends Worker {

        public static $dbh;

        public function __construct($name) {

        }

 

        /*

        * The run method should just prepare the environment for the work that is coming ...

        */

        public function run(){

                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');

        }

        public function getConnection(){

                return self::$dbh;

        }

}

 

$worker = new ExampleWorker("My Worker Thread");

 

$work=new Work();

$worker->stack($work);

 

$worker->start();

$worker->shutdown();

?>

Pool 與 PDO

在線程池中鏈接數據庫

# cat pool.php

<?php

class ExampleWorker extends Worker {

 

    public function __construct(Logging $logger) {

        $this->logger = $logger;

    }

 

    protected $logger;

}

 

/* the collectable class implements machinery for Pool::collect */

class Work extends Stackable {

    public function __construct($number) {

        $this->number = $number;

    }

    public function run() {

                $dbhost = 'db.example.com';              // 數據庫服務器

                $dbuser = 'example.com';                // 數據庫用戶名

                $dbpw = 'password';                              // 數據庫密碼

                $dbname = 'example_real';

        $dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

                        PDO::MYSQL_ATTR_COMPRESS => true,

            PDO::ATTR_PERSISTENT => true

                        )

                );

        $sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";

        #echo $sql;

        $row = $dbh->query($sql);

        $mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);

        if($mt4_trades){

 

            $row = null;

 

            $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";

            $dbh->query($sql);

            #printf("%s\n",$sql);

        }

        $dbh = null;

        printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);

 

    }

}

 

class Logging extends Stackable {

    protected  static $dbh;

    public function __construct() {

        $dbhost = 'db.example.com';        // 數據庫服務器

            $dbuser = 'example.com';                // 數據庫用戶名

            $dbpw = 'password';                              // 數據庫密碼

        $dbname = 'example_real';          // 數據庫名

 

        self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

            PDO::MYSQL_ATTR_COMPRESS => true

            )

        );

 

    }

    protected function log($message, $args = []) {

        $args = func_get_args();

 

        if (($message = array_shift($args))) {

            echo vsprintf("{$message}\n", $args);

        }

    }

 

    protected function getConnection(){

                return self::$dbh;

        }

}

 

$pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

 

$dbhost = 'db.example.com';                      // 數據庫服務器

$dbuser = 'example.com';                // 數據庫用戶名

$dbpw = 'password';                              // 數據庫密碼

$dbname = 'db_example';

$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

                        PDO::MYSQL_ATTR_COMPRESS => true

                        )

                );

$sql = "select `order`,name from accounts where deposit_time is null order by id desc";

 

$row = $dbh->query($sql);

while($account = $row->fetch(PDO::FETCH_ASSOC))

{

        $pool->submit(new Work($account));

}

 

$pool->shutdown();

 

?> 


進一步改進上面程序,我們使用單例模式 $this->worker->getInstance(); 全局僅僅做一次數據庫連接,線程使用共享的數據庫連接

<?php

class ExampleWorker extends Worker {

 

    #public function __construct(Logging $logger) {

    #  $this->logger = $logger;

    #}

 

    #protected $logger;

    protected  static $dbh;

    public function __construct() {

 

    }

    public function run(){

        $dbhost = 'db.example.com';        // 數據庫服務器

        $dbuser = 'example.com';            // 數據庫用戶名

        $dbpw = 'password';                // 數據庫密碼

        $dbname = 'example';                // 數據庫名

 

        self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

            PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

            PDO::MYSQL_ATTR_COMPRESS => true,

            PDO::ATTR_PERSISTENT => true

            )

        );

 

    }

    protected function getInstance(){

        return self::$dbh;

    }

 

}

 

/* the collectable class implements machinery for Pool::collect */

class Work extends Stackable {

    public function __construct($data) {

        $this->data = $data;

        #print_r($data);

    }

 

    public function run() {

        #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );

 

        try {

            $dbh  = $this->worker->getInstance();

            #print_r($dbh);

                    $id = $this->data['id'];

            $mobile = safenet_decrypt($this->data['mobile']);

            #printf("%d, %s \n", $id, $mobile);

            if(strlen($mobile) > 11){

                $mobile = substr($mobile, -11);

            }

            if($mobile == 'null'){

            #  $sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";

            #  printf("%s\n",$sql);

            #  $dbh->query($sql);

                $mobile = '';

                $sql = "UPDATE members_digest SET mobile = :mobile where id = :id";

            }else{

                $sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";

            }

            $sth = $dbh->prepare($sql);

            $sth->bindValue(':mobile', $mobile);

            $sth->bindValue(':id', $id);

            $sth->execute();

            #echo $sth->debugDumpParams();

        }

        catch(PDOException $e) {

            $error = sprintf("%s,%s\n", $mobile, $id );

            file_put_contents("mobile_error.log", $error, FILE_APPEND);

        }

 

        #$dbh = null;

        printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);

        #printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);

    }

}

 

$pool = new Pool(100, \ExampleWorker::class, []);

 

#foreach (range(0, 100) as $number) {

#  $pool->submit(new Work($number));

#}

 

$dbhost = 'db.example.com';                    // 數據庫服務器

$dbuser = 'example.com';                        // 數據庫用戶名

$dbpw = 'password';                            // 數據庫密碼

$dbname = 'example';

$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(

                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

                        PDO::MYSQL_ATTR_COMPRESS => true

                        )

                );

#print_r($dbh);

 

#$sql = "select id, mobile from members where id < :id";

#$sth = $dbh->prepare($sql);

#$sth->bindValue(':id',300);

#$sth->execute();

#$result = $sth->fetchAll();

#print_r($result);

#

#$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";

#$sth = $dbh->prepare($sql);

#$sth->bindValue(':mobile', 'aa');

#$sth->bindValue(':id','272');

#echo $sth->execute();

#echo $sth->queryString;

#echo $sth->debugDumpParams();

 

 

$sql = "select id, mobile from members order by id asc"; // limit 1000";

$row = $dbh->query($sql);

while($members = $row->fetch(PDO::FETCH_ASSOC))

{

        #$order =  $account['order'];

        #printf("%s\n",$order);

        //print_r($members);

        $pool->submit(new Work($members));

        #unset($account['order']);

}

 

$pool->shutdown();

 

?>   

多線程中操作數據庫總結

總的來說 pthreads 仍然處在發展中,仍有一些不足的地方,我們也可以看到pthreads的git在不斷改進這個項目

數據庫持久鏈接很重要,否則每個線程都會開啟一次數據庫連接,然後關閉,會導致很多鏈接超時。

<?php

$dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(

    PDO::ATTR_PERSISTENT => true

));

?>

Copyright © Linux教程網 All Rights Reserved