Skip to content

Instantly share code, notes, and snippets.

@maimai-swap
Last active August 29, 2015 14:10
Show Gist options
  • Save maimai-swap/3c8bb1b753c42cf95d95 to your computer and use it in GitHub Desktop.
Save maimai-swap/3c8bb1b753c42cf95d95 to your computer and use it in GitHub Desktop.
app/models/IdmappingID.php
<?php
use Illuminate\Queue\Jobs\Job;
use Aws\S3\S3Client;
use Aws\DynamoDb\DynamoDbClient;
use Aws\DynamoDb\Model\BatchRequest\WriteRequestBatch;
use Aws\DynamoDb\Model\BatchRequest\PutRequest;
use Aws\DynamoDb\Enum\Type;
use Illuminate\Support\Facdes\Log;
/**
 * Queue job handler that imports an ID-mapping dump from S3 into DynamoDB.
 *
 * The job payload is expected to be an S3 event notification. When the
 * notified object key contains "idmapping_", the object is downloaded to
 * local storage, read as a gzip-compressed, tab-separated file, and each
 * complete row is written to the DynamoDB table named by TABLE_NAME.
 */
class IdmappingID {
    /** DynamoDB table that receives the mapping rows. */
    const TABLE_NAME = 'IDMapping';

    /** Number of queued rows after which the write batch is flushed explicitly. */
    const FLUSH_EVERY = 1000;

    /**
     * @var S3Client
     */
    protected $s3_client;

    /**
     * @var DynamoDbClient
     */
    protected $dynamdb_client;

    function __construct()
    {
    }

    /**
     * Queue entry point.
     *
     * Only the first entry of $data["Records"] is inspected. The job is deleted
     * from the queue once processing finishes without an exception, whether or
     * not the record was imported or skipped.
     *
     * @param Job   $job  Queue job being processed.
     * @param array $data Job payload (S3 event notification).
     * @return void
     * @throws \RuntimeException When the downloaded file cannot be opened.
     */
    public function fire(Job $job, array $data)
    {
        $this->logInfo(get_class($this)." Parse Start.");

        if (!empty($data["Records"])) {
            $this->logInfo("S3 Event.");

            $record = $data["Records"][0];
            $region = $record["awsRegion"];
            $bucket = $record["s3"]["bucket"]["name"];
            // Object keys arrive URL-encoded in S3 event notifications.
            $key = urldecode($record["s3"]["object"]["key"]);

            $s3 = $this->getS3Client();
            $s3->registerStreamWrapper();
            $path = "s3://$bucket/$key";

            // "!== false" so that a key beginning with "idmapping_" (offset 0) also matches.
            if (strpos($key, "idmapping_") !== false) {
                $this->logInfo("AdFrontLog Event Records. $region $path");

                $save_path = storage_path()."/idmapping/".basename($key);
                $s3->getObject(array(
                    'Bucket' => $bucket,
                    'Key'    => $key,
                    'SaveAs' => $save_path,
                ));

                $this->importFile($save_path);
            } else {
                $this->logInfo("S3 Event Skip.");
            }
        }

        $job->delete();
        $this->logInfo(get_class($this)." Parse End.");
    }

    /**
     * Reads a gzip-compressed TSV file and writes its rows to DynamoDB.
     *
     * Rows missing any of the three required columns are skipped.
     *
     * @param string $save_path Local path of the downloaded dump.
     * @return void
     * @throws \RuntimeException When the file cannot be opened.
     */
    protected function importFile($save_path)
    {
        $fp = gzopen($save_path, "r");
        if ($fp === false) {
            // Without this guard fgetcsv() would never return FALSE and the loop could not end.
            throw new \RuntimeException("Unable to open $save_path");
        }

        $putBatch = WriteRequestBatch::factory($this->getDynamoDBClient());
        $pending = 0;

        try {
            while (($row = fgetcsv($fp, 0, "\t")) !== false) {
                $id = isset($row[0]) ? $row[0] : null;
                $another_id = isset($row[1]) ? $row[1] : null;
                // NOTE(review): column index 2 is skipped on purpose by the original code;
                // confirm against the dump layout that updated_at really is the 4th column.
                $updated_at = isset($row[3]) ? $row[3] : null;

                if (empty($id) || empty($another_id) || empty($updated_at)) {
                    continue;
                }

                $putBatch->add(new PutRequest(array(
                    'id'         => array(Type::STRING => (string) $id),
                    'another_id' => array(Type::STRING => (string) $another_id),
                    'updated_at' => array(Type::STRING => (string) $updated_at),
                ), self::TABLE_NAME));
                $pending++;

                if ($pending >= self::FLUSH_EVERY) {
                    $putBatch->flush();
                    $pending = 0;
                    $this->logInfo("Count 1000 Flush");
                }
            }
        } catch (\Exception $e) {
            // Release the file handle before letting the failure propagate.
            gzclose($fp);
            throw $e;
        }

        gzclose($fp);
        // Send whatever is still queued after the last full batch.
        $putBatch->flush();
    }

    /**
     * Writes an info-level log line.
     *
     * The facade is referenced by its fully-qualified name so logging does not
     * depend on how (or whether) "Log" is aliased at file level.
     *
     * @param string $message
     * @return void
     */
    protected function logInfo($message)
    {
        \Illuminate\Support\Facades\Log::info($message);
    }

    /**
     * Returns the S3 client, creating it on first use.
     *
     * @return S3Client
     */
    protected function getS3Client() {
        if ($this->s3_client === null) {
            $this->s3_client = S3Client::factory(array(
                'profile' => 's3-idmapping',
                'region' => 'ap-northeast-1',
            ));
        }
        return $this->s3_client;
    }

    /**
     * Returns the DynamoDB client, creating it on first use.
     *
     * @return DynamoDbClient
     */
    protected function getDynamoDBClient() {
        if ($this->dynamdb_client === null) {
            $this->dynamdb_client = DynamoDbClient::factory(array(
                'profile' => 'ddd',
                'region' => 'ap-northeast-1',
            ));
        }
        return $this->dynamdb_client;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment