Skip to content

Instantly share code, notes, and snippets.

@adamf321
Last active May 29, 2020 14:41
Show Gist options
  • Save adamf321/47fcc2e16ce0e6b36347b982dd2d9c50 to your computer and use it in GitHub Desktop.
Save adamf321/47fcc2e16ce0e6b36347b982dd2d9c50 to your computer and use it in GitHub Desktop.
Building a Simple Data Pipeline with Google App Engine
runtime: php73
env_variables:
  GC_PROJECT: "nolte-metrics"
  GC_BUCKET: "nolte-metrics.appspot.com"
  ALERT_EMAIL: "[email protected]"
<?php
namespace Nolte\Metrics\DataPipeline;
use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;
/**
 * Represents a BigQuery connection.
 */
class BigQuery {
    /**
     * The BigQuery client used for all dataset/table operations.
     */
    private $big_query;

    /**
     * Constructor. Creates the underlying BigQuery client.
     */
    public function __construct()
    {
        $this->big_query = new BigQueryClient();
    }

    /**
     * Imports a newline-delimited JSON file stored on GCS into BigQuery,
     * appending its rows to the target table.
     *
     * @param string $dataset The dataset to import into.
     * @param string $table The table within the dataset.
     * @param string $gcsUri The URI of the file on GCS.
     * @return void
     * @throws \Exception If the job does not complete within the allowed
     *                    retries, or if the job finishes with an error.
     */
    public function importFile(string $dataset, string $table, string $gcsUri)
    {
        $table = $this->big_query->dataset( $dataset )->table( $table );

        // Create the import job.
        $loadConfig = $table->loadFromStorage($gcsUri)
            ->sourceFormat('NEWLINE_DELIMITED_JSON')
            ->writeDisposition('WRITE_APPEND');
        $job = $table->runJob($loadConfig);

        // Poll the job until it is complete. Throwing from the callback is
        // what signals ExponentialBackoff to try again, so the exception must
        // be the global \Exception: an unqualified `Exception` would resolve
        // to a non-existent class inside this namespace and abort with a
        // fatal Error instead of retrying.
        $backoff = new ExponentialBackoff(10);
        $backoff->execute(function () use ($job) {
            print('Waiting for job to complete' . PHP_EOL);
            $job->reload();
            if ( !$job->isComplete() ) {
                throw new \Exception('Job has not yet completed', 500);
            }
        });

        // Check if the job has errors.
        if (isset($job->info()['status']['errorResult'])) {
            $error = $job->info()['status']['errorResult']['message'];
            printf('Error running job: %s' . PHP_EOL, $error);
            // Include the actual error text in the exception message rather
            // than an unformatted "%s" placeholder.
            throw new \Exception(sprintf('Error running job: %s', $error), 500);
        }

        print('Data imported successfully' . PHP_EOL);
    }
}
<?php
namespace Nolte\Metrics\DataPipeline;
use Google\Cloud\Storage\StorageClient;
/**
 * Represents a single bucket on GCS.
 */
class Bucket {
    /**
     * Handle to the GCS bucket named by the GC_BUCKET environment variable.
     */
    private $bucket;

    /**
     * Constructor. Resolves the bucket from the GC_BUCKET environment variable.
     */
    public function __construct()
    {
        $storage = new StorageClient();
        $this->bucket = $storage->bucket( getenv('GC_BUCKET') );
    }

    /**
     * Takes a string and writes it to a GCS object with a given key.
     *
     * The content is handed to the storage client directly as a string, so no
     * temporary file or file handle is created (and none can be leaked).
     *
     * @param string $key The object key.
     * @param string $content The content to write.
     * @return mixed The value returned by the bucket's upload() call
     *               (the uploaded storage object).
     */
    public function writeStringToGcs(string $key, string $content)
    {
        // 'name' is required by upload() when the data is passed as a string.
        return $this->bucket->upload($content, [
            'name' => $key,
            'predefinedAcl' => 'projectPrivate',
        ]);
    }
}
{
"require": {
"google/cloud-storage": "^1.6",
"slim/slim": "^3.0",
"google/cloud-bigquery": "^1.16",
"google/cloud-logging": "^1.20",
"google/cloud-secret-manager": "^1.0",
"wildbit/postmark-php": "^2.10"
},
"autoload": {
"psr-4": {
"Nolte\\Metrics\\DataPipeline\\": "src"
}
}
}
cron:
- description: "tempo worklogs import"
  url: /tempo/worklogs
  schedule: every day 00:01
- description: "tempo plans import"
  url: /tempo/plans
  schedule: every day 00:01
- description: "tempo accounts import"
  url: /tempo/accounts
  schedule: every day 00:01
<?php
/**
 * Reads the latest version of a secret from Google Secret Manager.
 *
 * The project is taken from the GC_PROJECT environment variable.
 *
 * @param string $name The name of the secret to read.
 * @return string The secret's payload data.
 */
function get_secret(string $name)
{
    // Fully qualified: this file declares no namespace and has no `use`
    // statement, so a bare SecretManagerServiceClient would not resolve.
    $secrets = new \Google\Cloud\SecretManager\V1\SecretManagerServiceClient();

    try {
        $secret_name = \Google\Cloud\SecretManager\V1\SecretManagerServiceClient::secretVersionName(
            getenv('GC_PROJECT'),
            $name,
            'latest'
        );
        $secret = $secrets->accessSecretVersion($secret_name);

        return $secret->getPayload()->getData();
    } finally {
        // Release the client's connection whether or not the lookup succeeded.
        $secrets->close();
    }
}
<?php
namespace Nolte\Metrics\DataPipeline;
use Google\Cloud\Logging\LoggingClient;
use Postmark\PostmarkClient;
use Postmark\Models\PostmarkException;
/**
 * A logger for logging to the GCP log, https://console.cloud.google.com/logs.
 * Filter on logName="projects/nolte-metrics/logs/data-pipeline" to see these logs.
 */
class Logger {
    /**
     * PSR logger writing to the 'data-pipeline' log.
     */
    private $logger;

    /**
     * Postmark client used to send alert emails.
     */
    private $mail;

    /**
     * Constructor. Sets up the cloud logger and the Postmark mail client;
     * the Postmark token is read via get_secret().
     */
    public function __construct()
    {
        $logging = new LoggingClient();
        $this->logger = $logging->psrLogger( 'data-pipeline' );
        $this->mail = new PostmarkClient( get_secret( 'POSTMARK_AUTH_TOKEN' ) );
    }

    /**
     * Log an info message.
     *
     * @param string $msg The message to log.
     * @return void
     */
    public function info(string $msg)
    {
        $this->logger->info( $msg );
    }

    /**
     * Log an error message. Also send an alert email.
     *
     * The message is logged before the email is attempted so that a mail
     * delivery failure can never cause the original error to go unrecorded.
     * A Postmark failure is itself logged rather than propagated.
     *
     * @param string $msg The message to log.
     * @return void
     */
    public function error(string $msg)
    {
        // Record the error first; the alert email is best-effort.
        $this->logger->error( $msg );

        try {
            $this->mail->sendEmail(
                '[email protected]',
                getenv( 'ALERT_EMAIL' ),
                'ERROR: The Data Pipeline Failed',
                "The error message was: $msg"
            );
        } catch (PostmarkException $e) {
            $this->logger->error( 'Failed to send alert email: ' . $e->getMessage() );
        }
    }
}
<?php
// Reject requests that did not originate from App Engine Cron.
// The check is only enforced on App Engine, where GAE_ENV is set.
$is_app_engine = getenv('GAE_ENV') !== false;
if ( $is_app_engine ) {
    $request_headers = getallheaders();
    // X-Appengine-Cron header is set by App Engine Cron, it cannot be spoofed.
    $sent_by_cron = isset($request_headers['X-Appengine-Cron']);
    if ( ! $sent_by_cron ) {
        return $response->withStatus(401);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment