Last active
May 29, 2020 14:41
-
-
Save adamf321/47fcc2e16ce0e6b36347b982dd2d9c50 to your computer and use it in GitHub Desktop.
Building a Simple Data Pipeline with Google App Engine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
runtime: php73 | |
env_variables: | |
GC_PROJECT: "nolte-metrics" | |
GC_BUCKET: "nolte-metrics.appspot.com" | |
ALERT_EMAIL: "[email protected]" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Nolte\Metrics\DataPipeline; | |
use Google\Cloud\BigQuery\BigQueryClient; | |
use Google\Cloud\Core\ExponentialBackoff; | |
/**
 * Represents a BigQuery connection.
 */
class BigQuery
{
    /** @var BigQueryClient Client used for all dataset/table operations. */
    private $big_query;

    /**
     * Creates a BigQuery client with no explicit configuration.
     */
    public function __construct()
    {
        $this->big_query = new BigQueryClient();
    }

    /**
     * Imports a newline-delimited JSON file stored on GCS into a BigQuery table.
     *
     * Rows are appended to the table (WRITE_APPEND). The method blocks, polling
     * the load job with exponential backoff, until the job completes.
     *
     * @param string $dataset The dataset to import into.
     * @param string $table   The table within the dataset.
     * @param string $gcsUri  The URI of the file on GCS.
     * @return void
     * @throws \RuntimeException If the job is still incomplete after all polling
     *                           retries, or if BigQuery reports an error result.
     */
    public function importFile(string $dataset, string $table, string $gcsUri)
    {
        $bqTable = $this->big_query->dataset($dataset)->table($table);

        // Create and start the load job.
        $loadConfig = $bqTable->loadFromStorage($gcsUri)
            ->sourceFormat('NEWLINE_DELIMITED_JSON')
            ->writeDisposition('WRITE_APPEND');
        $job = $bqTable->runJob($loadConfig);

        // Poll the job until it is complete. ExponentialBackoff re-invokes the
        // callback when it throws an \Exception, so throwing means "not done yet".
        // The exception class must be fully qualified: an unqualified `Exception`
        // inside this namespace resolves to a class that does not exist, which
        // raises a fatal Error the backoff does not catch.
        $backoff = new ExponentialBackoff(10);
        $backoff->execute(function () use ($job) {
            print('Waiting for job to complete' . PHP_EOL);
            $job->reload();
            if (!$job->isComplete()) {
                throw new \RuntimeException('Job has not yet completed', 500);
            }
        });

        // Check whether the finished job reported an error.
        $info = $job->info();
        if (isset($info['status']['errorResult'])) {
            $error = $info['status']['errorResult']['message'] ?? 'unknown error';
            printf('Error running job: %s' . PHP_EOL, $error);
            throw new \RuntimeException(sprintf('Error running job: %s', $error), 500);
        }

        print('Data imported successfully' . PHP_EOL);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Nolte\Metrics\DataPipeline; | |
use Google\Cloud\Storage\StorageClient; | |
/**
 * Represents a single bucket on GCS, selected by the GC_BUCKET environment variable.
 */
class Bucket
{
    /** @var \Google\Cloud\Storage\Bucket The bucket objects are written to. */
    private $bucket;

    /**
     * Resolves the bucket named by the GC_BUCKET environment variable.
     *
     * @throws \RuntimeException If GC_BUCKET is not set or is empty.
     */
    public function __construct()
    {
        $name = getenv('GC_BUCKET');
        if ($name === false || $name === '') {
            // Fail fast instead of handing `false` to the storage client.
            throw new \RuntimeException('GC_BUCKET environment variable is not set');
        }

        $storage = new StorageClient();
        $this->bucket = $storage->bucket($name);
    }

    /**
     * Takes a string and writes it to a GCS object with a given key.
     *
     * The object is uploaded with the 'projectPrivate' predefined ACL.
     *
     * @param string $key     The object key.
     * @param string $content The content to write.
     * @return \Google\Cloud\Storage\StorageObject The uploaded object, as returned by the bucket's upload().
     */
    public function writeStringToGcs(string $key, string $content)
    {
        // upload() accepts the string directly. Staging through a temp file
        // would require explicit cleanup of both the file and its handle. On
        // App Engine, /tmp is backed by instance memory.
        return $this->bucket->upload($content, [
            'name' => $key,
            'predefinedAcl' => 'projectPrivate',
        ]);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"require": { | |
"google/cloud-storage": "^1.6", | |
"slim/slim": "^3.0", | |
"google/cloud-bigquery": "^1.16", | |
"google/cloud-logging": "^1.20", | |
"google/cloud-secret-manager": "^1.0", | |
"wildbit/postmark-php": "^2.10" | |
}, | |
"autoload": { | |
"psr-4": { | |
"Nolte\\Metrics\\DataPipeline\\": "src" | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cron: | |
- description: "tempo worklogs import" | |
url: /tempo/worklogs | |
schedule: every day 00:01 | |
- description: "tempo plans import" | |
url: /tempo/plans | |
schedule: every day 00:01 | |
- description: "tempo accounts import" | |
url: /tempo/accounts | |
schedule: every day 00:01 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/**
 * Fetches the latest version of a Secret Manager secret in the GC_PROJECT project.
 *
 * @param string $name The secret's ID.
 * @return string The secret payload data.
 */
function get_secret(string $name): string
{
    // Fully qualified so the class resolves regardless of which `use`
    // statements (if any) the including file declares.
    $secrets = new \Google\Cloud\SecretManager\V1\SecretManagerServiceClient();
    try {
        $secret_name = $secrets->secretVersionName(getenv('GC_PROJECT'), $name, 'latest');
        $secret = $secrets->accessSecretVersion($secret_name);
        return $secret->getPayload()->getData();
    } finally {
        // Release the client's transport even if the lookup throws.
        $secrets->close();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Nolte\Metrics\DataPipeline; | |
use Google\Cloud\Logging\LoggingClient; | |
use Postmark\PostmarkClient; | |
use Postmark\Models\PostmarkException; | |
/**
 * A logger for logging to the GCP log, https://console.cloud.google.com/logs.
 * Filter on logName="projects/nolte-metrics/logs/data-pipeline" to see these logs.
 * Error messages additionally trigger an alert email sent via Postmark.
 */
class Logger
{
    /** @var \Google\Cloud\Logging\PsrLogger Logger writing to the 'data-pipeline' log. */
    private $logger;

    /** @var PostmarkClient Client used to send alert emails. */
    private $mail;

    /**
     * Constructor. Creates the GCP log writer and a Postmark client authenticated
     * with the POSTMARK_AUTH_TOKEN secret.
     */
    public function __construct()
    {
        $logging = new LoggingClient();
        $this->logger = $logging->psrLogger('data-pipeline');
        $this->mail = new PostmarkClient(get_secret('POSTMARK_AUTH_TOKEN'));
    }

    /**
     * Log an info message.
     *
     * @param string $msg The message to log.
     * @return void
     */
    public function info(string $msg)
    {
        $this->logger->info($msg);
    }

    /**
     * Log an error message. Also send an alert email to ALERT_EMAIL.
     *
     * The message is logged first so it is recorded even if the email fails.
     * Postmark failures are logged rather than propagated, so that reporting
     * one error does not raise a second one.
     *
     * @param string $msg The message to log.
     * @return void
     */
    public function error(string $msg)
    {
        $this->logger->error($msg);

        try {
            // sendEmail()'s 4th argument is the HTML body and the 5th is the
            // text body. Send the message as plain text so its content is never
            // interpreted as HTML.
            $this->mail->sendEmail(
                '[email protected]',
                getenv('ALERT_EMAIL'),
                'ERROR: The Data Pipeline Failed',
                null,
                "The error message was: $msg"
            );
        } catch (PostmarkException $e) {
            $this->logger->error(sprintf('Failed to send alert email: %s', $e->getMessage()));
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// Reject requests not issued by App Engine Cron when running on App Engine.
// NOTE(review): this fragment returns `$response->withStatus(401)`, so it must
// run inside a route/middleware where a PSR-7 `$response` is in scope.
if (getenv('GAE_ENV') !== false) { // GAE_ENV is only set in App Engine.
    // HTTP header names are case-insensitive, so normalise the keys rather than
    // relying on the exact casing getallheaders() happens to produce.
    $request_headers = array_change_key_case(getallheaders(), CASE_LOWER);
    // X-Appengine-Cron header is set by App Engine Cron, it cannot be spoofed.
    if (!isset($request_headers['x-appengine-cron'])) {
        return $response->withStatus(401);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment