app/modules/entities/crud/odm.js
// @flow
const _ = require('lodash');
const Search = require('./search');
const Mapping = require('./mapping');
const Utils = require('../../utils/utils');
/**
* Object Data Model
* Generic class to model an entity in the program (saved into an ElasticSearch DB).
*
* @public
*
*
*/
class ODM {
_client: Object;
_index: string;
_type: string;
_db: Object;
_model: Object;
_id: ?string;
/**
* @param client: ElasticSearch client;
* @param id: id of the entity (could be null)
*/
constructor(index: string, type: string, client: Object,
model: Object, id: ?string) {
this._client = client;
this._id = id;
this._db = {};
this._index = index;
this._type = type;
this._model = model;
}
/**
* @public
* @return the underlying model of this entity
* @see entity models for more details
*/
get model(): Object {
return this._model;
}
/**
* @public
* @return the underlying ElasticSearch mapping of this entity
*/
get mapping(): Object {
return this._model.Mapping;
}
/**
* @public
* @return the underlying ElasticSearch index of this entity
*/
get index(): string {
return this._index;
}
/**
* @public
* @return the underlying ElasticSearch type of this entity
*/
get type(): string {
return this._type;
}
get source(): Object {
return this.db.source || {};
}
/**
* Process an entity and create an {ODM} object.
*
* @public
* @param hit: hit in ElasticSearch result
* @param found: true if the object was found in DB, false otherwise
* @return content of the hit
*/
static format_hit(hit: Object, found: boolean = true): Object {
if (hit == null) {
return {};
}
let score = 0;
const index = hit._index;
const type = hit._type;
const id = hit._id;
const source = '_source' in hit ? hit._source : {};
const sort = 'sort' in hit ? hit.sort : [];
if ('score' in hit) {
score = hit._score;
}
source._id = id;
return {
id,
type,
index,
score,
source,
sort,
found,
};
}
/**
* Get name of the class (= entity)
* @public
* @return name of the class
*/
get name(): string {
return this.constructor.name.toLowerCase();
}
/**
* Get id of an entity (possibly null)
* @public
* @return id
*/
get id(): ?string {
return this._id;
}
get messages(): Object {
return this._model.Messages;
}
/**
* Get all information about an entity;
* @public
* @return all interesting information
*/
get db(): Object {
return JSON.parse(JSON.stringify(this._db));
}
/**
* Set all interesting information about an entity
* @public
* @param o: information
*/
set db(o: Object) {
this._db = o;
}
static async fetch_mapping(index: string, type: string, client: Object, include_meta: boolean = false) {
const mapping = await client.indices.getMapping({ index, type });
if (index in mapping && type in mapping[index].mappings) {
if (include_meta) {
return mapping[index].mappings[type];
}
return mapping[index].mappings[type].properties;
}
return null;
}
static async fetch_settings(index: string, type: string, client: Object) {
const settings = await client.indices.getSettings({ index, type });
if (settings && index in settings) {
return settings[index];
}
return null;
}
static async read(index: string, type: string,
client: Object, model: Object, response: Object,
population: Array<String> = [], backward: boolean = false): Object {
const o = {};
if ('_scroll_id' in response) {
o.scroll_id = response._scroll_id;
}
if ('count' in response) {
o.count = response.count;
}
if ('took' in response) {
o.took = response.took;
}
if ('timed_out' in response) {
o.timeout = response.timed_out;
}
if ('hits' in response) {
o.hits = response.hits.hits.map((hit) => {
const info = this.format_hit(hit);
const odm = new this(index, type, client, model, info.id);
odm.db = info;
return odm;
});
if (backward) {
_.reverse(o.hits);
}
await o.hits.reduce((pr, hit) =>
pr.then(() => hit.post_read_hook(population)), Promise.resolve());
o.total = response.hits.total;
o.count = response.hits.total;
o.max_score = response.hits.max_score;
}
if ('aggregations' in response) {
o.aggs = response.aggregations;
}
return o;
}
static async search(index: string, type: string, client: Object, model: Object,
search: Search, opts: Object = {}): Promise<Object> {
const query = search.generate();
console.log(JSON.stringify(query));
const sort = search.sort();
const aggs = search.aggs();
const population = 'population' in opts ? opts.population : [];
const body = {
from: 'from' in opts ? opts.from : 0,
size: 'size' in opts ? opts.size : 1000,
_source: 'source' in opts ? opts.source : true,
query,
};
if (sort != null) {
body.sort = sort;
if ('search_after' in opts) {
body.from = 0;
body.search_after = opts.search_after;
} else if ('search_before' in opts) {
body.sort = body.sort.map(s => _.reduce(s, (obj, value, key) => {
obj[key] = value.order === 'asc' ? _.merge({}, value, { order: 'desc' }) : _.merge({}, value, { order: 'asc' });
return obj;
}, {}));
body.search_after = opts.search_before;
}
}
if (aggs != null) {
body.aggs = aggs;
}
let response = {};
if ('scroll_id' in opts && 'scroll' in opts) {
response = await client.scroll({
scrollId: opts.scroll_id,
scroll: opts.scroll,
});
} else {
const req = {
index,
type,
body,
};
if ('scroll' in opts) {
req.scroll = opts.scroll;
}
response = await client.search(req);
}
return this.read(index, type, client, model, response, population, 'search_before' in opts);
}
static async count(index: string, type: string, client: Object,
model: Object, search: Search): Promise<Object> {
const query = search.generate();
const response = await client.count({
index,
type,
body: {
query,
},
});
return this.read(index, type, client, model, response);
}
static async deleteByQuery(index: string, type: string, client: Object, search: Search) {
const query = search.generate();
await client.deleteByQuery({
index,
type,
refresh: true,
body: {
query,
},
});
}
static async remove(index: string, type: string, client: Object, id: string): Promise<boolean> {
try {
const response = await client.delete({
index,
type,
id,
refresh: true,
});
// console.log(response);
return response.found;
} catch (err) {
console.log('remove error', err);
return false;
}
}
static async _create_or_update(index: string, type: string,
client: Object, model: Object, body: Object, id: ?string = null): Promise<?ODM> {
console.log('create or update body', JSON.stringify(body));
try {
const content = {
index,
type,
body,
refresh: true,
};
if (id != null) {
content.id = id;
const ret = await this.pre_update_hook(index, type, client, model, body, id);
if (!ret) {
return null;
}
} else {
// TODO NEED TO BE REMOVE AFTER DATA IMPORT
if ('_id' in content.body) {
content.id = content.body._id;
delete content.body._id;
}
const ret = await this.pre_create_hook(index, type, client, model, body);
if (!ret) {
return null;
}
}
const response = await client.index(content);
if (('created' in response && response.created)
|| ('result' in response && response.result === 'updated')) {
try {
const get_response = await client.get({
index,
type,
id: response._id,
});
const odm = new this(index, type, client, model, response._id);
odm.db = this.format_hit(get_response, get_response.found);
if ('created' in response) {
await odm.post_create_hook();
} else {
await odm.post_update_hook();
}
// console.log(odm);
return odm;
} catch (err) {
return null;
}
}
return null;
} catch (err) {
console.log('creation or update error', err);
return null;
}
}
static async _bulk_create_or_update(index: string, type: string, client: Object,
body: Array<Object>, action: string = 'create'): Promise<?ODM> {
try {
const content = {
index,
type,
body: _.flatten(body.map((e) => {
if (action === 'create') {
return [{ index: {} }, e];
} else if (action === 'update') {
const _id = e._id;
delete e._id;
return [{ update: { _id } }, { doc: e }];
}
return [];
})),
refresh: true,
};
const response = await client.bulk(content);
console.log(response);
return null;
} catch (err) {
console.log('bulk creation or update error', err);
return null;
}
}
async read(opts: Object = {}): Promise<ODM> {
const population = 'population' in opts ? opts.population : [];
const source = 'source' in opts ? opts.source : null;
await this.pre_read_hook(source, population);
try {
const response = await this._client.get({
index: this.index,
type: this.type,
id: this._id,
_source: source,
});
this.db = this.constructor.format_hit(response, response.found);
await this.post_read_hook(population);
} catch (err) {
const response = err.body;
this.db = this.constructor.format_hit(response, response ? response.found : false);
await this.post_read_hook(population);
}
return this;
}
static async create(index: string, type: string, client: Object,
model: Object, body: Object): Promise<?ODM> {
return this._create_or_update(index, type, client, model, body);
}
static async update(index: string, type: string, client: Object,
model: Object, body: Object, id: string): Promise<?ODM> {
// console.log('update', JSON.stringify(body));
return this._create_or_update(index, type, client, model, body, id);
}
static async bulk_create(index: string, type: string, client: Object,
body: Array<Object>): Promise<?ODM> {
return this._bulk_create_or_update(index, type, client, body, 'create');
}
static async bulk_update(index: string, type: string, client: Object,
body: Array<Object>): Promise<?ODM> {
return this._bulk_create_or_update(index, type, client, body, 'update');
}
oupdate(): Promise<?ODM> {
return ODM.update(this.index, this.type, this.client, this.model, this.source);
}
ocreate(): Promise<?ODM> {
return ODM.create(this.index, this.type, this.client, this.model, this.source);
}
toJSON(): Object {
return this.db;
}
async pre_read_hook() {
// TODO TBD
}
static async pre_create_hook(index: string, type: string,
client: Object, model: Object, body: Object): Promise<boolean> {
// To be re-implemented in subclass (if needed)
return true;
}
static async pre_update_hook(index: string, type: string,
client: Object, model: Object, body: Object, id: string): Promise<boolean> {
// To be re-implemented in subclass (if needed)
return true;
}
async post_read_hook(population: Array<String>) {
// To be re-implemented in subclass (if needed)
await this._handle_population(population);
}
async post_create_hook() {
// To be re-implemented in subclass (if needed)
}
async post_update_hook() {
// To be re-implemented in subclass (if needed)
}
async _handle_population(population: Array<String>, propagate_population: boolean = false) {
const EntitiesUtils = require('../../utils/entities');
const mapping = await this.constructor.fetch_mapping(this.index, this.type,
this._client, true);
if (mapping && !('_meta' in mapping)) {
return;
}
if (!('refs' in mapping._meta)) {
return;
}
const refs = mapping._meta.refs;
const info = this._db.source;
for (const p of population) {
const path = p.split('.');
const vals = [...Utils.find_popvalue_with_path(info, path.slice(), true)];
let ref = [...Utils.find_popvalue_with_path(refs, path.slice())];
if (ref.length > 0 && vals.length > 0) {
ref = ref[0];
const last = path[path.length - 1];
for (const v of vals) {
if (ref === 'lang') {
if (v[last]) {
const result = await EntitiesUtils.search(ref, { where: { key: v[last], size: 250 } });
const hits = EntitiesUtils.get_hits(result);
v[last] = hits.length > 0 ? hits.map(h => h.source) : [];
} else {
v[last] = [];
}
} else {
const result = await EntitiesUtils.retrieve(v[last],
ref, '', propagate_population ? population.join(',') : '');
v[last] = result != null ? result.source : {};
}
}
}
}
}
}
module.exports = ODM;