ledger-core
AbstractBlockchainExplorerAccountSynchronizer.h
1 /*
2  *
3  * AbstractBlockchainExplorerAccountSynchronizer
4  *
5  * Created by El Khalil Bellakrid on 29/07/2018.
6  *
7  * The MIT License (MIT)
8  *
9  * Copyright (c) 2018 Ledger
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in all
19  * copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE.
28  *
29  */
30 
31 
32 #ifndef LEDGER_CORE_ABSTRACTBLOCKCHAINEXPLORERACCOUNTSYNCHRONIZER_H
33 #define LEDGER_CORE_ABSTRACTBLOCKCHAINEXPLORERACCOUNTSYNCHRONIZER_H
34 
35 #include <algorithm>
36 #include <memory>
37 #include <mutex>
38 
39 #include <api/Configuration.hpp>
40 #include <api/ConfigurationDefaults.hpp>
41 #include <async/Future.hpp>
42 #include <async/wait.h>
43 #include <collections/DynamicObject.hpp>
44 #include <debug/Benchmarker.h>
45 #include <events/ProgressNotifier.h>
46 #include <utils/Unit.hpp>
47 #include <utils/DateUtils.hpp>
48 #include <utils/DurationUtils.h>
49 #include <preferences/Preferences.hpp>
50 #include <wallet/common/AbstractWallet.hpp>
51 #include <wallet/common/database/BlockDatabaseHelper.h>
52 #include <wallet/common/database/AccountDatabaseHelper.h>
53 
54 namespace ledger {
55  namespace core {
56 
58  std::string blockHash;
59  uint32_t blockHeight;
60 
62  blockHeight = 0;
63  }
64 
65  template<class Archive>
66  void serialize(Archive & archive) {
67  archive(blockHash, blockHeight);
68  };
69  };
70 
72  uint32_t halfBatchSize;
73  std::vector<BlockchainExplorerAccountSynchronizationBatchSavedState> batches;
74  std::map<std::string, std::string> pendingTxsHash;
75 
77  }
78 
79  template<class Archive>
80  void serialize(Archive & archive)
81  {
82  archive(halfBatchSize, batches, pendingTxsHash); // serialize things by passing them to the archive
83  }
84  };
85 
86  template<typename Account, typename AddressType, typename Keychain, typename Explorer>
88  public:
89 
90  std::shared_ptr<ProgressNotifier<Unit>> synchronizeAccount(const std::shared_ptr<Account>& account) {
91  std::lock_guard<std::mutex> lock(_lock);
92  if (!_currentAccount) {
93  _currentAccount = account;
94  _notifier = std::make_shared<ProgressNotifier<Unit>>();
95  auto self = getSharedFromThis();
96  performSynchronization(account).onComplete(getSynchronizerContext(), [self] (const Try<Unit> &result) {
97  std::lock_guard<std::mutex> l(self->_lock);
98  if (result.isFailure()) {
99  self->_notifier->failure(result.getFailure());
100  } else {
101  self->_notifier->success(unit);
102  }
103  self->_notifier = nullptr;
104  self->_currentAccount = nullptr;
105  });
106 
107  } else if (account != _currentAccount) {
108  throw make_exception(api::ErrorCode::RUNTIME_ERROR, "This synchronizer is already in use");
109  }
110  return _notifier;
111  };
112 
113  protected:
114 
116  std::shared_ptr<Preferences> preferences;
117  std::shared_ptr<spdlog::logger> logger;
118  std::chrono::system_clock::time_point startDate;
119  std::shared_ptr<AbstractWallet> wallet;
120  std::shared_ptr<DynamicObject> configuration;
121  uint32_t halfBatchSize;
122  std::shared_ptr<Keychain> keychain;
124  Option<void *> token;
125  std::shared_ptr<Account> account;
126  std::map<std::string, std::string> transactionsToDrop;
127  };
128 
129 
130  static void initializeSavedState(Option<BlockchainExplorerAccountSynchronizationSavedState> &savedState,
131  int32_t halfBatchSize) {
132  if (savedState.hasValue() && savedState.getValue()
133  .halfBatchSize != halfBatchSize) {
135  block.blockHeight = 1U << 31U;
136 
137  for (auto &state : savedState.getValue()
138  .batches) {
139  if (state.blockHeight < block.blockHeight) {
140  block = state;
141  }
142  }
143  auto newBatchCount = (savedState.getValue()
144  .batches.size() * savedState.getValue()
145  .halfBatchSize) / halfBatchSize;
146  if ((savedState.getValue()
147  .batches.size() * savedState.getValue()
148  .halfBatchSize) / halfBatchSize != 0)
149  newBatchCount += 1;
150  savedState.getValue()
151  .batches
152  .clear();
153  savedState.getValue()
154  .halfBatchSize = (uint32_t) halfBatchSize;
155  for (auto i = 0; i <= newBatchCount; i++) {
157  s.blockHash = block.blockHash;
158  s.blockHeight = block.blockHeight;
159  savedState.getValue().batches.push_back(s);
160  }
161  } else if (savedState.isEmpty()) {
164  savedState.getValue()
165  .halfBatchSize = (uint32_t) halfBatchSize;
166  }
167  };
168 
169  Future<Unit> performSynchronization(const std::shared_ptr<Account> &account) {
170  auto buddy = std::make_shared<SynchronizationBuddy>();
171 
172  buddy->account = account;
173  buddy->preferences = std::static_pointer_cast<AbstractAccount>(account)->getInternalPreferences()
174  ->getSubPreferences(
175  "AbstractBlockchainExplorerAccountSynchronizer");
176  buddy->logger = account->logger();
177  buddy->startDate = DateUtils::now();
178  buddy->wallet = account->getWallet();
179  buddy->configuration = std::static_pointer_cast<AbstractAccount>(account)->getWallet()->getConfig();
180  buddy->halfBatchSize = (uint32_t) buddy->configuration
183  buddy->keychain = account->getKeychain();
184  buddy->savedState = buddy->preferences
185  ->template getObject<BlockchainExplorerAccountSynchronizationSavedState>("state");
186  buddy->logger
187  ->info("Starting synchronization for account#{} ({}) of wallet {} at {}",
188  account->getIndex(),
189  account->getKeychain()->getRestoreKey(),
190  account->getWallet()->getName(), DateUtils::toJSON(buddy->startDate));
191 
192  //Check if reorganization happened
193  soci::session sql(buddy->wallet->getDatabase()->getPool());
194  if (buddy->savedState.nonEmpty()) {
195 
196  //Get deepest block saved in batches to be part of reorg
197  auto sortedBatches = buddy->savedState.getValue().batches;
198  std::sort(sortedBatches.begin(), sortedBatches.end(), [](const BlockchainExplorerAccountSynchronizationBatchSavedState &lhs,
200  return lhs.blockHeight < rhs.blockHeight;
201  });
202 
203  auto currencyName = buddy->wallet->getCurrency().name;
204  size_t index = 0;
205  //Reorg can't happen until genesis block, safely initialize with 0
206  uint64_t deepestFailedBlockHeight = 0;
207  while (index < sortedBatches.size() && !BlockDatabaseHelper::blockExists(sql, sortedBatches[index].blockHash, currencyName)) {
208  deepestFailedBlockHeight = sortedBatches[index].blockHeight;
209  index ++;
210  }
211 
212  //Case of reorg, update savedState's batches
213  if (deepestFailedBlockHeight > 0) {
214  //Get last block (in DB) which contains current account's operations
215  auto previousBlock = AccountDatabaseHelper::getLastBlockWithOperations(sql, buddy->account->getAccountUid());
216  for (auto& batch : buddy->savedState.getValue().batches) {
217  if (batch.blockHeight >= deepestFailedBlockHeight) {
218  batch.blockHeight = previousBlock.nonEmpty() ? (uint32_t)previousBlock.getValue().height : 0;
219  batch.blockHash = previousBlock.nonEmpty() ? previousBlock.getValue().blockHash : "";
220  }
221  }
222  }
223  }
224 
225  initializeSavedState(buddy->savedState, buddy->halfBatchSize);
226 
227  updateTransactionsToDrop(sql, buddy, account->getAccountUid());
228 
229  updateCurrentBlock(buddy, account->getContext());
230 
231  auto self = getSharedFromThis();
232  return _explorer->startSession().template map<Unit>(account->getContext(), [buddy] (void * const& t) -> Unit {
233  buddy->logger->info("Synchronization token obtained");
234  buddy->token = Option<void *>(t);
235  return unit;
236  }).template flatMap<Unit>(account->getContext(), [buddy, self] (const Unit&) {
237  return self->synchronizeBatches(0, buddy);
238  }).template flatMap<Unit>(account->getContext(), [self, buddy] (const Unit&) {
239  return self->_explorer->killSession(buddy->token.getValue());
240  }).template map<Unit>(ImmediateExecutionContext::INSTANCE, [self, buddy] (const Unit&) {
241  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
242  (DateUtils::now() - buddy->startDate.time_since_epoch()).time_since_epoch());
243  buddy->logger->info("End synchronization for account#{} of wallet {} in {}", buddy->account->getIndex(),
244  buddy->account->getWallet()->getName(), DurationUtils::formatDuration(duration));
245 
246  //Delete dropped txs from DB
247  soci::session sql(buddy->wallet->getDatabase()->getPool());
248  for (auto& tx : buddy->transactionsToDrop) {
249  //Check if tx is pending
250  auto it = buddy->savedState.getValue().pendingTxsHash.find(tx.first);
251  if (it == buddy->savedState.getValue().pendingTxsHash.end()) {
252  buddy->logger->info("Drop transaction {}", tx.first);
253  buddy->logger->info("Deleting operation from DB {}", tx.second);
254  //delete tx.second from DB (from operations)
255  sql << "DELETE FROM operations WHERE uid = :uid", soci::use(tx.second);
256  }
257  }
258 
259  self->_currentAccount = nullptr;
260  return unit;
261  }).recover(ImmediateExecutionContext::INSTANCE, [buddy] (const Exception& ex) -> Unit {
262  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
263  (DateUtils::now() - buddy->startDate.time_since_epoch()).time_since_epoch());
264  buddy->logger->error("Error during during synchronization for account#{} of wallet {} in {} ms", buddy->account->getIndex(),
265  buddy->account->getWallet()->getName(), duration.count());
266  buddy->logger->error("Due to {}, {}", api::to_string(ex.getErrorCode()), ex.getMessage());
267  throw ex;
268  });
269  };
270 
271  // Synchronize batches.
272  //
273  // This function will synchronize all batches by iterating over batches and transactions
274  // bulks. The input buddy can be used to customize the behavior of the synchronization.
275  Future<Unit> synchronizeBatches(uint32_t currentBatchIndex,
276  std::shared_ptr<SynchronizationBuddy> buddy) {
277  buddy->logger->info("SYNC BATCHES");
278  //For ETH and XRP like wallets, one account corresponds to one ETH address,
279  //so ne need to discover other batches
280  auto hasMultipleAddresses = buddy->wallet->getWalletType() == api::WalletType::BITCOIN;
281  auto done = currentBatchIndex >= buddy->savedState.getValue().batches.size() - 1;
282  if (currentBatchIndex >= buddy->savedState.getValue().batches.size()) {
283  buddy->savedState.getValue().batches.push_back(BlockchainExplorerAccountSynchronizationBatchSavedState());
284  }
285 
286  auto self = getSharedFromThis();
287  auto& batchState = buddy->savedState.getValue().batches[currentBatchIndex];
288 
289  auto benchmark = std::make_shared<Benchmarker>(fmt::format("Synchronize batch {}", currentBatchIndex), buddy->logger);
290  benchmark->start();
291  return synchronizeBatch(currentBatchIndex, buddy).template flatMap<Unit>(buddy->account->getContext(), [=] (const bool& hadTransactions) -> Future<Unit> {
292  benchmark->stop();
293 
294  buddy->preferences->editor()->template putObject<BlockchainExplorerAccountSynchronizationSavedState>("state", buddy->savedState.getValue())->commit();
295 
296  //Sync stops if there are no more batches in savedState and last batch has no transactions
297  //But we may want to force sync of accounts within KEYCHAIN_OBSERVABLE_RANGE
298  auto discoveredAddresses = currentBatchIndex * buddy->halfBatchSize;
299  auto lastDiscoverableAddress = buddy->configuration->getInt(api::Configuration::KEYCHAIN_OBSERVABLE_RANGE).value_or(buddy->halfBatchSize);
300  if (hasMultipleAddresses && (!done || (done && hadTransactions) || lastDiscoverableAddress > discoveredAddresses)) {
301  return self->synchronizeBatches(currentBatchIndex + 1, buddy);
302  }
303 
304  return Future<Unit>::successful(unit);
305  }).recoverWith(ImmediateExecutionContext::INSTANCE, [=] (const Exception &exception) -> Future<Unit> {
306  buddy->logger->info("Recovering from failing synchronization : {}", exception.getMessage());
307 
308  //A block reorganization happened
309  if (exception.getErrorCode() == api::ErrorCode::BLOCK_NOT_FOUND &&
310  buddy->savedState.nonEmpty()) {
311  buddy->logger->info("Recovering from reorganization");
312 
313  //Get its block/block height
314  auto& failedBatch = buddy->savedState.getValue().batches[currentBatchIndex];
315  auto failedBlockHeight = failedBatch.blockHeight;
316  auto failedBlockHash = failedBatch.blockHash;
317 
318  if (failedBlockHeight > 0) {
319  //Delete data related to failedBlock (and all blocks above it)
320  buddy->logger->info("Deleting blocks above block height: {}", failedBlockHeight);
321 
322  soci::session sql(buddy->wallet->getDatabase()->getPool());
323  sql << "DELETE FROM blocks where height >= :failedBlockHeight", soci::use(failedBlockHeight);
324 
325  //Get last block not part from reorg
326  auto lastBlock = BlockDatabaseHelper::getLastBlock(sql, buddy->wallet->getCurrency().name);
327 
328  //Resync from the "beginning" if no last block in DB
329  int64_t lastBlockHeight = 0;
330  std::string lastBlockHash;
331  if (lastBlock.nonEmpty()) {
332  lastBlockHeight = lastBlock.getValue().height;
333  lastBlockHash = lastBlock.getValue().blockHash;
334  }
335 
336  //Update savedState's batches
337  for (auto &batch : buddy->savedState.getValue().batches) {
338  if (batch.blockHeight > lastBlockHeight) {
339  batch.blockHeight = (uint32_t)lastBlockHeight;
340  batch.blockHash = lastBlockHash;
341  }
342  }
343 
344  //Save new savedState
345  buddy->preferences->editor()->template putObject<BlockchainExplorerAccountSynchronizationSavedState>(
346  "state", buddy->savedState.getValue())->commit();
347 
348  //Synchronize same batch now with an existing block (of hash lastBlockHash)
349  //if failedBatch was not the deepest block part of that reorg, this recursive call
350  //will ensure to get (and delete from DB) to the deepest failed block (part of reorg)
351  buddy->logger->info("Relaunch synchronization after recovering from reorganization");
352 
353  return self->synchronizeBatches(currentBatchIndex, buddy);
354  }
355  } else {
356  return Future<Unit>::failure(exception);
357  }
358 
359  return Future<Unit>::successful(unit);
360  });
361  };
362 
363  // Synchronize a transactions batch.
364  //
365  // The currentBatchIndex is the currently synchronized batch. buddy is the
366  // synchronization object used to accumulate a state. hadTransactions is used to check
367  // whether more data is needed. If a block doesn’t have any transaction, it means that
368  // we must stop.
369  Future<bool> synchronizeBatch(uint32_t currentBatchIndex,
370  std::shared_ptr<SynchronizationBuddy> buddy,
371  bool hadTransactions = false) {
372  buddy->logger->info("SYNC BATCH {}", currentBatchIndex);
373 
374  Option<std::string> blockHash;
375  auto self = getSharedFromThis();
376  auto& batchState = buddy->savedState.getValue().batches[currentBatchIndex];
377 
378  if (batchState.blockHeight > 0) {
379  blockHash = Option<std::string>(batchState.blockHash);
380  }
381 
382  auto derivationBenchmark = std::make_shared<Benchmarker>("Batch derivation", buddy->logger);
383  derivationBenchmark->start();
384 
385  auto batch = vector::map<std::string, std::shared_ptr<AddressType>>(
386  buddy->keychain->getAllObservableAddresses((uint32_t) (currentBatchIndex * buddy->halfBatchSize),
387  (uint32_t) ((currentBatchIndex + 1) * buddy->halfBatchSize - 1)),
388  [] (const std::shared_ptr<AddressType>& addr) -> std::string {
389  return addr->toString();
390  }
391  );
392 
393  derivationBenchmark->stop();
394 
395  auto benchmark = std::make_shared<Benchmarker>("Get batch", buddy->logger);
396  benchmark->start();
397  return _explorer
398  ->getTransactions(batch, blockHash, buddy->token)
399  .template flatMap<bool>(buddy->account->getContext(), [self, currentBatchIndex, buddy, hadTransactions, benchmark] (const std::shared_ptr<typename Explorer::TransactionsBulk>& bulk) -> Future<bool> {
400  benchmark->stop();
401 
402  auto insertionBenchmark = std::make_shared<Benchmarker>("Transaction computation", buddy->logger);
403  insertionBenchmark->start();
404 
405  auto& batchState = buddy->savedState.getValue().batches[currentBatchIndex];
406  soci::session sql(buddy->wallet->getDatabase()->getPool());
407  soci::transaction tr(sql);
408  buddy->logger->info("Got {} txs", bulk->transactions.size());
409  for (const auto& tx : bulk->transactions) {
410  buddy->account->putTransaction(sql, tx);
411 
412  //Update first pendingTxHash in savedState
413  auto it = buddy->transactionsToDrop.find(tx.hash);
414  if (it != buddy->transactionsToDrop.end()) {
415  //If block non empty, tx is no longer pending
416  if (tx.block.nonEmpty()) {
417  buddy->savedState.getValue().pendingTxsHash.erase(it->first);
418  } else { //Otherwise tx is in mempool but pending
419  buddy->savedState.getValue().pendingTxsHash.insert(std::pair<std::string, std::string>(it->first, it->second));
420  }
421  }
422 
423  //Remove from tx to drop
424  buddy->transactionsToDrop.erase(tx.hash);
425  }
426 
427  tr.commit();
428  buddy->account->emitEventsNow();
429 
430  // Get the last block
431  if (bulk->transactions.size() > 0) {
432  auto &lastBlock = bulk->transactions.back().block;
433 
434  if (lastBlock.nonEmpty()) {
435  batchState.blockHeight = (uint32_t) lastBlock.getValue().height;
436  batchState.blockHash = lastBlock.getValue().hash;
437  }
438  }
439 
440  insertionBenchmark->stop();
441 
442  auto hadTX = hadTransactions || bulk->transactions.size() > 0;
443  if (bulk->hasNext) {
444  return self->synchronizeBatch(currentBatchIndex, buddy, hadTX);
445  } else {
446  return Future<bool>::successful(hadTX);
447  }
448  });
449  };
450 
451  virtual void updateCurrentBlock(std::shared_ptr<SynchronizationBuddy> &buddy,
452  const std::shared_ptr<api::ExecutionContext> &context) = 0;
453  virtual void updateTransactionsToDrop(soci::session &sql,
454  std::shared_ptr<SynchronizationBuddy> &buddy,
455  const std::string &accountUid) = 0;
456 
457  std::shared_ptr<Explorer> _explorer;
458  std::shared_ptr<ProgressNotifier<Unit>> _notifier;
459  std::mutex _lock;
460  std::shared_ptr<Account> _currentAccount;
461 
462  private:
463 
464  virtual std::shared_ptr<AbstractBlockchainExplorerAccountSynchronizer<Account, AddressType, Keychain, Explorer>> getSharedFromThis() = 0;
465  virtual std::shared_ptr<api::ExecutionContext> getSynchronizerContext() = 0;
466 
467  std::shared_ptr<Preferences> _internalPreferences;
468  };
469  }
470 }
471 
472 
473 #endif //LEDGER_CORE_ABSTRACTBLOCKCHAINEXPLORERACCOUNTSYNCHRONIZER_H
Definition: AbstractBlockchainExplorerAccountSynchronizer.h:57
Definition: Try.hpp:49
static std::string const KEYCHAIN_OBSERVABLE_RANGE
Definition: Configuration.hpp:30
Definition: AbstractBlockchainExplorerAccountSynchronizer.h:115
Definition: Option.hpp:49
Definition: AbstractAccount.hpp:55
Definition: Deffered.hpp:49
Definition: AbstractBlockchainExplorerAccountSynchronizer.h:87
Definition: Unit.hpp:39
Definition: AbstractBlockchainExplorerAccountSynchronizer.h:71
static int32_t const KEYCHAIN_DEFAULT_OBSERVABLE_RANGE
Definition: ConfigurationDefaults.hpp:40
Definition: Account.cpp:8
static std::string const SYNCHRONIZATION_HALF_BATCH_SIZE
Definition: Configuration.hpp:54
Definition: Exception.hpp:45
Definition: logger.hpp:44