bedrock-ledger-storage-mongodb/lib/LedgerOperationStorage.js

/*!
 * Ledger operation storage class.
 *
 * Copyright (c) 2017-2018 Digital Bazaar, Inc. All rights reserved.
 */
'use strict';

const _ = require('lodash');
const assert = require('assert-plus');
const bedrock = require('bedrock');
const database = require('bedrock-mongodb');
const logger = require('./logger');
const {BedrockError} = bedrock.util;

/**
 * The operation API is used to perform operations on operations associated with
 * a particular event.
 */
module.exports = class LedgerOperationStorage {
  constructor({eventCollection, ledgerNodeId, operationCollection}) {
    this.collection = operationCollection;
    this.eventCollection = eventCollection;
    this.eventCollectionName = eventCollection.s.name;
    this.ledgerNodeId = ledgerNodeId;
    this.plugins = {};
    // expose utils that can be used in storage plugins
    this.util = {
      assert,
      dbHash: database.hash,
      logger,
      BedrockError
    };
  }

  addMany({ignoreDuplicate = true, operations}, callback) {
    this.collection.insertMany(
      operations, _.assign({}, database.writeOptions, {ordered: false}),
      (err, result) => {
        if(err && ignoreDuplicate && database.isDuplicateError(err)) {
          return callback(null, result);
        }
        if(err) {
          return callback(err);
        }
        callback(null, result);
      });
  }

  // TODO: an optional parameter like
  // `since: {blockHeight: 0, blockOrder: 0, eventOrder: 0}`
  // could be useful if the state machine only needs operations after a certain
  // point
  getRecordHistory({maxBlockHeight, recordId}, callback) {
    assert.string(recordId, 'recordId');
    if(maxBlockHeight !== undefined && !(Number.isInteger(maxBlockHeight) &&
      maxBlockHeight > 0)) {
      throw new TypeError('maxBlockHeight must be a positive integer.');
    }

    const query = {recordId: database.hash(recordId)};

    const eventMatch = {'meta.eventMeta.consensus': {$exists: true}};
    if(maxBlockHeight) {
      eventMatch['meta.eventMeta.blockHeight'] = {$lte: maxBlockHeight};
    }

    this.collection.aggregate([
      {$match: query},
      {$project: {_id: 0}},
      {$lookup: {
        from: this.eventCollectionName,
        let: {eventHash: '$meta.eventHash'},
        pipeline: [
          {$match: {$expr: {$eq: ['$eventHash', '$$eventHash']}}},
          {$project: {
            _id: 0,
            'meta.consensus': 1,
            'meta.blockHeight': 1,
            'meta.blockOrder': 1
          }},
          {$replaceRoot: {newRoot: "$meta"}}
        ],
        as: 'meta.eventMeta'
      }},
      {$unwind: '$meta.eventMeta'},
      {$match: eventMatch},
      {$sort: {
        'meta.eventMeta.blockHeight': 1,
        'meta.eventMeta.blockOrder': 1,
        'meta.eventOrder': 1
      }},
    ], {allowDiskUse: true}).toArray((err, result) => {
      if(err) {
        return callback(err);
      }
      if(result.length === 0) {
        return callback(new BedrockError(
          'Failed to get history for the specified record.',
          'NotFoundError', {
            httpStatusCode: 404,
            maxBlockHeight,
            public: true,
            recordId
          }));
      }
      callback(null, result);
    });
  }

  /**
   * Determine if operations exist.
   *
   * @param eventHash the hash of the event associated with the operation
   * @param operationHash the hash or array of hashes of the operation(s).
   * @param callback(err, result) called once the operation completes.
   */
  exists({eventHash, operationHash}, callback) {
    // NOTE: duplicate opHashes are acceptable, but do not need to be included
    // in the query
    let hashes = [].concat(operationHash);
    const totalHashes = hashes.length;
    hashes = _.uniq(hashes);
    const query = {
      'meta.deleted': {$exists: false},
      'meta.eventHash': database.hash(eventHash),
      'meta.eventOrder': {$exists: true},
      'meta.operationHash': {$in: hashes},
    };
    this.collection.find(query).count((err, result) =>
      err ? callback(err) : callback(null, totalHashes === result));
  }
};