Created
September 21, 2012 19:38
-
-
Save zircote/3763446 to your computer and use it in GitHub Desktop.
A zend_log writer for AMQP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;;; AMQP Logging ;;; | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
resources.log.amqp.writerName = "Amqp" | |
resources.log.amqp.writerNamespace = "Log_Writer" | |
resources.log.amqp.writerParams.connection.server = "amqp://guest:guest@localhost:5672/" | |
;resources.log.amqp.writerParams.connection.options.insist = false | |
;resources.log.amqp.writerParams.connection.options.login_method = "auth_plain" | |
;resources.log.amqp.writerParams.connection.options.login_response = null | |
;resources.log.amqp.writerParams.connection.options.locale = "en_US" | |
;resources.log.amqp.writerParams.connection.options.connection_timeout = 3 | |
;resources.log.amqp.writerParams.connection.options.read_write_timeout = 3 | |
;resources.log.amqp.writerParams.connection.options.context = null | |
resources.log.amqp.writerParams.message.routing_key = "com.logging.%priorityName%" | |
;resources.log.amqp.writerParams.message.content_type = "application/json" | |
;resources.log.amqp.writerParams.message.content_encoding = "" | |
;resources.log.amqp.writerParams.message.application_headers[] = | |
;resources.log.amqp.writerParams.message.priority | |
;resources.log.amqp.writerParams.message.expiration | |
;resources.log.amqp.writerParams.connection.options.ssl_options.verify_peer = true ;bool | |
;resources.log.amqp.writerParams.connection.options.ssl_options.allow_self_signed = true; bool | |
;resources.log.amqp.writerParams.connection.options.ssl_options.cafile = "ca_file_name" ; string | |
;resources.log.amqp.writerParams.connection.options.ssl_options.capath = "/path/to/ca_file"; string | |
;resources.log.amqp.writerParams.connection.options.ssl_options.local_cert = "";string | |
;resources.log.amqp.writerParams.connection.options.ssl_options.passphrase = ""; string | |
;resources.log.amqp.writerParams.connection.options.ssl_options.CN_match = ""; string | |
;resources.log.amqp.writerParams.connection.options.ssl_options.verify_depth = 0; int | |
;resources.log.amqp.writerParams.connection.options.ssl_options.ciphers = ""; string | |
;resources.log.amqp.writerParams.connection.options.ssl_options.capture_peer_cert = true; bool | |
;resources.log.amqp.writerParams.connection.options.ssl_options.capture_peer_cert_chain = true; bool | |
;resources.log.amqp.writerParams.connection.options.ssl_options.SNI_enabled = true; bool | |
;resources.log.amqp.writerParams.connection.options.ssl_options.SNI_server_name = ""; string | |
resources.log.amqp.writerParams.exchange.name = "com.logging" | |
resources.log.amqp.writerParams.exchange.type = "fanout" | |
resources.log.amqp.writerParams.exchange.passive = false | |
resources.log.amqp.writerParams.exchange.durable = true | |
resources.log.amqp.writerParams.exchange.auto_delete = false | |
resources.log.amqp.filterName = "Priority" | |
resources.log.amqp.filterParams.priority = 7 | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;;; AMQP LOG MAP ;;; | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;resources.log.amqp.writerParams.documentMap.timestamp = 'timestamp' | |
;resources.log.amqp.writerParams.documentMap.message = 'message' | |
;resources.log.amqp.writerParams.documentMap.priority = 'priority' | |
;resources.log.amqp.writerParams.documentMap.priorityName = 'priorityName' | |
;resources.log.amqp.writerParams.documentMap.hostname = 'hostname' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* @package | |
* @category | |
* @subcategory | |
*/ | |
/** | |
* @package | |
* @category | |
* @subcategory | |
*/ | |
class Log_Writer_Amqp extends Zend_Log_Writer_Abstract | |
{ | |
const EXCHANGE_FANOUT = 'fanout'; | |
const EXCHANGE_DIRECT = 'direct'; | |
const EXCHANGE_TOPIC = 'topic'; | |
/** | |
* @var AMQP\Connection | |
*/ | |
protected $amqpConnection; | |
/** | |
* @var AMQP\Channel | |
*/ | |
protected $amqpChannel; | |
/** | |
* @var array | |
*/ | |
protected $defaultOptions = array( | |
'connection' => array( | |
'server' => 'amqp://guest:guest@localhost:5672/', | |
'options' => array( | |
'insist' => false, | |
'login_method' => \AMQP\Connection::AMQP_AUTH_PLAIN, | |
'login_response' => null, | |
'locale' => 'en_US', | |
'connection_timeout' => 3, | |
'read_write_timeout' => 3, | |
'context' => null, | |
'ssl_options' => array() | |
) | |
), | |
'message' => array( | |
'routing_key' => __CLASS__ | |
), | |
'exchange' => array( | |
'name' => __CLASS__, | |
'type' => self::EXCHANGE_FANOUT, | |
'passive' => true, | |
'durable' => false, | |
'auto_delete' => true | |
), | |
'documentMap' => array( | |
'timestamp' => 'timestamp', | |
'message' => 'message', | |
'priority' => 'priority', | |
'priorityName' => 'priorityName', | |
'hostname' => 'hostname' | |
) | |
); | |
/** | |
* @var array | |
*/ | |
protected $options; | |
/** | |
* @param array $event | |
* | |
* @throws Zend_Log_Exception | |
*/ | |
protected function _write($event) | |
{ | |
new Zend_Log_Formatter_Simple(); | |
static $hostname; | |
if (!$hostname) { | |
$hostname = php_uname(); | |
} | |
$event['hostname'] = $hostname; | |
$event['timestamp'] = date(DATE_ISO8601); | |
$dataToInsert = array(); | |
foreach ($this->options['documentMap'] as $columnName => $fieldKey) { | |
$dataToInsert[$columnName] = $event[$fieldKey]; | |
} | |
$message = new AMQP\Message(json_encode($dataToInsert)); | |
$message->type = 'application/json'; | |
$message->delivery_mode = 2; | |
$this->getChannel() | |
->basicPublish( | |
$message, | |
$this->options['exchange']['name'], | |
$this->routingKey($dataToInsert, $this->options['message']['routing_key']) | |
); | |
} | |
protected function routingKey($event, $routingKey) | |
{ | |
foreach ($event as $name => $value) { | |
if ((is_object($value) && !method_exists($value, '__toString')) || is_array($value)) { | |
$value = gettype($value); | |
} | |
$routingKey = str_replace("%$name%", $value, $routingKey); | |
} | |
return strtolower($routingKey); | |
} | |
/** | |
* | |
*/ | |
public function __construct($options) | |
{ | |
$connection = array(); | |
if (isset($options['connection']['options']['ssl_options'])) { | |
$ssl_options = (array) $options['connection']['options']['ssl_options']; | |
unset($options['connection']['options']['ssl_options']); | |
} | |
if (isset($options['message'])) { | |
$messageOptions = (array) $options['message']; | |
unset($options['message']); | |
} | |
$connection['connection']['options'] = array_merge( | |
$this->defaultOptions['connection']['options'], | |
(array) @$options['connection']['options'] | |
); | |
unset($options['connection']['options']); | |
$connection['connection'] = array_merge( | |
$this->defaultOptions['connection'], | |
(array) @$connection['connection'] | |
); | |
unset($options['connection']); | |
$this->options = array_merge($this->defaultOptions, $options); | |
$this->options['connection'] = $connection['connection']; | |
$this->options['message'] = $messageOptions; | |
if (isset($ssl_options) && $ssl_options) { | |
$this->options['connection']['options']['ssl_options'] = $ssl_options; | |
} | |
} | |
/** | |
* @return AMQP\Connection | |
*/ | |
protected function getConnection() | |
{ | |
if (!$this->amqpConnection) { | |
$this->amqpConnection = new AMQP\Connection($this->options['connection']['server']); | |
} | |
return $this->amqpConnection; | |
} | |
/** | |
* @return AMQP\Channel | |
*/ | |
protected function getChannel() | |
{ | |
if (!$this->amqpChannel) { | |
$this->amqpChannel = $this->getConnection()->channel(); | |
$this->amqpChannel->exchangeDeclare( | |
$this->options['exchange']['name'], | |
$this->options['exchange']['type'], | |
$this->options['exchange']['passive'], | |
$this->options['exchange']['durable'], | |
$this->options['exchange']['auto_delete'] | |
); | |
} | |
return $this->amqpChannel; | |
} | |
/** | |
* @static | |
* | |
* @param array $config | |
* | |
* @return void|Zend_Log_FactoryInterface | |
*/ | |
public static function factory($config) | |
{ | |
$config = self::_parseConfig($config); | |
return new self($config); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment