/*!
* Ledger event storage class.
*
* Copyright (c) 2017-2018 Digital Bazaar, Inc. All rights reserved.
*/
'use strict';
const _ = require('lodash');
const assert = require('assert-plus');
const async = require('async');
const bedrock = require('bedrock');
const database = require('bedrock-mongodb');
const jsonld = bedrock.jsonld;
const logger = require('./logger');
const {BedrockError} = bedrock.util;
// TODO: ideally, code to create indexes for event storage would be in
// this file
/**
* The events API is used to perform operations on events associated
* with a particular ledger.
*/
module.exports = class LedgerEventStorage {
constructor({eventCollection, ledgerNodeId, operationStorage}) {
// assign the collection used for event storage
this.collection = eventCollection;
this.operationStorage = operationStorage;
this.ledgerNodeId = ledgerNodeId;
this.plugins = {};
// expose utils that can be used in storage plugins
this.util = {
assert,
dbHash: database.hash,
logger,
BedrockError,
};
}
/**
* Adds an event to associate with a ledger given an event and a set of
* options.
*
* event - the event to associate with a ledger.
* meta - the metadata that is associated with the event.
* eventHash - the hash of the event data.
* callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the result of the operation.
* event - the event that was committed to storage.
* meta - the metadata that was committed to storage.
*/
add({event, meta}, callback) {
if(!(event && _.isObject(event))) {
throw new TypeError('`event` must be an object.');
}
if(!(meta && meta.eventHash)) {
throw new TypeError('`meta.eventHash` is required.');
}
const {operationHash} = event;
// drop `operationHash` from the event without mutating or cloning
const _event = _.pickBy(event, (v, k) => k !== 'operationHash');
// insert the event
const now = Date.now();
const record = {
eventHash: database.hash(meta.eventHash),
event: _event,
meta: _.defaults(meta, {
created: now,
updated: now
})
};
logger.verbose(`adding event: ${meta.eventHash}`);
async.auto({
checkOps: callback => {
if(!operationHash && jsonld.hasValue(
event, 'type', 'WebLedgerOperationEvent')) {
return callback(new BedrockError(
'`operationHash` is required for event type ' +
'`WebLedgerOperationEvent`', 'DataError', {event}));
}
// some types of events do not include operationHash
if(!operationHash) {
return callback();
}
const {eventHash} = meta;
this.operationStorage.exists(
{eventHash, operationHash}, (err, result) => {
if(err) {
return callback(err);
}
// failure
if(!result) {
return callback(new BedrockError(
'Some operations have not been properly assigned to the event.',
'InvalidStateError', {eventRecord: record, operationHash}));
}
// success
callback();
});
},
insert: ['checkOps', (results, callback) => this.collection.insert(
record, database.writeOptions, (err, result) => {
if(err && database.isDuplicateError(err)) {
return callback(new BedrockError(
'An event with the same hash already exists.',
'DuplicateError', {
httpStatusCode: 409,
public: true,
eventHash: meta.eventHash
}, err.message));
}
if(err) {
return callback(err);
}
callback(null, {
event: result.ops[0].event,
meta: result.ops[0].meta
});
})],
}, (err, results) => {
if(err) {
return callback(err);
}
callback(null, results.insert);
});
}
// TODO: add docs
addMany({events}, callback) {
const dupHashes = [];
// retries on duplicate errors
async.retry({
errorFilter: database.isDuplicateError,
times: Infinity
}, callback => this.collection.insertMany(
events, {ordered: true}, err => {
if(err) {
if(database.isDuplicateError(err)) {
// remove events up to the dup and retry
dupHashes.push(events[err.index].eventHash);
events.splice(0, err.index + 1);
if(events.length === 0) {
// the last event was a duplicate, no more events to try, end
return callback();
}
return callback(err);
}
return callback(err);
}
callback();
}),
err => {
if(err) {
return callback(err);
}
callback(null, {dupHashes});
});
}
/**
* Identify events that are not in storage.
*
* @param eventHash the hash or array of hashes to check.
* @param callback(err, result) called onced the operation completes.
*/
difference(eventHash, callback) {
const hashes = [].concat(eventHash);
const query = {
'meta.deleted': {$exists: false},
eventHash: {$in: hashes.map(h => database.hash(h))},
};
this.collection.find(query, {_id: 0, 'meta.eventHash': 1})
.toArray((err, result) => {
if(err) {
return callback(err);
}
const localEvents = new Set(result.map(r => r.meta.eventHash));
callback(null, hashes.filter(v => !localEvents.has(v)));
});
}
// return records in the same order as the request
// FIXME: `event.operation` must be removed from merge events
getMany({eventHashes}) {
eventHashes = eventHashes.map(h => database.hash(h));
const operationCollectionName = this.operationStorage.collection.s.name;
return this.collection.aggregate([
{$match: {
eventHash: {$in: eventHashes},
}},
{$lookup:
{
from: operationCollectionName,
let: {eventHash: '$eventHash'},
pipeline: [
{$match: {$expr: {$eq: ['$meta.eventHash', '$$eventHash']}}},
{$sort: {'meta.eventOrder': 1}},
{$replaceRoot: {newRoot: "$operation"}}
],
as: 'event.operation'
}
},
{$addFields: {_order: {$indexOfArray: [eventHashes, '$eventHash']}}},
{$sort: {'_order': 1}},
{$project: {_id: 0, _order: 0}},
], {allowDiskUse: true});
}
/**
* Determine if an event with a given hash exists.
*
* @param eventHash the hash or array of hashes of the event(s).
* @param callback(err, result) called once the operation completes.
*/
exists(eventHash, callback) {
const hashes = [].concat(eventHash).map(h => database.hash(h));
const query = {
'meta.deleted': {$exists: false},
eventHash: {$in: hashes},
};
this.collection.find(query).count((err, result) =>
err ? callback(err) : callback(null, hashes.length === result));
}
/**
* Gets an event in the ledger given a query and a set of options.
*
* eventHash - the hash of the event to fetch from storage.
* callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the result of the retrieval
* event - the event.
* meta - metadata about the event.
*/
get(eventHash, callback) {
const operationCollectionName = this.operationStorage.collection.s.name;
const query = {
eventHash: database.hash(eventHash),
'meta.deleted': {$exists: false}
};
this.collection.aggregate([
{$match: query},
{$limit: 1},
{$lookup: {
from: operationCollectionName,
let: {eventHash: '$eventHash'},
pipeline: [
{$match: {$expr: {$eq: ['$meta.eventHash', '$$eventHash']}}},
{$sort: {'meta.eventOrder': 1}},
{$replaceRoot: {newRoot: "$operation"}}
],
as: 'event.operation'
}},
], {allowDiskUse: true}).toArray((err, result) => {
if(err) {
return callback(err);
}
if(result.length === 0) {
return callback(new BedrockError(
'Failed to get event. An event with the given ID does not exist.',
'NotFoundError', {
httpStatusCode: 404,
public: true,
eventHash
}));
}
const {event, meta} = result[0];
if(event.type !== 'WebLedgerOperationEvent') {
delete event.operation;
}
callback(null, {event, meta});
});
}
/**
* Gets the active configuration based on blockHeight. A ledger configuration
* is active for blocks that are *subsequent* to the block that includes
* the ledger configuration event itself.
*
* blockHeight - the blockHeight used to locate the ledger configuration.
* callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the result of the retrieval.
* event - the event.
* meta - metadata about the event.
*/
getActiveConfig({blockHeight}, callback) {
assert.optionalNumber(blockHeight, 'blockHeight');
const query = {
'event.type': 'WebLedgerConfigurationEvent',
// NOTE: the active config does not include any configs that may be in
// the block specified by blockHeight
'meta.blockHeight': {$lt: blockHeight},
'meta.deleted': {$exists: false},
};
this.collection.find(query).sort({
'meta.blockHeight': -1,
'meta.blockOrder': -1
}).limit(1).toArray((err, result) => {
if(err) {
return callback(err);
}
if(result.length === 0) {
return callback(new BedrockError(
'The active ledger configuration was not found.',
'NotFoundError', {blockHeight, httpStatusCode: 404, public: true}));
}
callback(null, result[0]);
});
}
/**
* Gets a count of events.
*
* @param consensus - filter events based on consensus status.
* @param type - filter events based on event type.
*
* @param callback(err, count) - the callback to call when finished.
*/
// consensus === undefined means ignore consensus
getCount({consensus, type}, callback) {
if(!(callback && typeof callback === 'function')) {
throw new TypeError('`callback` must be a function.');
}
const query = {
'meta.deleted': {
$exists: false
}
};
if(typeof consensus === 'boolean') {
query['meta.consensus'] = {$exists: consensus};
}
if(type) {
query['event.type'] = type;
}
this.collection.find(query).count(callback);
}
/**
* Gets the latest configuration event that has consensus.
*
* callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the result of the retrieval.
* event - the event.
* meta - metadata about the event.
*/
getLatestConfig(callback) {
// find the latest config event that has consensus
const query = {
'event.type': 'WebLedgerConfigurationEvent',
'meta.deleted': {$exists: false},
};
this.collection.find(query).sort({
'meta.blockHeight': -1,
'meta.blockOrder': -1
}).limit(1).toArray((err, result) => {
if(err) {
return callback(err);
}
if(result.length === 0) {
return callback(new BedrockError(
'The latest ledger configuration was not found.',
'NotFoundError', {httpStatusCode: 404, public: true}));
}
callback(null, result[0]);
});
}
/**
* Update an existing event associated with the ledger given an
* eventId, an array of patch instructions, and a set of options.
*
* eventHash - the ID of the event to update
* patch - a list of patch commands for the event
* callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the value of the updated event.
*/
update({eventHash, patch}, callback) {
if(!Array.isArray(patch)) {
throw new TypeError('patch must be an array');
}
async.auto({
buildUpdate: callback => {
const setObject = {};
const unsetObject = {};
const pushFields = {};
const pullFields = {};
async.eachSeries(patch, (operation, callback) => {
// ensure that only meta fields are modified
const opLength = Object.keys(operation.changes).length;
if(opLength !== 1 ||
(opLength === 1 && operation.changes.meta === undefined)) {
return callback(new BedrockError(
'Only event meta can be updated.',
'Forbidden', {operation: operation}
));
}
// process set, unset, add, and remove operations
if(operation.op === 'set') {
_.extend(setObject, operation.changes);
}
else if(operation.op === 'unset') {
_.extend(unsetObject, operation.changes);
}
else if(operation.op === 'add') {
for(const key in operation.changes) {
const arrayUpdate = database.buildUpdate(operation.changes);
const field = Object.keys(arrayUpdate)[0];
if(field in pushFields) {
pushFields[field].$each.push(arrayUpdate[field]);
} else {
pushFields[field] = {$each: [arrayUpdate[field]]};
}
}
} else if(operation.op === 'remove') {
for(const key in operation.changes) {
const arrayUpdate = database.buildUpdate(operation.changes);
const field = Object.keys(arrayUpdate)[0];
if(field in pullFields) {
pullFields[field].push(arrayUpdate[field]);
} else {
pullFields[field] = [arrayUpdate[field]];
}
}
}
callback();
}, err => {
if(err) {
return callback(err);
}
// build the update object for MongoDB
const update = {};
const setFields = database.buildUpdate(setObject);
const unsetFields = database.buildUpdate(unsetObject);
if(Object.keys(setFields).length > 0) {
update.$set = setFields;
}
if(Object.keys(unsetFields).length > 0) {
update.$unset = unsetFields;
}
if(Object.keys(pushFields).length > 0) {
update.$addToSet = pushFields;
}
if(Object.keys(pullFields).length > 0) {
update.$pullAll = pullFields;
}
callback(null, update);
});
},
update: ['buildUpdate', (results, callback) => this.collection.update(
{eventHash: database.hash(eventHash)},
results.buildUpdate, database.writeOptions, callback)],
checkUpdate: ['update', (results, callback) => {
if(results.update.result.n === 0) {
return callback(new BedrockError(
'Could not update event. Event with given hash not found.',
'NotFoundError', {eventHash}));
}
callback();
}]
}, err => callback(err));
}
/**
* Delete an event associated with the ledger given an event hash and a
* set of options.
*
* eventHash - the hash of the event to delete.
* options - a set of options used when deleting the event.
* callback(err) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
*/
remove(eventHash, callback) {
async.auto({
update: callback => {
// find and delete the existing event
const filter = {
eventHash: database.hash(eventHash)
};
const now = Date.now();
const update = {
$set: {
meta: {
updated: now,
deleted: now
}
}
};
this.collection.updateOne(filter, update, callback);
},
ensureUpdate: ['update', (results, callback) => {
if(results.update.matchedCount !== 1) {
return callback(new BedrockError(
'Delete of event failed.', 'NotFoundError', {eventHash}
));
}
callback();
}]
}, err => callback(err));
}
};