3 var Promise = require('bluebird');
4 var persistence = require('gitter-web-persistence');
5 var BatchStream = require('batch-stream');
6 var through2 = require('through2');
7 var ElasticBulkUpdateStream = require('./elastic-bulk-update-stream');
8 var bulkTools = require('./elastic-bulk-tools');
9 var inputsForUser = require('./elastic-inputs-for-user');
10 var elasticClient = require('../../utils/elasticsearch-typeahead-client');
11 var userService = require('gitter-web-users');
12 var collections = require('gitter-web-utils/lib/collections');
13 var uuid = require('uuid/v4');
14 var debug = require('debug')('gitter:app:user-typeahead');
15 var mongoReadPrefs = require('gitter-web-persistence-utils/lib/mongo-read-prefs');
// Naming and tuning constants for the typeahead Elasticsearch indices.
// Prefix of every concrete index name; a uuid is appended when a new index is
// created, and prefix + wildcard is the pattern used when aliases are removed.
17 var INDEX_PREFIX = 'typeahead_';
// Alias that query() searches against.
18 var READ_INDEX_ALIAS = 'typeahead-read';
// Alias that every update request built in this file is addressed to.
19 var WRITE_INDEX_ALIAS = 'typeahead-write';
// Used as the Mongo batch size and as the BatchStream size during reindexing.
20 var BATCH_SIZE = 1000;
// Room-count threshold checked in createAddMembershipUpdate and embedded in its
// update script; the comment there ties it to the automaton state limit in elastic.
21 var MEMBERSHIP_LIMIT = 600;
// Looks up users matching the given text through the read alias.
// options.roomId, when present, is used as the rooms context of the suggestion;
// otherwise the wildcard room is used.
// NOTE(review): the request construction and promise chaining are not visible in
// this excerpt, so how text is passed to the client is assumed - confirm.
23 function query(text, options) {
// tolerate a missing options object and default to the wildcard room
24 var roomId = (options || {}).roomId || '*';
29 index: READ_INDEX_ALIAS,
36 context: { rooms: roomId },
// here options is the suggestion list from the response, not the options parameter
43 var options = res.suggest[0].options;
44 var userIds = options.map(function(option) {
// the user id is the second piece of the first _uid payload entry when split on #
45 return option.payload._uid[0].split('#')[1];
// load the users, then hand them to maintainIdOrder together with the id list
// (presumably to restore the suggestion order - verify against the helper)
47 return userService.findByIds(userIds).then(function(users) {
48 return collections.maintainIdOrder(userIds, users);
// NOTE(review): the header of the enclosing function is not visible in this excerpt.
// Visible steps: pick a fresh uuid-suffixed index name, create that index, point the
// write alias at it, reindex users and memberships, point the read alias at it, and
// finally remove unused indices. The ordering between these steps is presumably
// enforced by a promise chain that is not shown here - confirm.
54 var newIndex = INDEX_PREFIX + uuid();
56 return createIndex(newIndex)
58 return setWriteAlias(newIndex);
// both reindex jobs are started together and awaited as a pair
61 return Promise.all([reindexUsers(), reindexMemberships()]);
64 return setReadAlias(newIndex);
67 return removeUnusedIndicies();
// Builds one add-membership update per user id for the given room and sends them
// to elastic as a single bulk request.
// Returns the promise chained on the bulk call.
// NOTE(review): what happens with the err value found below is not visible in this
// excerpt - confirm that it is thrown or otherwise surfaced.
71 function addUsersToGroupRoom(userIds, roomId) {
72 var updates = userIds.map(function(userId) {
// the helper expects a list of room ids, hence the single-element array
73 return createAddMembershipUpdate(userId, [roomId]);
75 var req = bulkTools.createBulkUpdate(updates);
76 return elasticClient.bulk(req).then(function(res) {
// inspect the bulk response for item-level errors
77 var err = bulkTools.findErrors(req, res);
// Builds one remove-membership update per user id for the given room and sends
// them to elastic as a single bulk request.
// Returns the promise chained on the bulk call.
// NOTE(review): what happens with the err value found below is not visible in this
// excerpt - confirm that it is thrown or otherwise surfaced.
82 function removeUsersFromRoom(userIds, roomId) {
83 var updates = userIds.map(function(userId) {
84 return createRemoveMembershipUpdate(userId, roomId);
86 var req = bulkTools.createBulkUpdate(updates);
87 return elasticClient.bulk(req).then(function(res) {
// inspect the bulk response for item-level errors
88 var err = bulkTools.findErrors(req, res);
// Sends the update request built by createUserUpdate for this user to elastic and
// returns the result of the client call.
// NOTE(review): presumably the request carries an upsert document (per the function
// name); the request body is not fully visible in this excerpt.
93 function upsertUser(user) {
94 return elasticClient.update(createUserUpdate(user));
// Creates an elastic index with the given name and returns the client call result.
// Visible configuration: one replica, and a field that uses the simple search
// analyzer, does not preserve separators and declares a rooms category context.
// NOTE(review): the surrounding settings and mappings wrappers are not visible in
// this excerpt; presumably the field is a completion suggester - confirm.
97 function createIndex(name) {
98 debug('creating index %s', name);
99 return elasticClient.indices.create({
104 number_of_replicas: 1,
116 search_analyzer: 'simple',
117 preserve_separators: false,
// the rooms context is what query() filters suggestions by
118 contexts: [{ name: 'rooms', type: 'category' }]
// Makes the given index the only one behind the write alias and returns the
// result of the updateAliases call.
127 function setWriteAlias(index) {
128 debug('setting %s as sole write alias (%s)', index, WRITE_INDEX_ALIAS);
129 return elasticClient.indices.updateAliases({
// first drop the alias from every index matching the prefix pattern...
132 { remove: { index: INDEX_PREFIX + '*', alias: WRITE_INDEX_ALIAS } },
// ...then attach it to the requested index
133 { add: { index: index, alias: WRITE_INDEX_ALIAS } }
// Makes the given index the only one behind the read alias and returns the
// result of the updateAliases call.
139 function setReadAlias(index) {
140 debug('setting %s as sole read alias (%s)', index, READ_INDEX_ALIAS);
141 return elasticClient.indices.updateAliases({
// first drop the alias from every index matching the prefix pattern...
144 { remove: { index: INDEX_PREFIX + '*', alias: READ_INDEX_ALIAS } },
// ...then attach it to the requested index
145 { add: { index: index, alias: READ_INDEX_ALIAS } }
// Deletes every index whose name starts with INDEX_PREFIX and that has no aliases.
// Does not issue a delete call when no such index exists.
151 function removeUnusedIndicies() {
// the response is keyed by index name, each entry exposing an aliases object
152 return elasticClient.indices.getAliases().then(function(resp) {
153 var unused = Object.keys(resp).filter(function(index) {
154 var aliases = Object.keys(resp[index].aliases);
// only indices created by this module (by prefix) and with no alias attached
155 return index.indexOf(INDEX_PREFIX) === 0 && aliases.length === 0;
// nothing to clean up
158 if (!unused.length) return;
160 debug('removing indices %j', unused);
161 return elasticClient.indices.delete({ index: unused });
// Streams users out of Mongo and writes a user update for each one into elastic.
// Returns a promise that resolves when the final stream emits finish.
// NOTE(review): the pipe wiring between the streams and the use of reject are only
// partly visible in this excerpt - confirm that stream errors reject the promise.
165 function reindexUsers() {
166 return new Promise(function(resolve, reject) {
167 var userStream = persistence.User.find()
// read with the secondaryPreferred read preference, in BATCH_SIZE batches
169 .read(mongoReadPrefs.secondaryPreferred)
170 .batchSize(BATCH_SIZE)
// object-mode transform: one user in, one elastic update request out
173 var user2elastic = through2.obj(function(user, encoding, callback) {
174 this.push(createUserUpdate(user));
// groups the individual updates into batches of BATCH_SIZE
178 var batchStream = new BatchStream({ size: BATCH_SIZE });
180 var elasticBulkUpdateStream = new ElasticBulkUpdateStream(elasticClient);
188 .pipe(elasticBulkUpdateStream)
190 .on('finish', resolve);
// Aggregates room memberships from the TroupeUser collection and writes an
// add-membership update per aggregated result into elastic.
// Returns a promise that resolves when the final stream emits finish.
// NOTE(review): several aggregation stages and the use of reject are not visible in
// this excerpt; the stage descriptions below are partly assumed - confirm.
194 function reindexMemberships() {
195 return new Promise(function(resolve, reject) {
196 var troupeUserStream = persistence.TroupeUser.aggregate([
// collects user ids into an array (presumably grouped per troupe)
200 userIds: { $push: '$userId' }
213 // if we allowed oneToOnes, then @mydigitalself would break elasticsearch!
214 // field is missing if not one to one, but true if it is
215 'troupe.oneToOne': { $ne: true }
// collects troupe ids into an array (presumably grouped per user)
229 troupeIds: { $push: '$_id' }
// read with the secondaryPreferred read preference, in BATCH_SIZE batches
234 .read(mongoReadPrefs.secondaryPreferred)
235 .cursor({ batchSize: BATCH_SIZE })
// object-mode transform: the _id of each result is passed as the user id and
// its troupeIds as the room ids of the update
239 var memberships2elastic = through2.obj(function(memberships, encoding, callback) {
240 this.push(createAddMembershipUpdate(memberships._id, memberships.troupeIds));
// groups the individual updates into batches of BATCH_SIZE
244 var batchStream = new BatchStream({ size: BATCH_SIZE });
246 var elasticBulkUpdateStream = new ElasticBulkUpdateStream(elasticClient);
250 .pipe(memberships2elastic)
254 .pipe(elasticBulkUpdateStream)
256 .on('finish', resolve);
// Builds the elastic update request for one user, addressed to the write alias.
// The document id is the string form of user._id and the suggestion input comes
// from inputsForUser.
// NOTE(review): the request body is not visible in this excerpt.
260 function createUserUpdate(user) {
261 var id = user._id.toString();
262 var input = inputsForUser(user);
265 index: WRITE_INDEX_ALIAS,
// retry the update up to 3 times on a version conflict
283 _retry_on_conflict: 3
// Builds the elastic update request that adds the given rooms to the rooms context
// of one user, addressed to the write alias.
// userId and every entry of roomIds are converted with toString.
// NOTE(review): what the over-limit branch does after logging, and the full request
// body, are not visible in this excerpt - confirm.
287 function createAddMembershipUpdate(userId, roomIds) {
288 var id = userId.toString();
289 var newRooms = roomIds.map(function(roomId) {
290 return roomId.toString();
293 if (newRooms.length > MEMBERSHIP_LIMIT) {
294 // going over the limit can cause a too_complex_to_determinize_exception
295 // from elastic as the automaton has a total limit of 10000 states.
297 '%s is in %d rooms which is over the limit of %d. probably troll; ignoring update',
306 index: WRITE_INDEX_ALIAS,
310 // ensures roomIds are unique and that the update does not go over the membership limit
312 'ctx._source.suggest.contexts.rooms = (ctx._source.suggest.contexts.rooms + new_rooms).unique(false).take(' +
// rooms list with the wildcard room first (presumably the upsert document used
// when the user has no document yet - confirm)
321 rooms: ['*'].concat(newRooms)
// retry the update up to 3 times on a version conflict
326 _retry_on_conflict: 3
// Builds the elastic update request that removes one room from the rooms context
// of one user, addressed to the write alias.
// userId and roomId are converted with toString.
// NOTE(review): the full request body is not visible in this excerpt.
330 function createRemoveMembershipUpdate(userId, roomId) {
332 index: WRITE_INDEX_ALIAS,
334 id: userId.toString(),
// NOTE(review): the script assigns the result of a compound subtraction back to
// the same field, which looks redundant - confirm before simplifying
336 script: 'ctx._source.suggest.contexts.rooms = ctx._source.suggest.contexts.rooms -= roomId',
// script parameter holding the room to remove
338 roomId: roomId.toString()
// retry the update up to 3 times on a version conflict
348 _retry_on_conflict: 3
// Incremental maintenance entry points exposed under their own names.
// NOTE(review): the opening of the enclosing object (presumably module.exports) and
// any earlier entries are not visible in this excerpt.
355 addUsersToGroupRoom: addUsersToGroupRoom,
356 removeUsersFromRoom: removeUsersFromRoom,
357 upsertUser: upsertUser