Skip to content

Instantly share code, notes, and snippets.

@maltzsama
Created May 18, 2023 17:12
Show Gist options
  • Save maltzsama/16671dea09c0ccc1ec8389ac32517532 to your computer and use it in GitHub Desktop.
Save maltzsama/16671dea09c0ccc1ec8389ac32517532 to your computer and use it in GitHub Desktop.
Send message from pyspark to SQS Queue
from pyspark.sql import SparkSession
import boto3
# Configurar a sessão Spark
spark = SparkSession.builder.getOrCreate()
# Criar DataFrame com dados de exemplo
data = [("mensagem1",), ("mensagem2",), ("mensagem3",)]
df = spark.createDataFrame(data, ["message"])
# Função para enviar mensagem para o SQS
def send_to_sqs(row):
# Inicializar o cliente do SQS
sqs = boto3.client('sqs', region_name='us-east-1')
# URL da fila do SQS
queue_url = 'URL_DA_FILA_DO_SQS'
# Extrair a mensagem do DataFrame
message = row["message"]
# Enviar a mensagem para o SQS
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=message
)
print(f"Mensagem '{message}' enviada com sucesso. ID da mensagem: {response['MessageId']}")
# Registrar a função UDF (User-Defined Function)
send_to_sqs_udf = spark.udf.register("send_to_sqs_udf", send_to_sqs)
# Aplicar a função UDF ao DataFrame
df.withColumn("result", send_to_sqs_udf(df)).show(truncate=False)
from pyspark.sql import SparkSession
import boto3
# Configurar a sessão Spark
spark = SparkSession.builder.getOrCreate()
# Criar RDD com dados de exemplo
data = [("mensagem1",), ("mensagem2",), ("mensagem3",)]
rdd = spark.sparkContext.parallelize(data)
# Função para enviar mensagens em lote para o SQS
def send_batch_to_sqs(iter):
# Inicializar o cliente do SQS
sqs = boto3.client('sqs', region_name='us-east-1')
# URL da fila do SQS
queue_url = 'URL_DA_FILA_DO_SQS'
# Lista para armazenar as mensagens em lote
messages = []
for item in iter:
# Extrair a mensagem do item do RDD
message = item[0]
# Adicionar a mensagem à lista em formato de lote
messages.append({
'Id': str(len(messages)), # ID único para cada mensagem
'MessageBody': message
})
# Enviar em lote quando o tamanho máximo de mensagens é alcançado (por exemplo, 10)
if len(messages) == 10:
response = sqs.send_message_batch(
QueueUrl=queue_url,
Entries=messages
)
print(f"Enviado lote de mensagens. Número de mensagens enviadas: {len(messages)}, Erros: {response.get('Failed', [])}")
# Limpar a lista de mensagens para o próximo lote
messages = []
# Enviar as mensagens restantes que não formam um lote completo
if messages:
response = sqs.send_message_batch(
QueueUrl=queue_url,
Entries=messages
)
print(f"Enviado lote de mensagens restantes. Número de mensagens enviadas: {len(messages)}, Erros: {response.get('Failed', [])}")
# Enviar dados para o SQS usando foreachPartition()
rdd.foreachPartition(send_batch_to_sqs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment