Created
April 10, 2019 12:38
-
-
Save igor-egorov/058dd929475370fda561730791a808a7 to your computer and use it in GitHub Desktop.
debuggable pending transactions storage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright Soramitsu Co., Ltd. All Rights Reserved. | |
* SPDX-License-Identifier: Apache-2.0 | |
*/ | |
#include "pending_txs_storage/impl/pending_txs_storage_impl.hpp" | |
#include <sstream> | |
#include "interfaces/transaction.hpp" | |
#include "multi_sig_transactions/state/mst_state.hpp" | |
namespace iroha { | |
PendingTransactionStorageImpl::PendingTransactionStorageImpl( | |
StateObservable updated_batches, | |
BatchObservable prepared_batch, | |
BatchObservable expired_batch) { | |
updated_batches_subscription_ = | |
updated_batches.subscribe([this](const SharedState &batches) { | |
this->updatedBatchesHandler(batches); | |
}); | |
prepared_batch_subscription_ = | |
prepared_batch.subscribe([this](const SharedBatch &preparedBatch) { | |
this->removeBatch(preparedBatch); | |
}); | |
expired_batch_subscription_ = | |
expired_batch.subscribe([this](const SharedBatch &expiredBatch) { | |
this->removeBatch(expiredBatch); | |
}); | |
} | |
PendingTransactionStorageImpl::~PendingTransactionStorageImpl() { | |
updated_batches_subscription_.unsubscribe(); | |
prepared_batch_subscription_.unsubscribe(); | |
expired_batch_subscription_.unsubscribe(); | |
} | |
PendingTransactionStorageImpl::SharedTxsCollectionType | |
PendingTransactionStorageImpl::getPendingTransactions( | |
const AccountIdType &account_id) const { | |
printStorageState("getPendingTransactions"); | |
std::shared_lock<std::shared_timed_mutex> lock(mutex_); | |
auto creator_it = storage_.index.find(account_id); | |
if (storage_.index.end() != creator_it) { | |
auto &batch_hashes = creator_it->second; | |
SharedTxsCollectionType result; | |
auto &batches = storage_.batches; | |
for (const auto &batch_hash : batch_hashes) { | |
auto batch_it = batches.find(batch_hash); | |
if (batches.end() != batch_it) { | |
auto &txs = batch_it->second->transactions(); | |
result.insert(result.end(), txs.begin(), txs.end()); | |
} | |
} | |
return result; | |
} | |
return {}; | |
} | |
void PendingTransactionStorageImpl::printStorageState( | |
std::string comment) const { | |
std::shared_lock<std::shared_timed_mutex> lock(mutex_); | |
std::stringstream out; | |
auto splitter = "==============================="; | |
out << splitter << std::endl << comment << std::endl; | |
for (auto author : storage_.index) { | |
out << "Creator: " << author.first << std::endl; | |
for (auto hash : author.second) { | |
out << "\t" << hash.toString() << std::endl; | |
auto batch = storage_.batches.find(hash); | |
if (batch != storage_.batches.end()) { | |
out << "\tHas all signatures: " << batch->second->hasAllSignatures() | |
<< std::endl; | |
for (auto tx : batch->second->transactions()) { | |
out << "\t\t" << tx->toString() << std::endl; | |
} | |
} else { | |
out << "\tStorage is inconsistent state!" << std::endl; | |
} | |
} | |
} | |
out << splitter << std::endl; | |
std::cout << out.str() << std::endl; | |
} | |
std::set<PendingTransactionStorageImpl::AccountIdType> | |
PendingTransactionStorageImpl::batchCreators(const TransactionBatch &batch) { | |
std::set<AccountIdType> creators; | |
for (const auto &transaction : batch.transactions()) { | |
creators.insert(transaction->creatorAccountId()); | |
} | |
return creators; | |
} | |
void PendingTransactionStorageImpl::updatedBatchesHandler( | |
const SharedState &updated_batches) { | |
printStorageState("+ updatedBatchesHandler"); | |
{ | |
std::unique_lock<std::shared_timed_mutex> lock(mutex_); | |
updated_batches->iterateBatches([this](const auto &batch) { | |
auto hash = batch->reducedHash(); | |
auto it = storage_.batches.find(hash); | |
if (storage_.batches.end() == it) { | |
for (const auto &creator : batchCreators(*batch)) { | |
storage_.index[creator].insert(hash); | |
} | |
} | |
storage_.batches[hash] = batch; | |
}); | |
} | |
printStorageState("- updatedBatchesHandler"); | |
} | |
void PendingTransactionStorageImpl::removeBatch(const SharedBatch &batch) { | |
auto creators = batchCreators(*batch); | |
auto hash = batch->reducedHash(); | |
printStorageState("+ removeBatch"); | |
{ | |
std::unique_lock<std::shared_timed_mutex> lock(mutex_); | |
storage_.batches.erase(hash); | |
for (const auto &creator : creators) { | |
auto &index = storage_.index; | |
auto index_it = index.find(creator); | |
if (index.end() != index_it) { | |
auto &creator_set = index_it->second; | |
creator_set.erase(hash); | |
if (creator_set.empty()) { | |
index.erase(index_it); | |
} | |
} | |
} | |
} | |
printStorageState("- removeBatch"); | |
} | |
} // namespace iroha |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment