Manual Reference Source

app/modules/entities/crud/odm.js

// @flow
const _ = require('lodash');
const Search = require('./search');
const Mapping = require('./mapping');
const Utils = require('../../utils/utils');

/**
 * Object Data Model
 * Generic class to model an entity in the program (saved into an ElasticSearch DB).
 *
 * @public
 *
 *
 */
class ODM {
    _client: Object;
    _index: string;
    _type: string;
    _db: Object;
    _model: Object;
    _id: ?string;

    /**
     * @param client: ElasticSearch client;
     * @param id: id of the entity (could be null)
     */
    constructor(index: string, type: string, client: Object,
        model: Object, id: ?string) {
        this._client = client;
        this._id = id;
        this._db = {};
        this._index = index;
        this._type = type;
        this._model = model;
    }

    /**
     * @public
     * @return the underlying model of this entity
     * @see entity models for more details
     */
    get model(): Object {
        return this._model;
    }

    /**
     * @public
     * @return the underlying ElasticSearch mapping of this entity
     */
    get mapping(): Object {
        return this._model.Mapping;
    }

    /**
     * @public
     * @return the underlying ElasticSearch index of this entity
     */
    get index(): string {
        return this._index;
    }

    /**
     * @public
     * @return the underlying ElasticSearch type of this entity
     */
    get type(): string {
        return this._type;
    }

    get source(): Object {
        return this.db.source || {};
    }

    /**
     * Process an entity and create an {ODM} object.
     *
     * @public
     * @param hit: hit in ElasticSearch result
     * @param found: true if the object was found in DB, false otherwise
     * @return content of the hit
     */
    static format_hit(hit: Object, found: boolean = true): Object {
        if (hit == null) {
            return {};
        }

        let score = 0;
        const index = hit._index;
        const type = hit._type;
        const id = hit._id;
        const source = '_source' in hit ? hit._source : {};
        const sort = 'sort' in hit ? hit.sort : [];
        if ('score' in hit) {
            score = hit._score;
        }

        source._id = id;

        return {
            id,
            type,
            index,
            score,
            source,
            sort,
            found,
        };
    }

    /**
     * Get name of the class (= entity)
     * @public
     * @return name of the class
     */
    get name(): string {
        return this.constructor.name.toLowerCase();
    }

    /**
     * Get id of an entity (possibly null)
     * @public
     * @return id
     */
    get id(): ?string {
        return this._id;
    }

    get messages(): Object {
        return this._model.Messages;
    }

    /**
     * Get all information about an entity;
     * @public
     * @return all interesting information
     */
    get db(): Object {
        return JSON.parse(JSON.stringify(this._db));
    }

    /**
     * Set all interesting information about an entity
     * @public
     * @param o: information
     */
    set db(o: Object) {
        this._db = o;
    }

    static async fetch_mapping(index: string, type: string, client: Object, include_meta: boolean = false) {
        const mapping = await client.indices.getMapping({ index, type });
        if (index in mapping && type in mapping[index].mappings) {
            if (include_meta) {
                return mapping[index].mappings[type];
            }
            return mapping[index].mappings[type].properties;
        }
        return null;
    }

    static async fetch_settings(index: string, type: string, client: Object) {
        const settings = await client.indices.getSettings({ index, type });
        if (settings && index in settings) {
            return settings[index];
        }
        return null;
    }

    static async read(index: string, type: string,
            client: Object, model: Object, response: Object,
            population: Array<String> = [], backward: boolean = false): Object {
        const o = {};

        if ('_scroll_id' in response) {
            o.scroll_id = response._scroll_id;
        }

        if ('count' in response) {
            o.count = response.count;
        }

        if ('took' in response) {
            o.took = response.took;
        }

        if ('timed_out' in response) {
            o.timeout = response.timed_out;
        }

        if ('hits' in response) {
            o.hits = response.hits.hits.map((hit) => {
                const info = this.format_hit(hit);
                const odm = new this(index, type, client, model, info.id);
                odm.db = info;
                return odm;
            });

            if (backward) {
                _.reverse(o.hits);
            }

            await o.hits.reduce((pr, hit) =>
                pr.then(() => hit.post_read_hook(population)), Promise.resolve());

            o.total = response.hits.total;
            o.count = response.hits.total;
            o.max_score = response.hits.max_score;
        }

        if ('aggregations' in response) {
            o.aggs = response.aggregations;
        }

        return o;
    }

    static async search(index: string, type: string, client: Object, model: Object,
            search: Search, opts: Object = {}): Promise<Object> {
        const query = search.generate();
        console.log(JSON.stringify(query));
        const sort = search.sort();
        const aggs = search.aggs();
        const population = 'population' in opts ? opts.population : [];
        const body = {
            from: 'from' in opts ? opts.from : 0,
            size: 'size' in opts ? opts.size : 1000,
            _source: 'source' in opts ? opts.source : true,
            query,
        };


        if (sort != null) {
            body.sort = sort;
            if ('search_after' in opts) {
                body.from = 0;
                body.search_after = opts.search_after;
            } else if ('search_before' in opts) {
                body.sort = body.sort.map(s => _.reduce(s, (obj, value, key) => {
                    obj[key] = value.order === 'asc' ? _.merge({}, value, { order: 'desc' }) : _.merge({}, value, { order: 'asc' });
                    return obj;
                }, {}));

                body.search_after = opts.search_before;
            }
        }

        if (aggs != null) {
            body.aggs = aggs;
        }

        let response = {};
        if ('scroll_id' in opts && 'scroll' in opts) {
            response = await client.scroll({
                scrollId: opts.scroll_id,
                scroll: opts.scroll,
            });
        } else {
            const req = {
                index,
                type,
                body,
            };

            if ('scroll' in opts) {
                req.scroll = opts.scroll;
            }

            response = await client.search(req);
        }

        return this.read(index, type, client, model, response, population, 'search_before' in opts);
    }

    static async count(index: string, type: string, client: Object,
            model: Object, search: Search): Promise<Object> {
        const query = search.generate();
        const response = await client.count({
            index,
            type,
            body: {
                query,
            },
        });
        return this.read(index, type, client, model, response);
    }

    static async deleteByQuery(index: string, type: string, client: Object, search: Search) {
        const query = search.generate();
        await client.deleteByQuery({
            index,
            type,
            refresh: true,
            body: {
                query,
            },
        });
    }

    static async remove(index: string, type: string, client: Object, id: string): Promise<boolean> {
        try {
            const response = await client.delete({
                index,
                type,
                id,
                refresh: true,
            });
            // console.log(response);
            return response.found;
        } catch (err) {
            console.log('remove error', err);
            return false;
        }
    }

    static async _create_or_update(index: string, type: string,
            client: Object, model: Object, body: Object, id: ?string = null): Promise<?ODM> {
        console.log('create or update body', JSON.stringify(body));
        try {
            const content = {
                index,
                type,
                body,
                refresh: true,
            };

            if (id != null) {
                content.id = id;
                const ret = await this.pre_update_hook(index, type, client, model, body, id);
                if (!ret) {
                    return null;
                }
            } else {
                // TODO NEED TO BE REMOVE AFTER DATA IMPORT
                if ('_id' in content.body) {
                    content.id = content.body._id;
                    delete content.body._id;
                }

                const ret = await this.pre_create_hook(index, type, client, model, body);
                if (!ret) {
                    return null;
                }
            }

            const response = await client.index(content);
            if (('created' in response && response.created)
                || ('result' in response && response.result === 'updated')) {
                try {
                    const get_response = await client.get({
                        index,
                        type,
                        id: response._id,
                    });
                    const odm = new this(index, type, client, model, response._id);
                    odm.db = this.format_hit(get_response, get_response.found);
                    if ('created' in response) {
                        await odm.post_create_hook();
                    } else {
                        await odm.post_update_hook();
                    }

                    // console.log(odm);
                    return odm;
                } catch (err) {
                    return null;
                }
            }
            return null;
        } catch (err) {
            console.log('creation or update error', err);
            return null;
        }
    }

    static async _bulk_create_or_update(index: string, type: string, client: Object,
            body: Array<Object>, action: string = 'create'): Promise<?ODM> {
        try {
            const content = {
                index,
                type,
                body: _.flatten(body.map((e) => {
                    if (action === 'create') {
                        return [{ index: {} }, e];
                    } else if (action === 'update') {
                        const _id = e._id;
                        delete e._id;
                        return [{ update: { _id } }, { doc: e }];
                    }
                    return [];
                })),
                refresh: true,
            };

            const response = await client.bulk(content);
            console.log(response);
            return null;
        } catch (err) {
            console.log('bulk creation or update error', err);
            return null;
        }
    }


    async read(opts: Object = {}): Promise<ODM> {
        const population = 'population' in opts ? opts.population : [];
        const source = 'source' in opts ? opts.source : null;

        await this.pre_read_hook(source, population);

        try {
            const response = await this._client.get({
                index: this.index,
                type: this.type,
                id: this._id,
                _source: source,
            });


            this.db = this.constructor.format_hit(response, response.found);
            await this.post_read_hook(population);
        } catch (err) {
            const response = err.body;
            this.db = this.constructor.format_hit(response, response ? response.found : false);
            await this.post_read_hook(population);
        }
        return this;
    }

    static async create(index: string, type: string, client: Object,
            model: Object, body: Object): Promise<?ODM> {
        return this._create_or_update(index, type, client, model, body);
    }

    static async update(index: string, type: string, client: Object,
            model: Object, body: Object, id: string): Promise<?ODM> {
        // console.log('update', JSON.stringify(body));
        return this._create_or_update(index, type, client, model, body, id);
    }

    static async bulk_create(index: string, type: string, client: Object,
            body: Array<Object>): Promise<?ODM> {
        return this._bulk_create_or_update(index, type, client, body, 'create');
    }

    static async bulk_update(index: string, type: string, client: Object,
            body: Array<Object>): Promise<?ODM> {
        return this._bulk_create_or_update(index, type, client, body, 'update');
    }

    oupdate(): Promise<?ODM> {
        return ODM.update(this.index, this.type, this.client, this.model, this.source);
    }

    ocreate(): Promise<?ODM> {
        return ODM.create(this.index, this.type, this.client, this.model, this.source);
    }

    toJSON(): Object {
        return this.db;
    }

    async pre_read_hook() {
        // TODO TBD
    }

    static async pre_create_hook(index: string, type: string,
            client: Object, model: Object, body: Object): Promise<boolean> {
        // To be re-implemented in subclass (if needed)
        return true;
    }

    static async pre_update_hook(index: string, type: string,
            client: Object, model: Object, body: Object, id: string): Promise<boolean> {
        // To be re-implemented in subclass (if needed)
        return true;
    }

    async post_read_hook(population: Array<String>) {
        // To be re-implemented in subclass (if needed)
        await this._handle_population(population);
    }

    async post_create_hook() {
        // To be re-implemented in subclass (if needed)
    }

    async post_update_hook() {
        // To be re-implemented in subclass (if needed)
    }

    async _handle_population(population: Array<String>, propagate_population: boolean = false) {
        const EntitiesUtils = require('../../utils/entities');
        const mapping = await this.constructor.fetch_mapping(this.index, this.type,
                this._client, true);

        if (mapping && !('_meta' in mapping)) {
            return;
        }

        if (!('refs' in mapping._meta)) {
            return;
        }

        const refs = mapping._meta.refs;
        const info = this._db.source;


        for (const p of population) {
            const path = p.split('.');
            const vals = [...Utils.find_popvalue_with_path(info, path.slice(), true)];
            let ref = [...Utils.find_popvalue_with_path(refs, path.slice())];
            if (ref.length > 0 && vals.length > 0) {
                ref = ref[0];
                const last = path[path.length - 1];
                for (const v of vals) {
                    if (ref === 'lang') {
                        if (v[last]) {
                            const result = await EntitiesUtils.search(ref, { where: { key: v[last], size: 250 } });
                            const hits = EntitiesUtils.get_hits(result);
                            v[last] = hits.length > 0 ? hits.map(h => h.source) : [];
                        } else {
                            v[last] = [];
                        }
                    } else {
                        const result = await EntitiesUtils.retrieve(v[last],
                            ref, '', propagate_population ? population.join(',') : '');
                        v[last] = result != null ? result.source : {};
                    }
                }
            }
        }
    }
}

module.exports = ODM;