Skip to content

Instantly share code, notes, and snippets.

@zircote
Created September 21, 2012 19:38
Show Gist options
  • Save zircote/3763446 to your computer and use it in GitHub Desktop.
Save zircote/3763446 to your computer and use it in GitHub Desktop.
A zend_log writer for AMQP
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; AMQP Logging ;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Registers the AMQP log writer under the "amqp" key of the log resource.
; writerNamespace and writerName presumably combine into the class name
; Log_Writer_Amqp -- confirm against the Zend_Log factory in use.
resources.log.amqp.writerName = "Amqp"
resources.log.amqp.writerNamespace = "Log_Writer"
; Everything under writerParams.* is handed to the writer as its options array.
; connection.server is the AMQP URI of the broker.
resources.log.amqp.writerParams.connection.server = "amqp://guest:guest@localhost:5672/"
; Optional connection settings; the commented values mirror the writer's
; built-in defaults.
;resources.log.amqp.writerParams.connection.options.insist = false
;resources.log.amqp.writerParams.connection.options.login_method = "auth_plain"
;resources.log.amqp.writerParams.connection.options.login_response = null
;resources.log.amqp.writerParams.connection.options.locale = "en_US"
;resources.log.amqp.writerParams.connection.options.connection_timeout = 3
;resources.log.amqp.writerParams.connection.options.read_write_timeout = 3
;resources.log.amqp.writerParams.connection.options.context = null
; Routing key template: each %name% placeholder is replaced with the value of
; the same-named field of the mapped log document (see the documentMap section
; below), and the resulting key is lower-cased.
resources.log.amqp.writerParams.message.routing_key = "com.logging.%priorityName%"
; NOTE(review): the writer shown with this config only reads message.routing_key;
; the remaining message.* entries below appear to be placeholders -- confirm
; before relying on them.
;resources.log.amqp.writerParams.message.content_type = "application/json"
;resources.log.amqp.writerParams.message.content_encoding = ""
;resources.log.amqp.writerParams.message.application_headers[] =
;resources.log.amqp.writerParams.message.priority
;resources.log.amqp.writerParams.message.expiration
; Optional SSL settings; the option names look like PHP SSL stream-context
; options -- confirm against the AMQP client library. The trailing
; "; bool" / "; string" / "; int" notes give the expected value type.
;resources.log.amqp.writerParams.connection.options.ssl_options.verify_peer = true ;bool
;resources.log.amqp.writerParams.connection.options.ssl_options.allow_self_signed = true; bool
;resources.log.amqp.writerParams.connection.options.ssl_options.cafile = "ca_file_name" ; string
;resources.log.amqp.writerParams.connection.options.ssl_options.capath = "/path/to/ca_file"; string
;resources.log.amqp.writerParams.connection.options.ssl_options.local_cert = "";string
;resources.log.amqp.writerParams.connection.options.ssl_options.passphrase = ""; string
;resources.log.amqp.writerParams.connection.options.ssl_options.CN_match = ""; string
;resources.log.amqp.writerParams.connection.options.ssl_options.verify_depth = 0; int
;resources.log.amqp.writerParams.connection.options.ssl_options.ciphers = ""; string
;resources.log.amqp.writerParams.connection.options.ssl_options.capture_peer_cert = true; bool
;resources.log.amqp.writerParams.connection.options.ssl_options.capture_peer_cert_chain = true; bool
;resources.log.amqp.writerParams.connection.options.ssl_options.SNI_enabled = true; bool
;resources.log.amqp.writerParams.connection.options.ssl_options.SNI_server_name = ""; string
; Exchange the writer declares on first use and publishes every log message to.
; The writer defines constants for the types "fanout", "direct" and "topic".
; NOTE(review): a fanout exchange normally ignores the routing key, so the
; routing_key template above may have no effect with this type -- confirm
; whether "topic" was intended.
resources.log.amqp.writerParams.exchange.name = "com.logging"
resources.log.amqp.writerParams.exchange.type = "fanout"
resources.log.amqp.writerParams.exchange.passive = false
resources.log.amqp.writerParams.exchange.durable = true
resources.log.amqp.writerParams.exchange.auto_delete = false
; Priority filter attached to this writer. Presumably 7 is Zend_Log::DEBUG,
; i.e. every priority is let through -- confirm.
resources.log.amqp.filterName = "Priority"
resources.log.amqp.filterParams.priority = 7
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; AMQP LOG MAP ;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; documentMap.<output field> = '<log event key>'
; Each entry copies one key of the log event into the JSON document that is
; published. The commented entries mirror the writer's built-in default map.
;resources.log.amqp.writerParams.documentMap.timestamp = 'timestamp'
;resources.log.amqp.writerParams.documentMap.message = 'message'
;resources.log.amqp.writerParams.documentMap.priority = 'priority'
;resources.log.amqp.writerParams.documentMap.priorityName = 'priorityName'
;resources.log.amqp.writerParams.documentMap.hostname = 'hostname'
<?php
/**
* @package
* @category
* @subcategory
*/
/**
 * Zend_Log writer that publishes every log event as a JSON document to an
 * AMQP exchange.
 *
 * Options are grouped in four sections, each falling back to
 * {@see Log_Writer_Amqp::$defaultOptions}:
 *  - connection:  broker URI ("server") and connection "options"
 *  - message:     "routing_key" template; "%field%" placeholders are replaced
 *                 with values of the mapped log document
 *  - exchange:    name, type, passive, durable, auto_delete
 *  - documentMap: output field name => log event key
 */
class Log_Writer_Amqp extends Zend_Log_Writer_Abstract
{
    const EXCHANGE_FANOUT = 'fanout';
    const EXCHANGE_DIRECT = 'direct';
    const EXCHANGE_TOPIC = 'topic';

    /**
     * Lazily created broker connection.
     *
     * @var AMQP\Connection
     */
    protected $amqpConnection;

    /**
     * Lazily created channel on which the exchange has been declared.
     *
     * @var AMQP\Channel
     */
    protected $amqpChannel;

    /**
     * Built-in defaults for every option section.
     *
     * @var array
     */
    protected $defaultOptions = array(
        'connection' => array(
            'server' => 'amqp://guest:guest@localhost:5672/',
            'options' => array(
                'insist' => false,
                'login_method' => \AMQP\Connection::AMQP_AUTH_PLAIN,
                'login_response' => null,
                'locale' => 'en_US',
                'connection_timeout' => 3,
                'read_write_timeout' => 3,
                'context' => null,
                'ssl_options' => array()
            )
        ),
        'message' => array(
            'routing_key' => __CLASS__
        ),
        'exchange' => array(
            'name' => __CLASS__,
            'type' => self::EXCHANGE_FANOUT,
            'passive' => true,
            'durable' => false,
            'auto_delete' => true
        ),
        'documentMap' => array(
            'timestamp' => 'timestamp',
            'message' => 'message',
            'priority' => 'priority',
            'priorityName' => 'priorityName',
            'hostname' => 'hostname'
        )
    );

    /**
     * Effective options: the defaults overlaid with the caller's options.
     *
     * @var array
     */
    protected $options;

    /**
     * Builds the effective options by overlaying the given options on the
     * defaults, section by section.
     *
     * @param array|Zend_Config $options Writer options; missing sections and
     *                                   missing keys fall back to the defaults.
     */
    public function __construct($options = array())
    {
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        }
        $options  = (array) $options;
        $defaults = $this->defaultOptions;

        // Split the caller's connection section into its top-level keys
        // (e.g. "server") and the nested "options" array so that both levels
        // can be merged with their defaults independently.
        $userConnection = isset($options['connection'])
            ? (array) $options['connection']
            : array();
        $userConnectionOptions = isset($userConnection['options'])
            ? (array) $userConnection['options']
            : array();
        unset($userConnection['options']);

        // ssl_options may arrive as a scalar/empty value from an INI file.
        if (isset($userConnectionOptions['ssl_options'])) {
            $userConnectionOptions['ssl_options'] = (array) $userConnectionOptions['ssl_options'];
        }

        $connection = array_merge($defaults['connection'], $userConnection);
        $connection['options'] = array_merge(
            $defaults['connection']['options'],
            $userConnectionOptions
        );

        // Unknown top-level keys supplied by the caller are preserved.
        $merged = array_merge($defaults, $options);
        $merged['connection'] = $connection;

        // A partially specified section must not lose the default keys that
        // _write() and getChannel() read unconditionally.
        foreach (array('message', 'exchange') as $section) {
            $merged[$section] = array_merge(
                $defaults[$section],
                isset($options[$section]) ? (array) $options[$section] : array()
            );
        }

        // A caller-supplied document map replaces the default map entirely so
        // that callers can restrict which fields are published.
        $merged['documentMap'] = isset($options['documentMap'])
            ? (array) $options['documentMap']
            : $defaults['documentMap'];

        $this->options = $merged;
    }

    /**
     * Publishes a single log event to the configured exchange.
     *
     * The event is enriched with "hostname" and "timestamp", reduced to the
     * fields listed in the document map, JSON-encoded and published with the
     * routing key derived from the routing key template.
     *
     * @param array $event
     *
     * @throws Zend_Log_Exception when the document cannot be JSON-encoded
     */
    protected function _write($event)
    {
        // Resolved once per process; the host name does not change between events.
        static $hostname;
        if (!$hostname) {
            // Mode 'n' yields only the host name; the default mode returns the
            // full operating system description string.
            $hostname = php_uname('n');
        }
        $event['hostname'] = $hostname;
        $event['timestamp'] = date(DATE_ISO8601);

        $document = array();
        foreach ($this->options['documentMap'] as $fieldName => $eventKey) {
            // Fields missing from the event are published as null.
            $document[$fieldName] = isset($event[$eventKey]) ? $event[$eventKey] : null;
        }

        $body = json_encode($document);
        if ($body === false) {
            throw new Zend_Log_Exception(
                'Unable to JSON-encode log event (json error code ' . json_last_error() . ')'
            );
        }

        $message = new AMQP\Message($body);
        // NOTE(review): this sets the message "type" property to a MIME type;
        // "content_type" may have been intended -- confirm against the AMQP library.
        $message->type = 'application/json';
        // NOTE(review): 2 is presumably the AMQP "persistent" delivery mode -- confirm.
        $message->delivery_mode = 2;

        $this->getChannel()->basicPublish(
            $message,
            $this->options['exchange']['name'],
            $this->routingKey($document, $this->options['message']['routing_key'])
        );
    }

    /**
     * Expands "%field%" placeholders in a routing key template.
     *
     * Arrays and objects that cannot be converted to a string are represented
     * by their type name ("array" / "object").
     *
     * @param array  $event      Mapped log document (field name => value)
     * @param string $routingKey Routing key template
     *
     * @return string The expanded routing key, lower-cased
     */
    protected function routingKey($event, $routingKey)
    {
        foreach ($event as $name => $value) {
            if ((is_object($value) && !method_exists($value, '__toString')) || is_array($value)) {
                $value = gettype($value);
            }
            $routingKey = str_replace("%$name%", (string) $value, $routingKey);
        }
        return strtolower($routingKey);
    }

    /**
     * Returns the broker connection, creating it on first use.
     *
     * NOTE(review): only the server URI is passed on; the merged
     * connection "options" are not handed to the connection. Confirm the
     * AMQP\Connection constructor signature before wiring them through.
     *
     * @return AMQP\Connection
     */
    protected function getConnection()
    {
        if (!$this->amqpConnection) {
            $this->amqpConnection = new AMQP\Connection($this->options['connection']['server']);
        }
        return $this->amqpConnection;
    }

    /**
     * Returns the channel, opening it and declaring the configured exchange
     * on first use.
     *
     * @return AMQP\Channel
     */
    protected function getChannel()
    {
        if (!$this->amqpChannel) {
            $this->amqpChannel = $this->getConnection()->channel();
            $this->amqpChannel->exchangeDeclare(
                $this->options['exchange']['name'],
                $this->options['exchange']['type'],
                $this->options['exchange']['passive'],
                $this->options['exchange']['durable'],
                $this->options['exchange']['auto_delete']
            );
        }
        return $this->amqpChannel;
    }

    /**
     * Creates a writer from an array or Zend_Config of writer parameters.
     *
     * @static
     *
     * @param array|Zend_Config $config
     *
     * @return Log_Writer_Amqp
     */
    public static function factory($config)
    {
        $config = self::_parseConfig($config);
        return new self($config);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment