Download AMQP extension from here
To compile and install follow this link
There are three steps
Sending Message
Receive Message
To compile and install follow this link
There are three steps
Note: Here I have used the Custom_ namespace (class prefix) for the Zend classes. You can change the namespace to suit your project.
Step 1 (Create Adapter class)
Step 2 (Create Exchange class)
Step 3 (Override the Zend_Queue class)
Usage
Step 1 (Create Adapter class)
/**
 * Zend_Queue adapter that uses RabbitMQ (through the AMQP extension) as the queue backend.
 *
 * The adapter owns one AMQPConnection, one AMQPQueue built on that connection and,
 * once setExchange() has been called, one exchange used for publishing.
 *
 * @category Custom
 * @package Custom_Queue_Rabbitmq
 * @subpackage Adapter
 * @author Ritesh Jha
 * @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
 */
class Custom_Queue_Adapter_Rabbitmq extends Zend_Queue_Adapter_AdapterAbstract
{
    /**
     * @var AMQPConnection|null AMQP connection object (set by the constructor)
     */
    protected $_cnn = null;

    /**
     * @var Custom_Queue_Exchange|null AMQP exchange object (set by setExchange())
     */
    protected $_exchange = null;

    /**
     * @var AMQPQueue|null AMQP queue object (set by the constructor)
     */
    protected $Queue = null;

    /**
     * @var int Flag bitmask passed to AMQPQueue::declare() by create()
     */
    protected $QueueFlag = AMQP_DURABLE;

    /**
     * Constructor: connects to the RabbitMQ server and prepares the AMQP queue object.
     *
     * @param array|Zend_Config $options connection options (host, port, login, password),
     *                                   handed to AMQPConnection as an array
     * @param Zend_Queue|null   $queue   queue this adapter serves, if already known
     * @throws Zend_Queue_Exception if the options are not usable or the connection fails
     */
    public function __construct($options, Zend_Queue $queue = null)
    {
        parent::__construct($options, $queue);

        // The parent accepts Zend_Config, so convert it here instead of rejecting it below.
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        }
        if (!is_array($options)) {
            throw new Zend_Queue_Exception("The options must be an associative array of host,port,login, password ...");
        }

        try {
            $cnn = new AMQPConnection($options);
            $cnn->connect();
            if (!$cnn->isConnected()) {
                throw new Zend_Queue_Exception("Unable to connect RabbitMQ server");
            }
            $this->_cnn = $cnn;
            $this->Queue = new AMQPQueue($this->_cnn);
        } catch (Zend_Queue_Exception $e) {
            // Already the exception type callers expect; do not wrap it a second time.
            throw $e;
        } catch (Exception $e) {
            // Keep the original exception attached so the root cause stays debuggable.
            throw new Zend_Queue_Exception($e->getMessage(), (int) $e->getCode(), $e);
        }
    }

    /**
     * Get the AMQPConnection object.
     *
     * @return AMQPConnection|null
     */
    public function getConnection()
    {
        return $this->_cnn;
    }

    /**
     * Set the exchange used for sending messages and bind the current queue to it.
     *
     * @param Custom_Queue_Exchange|string $exchange   ready-made exchange object, or the name
     *                                                 of an exchange to declare
     * @param string                       $routingKey routing key used for the binding
     * @param string                       $type       AMQP_EX_TYPE_DIRECT, AMQP_EX_TYPE_FANOUT,
     *                                                 AMQP_EX_TYPE_TOPIC or AMQP_EX_TYPE_HEADER;
     *                                                 only used when $exchange is a name
     * @param int                          $flags      AMQP_PASSIVE, AMQP_DURABLE, AMQP_AUTODELETE;
     *                                                 only used when $exchange is a name
     * @return Custom_Queue_Exchange the exchange now in use
     */
    public function setExchange($exchange, $routingKey = "*", $type = AMQP_EX_TYPE_DIRECT, $flags = AMQP_DURABLE)
    {
        if (!$exchange instanceof Custom_Queue_Exchange) {
            $exchange = new Custom_Queue_Exchange($this->_cnn, $exchange, $type, $flags);
        }
        $this->_exchange = $exchange;
        $this->setRoutingKey($routingKey);

        return $exchange;
    }

    /**
     * Bind a queue to the current exchange with the given routing key.
     *
     * @param string            $routingKey
     * @param Custom_Queue|null $queue      queue to bind; defaults to the adapter's own queue
     * @return mixed result of the exchange's bind() call
     * @throws Zend_Queue_Exception if no exchange or no queue is available
     */
    public function setRoutingKey($routingKey, Custom_Queue $queue = null)
    {
        if (!$this->_exchange) {
            throw new Zend_Queue_Exception("Rabbitmq exchange not found");
        }

        $target = $queue ? $queue : $this->_queue;
        if (!$target) {
            // Without this guard a missing queue ends in a fatal "member function on null".
            throw new Zend_Queue_Exception("No queue is attached to the Rabbitmq adapter");
        }

        return $this->_exchange->bind($target->getName(), $routingKey);
    }

    /**
     * Set the flag bitmask used when a queue is declared by create().
     *
     * @param int $flag
     * @return int the flag that was stored
     */
    public function setQueueFlag($flag)
    {
        return $this->QueueFlag = $flag;
    }

    /**
     * Create (declare) a queue using the currently configured queue flag.
     *
     * @param string   $name    queue name
     * @param int|null $timeout accepted for interface compatibility; not used
     * @return mixed result of AMQPQueue::declare()
     */
    public function create($name, $timeout = null)
    {
        return $this->Queue->declare($name, $this->QueueFlag);
    }

    /**
     * Delete a queue.
     *
     * @param string $name queue name
     * @return mixed result of AMQPQueue::delete()
     */
    public function delete($name)
    {
        return $this->Queue->delete($name);
    }

    /**
     * Publish a message through the current exchange.
     *
     * Array messages are JSON-encoded first. The routing key is read from the
     * 'routingKey' option of $queue, or of the adapter's own queue when $queue is omitted.
     *
     * @param array|string    $message
     * @param Zend_Queue|null $queue
     * @return mixed result of the exchange's publish() call
     * @throws Zend_Queue_Exception if no exchange or no queue is available
     */
    public function send($message, Zend_Queue $queue = null)
    {
        if (!$this->_exchange) {
            throw new Zend_Queue_Exception("Rabbitmq exchange not found");
        }

        $source = $queue ? $queue : $this->_queue;
        if (!$source) {
            throw new Zend_Queue_Exception("No queue is attached to the Rabbitmq adapter");
        }

        if (is_array($message)) {
            $message = Zend_Json_Encoder::encode($message);
        }

        return $this->_exchange->publish(
            $message,
            $source->getOption('routingKey'),
            AMQP_MANDATORY,
            array('delivery_mode' => 2)
        );
    }

    /**
     * Fetch from the AMQP queue.
     *
     * All three parameters are accepted for interface compatibility only;
     * the current implementation does not read them.
     *
     * @param array|null      $options (min, max, ack)
     * @param int|null        $timeout
     * @param Zend_Queue|null $queue
     * @return mixed result of AMQPQueue::get()
     */
    public function receive($options = null, $timeout = null, Zend_Queue $queue = null)
    {
        return $this->Queue->get();
    }

    /**
     * List which adapter operations are implemented.
     *
     * Every operation is listed explicitly so a capability lookup reflects what the
     * class really does: receive() is implemented, the last four methods are stubs.
     *
     * @return array map of operation name => bool
     */
    public function getCapabilities()
    {
        return array(
            'create'        => true,
            'delete'        => true,
            'send'          => true,
            'receive'       => true,
            'deleteMessage' => false,
            'getQueues'     => false,
            'count'         => false,
            'isExists'      => false,
        );
    }

    /**
     * Not implemented: always reports that the queue does not exist.
     *
     * @param string $name
     * @return bool always false
     */
    public function isExists($name)
    {
        return false;
    }

    /**
     * Not implemented (see getCapabilities()); intentionally a no-op.
     */
    public function getQueues()
    {
    }

    /**
     * Not implemented (see getCapabilities()); intentionally a no-op.
     *
     * @param Zend_Queue|null $queue
     */
    public function count(Zend_Queue $queue = null)
    {
    }

    /**
     * Not implemented (see getCapabilities()); intentionally a no-op.
     *
     * @param Zend_Queue_Message $message
     */
    public function deleteMessage(Zend_Queue_Message $message)
    {
    }
}
Step 2 (Create Exchange class)
/**
 * AMQP exchange that declares itself as soon as it is constructed.
 */
class Custom_Queue_Exchange extends AMQPExchange
{
    /**
     * Attach to the connection, then declare the exchange on it.
     *
     * @param AMQPConnection $connection    connection the exchange lives on
     * @param string         $exchange_name name of the exchange to declare
     * @param string         $type          exchange type, AMQP_EX_TYPE_DIRECT by default
     * @param int            $flags         declaration flags, AMQP_AUTODELETE by default
     */
    public function __construct(
        AMQPConnection $connection,
        $exchange_name,
        $type = AMQP_EX_TYPE_DIRECT,
        $flags = AMQP_AUTODELETE
    ) {
        parent::__construct($connection);

        $this->declare($exchange_name, $type, $flags);
    }
}
Step 3 (Override the Zend_Queue class)
/**
 * Zend_Queue specialisation that works with the RabbitMQ adapter.
 *
 * On construction it declares the queue, declares the exchange and binds the
 * two together with the routing key, so the object is ready for send()/receive().
 *
 * @category Custom
 * @package Custom_Queue
 * @author Ritesh Jha
 * @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
 */
class Custom_Queue extends Zend_Queue
{
    /**
     * @var mixed not used by this class; kept public for backward compatibility
     */
    public $_instance = null;

    /**
     * Constructor.
     *
     * Recognised options: 'name' (default 'queue'), 'exchange' (default 'exchange'),
     * 'routingKey' (default '*') and 'flag' (queue declaration flag bitmask).
     *
     * @param Custom_Queue_Adapter_Rabbitmq $adapter
     * @param array|Zend_Config             $options
     * @throws Zend_Queue_Exception if $adapter is not a Custom_Queue_Adapter_Rabbitmq
     */
    public function __construct($adapter, $options = array())
    {
        if (!$adapter instanceof Custom_Queue_Adapter_Rabbitmq) {
            throw new Zend_Queue_Exception("Invalid Rabbitmq adapter");
        }

        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        } elseif (!is_array($options)) {
            $options = array();
        }

        // Store the defaults in the options themselves. The adapter's send() reads
        // 'routingKey' back from the queue options, so a default that is only used
        // for the binding would make send() publish with a different (null) key.
        $options = array_merge(
            array(
                'name'       => 'queue',
                'routingKey' => '*',
                'exchange'   => 'exchange',
            ),
            $options
        );

        // Stores the options (including the queue name) and attaches this queue to the adapter.
        parent::__construct($adapter, $options);

        // Declare the queue.
        if (array_key_exists('flag', $options)) {
            $adapter->setQueueFlag($options['flag']);
        }
        $adapter->create($options['name']);
        $this->_setName($options['name']);

        // Declare the exchange and bind the queue to it.
        $adapter->setExchange($options['exchange'], $options['routingKey']);
    }

    /**
     * Create a new queue.
     *
     * @param string $name queue name
     * @param int    $flag A bitmask of any of the flags: AMQP_AUTODELETE, AMQP_PASSIVE, AMQP_DURABLE, AMQP_NOACK.
     * @return mixed whatever Zend_Queue::createQueue() returns
     */
    public function createQueue($name, $flag = AMQP_DURABLE)
    {
        $this->getAdapter()->setQueueFlag($flag);

        return parent::createQueue($name);
    }

    /**
     * Delete a queue and all of its messages.
     *
     * The parameter is optional so the signature stays compatible with
     * Zend_Queue::deleteQueue(), which takes no arguments.
     *
     * @param string|null $name queue name; defaults to this queue's own name
     * @return mixed result of the adapter's delete() call
     */
    public function deleteQueue($name = null)
    {
        if ($name === null) {
            $name = $this->getName();
        }

        return $this->getAdapter()->delete($name);
    }

    /**
     * Send a message to the queue.
     *
     * @param array|string $message message
     * @return mixed result of the adapter's send() call
     * @throws Zend_Queue_Exception
     */
    public function send($message)
    {
        return $this->getAdapter()->send($message);
    }

    /**
     * Consume a message.
     *
     * @param array $options
     * @param int   $timeout
     * @return mixed result of the adapter's receive() call
     */
    public function receive($options = null, $timeout = null)
    {
        return $this->getAdapter()->receive($options, $timeout);
    }
}
Sending Message
// Build the adapter and queue, then publish one test message and report the outcome.
try {
    $adapter = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
    $queue = new Custom_Queue($adapter, $queueoptions);

    $message = 'Testing queue';

    echo $queue->send($message) ? "Message published \n" : "Not";
} catch (Exception $e) {
    // Any connection or publishing failure is shown as plain text.
    echo $e->getMessage();
}
Receive Message
// Build the adapter and queue, then fetch from the queue into $messages.
try {
    $adapter = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
    $queue = new Custom_Queue($adapter, $queueoptions);

    $messages = $queue->receive($options);
} catch (Exception $e) {
    // Any connection or receive failure is shown as plain text.
    echo $e->getMessage();
}
Ritesh,
I refactored your version a bit. It is stored here:
https://bitbucket.org/hlopetz/amqp_queue
thanks for your coding!
Andrey
Hlopetz:
Looks good, and I am glad that it helps you. As I have already written in the note above (description section), you can use your own namespace instead of Custom_.
Ritesh
Hi RKJ, I'm trying to make it work, but it crashes when I call Custom_Queue_Adapter_Rabbitmq and I haven't been able to debug it. Can you please post which values you are using for $adapterOptions? Thanks.
Hi Carlos,
Actually, $adapterOptions must contain the AMQP credentials; the connection parameters are documented at http://www.php.net/manual/en/amqpconnection.construct.php. Andrey refactored the code; you can find it here: https://bitbucket.org/hlopetz/amqp_queue
Ritesh
Thanks. It was actually a 32-bit/64-bit mismatch on my side, so amqp.so wasn't loaded. By the way, your library needs to be adjusted a bit for amqp 1.x, as AMQPQueue no longer receives a connection but receives a channel.
Cheers, and thanks for sharing.
Glad that it works for you. I didn't get time to review it. If you have implemented it with the new AMQP extension, you can post that part here in the comment section so that future visitors can use it.
I wrote the ZF2 integration for AMQP, see: https://github.com/prolic/HumusAmqpModule.
ReplyDelete