Last active
August 29, 2015 14:10
-
-
Save maimai-swap/3c8bb1b753c42cf95d95 to your computer and use it in GitHub Desktop.
app/models/IdmappingID.php
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
use Illuminate\Queue\Jobs\Job; | |
use Aws\S3\S3Client; | |
use Aws\DynamoDb\DynamoDbClient; | |
use Aws\DynamoDb\Model\BatchRequest\WriteRequestBatch; | |
use Aws\DynamoDb\Model\BatchRequest\PutRequest; | |
use Aws\DynamoDb\Enum\Type; | |
use Illuminate\Support\Facdes\Log; | |
/**
 * Queue job handler that imports ID-mapping dumps from S3 into DynamoDB.
 *
 * The job payload is expected to be an S3 event notification. When the
 * object key contains "idmapping_", the object is downloaded to local
 * storage, read as a gzip-compressed, tab-separated file, and every complete
 * row is written to the DynamoDB table named by self::TABLE_NAME.
 */
class IdmappingID
{
    /** DynamoDB table that receives the imported rows. */
    const TABLE_NAME = 'IDMapping';

    /** An explicit flush is issued once more than this many rows were queued. */
    const FLUSH_THRESHOLD = 1000;

    /**
     * @var S3Client
     */
    protected $s3_client;

    /**
     * @var DynamoDbClient
     */
    protected $dynamdb_client;

    public function __construct()
    {
    }

    /**
     * Queue entry point.
     *
     * @param Job   $job  Queue job; deleted once processing completes without an exception.
     * @param array $data Job payload; only the first element of "Records" is processed.
     *
     * @throws \RuntimeException When the downloaded file cannot be opened.
     */
    public function fire(Job $job, array $data)
    {
        $this->logInfo(get_class($this)." Parse Start.");

        if (!empty($data["Records"])) {
            $this->logInfo("S3 Event.");

            $record = $data["Records"][0];
            $region = $record["awsRegion"];
            $bucket = $record["s3"]["bucket"]["name"];
            // S3 event notifications deliver the object key URL-encoded
            // (e.g. a space arrives as "+"), so decode before using it.
            $key = urldecode($record["s3"]["object"]["key"]);

            $s3 = $this->getS3Client();
            $s3->registerStreamWrapper();
            $path = "s3://$bucket/$key";

            // strpos() returns 0 when the key *starts* with the marker, so the
            // result must be compared against false rather than tested "> 0".
            if (strpos($key, "idmapping_") !== false) {
                $this->logInfo("AdFrontLog Event Records. $region $path");

                $save_path = storage_path()."/idmapping/".basename($key);
                $s3->getObject(array(
                    'Bucket' => $bucket,
                    'Key'    => $key,
                    'SaveAs' => $save_path,
                ));

                $this->importFile($save_path);
            } else {
                $this->logInfo("S3 Event Skip.");
            }
        }

        $job->delete();
        $this->logInfo(get_class($this)." Parse End.");
    }

    /**
     * Reads a gzip-compressed TSV file and writes its rows to DynamoDB.
     *
     * Columns 0, 1 and 3 of each row are used as id, another_id and
     * updated_at. Rows where any of the three is missing or empty are skipped.
     *
     * @param string $save_path Local path of the downloaded file.
     *
     * @throws \RuntimeException When the file cannot be opened.
     */
    protected function importFile($save_path)
    {
        $fp = gzopen($save_path, "r");
        if ($fp === false) {
            // Without this guard fgetcsv() would be called on an invalid
            // handle and never return false, so the read loop could not end.
            throw new \RuntimeException("Unable to open downloaded file: $save_path");
        }

        $putBatch = WriteRequestBatch::factory($this->getDynamoDBClient());
        $pending = 0;

        try {
            while (($row = fgetcsv($fp, 0, "\t")) !== false) {
                $id         = isset($row[0]) ? $row[0] : null;
                $another_id = isset($row[1]) ? $row[1] : null;
                $updated_at = isset($row[3]) ? $row[3] : null;

                if (empty($id) || empty($another_id) || empty($updated_at)) {
                    continue;
                }

                $putBatch->add(new PutRequest(array(
                    'id'         => array(Type::STRING => (string) $id),
                    'another_id' => array(Type::STRING => (string) $another_id),
                    'updated_at' => array(Type::STRING => (string) $updated_at),
                ), self::TABLE_NAME));

                $pending++;
                if ($pending > self::FLUSH_THRESHOLD) {
                    $putBatch->flush();
                    $pending = 0;
                    $this->logInfo("Count 1000 Flush");
                }
            }
        } catch (\Exception $e) {
            // Release the file handle before propagating the failure.
            gzclose($fp);
            throw $e;
        }

        gzclose($fp);
        $putBatch->flush();
    }

    /**
     * Writes an info-level message through the Laravel Log facade.
     *
     * The facade is referenced by its fully-qualified name because the
     * file-level import for it is misspelled ("Facdes") and therefore does
     * not resolve to a real class.
     *
     * @param string $message
     */
    protected function logInfo($message)
    {
        \Illuminate\Support\Facades\Log::info($message);
    }

    /**
     * Returns the S3 client, creating it on first use.
     *
     * @return S3Client
     */
    protected function getS3Client()
    {
        if ($this->s3_client === null) {
            $this->s3_client = S3Client::factory(array(
                'profile' => 's3-idmapping',
                'region'  => 'ap-northeast-1',
            ));
        }

        return $this->s3_client;
    }

    /**
     * Returns the DynamoDB client, creating it on first use.
     *
     * @return DynamoDbClient
     */
    protected function getDynamoDBClient()
    {
        if ($this->dynamdb_client === null) {
            $this->dynamdb_client = DynamoDbClient::factory(array(
                'profile' => 'ddd',
                'region'  => 'ap-northeast-1',
            ));
        }

        return $this->dynamdb_client;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment