Download AMQP extension from here
To compile and install follow this link
There are three steps
Sending Message
Receive Message
To compile and install follow this link
There are three steps
Note: Here I have used the Custom_ namespace (class prefix) alongside Zend. You can change the namespace to whatever you prefer.
Step 1 (Create Adapter class)
Step 2 (Create Exchange Class)
Step 3 (Override Zend_Queue Class)
Usage: Step 1 (Create Adapter class)
/**
 * Zend_Queue adapter that uses RabbitMQ (through the PHP AMQP extension) as the queue backend.
 *
 * @category Custom
 * @package Custom_Queue_Rabbitmq
 * @subpackage Adapter
 * @author Ritesh Jha
 * @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
 */
class Custom_Queue_Adapter_Rabbitmq extends Zend_Queue_Adapter_AdapterAbstract
{
    /**
     * @var AMQPConnection|null AMQP connection object (set by the constructor)
     */
    protected $_cnn = null;

    /**
     * @var Custom_Queue_Exchange|null AMQP exchange object (set by setExchange())
     */
    protected $_exchange = null;

    /**
     * @var AMQPQueue|null AMQP queue object (set by the constructor)
     */
    protected $Queue = null;

    /**
     * @var int Flags passed to AMQPQueue::declare() by create()
     */
    protected $QueueFlag = AMQP_DURABLE;

    /**
     * Constructor: opens the AMQP connection and prepares the AMQPQueue object.
     *
     * @param array|Zend_Config $options connection options (host, port, login, password)
     * @param Zend_Queue|null $queue
     * @throws Zend_Queue_Exception if the options are invalid or the connection fails
     */
    public function __construct($options, Zend_Queue $queue = null)
    {
        parent::__construct($options, $queue);

        // The docblock promises Zend_Config support, so normalise it to an
        // array instead of rejecting it in the is_array() check below.
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        }

        if (!is_array($options)) {
            throw new Zend_Queue_Exception("The options must be an associative array of host,port,login, password ...");
        }

        try {
            $cnn = new AMQPConnection($options);
            $cnn->connect();
            if (!$cnn->isConnected()) {
                throw new Zend_Queue_Exception("Unable to connect RabbitMQ server");
            }
            $this->_cnn = $cnn;
            $this->Queue = new AMQPQueue($this->_cnn);
        } catch (Zend_Queue_Exception $e) {
            // Already the exception type callers expect; do not re-wrap it.
            throw $e;
        } catch (Exception $e) {
            // Keep the original exception as "previous" so the root cause
            // (and its stack trace) is not lost.
            throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Get the AMQPConnection object.
     *
     * @return AMQPConnection|null
     */
    public function getConnection()
    {
        return $this->_cnn;
    }

    /**
     * Set the exchange used for publishing and bind the current queue to it.
     *
     * @param Custom_Queue_Exchange|string $exchange an exchange object, or the name of the exchange to declare
     * @param string $routingKey routing key used to bind the queue to the exchange
     * @param string $type (AMQP_EX_TYPE_DIRECT, AMQP_EX_TYPE_FANOUT, AMQP_EX_TYPE_TOPIC or AMQP_EX_TYPE_HEADER);
     *                     only used when $exchange is a name
     * @param int $flags (AMQP_PASSIVE, AMQP_DURABLE, AMQP_AUTODELETE); only used when $exchange is a name
     * @return Custom_Queue_Exchange the exchange now in use
     * @throws Zend_Queue_Exception if no queue is attached to the adapter
     */
    public function setExchange($exchange , $routingKey = "*", $type = AMQP_EX_TYPE_DIRECT, $flags = AMQP_DURABLE)
    {
        if (!($exchange instanceof Custom_Queue_Exchange)) {
            $exchange = new Custom_Queue_Exchange($this->_cnn, $exchange, $type, $flags);
        }
        $this->_exchange = $exchange;

        $this->setRoutingKey($routingKey);

        return $exchange;
    }

    /**
     * Bind a queue to the current exchange with the given routing key.
     *
     * @param string $routingKey
     * @param Custom_Queue|null $queue queue to bind; defaults to the queue attached to this adapter
     * @return mixed result of the exchange's bind() call
     * @throws Zend_Queue_Exception if no exchange has been set or no queue is available
     */
    public function setRoutingKey($routingKey, Custom_Queue $queue = null)
    {
        if (!$this->_exchange) {
            throw new Zend_Queue_Exception("Rabbitmq exchange not found");
        }

        $target = $queue ? $queue : $this->_queue;
        if (!$target) {
            // Fail with a catchable exception instead of a fatal
            // "call to a member function on null".
            throw new Zend_Queue_Exception("No queue is attached to the Rabbitmq adapter");
        }

        return $this->_exchange->bind($target->getName(), $routingKey);
    }

    /**
     * Set the flags used by create() when declaring a queue.
     *
     * @param int $flag bitmask of AMQP_* queue flags
     * @return int the flag that was set
     */
    public function setQueueFlag($flag)
    {
        return $this->QueueFlag = $flag;
    }

    /**
     * Create (declare) a queue using the currently configured queue flags.
     *
     * @param string $name
     * @param int|null $timeout not used by this adapter
     * @return mixed result of AMQPQueue::declare()
     */
    public function create($name, $timeout=null)
    {
        return $this->Queue->declare($name, $this->QueueFlag);
    }

    /**
     * Delete a queue.
     *
     * @param string $name
     * @return mixed result of AMQPQueue::delete()
     */
    public function delete($name)
    {
        return $this->Queue->delete($name);
    }

    /**
     * Publish a message to the current exchange.
     *
     * Arrays are JSON-encoded before publishing. The routing key is read from
     * the 'routingKey' option of the given queue (or of the attached queue).
     *
     * @param mixed $message (array or string)
     * @param Zend_Queue|null $queue queue whose options supply the routing key
     * @return mixed result of the exchange's publish() call
     * @throws Zend_Queue_Exception if no queue or no exchange is available
     */
    public function send($message, Zend_Queue $queue = null)
    {
        if (is_array($message)) {
            $message = Zend_Json_Encoder::encode($message);
        }

        $source = $queue ? $queue : $this->_queue;
        if (!$source) {
            throw new Zend_Queue_Exception("No queue is attached to the Rabbitmq adapter");
        }
        $routingKey = $source->getOption('routingKey');

        if (!$this->_exchange) {
            throw new Zend_Queue_Exception("Rabbitmq exchange not found");
        }

        // delivery_mode 2 marks the message as persistent.
        return $this->_exchange->publish($message, $routingKey, AMQP_MANDATORY, array('delivery_mode' => 2));
    }

    /**
     * Fetch a message through the adapter's AMQPQueue object.
     *
     * NOTE(review): $options, $timeout and $queue are accepted for interface
     * compatibility but are currently ignored.
     *
     * @param array $options (min, max, ack) - currently unused
     * @param int $timeout currently unused
     * @param Zend_Queue $queue currently unused
     * @return mixed result of AMQPQueue::get()
     */
    public function receive($options = null, $timeout = null, Zend_Queue $queue = null)
    {
        return $this->Queue->get();
    }

    /**
     * List the operations this adapter supports.
     *
     * 'receive' is reported because receive() is implemented; the stubbed
     * operations are listed explicitly as unsupported.
     *
     * @return array map of operation name => bool
     */
    public function getCapabilities(){
        return array(
            'create'        => true,
            'delete'        => true,
            'send'          => true,
            'receive'       => true,
            'deleteMessage' => false,
            'getQueues'     => false,
            'count'         => false,
            'isExists'      => false,
        );
    }

    /**
     * Not implemented: existence checks are not supported, so this always
     * reports that the queue does not exist.
     *
     * @param string $name
     * @return boolean always false
     */
    public function isExists($name)
    {
        return false;
    }

    /**
     * Not implemented by this adapter (returns nothing).
     */
    public function getQueues(){}

    /**
     * Not implemented by this adapter (returns nothing).
     */
    public function count(Zend_Queue $queue = null){}

    /**
     * Not implemented by this adapter (returns nothing).
     */
    public function deleteMessage(Zend_Queue_Message $message){}
}
Step 2 (Create Exchange Class)
/**
 * AMQP exchange that calls declare() on itself as part of construction,
 * so creating the object and declaring the exchange is a single step.
 */
class Custom_Queue_Exchange extends AMQPExchange
{
    /**
     * @param AMQPConnection $connection open AMQP connection
     * @param string $exchange_name name of the exchange to declare
     * @param string $type exchange type, defaults to AMQP_EX_TYPE_DIRECT
     * @param int $flags exchange flags, defaults to AMQP_AUTODELETE
     */
    public function __construct(
        AMQPConnection $connection,
        $exchange_name,
        $type = AMQP_EX_TYPE_DIRECT,
        $flags = AMQP_AUTODELETE
    ) {
        parent::__construct($connection);

        $this->declare($exchange_name, $type, $flags);
    }
}
Step 3 (Override Zend_Queue Class)
/**
 * Zend_Queue subclass that works with the RabbitMQ adapter: on construction it
 * declares the queue, declares the exchange and binds the two together.
 *
 * @category Custom
 * @package Custom_Queue
 * @author Ritesh Jha
 * @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
 */
class Custom_Queue extends Zend_Queue
{
    /**
     * Not used by this class; kept so existing code that touches it keeps working.
     *
     * @var mixed
     */
    public $_instance = null;

    /**
     * Constructor.
     *
     * Recognised options (all optional):
     *  - 'name'       queue name, defaults to 'queue'
     *  - 'flag'       queue flags passed to the adapter via setQueueFlag()
     *  - 'routingKey' routing key used for binding and publishing, defaults to '*'
     *  - 'exchange'   exchange name, defaults to 'exchange'
     *
     * @param Custom_Queue_Adapter_Rabbitmq $adapter
     * @param array|Zend_Config $options
     * @throws Zend_Queue_Exception if $adapter is not a RabbitMQ adapter
     */
    public function __construct($adapter, $options = array())
    {
        if (!($adapter instanceof Custom_Queue_Adapter_Rabbitmq)) {
            throw new Zend_Queue_Exception("Invalid Rabbitmq adapter");
        }

        // array_key_exists() below needs a real array.
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        }

        // Apply the queue flag before the adapter is handed to Zend_Queue so
        // every queue declaration made from here on uses the requested flag.
        if (array_key_exists('flag', $options)) {
            $adapter->setQueueFlag($options['flag']);
        }

        parent::__construct($adapter, $options);

        // Declare the queue.
        $queueName = array_key_exists('name', $options) ? $options['name'] : 'queue';
        $adapter->create($queueName);
        $this->_setName($queueName);

        // Declare the exchange and bind the queue to it.
        $routingKey   = array_key_exists('routingKey', $options) ? $options['routingKey'] : '*';
        $exchangeName = array_key_exists('exchange', $options) ? $options['exchange'] : 'exchange';
        $adapter->setExchange($exchangeName, $routingKey);

        // Store the resolved values: the adapter's send() reads the routing
        // key back through getOption('routingKey'), so without this a queue
        // bound with the default key would publish with a different key.
        $options['routingKey'] = $routingKey;
        $options['exchange']   = $exchangeName;
        $this->setOptions($options);
    }

    /**
     * Create a new queue.
     *
     * The flag is stored on the adapter and stays in effect for later
     * declarations until it is changed again.
     *
     * @param string $name queue name
     * @param int $flag A bitmask of any of the flags: AMQP_AUTODELETE, AMQP_PASSIVE, AMQP_DURABLE, AMQP_NOACK.
     * @return mixed whatever Zend_Queue::createQueue() returns
     */
    public function createQueue($name, $flag = AMQP_DURABLE)
    {
        $this->getAdapter()->setQueueFlag($flag);

        // The parent's result was previously discarded, so callers always got null.
        return parent::createQueue($name);
    }

    /**
     * Delete a queue and all of its messages.
     *
     * @param string $name queue name
     * @return mixed result of the adapter's delete() call
     */
    public function deleteQueue($name)
    {
        return $this->getAdapter()->delete($name);
    }

    /**
     * Send a message to the queue.
     *
     * @param array|string $message message
     * @return mixed result of the adapter's send() call
     * @throws Zend_Queue_Exception
     */
    public function send($message)
    {
        return $this->getAdapter()->send($message);
    }

    /**
     * Consume a message.
     *
     * @param array $options
     * @param int $timeout
     * @return mixed result of the adapter's receive() call
     */
    public function receive($options = null, $timeout = null)
    {
        return $this->getAdapter()->receive($options, $timeout);
    }
}
Sending Message
// Publish a single test message and report whether the publish call succeeded.
try {
    // $adapteroptions / $queueoptions are expected to be defined by the caller.
    $adapter = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
    $queue = new Custom_Queue($adapter, $queueoptions);

    $message = 'Testing queue';

    echo $queue->send($message) ? "Message published \n" : "Not";
} catch (Exception $e) {
    // Any failure (connection, declaration, publish) is shown as its message.
    echo $e->getMessage();
}
Receive Message
// Connect, set up the queue and fetch from it; the result is left in $messages.
try {
    // $adapteroptions, $queueoptions and $options are expected to be defined by the caller.
    $adapter = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
    $queue = new Custom_Queue($adapter, $queueoptions);

    $messages = $queue->receive($options);
} catch (Exception $e) {
    // Any failure is shown as its message.
    echo $e->getMessage();
}