Created
December 20, 2023 01:56
-
-
Save LucasBadico/364006e4284f07b4fed36e30dcc8e37a to your computer and use it in GitHub Desktop.
GOLANG CONCURRENCY MODEL EXAMPLE
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// CODE EXAMPLE FROM YOUTUBE LIVE: Golang: Modelo de concorrencia goroutine + channels | |
// LINK: https://youtube.com/live/TRx3W4mLFf8 | |
// Lucas_Badico: Dev, mentor e apaixonado por Programacao | |
// SIGA ME EM TODAS AS REDES SOCIAIS | |
package main | |
import ( | |
"time" | |
"fmt" | |
"sync" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/dynamodb" | |
// "os" | |
) | |
type Item struct { | |
PK string `json:"PK"` | |
SK string `json:"SK"` | |
// Add other attributes here. | |
} | |
func buildBatchDeleteInput(items []Item, tableName string) ([]*dynamodb.BatchWriteItemInput, error) { | |
var batchInputs []*dynamodb.BatchWriteItemInput | |
const batchSize = 25 // Max items allowed in a single batch write request | |
// Calculate the number of batches required | |
totalItems := len(items) | |
numBatches := (totalItems + batchSize - 1) / batchSize | |
// Create and populate batch write requests | |
for i := 0; i < numBatches; i++ { | |
start := i * batchSize | |
end := (i + 1) * batchSize | |
if end > totalItems { | |
end = totalItems | |
} | |
batchItems := items[start:end] | |
deleteRequests := []*dynamodb.WriteRequest{} | |
for _, item := range batchItems { | |
// Extract the partition key and sort key values from the item. | |
partitionKey := item.PK | |
sortKey := item.SK | |
// Create a DeleteRequest for the item. | |
deleteRequest := &dynamodb.WriteRequest{ | |
DeleteRequest: &dynamodb.DeleteRequest{ | |
Key: map[string]*dynamodb.AttributeValue{ | |
"PK": { | |
S: aws.String(partitionKey), | |
}, | |
"SK": { | |
S: aws.String(sortKey), | |
}, | |
}, | |
}, | |
} | |
// Append the DeleteRequest to the list. | |
deleteRequests = append(deleteRequests, deleteRequest) | |
} | |
// Create the BatchWriteItemInput with the DeleteRequests for this batch. | |
input := &dynamodb.BatchWriteItemInput{ | |
RequestItems: map[string][]*dynamodb.WriteRequest{ | |
tableName: deleteRequests, | |
}, | |
} | |
batchInputs = append(batchInputs, input) | |
} | |
return batchInputs, nil | |
} | |
func scanWithLastEvaluatedKey(svc *dynamodb.DynamoDB, tableName string, lastEvaluatedKey map[string]*dynamodb.AttributeValue) ([]Item, map[string]*dynamodb.AttributeValue, error) { | |
var allItems []Item | |
// Prepare the ScanInput with the exclusive start key for pagination. | |
scanInput := &dynamodb.ScanInput{ | |
TableName: aws.String(tableName), | |
// Limit: aws.Int64(3), | |
} | |
if lastEvaluatedKey != nil { | |
scanInput.ExclusiveStartKey = lastEvaluatedKey | |
} | |
// Perform the DynamoDB Scan operation. | |
scanResult, err := svc.Scan(scanInput) | |
if err != nil { | |
return nil, nil, err | |
} | |
// Extract items from the scan result and append them to the allItems slice. | |
for _, item := range scanResult.Items { | |
// Dereference the pointers (*string) to get the actual string values. | |
pk := *item["PK"].S | |
sk := *item["SK"].S | |
allItems = append(allItems, Item{ | |
PK: pk, | |
SK: sk, | |
// Extract other attributes as needed. | |
}) | |
} | |
// Return the scanned items and the new last evaluated key. | |
return allItems, scanResult.LastEvaluatedKey, nil | |
} | |
func deleteBatch(svc *dynamodb.DynamoDB, input *dynamodb.BatchWriteItemInput, wg *sync.WaitGroup) { | |
defer wg.Done() | |
fmt.Println("DELETING...") | |
_, err := svc.BatchWriteItem(input) | |
if err != nil { | |
// Handle the error. | |
fmt.Println("ERROR:", err) | |
return | |
} | |
fmt.Println("DELETED!") | |
} | |
func process(svc *dynamodb.DynamoDB, tableName string) { | |
var lastEvaluatedKey map[string]*dynamodb.AttributeValue // Change the map type based on your actual data type | |
for { | |
// Call your function | |
fmt.Println("READING...") | |
// Call the scanWithLastEvaluatedKey function to get the scan items and the new last evaluated key. | |
items, newLastEvaluatedKey, err := scanWithLastEvaluatedKey(svc, tableName, lastEvaluatedKey) | |
if err != nil { | |
// Handle the error. | |
return | |
} | |
fmt.Println("READ") | |
// Build the BatchWriteItemInput with DeleteRequests using the items from the scan result. | |
batchInputs, err := buildBatchDeleteInput(items, tableName) | |
if err != nil { | |
// Handle the error. | |
return | |
} | |
var wg sync.WaitGroup | |
for _, input := range batchInputs { | |
wg.Add(1) | |
go deleteBatch(svc, input, &wg) | |
} | |
// Wait for all goroutines to finish | |
wg.Wait() | |
// Process the scanned items as needed. | |
// ... | |
// Update the last evaluated key for the next iteration. | |
lastEvaluatedKey = newLastEvaluatedKey | |
// Check if there are more items to scan. | |
if newLastEvaluatedKey == nil { | |
fmt.Println("FINISHED") | |
// No more items to scan, exit the loop. | |
break | |
} | |
} | |
} | |
func repeatAtEachHour(fn func(), duration time.Duration) { | |
for { | |
// Get the current time | |
now := time.Now() | |
// Calculate the duration until the next hour | |
nextHour := now.Truncate(time.Hour).Add(time.Hour) | |
durationUntilNextHour := nextHour.Sub(now) | |
// Sleep until the next hour | |
time.Sleep(durationUntilNextHour) | |
// Call the provided function | |
fn() | |
} | |
} | |
func main() { | |
// Create a new session using the AWS SDK for Go. | |
sess := session.Must(session.NewSessionWithOptions(session.Options{ | |
SharedConfigState: session.SharedConfigEnable, | |
})) | |
// Create a new DynamoDB service client. | |
svc := dynamodb.New(sess) | |
// Specify the table name from which you want to delete items. | |
tableName :="cache" //os.Getenv("DICT_CACHE_TABLE") | |
// Start the process function in a goroutine | |
go process(svc, tableName) | |
// Call the repeatAtEachHour function to repeat the process at each hour | |
repeatAtEachHour(func() { | |
process(svc, tableName) | |
}, time.Hour) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// CODE EXAMPLE FROM YOUTUBE LIVE: Golang: Modelo de concorrencia goroutine + channels | |
// LINK: https://youtube.com/live/TRx3W4mLFf8 | |
// Lucas_Badico: Dev, mentor e apaixonado por Programacao | |
// SIGA ME EM TODAS AS REDES SOCIAIS | |
package main | |
import ( | |
"fmt" | |
"time" | |
"sync" | |
// "github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/service/dynamodb" | |
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" | |
"github.com/google/uuid" | |
) | |
type Item struct { | |
PK string `json:"PK"` | |
SK string `json:"SK"` | |
Timestamp string `json:"Timestamp"` | |
} | |
func runIt(svc *dynamodb.DynamoDB, ch chan int, wg *sync.WaitGroup) { | |
// Inicializa um slice para armazenar os itens | |
var items []*dynamodb.WriteRequest | |
itemCount := 0 | |
tableName := "cache" | |
for i := 0; i < 25; i++ { | |
pk := uuid.New().String() | |
sk := uuid.New().String() | |
item := Item{ | |
PK: pk, | |
SK: sk, | |
Timestamp: time.Now().Format(time.RFC3339), | |
} | |
av, err := dynamodbattribute.MarshalMap(item) | |
if err != nil { | |
fmt.Println("Got error marshalling map:") | |
fmt.Println(err.Error()) | |
return | |
} | |
items = append(items, &dynamodb.WriteRequest{ | |
PutRequest: &dynamodb.PutRequest{ | |
Item: av, | |
}, | |
}) | |
itemCount++ | |
} | |
batchInput := &dynamodb.BatchWriteItemInput{ | |
RequestItems: map[string][]*dynamodb.WriteRequest{ | |
tableName: items, | |
}, | |
} | |
_, err := svc.BatchWriteItem(batchInput) | |
if err != nil { | |
fmt.Println("Got error calling BatchWriteItem:") | |
fmt.Println(err.Error()) | |
return | |
} | |
ch <- itemCount | |
time.Sleep(1 * time.Second) | |
wg.Done() | |
} | |
func finshEachRun(ch chan int) { | |
itemCount := 0 | |
runCount := 0 | |
for { | |
select { | |
case newItemsCount, ok := <- ch: | |
if !ok { | |
fmt.Println("Channel closed!") | |
return | |
} | |
fmt.Println("Received:", newItemsCount) | |
itemCount = itemCount + newItemsCount | |
runCount++ | |
fmt.Printf("Total items written: %d\n", itemCount) | |
fmt.Printf("Total runs: %d\n", runCount) | |
} | |
} | |
} | |
func main() { | |
sess := session.Must(session.NewSessionWithOptions(session.Options{ | |
SharedConfigState: session.SharedConfigEnable, | |
})) | |
svc := dynamodb.New(sess) | |
ch := make(chan int) | |
go finshEachRun(ch) | |
// Substitua pelo nome da sua tabela | |
for { | |
// | |
var wg sync.WaitGroup | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go runIt(svc, ch, &wg) | |
} | |
// Wait for all goroutines to finish | |
// wg.Add(1) | |
// go runIt(svc, ch, &wg) | |
wg.Wait() | |
// Espera um pouco antes de inserir o próximo lote | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment