// Created June 19, 2024 14:11
// Bitcoin Indexer
module Nomad.Bitcoin.Indexer.BitcoinActors
open System
open System.Linq
open Akka.FSharp
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Logging
open Nomad.Bitcoin.Core.Contracts
open Nomad.Bitcoin.Core.Domain
open Nomad.Bitcoin.Core.Domain.Errors
open Nomad.Bitcoin.Core.Domain.Types
open FsToolkit.ErrorHandling
open Nomad.Bitcoin.Infrastructure.Database
open Nomad.Bitcoin.Core.Domain.Helpers
/// Messages accepted by the root indexing actor.
type Msg =
    /// Asks the actor to run an indexing pass.
    | StartIndex
/// Read-only bundle of the services used while indexing.
type Context =
    /// Repository the indexer writes blocks, transactions and addresses to.
    abstract Index: IWriteIndexRepository
    /// Repository used to query the node (block hashes, block details, chain info).
    abstract Node: INodeRepository
    /// Logger used for progress and error reporting.
    abstract Logger: ILogger<Context>
    /// Database context; not read by the functions in this module.
    abstract Database: BitcoinDataContext
module Context =
    /// Builds a `Context` backed by the given service provider.
    ///
    /// Every property resolves its service from `sp` at the moment it is read.
    /// `GetRequiredService` is used so that a missing registration raises
    /// immediately with the name of the missing service, rather than handing a
    /// null reference to the caller.
    let create (sp: IServiceProvider) =
        { new Context with
            member _.Index = sp.GetRequiredService<IWriteIndexRepository>()
            member _.Node = sp.GetRequiredService<INodeRepository>()
            member _.Database = sp.GetRequiredService<BitcoinDataContext>()
            member _.Logger = sp.GetRequiredService<ILogger<Context>>() }
/// Pairs a transaction input with the output it refers to, if any.
///
/// A coinbase input yields `(input, None)`. Any other input is looked up through
/// `indexRepo.GetOutputByIndex` using the referenced transaction id and output
/// index, yielding `(input, Some output)`; a lookup error is propagated.
let fetchOutput (indexRepo: IWriteIndexRepository) (input: TransactionInput) =
    asyncResult {
        match input with
        | TxInput spend ->
            let! referencedOutput =
                indexRepo.GetOutputByIndex(spend.TransactionId, spend.OutputIndex)

            return input, Some referencedOutput
        | CoinBase _ ->
            return input, None
    }
/// Builds the index record for one transaction and stores it with
/// `context.Index.AddTransaction`.
///
/// Parameters:
/// - `context`: services used for indexing; only `Index` is used here.
/// - `addressMap`: address string -> address id. Output addresses that are not
///   in the map are left out of the output's `Addresses` list.
/// - `(blockHash, blockTime)`: hash and time of the block containing the transaction.
/// - `transaction`: the transaction to index.
///
/// `InputId` and `OutputId` are written as `0L`, and an input's `OutputId` as `None`.
/// Any error returned by `AddTransaction` is propagated.
let indexTransaction (context: Context) (addressMap: Map<string, int64>) (blockHash: BlockHash, blockTime: DateTimeOffset) (transaction: TransactionDetails) =
    asyncResult {
        // Create All Input Indices
        let inputIndices: InputIndex list =
            [ for input in transaction.Inputs do
                match input with
                | CoinBase coinBaseDetails ->
                    yield
                        { OutputId = None
                          InputId = 0L
                          TransactionId = transaction.Id
                          Sequence = coinBaseDetails.Sequence
                          IsCoinBase = true
                          OutputTransactionId = None
                          OutputTransactionIndex = None }
                | TxInput txInputDetails ->
                    yield
                        { OutputTransactionIndex = txInputDetails.OutputIndex |> Some
                          OutputTransactionId = txInputDetails.TransactionId |> Some
                          OutputId = None
                          InputId = 0L
                          TransactionId = transaction.Id
                          Sequence = txInputDetails.Sequence
                          IsCoinBase = false } ]

        // Create Output Indices
        let outputIndices: OutputIndex list =
            [ for output in transaction.Outputs do
                let addresses = output.ScriptPubKey |> Helpers.parseAddresses

                yield
                    { OutputId = 0L
                      // Keyed by transaction.Id, like the inputs and the transaction
                      // record below; inputs reference outputs by transaction id
                      // (OutputTransactionId).
                      // NOTE(review): this previously used transaction.Hash - confirm
                      // that no consumer relied on the hash being stored here.
                      TransactionId = transaction.Id
                      Index = output.Index
                      Value = decimal output.Value
                      ScriptPubKey = output.ScriptPubKey.Asm
                      Addresses =
                        [ for address in addresses do
                            // Locate Address From Address Map, so we can get the ID of the Address
                            match addressMap |> Map.tryFind address with
                            | Some addressId ->
                                yield
                                    { Id = addressId
                                      Value = address
                                      Balance = 0M<Bitcoins>
                                      TransactionCount = 0
                                      Date = blockTime }
                            | None -> () ] } ]

        // Construct A Single Transaction
        // (named transactionIndex so it does not shadow this function's own name)
        let transactionIndex: TransactionIndex =
            { TransactionId = transaction.Id
              BlockHash = blockHash
              Inputs = inputIndices
              Outputs = outputIndices }

        do! context.Index.AddTransaction transactionIndex
    }
/// Indexes every transaction of a block that the index does not hold yet.
///
/// The ids of transactions already stored for the block are fetched with
/// `context.Index.GetTransactionsByBlock`; transactions whose `Id` is among them
/// are skipped, and the rest are indexed one after another with `indexTransaction`.
/// The first error stops processing and is returned.
let indexTransactions (context: Context) (addressMap: Map<string, int64>) (blockDetails: BlockDetails) =
    asyncResult {
        let logger = context.Logger
        logger.LogInformation("Saving Block Transactions")

        // Get Already Indexed Transactions By Block
        // NOTE(review): the pipeline head was missing in the source; the block hash
        // is presumed to be the lookup key - confirm against IWriteIndexRepository.
        let! indexedTransactionIds =
            blockDetails.Hash
            |> context.Index.GetTransactionsByBlock
            |> AsyncResult.map (List.map (fun t -> t.TransactionId))

        // Filter Pending Transactions
        // NOTE(review): the pipeline head was missing in the source;
        // `blockDetails.Transactions` is the presumed field name - confirm.
        let pendingTransactions =
            blockDetails.Transactions
            |> List.filter (fun t -> not <| indexedTransactionIds.Contains(t.Id))

        // Same for every transaction of the block, so built once outside the loop.
        let blockData = blockDetails.Hash, blockDetails.Time

        for transaction in pendingTransactions do
            do! indexTransaction context addressMap blockData transaction
    }
/// Creates a map from address string to address id for the given addresses.
///
/// Duplicates are removed first. Addresses already known to the index are fetched
/// with `index.GetAddresses`; the remaining ones are saved with `index.SaveAddresses`
/// as new records dated `blockTime`, with a zero balance and transaction count.
/// The map is built from the records returned by both calls. Any repository
/// error is propagated.
let createAddressMap (index: IWriteIndexRepository) (blockTime: DateTimeOffset) (addresses: string list): Async<Result<Map<string, int64>,AppError>> =
    asyncResult {
        // Create a Set to remove Duplicates
        let addressSet = Set addresses

        // Fetch Existing Addresses from Index
        let! existingAddresses =
            addressSet
            |> List.ofSeq
            |> index.GetAddresses

        // Create Address set using existing Addresses
        let existingAddressSet =
            existingAddresses
            |> List.map _.Value
            |> Set

        // Extract new addresses and save them to the index
        let newAddressSet = addressSet - existingAddressSet

        let! newAddressRecords =
            newAddressSet
            |> Seq.map (fun a ->
                { AddressIndex.Id = 0L
                  AddressIndex.Value = a
                  AddressIndex.Date = blockTime
                  Balance = 0M<Bitcoins>
                  TransactionCount = 0 })
            |> List.ofSeq
            |> index.SaveAddresses

        // Return existing and new addresses Combined with their Ids
        return
            newAddressRecords @ existingAddresses
            |> List.map (fun address -> address.Value, address.Id)
            |> Map
    }
/// Indexes the block at the given height inside its own service scope.
///
/// Steps: resolve the block hash and details from the node, register every
/// address found in the block's outputs (`createAddressMap`), add the block to
/// the index unless it is already there, index its transactions, and finally
/// mark the block as `IndexStatus.Complete`. The first error stops processing
/// and is returned; the scope is disposed when the computation finishes.
let indexBlock (sp: IServiceProvider) (height: int) =
    asyncResult {
        use scope = sp.CreateScope()
        let context = Context.create scope.ServiceProvider
        let logger = context.Logger

        // Fetch Block Details
        let! blockHash = context.Node.GetBlockHash height
        logger.LogInformation("Fetching Block {Hash}", blockHash)
        let! blockDetails = context.Node.GetBlockDetails blockHash
        logger.LogInformation("Successfully Fetched Block")

        // Extract All Addresses from the Block
        // NOTE(review): the pipeline head was missing in the source;
        // `blockDetails.Transactions` is the presumed field name - confirm.
        let allAddresses =
            blockDetails.Transactions
            |> List.collect _.Outputs
            |> List.map _.ScriptPubKey
            |> List.collect Helpers.parseAddresses

        // Create AddressMap or "AddressBook"
        let! addressMap =
            allAddresses
            |> createAddressMap context.Index blockDetails.Time

        logger.LogInformation("Saving Block")
        let! existing = context.Index.GetBlock(blockDetails.Hash)

        match existing with
        | None -> do! context.Index.AddBlock(blockDetails)
        | Some block -> logger.LogWarning("Block {Height} has already been indexed", block.Height)

        do! indexTransactions context addressMap blockDetails

        logger.LogInformation("Marking Block {Height} as Completed", blockDetails.Height)
        do! context.Index.SetBlockStatus(blockDetails.Hash, IndexStatus.Complete)
    }
/// Indexes the blocks the index reports as missing or pending.
///
/// Missing blocks are processed first, then pending ones, each with `indexBlock`.
/// The result holds one entry per processed block, so its length is the number
/// of blocks handled.
/// NOTE(review): `List.sequenceAsyncResultM` presumably runs the blocks one at a
/// time and stops at the first error - confirm against FsToolkit.ErrorHandling.
let indexPendingBlocks (sp: IServiceProvider) (context: Context) =
    asyncResult {
        let! pendingBlocks = context.Index.GetPendingBlocks()

        if pendingBlocks.Length > 0 then
            context.Logger.LogInformation("Found {count} Pending Blocks", pendingBlocks.Length)

        let! missingBlocks = context.Index.GetMissingBlocks()

        if missingBlocks.Length > 0 then
            context.Logger.LogInformation("Found {count} Missing Blocks", missingBlocks.Length)

        return!
            missingBlocks @ pendingBlocks
            |> List.map (indexBlock sp)
            |> List.sequenceAsyncResultM
    }
/// Actor body that runs an indexing pass for every `StartIndex` message.
///
/// - `sp`: root service provider; a fresh scope is created per message and per block.
/// - `mailbox`: the actor's mailbox.
///
/// The pass runs synchronously inside the actor, so the next message is not
/// received until the current pass has finished. The outcome is only logged.
let rootActor (sp: IServiceProvider) (mailbox: Actor<Msg>) =

    /// Catches the index up with the node and returns the number of blocks processed.
    let indexChain (context: Context) =
        asyncResult {
            let! chainInfo = context.Node.GetChainInfo()
            let nodeBlockHeight = chainInfo.Blocks
            let! indexBlockHeight = context.Index.GetBlockChainHeight()
            let! pendingBlocks = indexPendingBlocks sp context

            if nodeBlockHeight > indexBlockHeight then
                context.Logger.LogInformation("Current Block Height {nodeBlockHeight}\n", nodeBlockHeight)
                let batches = [ indexBlockHeight .. nodeBlockHeight ] |> List.batchesOf 10

                for batch in batches do
                    let startIndex, endIndex = batch.Head, batch |> List.last
                    context.Logger.LogInformation("Processing Batch {Start} .. {End}", startIndex, endIndex)

                    // Run Block Index in Parallel
                    let! results =
                        batch
                        |> List.map (indexBlock sp)
                        |> Async.Parallel

                    // If any fails, Collect the Errors into an Aggregate Error
                    do!
                        results
                        |> List.ofArray
                        |> List.sequenceResultA
                        |> Result.mapError AggregateError
                        |> Result.ignore

                    context.Logger.LogInformation("Consolidating Detached Inputs")
                    do! context.Index.ConsolidateInputs() |> AsyncResult.ignore

                    let percentageCompleted = float endIndex / float nodeBlockHeight * 100.0
                    context.Logger.LogInformation("Processing Complete for Batch. Overall Index is {percentageComplete:N4}% Complete\n\n", percentageCompleted)

                return nodeBlockHeight - indexBlockHeight + pendingBlocks.Length
            else
                return pendingBlocks.Length
        }

    /// Handles one message inside its own service scope.
    ///
    /// This is a plain function rather than part of the `actor` expression on
    /// purpose. A `use` inside `actor { ... }` that wraps `return! loop ()` only
    /// disposes once the whole (never-ending) loop completes, so the scope would
    /// be leaked for every message. Here the scope is disposed as soon as the
    /// message is handled, including when handling raises.
    let handle (msg: Msg) =
        use scope = sp.CreateScope()
        let context = Context.create scope.ServiceProvider

        match msg with
        | StartIndex ->
            let blocksIndexed =
                indexChain context
                |> Async.RunSynchronously

            match blocksIndexed with
            | Ok 0 -> context.Logger.LogInformation "No Blocks were processed, Index is up to date"
            | Ok blocks -> context.Logger.LogInformation("{blocks} Blocks were processed", blocks)
            | Error appError ->
                let description = describeError appError
                context.Logger.LogError("An Error Occurred indexing Blocks {msg}", description)

    let rec loop () =
        actor {
            let! msg = mailbox.Receive()
            handle msg
            return! loop ()
        }

    loop ()
