Gitter migration: Setup redirects (rollout pt. 3)
[gitter.git] / server / services / typeaheads / user-typeahead-elastic.js
blob0cf9136c0b86763e07fdd59bd9b9ee3c6ca16d65
1 'use strict';
3 var Promise = require('bluebird');
4 var persistence = require('gitter-web-persistence');
5 var BatchStream = require('batch-stream');
6 var through2 = require('through2');
7 var ElasticBulkUpdateStream = require('./elastic-bulk-update-stream');
8 var bulkTools = require('./elastic-bulk-tools');
9 var inputsForUser = require('./elastic-inputs-for-user');
10 var elasticClient = require('../../utils/elasticsearch-typeahead-client');
11 var userService = require('gitter-web-users');
12 var collections = require('gitter-web-utils/lib/collections');
13 var uuid = require('uuid/v4');
14 var debug = require('debug')('gitter:app:user-typeahead');
15 var mongoReadPrefs = require('gitter-web-persistence-utils/lib/mongo-read-prefs');
17 var INDEX_PREFIX = 'typeahead_';
18 var READ_INDEX_ALIAS = 'typeahead-read';
19 var WRITE_INDEX_ALIAS = 'typeahead-write';
20 var BATCH_SIZE = 1000;
21 var MEMBERSHIP_LIMIT = 600;
23 function query(text, options) {
24   var roomId = (options || {}).roomId || '*';
26   return elasticClient
27     .suggest({
28       size: 10,
29       index: READ_INDEX_ALIAS,
30       type: 'user',
31       body: {
32         suggest: {
33           text: text,
34           completion: {
35             field: 'suggest',
36             context: { rooms: roomId },
37             payload: ['_uid']
38           }
39         }
40       }
41     })
42     .then(function(res) {
43       var options = res.suggest[0].options;
44       var userIds = options.map(function(option) {
45         return option.payload._uid[0].split('#')[1];
46       });
47       return userService.findByIds(userIds).then(function(users) {
48         return collections.maintainIdOrder(userIds, users);
49       });
50     });
53 function reindex() {
54   var newIndex = INDEX_PREFIX + uuid();
56   return createIndex(newIndex)
57     .then(function() {
58       return setWriteAlias(newIndex);
59     })
60     .then(function() {
61       return Promise.all([reindexUsers(), reindexMemberships()]);
62     })
63     .then(function() {
64       return setReadAlias(newIndex);
65     })
66     .then(function() {
67       return removeUnusedIndicies();
68     });
71 function addUsersToGroupRoom(userIds, roomId) {
72   var updates = userIds.map(function(userId) {
73     return createAddMembershipUpdate(userId, [roomId]);
74   });
75   var req = bulkTools.createBulkUpdate(updates);
76   return elasticClient.bulk(req).then(function(res) {
77     var err = bulkTools.findErrors(req, res);
78     if (err) throw err;
79   });
82 function removeUsersFromRoom(userIds, roomId) {
83   var updates = userIds.map(function(userId) {
84     return createRemoveMembershipUpdate(userId, roomId);
85   });
86   var req = bulkTools.createBulkUpdate(updates);
87   return elasticClient.bulk(req).then(function(res) {
88     var err = bulkTools.findErrors(req, res);
89     if (err) throw err;
90   });
93 function upsertUser(user) {
94   return elasticClient.update(createUserUpdate(user));
97 function createIndex(name) {
98   debug('creating index %s', name);
99   return elasticClient.indices.create({
100     index: name,
101     body: {
102       settings: {
103         number_of_shards: 4,
104         number_of_replicas: 1,
105         mapper: {
106           dynamic: false
107         }
108       },
109       mappings: {
110         user: {
111           dynamic: 'strict',
112           properties: {
113             suggest: {
114               type: 'completion',
115               analyzer: 'simple',
116               search_analyzer: 'simple',
117               preserve_separators: false,
118               contexts: [{ name: 'rooms', type: 'category' }]
119             }
120           }
121         }
122       }
123     }
124   });
127 function setWriteAlias(index) {
128   debug('setting %s as sole write alias (%s)', index, WRITE_INDEX_ALIAS);
129   return elasticClient.indices.updateAliases({
130     body: {
131       actions: [
132         { remove: { index: INDEX_PREFIX + '*', alias: WRITE_INDEX_ALIAS } },
133         { add: { index: index, alias: WRITE_INDEX_ALIAS } }
134       ]
135     }
136   });
139 function setReadAlias(index) {
140   debug('setting %s as sole read alias (%s)', index, READ_INDEX_ALIAS);
141   return elasticClient.indices.updateAliases({
142     body: {
143       actions: [
144         { remove: { index: INDEX_PREFIX + '*', alias: READ_INDEX_ALIAS } },
145         { add: { index: index, alias: READ_INDEX_ALIAS } }
146       ]
147     }
148   });
151 function removeUnusedIndicies() {
152   return elasticClient.indices.getAliases().then(function(resp) {
153     var unused = Object.keys(resp).filter(function(index) {
154       var aliases = Object.keys(resp[index].aliases);
155       return index.indexOf(INDEX_PREFIX) === 0 && aliases.length === 0;
156     });
158     if (!unused.length) return;
160     debug('removing indices %j', unused);
161     return elasticClient.indices.delete({ index: unused });
162   });
165 function reindexUsers() {
166   return new Promise(function(resolve, reject) {
167     var userStream = persistence.User.find()
168       .lean()
169       .read(mongoReadPrefs.secondaryPreferred)
170       .batchSize(BATCH_SIZE)
171       .stream();
173     var user2elastic = through2.obj(function(user, encoding, callback) {
174       this.push(createUserUpdate(user));
175       callback();
176     });
178     var batchStream = new BatchStream({ size: BATCH_SIZE });
180     var elasticBulkUpdateStream = new ElasticBulkUpdateStream(elasticClient);
182     userStream
183       .on('error', reject)
184       .pipe(user2elastic)
185       .on('error', reject)
186       .pipe(batchStream)
187       .on('error', reject)
188       .pipe(elasticBulkUpdateStream)
189       .on('error', reject)
190       .on('finish', resolve);
191   });
194 function reindexMemberships() {
195   return new Promise(function(resolve, reject) {
196     var troupeUserStream = persistence.TroupeUser.aggregate([
197       {
198         $group: {
199           _id: '$troupeId',
200           userIds: { $push: '$userId' }
201         }
202       },
203       {
204         $lookup: {
205           from: 'troupes',
206           localField: '_id',
207           foreignField: '_id',
208           as: 'troupe'
209         }
210       },
211       {
212         $match: {
213           // if we allowed oneToOnes, then @mydigitalself would break elasticsearch!
214           // field is missing if not one to one, but true if it is
215           'troupe.oneToOne': { $ne: true }
216         }
217       },
218       {
219         $project: {
220           userIds: true
221         }
222       },
223       {
224         $unwind: '$userIds'
225       },
226       {
227         $group: {
228           _id: '$userIds',
229           troupeIds: { $push: '$_id' }
230         }
231       }
232     ])
233       .allowDiskUse(true)
234       .read(mongoReadPrefs.secondaryPreferred)
235       .cursor({ batchSize: BATCH_SIZE })
236       .exec()
237       .stream();
239     var memberships2elastic = through2.obj(function(memberships, encoding, callback) {
240       this.push(createAddMembershipUpdate(memberships._id, memberships.troupeIds));
241       callback();
242     });
244     var batchStream = new BatchStream({ size: BATCH_SIZE });
246     var elasticBulkUpdateStream = new ElasticBulkUpdateStream(elasticClient);
248     troupeUserStream
249       .on('error', reject)
250       .pipe(memberships2elastic)
251       .on('error', reject)
252       .pipe(batchStream)
253       .on('error', reject)
254       .pipe(elasticBulkUpdateStream)
255       .on('error', reject)
256       .on('finish', resolve);
257   });
260 function createUserUpdate(user) {
261   var id = user._id.toString();
262   var input = inputsForUser(user);
264   return {
265     index: WRITE_INDEX_ALIAS,
266     type: 'user',
267     id: id,
268     body: {
269       doc: {
270         suggest: {
271           input: input
272         }
273       },
274       upsert: {
275         suggest: {
276           input: input,
277           contexts: {
278             rooms: ['*']
279           }
280         }
281       }
282     },
283     _retry_on_conflict: 3
284   };
287 function createAddMembershipUpdate(userId, roomIds) {
288   var id = userId.toString();
289   var newRooms = roomIds.map(function(roomId) {
290     return roomId.toString();
291   });
293   if (newRooms.length > MEMBERSHIP_LIMIT) {
294     // going over the limit can cause a too_complex_to_determinize_exception
295     // from elastic as the automaton has a total limit of 10000 states.
296     debug(
297       '%s is in %d rooms which is over the limit of %d. probably troll; ignoring update',
298       userId,
299       newRooms.length,
300       MEMBERSHIP_LIMIT
301     );
302     newRooms = [];
303   }
305   return {
306     index: WRITE_INDEX_ALIAS,
307     type: 'user',
308     id: id,
309     body: {
310       // ensures roomIds are unique and that the update doenst go over the membership limit
311       script:
312         'ctx._source.suggest.contexts.rooms = (ctx._source.suggest.contexts.rooms + new_rooms).unique(false).take(' +
313         MEMBERSHIP_LIMIT +
314         ')',
315       params: {
316         new_rooms: newRooms
317       },
318       upsert: {
319         suggest: {
320           contexts: {
321             rooms: ['*'].concat(newRooms)
322           }
323         }
324       }
325     },
326     _retry_on_conflict: 3
327   };
330 function createRemoveMembershipUpdate(userId, roomId) {
331   return {
332     index: WRITE_INDEX_ALIAS,
333     type: 'user',
334     id: userId.toString(),
335     body: {
336       script: 'ctx._source.suggest.contexts.rooms = ctx._source.suggest.contexts.rooms -= roomId',
337       params: {
338         roomId: roomId.toString()
339       },
340       upsert: {
341         suggest: {
342           contexts: {
343             rooms: ['*']
344           }
345         }
346       }
347     },
348     _retry_on_conflict: 3
349   };
352 module.exports = {
353   query: query,
354   reindex: reindex,
355   addUsersToGroupRoom: addUsersToGroupRoom,
356   removeUsersFromRoom: removeUsersFromRoom,
357   upsertUser: upsertUser