Skip to content

Instantly share code, notes, and snippets.

@cjbottaro
Created September 19, 2019 01:49
Show Gist options
  • Save cjbottaro/ad5c4ea3f99a3ff1aaa263f10d1d63ac to your computer and use it in GitHub Desktop.
Save cjbottaro/ad5c4ea3f99a3ff1aaa263f10d1d63ac to your computer and use it in GitHub Desktop.
Polls SQS for Textract messages and hands them off to our queuing system
defmodule SqsTextractPoller do
use Task, restart: :permanent
require Logger
def start_link(args) do
Task.start_link(__MODULE__, :run, args)
end
def run do
queue_url = Application.get_env(:app, :aws)[:textract_sqs_url]
Stream.repeatedly(fn -> receive_message(queue_url) end)
|> Stream.flat_map(fn response -> response.body.messages end)
|> Stream.map(&parse/1)
|> Enum.each(fn {payload, receipt_handle} ->
Document.TextractJob.perform_async([payload])
delete_message(queue_url, receipt_handle)
end)
end
def receive_message(queue_url) do
ExAws.SQS.receive_message(queue_url,
max_number_of_messages: 10,
wait_time_seconds: 10
) |> ExAws.request!
end
def delete_message(queue_url, receipt_handle) do
ExAws.SQS.delete_message(queue_url, receipt_handle)
|> ExAws.request!()
end
def parse(message) do
payload = message.body
|> Jason.decode!()
|> Map.get("Message")
|> Jason.decode!()
{payload, message.receipt_handle}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment