本文共 6057 字,大约阅读时间需要 20 分钟。
安装 PHP 5.5.9:
./configure --prefix=/srv/php-5.5.9 \ --with-config-file-path=/srv/php-5.5.9/etc \ --with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \ --enable-fpm \ --with-fpm-user=www \ --with-fpm-group=www \ --with-pear \ --with-curl \ --with-gd \ --with-jpeg-dir \ --with-png-dir \ --with-freetype-dir \ --with-zlib-dir \ --with-iconv \ --with-mcrypt \ --with-mhash \ --with-pdo-mysql \ --with-mysql-sock=/var/lib/mysql/mysql.sock \ --with-openssl \ --with-xsl \ --with-recode \ --enable-sockets \ --enable-soap \ --enable-mbstring \ --enable-gd-native-ttf \ --enable-zip \ --enable-xml \ --enable-bcmath \ --enable-calendar \ --enable-shmop \ --enable-dba \ --enable-wddx \ --enable-sysvsem \ --enable-sysvshm \ --enable-sysvmsg \ --enable-opcache \ --enable-pcntl \ --disable-debug
编译时必须启用zts支持,否则无法安装 pthreads(--enable-maintainer-zts)。
安装 pthreads:
curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash
查看是否安装成功:
php -m | grep pthreads
world = $world; }}public function run() { print_r(sprintf("Hello %s\n", $this->world));}?> 使用示例:
$thread = new HelloWorld("World");if ($thread->start()) { printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?> sql = $sql; }}public function run() { $dbh = $this->worker->getConnection(); $row = $dbh->query($this->sql); while ($member = $row->fetch(PDO::FETCH_ASSOC)) { print_r($member); }}?> class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } public function run() { self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example', 'www', '123456'); } public function getConnection() { return self::$dbh; }}$worker = new ExampleWorker("My Worker Thread");$work = new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work);$worker->start();$worker->shutdown(); 互斥锁用于控制多个线程对同一资源的访问,防止竞态条件。以下是一个简单的计数器程序:
$counter = 0;class CounterThread extends Thread { public function __construct($mutex = null) { $this->mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct() { fclose($this->handle); } public function run() { if ($this->mutex) { $locked = Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter); Mutex::unlock($this->mutex); } printf("Thread #%lu says: %s\n", $this->getThreadId(), $counter); }} 线程同步可以通过 wait() 方法实现,等待信号后再运行:
public function run() { $this->synchronized(function($thread) { $thread->wait(); }, $this); // 业务逻辑} class Update extends Thread { public $running = false; public $row = array(); public function __construct($row) { $this->row = $row; } public function run() { if (strlen($this->row['bankno']) > 100) { // 业务逻辑 } else { // 业务逻辑 } }}class Pool { public $pool = array(); public $count = 5; public function push($row) { if (count($this->pool) < $this->count) { $this->pool[] = new Update($row); return true; } else { return false; } } public function start() { foreach ($this->pool as $id => $worker) { $worker->start(); } } public function join() { foreach ($this->pool as $id => $worker) { $worker->join(); } } public function clean() { foreach ($this->pool as $id => $worker) { if (!$worker->isRunning()) { unset($this->pool[$id]); } } }} class ExampleWorker extends Worker { public function __construct(Logging $logger) { $this->logger = $logger; }}class Work extends Stackable { public function __construct($data) { $this->data = $data; }}$pool = new Pool(200, ExampleWorker::class, [new Logging()]);$host = 'db.example.com';$user = 'example.com';$pass = 'password';$dbname = 'example';try { $dbh = new PDO("mysql:host=$host;dbname=$dbname", $user, $pass, [ PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true, ]); $sql = "select id,bankno from members order by id desc limit 50"; $row = $dbh->query($sql); while ($member = $row->fetch(PDO::FETCH_ASSOC)) { $pool->submit(new Work($member)); $pool->start(); $pool->join(); $pool->clean(); }} catch (Exception $e) { echo '[' . date('H:i:s') . ']' . '系统错误' . $e->getMessage() . "\n";} 文件锁防止多线程同时读写或写入文件:
$fp = fopen("/tmp/lock.txt", "r+");if (flock($fp, LOCK_EX)) { ftruncate($fp, 0); fwrite($fp, "Write something here\n"); fflush($fp); flock($fp, LOCK_UN);} else { echo "Couldn't get the lock!";}fclose($fp); pthreads 与 PDO 的结合需要注意事项:
class ExampleWorker extends Worker { public static $dbh; public function run() { $dbh = $this->getInstance(); // 业务逻辑 } public static function getInstance() { return self::$dbh; }} 使用 ZeroMQ 进行数据库变更通知:
CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)LANGUAGE SQLNOT DETERMINISTICREADS SQL DATASQL SECURITY DEFINERCOMMENT '交易监控'BEGIN IF CMD IN ('0','1') IF VOLUME >=10 AND VOLUME <=90 select coding into Example from example.members where username = LOGIN and coding = 'Y'; IF Example = 'Y' select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME)); END IF END IF END IFEND class ExampleWorker extends Worker { public function run() { try { $dbh = $this->getInstance(); 转载地址:http://antfk.baihongyu.com/