Showing posts with label Zend_Queue. Show all posts
Showing posts with label Zend_Queue. Show all posts

Monday, March 28, 2011

RabbitMQ adapter for Zend Framework and Zend Queue using AMQP php library

You can use this class as Zend_Queue adapter for RabbitMQ queue system using AMQP php library.

Download AMQP extension from here

To compile and install follow this link

There are three steps

Note: Here i have used Custom_ namespace for Zend. You can change the namespace as per your choice.

Step 1 (Create Adapter class)

/**
* Class for using a Rabbitmq as a queue
*
* @category Custom
* @package Custom_Queue_Rabbitmq
* @subpackage Adapter
* @author Ritesh Jha
* @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
*/

class Custom_Queue_Adapter_Rabbitmq extends Zend_Queue_Adapter_AdapterAbstract
{
/**
* @var object AMQP connection object
*/
protected $_cnn = array();

/**
*
* @var object AMQP excahnge object
*/
protected $_exchange = null;

/**
*
* @var object AMQP queue object
*/
protected $Queue = null;


/**
*
* @var object AMQP queue object
*/
protected $QueueFlag = AMQP_DURABLE;

/**
* Constructor
*
* @param array|Zend_Config $options
* options (host,port,login,password)
* @return AMQPConnection instance
*/
public function __construct($options, Zend_Queue $queue = null)
{
parent::__construct($options, $queue);

if(is_array($options))
{
try
{
$cnn = new AMQPConnection($options);
$cnn->connect();

if(!$cnn->isConnected())
{
throw new Zend_Queue_Exception("Unable to connect RabbitMQ server");
}
else
{
$this->_cnn = $cnn;
$this->Queue = new AMQPQueue($this->_cnn);
}
}catch(Exception $e){throw new Zend_Queue_Exception($e->getMessage());}
}
else
{
throw new Zend_Queue_Exception("The options must be an associative array of host,port,login, password ...");
}
}

/**
* Get AMQPConnection object
* @return object
*/
public function getConnection()
{
return $this->_cnn;
}


/**
* Set exchange for sending message to queue
* @param string $name
* @param string $type (AMQP_EX_TYPE_DIRECT, AMQP_EX_TYPE_FANOUT, AMQP_EX_TYPE_TOPIC or AMQP_EX_TYPE_HEADER)
* @param int $flags (AMQP_PASSIVE, AMQP_DURABLE, AMQP_AUTODELETE)
* @return boolean
*/
public function setExchange($exchange , $routingKey = "*", $type = AMQP_EX_TYPE_DIRECT, $flags = AMQP_DURABLE)
{
if($exchange instanceof Custom_Queue_Exchange)
{
$this->_exchange = $exchange;
}
else
{
$exchange = new Custom_Queue_Exchange($this->_cnn, $exchange , $type, $flags);
$this->_exchange = $exchange;
}
$this->setRoutingKey($routingKey);

return $exchange;
}

/**
* Set routing key for queu
* @param string $routing_key
* @param Custom_Queue $queue
* @return bool
*/
public function setRoutingKey($routingKey, Custom_Queue $queue = null)
{
if($queue)
$queueName = $queue->getName ();
else
$queueName = $this->_queue->getName();

return $this->_exchange->bind($queueName, $routingKey);
}

/**
* get AMQPQueue object
* @return
*/
public function setQueueFlag($flag)
{
return $this->QueueFlag = $flag;
}

/**
* create queue
* @param $name
* @param $timeout
*/
public function create($name, $timeout=null)
{
return $this->Queue->declare($name, $this->QueueFlag);
}

/**
* delete queue
* @param $name
* @param $timeout
*/
public function delete($name)
{
return $this->Queue->delete($name);
}

/**
* Publish message to queue
* @param mixed $message (array or string)
* @param Custom_Queue $queue
* @return boolean
*/
public function send($message, Zend_Queue $queue = null)
{
if(is_array($message))
{
$message = Zend_Json_Encoder::encode($message);
}

if($queue)
$routingKey = $queue->getOption('routingKey');
else
$routingKey = $this->_queue->getOption('routingKey');

if($this->_exchange)
{
return $this->_exchange->publish($message, $routingKey, AMQP_MANDATORY, array('delivery_mode' => 2));
}
else
{
throw new Zend_Queue_Exception("Rabbitmq exchange not found");
}
}

/**
*
* @param array $options (min, max. ack)
* @param int $timeout
* @param Zend_Queue $queue
* @return
*/
public function receive($options = null, $timeout = null, Zend_Queue $queue = null)
{
$messages = $this->Queue->get();
return $messages;
}

public function getCapabilities(){
return array(
'create' => true,
'delete' => true,
'send' => true,
);
}

public function isExists($name){}
public function getQueues(){}
public function count(Zend_Queue $queue = null){}
public function deleteMessage(Zend_Queue_Message $message){}
}


Step 2(Create Exchange Class)


class Custom_Queue_Exchange extends AMQPExchange
{
public function __construct ( AMQPConnection $connection , $exchange_name , $type = AMQP_EX_TYPE_DIRECT, $flags = AMQP_AUTODELETE)
{
parent::__construct ($connection);
$this->declare($exchange_name, $type, $flags);
}
}


Step 3(Overwrite Zend_Queue Class)

/**
* Class for using a Rabbitmq as a queue
*
* @category Custom
* @package Custom_Queue
* @subpackage Adapter
* @author Ritesh Jha
* @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
*/

class Custom_Queue extends Zend_Queue
{
var $_instance = null;

public function __construct($adapter, $options = array())
{
if($adapter instanceof Custom_Queue_Adapter_Rabbitmq)
{
parent::__construct($adapter, $options);

#declare new queue
$queueName = (array_key_exists('name', $options))?$options['name']:'queue';
if(array_key_exists('flag', $options)) $adapter->setQueueFlag ($options['flag']);
$queue = $adapter->create($queueName);
$this->_setName($queueName);
#declare exchange
$routingKey = (array_key_exists('routingKey', $options))?$options['routingKey']:'*';
$exchangeName = (array_key_exists('exchange', $options))?$options['exchange']:'exchange';
$ex = $adapter->setExchange($exchangeName, $routingKey);
$this->setOptions($options);
}
else
{
throw new Zend_Queue_Exception("Invalid Rabbitmq adapter");
}
}

/**
* Create a new queue
* @param string $name queue name
* @param int $flag A bitmask of any of the flags: AMQP_AUTODELETE, AMQP_PASSIVE, AMQP_DURABLE, AMQP_NOACK.
* @return int (message count)
*/
public function createQueue($name, $flag = AMQP_DURABLE)
{
$this->getAdapter()->setQueueFlag($flag);
parent::createQueue($name);
}


/**
* Delete a queue and all of it's messages
* Returns false if the queue is not delete, true if the queue deleted
* @param string $name queue name
* @return boolean
*/
public function deleteQueue($name)
{
return $this->getAdapter()->delete($name);
}

/**
* Send a message to the queue
*
* @param array|string $message message
* @return Zend_Queue_Message
* @throws Zend_Queue_Exception
*/
public function send($message)
{
return $this->getAdapter()->send($message);
}

/**
* Consume message
*
* @param array $options
* @param int $timeout
* @return array
*/
public function receive($options = null, $timeout = null)
{
return $this->getAdapter()->receive($options, $timeout);
}
}
Usage:

Sending Message
try{
$adapter = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
$queue = new Custom_Queue($adapter, $queueoptions);
$message = 'Testing queue';
if($queue->send($message))
echo "Message published \n";
else
echo "Not";

} catch (Exception $e){echo $e->getMessage(); }

Receive Message

try{
$adapter = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
$queue = new Custom_Queue($adapter, $queueoptions);
$messages = $queue->receive($options);
} catch (Exception $e){echo $e->getMessage(); }