ctdb-server: Remove duplicate logic
[samba4-gss.git] / source3 / rpc_server / mdssvc / mdssvc_es.c
blobd51441092b42b00fce523fdf7c18399bcc455a8e
1 /*
2 Unix SMB/CIFS implementation.
3 Main metadata server / Spotlight routines / ES backend
5 Copyright (C) Ralph Boehme 2019
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "includes.h"
22 #include "system/filesys.h"
23 #include "lib/util/time_basic.h"
24 #include "lib/tls/tls.h"
25 #include "lib/util/tevent_ntstatus.h"
26 #include "libcli/http/http.h"
27 #include "lib/util/tevent_unix.h"
28 #include "credentials.h"
29 #include "mdssvc.h"
30 #include "mdssvc_es.h"
31 #include "rpc_server/mdssvc/es_parser.tab.h"
32 #include "lib/param/param.h"
34 #include <jansson.h>
36 #undef DBGC_CLASS
37 #define DBGC_CLASS DBGC_RPC_SRV
/*
 * printf-style template for the JSON body that is POSTed to the
 * Elasticsearch "_search" endpoint. Arguments, in order:
 *   %zu - offset of the first result ("from")
 *   %zu - number of results per request ("size")
 *   %s  - comma separated list of quoted "_source" fields
 *   %s  - the query_string query
 */
#define MDSSVC_ELASTIC_QUERY_TEMPLATE \
	"{ \"from\": %zu, \"size\": %zu, \"_source\": [%s]," \
	" \"query\": { \"query_string\": { \"query\": \"%s\" } }}"

/* The document fields requested for every hit. */
#define MDSSVC_ELASTIC_SOURCES "\"path.real\""
54 static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
56 struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
57 json_error_t json_error;
58 char *default_path = NULL;
59 const char *path = NULL;
61 mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
62 if (mdssvc_es_ctx == NULL) {
63 return false;
65 mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;
67 mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
68 if (mdssvc_es_ctx->creds == NULL) {
69 TALLOC_FREE(mdssvc_es_ctx);
70 return false;
73 default_path = talloc_asprintf(
74 mdssvc_es_ctx,
75 "%s/mdssvc/elasticsearch_mappings.json",
76 get_dyn_SAMBA_DATADIR());
77 if (default_path == NULL) {
78 TALLOC_FREE(mdssvc_es_ctx);
79 return false;
82 path = lp_parm_const_string(GLOBAL_SECTION_SNUM,
83 "elasticsearch",
84 "mappings",
85 default_path);
86 if (path == NULL) {
87 TALLOC_FREE(mdssvc_es_ctx);
88 return false;
91 mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
92 if (mdssvc_es_ctx->mappings == NULL) {
93 DBG_ERR("Opening mapping file [%s] failed: %s\n",
94 path, json_error.text);
95 TALLOC_FREE(mdssvc_es_ctx);
96 return false;
98 TALLOC_FREE(default_path);
100 mdssvc_ctx->backend_private = mdssvc_es_ctx;
101 return true;
/*
 * Backend shutdown hook. There is nothing to tear down explicitly here, so
 * this always reports success.
 */
static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
{
	return true;
}
109 static struct tevent_req *mds_es_connect_send(
110 TALLOC_CTX *mem_ctx,
111 struct tevent_context *ev,
112 struct mds_es_ctx *mds_es_ctx);
113 static int mds_es_connect_recv(struct tevent_req *req);
114 static void mds_es_connected(struct tevent_req *subreq);
115 static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);
116 static void mds_es_search_set_pending(struct sl_es_search *s);
117 static void mds_es_search_unset_pending(struct sl_es_search *s);
119 static int mds_es_ctx_destructor(struct mds_es_ctx *mds_es_ctx)
121 struct sl_es_search *s = mds_es_ctx->searches;
124 * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about
125 * to go away and has already freed all waiting searches. If there's a
126 * search remaining that's when the search is already active. Reset the
127 * mds_es_ctx pointer, so we can detect this when the search completes.
130 if (s == NULL) {
131 return 0;
134 s->mds_es_ctx = NULL;
136 return 0;
139 static bool mds_es_connect(struct mds_ctx *mds_ctx)
141 struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
142 mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
143 struct mds_es_ctx *mds_es_ctx = NULL;
144 struct tevent_req *subreq = NULL;
146 mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
147 if (mds_es_ctx == NULL) {
148 return false;
150 *mds_es_ctx = (struct mds_es_ctx) {
151 .mdssvc_es_ctx = mdssvc_es_ctx,
152 .mds_ctx = mds_ctx,
155 mds_ctx->backend_private = mds_es_ctx;
156 talloc_set_destructor(mds_es_ctx, mds_es_ctx_destructor);
158 subreq = mds_es_connect_send(
159 mds_es_ctx,
160 mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
161 mds_es_ctx);
162 if (subreq == NULL) {
163 TALLOC_FREE(mds_es_ctx);
164 return false;
166 tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
167 return true;
170 static void mds_es_connected(struct tevent_req *subreq)
172 struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
173 subreq, struct mds_es_ctx);
174 int ret;
175 bool ok;
177 ret = mds_es_connect_recv(subreq);
178 TALLOC_FREE(subreq);
179 if (ret != 0) {
180 DBG_ERR("HTTP connect failed\n");
181 return;
184 ok = mds_es_next_search_trigger(mds_es_ctx);
185 if (!ok) {
186 DBG_ERR("mds_es_next_search_trigger failed\n");
188 return;
/* Private state of a mds_es_connect_send() request. */
struct mds_es_connect_state {
	/* Event context the request runs on. */
	struct tevent_context *ev;
	/* Per tree-connect backend state that receives the connection. */
	struct mds_es_ctx *mds_es_ctx;
	/* Not referenced by the code in this file section. */
	struct tevent_queue_entry *qe;
	/* Copy of the "elasticsearch:address" option. */
	const char *server_addr;
	/* Value of the "elasticsearch:port" option. */
	uint16_t server_port;
	/* Only set when "elasticsearch:use tls" is enabled, NULL otherwise. */
	struct tstream_tls_params *tls_params;
};
/* Callbacks of the connect request: HTTP connect finished / retry timer fired. */
static void mds_es_http_connect_done(struct tevent_req *subreq);
static void mds_es_http_waited(struct tevent_req *subreq);
203 static struct tevent_req *mds_es_connect_send(
204 TALLOC_CTX *mem_ctx,
205 struct tevent_context *ev,
206 struct mds_es_ctx *mds_es_ctx)
208 struct tevent_req *req = NULL;
209 struct tevent_req *subreq = NULL;
210 struct mds_es_connect_state *state = NULL;
211 const char *server_addr = NULL;
212 bool use_tls;
213 NTSTATUS status;
215 req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
216 if (req == NULL) {
217 return NULL;
219 *state = (struct mds_es_connect_state) {
220 .ev = ev,
221 .mds_es_ctx = mds_es_ctx,
224 server_addr = lp_parm_const_string(
225 mds_es_ctx->mds_ctx->snum,
226 "elasticsearch",
227 "address",
228 "localhost");
229 state->server_addr = talloc_strdup(state, server_addr);
230 if (tevent_req_nomem(state->server_addr, req)) {
231 return tevent_req_post(req, ev);
234 state->server_port = lp_parm_int(
235 mds_es_ctx->mds_ctx->snum,
236 "elasticsearch",
237 "port",
238 9200);
240 use_tls = lp_parm_bool(
241 mds_es_ctx->mds_ctx->snum,
242 "elasticsearch",
243 "use tls",
244 false);
246 DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
247 use_tls ? "S" : "", state->server_addr, state->server_port);
249 if (use_tls) {
250 struct loadparm_context *lp_ctx = NULL;
252 lp_ctx = loadparm_init_s3(state, loadparm_s3_helpers());
253 if (tevent_req_nomem(lp_ctx, req)) {
254 return tevent_req_post(req, ev);
257 status = tstream_tls_params_client_lpcfg(state,
258 lp_ctx,
259 state->server_addr,
260 &state->tls_params);
261 TALLOC_FREE(lp_ctx);
262 if (!NT_STATUS_IS_OK(status)) {
263 DBG_ERR("Failed tstream_tls_params_client - %s\n",
264 nt_errstr(status));
265 tevent_req_nterror(req, status);
266 return tevent_req_post(req, ev);
270 subreq = http_connect_send(state,
271 state->ev,
272 state->server_addr,
273 state->server_port,
274 mds_es_ctx->mdssvc_es_ctx->creds,
275 state->tls_params);
276 if (tevent_req_nomem(subreq, req)) {
277 return tevent_req_post(req, ev);
279 tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
280 return req;
283 static void mds_es_http_connect_done(struct tevent_req *subreq)
285 struct tevent_req *req = tevent_req_callback_data(
286 subreq, struct tevent_req);
287 struct mds_es_connect_state *state = tevent_req_data(
288 req, struct mds_es_connect_state);
289 int error;
291 error = http_connect_recv(subreq,
292 state->mds_es_ctx,
293 &state->mds_es_ctx->http_conn);
294 TALLOC_FREE(subreq);
295 if (error != 0) {
296 DBG_ERR("HTTP connect failed, retrying...\n");
298 subreq = tevent_wakeup_send(
299 state->mds_es_ctx,
300 state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
301 tevent_timeval_current_ofs(10, 0));
302 if (tevent_req_nomem(subreq, req)) {
303 return;
305 tevent_req_set_callback(subreq,
306 mds_es_http_waited,
307 req);
308 return;
311 DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
312 state->tls_params ? "S" : "",
313 state->server_addr, state->server_port);
315 tevent_req_done(req);
316 return;
319 static void mds_es_http_waited(struct tevent_req *subreq)
321 struct tevent_req *req = tevent_req_callback_data(
322 subreq, struct tevent_req);
323 struct mds_es_connect_state *state = tevent_req_data(
324 req, struct mds_es_connect_state);
325 bool ok;
327 ok = tevent_wakeup_recv(subreq);
328 TALLOC_FREE(subreq);
329 if (!ok) {
330 tevent_req_error(req, ETIMEDOUT);
331 return;
334 subreq = mds_es_connect_send(
335 state->mds_es_ctx,
336 state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
337 state->mds_es_ctx);
338 if (tevent_req_nomem(subreq, req)) {
339 return;
341 tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
/*
 * Collect the result of a mds_es_connect_send() request.
 * Returns whatever tevent_req_simple_recv_unix() reports for the request.
 */
static int mds_es_connect_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
349 static void mds_es_reconnect_on_error(struct sl_es_search *s)
351 struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
352 struct tevent_req *subreq = NULL;
354 if (s->slq != NULL) {
355 s->slq->state = SLQ_STATE_ERROR;
358 DBG_WARNING("Reconnecting HTTP...\n");
359 TALLOC_FREE(mds_es_ctx->http_conn);
361 subreq = mds_es_connect_send(
362 mds_es_ctx,
363 mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
364 mds_es_ctx);
365 if (subreq == NULL) {
366 DBG_ERR("mds_es_connect_send failed\n");
367 return;
369 tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
372 static int search_destructor(struct sl_es_search *s)
374 if (s->mds_es_ctx == NULL) {
375 return 0;
377 DLIST_REMOVE(s->mds_es_ctx->searches, s);
378 return 0;
381 static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
382 struct tevent_context *ev,
383 struct sl_es_search *s);
384 static int mds_es_search_recv(struct tevent_req *req);
385 static void mds_es_search_done(struct tevent_req *subreq);
387 static bool mds_es_search(struct sl_query *slq)
389 struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
390 slq->mds_ctx->backend_private, struct mds_es_ctx);
391 struct sl_es_search *s = NULL;
392 bool ok;
394 s = talloc_zero(slq, struct sl_es_search);
395 if (s == NULL) {
396 return false;
398 *s = (struct sl_es_search) {
399 .ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
400 .mds_es_ctx = mds_es_ctx,
401 .slq = slq,
402 .size = SL_PAGESIZE,
405 /* 0 would mean no limit */
406 s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
407 "elasticsearch",
408 "max results",
409 MAX_SL_RESULTS);
411 DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);
413 ok = map_spotlight_to_es_query(
415 mds_es_ctx->mdssvc_es_ctx->mappings,
416 slq->path_scope,
417 slq->query_string,
418 &s->es_query);
419 if (!ok) {
420 TALLOC_FREE(s);
421 return false;
423 DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);
425 slq->backend_private = s;
426 slq->state = SLQ_STATE_RUNNING;
427 DLIST_ADD_END(mds_es_ctx->searches, s);
428 talloc_set_destructor(s, search_destructor);
430 return mds_es_next_search_trigger(mds_es_ctx);
433 static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
435 struct tevent_req *subreq = NULL;
436 struct sl_es_search *s = mds_es_ctx->searches;
438 if (mds_es_ctx->http_conn == NULL) {
439 DBG_DEBUG("Waiting for HTTP connection...\n");
440 return true;
442 if (s == NULL) {
443 DBG_DEBUG("No pending searches, idling...\n");
444 return true;
446 if (s->pending) {
447 DBG_DEBUG("Search pending [%p]\n", s);
448 return true;
451 subreq = mds_es_search_send(s, s->ev, s);
452 if (subreq == NULL) {
453 return false;
455 tevent_req_set_callback(subreq, mds_es_search_done, s);
456 mds_es_search_set_pending(s);
457 return true;
460 static void mds_es_search_done(struct tevent_req *subreq)
462 struct sl_es_search *s = tevent_req_callback_data(
463 subreq, struct sl_es_search);
464 struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
465 struct sl_query *slq = s->slq;
466 int ret;
467 bool ok;
469 DBG_DEBUG("Search done for search [%p]\n", s);
471 mds_es_search_unset_pending(s);
473 if (mds_es_ctx == NULL) {
475 * Search connection closed by the user while s was pending.
477 TALLOC_FREE(s);
478 return;
481 DLIST_REMOVE(mds_es_ctx->searches, s);
483 ret = mds_es_search_recv(subreq);
484 TALLOC_FREE(subreq);
485 if (ret != 0) {
486 mds_es_reconnect_on_error(s);
487 return;
490 if (slq == NULL) {
492 * Closed by the user. Explicitly free "s" here because the
493 * talloc parent slq is already gone.
495 TALLOC_FREE(s);
496 goto trigger;
499 SLQ_DEBUG(10, slq, "search done");
501 if (s->total == 0 || s->from >= s->max) {
502 slq->state = SLQ_STATE_DONE;
503 goto trigger;
506 if (slq->query_results->num_results >= SL_PAGESIZE) {
507 slq->state = SLQ_STATE_FULL;
508 goto trigger;
512 * Reschedule this query as there are more results waiting in the
513 * Elasticsearch server and the client result queue has room as
514 * well. But put it at the end of the list of active queries as a simple
515 * heuristic that should ensure all client queries are dispatched to the
516 * server.
518 DLIST_ADD_END(mds_es_ctx->searches, s);
520 trigger:
521 ok = mds_es_next_search_trigger(mds_es_ctx);
522 if (!ok) {
523 DBG_ERR("mds_es_next_search_trigger failed\n");
/* Callbacks of the search request: HTTP request sent / HTTP response read. */
static void mds_es_search_http_send_done(struct tevent_req *subreq);
static void mds_es_search_http_read_done(struct tevent_req *subreq);
530 struct mds_es_search_state {
531 struct tevent_context *ev;
532 struct sl_es_search *s;
533 struct tevent_queue_entry *qe;
534 struct http_request http_request;
535 struct http_request *http_response;
538 static int mds_es_search_pending_destructor(struct sl_es_search *s)
541 * s is a child of slq which may get freed when a user closes a
542 * query. To maintain the HTTP request/response sequence on the HTTP
543 * channel, we keep processing pending requests and free s when we
544 * receive the HTTP response for pending requests.
546 DBG_DEBUG("Preserving pending search [%p]\n", s);
547 s->slq = NULL;
548 return -1;
551 static void mds_es_search_set_pending(struct sl_es_search *s)
553 DBG_DEBUG("Set pending [%p]\n", s);
554 SLQ_DEBUG(10, s->slq, "pending");
556 s->pending = true;
557 talloc_set_destructor(s, mds_es_search_pending_destructor);
560 static void mds_es_search_unset_pending(struct sl_es_search *s)
562 DBG_DEBUG("Unset pending [%p]\n", s);
563 if (s->slq != NULL) {
564 SLQ_DEBUG(10, s->slq, "unset pending");
567 s->pending = false;
568 talloc_set_destructor(s, search_destructor);
571 static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
572 struct tevent_context *ev,
573 struct sl_es_search *s)
575 struct tevent_req *req = NULL;
576 struct tevent_req *subreq = NULL;
577 struct mds_es_search_state *state = NULL;
578 const char *index = NULL;
579 char *elastic_query = NULL;
580 char *uri = NULL;
581 size_t elastic_query_len;
582 char *elastic_query_len_str = NULL;
583 char *hostname = NULL;
584 bool pretty = false;
586 req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
587 if (req == NULL) {
588 return NULL;
590 *state = (struct mds_es_search_state) {
591 .ev = ev,
592 .s = s,
595 if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
596 return tevent_req_post(req, s->ev);
599 index = lp_parm_const_string(s->slq->mds_ctx->snum,
600 "elasticsearch",
601 "index",
602 "_all");
603 if (tevent_req_nomem(index, req)) {
604 return tevent_req_post(req, ev);
607 if (DEBUGLVL(10)) {
608 pretty = true;
611 uri = talloc_asprintf(state,
612 "/%s/_search%s",
613 index,
614 pretty ? "?pretty" : "");
615 if (tevent_req_nomem(uri, req)) {
616 return tevent_req_post(req, ev);
619 elastic_query = talloc_asprintf(state,
620 MDSSVC_ELASTIC_QUERY_TEMPLATE,
621 s->from,
622 s->size,
623 MDSSVC_ELASTIC_SOURCES,
624 s->es_query);
625 if (tevent_req_nomem(elastic_query, req)) {
626 return tevent_req_post(req, ev);
628 DBG_DEBUG("Elastic query: '%s'\n", elastic_query);
630 elastic_query_len = strlen(elastic_query);
632 state->http_request = (struct http_request) {
633 .type = HTTP_REQ_POST,
634 .uri = uri,
635 .body = data_blob_const(elastic_query, elastic_query_len),
636 .major = '1',
637 .minor = '1',
640 elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len);
641 if (tevent_req_nomem(elastic_query_len_str, req)) {
642 return tevent_req_post(req, ev);
645 hostname = get_myname(state);
646 if (tevent_req_nomem(hostname, req)) {
647 return tevent_req_post(req, ev);
650 http_add_header(state, &state->http_request.headers,
651 "Content-Type", "application/json");
652 http_add_header(state, &state->http_request.headers,
653 "Accept", "application/json");
654 http_add_header(state, &state->http_request.headers,
655 "User-Agent", "Samba/mdssvc");
656 http_add_header(state, &state->http_request.headers,
657 "Host", hostname);
658 http_add_header(state, &state->http_request.headers,
659 "Content-Length", elastic_query_len_str);
661 subreq = http_send_request_send(state,
663 s->mds_es_ctx->http_conn,
664 &state->http_request);
665 if (tevent_req_nomem(subreq, req)) {
666 return tevent_req_post(req, ev);
668 tevent_req_set_callback(subreq, mds_es_search_http_send_done, req);
669 return req;
672 static void mds_es_search_http_send_done(struct tevent_req *subreq)
674 struct tevent_req *req = tevent_req_callback_data(
675 subreq, struct tevent_req);
676 struct mds_es_search_state *state = tevent_req_data(
677 req, struct mds_es_search_state);
678 NTSTATUS status;
680 DBG_DEBUG("Sent out search [%p]\n", state->s);
682 status = http_send_request_recv(subreq);
683 TALLOC_FREE(subreq);
684 if (!NT_STATUS_IS_OK(status)) {
685 tevent_req_error(req, map_errno_from_nt_status(status));
686 return;
689 if (state->s->mds_es_ctx == NULL || state->s->slq == NULL) {
690 tevent_req_done(req);
691 return;
694 subreq = http_read_response_send(state,
695 state->ev,
696 state->s->mds_es_ctx->http_conn,
697 SL_PAGESIZE * 8192);
698 if (tevent_req_nomem(subreq, req)) {
699 return;
701 tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
704 static void mds_es_search_http_read_done(struct tevent_req *subreq)
706 struct tevent_req *req = tevent_req_callback_data(
707 subreq, struct tevent_req);
708 struct mds_es_search_state *state = tevent_req_data(
709 req, struct mds_es_search_state);
710 struct sl_es_search *s = state->s;
711 struct sl_query *slq = s->slq;
712 json_t *root = NULL;
713 json_t *matches = NULL;
714 json_t *match = NULL;
715 size_t i;
716 json_error_t error;
717 size_t hits;
718 NTSTATUS status;
719 int ret;
720 bool ok;
722 DBG_DEBUG("Got response for search [%p]\n", s);
724 status = http_read_response_recv(subreq, state, &state->http_response);
725 TALLOC_FREE(subreq);
726 if (!NT_STATUS_IS_OK(status)) {
727 DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
728 tevent_req_error(req, map_errno_from_nt_status(status));
729 return;
732 if (slq == NULL || s->mds_es_ctx == NULL) {
733 tevent_req_done(req);
734 return;
737 switch (state->http_response->response_code) {
738 case 200:
739 break;
740 default:
741 DBG_ERR("HTTP server response: %u\n",
742 state->http_response->response_code);
743 goto fail;
746 DBG_DEBUG("JSON response:\n%s\n",
747 talloc_strndup(talloc_tos(),
748 (char *)state->http_response->body.data,
749 state->http_response->body.length));
751 root = json_loadb((char *)state->http_response->body.data,
752 state->http_response->body.length,
754 &error);
755 if (root == NULL) {
756 DBG_ERR("json_loadb failed\n");
757 goto fail;
760 if (s->total == 0) {
762 * Get the total number of results the first time, format
763 * used by Elasticsearch 7.0 or newer
765 ret = json_unpack(root, "{s: {s: {s: i}}}",
766 "hits", "total", "value", &s->total);
767 if (ret != 0) {
768 /* Format used before 7.0 */
769 ret = json_unpack(root, "{s: {s: i}}",
770 "hits", "total", &s->total);
771 if (ret != 0) {
772 DBG_ERR("json_unpack failed\n");
773 goto fail;
777 DBG_DEBUG("Total: %zu\n", s->total);
779 if (s->total == 0) {
780 json_decref(root);
781 tevent_req_done(req);
782 return;
786 if (s->max == 0 || s->max > s->total) {
787 s->max = s->total;
790 ret = json_unpack(root, "{s: {s:o}}",
791 "hits", "hits", &matches);
792 if (ret != 0 || matches == NULL) {
793 DBG_ERR("json_unpack hits failed\n");
794 goto fail;
797 hits = json_array_size(matches);
798 if (hits == 0) {
799 DBG_ERR("Hu?! No results?\n");
800 goto fail;
802 DBG_DEBUG("Hits: %zu\n", hits);
804 for (i = 0; i < hits && s->from + i < s->max; i++) {
805 const char *path = NULL;
807 match = json_array_get(matches, i);
808 if (match == NULL) {
809 DBG_ERR("Hu?! No value for index %zu\n", i);
810 goto fail;
812 ret = json_unpack(match,
813 "{s: {s: {s: s}}}",
814 "_source",
815 "path",
816 "real",
817 &path);
818 if (ret != 0) {
819 DBG_ERR("Missing path.real in JSON result\n");
820 goto fail;
823 ok = mds_add_result(slq, path);
824 if (!ok) {
825 DBG_ERR("error adding result for path: %s\n", path);
826 goto fail;
829 json_decref(root);
831 s->from += hits;
832 slq->state = SLQ_STATE_RESULTS;
833 tevent_req_done(req);
834 return;
836 fail:
837 if (root != NULL) {
838 json_decref(root);
840 slq->state = SLQ_STATE_ERROR;
841 tevent_req_error(req, EINVAL);
842 return;
/*
 * Collect the result of a mds_es_search_send() request.
 * Returns whatever tevent_req_simple_recv_unix() reports for the request.
 */
static int mds_es_search_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
850 static bool mds_es_search_cont(struct sl_query *slq)
852 struct sl_es_search *s = talloc_get_type_abort(
853 slq->backend_private, struct sl_es_search);
855 SLQ_DEBUG(10, slq, "continue");
856 DLIST_ADD_END(s->mds_es_ctx->searches, s);
857 return mds_es_next_search_trigger(s->mds_es_ctx);
860 struct mdssvc_backend mdsscv_backend_es = {
861 .init = mdssvc_es_init,
862 .shutdown = mdssvc_es_shutdown,
863 .connect = mds_es_connect,
864 .search_start = mds_es_search,
865 .search_cont = mds_es_search_cont,