博客
关于我
php多线程
阅读量:793 次
发布时间:2023-03-01

本文共 6057 字,大约阅读时间需要 20 分钟。

PHP 高级编程之多线程

1. 多线程环境安装

1.1 PHP 5.5.9 安装

安装 PHP 5.5.9:

./configure --prefix=/srv/php-5.5.9 \  --with-config-file-path=/srv/php-5.5.9/etc \  --with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \  --enable-fpm \  --with-fpm-user=www \  --with-fpm-group=www \  --with-pear \  --with-curl \  --with-gd \  --with-jpeg-dir \  --with-png-dir \  --with-freetype-dir \  --with-zlib-dir \  --with-iconv \  --with-mcrypt \  --with-mhash \  --with-pdo-mysql \  --with-mysql-sock=/var/lib/mysql/mysql.sock \  --with-openssl \  --with-xsl \  --with-recode \  --enable-sockets \  --enable-soap \  --enable-mbstring \  --enable-gd-native-ttf \  --enable-zip \  --enable-xml \  --enable-bcmath \  --enable-calendar \  --enable-shmop \  --enable-dba \  --enable-wddx \  --enable-sysvsem \  --enable-sysvshm \  --enable-sysvmsg \  --enable-opcache \  --enable-pcntl \  --disable-debug

编译时必须启用zts支持,否则无法安装 pthreads(--enable-maintainer-zts)。

1.2 安装 pthreads 扩展

安装 pthreads:

curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash

查看是否安装成功:

php -m | grep pthreads

2. Thread 类

world = $world; }}public function run() { print_r(sprintf("Hello %s\n", $this->world));}?>

使用示例:

$thread = new HelloWorld("World");if ($thread->start()) {    printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?>

3. Worker 与 Stackable

sql = $sql; }}public function run() { $dbh = $this->worker->getConnection(); $row = $dbh->query($this->sql); while ($member = $row->fetch(PDO::FETCH_ASSOC)) { print_r($member); }}?>
class ExampleWorker extends Worker {    public static $dbh;    public function __construct($name) {    }    public function run() {        self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example', 'www', '123456');    }    public function getConnection() {        return self::$dbh;    }}$worker = new ExampleWorker("My Worker Thread");$work = new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work);$worker->start();$worker->shutdown();

4. 互斥锁

互斥锁用于控制多个线程对同一资源的访问,防止竞态条件。以下是一个简单的计数器程序:

$counter = 0;class CounterThread extends Thread {    public function __construct($mutex = null) {        $this->mutex = $mutex;        $this->handle = fopen("/tmp/counter.txt", "w+");    }    public function __destruct() {        fclose($this->handle);    }    public function run() {        if ($this->mutex) {            $locked = Mutex::lock($this->mutex);            $counter = intval(fgets($this->handle));            $counter++;            rewind($this->handle);            fputs($this->handle, $counter);            Mutex::unlock($this->mutex);        }        printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter);    }}

5. 线程同步

线程同步可以通过 wait() 方法实现,等待信号后再运行:

public function run() {    $this->synchronized(function($thread) {        $thread->wait();    }, $this);    // 业务逻辑}

6. 线程池

6.1 自行实现一个 Pool 类
class Update extends Thread {    public $running = false;    public $row = array();    public function __construct($row) {        $this->row = $row;    }    public function run() {        if (strlen($this->row['bankno']) > 100) {            // 业务逻辑        } else {            // 业务逻辑        }    }}class Pool {    public $pool = array();    public $count = 5;    public function push($row) {        if (count($this->pool) < $this->count) {            $this->pool[] = new Update($row);            return true;        } else {            return false;        }    }    public function start() {        foreach ($this->pool as $id => $worker) {            $worker->start();        }    }    public function join() {        foreach ($this->pool as $id => $worker) {            $worker->join();        }    }    public function clean() {        foreach ($this->pool as $id => $worker) {            if (!$worker->isRunning()) {                unset($this->pool[$id]);            }        }    }}
6.2 动态队列线程池
class ExampleWorker extends Worker {    public function __construct(Logging $logger) {        $this->logger = $logger;    }}class Work extends Stackable {    public function __construct($data) {        $this->data = $data;    }}$pool = new Pool(200, ExampleWorker::class, [new Logging()]);$host = 'db.example.com';$user = 'example.com';$pass = 'password';$dbname = 'example';try {    $dbh = new PDO("mysql:host=$host;dbname=$dbname", $user, $pass, [        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',        PDO::MYSQL_ATTR_COMPRESS => true,        PDO::ATTR_PERSISTENT => true,    ]);    $sql = "select id,bankno from members order by id desc limit 50";    $row = $dbh->query($sql);    while ($member = $row->fetch(PDO::FETCH_ASSOC)) {        $pool->submit(new Work($member));        $pool->start();        $pool->join();        $pool->clean();    }} catch (Exception $e) {    echo '[' . date('H:i:s') . ']' . '系统错误' . $e->getMessage() . "\n";}

7. 多线程文件安全读写

文件锁防止多线程同时读写或写入文件:

$fp = fopen("/tmp/lock.txt", "r+");if (flock($fp, LOCK_EX)) {    ftruncate($fp, 0);    fwrite($fp, "Write something here\n");    fflush($fp);    flock($fp, LOCK_UN);} else {    echo "Couldn't get the lock!";}fclose($fp);

8. 多线程与数据连接

pthreads 与 PDO 的结合需要注意事项:

class ExampleWorker extends Worker {    public static $dbh;    public function run() {        $dbh = $this->getInstance();        // 业务逻辑    }    public static function getInstance() {        return self::$dbh;    }}

9. Thread And ZeroMQ

使用 ZeroMQ 进行数据库变更通知:

CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)LANGUAGE SQLNOT DETERMINISTICREADS SQL DATASQL SECURITY DEFINERCOMMENT '交易监控'BEGIN    IF CMD IN ('0','1')        IF VOLUME >=10 AND VOLUME <=90            select coding into Example from example.members where username = LOGIN and coding = 'Y';            IF Example = 'Y'                select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));            END IF        END IF    END IFEND
class ExampleWorker extends Worker {    public function run() {        try {            $dbh = $this->getInstance();

转载地址:http://antfk.baihongyu.com/

你可能感兴趣的文章
PEP 8016 获胜,成为新的 Python 社区治理方案
查看>>
PEP8规范
查看>>
PEPM Cookie 远程代码执行漏洞复现(XVE-2024-16919)
查看>>
Percona Server 5.6 安装TokuDB
查看>>
SpringBoot(十四)整合MyBatis
查看>>
percona-xtrabackup 备份
查看>>
SpringBoot集成OpenOffice实现doc文档转html
查看>>
ROS中机器人的强化学习路径规划器
查看>>
perl---2012学习笔记
查看>>
Perl6 必应抓取(1):测试版代码
查看>>
Perl的基本語法
查看>>
perl输出中文有乱码
查看>>
Permission denied (publickey,gssapi-keyex,gssapi-with-mic,password). 大数据ssh权限问题 hadoop起不来 hadoopssh错
查看>>
PermissionError:Python 中的 [Errno 13]
查看>>
PermissionError:[Errno 13] 权限被拒绝:‘/manage.py‘
查看>>
Permutation
查看>>
PE文件,节头有感IMAGE_SECTION_HEADER
查看>>
PE查找文件偏移地址
查看>>
PE知识复习之PE的导入表
查看>>
PFX(Parallel Framework) and Traditional Multithreading
查看>>