app/modules/pipeline/pipeline.js
// @flow
const _ = require('lodash');
const Errors = require('../exceptions/errors');
const Validator = require('./validator/validator');
const Completer = require('./completer/completer');
const Transformer = require('./transformer/transformer');
const Formatter = require('./formatter/formatter');
const EntitiesUtils = require('../utils/entities');
const Utils = require('../utils/utils');
const Logger = require('../../logger');
/**
* Completion and validation pipeline
*
* The pipeline works by applying a sequence of actions (represented as Koa's middlewares).
* When an action fails, the middleware returns an error that needs to be handled by the client
* who emitted that request.
*
* The pipeline works the following way:
*
* - For PUT requests (UPDATE):
* 1. Check if entity exists, if not, fails
* 2. Merge JSON body with existing entity
* 3. Apply default fields
* 4. Complete fields
* 5. Format fieds
* 6. Validate fields
* 7. UPDATE or FAIL
*
* - For POST requests (CREATE):
* 1. Start at 2. from the above list
* 7. CREATE or FAIL.
*/
class Pipeline {
/**
* Check if an entity exists in the DB by using its type (publication, user, journal, ...).
*
* @param body - JSON body
* @param type - entity type
* @returns a promise containing a boolean
*/
static async _check_if_entity_exists(body: Object, type: string): Promise<boolean> {
if (body == null) {
return false;
}
const id = body._id;
if (id == null) {
return false;
}
const entity = await EntitiesUtils.retrieve(id, type);
if (entity == null) {
return false;
}
return entity.db.found;
}
/**
* Merge two objects, considering the second one as default values (if key exists
* in the first object, the first object keeps its own values).
*
* @param input - input object
* @param defaults - default object
* @return merged object
*/
static _merge_defaults(input: Object, defaults: Object): Object {
return Utils.merge_with_replacement({}, defaults, input);
}
static _merge_put(input: Object, defaults: Object): Object {
return Utils.merge_with_replacement({}, defaults, input);
}
static async _reset(input: Object, resetters: Object): Object {
for (const path in resetters) {
const segs = path.split('.');
input = await Utils.traverse_and_execute(input, segs, async () => resetters[path]);
}
return input;
}
static _filter(input: Object, filters: Array<String>): Object {
filters.forEach((f) => {
if (f in input) {
delete input[f];
}
});
return input;
}
static _format_range(potential_range, total) {
if (potential_range == null) {
return [];
}
if (potential_range.match(/[0-9]+-[0-9]+/)) {
const range = potential_range.split('-').map(r => parseInt(r, 10));
return _.range(range[0], range[1]).filter(r => (r >= 0 && r < total));
}
const pnum = parseInt(potential_range, 10);
if (isNaN(pnum)) {
return [];
}
return [pnum];
}
static async _evaluate_pipeline(item: Object, extra_info: Object, pipeline: Object,
type: string, action: string, method: string) {
switch (action) {
case 'transform': {
item = await Transformer(item, pipeline.Transforming || []);
return { item, errors: null };
}
case 'filter': {
item = Pipeline._filter(item, pipeline.Filtering || []);
return { item, errors: null };
}
case 'reset': {
item = await Pipeline._reset(item, pipeline.Resetting || {});
return { item, errors: null };
}
case 'defaults': {
item = Pipeline._merge_defaults(item, pipeline.Defaults || {});
return { item, errors: null };
}
case 'format': {
item = await Formatter(item, pipeline.Formatting, extra_info);
return { item, errors: null };
}
case 'complete': {
item = await Completer(item, pipeline.Completion, extra_info);
return { item, errors: null };
}
default:
case 'validate': {
const validator = new Validator();
const errors = await validator
.validate(item, pipeline.Validation, method);
return { item, errors };
}
}
}
static async _papply(item: Object, extra_info: Object, type: string,
pipelines: Array<Object>, action: string, method: string,
range: Array<any>, bulk_mode: boolean): Promise<Object> {
if (action === 'check' && !bulk_mode) {
if (method === 'put') {
const exists = await Pipeline._check_if_entity_exists(item, type);
if (!exists) {
return Errors.InvalidEntity;
}
}
return item;
} else if (action === 'merge' && !bulk_mode) {
if (method === 'put') {
const entity = await EntitiesUtils.retrieve(item._id, type);
if (!entity) {
return Errors.InvalidEntity;
}
item = Pipeline._merge_put(item, entity.source);
}
return item;
}
let errors = {};
let info = { errors: null, item };
const prange = range.length === 0 ? _.range(0, pipelines.length) : range;
for (const i of prange) {
const pipeline = pipelines[i];
info = await Pipeline._evaluate_pipeline(info.item, extra_info, pipeline,
type, action, method);
if (action === 'validate') {
errors = Utils.merge_with_concat({}, errors, info.errors);
}
}
if (Object.keys(errors).length > 0) {
return { change: 'Validation', errors };
}
return info.item;
}
/**
* Dispatcher to apply each part of the pipeline
*
* @param type - Entity type
* @param m - Part of the pipeline to apply
* @returns Koa middleware
*/
static _action(type: string, m: string, bulk_mode: boolean): Function {
return async function afunc(ctx: Object, next: Function): Promise<*> {
let items = ctx.request.body;
const method = ctx.request.method.toLowerCase();
const model = ctx.__md.model;
const pipelines = model.Pipelines || [];
const range = Pipeline._format_range(ctx.params.range, pipelines.length);
if (!(items instanceof Array)) {
items = [items];
}
const promises = items.map(item => Pipeline._papply(item, ctx.__md || {},
type, pipelines, m, method, range, bulk_mode));
try {
if (!bulk_mode) {
const ret = await promises[0];
if ('change' in ret) {
ctx.body = ret;
} else {
ctx.request.body = ret;
await next();
}
return;
}
ctx.body = await promises;
} catch (err) {
Logger.error('Error when processing pipelines');
Logger.error(err);
throw Errors.UnableToProcessPipelines;
}
};
}
static memoize_model(type: string): Function {
return async function afunc(ctx: Object, next: Function): Promise<*> {
const model = await EntitiesUtils.get_model_from_type(type);
if ('__md' in ctx) {
ctx.__md.model = model;
} else {
ctx.__md = { model };
}
await next();
};
}
/**
* Invoke the dispatcher to format the input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static format(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'format', bulk_mode);
}
/**
* Invoke the dispatcher to validate the input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static validate(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'validate', bulk_mode);
}
/**
* Invoke the dispatcher to complete the input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static complete(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'complete', bulk_mode);
}
/**
* Invoke the dispatcher to transform the input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static transform(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'transform', bulk_mode);
}
/**
* Invoke the dispatcher to filter the input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static filter(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'filter', bulk_mode);
}
/**
* Invoke the dispatcher to reset the input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static reset(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'reset', bulk_mode);
}
/**
* Invoke the dispatcher to merge previous entity with new input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static merge(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'merge', bulk_mode);
}
/**
* Invoke the dispatcher to include default values in the input
*
* @param type - Entity type;
* @returns Koa middleware
*/
static defaults(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'defaults', bulk_mode);
}
/**
* Invoke the dispatcher to check if the entity exists
*
* @param type - Entity type;
* @returns Koa middleware
*/
static check(type: string, bulk_mode: boolean = false): Function {
return Pipeline._action(type, 'check', bulk_mode);
}
}
module.exports = Pipeline;