Last active
May 4, 2021 03:18
-
-
Save dangdennis/c045206b44b75bfba001b84c9ad5b060 to your computer and use it in GitHub Desktop.
Query Flow events concurrently
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"os" | |
"time" | |
"github.com/gin-gonic/gin" | |
"github.com/onflow/flow-go-sdk" | |
"github.com/onflow/flow-go-sdk/client" | |
"google.golang.org/grpc" | |
"gorm.io/gorm" | |
"github.com/NiftyNiche/o30/events-engine/database" | |
"github.com/NiftyNiche/o30/events-engine/schema" | |
) | |
func main() { | |
db := database.New(os.Getenv("SUPABASE_DB_HOST"), os.Getenv("SUPABASE_DB_PASSWORD")) | |
defer func() { | |
sqlDB, err := db.DB() | |
panicOnError(err) | |
sqlDB.Close() | |
}() | |
flowClient, err := client.New("127.0.0.1:3569", grpc.WithInsecure()) | |
panicOnError(err) | |
go func() { | |
InitEventEngine(db, flowClient) | |
}() | |
r := gin.Default() | |
r.GET("/ping", func(c *gin.Context) { | |
c.JSON(200, gin.H{ | |
"message": "pong", | |
}) | |
}) | |
r.Run(":8000") | |
} | |
// InitEventEngine starts up the listeners periodically to poll for new events | |
func InitEventEngine(db *gorm.DB, fc *client.Client) { | |
fmt.Println("Initiate events job") | |
maxConcurrency := 1 | |
increment := uint64(200) | |
eventJobs := make(chan EventJob, maxConcurrency) | |
ctx := context.Background() | |
go func() { | |
EnqueueJob(db, fc, ctx, eventJobs, increment) | |
}() | |
for j := range eventJobs { | |
go func(job EventJob) { | |
DequeueJob(db, fc, ctx, job) | |
}(j) | |
} | |
} | |
// EventJob is a job for events processing | |
type EventJob struct { | |
FromBlock uint64 | |
ToBlock uint64 | |
BlockCursor schema.BlockCursor | |
} | |
func EnqueueJob(db *gorm.DB, fc *client.Client, ctx context.Context, eventJobs chan<- EventJob, increment uint64) { | |
for { | |
var cursors []schema.BlockCursor | |
err := db.Where("active = true and is_queued = false").Find(&cursors).Error | |
if err != nil { | |
fmt.Printf("Failed to get cursors %+v\n", err) | |
return | |
} | |
if len(cursors) == 0 { | |
fmt.Println("No active cursors to process") | |
return | |
} | |
latestBlock, err := fc.GetLatestBlock(ctx, true) | |
if err != nil { | |
fmt.Printf("Failed to latest block %+v\n", err) | |
return | |
} | |
fmt.Printf("Latest block height is %d\n", latestBlock.Height) | |
// collect ids to toggle queue status | |
var queuedCursorIDs []uint | |
for _, c := range cursors { | |
if latestBlock.Height <= c.CurrentBlockHeight { | |
fmt.Printf("Cannot query from latest block height. block_cursor id #%d\n", c.BlockCursorID) | |
continue | |
} | |
fromBlock := c.CurrentBlockHeight + 1 | |
toBlock := c.CurrentBlockHeight + increment | |
if toBlock >= latestBlock.Height { | |
toBlock = latestBlock.Height | |
} | |
eventJobs <- EventJob{ | |
FromBlock: fromBlock, // start at the next block because the query is inclusive-end,, | |
ToBlock: toBlock, | |
BlockCursor: c, | |
} | |
queuedCursorIDs = append(queuedCursorIDs, c.BlockCursorID) | |
} | |
err = db.Table("block_cursors").Where("block_cursor_id IN ?", queuedCursorIDs).Updates(map[string]interface{}{"is_queued": true}).Error | |
if err != nil { | |
fmt.Printf("Failed to to set block_cursors.is_queued to true %+v", err) | |
return | |
} | |
fmt.Printf("%d jobs are enqueued\n", len(cursors)) | |
time.Sleep(2000 * time.Millisecond) | |
} | |
} | |
func DequeueJob(db *gorm.DB, fc *client.Client, ctx context.Context, job EventJob) { | |
dequeueSelf := func() error { | |
err := db.Table("block_cursors").Where("block_cursor_id = ?", job.BlockCursor.BlockCursorID).Updates(map[string]interface{}{"is_queued": false}).Error | |
if err != nil { | |
fmt.Printf("Failed to set block_cursor.is_queued to false %+v", err) | |
return err | |
} | |
return nil | |
} | |
fmt.Printf("%d-%d\n", job.FromBlock, job.ToBlock) | |
results, err := fc.GetEventsForHeightRange(ctx, client.EventRangeQuery{ | |
Type: job.BlockCursor.EventName, | |
StartHeight: job.FromBlock, // start at the next block because the query is inclusive-end | |
EndHeight: job.ToBlock, | |
}) | |
if err != nil { | |
fmt.Printf("Failed to query events %+v\n", err) | |
dequeueSelf() | |
return | |
} | |
for _, block := range results { | |
for _, event := range block.Events { | |
err = SaveFlowEvent(db, job.BlockCursor, block, event) | |
if err != nil { | |
fmt.Printf("Failed to save event %s %+v\n", event.ID(), err) | |
fmt.Printf("Failed event value %s", event.Value.String()) | |
} | |
} | |
} | |
job.BlockCursor.CurrentBlockHeight = job.ToBlock | |
err = db.Save(&job.BlockCursor).Error | |
if err != nil { | |
fmt.Printf("Failed to update block_cursor #%d %+v\n", job.BlockCursor.BlockCursorID, err) | |
} | |
err = dequeueSelf() | |
if err != nil { | |
fmt.Println("Job failed") | |
} | |
fmt.Println("Job complete") | |
} | |
func SaveFlowEvent(db *gorm.DB, cursor schema.BlockCursor, block client.BlockEvents, event flow.Event) (err error) { | |
return db.Create(&schema.Event{ | |
BlockHeight: block.Height, | |
EventName: cursor.EventName, | |
TransactionID: event.TransactionID.String(), | |
TransactionIndex: event.TransactionIndex, | |
EventIndex: event.EventIndex, | |
BlockCursorID: cursor.BlockCursorID, | |
Payload: event.Value.String(), | |
}).Error | |
} | |
func panicOnError(err error) { | |
if err != nil { | |
fmt.Println("err:", err.Error()) | |
panic(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment