/*!
* Ledger block storage class.
*
* Copyright (c) 2017-2018 Digital Bazaar, Inc. All rights reserved.
*/
'use strict';
const _ = require('lodash');
const assert = require('assert-plus');
const async = require('async');
const bedrock = require('bedrock');
const database = require('bedrock-mongodb');
const logger = require('./logger');
const {BedrockError} = bedrock.util;
/**
* The blocks API is used to perform operations on blocks associated with a
* particular ledger.
*/
module.exports = class LedgerBlockStorage {
constructor({blockCollection, eventCollection, eventStorage, ledgerNodeId}) {
// assign the collection used for block storage
this.collection = blockCollection;
// assign the collection used for events storage
this.eventCollection = eventCollection;
// event storage API
this.eventStorage = eventStorage;
this.ledgerNodeId = ledgerNodeId;
this.plugins = {};
// expose utils that can be used in storage plugins
this.util = {
assert,
dbHash: database.hash,
logger,
BedrockError,
};
}
/**
* Adds a block in the ledger given a block, metadata associated with the
* block.
*
* @param block - the block to create in the ledger.
* blockHeight - the height of the block.
* event - an array of events associated with the block.
* @param emit - when true (default), a bedrock event is emitted to inform
* listeners that a block has been added.
* @param meta - the metadata associated with the block.
* blockHash - the hash value of the block.
* @param callback(err) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the result of the operation.
* block - the block that was committed to storage.
* meta - the metadata that was committed to storage.
*/
add({block, emit = true, meta}, callback) {
// check block
if(!(block && Number.isInteger(block.blockHeight) && block.event)) {
throw new TypeError(
'`block.blockHeight` and `block.event` are required.');
}
if(!(meta && meta.blockHash)) {
throw new TypeError('`meta.blockHash` is required.');
}
const {blockHeight, event} = block;
// drop `event` from the block without mutating or cloning
const _block = _.pickBy(block, (v, k) => k !== 'event');
async.auto({
checkEvent: callback => {
const dHashes = event.map(eventHash => database.hash(eventHash));
// NOTE: `meta.consensus` and `meta.consensusDate` are to be managed
// by consensus algorithms and therefore are not validated here
const query = {
eventHash: {$in: dHashes},
'meta.blockHeight': blockHeight,
'meta.blockOrder': {$exists: true}
};
this.eventCollection.count(query, (err, result) => {
if(err) {
return callback(err);
}
// failure
if(result !== event.length) {
return callback(new BedrockError(
'Some events have not been properly assigned to the block.',
'InvalidStateError', {block: _block, event}));
}
// success
callback();
});
},
insert: ['checkEvent', (results, callback) => {
// insert the block
const now = Date.now();
const record = {
block: _block,
blockHash: database.hash(meta.blockHash),
id: database.hash(_block.id),
meta: _.defaults(meta, {
created: now,
updated: now
}),
};
logger.debug(`adding block: ${meta.blockHash}`);
this.collection.insert(record, database.writeOptions, (err, result) => {
if(err) {
return callback(err);
}
callback(null, result.ops[0]);
});
}],
emit: ['insert', (results, callback) => {
if(!emit) {
return callback();
}
bedrock.events.emit('bedrock-ledger-storage.block.add', {
blockHeight, ledgerNodeId: this.ledgerNodeId
}, callback);
}]
}, (err, results) => {
if(err) {
if(database.isDuplicateError(err)) {
return callback(new BedrockError(
'A block with the same hash already exists.',
'DuplicateError', {blockHash: meta.blockHash}, err));
}
return callback(err);
}
callback(null, {block: results.insert.block, meta: results.insert.meta});
});
}
/**
* Gets the block that has consensus given a blockId.
*
* @param blockId - the identifier of the block that has consensus.
* @param [consensus] `false` to retrieve a non-consensus block instead.
* @param callback(err, block) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* block - the block with the given ID that has consensus.
*/
get({blockId, consensus = true}, callback) {
async.auto({
find: callback => {
// find an existing block with consensus
const query = {
id: database.hash(blockId),
'meta.deleted': {
$exists: false
},
'meta.consensus': {
$exists: true
}
};
if(!consensus) {
query['meta.consensus'].$exists = false;
}
this.collection.findOne(query, callback);
},
expandEvents: ['find', (results, callback) => {
if(!results.find) {
return callback(new BedrockError(
'A block with the given ID does not exist.',
'NotFoundError', {blockId}));
}
// _expandEvents mutates the parameter
this._expandEvents(results.find.block, callback);
}]
}, (err, results) => {
if(err) {
return callback(err);
}
// _expandEvents mutates the parameter
callback(null, {block: results.find.block, meta: results.find.meta});
});
}
// FIXME: this API is not used anywhere if if it should be kept, it needs
// to be updated to get eventhashes etc.
/**
* Gets the block summary for consensus block given a blockId.
*
* @param blockId - the identifier of the block that has consensus.
* @param [consensus] `false` to retrieve a summary for a non-consensus
* block instead.
* @param [eventHash] `true` to get all event hashes from `event`.
* @param callback(err, block) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* block - the block summary for the given ID that has consensus.
*/
getSummary({blockId, consensus = true, eventHash}, callback) {
async.auto({
find: callback => {
const query = {
id: database.hash(blockId),
'meta.deleted': {
$exists: false
},
'meta.consensus': {
$exists: true
}
};
if(consensus === false) {
query['meta.consensus'].$exists = false;
}
const projection = {};
this.collection.findOne(query, projection, (err, record) => {
if(err) {
return callback(err);
}
if(!record) {
return callback(new BedrockError(
'A block with the given ID does not exist.',
'NotFoundError', {blockId}));
}
return callback(null, record);
});
},
eventHashes: ['find', (results, callback) => {
if(!eventHash) {
return callback();
}
// TODO: make code DRY
// FIXME: this might be accomplished with aggregate query
// get event hashes
const block = results.find.block;
const query = {'meta.blockHeight': block.blockHeight};
const projection = {_id: 0, 'meta.eventHash': 1};
this.eventCollection.find(query, projection)
.sort({'meta.blockOrder': 1})
.toArray((err, eventHashes) => {
if(err) {
return callback(err);
}
block.eventHash = eventHashes.map(r => r.meta.eventHash);
callback();
});
}]
}, (err, results) => {
if(err) {
return callback(err);
}
callback(
null, {block: results.find.block, meta: results.find.meta});
});
}
/**
* Gets a block that has consensus given a blockHeight.
*
* @param blockHeight - the height of the block that has consensus.
* @param callback(err, block) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* block - the block with the given block height that has consensus.
*/
getByHeight(blockHeight, callback) {
async.auto({
find: callback => {
// find an existing block with consensus
const query = {
'block.blockHeight': blockHeight,
'meta.deleted': {
$exists: false
},
'meta.consensus': {
$exists: true
}
};
this.collection.findOne(query, callback);
},
expandEvents: ['find', (results, callback) => {
if(!results.find) {
return callback(new BedrockError(
'A block with the given `blockHeight` does not exist.',
'NotFoundError', {blockHeight}));
}
this._expandEvents(results.find.block, callback);
}]
}, (err, results) => {
if(err) {
return callback(err);
}
callback(null, {block: results.find.block, meta: results.find.meta});
});
}
/**
* Gets the block summary for consensus block given a blockHeight.
*
* @param blockHeight - the height of the block that has consensus.
* @param [consensus] `false` to retrieve a summary for a non-consensus
* @param [eventHash] `true` to get all event hashes from `event`.
* @param callback(err, block) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* block - the block summary for the given ID that has consensus.
*/
getSummaryByHeight(
{blockHeight, consensus = true, eventHash = false}, callback) {
async.auto({
find: callback => {
const query = {
'block.blockHeight': blockHeight,
'meta.deleted': {
$exists: false
},
'meta.consensus': {
$exists: true
}
};
if(consensus === false) {
query['meta.consensus'].$exists = false;
}
const projection = {};
this.collection.findOne(query, projection, (err, record) => {
if(err) {
return callback(err);
}
if(!record) {
return callback(new BedrockError(
'A block with the given block height does not exist.',
'NotFoundError', {blockHeight}));
}
return callback(null, record);
});
},
eventHashes: ['find', (results, callback) => {
if(!eventHash) {
return callback();
}
// TODO: make code DRY
// FIXME: this might be accomplished with aggregate query
// get event hashes
const block = results.find.block;
const query = {'meta.blockHeight': block.blockHeight};
const projection = {_id: 0, 'meta.eventHash': 1};
this.eventCollection.find(query, projection)
.sort({'meta.blockOrder': 1})
.toArray((err, eventHashes) => {
if(err) {
return callback(err);
}
block.eventHash = eventHashes.map(r => r.meta.eventHash);
callback();
});
}]
}, (err, results) => {
if(err) {
return callback(err);
}
callback(
null, {block: results.find.block, meta: results.find.meta});
});
}
/**
* Gets all blocks matching a given blockId even if they have not
* achieved consensus.
*
* @param blockId - the identifier of the block(s) to fetch from the ledger.
* @param callback(err, iterator) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* iterator - an iterator for all of the returned blocks.
*/
getAll(blockId, callback) {
async.auto({
find: callback => {
// find an existing block
const query = {
id: database.hash(blockId),
'meta.deleted': {
$exists: false
}
};
const cursor = this.collection.find(query);
callback(null, cursor);
},
hasNext: ['find', (results, callback) => {
// check to see if there are any results
results.find.hasNext().then(hasNext => {
callback(null, hasNext);
});
}]
}, (err, results) => {
if(err) {
return callback(err);
}
// create a block iterator
const iterator = {
done: !results.hasNext
};
iterator.next = () => {
if(iterator.done) {
return {done: true};
}
const cursor = results.find;
const promise = cursor.next().then(record => {
// ensure iterator will have something to iterate over next
return cursor.hasNext().then(hasNext => {
iterator.done = !hasNext;
// TODO: expand events in block
return {
block: record.block,
meta: record.meta
};
});
}).catch(err => {
iterator.done = true;
throw err;
});
return {value: promise, done: iterator.done};
};
iterator[Symbol.iterator] = () => {
return iterator;
};
callback(null, iterator);
});
}
/**
* Retrieves the genesis block from the ledger.
*
* @param callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the result with the genesis block.
* genesisBlock - the genesis block and its meta.
*/
getGenesis(callback) {
// find the genesis block with consensus
const query = {
'block.previousBlock': {$exists: false},
'block.previousBlockHash': {$exists: false},
'meta.deleted': {$exists: false},
'meta.consensus': {$exists: true}
};
this.collection.findOne(query, {block: 1, meta: 1}, (err, record) => {
if(err) {
return callback(err);
}
if(!record) {
return callback(new BedrockError(
'The genesis block does not exist.',
'NotFoundError'));
}
this._expandEvents(record.block, err => {
if(err) {
return callback(err);
}
// NOTE: _expandEvents mutates record.block
callback(null, {
genesisBlock: {
block: record.block,
meta: record.meta
}
});
});
});
}
/**
* Retrieves the latest block from the ledger.
*
* @param callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the block.
* eventBlock - the latest events block and meta.
*/
getLatest(callback) {
async.auto({
block: callback => {
// find the latest config block with consensus
const query = {
'block.type': 'WebLedgerEventBlock',
'meta.deleted': {$exists: false},
'meta.consensus': {$exists: true}
};
const projection = {_id: 0};
const sort = {'block.blockHeight': -1};
this.collection.find(query, projection).sort(sort).limit(1)
.toArray(callback);
},
expandEvents: ['block', (results, callback) => {
if(results.block.length === 0) {
return callback();
}
// _expandEvents mutates the event array in the block
this._expandEvents(results.block[0].block, callback);
}]
}, (err, results) => {
if(err) {
return callback(err);
}
const eventBlock = results.block.length === 1 ? results.block[0] : {};
callback(null, {eventBlock});
});
}
/**
* Retrieves a summary of the latest block from the ledger.
*
* @param callback(err, result) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
* result - the block.
* eventBlock - the latest events block summary.
*/
getLatestSummary(callback) {
// find the latest config block with consensus
const query = {
'block.type': 'WebLedgerEventBlock',
'meta.deleted': {$exists: false},
'meta.consensusDate': {$exists: true}
};
const projection = {
_id: 0,
'block.@context': 1,
'block.id': 1,
'block.blockHeight': 1,
'block.consensusMethod': 1,
'block.type': 1,
'block.previousBlock': 1,
'block.previousBlockHash': 1,
meta: 1
};
const sort = {'block.blockHeight': -1};
this.collection.find(query, projection).sort(sort).limit(1)
.toArray((err, result) => {
if(err) {
return callback(err);
}
const eventBlock = result.length === 1 ? result[0] : {};
callback(null, {eventBlock});
});
}
/**
* Update an existing block in the ledger given a block hash, an array of
* patch instructions, and a set of options.
*
* @param blockHash - the hash of the block to update.
* @param patch - the patch instructions to execute on the block.
* @param callback(err) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
*/
update({blockHash, patch}, callback) {
if(!Array.isArray(patch)) {
throw new TypeError('patch must be an array');
}
async.auto({
buildUpdate: callback => {
const setObject = {};
const unsetObject = {};
const pushFields = {};
const pullFields = {};
async.eachSeries(patch, (operation, callback) => {
// ensure that only meta fields are modified
const opLength = Object.keys(operation.changes).length;
if(opLength !== 1 ||
(opLength === 1 && operation.changes.meta === undefined)) {
return callback(new BedrockError(
'Only block meta can be updated.',
'Forbidden', {operation: operation}
));
}
// process set, unset, add, and remove operations
if(operation.op === 'set') {
_.extend(setObject, operation.changes);
}
else if(operation.op === 'unset') {
_.extend(unsetObject, operation.changes);
}
else if(operation.op === 'add') {
for(const key in operation.changes) {
const arrayUpdate = database.buildUpdate(operation.changes);
const field = Object.keys(arrayUpdate)[0];
if(field in pushFields) {
pushFields[field].$each.push(arrayUpdate[field]);
} else {
pushFields[field] = {$each: [arrayUpdate[field]]};
}
}
} else if(operation.op === 'remove') {
for(const key in operation.changes) {
const arrayUpdate = database.buildUpdate(operation.changes);
const field = Object.keys(arrayUpdate)[0];
if(field in pullFields) {
pullFields[field].push(arrayUpdate[field]);
} else {
pullFields[field] = [arrayUpdate[field]];
}
}
}
callback();
}, err => {
if(err) {
return callback(err);
}
// build the update object for MongoDB
const update = {};
const setFields = database.buildUpdate(setObject);
const unsetFields = database.buildUpdate(unsetObject);
if(Object.keys(setFields).length > 0) {
update.$set = setFields;
}
if(Object.keys(unsetFields).length > 0) {
update.$unset = unsetFields;
}
if(Object.keys(pushFields).length > 0) {
update.$addToSet = pushFields;
}
if(Object.keys(pullFields).length > 0) {
update.$pullAll = pullFields;
}
callback(null, update);
});
},
update: ['buildUpdate', (results, callback) => {
this.collection.update(
{'meta.blockHash': blockHash}, results.buildUpdate,
database.writeOptions, callback);
}],
checkUpdate: ['update', (results, callback) => {
if(results.update.result.n === 0) {
return callback(new BedrockError(
'Could not update block. Block with given hash not found.',
'NotFoundError', {blockHash: blockHash}));
}
callback();
}]
}, err => callback(err));
}
/**
* Delete a block in the ledger given a block hash and a set of options.
*
* @param blockHash - the hash of the block to delete.
* @param callback(err) - the callback to call when finished.
* err - An Error if an error occurred, null otherwise.
*/
remove(blockHash, callback) {
async.auto({
update: callback => {
// find and delete the existing block
const filter = {
blockHash: database.hash(blockHash)
};
const now = Date.now();
const update = {
$set: {
meta: {
updated: now,
deleted: now
}
}
};
this.collection.updateOne(filter, update, callback);
},
ensureUpdate: ['update', (results, callback) => {
if(results.update.matchedCount !== 1) {
return callback(new BedrockError(
'Delete of block failed.', 'NotFoundError', {blockHash}
));
}
callback();
}]
}, callback);
}
// FIXME: this might be accomplished with aggregate query
_expandEvents(block, callback) {
block.event = [];
// TODO: make code DRY
const query = {'meta.blockHeight': block.blockHeight};
const projection = {_id: 0, 'meta.eventHash': 1};
this.eventCollection.find(query, projection)
.sort({'meta.blockOrder': 1})
.toArray((err, eventHashes) => {
if(err) {
return callback(err);
}
eventHashes = eventHashes.map(r => r.meta.eventHash);
// NOTE: getMany preserves order of hashes
this.eventStorage.getMany({eventHashes}).forEach(({event}) => {
if(event.type !== 'WebLedgerOperationEvent') {
// must strip operation
delete event.operation;
}
block.event.push(event);
}, callback);
});
}
};