php-memcache 消息队列

php

浏览数:394

2019-1-13

/**
 * A simple FIFO queue stored in memcache.
 *
 * Layout (all keys are derived from the queue name):
 *   - memq_start_<name>   index of the last item that was popped
 *   - memq_end_<name>     index of the last item that was pushed
 *   - memq_item_<name>_N  payload of item N (valid for start < N <= end)
 *   - memq_lock_<name>    short-lived mutex entry guarding counter updates
 *
 * The queue is empty when start == end. Items whose payload is missing
 * (get() returns false) are skipped by pop() and peek().
 * NOTE(review): a literal `false` payload is therefore indistinguishable
 * from a missing item - confirm callers never push `false`.
 */
class memq {

    /** Seconds after which the lock entry expires on its own. */
    const LOCK_TTL = 2;
    /** Microseconds to sleep between two attempts to take the lock. */
    const LOCK_RETRY_USEC = 100;
    /** Seconds to keep retrying before giving up on the lock. */
    const LOCK_MAX_WAIT = 10;

    private $q_name;
    private $host = '127.0.0.1';
    private $port = 11211;

    /** @var object Memcache-compatible client (add/set/get/delete/increment). */
    private $memc;
    /** @var string */
    private $start_key;
    /** @var string */
    private $end_key;
    /** @var string */
    private $lock_key;
    /** @var string sprintf() format producing the key of item N. */
    private $item_key_format;

    /**
     * @param string      $q_name Queue name; becomes part of every key.
     * @param object|null $memc   Optional client exposing the Memcache methods
     *                            add/set/get/delete/increment. When omitted, a
     *                            new Memcache connection to host:port is made.
     */
    public function __construct($q_name, $memc = null) {
        $this->q_name = $q_name;
        $this->start_key = 'memq_start_' . $q_name;
        $this->end_key = 'memq_end_' . $q_name;
        $this->lock_key = 'memq_lock_' . $q_name;
        // Escape '%' so a queue name can never be read as a sprintf directive.
        $this->item_key_format = 'memq_item_' . str_replace('%', '%%', $q_name) . '_%d';
        if (null === $memc) {
            $this->_connect();
        } else {
            $this->memc = $memc;
        }
        $this->_init_counters();
    }

    /**
     * Appends an element to the end of the queue.
     *
     * @param mixed $elem
     * @return mixed The element that was pushed.
     * @throws \RuntimeException When the lock cannot be taken or no index
     *                           can be allocated for the element.
     */
    public function push($elem) {
        $this->_lock();
        try {
            $i = $this->memc->increment($this->end_key);
            if (false === $i) {
                // The counter is gone; recreate it and retry once instead of
                // writing the element to a bogus "_0" key.
                $this->_init_counters();
                $i = $this->memc->increment($this->end_key);
            }
            if (false === $i) {
                throw new \RuntimeException('memq: unable to allocate an index in queue "' . $this->q_name . '"');
            }
            $this->memc->set(sprintf($this->item_key_format, $i), $elem);
        } finally {
            $this->_unlock();
        }
        return $elem;
    }

    /**
     * Removes and returns the oldest available element.
     *
     * @return mixed The element, or false when the queue is empty.
     * @throws \RuntimeException When the lock cannot be taken.
     */
    public function pop() {
        $this->_lock();
        try {
            $start = (int) $this->memc->get($this->start_key);
            $end = (int) $this->memc->get($this->end_key);
            // Only advance while there is something between start and end, so
            // popping an empty queue never touches the counters.
            while ($start < $end) {
                $start = $this->memc->increment($this->start_key);
                if (false === $start) {
                    break; // start counter disappeared; nothing more to do
                }
                $key = sprintf($this->item_key_format, $start);
                $elem = $this->memc->get($key);
                if (false !== $elem) {
                    $this->memc->delete($key);
                    return $elem;
                }
                // Payload is missing: skip this slot and try the next one.
            }
            return false;
        } finally {
            $this->_unlock();
        }
    }

    /**
     * Returns the element pop() would return next, without removing it.
     * Reads without taking the lock.
     *
     * @return mixed The element, or false when the queue is empty.
     */
    public function peek() {
        $start = (int) $this->memc->get($this->start_key);
        $end = (int) $this->memc->get($this->end_key);
        for ($i = $start + 1; $i <= $end; $i++) {
            $elem = $this->memc->get(sprintf($this->item_key_format, $i));
            if (false !== $elem) {
                return $elem;
            }
        }
        return false;
    }

    /**
     * Number of slots between the start and end counters.
     *
     * @return int
     * @throws \RuntimeException When the lock cannot be taken.
     */
    public function count() {
        $this->_lock();
        try {
            return (int) $this->memc->get($this->end_key) - (int) $this->memc->get($this->start_key);
        } finally {
            $this->_unlock();
        }
    }

    /**
     * @return bool True when there is nothing left to pop.
     */
    public function is_empty() {
        return ($this->count() <= 0);
    }

    /**
     * Removes every pending item and moves the start counter up to the end.
     *
     * @throws \RuntimeException When the lock cannot be taken.
     */
    public function delete() {
        $this->_lock();
        try {
            $start = (int) $this->memc->get($this->start_key);
            $end = (int) $this->memc->get($this->end_key);
            for ($i = $start + 1; $i <= $end; $i++) {
                $this->memc->delete(sprintf($this->item_key_format, $i));
            }
            $this->memc->set($this->start_key, $end);
        } finally {
            $this->_unlock();
        }
    }

    /** Opens the default Memcache connection to host:port. */
    private function _connect() {
        $this->memc = new Memcache;
        $this->memc->addServer($this->host, $this->port);
    }

    /**
     * Creates whichever counter does not exist yet; add() leaves existing
     * values untouched. A missing end counter is seeded from the start
     * counter so that end never begins below start.
     */
    private function _init_counters() {
        $this->memc->add($this->start_key, 0);
        $this->memc->add($this->end_key, (int) $this->memc->get($this->start_key));
    }

    /**
     * Takes the queue lock by add()-ing the lock key, retrying until it
     * succeeds or LOCK_MAX_WAIT seconds have passed.
     *
     * @throws \RuntimeException When the lock is not obtained in time
     *                           (for example because the server is unreachable).
     */
    private function _lock() {
        $deadline = microtime(true) + self::LOCK_MAX_WAIT;
        while (!$this->memc->add($this->lock_key, '1', 0, self::LOCK_TTL)) {
            if (microtime(true) >= $deadline) {
                throw new \RuntimeException('memq: could not acquire lock "' . $this->lock_key . '"');
            }
            usleep(self::LOCK_RETRY_USEC);
        }
    }

    /** Releases the queue lock by deleting the lock key unconditionally. */
    private function _unlock() {
        $this->memc->delete($this->lock_key);
    }
}