Monday, March 28, 2011

RabbitMQ adapter for Zend Framework and Zend Queue using AMQP php library

You can use this class as a Zend_Queue adapter for the RabbitMQ queue system, using the AMQP PHP extension.

Download AMQP extension from here

To compile and install follow this link

There are three steps

Note: Here I have used the Custom_ class-name prefix (pseudo-namespace) for my classes alongside Zend_. You can change the prefix to whatever you prefer.

Step 1 (Create Adapter class)

/**
 * Zend_Queue adapter that uses RabbitMQ through the PHP AMQP extension.
 *
 * The adapter opens an AMQPConnection in its constructor, declares queues via
 * an AMQPQueue, and publishes messages through a Custom_Queue_Exchange.
 *
 * @category Custom
 * @package Custom_Queue_Rabbitmq
 * @subpackage Adapter
 * @author Ritesh Jha
 * @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
 */
class Custom_Queue_Adapter_Rabbitmq extends Zend_Queue_Adapter_AdapterAbstract
{
    /**
     * @var AMQPConnection|null AMQP connection object (set by the constructor)
     */
    protected $_cnn = null;

    /**
     * @var Custom_Queue_Exchange|null AMQP exchange object used by send()
     */
    protected $_exchange = null;

    /**
     * @var string|null Routing key used by the most recent successful bind
     */
    protected $_routingKey = null;

    /**
     * @var AMQPQueue|null AMQP queue object
     */
    protected $Queue = null;

    /**
     * @var int Flags passed to AMQPQueue::declare() by create()
     */
    protected $QueueFlag = AMQP_DURABLE;

    /**
     * Constructor: connects to the broker and prepares the AMQPQueue object.
     *
     * @param array|Zend_Config $options connection options handed to
     *                                   AMQPConnection (host, port, login, password ...)
     * @param Zend_Queue|null   $queue   queue to attach to this adapter
     * @throws Zend_Queue_Exception when the options are invalid or the
     *                              connection cannot be established
     */
    public function __construct($options, Zend_Queue $queue = null)
    {
        // The docblock promises Zend_Config support, so normalise it to the
        // plain array that AMQPConnection expects.
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        }

        if (!is_array($options)) {
            throw new Zend_Queue_Exception("The options must be an associative array of host,port,login, password ...");
        }

        parent::__construct($options, $queue);

        try {
            $cnn = new AMQPConnection($options);
            $cnn->connect();

            $connected = $cnn->isConnected();
            $amqpQueue = $connected ? new AMQPQueue($cnn) : null;
        } catch (Exception $e) {
            // Keep the original exception attached so the root cause is not lost.
            throw new Zend_Queue_Exception($e->getMessage(), (int) $e->getCode(), $e);
        }

        if (!$connected) {
            throw new Zend_Queue_Exception("Unable to connect RabbitMQ server");
        }

        $this->_cnn  = $cnn;
        $this->Queue = $amqpQueue;
    }

    /**
     * Get the AMQPConnection object.
     *
     * @return AMQPConnection|null
     */
    public function getConnection()
    {
        return $this->_cnn;
    }

    /**
     * Set the exchange used for publishing and bind the current queue to it.
     *
     * @param Custom_Queue_Exchange|string $exchange   ready-made exchange, or the
     *                                                 name of an exchange to declare
     * @param string                       $routingKey routing key used for the binding
     * @param string                       $type       AMQP_EX_TYPE_DIRECT, AMQP_EX_TYPE_FANOUT,
     *                                                 AMQP_EX_TYPE_TOPIC or AMQP_EX_TYPE_HEADER
     *                                                 (only used when $exchange is a name)
     * @param int                          $flags      AMQP_PASSIVE, AMQP_DURABLE, AMQP_AUTODELETE
     *                                                 (only used when $exchange is a name)
     * @return Custom_Queue_Exchange the exchange now in use
     */
    public function setExchange($exchange , $routingKey = "*", $type = AMQP_EX_TYPE_DIRECT, $flags = AMQP_DURABLE)
    {
        if (!($exchange instanceof Custom_Queue_Exchange)) {
            $exchange = new Custom_Queue_Exchange($this->_cnn, $exchange, $type, $flags);
        }

        $this->_exchange = $exchange;
        $this->setRoutingKey($routingKey);

        return $exchange;
    }

    /**
     * Bind a queue to the current exchange with the given routing key.
     *
     * @param string            $routingKey
     * @param Custom_Queue|null $queue queue to bind; defaults to the queue
     *                                 attached to this adapter
     * @return mixed result of the exchange's bind() call
     * @throws Zend_Queue_Exception when no exchange or no queue is available
     */
    public function setRoutingKey($routingKey, Custom_Queue $queue = null)
    {
        if (!$this->_exchange) {
            throw new Zend_Queue_Exception("Rabbitmq exchange not found");
        }

        $target = $queue ? $queue : $this->_queue;
        if (!$target) {
            // Previously this was a fatal "call to a member function on a non-object".
            throw new Zend_Queue_Exception("No queue available to bind the routing key to");
        }

        $bound = $this->_exchange->bind($target->getName(), $routingKey);

        // Remember the key so send() can fall back to it when the queue
        // options do not carry an explicit 'routingKey'.
        if (false !== $bound) {
            $this->_routingKey = $routingKey;
        }

        return $bound;
    }

    /**
     * Set the flags used by create() when declaring a queue.
     *
     * @param int $flag bitmask of AMQP_* queue flags
     * @return int the flag that was stored
     */
    public function setQueueFlag($flag)
    {
        return $this->QueueFlag = $flag;
    }

    /**
     * Declare a queue using the currently configured queue flags.
     *
     * @param string   $name    queue name
     * @param int|null $timeout not used by this adapter
     * @return mixed result of AMQPQueue::declare()
     */
    public function create($name, $timeout=null)
    {
        return $this->Queue->declare($name, $this->QueueFlag);
    }

    /**
     * Delete a queue.
     *
     * @param string $name queue name
     * @return mixed result of AMQPQueue::delete()
     */
    public function delete($name)
    {
        return $this->Queue->delete($name);
    }

    /**
     * Publish a message through the current exchange.
     *
     * Arrays are JSON-encoded before publishing. The routing key is read from
     * the 'routingKey' option of the queue; when that option is null, the key
     * used by the most recent bind is used instead, so the message is published
     * with the same key the queue was bound with.
     *
     * @param array|string    $message
     * @param Zend_Queue|null $queue queue whose options supply the routing key;
     *                               defaults to the queue attached to this adapter
     * @return mixed result of the exchange's publish() call
     * @throws Zend_Queue_Exception when no exchange has been set
     */
    public function send($message, Zend_Queue $queue = null)
    {
        if (is_array($message)) {
            $message = Zend_Json_Encoder::encode($message);
        }

        $target     = $queue ? $queue : $this->_queue;
        $routingKey = $target ? $target->getOption('routingKey') : null;
        if (null === $routingKey) {
            $routingKey = $this->_routingKey;
        }

        if (!$this->_exchange) {
            throw new Zend_Queue_Exception("Rabbitmq exchange not found");
        }

        // delivery_mode 2 is passed as a message attribute together with AMQP_MANDATORY.
        return $this->_exchange->publish($message, $routingKey, AMQP_MANDATORY, array('delivery_mode' => 2));
    }

    /**
     * Fetch from the adapter's AMQPQueue via AMQPQueue::get().
     *
     * All three parameters are currently ignored by this implementation.
     *
     * @param array|null      $options (min, max, ack) - ignored
     * @param int|null        $timeout ignored
     * @param Zend_Queue|null $queue   ignored
     * @return mixed result of AMQPQueue::get()
     */
    public function receive($options = null, $timeout = null, Zend_Queue $queue = null)
    {
        return $this->Queue->get();
    }

    /**
     * List which adapter operations are implemented.
     *
     * Operations that are only stubbed out below are reported as false.
     *
     * @return array map of operation name => bool
     */
    public function getCapabilities()
    {
        return array(
            'create'        => true,
            'delete'        => true,
            'send'          => true,
            'receive'       => true,
            'isExists'      => false,
            'getQueues'     => false,
            'count'         => false,
            'deleteMessage' => false,
        );
    }

    /**
     * Not implemented: has an empty body and therefore returns null.
     *
     * @param string $name
     */
    public function isExists($name){}

    /**
     * Not implemented: has an empty body and therefore returns null.
     */
    public function getQueues(){}

    /**
     * Not implemented: has an empty body and therefore returns null.
     *
     * @param Zend_Queue|null $queue
     */
    public function count(Zend_Queue $queue = null){}

    /**
     * Not implemented: has an empty body and therefore returns null.
     *
     * @param Zend_Queue_Message $message
     */
    public function deleteMessage(Zend_Queue_Message $message){}
}


Step 2 (Create Exchange class)


/**
 * AMQPExchange that declares itself on the broker as soon as it is constructed.
 */
class Custom_Queue_Exchange extends AMQPExchange
{
    /**
     * Create the exchange object and declare the exchange.
     *
     * @param AMQPConnection $connection    open AMQP connection
     * @param string         $exchange_name name of the exchange to declare
     * @param string         $type          exchange type (AMQP_EX_TYPE_* constant)
     * @param int            $flags         exchange flags (AMQP_* bitmask)
     * @throws AMQPExchangeException when declare() reports failure by returning false
     */
    public function __construct ( AMQPConnection $connection , $exchange_name , $type = AMQP_EX_TYPE_DIRECT, $flags = AMQP_AUTODELETE)
    {
        parent::__construct($connection);

        // A false return was previously ignored, leaving callers with an
        // exchange object that was never declared.
        if (false === $this->declare($exchange_name, $type, $flags)) {
            throw new AMQPExchangeException("Unable to declare exchange '" . $exchange_name . "'");
        }
    }
}


Step 3 (Override the Zend_Queue class)

/**
 * Zend_Queue subclass that works with Custom_Queue_Adapter_Rabbitmq.
 *
 * On construction it declares the queue, declares the exchange and binds the
 * two together using the routing key from the options.
 *
 * @category Custom
 * @package Custom_Queue
 * @author Ritesh Jha
 * @copyright Copyright (c) (http://mailrkj(at)gmail(dot)com)
 */
class Custom_Queue extends Zend_Queue
{
    /**
     * Not referenced anywhere in this class; kept public for backward compatibility.
     *
     * @var mixed
     */
    public $_instance = null;

    /**
     * Constructor.
     *
     * Recognised option keys: 'name' (default 'queue'), 'exchange' (default
     * 'exchange'), 'routingKey' (default '*') and 'flag' (queue flags, optional).
     *
     * @param Custom_Queue_Adapter_Rabbitmq $adapter
     * @param array|Zend_Config             $options
     * @throws Zend_Queue_Exception when $adapter is not a RabbitMQ adapter
     */
    public function __construct($adapter, $options = array())
    {
        if (!($adapter instanceof Custom_Queue_Adapter_Rabbitmq)) {
            throw new Zend_Queue_Exception("Invalid Rabbitmq adapter");
        }

        // array_key_exists()/array_merge() below need a real array.
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        } elseif (!is_array($options)) {
            $options = array();
        }

        // Store the defaults in the options as well: the adapter's send() reads
        // 'routingKey' from the queue options, so it must see the same key that
        // is used for the binding below.
        $options = array_merge(
            array(
                'name'       => 'queue',
                'exchange'   => 'exchange',
                'routingKey' => '*',
            ),
            $options
        );

        parent::__construct($adapter, $options);

        // Declare the queue.
        if (array_key_exists('flag', $options)) {
            $adapter->setQueueFlag($options['flag']);
        }
        $adapter->create($options['name']);
        $this->_setName($options['name']);

        // Declare the exchange and bind the queue to it.
        $adapter->setExchange($options['exchange'], $options['routingKey']);
        $this->setOptions($options);
    }

    /**
     * Create a new queue.
     *
     * The flag is stored on the adapter before delegating to
     * Zend_Queue::createQueue(), and stays set on the adapter afterwards.
     *
     * @param string $name queue name
     * @param int    $flag A bitmask of any of the flags: AMQP_AUTODELETE, AMQP_PASSIVE, AMQP_DURABLE, AMQP_NOACK.
     * @return mixed whatever Zend_Queue::createQueue() returns
     */
    public function createQueue($name, $flag = AMQP_DURABLE)
    {
        $this->getAdapter()->setQueueFlag($flag);

        // The result used to be discarded, so callers always received null.
        return parent::createQueue($name);
    }

    /**
     * Delete a queue by name via the adapter.
     *
     * @param string $name queue name
     * @return mixed result of the adapter's delete()
     */
    public function deleteQueue($name)
    {
        return $this->getAdapter()->delete($name);
    }

    /**
     * Send a message to the queue via the adapter.
     *
     * @param array|string $message message
     * @return mixed result of the adapter's send()
     * @throws Zend_Queue_Exception
     */
    public function send($message)
    {
        return $this->getAdapter()->send($message);
    }

    /**
     * Consume a message via the adapter.
     *
     * @param array|null $options
     * @param int|null   $timeout
     * @return mixed result of the adapter's receive()
     */
    public function receive($options = null, $timeout = null)
    {
        return $this->getAdapter()->receive($options, $timeout);
    }
}
Usage:

Sending Message
try {
    // Connection credentials handed to AMQPConnection; adjust for your broker.
    $adapteroptions = array(
        'host'     => 'localhost',
        'port'     => 5672,
        'login'    => 'guest',
        'password' => 'guest',
    );

    // Option keys read by Custom_Queue: name, exchange, routingKey (and optionally flag).
    $queueoptions = array(
        'name'       => 'queue',
        'exchange'   => 'exchange',
        'routingKey' => '*',
    );

    $adapter = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
    $queue   = new Custom_Queue($adapter, $queueoptions);
    $message = 'Testing queue';

    if ($queue->send($message)) {
        echo "Message published \n";
    } else {
        echo "Not";
    }
} catch (Exception $e) {
    echo $e->getMessage();
}

Receive Message

try {
    // Connection credentials handed to AMQPConnection; adjust for your broker.
    $adapteroptions = array(
        'host'     => 'localhost',
        'port'     => 5672,
        'login'    => 'guest',
        'password' => 'guest',
    );

    // Must match the options used by the sender so the same queue is read.
    $queueoptions = array(
        'name'       => 'queue',
        'exchange'   => 'exchange',
        'routingKey' => '*',
    );

    // The adapter's receive() currently ignores its options argument.
    $options = null;

    $adapter  = new Custom_Queue_Adapter_Rabbitmq($adapteroptions);
    $queue    = new Custom_Queue($adapter, $queueoptions);
    $messages = $queue->receive($options);
} catch (Exception $e) {
    echo $e->getMessage();
}

7 comments:

  1. Ritesh,

    I refactored your version a bit. It is stored here:
    https://bitbucket.org/hlopetz/amqp_queue

    thanks for your coding!

    Andrey

    ReplyDelete
  2. Hlopetz:

    Looks good, and I am glad that it helps you. As I have already written in the note above (description section), you can use your own namespace instead of Custom_.

    Ritesh

    ReplyDelete
  3. Hi RKJ, I'm trying to make it work, but it crashes when I make the call to Custom_Queue_Adapter_Rabbitmq and I haven't been able to debug it. Can you please post which values you are using for $adapterOptions? Thanks.

    ReplyDelete
    Replies
    1. Hi Carlos,

      Actually, in $adapterOptions you have to put the AMQP credentials; the connection parameters are documented at http://www.php.net/manual/en/amqpconnection.construct.php. Andrey refactored the code; you can find it here: https://bitbucket.org/hlopetz/amqp_queue

      Ritesh

      Delete
  4. Thanks, actually it was a 32-bit vs. 64-bit mismatch on my side, so amqp.so wasn't loaded. Btw, your lib needs to be adjusted a bit for amqp 1.x, as AMQPQueue no longer receives a connection but receives a channel.
    cheers and thanks for sharing

    ReplyDelete
    Replies
    1. Glad that it works for you. I didn't get time to review it. If you have implemented it with the new AMQP extension, you can post that part here in the comment section so that future visitors can use it.

      Delete
  5. I wrote the ZF2 integration for amqp, see: https://github.com/prolic/HumusAmqpModule.

    ReplyDelete