温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

PHP中如何实现基于Redis的MessageQueue队列封装

发布时间:2021-06-03 11:13:24 来源:亿速云 阅读:228 作者:小新 栏目:web开发

小编给大家分享一下PHP中如何实现基于Redis的MessageQueue队列封装,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

Redis的链表List可以用来做链表,高并发的特性非常适合做分布式的并行消息传递。

左进右出

$redis->lPush($key, $value); $redis->rPop($key);

以下程序已在生产环境中正式使用。

基于Redis的PHP消息队列封装

<?php /**  * Created by PhpStorm.  * User: huyanping  * Date: 14-8-19  * Time: 下午12:10  *  * 基于Redis的消息队列封装  */ namespace Zebra\MessageQueue; class RedisMessageQueue implements IMessageQueue {   protected $redis_server;   protected $server;   protected $port;   /**    * @var 消息队列标志    */   protected $key;   /**    * 构造队列,创建redis链接    * @param $server_config    * @param $key    * @param bool $p_connect    */   public function __construct($server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false)   {     if (empty($key))       throw new \Exception('message queue key can not be empty');     $this->server = $server_config['IP'];     $this->port = $server_config['PORT'];     $this->key = $key;     $this->check_environment();     if ($p_connect) {       $this->pconnect();     } else {       $this->connect();     }   }   /**    * 析构函数,关闭redis链接,使用长连接时,最好主动调用关闭    */   public function __destruct()   {     $this->close();   }   /**    * 短连接    */   private function connect()   {     $this->redis_server = new \Redis();     $this->redis_server->connect($this->server, $this->port);   }   /**    * 长连接    */   public function pconnect()   {     $this->redis_server = new \Redis();     $this->redis_server->pconnect($this->server, $this->port);   }   /**    * 关闭链接    */   public function close()   {     $this->redis_server->close();   }   /**    * 向队列插入一条信息    * @param $message    * @return mixed    */   public function put($message)   {     return $this->redis_server->lPush($this->key, $message);   }   /**    * 向队列中插入一串信息    * @param $message    * @return mixed    */   public function puts(){     $params = func_get_args();     $message_array = array_merge(array($this->key), $params);     return call_user_func_array(array($this->redis_server, 'lPush'), $message_array);   }   /**    * 从队列顶部获取一条记录    * @return mixed    */   public function get()   {     return $this->redis_server->lPop($this->key);   }   /**    * 选择数据库,可以用于区分不同队列    * @param $database    */   public function select($database)   {     $this->redis_server->select($database);   }   /**    * 获得队列状态,即目前队列中的消息数量    * @return mixed    */   public function size()   {     return $this->redis_server->lSize($this->key);   }   /**    * 获取某一位置的值,不会删除该位置的值    * @param $pos    * @return mixed    */   public function view($pos)   {     return $this->redis_server->lGet($this->key, $pos);   }   /**    * 检查Redis扩展    * @throws Exception    */   protected function check_environment()   {     if (!\extension_loaded('redis')) {       throw new \Exception('Redis extension not loaded');     }   } }

如果需要一次写入多个队列,可以使用如下调用方式:

<?php $redis = new RedisMessageQueue(); $redis->puts(1, 2, 3, 4); $redis->puts(5, 6, 7, 8, 9);

模仿HTTPSQS输出结果的封装如下,提供了写入位置和读取位置记录的功能:

<?php /**  * Created by PhpStorm.  * User: huyanping  * Date: 14-9-5  * Time: 下午2:16  *  * 附加了队列状态信息的RedisMessageQueue  */ namespace Zebra\MessageQueue; class RedisMessageQueueStatus extends RedisMessageQueue {   protected $record_status;   protected $put_position;   protected $get_position;   public function __construct(     $server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'),     $key = 'redis_message_queue',     $p_connect = false,     $record_status=true   ){     parent::__construct($server_config, $key, $p_connect);     $this->record_status = $record_status;     $this->put_position = $this->key . '_put_position';     $this->get_position = $this->key . '_get_position';   }   public function get(){     if($queue = parent::get()){       $incr_result = $this->redis_server->incr($this->get_position);       if(!$incr_result) throw new \Exception('can not mark get position,please check the redis server');       return $queue;     }else{       return false;     }   }   public function put($message){     if(parent::put($message)){       $incr_result = $this->redis_server->incr($this->put_position);       if(!$incr_result) throw new \Exception('can not mark put position,please check the redis server');       return true;     }else{       return false;     }   }   public function puts_status(){     $message_array = func_get_args();     $result = call_user_func_array(array($this, 'puts'), $message_array);     if($result){       $this->redis_server->incrBy($this->put_position, count($message_array));       return true;     }     return false;   }   public function size(){     return $this->redis_server->lSize($this->key);   }   public function status(){     $status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0;     $status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0;     $status['unread_queue'] = $this->size();     $status['queue_name'] = $this->key;     $status['server'] = $this->server;     $status['port'] = $this->port;     return $status;   }   public function status_normal(){     $status = $this->status();     $message = 'Redis Message Queue' . PHP_EOL;     $message .= '-------------------' . PHP_EOL;     $message .= 'Message queue name:' . $status['queue_name'] . PHP_EOL;     $message .= 'Put position of queue:' . $status['put_position'] . PHP_EOL;     $message .= 'Get position of queue:' . $status['get_position'] . PHP_EOL;     $message .= 'Number of unread queue:' . $status['unread_queue'] . PHP_EOL;     return $message;   }   public function status_json(){     return \json_encode($this->status());   } }

看完了这篇文章,相信你对“PHP中如何实现基于Redis的MessageQueue队列封装”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI