Skip to content

Instantly share code, notes, and snippets.

@acrosman
Created July 30, 2019 04:01
Show Gist options
  • Save acrosman/12555ca2f365592908c6020895c7691a to your computer and use it in GitHub Desktop.
Save acrosman/12555ca2f365592908c6020895c7691a to your computer and use it in GitHub Desktop.
A Drupal 8 batch job to load large amounts of data into a Pantheon web site. Relies on Batch Services sandbox module: https://www.drupal.org/sandbox/acrosman/3025562
<?php
namespace Drupal\example_pantheon_loader;
use Drupal\Core\Database\Connection;
use Drupal\batch_service_interface\AbstractBatchService;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\StringTranslation\TranslationInterface;
use Drupal\file\Entity\File;
use Drupal\Core\File\FileSystem;
use Drupal\Core\Queue\QueueFactory;
/**
 * Batch service that copies rows of an uploaded CSV file into a tracker table.
 *
 * generateBatchJob() builds the batch definition from the submitted form
 * values, processData() is the resumable batch operation that reads the file
 * a slice at a time, and doFinishBatch() reports the outcome.
 */
class DataLoaderBatchService extends AbstractBatchService {

  /**
   * Number of CSV rows handled in a single batch pass.
   *
   * Lower this if the per-row work becomes expensive.
   */
  const ROWS_PER_PASS = 100;

  /**
   * Database connection used to write to the tracker table.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * User data service.
   *
   * NOTE(review): never assigned or read in this class; kept only so existing
   * subclasses that reference it do not break.
   *
   * @var \Drupal\user\UserDataInterface
   */
  protected $userData;

  /**
   * File system service used to resolve the uploaded file's local path.
   *
   * @var \Drupal\Core\File\FileSystem
   */
  protected $fileSystem;

  /**
   * Queue factory, available for queue-based follow-up processing.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * Constructs a new DataLoaderBatchService object.
   *
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger_factory
   *   Logger factory, passed through to the parent constructor.
   * @param \Drupal\Core\StringTranslation\TranslationInterface $stringTranslation
   *   String translation service, passed through to the parent constructor.
   * @param \Drupal\Core\Database\Connection $database_service
   *   Database connection.
   * @param \Drupal\Core\File\FileSystem $file_system
   *   File system service.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   Queue factory.
   */
  public function __construct(
    LoggerChannelFactoryInterface $logger_factory,
    TranslationInterface $stringTranslation,
    Connection $database_service,
    FileSystem $file_system,
    QueueFactory $queue_factory
  ) {
    parent::__construct($logger_factory, $stringTranslation);
    $this->database = $database_service;
    $this->fileSystem = $file_system;
    $this->queueFactory = $queue_factory;
  }

  /**
   * {@inheritdoc}
   */
  public static function getServiceId() {
    return 'example_pantheon_loader.data';
  }

  /**
   * Builds the batch definition for loading an uploaded file.
   *
   * @param array|null $data
   *   Submitted form values; $data['file'] is expected to hold the file ID(s)
   *   of the upload.
   *
   * @return array
   *   The batch definition produced by prepBatchArray(), or an empty array
   *   when the uploaded file cannot be loaded or resolved to a local path.
   */
  public function generateBatchJob($data = NULL) {
    // Tolerate a missing/NULL payload instead of failing inside array_pop().
    $fids = isset($data['file']) ? (array) $data['file'] : [];
    $fid = array_pop($fids);
    $fileEntity = empty($fid) ? NULL : File::load($fid);
    if (empty($fileEntity)) {
      $this->logger->error('Unable to load file data for processing.');
      return [];
    }

    // realpath() yields FALSE when the URI has no local representation.
    $filePath = $this->fileSystem->realpath($fileEntity->getFileUri());
    if (empty($filePath)) {
      $this->logger->error('Unable to resolve a local path for the uploaded file.');
      return [];
    }

    // Clear any tracking data. Done only after the file has been validated so
    // that a bad submission does not wipe previously loaded rows.
    $this->database->truncate('example_pantheon_loader_tracker')->execute();

    // If you are processing a queue make sure there is a valid queue prepped.
    // $queue = $this->queueFactory->get('example_pantheon_loader_remap');
    // $queue->createQueue();
    $ops = ['processData' => [$filePath]];
    return $this->prepBatchArray($this->t('Loading Data for Processing'), $this->t('Starting file processing...'), $ops);
  }

  /**
   * Batch processing function to process large files.
   *
   * The first pass reads only the CSV header. Every later pass seeks to the
   * byte offset saved in the sandbox and inserts up to ROWS_PER_PASS rows into
   * the tracker table. Blank lines and rows whose column count differs from
   * the header are skipped.
   *
   * @param array $data
   *   Operation arguments; the last element is the path to the file.
   * @param array $context
   *   Drupal batch context array. Reads and writes
   *   $context['sandbox']['file_position'] and
   *   $context['sandbox']['file_header'], sets $context['finished'], and sets
   *   $context['results']['success'] to FALSE on unrecoverable file errors.
   */
  public function processData(array $data, array &$context) {
    $filePos = 0;
    $header = [];
    if (isset($context['sandbox']['file_position'])) {
      $filePos = $context['sandbox']['file_position'];
      $header = $context['sandbox']['file_header'];
    }

    // Old-school file handling.
    $path = array_pop($data);
    $file = empty($path) ? FALSE : fopen($path, 'r');
    if (empty($file)) {
      $this->logger->error('File lost during processing.');
      $context['finished'] = 1;
      $context['results']['success'] = FALSE;
      return;
    }

    if (empty($header)) {
      $header = fgetcsv($file);
      // fgetcsv() gives FALSE at end of file and [NULL] for a blank line.
      // Without this guard an empty file would never reach 'finished'.
      if (empty($header) || $header === [NULL]) {
        $this->logger->error('Data file is empty or has no header row.');
        $context['finished'] = 1;
        $context['results']['success'] = FALSE;
        fclose($file);
        return;
      }
      // Report a sliver of progress so the batch schedules another pass.
      $context['finished'] = 0.00001;
    }
    else {
      fseek($file, $filePos);
      $columnCount = count($header);
      $reachedEnd = FALSE;

      // Each pass we process a fixed number of lines, if you have to do
      // something complex you might want to reduce the run.
      for ($i = 0; $i < self::ROWS_PER_PASS; $i++) {
        $row = fgetcsv($file);
        if ($row === FALSE) {
          $reachedEnd = TRUE;
          break;
        }
        if ($row === [NULL]) {
          // Blank line in the file.
          continue;
        }
        if (count($row) !== $columnCount) {
          // array_combine() cannot pair mismatched arrays; skip the row.
          $this->logger->warning('Skipped a row whose column count does not match the header.');
          continue;
        }

        $record = array_combine($header, $row);
        // NOTE(review): assumes the CSV header contains a 'field_name'
        // column - confirm against the real data file.
        $rowData = [
          'col_one' => $record['field_name'],
          'data' => serialize($record),
          'timestamp' => time(),
        ];
        $row_id = $this->database->insert('example_pantheon_loader_tracker')
          ->fields($rowData)
          ->execute();
        // If you're setting up for a queue you include something like this.
        // $queue = $this->queueFactory->get('example_pantheon_loader_remap');
        // $queue->createItem($row_id);
      }

      // Progress is the fraction of bytes consumed; guard the zero-size case
      // and finish outright once the reader has hit end of file.
      $size = filesize($path);
      if ($reachedEnd || empty($size)) {
        $context['finished'] = 1;
      }
      else {
        $context['finished'] = min(1, ftell($file) / $size);
      }
    }

    $context['sandbox']['file_position'] = ftell($file);
    $context['sandbox']['file_header'] = $header;
    fclose($file);
  }

  /**
   * {@inheritdoc}
   */
  public function doFinishBatch($success, $results, $operations) {
    // processData() records unrecoverable file problems in the results array,
    // so a batch that "completed" can still have failed.
    $failed = !$success || (isset($results['success']) && $results['success'] === FALSE);
    if ($failed) {
      $message = $this->t('An error occurred during processing.');
      \Drupal::messenger()->addError($message);
      $this->logger->error($message);
      return;
    }

    $message = $this->t('All provided data prepped.');
    \Drupal::messenger()->addMessage($message);
    $this->logger->info($message);
  }

}
<?php
namespace Drupal\example_pantheon_loader\Form;
use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use Drupal\Core\Database\Connection;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\example_pantheon_loader\DataLoaderBatchService;
/**
 * Form for uploading a CSV file and starting the data-loading batch.
 *
 * Also offers a button that truncates the tracker table without loading
 * anything.
 */
class PrepForm extends FormBase {

  /**
   * Batch service that builds the data-loading batch job.
   *
   * @var \Drupal\example_pantheon_loader\DataLoaderBatchService
   */
  protected $dataLoaderBatchService;

  /**
   * Database connection used to truncate the tracker table.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * Constructs a new PrepForm object.
   *
   * @param \Drupal\example_pantheon_loader\DataLoaderBatchService $data_loader_service
   *   Batch service that builds the data-loading batch job.
   * @param \Drupal\Core\Database\Connection $database_service
   *   Database connection.
   */
  public function __construct(
    DataLoaderBatchService $data_loader_service,
    Connection $database_service
  ) {
    $this->dataLoaderBatchService = $data_loader_service;
    $this->database = $database_service;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      // Ask the service class for its own ID so the two cannot drift apart.
      $container->get(DataLoaderBatchService::getServiceId()),
      $container->get('database')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function getFormId() {
    return 'example_pantheon_loader_form';
  }

  /**
   * {@inheritdoc}
   */
  public function buildForm(array $form, FormStateInterface $form_state) {
    $form['intro'] = [
      '#type' => 'html_tag',
      '#tag' => 'div',
      '#value' => $this->t('This is example code requires additional support.'),
    ];
    $form['file'] = [
      '#type' => 'managed_file',
      '#name' => 'data_file',
      '#title' => $this->t('Data file'),
      '#description' => $this->t('CSV format for this example.'),
      '#upload_location' => 'private://example_pantheon_loader_data/',
      '#upload_validators' => [
        'file_validate_extensions' => ['csv'],
      ],
    ];
    $form['submit'] = [
      '#type' => 'submit',
      '#value' => $this->t('Load Data'),
    ];
    $form['truncate'] = [
      '#type' => 'submit',
      // A stable machine name lets the handlers identify this button without
      // comparing against its translated label.
      '#name' => 'truncate',
      '#value' => $this->t('Truncate Table'),
    ];
    return $form;
  }

  /**
   * {@inheritdoc}
   */
  public function validateForm(array &$form, FormStateInterface $form_state) {
    parent::validateForm($form, $form_state);
    // Loading needs a file; truncating does not.
    if (!$this->isTruncateRequest($form_state) && empty($form_state->getValue('file'))) {
      $form_state->setErrorByName('file', $this->t('Please upload a CSV file to load.'));
    }
  }

  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    if ($this->isTruncateRequest($form_state)) {
      $this->truncateTables();
      return;
    }

    $batch = $this->dataLoaderBatchService->generateBatchJob($form_state->getValues());
    if (empty($batch)) {
      // The service returns an empty array when the file cannot be loaded.
      $this->messenger()->addError($this->t('Unable to load the uploaded file for processing.'));
      return;
    }
    batch_set($batch);
  }

  /**
   * Tells whether the form was submitted through the truncate button.
   *
   * @param \Drupal\Core\Form\FormStateInterface $form_state
   *   Current form state.
   *
   * @return bool
   *   TRUE when the triggering element is the truncate button.
   */
  private function isTruncateRequest(FormStateInterface $form_state) {
    $trigger = $form_state->getTriggeringElement();
    return isset($trigger['#name']) && $trigger['#name'] === 'truncate';
  }

  /**
   * Truncate the tracker table.
   */
  private function truncateTables() {
    $this->database->truncate('example_pantheon_loader_tracker')->execute();
    $this->messenger()->addMessage($this->t('Truncated Pantheon Load Tracker'));
  }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment