Manual Reference Source

app/modules/entities/crud/odm.js

  1. // @flow
  2. const _ = require('lodash');
  3. const Search = require('./search');
  4. const Mapping = require('./mapping');
  5. const Utils = require('../../utils/utils');
  6.  
  7. /**
  8. * Object Data Model
  9. * Generic class to model an entity in the program (saved into an ElasticSearch DB).
  10. *
  11. * @public
  12. *
  13. *
  14. */
  15. class ODM {
  16. _client: Object;
  17. _index: string;
  18. _type: string;
  19. _db: Object;
  20. _model: Object;
  21. _id: ?string;
  22.  
  23. /**
  24. * @param client: ElasticSearch client;
  25. * @param id: id of the entity (could be null)
  26. */
  27. constructor(index: string, type: string, client: Object,
  28. model: Object, id: ?string) {
  29. this._client = client;
  30. this._id = id;
  31. this._db = {};
  32. this._index = index;
  33. this._type = type;
  34. this._model = model;
  35. }
  36.  
  37. /**
  38. * @public
  39. * @return the underlying model of this entity
  40. * @see entity models for more details
  41. */
  42. get model(): Object {
  43. return this._model;
  44. }
  45.  
  46. /**
  47. * @public
  48. * @return the underlying ElasticSearch mapping of this entity
  49. */
  50. get mapping(): Object {
  51. return this._model.Mapping;
  52. }
  53.  
  54. /**
  55. * @public
  56. * @return the underlying ElasticSearch index of this entity
  57. */
  58. get index(): string {
  59. return this._index;
  60. }
  61.  
  62. /**
  63. * @public
  64. * @return the underlying ElasticSearch type of this entity
  65. */
  66. get type(): string {
  67. return this._type;
  68. }
  69.  
  70. get source(): Object {
  71. return this.db.source || {};
  72. }
  73.  
  74. /**
  75. * Process an entity and create an {ODM} object.
  76. *
  77. * @public
  78. * @param hit: hit in ElasticSearch result
  79. * @param found: true if the object was found in DB, false otherwise
  80. * @return content of the hit
  81. */
  82. static format_hit(hit: Object, found: boolean = true): Object {
  83. if (hit == null) {
  84. return {};
  85. }
  86.  
  87. let score = 0;
  88. const index = hit._index;
  89. const type = hit._type;
  90. const id = hit._id;
  91. const source = '_source' in hit ? hit._source : {};
  92. const sort = 'sort' in hit ? hit.sort : [];
  93. if ('score' in hit) {
  94. score = hit._score;
  95. }
  96.  
  97. source._id = id;
  98.  
  99. return {
  100. id,
  101. type,
  102. index,
  103. score,
  104. source,
  105. sort,
  106. found,
  107. };
  108. }
  109.  
  110. /**
  111. * Get name of the class (= entity)
  112. * @public
  113. * @return name of the class
  114. */
  115. get name(): string {
  116. return this.constructor.name.toLowerCase();
  117. }
  118.  
  119. /**
  120. * Get id of an entity (possibly null)
  121. * @public
  122. * @return id
  123. */
  124. get id(): ?string {
  125. return this._id;
  126. }
  127.  
  128. get messages(): Object {
  129. return this._model.Messages;
  130. }
  131.  
  132. /**
  133. * Get all information about an entity;
  134. * @public
  135. * @return all interesting information
  136. */
  137. get db(): Object {
  138. return JSON.parse(JSON.stringify(this._db));
  139. }
  140.  
  141. /**
  142. * Set all interesting information about an entity
  143. * @public
  144. * @param o: information
  145. */
  146. set db(o: Object) {
  147. this._db = o;
  148. }
  149.  
  150. static async fetch_mapping(index: string, type: string, client: Object, include_meta: boolean = false) {
  151. const mapping = await client.indices.getMapping({ index, type });
  152. if (index in mapping && type in mapping[index].mappings) {
  153. if (include_meta) {
  154. return mapping[index].mappings[type];
  155. }
  156. return mapping[index].mappings[type].properties;
  157. }
  158. return null;
  159. }
  160.  
  161. static async fetch_settings(index: string, type: string, client: Object) {
  162. const settings = await client.indices.getSettings({ index, type });
  163. if (settings && index in settings) {
  164. return settings[index];
  165. }
  166. return null;
  167. }
  168.  
  169. static async read(index: string, type: string,
  170. client: Object, model: Object, response: Object,
  171. population: Array<String> = [], backward: boolean = false): Object {
  172. const o = {};
  173.  
  174. if ('_scroll_id' in response) {
  175. o.scroll_id = response._scroll_id;
  176. }
  177.  
  178. if ('count' in response) {
  179. o.count = response.count;
  180. }
  181.  
  182. if ('took' in response) {
  183. o.took = response.took;
  184. }
  185.  
  186. if ('timed_out' in response) {
  187. o.timeout = response.timed_out;
  188. }
  189.  
  190. if ('hits' in response) {
  191. o.hits = response.hits.hits.map((hit) => {
  192. const info = this.format_hit(hit);
  193. const odm = new this(index, type, client, model, info.id);
  194. odm.db = info;
  195. return odm;
  196. });
  197.  
  198. if (backward) {
  199. _.reverse(o.hits);
  200. }
  201.  
  202. await o.hits.reduce((pr, hit) =>
  203. pr.then(() => hit.post_read_hook(population)), Promise.resolve());
  204.  
  205. o.total = response.hits.total;
  206. o.count = response.hits.total;
  207. o.max_score = response.hits.max_score;
  208. }
  209.  
  210. if ('aggregations' in response) {
  211. o.aggs = response.aggregations;
  212. }
  213.  
  214. return o;
  215. }
  216.  
  217. static async search(index: string, type: string, client: Object, model: Object,
  218. search: Search, opts: Object = {}): Promise<Object> {
  219. const query = search.generate();
  220. console.log(JSON.stringify(query));
  221. const sort = search.sort();
  222. const aggs = search.aggs();
  223. const population = 'population' in opts ? opts.population : [];
  224. const body = {
  225. from: 'from' in opts ? opts.from : 0,
  226. size: 'size' in opts ? opts.size : 1000,
  227. _source: 'source' in opts ? opts.source : true,
  228. query,
  229. };
  230.  
  231.  
  232. if (sort != null) {
  233. body.sort = sort;
  234. if ('search_after' in opts) {
  235. body.from = 0;
  236. body.search_after = opts.search_after;
  237. } else if ('search_before' in opts) {
  238. body.sort = body.sort.map(s => _.reduce(s, (obj, value, key) => {
  239. obj[key] = value.order === 'asc' ? _.merge({}, value, { order: 'desc' }) : _.merge({}, value, { order: 'asc' });
  240. return obj;
  241. }, {}));
  242.  
  243. body.search_after = opts.search_before;
  244. }
  245. }
  246.  
  247. if (aggs != null) {
  248. body.aggs = aggs;
  249. }
  250.  
  251. let response = {};
  252. if ('scroll_id' in opts && 'scroll' in opts) {
  253. response = await client.scroll({
  254. scrollId: opts.scroll_id,
  255. scroll: opts.scroll,
  256. });
  257. } else {
  258. const req = {
  259. index,
  260. type,
  261. body,
  262. };
  263.  
  264. if ('scroll' in opts) {
  265. req.scroll = opts.scroll;
  266. }
  267.  
  268. response = await client.search(req);
  269. }
  270.  
  271. return this.read(index, type, client, model, response, population, 'search_before' in opts);
  272. }
  273.  
  274. static async count(index: string, type: string, client: Object,
  275. model: Object, search: Search): Promise<Object> {
  276. const query = search.generate();
  277. const response = await client.count({
  278. index,
  279. type,
  280. body: {
  281. query,
  282. },
  283. });
  284. return this.read(index, type, client, model, response);
  285. }
  286.  
  287. static async deleteByQuery(index: string, type: string, client: Object, search: Search) {
  288. const query = search.generate();
  289. await client.deleteByQuery({
  290. index,
  291. type,
  292. refresh: true,
  293. body: {
  294. query,
  295. },
  296. });
  297. }
  298.  
  299. static async remove(index: string, type: string, client: Object, id: string): Promise<boolean> {
  300. try {
  301. const response = await client.delete({
  302. index,
  303. type,
  304. id,
  305. refresh: true,
  306. });
  307. // console.log(response);
  308. return response.found;
  309. } catch (err) {
  310. console.log('remove error', err);
  311. return false;
  312. }
  313. }
  314.  
  315. static async _create_or_update(index: string, type: string,
  316. client: Object, model: Object, body: Object, id: ?string = null): Promise<?ODM> {
  317. console.log('create or update body', JSON.stringify(body));
  318. try {
  319. const content = {
  320. index,
  321. type,
  322. body,
  323. refresh: true,
  324. };
  325.  
  326. if (id != null) {
  327. content.id = id;
  328. const ret = await this.pre_update_hook(index, type, client, model, body, id);
  329. if (!ret) {
  330. return null;
  331. }
  332. } else {
  333. // TODO NEED TO BE REMOVE AFTER DATA IMPORT
  334. if ('_id' in content.body) {
  335. content.id = content.body._id;
  336. delete content.body._id;
  337. }
  338.  
  339. const ret = await this.pre_create_hook(index, type, client, model, body);
  340. if (!ret) {
  341. return null;
  342. }
  343. }
  344.  
  345. const response = await client.index(content);
  346. if (('created' in response && response.created)
  347. || ('result' in response && response.result === 'updated')) {
  348. try {
  349. const get_response = await client.get({
  350. index,
  351. type,
  352. id: response._id,
  353. });
  354. const odm = new this(index, type, client, model, response._id);
  355. odm.db = this.format_hit(get_response, get_response.found);
  356. if ('created' in response) {
  357. await odm.post_create_hook();
  358. } else {
  359. await odm.post_update_hook();
  360. }
  361.  
  362. // console.log(odm);
  363. return odm;
  364. } catch (err) {
  365. return null;
  366. }
  367. }
  368. return null;
  369. } catch (err) {
  370. console.log('creation or update error', err);
  371. return null;
  372. }
  373. }
  374.  
  375. static async _bulk_create_or_update(index: string, type: string, client: Object,
  376. body: Array<Object>, action: string = 'create'): Promise<?ODM> {
  377. try {
  378. const content = {
  379. index,
  380. type,
  381. body: _.flatten(body.map((e) => {
  382. if (action === 'create') {
  383. return [{ index: {} }, e];
  384. } else if (action === 'update') {
  385. const _id = e._id;
  386. delete e._id;
  387. return [{ update: { _id } }, { doc: e }];
  388. }
  389. return [];
  390. })),
  391. refresh: true,
  392. };
  393.  
  394. const response = await client.bulk(content);
  395. console.log(response);
  396. return null;
  397. } catch (err) {
  398. console.log('bulk creation or update error', err);
  399. return null;
  400. }
  401. }
  402.  
  403.  
  404. async read(opts: Object = {}): Promise<ODM> {
  405. const population = 'population' in opts ? opts.population : [];
  406. const source = 'source' in opts ? opts.source : null;
  407.  
  408. await this.pre_read_hook(source, population);
  409.  
  410. try {
  411. const response = await this._client.get({
  412. index: this.index,
  413. type: this.type,
  414. id: this._id,
  415. _source: source,
  416. });
  417.  
  418.  
  419. this.db = this.constructor.format_hit(response, response.found);
  420. await this.post_read_hook(population);
  421. } catch (err) {
  422. const response = err.body;
  423. this.db = this.constructor.format_hit(response, response ? response.found : false);
  424. await this.post_read_hook(population);
  425. }
  426. return this;
  427. }
  428.  
  429. static async create(index: string, type: string, client: Object,
  430. model: Object, body: Object): Promise<?ODM> {
  431. return this._create_or_update(index, type, client, model, body);
  432. }
  433.  
  434. static async update(index: string, type: string, client: Object,
  435. model: Object, body: Object, id: string): Promise<?ODM> {
  436. // console.log('update', JSON.stringify(body));
  437. return this._create_or_update(index, type, client, model, body, id);
  438. }
  439.  
  440. static async bulk_create(index: string, type: string, client: Object,
  441. body: Array<Object>): Promise<?ODM> {
  442. return this._bulk_create_or_update(index, type, client, body, 'create');
  443. }
  444.  
  445. static async bulk_update(index: string, type: string, client: Object,
  446. body: Array<Object>): Promise<?ODM> {
  447. return this._bulk_create_or_update(index, type, client, body, 'update');
  448. }
  449.  
  450. oupdate(): Promise<?ODM> {
  451. return ODM.update(this.index, this.type, this.client, this.model, this.source);
  452. }
  453.  
  454. ocreate(): Promise<?ODM> {
  455. return ODM.create(this.index, this.type, this.client, this.model, this.source);
  456. }
  457.  
  458. toJSON(): Object {
  459. return this.db;
  460. }
  461.  
  462. async pre_read_hook() {
  463. // TODO TBD
  464. }
  465.  
  466. static async pre_create_hook(index: string, type: string,
  467. client: Object, model: Object, body: Object): Promise<boolean> {
  468. // To be re-implemented in subclass (if needed)
  469. return true;
  470. }
  471.  
  472. static async pre_update_hook(index: string, type: string,
  473. client: Object, model: Object, body: Object, id: string): Promise<boolean> {
  474. // To be re-implemented in subclass (if needed)
  475. return true;
  476. }
  477.  
  478. async post_read_hook(population: Array<String>) {
  479. // To be re-implemented in subclass (if needed)
  480. await this._handle_population(population);
  481. }
  482.  
  483. async post_create_hook() {
  484. // To be re-implemented in subclass (if needed)
  485. }
  486.  
  487. async post_update_hook() {
  488. // To be re-implemented in subclass (if needed)
  489. }
  490.  
  491. async _handle_population(population: Array<String>, propagate_population: boolean = false) {
  492. const EntitiesUtils = require('../../utils/entities');
  493. const mapping = await this.constructor.fetch_mapping(this.index, this.type,
  494. this._client, true);
  495.  
  496. if (mapping && !('_meta' in mapping)) {
  497. return;
  498. }
  499.  
  500. if (!('refs' in mapping._meta)) {
  501. return;
  502. }
  503.  
  504. const refs = mapping._meta.refs;
  505. const info = this._db.source;
  506.  
  507.  
  508. for (const p of population) {
  509. const path = p.split('.');
  510. const vals = [...Utils.find_popvalue_with_path(info, path.slice(), true)];
  511. let ref = [...Utils.find_popvalue_with_path(refs, path.slice())];
  512. if (ref.length > 0 && vals.length > 0) {
  513. ref = ref[0];
  514. const last = path[path.length - 1];
  515. for (const v of vals) {
  516. if (ref === 'lang') {
  517. if (v[last]) {
  518. const result = await EntitiesUtils.search(ref, { where: { key: v[last], size: 250 } });
  519. const hits = EntitiesUtils.get_hits(result);
  520. v[last] = hits.length > 0 ? hits.map(h => h.source) : [];
  521. } else {
  522. v[last] = [];
  523. }
  524. } else {
  525. const result = await EntitiesUtils.retrieve(v[last],
  526. ref, '', propagate_population ? population.join(',') : '');
  527. v[last] = result != null ? result.source : {};
  528. }
  529. }
  530. }
  531. }
  532. }
  533. }
  534.  
  535. module.exports = ODM;