Created
June 19, 2024 14:11
-
-
Save odytrice/08bd3474cf91b0bedf6bf8f35b655e8d to your computer and use it in GitHub Desktop.
Bitcoin Indexer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Nomad.Bitcoin.Indexer.BitcoinActors | |
open System | |
open System.Linq | |
open Akka.FSharp | |
open Microsoft.Extensions.DependencyInjection | |
open Microsoft.Extensions.Logging | |
open Nomad.Bitcoin.Core.Contracts | |
open Nomad.Bitcoin.Core.Domain | |
open Nomad.Bitcoin.Core.Domain.Errors | |
open Nomad.Bitcoin.Core.Domain.Types | |
open FsToolkit.ErrorHandling | |
open Nomad.Bitcoin.Infrastructure.Database | |
open Nomad.Bitcoin.Core.Domain.Helpers | |
/// Messages accepted by the indexer's root actor.
type Msg =
    /// Asks the actor to run an indexing pass.
    | StartIndex
/// The set of services an indexing operation works with.
/// Implementations are produced by `Context.create`.
type Context =
    /// Repository used to query and write index data.
    abstract Index: IWriteIndexRepository
    /// Repository used to query the Bitcoin node.
    abstract Node: INodeRepository
    /// Logger used for progress and error reporting.
    abstract Logger: ILogger<Context>
    /// The database context (not used directly by the functions in this module).
    abstract Database: BitcoinDataContext
module Context =
    /// Builds a `Context` backed by the given service provider.
    ///
    /// Each member resolves its service from `sp` at the moment it is accessed.
    /// `GetRequiredService` is used instead of `GetService` so that a missing
    /// registration fails immediately with an error naming the missing type,
    /// rather than handing back `null` and failing later with a
    /// NullReferenceException somewhere inside the indexing code.
    let create (sp: IServiceProvider) =
        { new Context with
            member _.Index = sp.GetRequiredService<IWriteIndexRepository>()
            member _.Node = sp.GetRequiredService<INodeRepository>()
            member _.Database = sp.GetRequiredService<BitcoinDataContext>()
            member _.Logger = sp.GetRequiredService<ILogger<Context>>() }
/// Pairs a transaction input with the output it spends.
///
/// Coinbase inputs spend nothing, so they are paired with `None`. For a regular
/// input, the referenced output is looked up in the index by the transaction id
/// and output index the input carries; a failed lookup fails the whole result.
let fetchOutput (indexRepo: IWriteIndexRepository) (input: TransactionInput) =
    asyncResult {
        match input with
        | TxInput spending ->
            let! referenced = indexRepo.GetOutputByIndex(spending.TransactionId, spending.OutputIndex)
            return input, Some referenced
        | CoinBase _ ->
            return input, None
    }
/// Builds the index records for one transaction and stores them via
/// `context.Index.AddTransaction`.
///
/// - `addressMap` maps an address string to its id in the index; output
///   addresses that are not present in the map are left out of the record.
/// - `(blockHash, blockTime)` identify the containing block; the time is used
///   as the `Date` of every address record attached to an output.
/// - `transaction` is the transaction to index.
let indexTransaction (context: Context) (addressMap: Map<string, int64>) (blockHash: BlockHash, blockTime: DateTimeOffset) (transaction: TransactionDetails) =

    // Turns a single input into its index record. Both ids are placeholders
    // (InputId = 0L, OutputId = None) at this point.
    let toInputIndex (input: TransactionInput) : InputIndex =
        match input with
        | CoinBase coinBase ->
            {
                OutputId = None
                InputId = 0L
                TransactionId = transaction.Id
                Sequence = coinBase.Sequence
                IsCoinBase = true
                OutputTransactionId = None
                OutputTransactionIndex = None
            }
        | TxInput spending ->
            {
                OutputTransactionIndex = Some spending.OutputIndex
                OutputTransactionId = Some spending.TransactionId
                OutputId = None
                InputId = 0L
                TransactionId = transaction.Id
                Sequence = spending.Sequence
                IsCoinBase = false
            }

    asyncResult {
        // One index record per input
        let inputIndices : InputIndex list =
            [ for input in transaction.Inputs -> toInputIndex input ]

        // One index record per output
        let outputIndices: OutputIndex list = [
            for output in transaction.Outputs do
                let parsedAddresses = Helpers.parseAddresses output.ScriptPubKey
                yield {
                    OutputId = 0L
                    // NOTE(review): inputs and the transaction record below use
                    // transaction.Id, while outputs use transaction.Hash. Confirm
                    // this is intended, since inputs reference outputs by
                    // transaction id.
                    TransactionId = transaction.Hash
                    Index = output.Index
                    Value = decimal output.Value
                    ScriptPubKey = output.ScriptPubKey.Asm
                    Addresses = [
                        for address in parsedAddresses do
                            // Only addresses with a known id in the address map are attached
                            match Map.tryFind address addressMap with
                            | Some addressId ->
                                yield {
                                    Id = addressId
                                    Value = address
                                    Balance = 0M<Bitcoins>
                                    TransactionCount = 0
                                    Date = blockTime
                                }
                            | None -> ()
                    ]
                }
        ]

        // Assemble the complete transaction record and persist it
        let transactionIndex: TransactionIndex = {
            TransactionId = transaction.Id
            BlockHash = blockHash
            Inputs = inputIndices
            Outputs = outputIndices
        }

        do! context.Index.AddTransaction transactionIndex
    }
/// Indexes every transaction of `blockDetails` that is not yet in the index.
///
/// Transactions already stored for the block (as reported by
/// `GetTransactionsByBlock`) are skipped, so the function can be re-run on a
/// partially indexed block. The remaining transactions are indexed one after
/// another; the first failure stops the run and is returned as the error.
///
/// - `addressMap` maps address strings to their ids and is passed through to
///   `indexTransaction`.
let indexTransactions (context: Context) (addressMap: Map<string, int64>) (blockDetails: BlockDetails) =
    asyncResult {
        let logger = context.Logger
        logger.LogInformation("Saving Block Transactions")

        // Ids of the transactions already indexed for this block
        let! indexedTransactionIds =
            blockDetails.Hash
            |> context.Index.GetTransactionsByBlock
            |> AsyncResult.map(List.map(fun t -> t.TransactionId))

        // A hash set gives constant-time membership checks. Scanning the list
        // for every transaction would be quadratic on large blocks.
        let alreadyIndexed = System.Collections.Generic.HashSet<_>(indexedTransactionIds)

        // Keep only the transactions that still need indexing
        let pendingTransactions =
            blockDetails.Transactions
            |> List.filter(fun t -> not (alreadyIndexed.Contains t.Id))

        let blockData = blockDetails.Hash, blockDetails.Time

        for transaction in pendingTransactions do
            do! indexTransaction context addressMap blockData transaction
    }
/// Builds a lookup from address string to the address's id in the index.
///
/// Duplicates in `addresses` are dropped. Addresses already known to the index
/// are fetched; the remaining ones are saved as new records dated `blockTime`
/// with a zero balance and transaction count. The returned map covers both the
/// newly saved and the pre-existing addresses.
let createAddressMap (index: IWriteIndexRepository) (blockTime: DateTimeOffset) (addresses: string list): Async<Result<Map<string, int64>,AppError>> =
    asyncResult {
        // De-duplicate the incoming addresses
        let distinctAddresses = Set.ofList addresses

        // Ask the index which of them it already knows
        let! existingAddresses = index.GetAddresses (Set.toList distinctAddresses)

        let knownValues =
            existingAddresses
            |> List.map (fun known -> known.Value)
            |> Set.ofList

        // Everything the index does not know yet becomes a new record
        let unseenValues = Set.difference distinctAddresses knownValues

        let! newAddressRecords =
            unseenValues
            |> Set.toList
            |> List.map (fun value -> {
                AddressIndex.Id = 0L
                AddressIndex.Value = value
                AddressIndex.Date = blockTime
                Balance = 0M<Bitcoins>
                TransactionCount = 0
            })
            |> index.SaveAddresses

        // Combine new and existing records into value -> id pairs
        let idsByValue =
            newAddressRecords @ existingAddresses
            |> List.map (fun address -> address.Value, address.Id)

        return Map.ofList idsByValue
    }
/// Indexes the block at the given height inside its own DI scope.
///
/// Steps, in order:
/// 1. resolve the block hash for `height` and fetch the block details from the node;
/// 2. build the address map for every address found in the block's outputs;
/// 3. add the block to the index unless it is already there (a warning is logged then);
/// 4. index the block's transactions;
/// 5. mark the block as `IndexStatus.Complete`.
///
/// Any failing step short-circuits the result with its error.
let indexBlock (sp: IServiceProvider) (height: int) =
    asyncResult {
        use scope = sp.CreateScope()
        let context = Context.create scope.ServiceProvider
        let logger = context.Logger

        // Fetch the block from the node
        let! blockHash = context.Node.GetBlockHash height
        logger.LogInformation("Fetching Block {Hash}", blockHash)
        let! details = context.Node.GetBlockDetails blockHash
        logger.LogInformation("Successfully Fetched Block")

        // Gather every address referenced by any output of any transaction
        let addressesInBlock =
            details.Transactions
            |> List.collect (fun tx ->
                tx.Outputs
                |> List.collect (fun output -> Helpers.parseAddresses output.ScriptPubKey))

        // Address "book": address string -> id in the index
        let! addressMap = createAddressMap context.Index details.Time addressesInBlock

        logger.LogInformation("Saving Block")
        let! existing = context.Index.GetBlock(details.Hash)

        match existing with
        | Some indexed -> logger.LogWarning("Block {Height} has already been indexed", indexed.Height)
        | None -> do! context.Index.AddBlock(details)

        do! indexTransactions context addressMap details

        logger.LogInformation("Marking Block {Height} as Completed", details.Height)
        do! context.Index.SetBlockStatus(details.Hash, IndexStatus.Complete)
    }
/// Indexes the blocks the index reports as pending or missing.
///
/// Missing blocks are processed first, then pending ones, one at a time; the
/// first failure stops the run. On success the result holds one entry per
/// processed block, so its length is the number of blocks handled.
let indexPendingBlocks (sp: IServiceProvider) (context: Context) =
    asyncResult {
        let! pendingBlocks = context.Index.GetPendingBlocks()
        if not (List.isEmpty pendingBlocks) then
            context.Logger.LogInformation("Found {count} Pending Blocks", pendingBlocks.Length)

        let! missingBlocks = context.Index.GetMissingBlocks()
        if not (List.isEmpty missingBlocks) then
            context.Logger.LogInformation("Found {count} Missing Blocks", missingBlocks.Length)

        let blocksToIndex = missingBlocks @ pendingBlocks

        return!
            blocksToIndex
            |> List.map (indexBlock sp)
            |> List.sequenceAsyncResultM
    }
/// Root actor of the indexer. On every `StartIndex` message it brings the
/// index up to the node's current block height and logs the outcome.
///
/// - `sp` is the root service provider; a fresh DI scope is created per message.
/// - `mailbox` is the actor's mailbox.
let rootActor (sp: IServiceProvider) (mailbox:Actor<Msg>) =

    /// Runs one indexing pass and returns the number of blocks processed.
    ///
    /// Pending and missing blocks are handled first. If the node is ahead of
    /// the index, the heights in between are processed in batches of 10, with
    /// the blocks of a batch indexed in parallel. A batch with failures aborts
    /// the pass with an `AggregateError` carrying the collected errors.
    let indexChain (context: Context) =
        asyncResult {
            let! chainInfo = context.Node.GetChainInfo()
            let nodeBlockHeight = chainInfo.Blocks
            let! indexBlockHeight = context.Index.GetBlockChainHeight()
            let! pendingBlocks = indexPendingBlocks sp context

            if nodeBlockHeight > indexBlockHeight then
                context.Logger.LogInformation("Current Block Height {nodeBlockHeight}\n", nodeBlockHeight)

                // NOTE(review): the range includes indexBlockHeight itself, so it holds
                // one more block than the count returned below. Confirm whether
                // GetBlockChainHeight is the last indexed height or the next one to index.
                let batches = [ indexBlockHeight .. nodeBlockHeight ] |> List.batchesOf 10

                for batch in batches do
                    let startIndex , endIndex = batch.Head, batch |> List.last
                    context.Logger.LogInformation("Processing Batch {Start} .. {End}", startIndex, endIndex)

                    // Run Block Index in Parallel
                    let! results =
                        batch
                        |> List.map(indexBlock sp)
                        |> Async.Parallel

                    // If any fails, Collect the Errors into an Aggregate Error
                    do! results
                        |> List.ofArray
                        |> List.sequenceResultA
                        |> Result.mapError AggregateError
                        |> Result.ignore

                    context.Logger.LogInformation("Consolidating Detached Inputs")
                    do! context.Index.ConsolidateInputs() |> AsyncResult.ignore

                    let percentageCompleted = float endIndex / float nodeBlockHeight * 100.0
                    context.Logger.LogInformation("Processing Complete for Batch. Overall Index is {percentageComplete:N4}% Complete\n\n", percentageCompleted)

                return nodeBlockHeight - indexBlockHeight + pendingBlocks.Length
            else
                return pendingBlocks.Length
        }

    /// Handles one `StartIndex` request.
    ///
    /// The DI scope lives exactly as long as this call, so scoped services are
    /// released as soon as the pass finishes, whether it succeeds or throws.
    /// Previously the scope was opened before the actor waited for a message and
    /// the recursive `return! loop()` sat inside the `use`, tying disposal to a
    /// loop that never ends.
    let handleStartIndex () =
        use scope = sp.CreateScope()
        let context = Context.create scope.ServiceProvider

        // The actor deliberately blocks until the pass is finished.
        let blocksIndexed =
            indexChain context
            |> Async.RunSynchronously

        match blocksIndexed with
        | Ok 0 -> context.Logger.LogInformation "No Blocks were processed, Index is up to date"
        | Ok blocks -> context.Logger.LogInformation("{blocks} Blocks were processed", blocks)
        | Error appError ->
            let msg = describeError appError
            context.Logger.LogError ("An Error Occurred indexing Blocks {msg}", msg)

    let rec loop () =
        actor {
            let! msg = mailbox.Receive()

            match msg with
            | StartIndex -> handleStartIndex ()

            return! loop()
        }

    loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment