Created
September 2, 2016 17:48
-
-
Save grayside/dd19c9c1a51a961773de67f83fe4bfd7 to your computer and use it in GitHub Desktop.
Webhook Migrate Non-blocking Subscriber Implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Drupal\webhook_migrate\EventSubscriber; | |
use Drupal\webhook_migrate\MigrateRunner; | |
use Drupal\webhooks\EventSubscriber\NonBlockingSubscriberBase; | |
use Drupal\webhooks\Webhook; | |
use Drupal\webhooks\WebhookRelayQueue; | |
use Drupal\migrate\Plugin\MigrationInterface; | |
use Drupal\Core\Lock\LockBackendInterface; | |
use Drupal\Core\Logger\LoggerChannelFactoryInterface; | |
use Symfony\Component\HttpKernel\Event\PostResponseEvent; | |
/** | |
* Handles events for triggered migrate webhooks. | |
* | |
* This service is primarily responsible for fine-tuning processing estimates | |
* based on the presence of the idlist option, and submitting all migration | |
* options to the MigrateRunner service to be executed. | |
* | |
* In the future, this might be adjusted to a base class for different | |
* operations. | |
*/ | |
class InboundWebhookSubscriber extends NonBlockingSubscriberBase { | |
/** | |
* A constant used to derive an estimate for processing time. | |
* | |
* This is used by multiplying by the number of items, the result is treated | |
* as the time in seconds. | |
* | |
* @todo the value is currently arbitrary and should be refined. | |
*/ | |
const TIME_MULTIPLIER = 15; | |
/** | |
* Bridge options to migration execution. | |
* | |
* @var \Drupal\webhook_migrate\MigrateRunner | |
*/ | |
protected $migrateRunner; | |
/** | |
* {@inheritdoc} | |
*/ | |
public function __construct(LockBackendInterface $lock, LoggerChannelFactoryInterface $logger_factory, WebhookRelayQueue $webhook_relay_queue, MigrateRunner $migrate_runner) { | |
parent::__construct($lock, $logger_factory, $webhook_relay_queue); | |
$this->migrateRunner = $migrate_runner; | |
} | |
/** | |
* {@inheritdoc} | |
*/ | |
protected function processWebhook(Webhook $webhook, PostResponseEvent $event) { | |
$payload = $webhook->getPayload(); | |
if (isset($payload['options'])) { | |
// Gracefully handle scalar values supplied for list options. | |
foreach (['group', 'migration', 'idlist'] as $item) { | |
if (isset($options[$item]) && !is_array($options[$item])) { | |
$options[$item] = (array) $options[$item]; | |
} | |
} | |
$context = array_intersect_key($payload, ['id' => TRUE, 'callback_url' => TRUE]); | |
$result = $this->migrateRunner->import($payload['options'], $context); | |
// @see \Drupal\migrate\Plugin\MigrationInterface for other result states. | |
return $result == MigrationInterface::RESULT_COMPLETED; | |
} | |
// NonBlockingSubscriberBase::__construct() will log a notice-level entry | |
// that processing failed for general audit purposes. This warning indicates | |
// what might have gone wrong. | |
// | |
// @todo identify how to get this information sent along with the (future) | |
// callback report. | |
$this->logger->warning('Bad Request. Could not process event %event without migration options in payload. Webhook %uuid.', [ | |
'%event' => $webhook->getEvent(), | |
'%uuid' => $webhook->getUuid(), | |
]); | |
return FALSE; | |
} | |
/** | |
* {@inheritdoc} | |
*/ | |
protected function estimateProcessingTime(Webhook $webhook) { | |
$options = $webhook->getPayload()['options']; | |
// Allocate 15 seconds for each item when a limited set of items is | |
// indicated. This refines the estimate. | |
$count = []; | |
if (!empty($options['idlist'])) { | |
$count[] = count($options['idlist']); | |
} | |
if (!empty($options['limit'])) { | |
$count[] = $options['limit']; | |
} | |
if (!empty($count)) { | |
return max($count) * InboundWebhookSubscriber::TIME_MULTIPLIER; | |
} | |
// The default value is 5 minutes. | |
// @todo This also requires refinement to target a maximum time required. | |
return parent::estimateProcessingTime($webhook); | |
} | |
/** | |
* {@inheritdoc} | |
*/ | |
protected function supportsWebhook(Webhook $webhook) { | |
return $webhook->getEvent() == 'migrate:import:trigger'; | |
} | |
/** | |
* {@inheritdoc} | |
*/ | |
protected function getErrorPayload(Webhook $webhook, $status = 409, $message = 'Processing Error', $expire = NULL) { | |
$result = parent::getErrorPayload($webhook, $status, $message, $expire); | |
$id = $webhook->getPayload()['id']; | |
if (isset($id)) { | |
$result['id'] = $id; | |
} | |
return $result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment