2 Unix SMB/CIFS implementation.
3 Main metadata server / Spotlight routines / ES backend
5 Copyright (C) Ralph Boehme 2019
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "system/filesys.h"
23 #include "lib/util/time_basic.h"
24 #include "lib/tls/tls.h"
25 #include "lib/util/tevent_ntstatus.h"
26 #include "libcli/http/http.h"
27 #include "lib/util/tevent_unix.h"
28 #include "credentials.h"
30 #include "mdssvc_es.h"
31 #include "rpc_server/mdssvc/es_parser.tab.h"
32 #include "lib/param/param.h"
/*
 * Debug class for this file; presumably consumed by the DBG_* logging
 * macros used below.
 *
 * MDSSVC_ELASTIC_QUERY_TEMPLATE is the printf-style JSON request body sent
 * to Elasticsearch (used by mds_es_search_send()). Only the lines carrying
 * the "_source" field list (%s) and the query_string "query" (%s) are
 * visible here; the remaining template lines, and any further conversion
 * specifiers they contain, are not.
 *
 * MDSSVC_ELASTIC_SOURCES is passed as one of the template arguments,
 * presumably the "_source" field list; its expansion is not visible here.
 */
24 #define DBGC_CLASS DBGC_RPC_SRV
39 #define MDSSVC_ELASTIC_QUERY_TEMPLATE \
43 " \"_source\": [%s]," \
45 " \"query_string\": {" \
46 " \"query\": \"%s\"" \
51 #define MDSSVC_ELASTIC_SOURCES \
/*
 * Backend "init" hook (registered as .init in mdsscv_backend_es).
 *
 * Allocates the global struct mdssvc_es_ctx as a talloc child of
 * mdssvc_ctx and links it back to mdssvc_ctx. It then:
 *  - creates anonymous client credentials with cli_credentials_init_anon();
 *  - builds the path "<SAMBA_DATADIR>/mdssvc/elasticsearch_mappings.json";
 *  - resolves the mappings path with lp_parm_const_string() in the global
 *    section (the option name and default are on lines not visible here;
 *    presumably default_path is the default, so confirm);
 *  - loads the attribute mappings with json_load_file();
 *  - publishes the context in mdssvc_ctx->backend_private.
 *
 * Every visible failure path after the allocation releases mdssvc_es_ctx
 * with TALLOC_FREE(). The return statements are not visible in this view
 * (presumably false on failure and true on success; confirm).
 */
54 static bool mdssvc_es_init(struct mdssvc_ctx
*mdssvc_ctx
)
56 struct mdssvc_es_ctx
*mdssvc_es_ctx
= NULL
;
57 json_error_t json_error
;
58 char *default_path
= NULL
;
59 const char *path
= NULL
;
61 mdssvc_es_ctx
= talloc_zero(mdssvc_ctx
, struct mdssvc_es_ctx
);
62 if (mdssvc_es_ctx
== NULL
) {
65 mdssvc_es_ctx
->mdssvc_ctx
= mdssvc_ctx
;
67 mdssvc_es_ctx
->creds
= cli_credentials_init_anon(mdssvc_es_ctx
);
68 if (mdssvc_es_ctx
->creds
== NULL
) {
69 TALLOC_FREE(mdssvc_es_ctx
);
73 default_path
= talloc_asprintf(
75 "%s/mdssvc/elasticsearch_mappings.json",
76 get_dyn_SAMBA_DATADIR());
77 if (default_path
== NULL
) {
78 TALLOC_FREE(mdssvc_es_ctx
);
/* Look up the configured mappings file (option name not visible here). */
82 path
= lp_parm_const_string(GLOBAL_SECTION_SNUM
,
/* Failure path following the lookup (its condition is not visible). */
87 TALLOC_FREE(mdssvc_es_ctx
);
91 mdssvc_es_ctx
->mappings
= json_load_file(path
, 0, &json_error
);
92 if (mdssvc_es_ctx
->mappings
== NULL
) {
93 DBG_ERR("Opening mapping file [%s] failed: %s\n",
94 path
, json_error
.text
);
95 TALLOC_FREE(mdssvc_es_ctx
);
/* default_path is not referenced after this point. */
98 TALLOC_FREE(default_path
);
100 mdssvc_ctx
->backend_private
= mdssvc_es_ctx
;
/*
 * Backend "shutdown" hook (registered as .shutdown in mdsscv_backend_es).
 * Its body is not visible in this view.
 */
104 static bool mdssvc_es_shutdown(struct mdssvc_ctx
*mdssvc_ctx
)
/*
 * Forward declarations: the asynchronous connect request, its receive
 * function and callback, and the search-queue helpers that are referenced
 * before their definitions.
 */
109 static struct tevent_req
*mds_es_connect_send(
111 struct tevent_context
*ev
,
112 struct mds_es_ctx
*mds_es_ctx
);
113 static int mds_es_connect_recv(struct tevent_req
*req
);
114 static void mds_es_connected(struct tevent_req
*subreq
);
115 static bool mds_es_next_search_trigger(struct mds_es_ctx
*mds_es_ctx
);
116 static void mds_es_search_set_pending(struct sl_es_search
*s
);
117 static void mds_es_search_unset_pending(struct sl_es_search
*s
);
/*
 * talloc destructor for the per tree-connect struct mds_es_ctx (installed
 * by mds_es_connect()).
 *
 * Takes the head of mds_es_ctx->searches and clears that search's
 * back-pointer to mds_es_ctx. When the in-flight request later completes,
 * mds_es_search_done() therefore sees s->mds_es_ctx == NULL. The lines
 * between the lookup and the reset (presumably a NULL check on s) and the
 * return value are not visible here.
 */
119 static int mds_es_ctx_destructor(struct mds_es_ctx
*mds_es_ctx
)
121 struct sl_es_search
*s
= mds_es_ctx
->searches
;
124 * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about
125 * to go away and has already freed all waiting searches. If there's a
126 * search remaining that's when the search is already active. Reset the
127 * mds_es_ctx pointer, so we can detect this when the search completes.
134 s
->mds_es_ctx
= NULL
;
/*
 * Backend "connect" hook, run per tree connect (registered as .connect).
 *
 * Steps:
 *  - allocate struct mds_es_ctx as a talloc child of mds_ctx and link it
 *    to the global mdssvc_es_ctx stored by mdssvc_es_init();
 *  - publish it in mds_ctx->backend_private;
 *  - install mds_es_ctx_destructor();
 *  - start an asynchronous HTTP connect whose completion is handled by
 *    mds_es_connected().
 *
 * If the connect request cannot be created, mds_es_ctx is freed. Return
 * statements are not visible in this view.
 */
139 static bool mds_es_connect(struct mds_ctx
*mds_ctx
)
141 struct mdssvc_es_ctx
*mdssvc_es_ctx
= talloc_get_type_abort(
142 mds_ctx
->mdssvc_ctx
->backend_private
, struct mdssvc_es_ctx
);
143 struct mds_es_ctx
*mds_es_ctx
= NULL
;
144 struct tevent_req
*subreq
= NULL
;
146 mds_es_ctx
= talloc_zero(mds_ctx
, struct mds_es_ctx
);
147 if (mds_es_ctx
== NULL
) {
150 *mds_es_ctx
= (struct mds_es_ctx
) {
151 .mdssvc_es_ctx
= mdssvc_es_ctx
,
/* mds_es_search() fetches this again with talloc_get_type_abort(). */
155 mds_ctx
->backend_private
= mds_es_ctx
;
156 talloc_set_destructor(mds_es_ctx
, mds_es_ctx_destructor
);
/*
 * The connection is set up asynchronously. Until it completes,
 * mds_es_ctx->http_conn is unset, so mds_es_next_search_trigger() only
 * logs that it is waiting.
 */
158 subreq
= mds_es_connect_send(
160 mdssvc_es_ctx
->mdssvc_ctx
->ev_ctx
,
162 if (subreq
== NULL
) {
163 TALLOC_FREE(mds_es_ctx
);
166 tevent_req_set_callback(subreq
, mds_es_connected
, mds_es_ctx
);
/*
 * Callback for mds_es_connect_send(); the callback data is the
 * struct mds_es_ctx.
 *
 * Collects the connect result with mds_es_connect_recv(). It then calls
 * mds_es_next_search_trigger() to dispatch any search that was queued
 * while no connection was available. Both kinds of failure are only
 * logged with DBG_ERR; the surrounding conditionals are not visible in
 * this view.
 */
170 static void mds_es_connected(struct tevent_req
*subreq
)
172 struct mds_es_ctx
*mds_es_ctx
= tevent_req_callback_data(
173 subreq
, struct mds_es_ctx
);
177 ret
= mds_es_connect_recv(subreq
);
180 DBG_ERR("HTTP connect failed\n");
184 ok
= mds_es_next_search_trigger(mds_es_ctx
);
186 DBG_ERR("mds_es_next_search_trigger failed\n");
/* State of one mds_es_connect_send() request. */
191 struct mds_es_connect_state
{
/* Event context of the request (its assignment is not visible here). */
192 struct tevent_context
*ev
;
/* Per tree-connect context; receives the connection in http_conn. */
193 struct mds_es_ctx
*mds_es_ctx
;
/* Not referenced in the visible code. */
194 struct tevent_queue_entry
*qe
;
/* talloc copy of the configured server address. */
195 const char *server_addr
;
/* Configured server port. */
196 uint16_t server_port
;
/* TLS client parameters; non-NULL makes the debug output report HTTPS. */
197 struct tstream_tls_params
*tls_params
;
/* Callbacks of the connect request, defined below. */
200 static void mds_es_http_connect_done(struct tevent_req
*subreq
);
201 static void mds_es_http_waited(struct tevent_req
*subreq
);
/*
 * Start an asynchronous HTTP(S) connection to the Elasticsearch server.
 *
 * Configuration: the server address, port and TLS flag are read per share
 * on mds_es_ctx->mds_ctx->snum, using lp_parm_const_string(),
 * lp_parm_int() and lp_parm_bool(). Option names and defaults are on
 * lines not visible here.
 *
 * TLS: when TLS parameters are built, a loadparm context is created with
 * loadparm_init_s3() and handed to tstream_tls_params_client_lpcfg().
 *
 * Connection: http_connect_send() is called with the anonymous
 * credentials from mds_es_ctx->mdssvc_es_ctx->creds, and
 * mds_es_http_connect_done() completes the request.
 *
 * The mem_ctx used by tevent_req_create() is declared on a line not
 * visible in this view. Every visible error path returns the request
 * posted with tevent_req_post().
 */
203 static struct tevent_req
*mds_es_connect_send(
205 struct tevent_context
*ev
,
206 struct mds_es_ctx
*mds_es_ctx
)
208 struct tevent_req
*req
= NULL
;
209 struct tevent_req
*subreq
= NULL
;
210 struct mds_es_connect_state
*state
= NULL
;
211 const char *server_addr
= NULL
;
215 req
= tevent_req_create(mem_ctx
, &state
, struct mds_es_connect_state
);
219 *state
= (struct mds_es_connect_state
) {
221 .mds_es_ctx
= mds_es_ctx
,
224 server_addr
= lp_parm_const_string(
225 mds_es_ctx
->mds_ctx
->snum
,
/* Keep a private copy of the configured address in state. */
229 state
->server_addr
= talloc_strdup(state
, server_addr
);
230 if (tevent_req_nomem(state
->server_addr
, req
)) {
231 return tevent_req_post(req
, ev
);
234 state
->server_port
= lp_parm_int(
235 mds_es_ctx
->mds_ctx
->snum
,
240 use_tls
= lp_parm_bool(
241 mds_es_ctx
->mds_ctx
->snum
,
246 DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16
"]\n",
247 use_tls
? "S" : "", state
->server_addr
, state
->server_port
);
/*
 * TLS setup. The enclosing condition (presumably use_tls) is not
 * visible in this view.
 */
250 struct loadparm_context
*lp_ctx
= NULL
;
252 lp_ctx
= loadparm_init_s3(state
, loadparm_s3_helpers());
253 if (tevent_req_nomem(lp_ctx
, req
)) {
254 return tevent_req_post(req
, ev
);
257 status
= tstream_tls_params_client_lpcfg(state
,
262 if (!NT_STATUS_IS_OK(status
)) {
263 DBG_ERR("Failed tstream_tls_params_client - %s\n",
/*
 * NOTE(review): this stores an NTSTATUS with tevent_req_nterror(), but
 * mds_es_connect_recv() reads the error with
 * tevent_req_simple_recv_unix(). Confirm that callers see a meaningful
 * errno on this path; tevent_req_error() with a mapped errno may be
 * intended.
 */
265 tevent_req_nterror(req
, status
);
266 return tevent_req_post(req
, ev
);
/* The remaining http_connect_send() arguments are not visible here. */
270 subreq
= http_connect_send(state
,
274 mds_es_ctx
->mdssvc_es_ctx
->creds
,
276 if (tevent_req_nomem(subreq
, req
)) {
277 return tevent_req_post(req
, ev
);
279 tevent_req_set_callback(subreq
, mds_es_http_connect_done
, req
);
/*
 * Callback for http_connect_send().
 *
 * On success, http_connect_recv() stores the connection in
 * state->mds_es_ctx->http_conn. A debug message then reports the endpoint
 * (as HTTPS when state->tls_params is set) and req completes with
 * tevent_req_done().
 *
 * On failure, the error is logged and a 10 second timer is armed with
 * tevent_wakeup_send() on the backend's event context. The timer's
 * callback argument is not visible here; it is presumably
 * mds_es_http_waited(), which retries the connection. The failure check
 * itself is also not visible in this view.
 */
283 static void mds_es_http_connect_done(struct tevent_req
*subreq
)
285 struct tevent_req
*req
= tevent_req_callback_data(
286 subreq
, struct tevent_req
);
287 struct mds_es_connect_state
*state
= tevent_req_data(
288 req
, struct mds_es_connect_state
);
291 error
= http_connect_recv(subreq
,
293 &state
->mds_es_ctx
->http_conn
);
296 DBG_ERR("HTTP connect failed, retrying...\n");
/* Retry after a fixed delay of 10 seconds. */
298 subreq
= tevent_wakeup_send(
300 state
->mds_es_ctx
->mdssvc_es_ctx
->mdssvc_ctx
->ev_ctx
,
301 tevent_timeval_current_ofs(10, 0));
302 if (tevent_req_nomem(subreq
, req
)) {
305 tevent_req_set_callback(subreq
,
311 DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16
"]\n",
312 state
->tls_params
? "S" : "",
313 state
->server_addr
, state
->server_port
);
315 tevent_req_done(req
);
/*
 * Wakeup callback of the connect-retry timer.
 *
 * If tevent_wakeup_recv() reports failure, req fails with ETIMEDOUT.
 * Otherwise a brand-new mds_es_connect_send() request is started, with
 * mds_es_connected() as its callback and state->mds_es_ctx as its data.
 *
 * NOTE(review): in the visible code the new request is not chained to
 * req, and req is not completed on the success path. Confirm that the
 * original connect request is meant to be abandoned here.
 */
319 static void mds_es_http_waited(struct tevent_req
*subreq
)
321 struct tevent_req
*req
= tevent_req_callback_data(
322 subreq
, struct tevent_req
);
323 struct mds_es_connect_state
*state
= tevent_req_data(
324 req
, struct mds_es_connect_state
);
327 ok
= tevent_wakeup_recv(subreq
);
330 tevent_req_error(req
, ETIMEDOUT
);
334 subreq
= mds_es_connect_send(
336 state
->mds_es_ctx
->mdssvc_es_ctx
->mdssvc_ctx
->ev_ctx
,
338 if (tevent_req_nomem(subreq
, req
)) {
341 tevent_req_set_callback(subreq
, mds_es_connected
, state
->mds_es_ctx
);
/*
 * Receive the result of mds_es_connect_send() via
 * tevent_req_simple_recv_unix(): 0 on success, otherwise the error value
 * stored in req.
 */
344 static int mds_es_connect_recv(struct tevent_req
*req
)
346 return tevent_req_simple_recv_unix(req
);
/*
 * Error recovery after a failed search request (called from
 * mds_es_search_done()).
 *
 * If the query still exists, its state is set to SLQ_STATE_ERROR. The
 * current HTTP connection is then freed and a new connect request is
 * started with mds_es_connected() as its callback. If that request cannot
 * be created, the failure is only logged.
 */
349 static void mds_es_reconnect_on_error(struct sl_es_search
*s
)
351 struct mds_es_ctx
*mds_es_ctx
= s
->mds_es_ctx
;
352 struct tevent_req
*subreq
= NULL
;
354 if (s
->slq
!= NULL
) {
355 s
->slq
->state
= SLQ_STATE_ERROR
;
358 DBG_WARNING("Reconnecting HTTP...\n");
/* With http_conn gone, mds_es_next_search_trigger() waits for the reconnect. */
359 TALLOC_FREE(mds_es_ctx
->http_conn
);
361 subreq
= mds_es_connect_send(
363 mds_es_ctx
->mdssvc_es_ctx
->mdssvc_ctx
->ev_ctx
,
365 if (subreq
== NULL
) {
366 DBG_ERR("mds_es_connect_send failed\n");
369 tevent_req_set_callback(subreq
, mds_es_connected
, mds_es_ctx
);
/*
 * talloc destructor for a search without a request in flight (installed by
 * mds_es_search() and restored by mds_es_search_unset_pending()).
 *
 * Unlinks s from mds_es_ctx->searches. If s->mds_es_ctx is NULL, the
 * owning tree-connect context is already gone (see mds_es_ctx_destructor())
 * and there is no list to unlink from. Return statements are not visible
 * here.
 */
372 static int search_destructor(struct sl_es_search
*s
)
374 if (s
->mds_es_ctx
== NULL
) {
377 DLIST_REMOVE(s
->mds_es_ctx
->searches
, s
);
/* Search request send/recv pair and its completion callback, defined below. */
381 static struct tevent_req
*mds_es_search_send(TALLOC_CTX
*mem_ctx
,
382 struct tevent_context
*ev
,
383 struct sl_es_search
*s
);
384 static int mds_es_search_recv(struct tevent_req
*req
);
385 static void mds_es_search_done(struct tevent_req
*subreq
);
/*
 * Backend "search_start" hook (registered as .search_start).
 *
 * Setup:
 *  - allocate a struct sl_es_search as a talloc child of slq;
 *  - fill in the event context and the per tree-connect mds_es_ctx;
 *  - read the per-share result limit into s->max (0 means no limit);
 *  - translate the Spotlight query with map_spotlight_to_es_query(),
 *    using the loaded attribute mappings. The translated query is then
 *    logged from s->es_query; the call's other arguments are not visible
 *    here.
 *
 * Queueing: the search is attached to slq, slq is marked
 * SLQ_STATE_RUNNING, s is appended to mds_es_ctx->searches with
 * search_destructor installed, and mds_es_next_search_trigger() is called.
 * Its result is returned.
 *
 * s->slq is read when looking up the limit, so it is presumably set in
 * initializer lines not visible here.
 */
387 static bool mds_es_search(struct sl_query
*slq
)
389 struct mds_es_ctx
*mds_es_ctx
= talloc_get_type_abort(
390 slq
->mds_ctx
->backend_private
, struct mds_es_ctx
);
391 struct sl_es_search
*s
= NULL
;
394 s
= talloc_zero(slq
, struct sl_es_search
);
398 *s
= (struct sl_es_search
) {
399 .ev
= mds_es_ctx
->mdssvc_es_ctx
->mdssvc_ctx
->ev_ctx
,
400 .mds_es_ctx
= mds_es_ctx
,
405 /* 0 would mean no limit */
406 s
->max
= lp_parm_ulonglong(s
->slq
->mds_ctx
->snum
,
411 DBG_DEBUG("Spotlight query: '%s'\n", slq
->query_string
);
413 ok
= map_spotlight_to_es_query(
415 mds_es_ctx
->mdssvc_es_ctx
->mappings
,
423 DBG_DEBUG("Elasticsearch query: '%s'\n", s
->es_query
);
425 slq
->backend_private
= s
;
426 slq
->state
= SLQ_STATE_RUNNING
;
427 DLIST_ADD_END(mds_es_ctx
->searches
, s
);
428 talloc_set_destructor(s
, search_destructor
);
430 return mds_es_next_search_trigger(mds_es_ctx
);
/*
 * Dispatch the search at the head of mds_es_ctx->searches.
 *
 * Nothing is sent while there is no HTTP connection or while the queue is
 * empty; only debug messages are visible for those cases. The "Search
 * pending" message presumably guards against a second request for a head
 * search that is already in flight (its condition is not visible).
 *
 * Otherwise mds_es_search_send() is started with s as both talloc parent
 * and callback data, with mds_es_search_done() as its callback, and s is
 * marked pending. The search is not unlinked from the queue here;
 * mds_es_search_done() does that. Return statements are not visible.
 */
433 static bool mds_es_next_search_trigger(struct mds_es_ctx
*mds_es_ctx
)
435 struct tevent_req
*subreq
= NULL
;
436 struct sl_es_search
*s
= mds_es_ctx
->searches
;
438 if (mds_es_ctx
->http_conn
== NULL
) {
439 DBG_DEBUG("Waiting for HTTP connection...\n");
443 DBG_DEBUG("No pending searches, idling...\n");
447 DBG_DEBUG("Search pending [%p]\n", s
);
451 subreq
= mds_es_search_send(s
, s
->ev
, s
);
452 if (subreq
== NULL
) {
455 tevent_req_set_callback(subreq
, mds_es_search_done
, s
);
456 mds_es_search_set_pending(s
);
/*
 * Completion callback for a search request dispatched by
 * mds_es_next_search_trigger().
 *
 * Visible steps, in order:
 *  - restore the normal destructor with mds_es_search_unset_pending();
 *  - if s->mds_es_ctx is NULL, the tree connection went away while the
 *    request was pending;
 *  - unlink s from mds_es_ctx->searches and collect the result with
 *    mds_es_search_recv(). On error, mds_es_reconnect_on_error() resets
 *    the connection;
 *  - if the query was closed by the user, s is freed explicitly because
 *    its talloc parent slq is already gone;
 *  - set slq->state:
 *      SLQ_STATE_DONE when s->total is 0 or s->from reached s->max;
 *      SLQ_STATE_FULL when the client result page holds SL_PAGESIZE or
 *      more results;
 *      otherwise s is re-queued at the tail to fetch the next batch;
 *  - trigger the next queued search.
 *
 * The branch, return and cleanup structure between these steps is not
 * visible in this view.
 */
460 static void mds_es_search_done(struct tevent_req
*subreq
)
462 struct sl_es_search
*s
= tevent_req_callback_data(
463 subreq
, struct sl_es_search
);
464 struct mds_es_ctx
*mds_es_ctx
= s
->mds_es_ctx
;
465 struct sl_query
*slq
= s
->slq
;
469 DBG_DEBUG("Search done for search [%p]\n", s
);
471 mds_es_search_unset_pending(s
);
473 if (mds_es_ctx
== NULL
) {
475 * Search connection closed by the user while s was pending.
481 DLIST_REMOVE(mds_es_ctx
->searches
, s
);
483 ret
= mds_es_search_recv(subreq
);
486 mds_es_reconnect_on_error(s
);
492 * Closed by the user. Explicitly free "s" here because the
493 * talloc parent slq is already gone.
499 SLQ_DEBUG(10, slq
, "search done");
501 if (s
->total
== 0 || s
->from
>= s
->max
) {
502 slq
->state
= SLQ_STATE_DONE
;
506 if (slq
->query_results
->num_results
>= SL_PAGESIZE
) {
507 slq
->state
= SLQ_STATE_FULL
;
512 * Reschedule this query as there are more results waiting in the
513 * Elasticsearch server and the client result queue has room as
514 * well. But put it at the end of the list of active queries as a simple
515 * heuristic that should ensure all client queries are dispatched to the
518 DLIST_ADD_END(mds_es_ctx
->searches
, s
);
521 ok
= mds_es_next_search_trigger(mds_es_ctx
);
523 DBG_ERR("mds_es_next_search_trigger failed\n");
/* Callbacks of the search request, defined below. */
527 static void mds_es_search_http_send_done(struct tevent_req
*subreq
);
528 static void mds_es_search_http_read_done(struct tevent_req
*subreq
);
/* State of one mds_es_search_send() request. */
530 struct mds_es_search_state
{
/* Event context of the request (its assignment is not visible here). */
531 struct tevent_context
*ev
;
/* The search this request serves. */
532 struct sl_es_search
*s
;
/* Not referenced in the visible code. */
533 struct tevent_queue_entry
*qe
;
/* Request built by mds_es_search_send() (HTTP POST with a JSON body). */
534 struct http_request http_request
;
/* Response filled in by http_read_response_recv(). */
535 struct http_request
*http_response
;
/*
 * talloc destructor installed by mds_es_search_set_pending() while a
 * search request is in flight. The comment in the body explains why s
 * must outlive slq. Only the debug message is visible here. The return
 * value (presumably one that vetoes the free) and any other effects are
 * not visible.
 */
538 static int mds_es_search_pending_destructor(struct sl_es_search
*s
)
541 * s is a child of slq which may get freed when a user closes a
542 * query. To maintain the HTTP request/response sequence on the HTTP
543 * channel, we keep processing pending requests and free s when we
544 * receive the HTTP response for pending requests.
546 DBG_DEBUG("Preserving pending search [%p]\n", s
);
/*
 * Mark s as having a request in flight. Logs the transition and swaps the
 * talloc destructor of s to mds_es_search_pending_destructor(). The lines
 * between the debug output and the destructor swap are not visible
 * (presumably they set a pending flag).
 */
551 static void mds_es_search_set_pending(struct sl_es_search
*s
)
553 DBG_DEBUG("Set pending [%p]\n", s
);
554 SLQ_DEBUG(10, s
->slq
, "pending");
557 talloc_set_destructor(s
, mds_es_search_pending_destructor
);
/*
 * Undo mds_es_search_set_pending(): log the transition and restore
 * search_destructor. s->slq may already be NULL here, which is why it is
 * checked before SLQ_DEBUG().
 */
560 static void mds_es_search_unset_pending(struct sl_es_search
*s
)
562 DBG_DEBUG("Unset pending [%p]\n", s
);
563 if (s
->slq
!= NULL
) {
564 SLQ_DEBUG(10, s
->slq
, "unset pending");
568 talloc_set_destructor(s
, search_destructor
);
/*
 * Send one Elasticsearch search request for s over the shared HTTP
 * connection s->mds_es_ctx->http_conn.
 *
 * Request contents:
 *  - the request gets a 60 second end time;
 *  - the index name is read per share with lp_parm_const_string();
 *  - the URI carries an optional "?pretty" suffix; the rest of its format
 *    is not visible here;
 *  - the JSON body is MDSSVC_ELASTIC_QUERY_TEMPLATE filled with, among
 *    others, MDSSVC_ELASTIC_SOURCES. Both URI and body are allocated on
 *    state.
 *
 * The body is sent as an HTTP POST with these headers:
 *  - Content-Type and Accept set to application/json;
 *  - User-Agent set to "Samba/mdssvc";
 *  - one header whose name is not visible here, presumably Host, using the
 *    local name from get_myname();
 *  - Content-Length.
 *
 * mds_es_search_http_send_done() continues once the request has been
 * written. Every visible error path returns the request posted with
 * tevent_req_post().
 */
571 static struct tevent_req
*mds_es_search_send(TALLOC_CTX
*mem_ctx
,
572 struct tevent_context
*ev
,
573 struct sl_es_search
*s
)
575 struct tevent_req
*req
= NULL
;
576 struct tevent_req
*subreq
= NULL
;
577 struct mds_es_search_state
*state
= NULL
;
578 const char *index
= NULL
;
579 char *elastic_query
= NULL
;
581 size_t elastic_query_len
;
582 char *elastic_query_len_str
= NULL
;
583 char *hostname
= NULL
;
586 req
= tevent_req_create(mem_ctx
, &state
, struct mds_es_search_state
);
590 *state
= (struct mds_es_search_state
) {
595 if (!tevent_req_set_endtime(req
, ev
, timeval_current_ofs(60, 0))) {
/*
 * NOTE(review): this path posts on s->ev while all others use ev. The
 * visible caller passes s->ev as ev, so the two agree there.
 */
596 return tevent_req_post(req
, s
->ev
);
599 index
= lp_parm_const_string(s
->slq
->mds_ctx
->snum
,
603 if (tevent_req_nomem(index
, req
)) {
604 return tevent_req_post(req
, ev
);
611 uri
= talloc_asprintf(state
,
614 pretty
? "?pretty" : "");
615 if (tevent_req_nomem(uri
, req
)) {
616 return tevent_req_post(req
, ev
);
619 elastic_query
= talloc_asprintf(state
,
620 MDSSVC_ELASTIC_QUERY_TEMPLATE
,
623 MDSSVC_ELASTIC_SOURCES
,
625 if (tevent_req_nomem(elastic_query
, req
)) {
626 return tevent_req_post(req
, ev
);
628 DBG_DEBUG("Elastic query: '%s'\n", elastic_query
);
630 elastic_query_len
= strlen(elastic_query
);
632 state
->http_request
= (struct http_request
) {
633 .type
= HTTP_REQ_POST
,
635 .body
= data_blob_const(elastic_query
, elastic_query_len
),
640 elastic_query_len_str
= talloc_asprintf(state
, "%zu", elastic_query_len
);
641 if (tevent_req_nomem(elastic_query_len_str
, req
)) {
642 return tevent_req_post(req
, ev
);
645 hostname
= get_myname(state
);
646 if (tevent_req_nomem(hostname
, req
)) {
647 return tevent_req_post(req
, ev
);
/* NOTE(review): the return values of http_add_header() are not checked. */
650 http_add_header(state
, &state
->http_request
.headers
,
651 "Content-Type", "application/json");
652 http_add_header(state
, &state
->http_request
.headers
,
653 "Accept", "application/json");
654 http_add_header(state
, &state
->http_request
.headers
,
655 "User-Agent", "Samba/mdssvc");
656 http_add_header(state
, &state
->http_request
.headers
,
658 http_add_header(state
, &state
->http_request
.headers
,
659 "Content-Length", elastic_query_len_str
);
661 subreq
= http_send_request_send(state
,
663 s
->mds_es_ctx
->http_conn
,
664 &state
->http_request
);
665 if (tevent_req_nomem(subreq
, req
)) {
666 return tevent_req_post(req
, ev
);
668 tevent_req_set_callback(subreq
, mds_es_search_http_send_done
, req
);
/*
 * Called once the search request has been written to the connection.
 *
 * - A transport error fails req with the errno mapped from the NTSTATUS.
 * - If the tree connection (s->mds_es_ctx) or the query (s->slq) went
 *   away in the meantime, req completes without reading a response.
 * - Otherwise http_read_response_send() is started on the same
 *   connection, and mds_es_search_http_read_done() processes the
 *   response.
 *
 * NOTE(review): mds_es_search_pending_destructor() states that pending
 * requests keep being processed until their HTTP response is received, so
 * that the request/response sequence on the connection stays intact.
 * Completing here when s->slq == NULL skips reading that response. Confirm
 * that the unread response cannot be picked up by the next search.
 */
672 static void mds_es_search_http_send_done(struct tevent_req
*subreq
)
674 struct tevent_req
*req
= tevent_req_callback_data(
675 subreq
, struct tevent_req
);
676 struct mds_es_search_state
*state
= tevent_req_data(
677 req
, struct mds_es_search_state
);
680 DBG_DEBUG("Sent out search [%p]\n", state
->s
);
682 status
= http_send_request_recv(subreq
);
684 if (!NT_STATUS_IS_OK(status
)) {
685 tevent_req_error(req
, map_errno_from_nt_status(status
));
689 if (state
->s
->mds_es_ctx
== NULL
|| state
->s
->slq
== NULL
) {
690 tevent_req_done(req
);
694 subreq
= http_read_response_send(state
,
696 state
->s
->mds_es_ctx
->http_conn
,
698 if (tevent_req_nomem(subreq
, req
)) {
701 tevent_req_set_callback(subreq
, mds_es_search_http_read_done
, req
);
/*
 * Process the HTTP response of a search request.
 *
 * Visible flow:
 *  - a transport error fails req with the mapped errno;
 *  - if the query or the tree connection went away, req simply completes;
 *  - unexpected HTTP response codes are logged;
 *  - the body is parsed with json_loadb();
 *  - on the first batch, hits.total is read, trying the Elasticsearch
 *    >= 7.0 layout first and then the pre-7.0 layout;
 *  - every element of hits.hits is unpacked for its path (a missing
 *    "path.real" is logged) and added to slq with mds_add_result(),
 *    bounded by s->max.
 *
 * On success slq->state becomes SLQ_STATE_RESULTS and req completes. On
 * the error path slq->state becomes SLQ_STATE_ERROR and req fails with
 * EINVAL. The branch and cleanup structure between these steps is not
 * visible in this view.
 */
704 static void mds_es_search_http_read_done(struct tevent_req
*subreq
)
706 struct tevent_req
*req
= tevent_req_callback_data(
707 subreq
, struct tevent_req
);
708 struct mds_es_search_state
*state
= tevent_req_data(
709 req
, struct mds_es_search_state
);
710 struct sl_es_search
*s
= state
->s
;
711 struct sl_query
*slq
= s
->slq
;
713 json_t
*matches
= NULL
;
714 json_t
*match
= NULL
;
722 DBG_DEBUG("Got response for search [%p]\n", s
);
724 status
= http_read_response_recv(subreq
, state
, &state
->http_response
);
726 if (!NT_STATUS_IS_OK(status
)) {
727 DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status
));
728 tevent_req_error(req
, map_errno_from_nt_status(status
));
/* The query or the tree connection went away; nothing is left to update. */
732 if (slq
== NULL
|| s
->mds_es_ctx
== NULL
) {
733 tevent_req_done(req
);
/* Accepted response codes are on lines not visible here. */
737 switch (state
->http_response
->response_code
) {
741 DBG_ERR("HTTP server response: %u\n",
742 state
->http_response
->response_code
);
/*
 * The body is a data/length pair; talloc_strndup() makes a NUL-terminated
 * copy on talloc_tos() for printing.
 */
746 DBG_DEBUG("JSON response:\n%s\n",
747 talloc_strndup(talloc_tos(),
748 (char *)state
->http_response
->body
.data
,
749 state
->http_response
->body
.length
));
751 root
= json_loadb((char *)state
->http_response
->body
.data
,
752 state
->http_response
->body
.length
,
756 DBG_ERR("json_loadb failed\n");
762 * Get the total number of results the first time, format
763 * used by Elasticsearch 7.0 or newer
765 ret
= json_unpack(root
, "{s: {s: {s: i}}}",
766 "hits", "total", "value", &s
->total
);
768 /* Format used before 7.0 */
769 ret
= json_unpack(root
, "{s: {s: i}}",
770 "hits", "total", &s
->total
);
772 DBG_ERR("json_unpack failed\n");
/*
 * NOTE(review): s->total is printed with %zu, but the json_unpack()
 * format "i" above stores a C int. Confirm that the declared type of
 * s->total matches. An int written into a size_t would leave its upper
 * bytes unset on LP64 platforms.
 */
777 DBG_DEBUG("Total: %zu\n", s
->total
);
781 tevent_req_done(req
);
/* Limit handling; the body is not visible (presumably caps s->max at s->total). */
786 if (s
->max
== 0 || s
->max
> s
->total
) {
790 ret
= json_unpack(root
, "{s: {s:o}}",
791 "hits", "hits", &matches
);
792 if (ret
!= 0 || matches
== NULL
) {
793 DBG_ERR("json_unpack hits failed\n");
797 hits
= json_array_size(matches
);
799 DBG_ERR("Hu?! No results?\n");
802 DBG_DEBUG("Hits: %zu\n", hits
);
/* Stop once s->from + i reaches s->max, so at most s->max results are added. */
804 for (i
= 0; i
< hits
&& s
->from
+ i
< s
->max
; i
++) {
805 const char *path
= NULL
;
807 match
= json_array_get(matches
, i
);
809 DBG_ERR("Hu?! No value for index %zu\n", i
);
812 ret
= json_unpack(match
,
819 DBG_ERR("Missing path.real in JSON result\n");
823 ok
= mds_add_result(slq
, path
);
825 DBG_ERR("error adding result for path: %s\n", path
);
/* Success path. */
832 slq
->state
= SLQ_STATE_RESULTS
;
833 tevent_req_done(req
);
/* Error path: the query is marked failed and req fails with EINVAL. */
840 slq
->state
= SLQ_STATE_ERROR
;
841 tevent_req_error(req
, EINVAL
);
/*
 * Receive the result of mds_es_search_send() via
 * tevent_req_simple_recv_unix(): 0 on success, otherwise the error value
 * stored in req.
 */
845 static int mds_es_search_recv(struct tevent_req
*req
)
847 return tevent_req_simple_recv_unix(req
);
/*
 * Backend "search_cont" hook (registered as .search_cont).
 *
 * Appends the query's search back onto mds_es_ctx->searches and triggers
 * dispatch, returning the result of mds_es_next_search_trigger().
 *
 * NOTE(review): s->mds_es_ctx is dereferenced without a NULL check, while
 * mds_es_ctx_destructor() can reset it to NULL. Confirm that this hook
 * cannot run after the tree connection is gone.
 */
850 static bool mds_es_search_cont(struct sl_query
*slq
)
852 struct sl_es_search
*s
= talloc_get_type_abort(
853 slq
->backend_private
, struct sl_es_search
);
855 SLQ_DEBUG(10, slq
, "continue");
856 DLIST_ADD_END(s
->mds_es_ctx
->searches
, s
);
857 return mds_es_next_search_trigger(s
->mds_es_ctx
);
860 struct mdssvc_backend mdsscv_backend_es
= {
861 .init
= mdssvc_es_init
,
862 .shutdown
= mdssvc_es_shutdown
,
863 .connect
= mds_es_connect
,
864 .search_start
= mds_es_search
,
865 .search_cont
= mds_es_search_cont
,