Skip to content

Instantly share code, notes, and snippets.

@pyr0hu
Last active January 3, 2024 16:11
Show Gist options
  • Save pyr0hu/7eb3642f032295c5505203753ce651cb to your computer and use it in GitHub Desktop.
Save pyr0hu/7eb3642f032295c5505203753ce651cb to your computer and use it in GitHub Desktop.
<?php
namespace App\Jobs;
use Illuminate\Queue\Jobs\JobName;
use Illuminate\Support\Facades\Storage;
/**
 * Queue job that stands in for another job whose payload was offloaded to storage.
 *
 * Only the path of the stored JSON payload and the wrapped job's display name are
 * carried by this job; the payload itself is read back from the "sqs-s3" disk when
 * the job is handled.
 */
class S3ExtractJob extends Job
{
    /**
     * @param string $jsonPath   Path of the stored JSON payload on the "sqs-s3" disk.
     * @param string $wrappedJob Display name reported for this job (see displayName()).
     */
    public function __construct(
        protected string $jsonPath,
        protected string $wrappedJob
    ) {
    }

    /**
     * Read the stored payload and invoke the handler named by its "job" entry.
     *
     * @throws \JsonException    When the stored file does not contain valid JSON.
     * @throws \RuntimeException When the file cannot be read or lacks a usable "job" entry.
     */
    public function handle()
    {
        $contents = Storage::disk('sqs-s3')->read($this->jsonPath);

        // Guard against drivers that signal a failed read with a non-string value.
        if (! is_string($contents)) {
            throw new \RuntimeException("Unable to read queue payload from [{$this->jsonPath}].");
        }

        // Fail loudly on corrupt JSON instead of continuing with a null payload.
        $payload = json_decode($contents, true, 512, JSON_THROW_ON_ERROR);

        if (! is_array($payload) || ! is_string($payload['job'] ?? null)) {
            throw new \RuntimeException("Queue payload at [{$this->jsonPath}] has no \"job\" entry.");
        }

        [$class, $method] = JobName::parse($payload['job']);

        // NOTE(review): $this->job is not declared in this class; presumably it is
        // provided by the Job base class / queue worker — confirm.
        $handler = $this->job->getContainer()->make($class);
        $handler->{$method}($this->job, $payload['data']);
    }

    /**
     * Report the wrapped job's name rather than this wrapper's class name.
     */
    public function displayName(): string
    {
        return $this->wrappedJob;
    }
}
<?php
namespace App\Foundation\Queue;
use Aws\Sqs\SqsClient;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Support\Arr;
/**
 * SQS connector that produces SqsWithS3Queue instances.
 */
class SqsWithS3Connector extends SqsConnector
{
    /**
     * Establish a queue connection from the given configuration.
     *
     * @param array $config Connection settings; reads "key", "secret", "token",
     *                      "queue", "prefix", "suffix" and "after_commit".
     * @return SqsWithS3Queue
     */
    public function connect(array $config)
    {
        $settings = $this->getDefaultConfiguration($config);

        // Explicit credentials are only forwarded when both key and secret are set.
        $hasKeyPair = ! (empty($settings['key']) || empty($settings['secret']));

        if ($hasKeyPair) {
            $settings['credentials'] = Arr::only($settings, ['key', 'secret', 'token']);
        }

        // The top-level "token" entry is dropped before the client is built; any
        // token remains available inside "credentials".
        $client = new SqsClient(Arr::except($settings, ['token']));

        return new SqsWithS3Queue(
            $client,
            $settings['queue'],
            $settings['prefix'] ?? '',
            $settings['suffix'] ?? '',
            $settings['after_commit'] ?? null
        );
    }
}
<?php
namespace App\Foundation\Queue;
use App\Jobs\S3ExtractJob;
use Illuminate\Queue\SqsQueue;
use Illuminate\Support\Facades\Storage;
use Illuminate\Support\Str;
/**
 * SQS queue that stores each raw payload as a JSON file on the "sqs-s3" disk and
 * enqueues a small S3ExtractJob pointing at that file instead of the payload itself.
 */
class SqsWithS3Queue extends SqsQueue
{
    /**
     * Store the payload on the "sqs-s3" disk and push a pointer job onto the queue.
     *
     * @param string      $payload JSON job payload; must contain a string "displayName".
     * @param string|null $queue   Queue name; falls back to the default queue when empty.
     * @param array       $options Accepted for signature compatibility; not used here.
     * @return mixed The "MessageId" value returned by SQS.
     *
     * @throws \InvalidArgumentException When the payload is not JSON with a string "displayName".
     * @throws \RuntimeException         When the payload cannot be stored.
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        // Validate before uploading so a malformed payload never leaves an
        // orphaned file behind on the storage disk.
        $decoded = json_decode($payload, true);

        if (! is_array($decoded) || ! is_string($decoded['displayName'] ?? null)) {
            throw new \InvalidArgumentException(
                'Queue payload must be a JSON object containing a string "displayName".'
            );
        }

        $path = $this->storePayloadOnS3($payload);

        $pointerPayload = $this->createPayload(
            new S3ExtractJob($path, $decoded['displayName']),
            $queue ?: $this->default
        );

        return $this->sendMessageToQueue($pointerPayload, $queue);
    }

    /**
     * Send a message body to SQS, optionally delayed.
     *
     * @param string      $payload Message body to send.
     * @param string|null $queue   Queue name, resolved through getQueue().
     * @param int|null    $delay   Value for "DelaySeconds"; omitted from the request when null.
     * @return mixed The "MessageId" value returned by SQS.
     */
    protected function sendMessageToQueue($payload, $queue = null, ?int $delay = null)
    {
        $message = [
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
        ];

        if ($delay !== null) {
            $message['DelaySeconds'] = $delay;
        }

        return $this->sqs->sendMessage($message)->get('MessageId');
    }

    /**
     * Write the payload to a uniquely named JSON file on the "sqs-s3" disk.
     *
     * The returned path is what travels through the queue; the file is read back
     * when the pointer job is handled.
     *
     * @param string $payload Raw JSON payload to store.
     * @return string Path of the stored file on the disk.
     *
     * @throws \RuntimeException When the disk reports that the write failed.
     */
    protected function storePayloadOnS3(string $payload): string
    {
        $path = 'payloads/'.Str::uuid().'.json';

        // put() can report failure by returning false; never enqueue a pointer
        // to a file that was not written.
        if (Storage::disk('sqs-s3')->put($path, $payload) === false) {
            throw new \RuntimeException("Unable to store queue payload at [{$path}].");
        }

        return $path;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment