Created
May 18, 2023 17:12
-
-
Save maltzsama/16671dea09c0ccc1ec8389ac32517532 to your computer and use it in GitHub Desktop.
Send message from pyspark to SQS Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession | |
import boto3 | |
# Configurar a sessão Spark | |
spark = SparkSession.builder.getOrCreate() | |
# Criar DataFrame com dados de exemplo | |
data = [("mensagem1",), ("mensagem2",), ("mensagem3",)] | |
df = spark.createDataFrame(data, ["message"]) | |
# Função para enviar mensagem para o SQS | |
def send_to_sqs(row): | |
# Inicializar o cliente do SQS | |
sqs = boto3.client('sqs', region_name='us-east-1') | |
# URL da fila do SQS | |
queue_url = 'URL_DA_FILA_DO_SQS' | |
# Extrair a mensagem do DataFrame | |
message = row["message"] | |
# Enviar a mensagem para o SQS | |
response = sqs.send_message( | |
QueueUrl=queue_url, | |
MessageBody=message | |
) | |
print(f"Mensagem '{message}' enviada com sucesso. ID da mensagem: {response['MessageId']}") | |
# Registrar a função UDF (User-Defined Function) | |
send_to_sqs_udf = spark.udf.register("send_to_sqs_udf", send_to_sqs) | |
# Aplicar a função UDF ao DataFrame | |
df.withColumn("result", send_to_sqs_udf(df)).show(truncate=False) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession | |
import boto3 | |
# Configurar a sessão Spark | |
spark = SparkSession.builder.getOrCreate() | |
# Criar RDD com dados de exemplo | |
data = [("mensagem1",), ("mensagem2",), ("mensagem3",)] | |
rdd = spark.sparkContext.parallelize(data) | |
# Função para enviar mensagens em lote para o SQS | |
def send_batch_to_sqs(iter): | |
# Inicializar o cliente do SQS | |
sqs = boto3.client('sqs', region_name='us-east-1') | |
# URL da fila do SQS | |
queue_url = 'URL_DA_FILA_DO_SQS' | |
# Lista para armazenar as mensagens em lote | |
messages = [] | |
for item in iter: | |
# Extrair a mensagem do item do RDD | |
message = item[0] | |
# Adicionar a mensagem à lista em formato de lote | |
messages.append({ | |
'Id': str(len(messages)), # ID único para cada mensagem | |
'MessageBody': message | |
}) | |
# Enviar em lote quando o tamanho máximo de mensagens é alcançado (por exemplo, 10) | |
if len(messages) == 10: | |
response = sqs.send_message_batch( | |
QueueUrl=queue_url, | |
Entries=messages | |
) | |
print(f"Enviado lote de mensagens. Número de mensagens enviadas: {len(messages)}, Erros: {response.get('Failed', [])}") | |
# Limpar a lista de mensagens para o próximo lote | |
messages = [] | |
# Enviar as mensagens restantes que não formam um lote completo | |
if messages: | |
response = sqs.send_message_batch( | |
QueueUrl=queue_url, | |
Entries=messages | |
) | |
print(f"Enviado lote de mensagens restantes. Número de mensagens enviadas: {len(messages)}, Erros: {response.get('Failed', [])}") | |
# Enviar dados para o SQS usando foreachPartition() | |
rdd.foreachPartition(send_batch_to_sqs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment