32 #ifndef LEDGER_CORE_ABSTRACTBLOCKCHAINEXPLORERACCOUNTSYNCHRONIZER_H 33 #define LEDGER_CORE_ABSTRACTBLOCKCHAINEXPLORERACCOUNTSYNCHRONIZER_H 39 #include <api/Configuration.hpp> 40 #include <api/ConfigurationDefaults.hpp> 41 #include <async/Future.hpp> 42 #include <async/wait.h> 43 #include <collections/DynamicObject.hpp> 44 #include <debug/Benchmarker.h> 45 #include <events/ProgressNotifier.h> 46 #include <utils/Unit.hpp> 47 #include <utils/DateUtils.hpp> 48 #include <utils/DurationUtils.h> 49 #include <preferences/Preferences.hpp> 50 #include <wallet/common/AbstractWallet.hpp> 51 #include <wallet/common/database/BlockDatabaseHelper.h> 52 #include <wallet/common/database/AccountDatabaseHelper.h> 58 std::string blockHash;
// Serialization hook for a single batch's saved state: hands the batch's
// blockHash and blockHeight to the given archive, in that order.
// NOTE(review): the archive type is a template parameter; which serialization
// library drives this is not visible in this listing.
65 template<
class Archive>
66 void serialize(Archive & archive) {
67 archive(blockHash, blockHeight);
// Persisted synchronization state (stored/loaded under the "state" preferences
// key by performSynchronization / synchronizeBatches).
// Half batch size that was in effect when this state was saved.
72 uint32_t halfBatchSize;
// One saved entry (block hash + block height) per synchronization batch.
73 std::vector<BlockchainExplorerAccountSynchronizationBatchSavedState> batches;
// Entries copied from SynchronizationBuddy::transactionsToDrop by synchronizeBatch;
// a transaction listed here is not dropped at the end of performSynchronization.
74 std::map<std::string, std::string> pendingTxsHash;
// Serialization hook: passes halfBatchSize, batches and pendingTxsHash to the archive.
79 template<
class Archive>
80 void serialize(Archive & archive)
82 archive(halfBatchSize, batches, pendingTxsHash);
// Template parameter list; presumably that of
// AbstractBlockchainExplorerAccountSynchronizer<Account, AddressType, Keychain, Explorer>
// (the class declaration line itself is not part of this listing — TODO confirm).
//   Account     - account type being synchronized
//   AddressType - address type produced by the keychain
//   Keychain    - keychain type returned by account->getKeychain()
//   Explorer    - blockchain explorer type held in _explorer
86 template<
typename Account,
typename AddressType,
typename Keychain,
typename Explorer>
// Starts a synchronization of `account` if none is currently running.
//
// The whole call runs under _lock. When _currentAccount is null, `account`
// becomes the current account, a fresh ProgressNotifier<Unit> is created and
// performSynchronization(account) is launched. Its completion callback runs on
// getSynchronizerContext(), re-acquires _lock, forwards failure or success to
// the notifier, and then clears both _notifier and _currentAccount.
//
// NOTE(review): this listing elides several original lines (the body of the
// `account != _currentAccount` branch and the return statement among them), so
// the behaviour for an already-running synchronization and the exact returned
// notifier cannot be documented from here.
90 std::shared_ptr<ProgressNotifier<Unit>> synchronizeAccount(
const std::shared_ptr<Account>& account) {
91 std::lock_guard<std::mutex> lock(_lock);
92 if (!_currentAccount) {
93 _currentAccount = account;
94 _notifier = std::make_shared<ProgressNotifier<Unit>>();
// `self` keeps the synchronizer alive until the completion callback has run.
95 auto self = getSharedFromThis();
96 performSynchronization(account).onComplete(getSynchronizerContext(), [
self] (
const Try<Unit> &result) {
97 std::lock_guard<std::mutex> l(self->_lock);
98 if (result.isFailure()) {
99 self->_notifier->failure(result.getFailure());
101 self->_notifier->success(unit);
// Reset the shared state so that a later call can start a new synchronization.
103 self->_notifier =
nullptr;
104 self->_currentAccount =
nullptr;
107 }
else if (account != _currentAccount) {
// Working state for one synchronization run; performSynchronization creates a
// SynchronizationBuddy and fills these members.
// NOTE(review): this listing elides some member declarations (the code also
// reads buddy->savedState and buddy->token).
// Preferences used to load and store the "state" object.
116 std::shared_ptr<Preferences> preferences;
// Logger obtained from the account.
117 std::shared_ptr<spdlog::logger>
logger;
// Time at which the run started; used to log the run duration.
118 std::chrono::system_clock::time_point startDate;
119 std::shared_ptr<AbstractWallet> wallet;
// Wallet configuration (account->getWallet()->getConfig()).
120 std::shared_ptr<DynamicObject> configuration;
// Half batch size used to compute the address index range of each batch.
121 uint32_t halfBatchSize;
122 std::shared_ptr<Keychain> keychain;
125 std::shared_ptr<Account> account;
// Looked up by transaction hash in synchronizeBatch; the mapped value is used
// as the operation uid in the final "DELETE FROM operations" statement.
126 std::map<std::string, std::string> transactionsToDrop;
// Tail of a function signature — presumably initializeSavedState(savedState,
// halfBatchSize), which performSynchronization calls with buddy->savedState and
// buddy->halfBatchSize (the first signature line is not in this listing).
//
// Visible behaviour:
//  * If a saved state exists but was saved with a different halfBatchSize, the
//    saved batches are scanned for the lowest blockHeight (starting from the
//    sentinel 1U << 31U), a new batch count is derived from
//    (old batch count * old halfBatchSize) / new halfBatchSize, the stored
//    halfBatchSize is replaced, and batches carrying that lowest block are
//    appended.
//  * If there is no saved state, the visible lines set halfBatchSize on it.
//
// NOTE(review): several original lines are elided here (declarations of
// `block` and `s`, the loop bodies, whatever resets `batches` and creates the
// state in the empty case), so the above is a partial description.
// NOTE(review): halfBatchSize is int32_t here but uint32_t in the saved state,
// so the comparison and the division below mix signed and unsigned operands;
// the loop counter `i` is an int compared against an unsigned count, and
// `i <= newBatchCount` iterates newBatchCount + 1 times — confirm intended.
131 int32_t halfBatchSize) {
132 if (savedState.hasValue() && savedState.getValue()
133 .halfBatchSize != halfBatchSize) {
// Sentinel upper bound used while searching for the lowest saved block height.
135 block.blockHeight = 1U << 31U;
137 for (
auto &state : savedState.getValue()
139 if (state.blockHeight < block.blockHeight) {
// Number of batches needed to cover the same address range with the new size.
143 auto newBatchCount = (savedState.getValue()
144 .batches.size() * savedState.getValue()
145 .halfBatchSize) / halfBatchSize;
146 if ((savedState.getValue()
147 .batches.size() * savedState.getValue()
148 .halfBatchSize) / halfBatchSize != 0)
150 savedState.getValue()
153 savedState.getValue()
154 .halfBatchSize = (uint32_t) halfBatchSize;
155 for (
auto i = 0; i <= newBatchCount; i++) {
157 s.blockHash = block.blockHash;
158 s.blockHeight = block.blockHeight;
159 savedState.getValue().batches.push_back(s);
161 }
else if (savedState.isEmpty()) {
164 savedState.getValue()
165 .halfBatchSize = (uint32_t) halfBatchSize;
// Runs one complete synchronization pass for `account`.
//
// Visible steps:
//  1. Build a SynchronizationBuddy from the account (preferences, logger, start
//     date, wallet, configuration, half batch size, keychain) and load the
//     saved state stored under the "state" preferences key.
//  2. If a saved state exists, look for saved batch blocks that are no longer
//     in the block table; every batch at or above the deepest missing height is
//     reset to the account's last block with operations (or height 0 / empty
//     hash when there is none).
//  3. Call initializeSavedState, updateTransactionsToDrop and updateCurrentBlock.
//  4. Chain: _explorer->startSession() -> synchronizeBatches(0, buddy) ->
//     _explorer->killSession(token) -> final bookkeeping (log the duration,
//     delete operations of transactions still marked to drop, clear
//     _currentAccount). A recover handler logs the error and its duration.
//
// @param account account to synchronize
// @return future of the chain described in step 4
//
// NOTE(review): this listing elides a number of original lines (return values
// of the lambdas, closing braces, the sort call, loop increments, ...); the
// inline comments below only describe what is visible.
169 Future<Unit> performSynchronization(
const std::shared_ptr<Account> &account) {
170 auto buddy = std::make_shared<SynchronizationBuddy>();
// Populate the per-run state from the account, its wallet and its configuration.
172 buddy->account = account;
173 buddy->preferences = std::static_pointer_cast<
AbstractAccount>(account)->getInternalPreferences()
175 "AbstractBlockchainExplorerAccountSynchronizer");
176 buddy->logger = account->logger();
177 buddy->startDate = DateUtils::now();
178 buddy->wallet = account->getWallet();
179 buddy->configuration = std::static_pointer_cast<
AbstractAccount>(account)->getWallet()->getConfig();
180 buddy->halfBatchSize = (uint32_t) buddy->configuration
183 buddy->keychain = account->getKeychain();
// Load the state persisted by a previous run, if any.
184 buddy->savedState = buddy->preferences
185 ->template getObject<BlockchainExplorerAccountSynchronizationSavedState>(
"state");
187 ->info(
"Starting synchronization for account#{} ({}) of wallet {} at {}",
189 account->getKeychain()->getRestoreKey(),
190 account->getWallet()->getName(), DateUtils::toJSON(buddy->startDate));
193 soci::session sql(buddy->wallet->getDatabase()->getPool());
194 if (buddy->savedState.nonEmpty()) {
// Work on a copy of the batches; the visible comparator orders by block height
// (the call that applies it is elided from this listing).
197 auto sortedBatches = buddy->savedState.getValue().batches;
200 return lhs.blockHeight < rhs.blockHeight;
203 auto currencyName = buddy->wallet->getCurrency().name;
// Walk the batches while their block is missing from the database, remembering
// the height of the last missing one.
206 uint64_t deepestFailedBlockHeight = 0;
207 while (index < sortedBatches.size() && !BlockDatabaseHelper::blockExists(sql, sortedBatches[index].blockHash, currencyName)) {
208 deepestFailedBlockHeight = sortedBatches[index].blockHeight;
213 if (deepestFailedBlockHeight > 0) {
// Rewind every batch at or above the missing height to the account's last
// block with operations, or to height 0 / empty hash when there is none.
215 auto previousBlock = AccountDatabaseHelper::getLastBlockWithOperations(sql, buddy->account->getAccountUid());
216 for (
auto& batch : buddy->savedState.getValue().batches) {
217 if (batch.blockHeight >= deepestFailedBlockHeight) {
218 batch.blockHeight = previousBlock.nonEmpty() ? (uint32_t)previousBlock.getValue().height : 0;
219 batch.blockHash = previousBlock.nonEmpty() ? previousBlock.getValue().blockHash :
"";
225 initializeSavedState(buddy->savedState, buddy->halfBatchSize);
// Hooks implemented by the concrete synchronizer (pure virtual in this class).
227 updateTransactionsToDrop(sql, buddy, account->getAccountUid());
229 updateCurrentBlock(buddy, account->getContext());
231 auto self = getSharedFromThis();
232 return _explorer->startSession().template map<Unit>(account->getContext(), [buddy] (
void *
const& t) ->
Unit {
233 buddy->logger->info(
"Synchronization token obtained");
236 }).
template flatMap<Unit>(account->getContext(), [buddy,
self] (
const Unit&) {
// Batches are synchronized starting from index 0.
237 return self->synchronizeBatches(0, buddy);
238 }).
template flatMap<Unit>(account->getContext(), [
self, buddy] (
const Unit&) {
239 return self->_explorer->killSession(buddy->token.getValue());
240 }).
template map<Unit>(ImmediateExecutionContext::INSTANCE, [
self, buddy] (
const Unit&) {
// Elapsed time since buddy->startDate, in milliseconds.
241 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
242 (DateUtils::now() - buddy->startDate.time_since_epoch()).time_since_epoch());
243 buddy->logger->info(
"End synchronization for account#{} of wallet {} in {}", buddy->account->getIndex(),
244 buddy->account->getWallet()->getName(), DurationUtils::formatDuration(duration));
247 soci::session sql(buddy->wallet->getDatabase()->getPool());
// Any transaction still in transactionsToDrop and not listed in pendingTxsHash
// has its operation row deleted.
248 for (
auto& tx : buddy->transactionsToDrop) {
250 auto it = buddy->savedState.getValue().pendingTxsHash.find(tx.first);
251 if (it == buddy->savedState.getValue().pendingTxsHash.end()) {
252 buddy->logger->info(
"Drop transaction {}", tx.first);
253 buddy->logger->info(
"Deleting operation from DB {}", tx.second);
255 sql <<
"DELETE FROM operations WHERE uid = :uid", soci::use(tx.second);
// NOTE(review): no lock on _lock is visible around this write, unlike the
// completion callback in synchronizeAccount — confirm against the full source.
259 self->_currentAccount =
nullptr;
261 }).recover(ImmediateExecutionContext::INSTANCE, [buddy] (
const Exception& ex) ->
Unit {
262 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
263 (DateUtils::now() - buddy->startDate.time_since_epoch()).time_since_epoch());
// NOTE(review): the log message below repeats the word "during".
264 buddy->logger->error(
"Error during during synchronization for account#{} of wallet {} in {} ms", buddy->account->getIndex(),
265 buddy->account->getWallet()->getName(), duration.count());
266 buddy->logger->error(
"Due to {}, {}", api::to_string(ex.getErrorCode()), ex.getMessage());
// Synchronizes the batch at `currentBatchIndex` and then, depending on the
// outcome, continues with the next batch.
//
// After a batch completes, the saved state is written back to preferences
// under the "state" key. The next batch (currentBatchIndex + 1) is launched
// when the keychain has multiple addresses and either this was not the last
// batch, or it was the last one and it had transactions, or the last
// discoverable address lies beyond the addresses covered so far.
//
// On failure, the handler logs the exception and — under a condition that is
// only partly visible here — treats it as a chain reorganization: blocks at or
// above the failed batch's height are deleted, every batch above the last
// remaining block is rewound to it, the state is persisted, and the same batch
// index is synchronized again.
//
// @param currentBatchIndex index into savedState.batches
// @param buddy             per-run synchronization state
//
// NOTE(review): this listing elides several original lines (returns, closing
// braces, the definitions of hasMultipleAddresses / lastDiscoverableAddress and
// the first half of the recovery condition).
275 Future<Unit> synchronizeBatches(uint32_t currentBatchIndex,
276 std::shared_ptr<SynchronizationBuddy> buddy) {
277 buddy->logger->info(
"SYNC BATCHES");
// NOTE(review): size() - 1 is unsigned and wraps when `batches` is empty; the
// bounds check on the next line runs afterwards.
281 auto done = currentBatchIndex >= buddy->savedState.getValue().batches.size() - 1;
282 if (currentBatchIndex >= buddy->savedState.getValue().batches.size()) {
286 auto self = getSharedFromThis();
287 auto& batchState = buddy->savedState.getValue().batches[currentBatchIndex];
289 auto benchmark = std::make_shared<Benchmarker>(fmt::format(
"Synchronize batch {}", currentBatchIndex), buddy->logger);
291 return synchronizeBatch(currentBatchIndex, buddy).template flatMap<Unit>(buddy->account->getContext(), [=] (
const bool& hadTransactions) ->
Future<Unit> {
// Persist progress after every batch.
294 buddy->preferences->editor()->template putObject<BlockchainExplorerAccountSynchronizationSavedState>(
"state", buddy->savedState.getValue())->commit();
298 auto discoveredAddresses = currentBatchIndex * buddy->halfBatchSize;
300 if (hasMultipleAddresses && (!done || (done && hadTransactions) || lastDiscoverableAddress > discoveredAddresses)) {
301 return self->synchronizeBatches(currentBatchIndex + 1, buddy);
305 }).recoverWith(ImmediateExecutionContext::INSTANCE, [=] (
const Exception &exception) ->
Future<Unit> {
306 buddy->logger->info(
"Recovering from failing synchronization : {}", exception.getMessage());
310 buddy->savedState.nonEmpty()) {
311 buddy->logger->info(
"Recovering from reorganization");
314 auto& failedBatch = buddy->savedState.getValue().batches[currentBatchIndex];
315 auto failedBlockHeight = failedBatch.blockHeight;
316 auto failedBlockHash = failedBatch.blockHash;
318 if (failedBlockHeight > 0) {
320 buddy->logger->info(
"Deleting blocks above block height: {}", failedBlockHeight);
322 soci::session sql(buddy->wallet->getDatabase()->getPool());
// NOTE(review): the statement uses >= (the failed height itself is deleted too)
// and shows no currency filter — confirm both are intended.
323 sql <<
"DELETE FROM blocks where height >= :failedBlockHeight", soci::use(failedBlockHeight);
// Highest block left for this currency after the deletion, if any.
326 auto lastBlock = BlockDatabaseHelper::getLastBlock(sql, buddy->wallet->getCurrency().name);
329 int64_t lastBlockHeight = 0;
330 std::string lastBlockHash;
331 if (lastBlock.nonEmpty()) {
332 lastBlockHeight = lastBlock.getValue().height;
333 lastBlockHash = lastBlock.getValue().blockHash;
// Rewind every batch that points above the last remaining block.
337 for (
auto &batch : buddy->savedState.getValue().batches) {
338 if (batch.blockHeight > lastBlockHeight) {
339 batch.blockHeight = (uint32_t)lastBlockHeight;
340 batch.blockHash = lastBlockHash;
345 buddy->preferences->editor()->template putObject<BlockchainExplorerAccountSynchronizationSavedState>(
346 "state", buddy->savedState.getValue())->commit();
351 buddy->logger->info(
"Relaunch synchronization after recovering from reorganization");
// Retry the same batch index with the rewound state.
353 return self->synchronizeBatches(currentBatchIndex, buddy);
// Synchronizes one batch of addresses.
//
// The batch covers keychain address indices
// [currentBatchIndex * halfBatchSize, (currentBatchIndex + 1) * halfBatchSize - 1].
// The addresses are converted to strings and sent to the explorer's
// getTransactions together with a block hash and the session token. Each
// returned transaction is stored through account->putTransaction inside a soci
// transaction, the drop/pending bookkeeping is updated, account events are
// emitted, and the batch's saved block is moved to the block of the last
// returned transaction when that transaction has one. The function then calls
// itself again for the same batch, carrying whether any transaction has been
// seen so far.
//
// @param currentBatchIndex index into savedState.batches
// @param buddy             per-run synchronization state
// @param hadTransactions   whether an earlier call for this batch already
//                          received transactions
//
// NOTE(review): this listing elides several original lines (how `blockHash` is
// set, the commit of the soci transaction, the condition guarding the
// recursive call and the final return), so termination and the exact returned
// value cannot be documented from here.
369 Future<bool> synchronizeBatch(uint32_t currentBatchIndex,
370 std::shared_ptr<SynchronizationBuddy> buddy,
371 bool hadTransactions =
false) {
372 buddy->logger->info(
"SYNC BATCH {}", currentBatchIndex);
375 auto self = getSharedFromThis();
376 auto& batchState = buddy->savedState.getValue().batches[currentBatchIndex];
378 if (batchState.blockHeight > 0) {
382 auto derivationBenchmark = std::make_shared<Benchmarker>(
"Batch derivation", buddy->logger);
383 derivationBenchmark->start();
// Derive the observable addresses of this batch and convert them to strings.
385 auto batch = vector::map<std::string, std::shared_ptr<AddressType>>(
386 buddy->keychain->getAllObservableAddresses((uint32_t) (currentBatchIndex * buddy->halfBatchSize),
387 (uint32_t) ((currentBatchIndex + 1) * buddy->halfBatchSize - 1)),
388 [] (
const std::shared_ptr<AddressType>& addr) -> std::string {
389 return addr->toString();
393 derivationBenchmark->stop();
395 auto benchmark = std::make_shared<Benchmarker>(
"Get batch", buddy->logger);
398 ->getTransactions(batch, blockHash, buddy->token)
399 .template flatMap<bool>(buddy->account->getContext(), [
self, currentBatchIndex, buddy, hadTransactions, benchmark] (
const std::shared_ptr<typename Explorer::TransactionsBulk>& bulk) ->
Future<bool> {
402 auto insertionBenchmark = std::make_shared<Benchmarker>(
"Transaction computation", buddy->logger);
403 insertionBenchmark->start();
405 auto& batchState = buddy->savedState.getValue().batches[currentBatchIndex];
406 soci::session sql(buddy->wallet->getDatabase()->getPool());
407 soci::transaction tr(sql);
408 buddy->logger->info(
"Got {} txs", bulk->transactions.size());
409 for (
const auto& tx : bulk->transactions) {
410 buddy->account->putTransaction(sql, tx);
// A transaction returned by the explorer is no longer a candidate for dropping.
413 auto it = buddy->transactionsToDrop.find(tx.hash);
414 if (it != buddy->transactionsToDrop.end()) {
// With a block it leaves the pending set; the insert below presumably sits in
// the matching else branch (that line is elided) — TODO confirm.
416 if (tx.block.nonEmpty()) {
417 buddy->savedState.getValue().pendingTxsHash.erase(it->first);
419 buddy->savedState.getValue().pendingTxsHash.insert(std::pair<std::string, std::string>(it->first, it->second));
424 buddy->transactionsToDrop.erase(tx.hash);
428 buddy->account->emitEventsNow();
// Advance the batch's saved block to the block of the last returned transaction.
431 if (bulk->transactions.size() > 0) {
432 auto &lastBlock = bulk->transactions.back().block;
434 if (lastBlock.nonEmpty()) {
435 batchState.blockHeight = (uint32_t) lastBlock.getValue().height;
436 batchState.blockHash = lastBlock.getValue().hash;
440 insertionBenchmark->stop();
// True once any call for this batch has received at least one transaction.
442 auto hadTX = hadTransactions || bulk->transactions.size() > 0;
444 return self->synchronizeBatch(currentBatchIndex, buddy, hadTX);
// Hook implemented by subclasses; performSynchronization calls it with the run
// state and the account's execution context before starting the explorer session.
451 virtual void updateCurrentBlock(std::shared_ptr<SynchronizationBuddy> &buddy,
452 const std::shared_ptr<api::ExecutionContext> &context) = 0;
// Hook implemented by subclasses; performSynchronization calls it with an open
// SQL session, the run state and the account uid. Presumably it fills
// buddy->transactionsToDrop — TODO confirm against an implementation.
453 virtual void updateTransactionsToDrop(soci::session &sql,
454 std::shared_ptr<SynchronizationBuddy> &buddy,
455 const std::string &accountUid) = 0;
// Explorer used to open/kill sessions and to fetch transactions.
457 std::shared_ptr<Explorer> _explorer;
// Notifier of the synchronization in progress; null when none is running.
458 std::shared_ptr<ProgressNotifier<Unit>> _notifier;
// Account currently being synchronized; null when none is running.
460 std::shared_ptr<Account> _currentAccount;
// Must return a shared_ptr to this synchronizer; used to keep it alive inside
// asynchronous callbacks.
464 virtual std::shared_ptr<AbstractBlockchainExplorerAccountSynchronizer<Account, AddressType, Keychain, Explorer>> getSharedFromThis() = 0;
// Execution context on which synchronizeAccount's completion callback runs.
465 virtual std::shared_ptr<api::ExecutionContext> getSynchronizerContext() = 0;
467 std::shared_ptr<Preferences> _internalPreferences;
473 #endif //LEDGER_CORE_ABSTRACTBLOCKCHAINEXPLORERACCOUNTSYNCHRONIZER_H Definition: AbstractBlockchainExplorerAccountSynchronizer.h:57
static std::string const KEYCHAIN_OBSERVABLE_RANGE
Definition: Configuration.hpp:30
Definition: AbstractBlockchainExplorerAccountSynchronizer.h:115
Definition: Option.hpp:49
Definition: AbstractAccount.hpp:55
Definition: Deffered.hpp:49
Definition: AbstractBlockchainExplorerAccountSynchronizer.h:87
Definition: AbstractBlockchainExplorerAccountSynchronizer.h:71
static int32_t const KEYCHAIN_DEFAULT_OBSERVABLE_RANGE
Definition: ConfigurationDefaults.hpp:40
Definition: Account.cpp:8
static std::string const SYNCHRONIZATION_HALF_BATCH_SIZE
Definition: Configuration.hpp:54
Definition: Exception.hpp:45
Definition: logger.hpp:44