util:datablob: data_blob_pad checks its alignment assumption
[samba.git] / source3 / rpc_client / rpc_transport_tstream.c
blob02fc320570379e5b050ca5cf1ed091c13f460d97
1 /*
2 * Unix SMB/CIFS implementation.
3 * RPC client transport over tstream
4 * Copyright (C) Simo Sorce 2010
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "../lib/util/tevent_ntstatus.h"
22 #include "rpc_client/rpc_transport.h"
23 #include "lib/tsocket/tsocket.h"
24 #include "libcli/smb/tstream_smbXcli_np.h"
25 #include "cli_pipe.h"
27 #undef DBGC_CLASS
28 #define DBGC_CLASS DBGC_RPC_CLI
30 struct rpc_tstream_state {
31 struct tstream_context *stream;
32 struct tevent_queue *read_queue;
33 struct tevent_queue *write_queue;
34 unsigned int timeout;
37 static void rpc_tstream_disconnect(struct rpc_tstream_state *s)
39 TALLOC_FREE(s->stream);
42 static bool rpc_tstream_is_connected(void *priv)
44 struct rpc_tstream_state *transp =
45 talloc_get_type_abort(priv, struct rpc_tstream_state);
46 ssize_t ret;
48 if (!transp->stream) {
49 return false;
52 if (!tstream_is_smbXcli_np(transp->stream)) {
53 return true;
56 ret = tstream_pending_bytes(transp->stream);
57 if (ret == -1) {
58 return false;
61 return true;
64 static unsigned int rpc_tstream_set_timeout(void *priv, unsigned int timeout)
66 struct rpc_tstream_state *transp =
67 talloc_get_type_abort(priv, struct rpc_tstream_state);
68 int orig_timeout;
69 bool ok;
71 ok = rpc_tstream_is_connected(transp);
72 if (!ok) {
73 return 0;
76 if (tstream_is_smbXcli_np(transp->stream)) {
77 transp->timeout = timeout;
78 return tstream_smbXcli_np_set_timeout(transp->stream, timeout);
81 orig_timeout = transp->timeout;
83 transp->timeout = timeout;
85 return orig_timeout;
88 struct rpc_tstream_next_vector_state {
89 uint8_t *buf;
90 size_t len;
91 off_t ofs;
94 static void rpc_tstream_next_vector_init(
95 struct rpc_tstream_next_vector_state *s,
96 uint8_t *buf, size_t len)
98 *s = (struct rpc_tstream_next_vector_state) {
99 .buf = buf, .len = MIN(len, UINT16_MAX),
103 static int rpc_tstream_next_vector(struct tstream_context *stream,
104 void *private_data,
105 TALLOC_CTX *mem_ctx,
106 struct iovec **_vector,
107 size_t *count)
109 struct rpc_tstream_next_vector_state *state =
110 (struct rpc_tstream_next_vector_state *)private_data;
111 struct iovec *vector;
113 if (state->ofs == state->len) {
114 *_vector = NULL;
115 *count = 0;
116 return 0;
119 vector = talloc_array(mem_ctx, struct iovec, 1);
120 if (!vector) {
121 return -1;
124 vector[0].iov_base = state->buf;
125 vector[0].iov_len = state->len;
127 state->ofs = state->len;
129 *_vector = vector;
130 *count = 1;
131 return 0;
134 struct rpc_tstream_read_state {
135 struct rpc_tstream_state *transp;
136 struct rpc_tstream_next_vector_state next_vector;
137 ssize_t nread;
140 static void rpc_tstream_read_done(struct tevent_req *subreq);
142 static struct tevent_req *rpc_tstream_read_send(TALLOC_CTX *mem_ctx,
143 struct tevent_context *ev,
144 uint8_t *data, size_t size,
145 void *priv)
147 struct rpc_tstream_state *transp =
148 talloc_get_type_abort(priv, struct rpc_tstream_state);
149 struct tevent_req *req, *subreq;
150 struct rpc_tstream_read_state *state;
151 struct timeval endtime;
153 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_read_state);
154 if (req == NULL) {
155 return NULL;
157 if (!rpc_tstream_is_connected(transp)) {
158 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
159 if (tstream_is_smbXcli_np(transp->stream)) {
160 status = NT_STATUS_PIPE_DISCONNECTED;
162 tevent_req_nterror(req, status);
163 return tevent_req_post(req, ev);
165 state->transp = transp;
166 rpc_tstream_next_vector_init(&state->next_vector, data, size);
168 subreq = tstream_readv_pdu_queue_send(state, ev,
169 transp->stream,
170 transp->read_queue,
171 rpc_tstream_next_vector,
172 &state->next_vector);
173 if (subreq == NULL) {
174 tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
175 return tevent_req_post(req, ev);
178 endtime = timeval_current_ofs_msec(transp->timeout);
179 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
180 goto fail;
183 tevent_req_set_callback(subreq, rpc_tstream_read_done, req);
184 return req;
185 fail:
186 TALLOC_FREE(req);
187 return NULL;
190 static void rpc_tstream_read_done(struct tevent_req *subreq)
192 struct tevent_req *req =
193 tevent_req_callback_data(subreq, struct tevent_req);
194 struct rpc_tstream_read_state *state =
195 tevent_req_data(req, struct rpc_tstream_read_state);
196 int err;
198 state->nread = tstream_readv_pdu_queue_recv(subreq, &err);
199 TALLOC_FREE(subreq);
200 if (state->nread < 0) {
201 rpc_tstream_disconnect(state->transp);
202 tevent_req_nterror(req, map_nt_error_from_unix(err));
203 return;
205 tevent_req_done(req);
208 static NTSTATUS rpc_tstream_read_recv(struct tevent_req *req, ssize_t *size)
210 struct rpc_tstream_read_state *state = tevent_req_data(
211 req, struct rpc_tstream_read_state);
212 NTSTATUS status;
214 if (tevent_req_is_nterror(req, &status)) {
215 return status;
217 *size = state->nread;
218 return NT_STATUS_OK;
221 struct rpc_tstream_write_state {
222 struct tevent_context *ev;
223 struct rpc_tstream_state *transp;
224 struct iovec iov;
225 ssize_t nwritten;
228 static void rpc_tstream_write_done(struct tevent_req *subreq);
230 static struct tevent_req *rpc_tstream_write_send(TALLOC_CTX *mem_ctx,
231 struct tevent_context *ev,
232 const uint8_t *data, size_t size,
233 void *priv)
235 struct rpc_tstream_state *transp =
236 talloc_get_type_abort(priv, struct rpc_tstream_state);
237 struct tevent_req *req, *subreq;
238 struct rpc_tstream_write_state *state;
239 struct timeval endtime;
241 req = tevent_req_create(mem_ctx, &state, struct rpc_tstream_write_state);
242 if (req == NULL) {
243 return NULL;
245 if (!rpc_tstream_is_connected(transp)) {
246 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
247 if (tstream_is_smbXcli_np(transp->stream)) {
248 status = NT_STATUS_PIPE_DISCONNECTED;
250 tevent_req_nterror(req, status);
251 return tevent_req_post(req, ev);
253 state->ev = ev;
254 state->transp = transp;
255 state->iov.iov_base = discard_const_p(void *, data);
256 state->iov.iov_len = size;
258 subreq = tstream_writev_queue_send(state, ev,
259 transp->stream,
260 transp->write_queue,
261 &state->iov, 1);
262 if (subreq == NULL) {
263 goto fail;
266 endtime = timeval_current_ofs_msec(transp->timeout);
267 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
268 goto fail;
271 tevent_req_set_callback(subreq, rpc_tstream_write_done, req);
272 return req;
273 fail:
274 TALLOC_FREE(req);
275 return NULL;
278 static void rpc_tstream_write_done(struct tevent_req *subreq)
280 struct tevent_req *req =
281 tevent_req_callback_data(subreq, struct tevent_req);
282 struct rpc_tstream_write_state *state =
283 tevent_req_data(req, struct rpc_tstream_write_state);
284 int err;
286 state->nwritten = tstream_writev_queue_recv(subreq, &err);
287 TALLOC_FREE(subreq);
288 if (state->nwritten < 0) {
289 rpc_tstream_disconnect(state->transp);
290 tevent_req_nterror(req, map_nt_error_from_unix(err));
291 return;
293 tevent_req_done(req);
296 static NTSTATUS rpc_tstream_write_recv(struct tevent_req *req, ssize_t *sent)
298 struct rpc_tstream_write_state *state =
299 tevent_req_data(req, struct rpc_tstream_write_state);
300 NTSTATUS status;
302 if (tevent_req_is_nterror(req, &status)) {
303 return status;
305 *sent = state->nwritten;
306 return NT_STATUS_OK;
309 struct rpc_tstream_trans_state {
310 struct tevent_context *ev;
311 struct rpc_tstream_state *transp;
312 struct iovec req;
313 uint32_t max_rdata_len;
314 struct iovec rep;
317 static void rpc_tstream_trans_writev(struct tevent_req *subreq);
318 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq);
320 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
321 void *private_data,
322 TALLOC_CTX *mem_ctx,
323 struct iovec **_vector,
324 size_t *count);
326 static struct tevent_req *rpc_tstream_trans_send(TALLOC_CTX *mem_ctx,
327 struct tevent_context *ev,
328 const uint8_t *data, size_t data_len,
329 uint32_t max_rdata_len,
330 void *priv)
332 struct rpc_tstream_state *transp =
333 talloc_get_type_abort(priv, struct rpc_tstream_state);
334 struct tevent_req *req, *subreq;
335 struct rpc_tstream_trans_state *state;
336 struct timeval endtime;
337 bool use_trans = false;
339 req = tevent_req_create(mem_ctx, &state,
340 struct rpc_tstream_trans_state);
341 if (req == NULL) {
342 return NULL;
345 if (!rpc_tstream_is_connected(transp)) {
346 NTSTATUS status = NT_STATUS_CONNECTION_DISCONNECTED;
347 if (tstream_is_smbXcli_np(transp->stream)) {
348 status = NT_STATUS_PIPE_DISCONNECTED;
350 tevent_req_nterror(req, status);
351 return tevent_req_post(req, ev);
353 state->ev = ev;
354 state->transp = transp;
355 state->req.iov_len = data_len;
356 state->req.iov_base = discard_const_p(void *, data);
357 state->max_rdata_len = max_rdata_len;
359 endtime = timeval_current_ofs_msec(transp->timeout);
361 if (tstream_is_smbXcli_np(transp->stream)) {
362 use_trans = true;
364 if (tevent_queue_length(transp->write_queue) > 0) {
365 use_trans = false;
367 if (tevent_queue_length(transp->read_queue) > 0) {
368 use_trans = false;
371 if (use_trans) {
372 tstream_smbXcli_np_use_trans(transp->stream);
375 subreq = tstream_writev_queue_send(state, ev,
376 transp->stream,
377 transp->write_queue,
378 &state->req, 1);
379 if (tevent_req_nomem(subreq, req)) {
380 return tevent_req_post(req, ev);
382 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
383 return tevent_req_post(req, ev);
385 tevent_req_set_callback(subreq, rpc_tstream_trans_writev, req);
387 subreq = tstream_readv_pdu_queue_send(state, ev,
388 transp->stream,
389 transp->read_queue,
390 rpc_tstream_trans_next_vector,
391 state);
392 if (tevent_req_nomem(subreq, req)) {
393 return tevent_req_post(req, ev);
395 if (!tevent_req_set_endtime(subreq, ev, endtime)) {
396 return tevent_req_post(req, ev);
398 tevent_req_set_callback(subreq, rpc_tstream_trans_readv_pdu, req);
400 return req;
403 static void rpc_tstream_trans_writev(struct tevent_req *subreq)
405 struct tevent_req *req =
406 tevent_req_callback_data(subreq,
407 struct tevent_req);
408 struct rpc_tstream_trans_state *state =
409 tevent_req_data(req,
410 struct rpc_tstream_trans_state);
411 int ret;
412 int err;
414 ret = tstream_writev_queue_recv(subreq, &err);
415 TALLOC_FREE(subreq);
416 if (ret == -1) {
417 rpc_tstream_disconnect(state->transp);
418 tevent_req_nterror(req, map_nt_error_from_unix(err));
419 return;
423 static int rpc_tstream_trans_next_vector(struct tstream_context *stream,
424 void *private_data,
425 TALLOC_CTX *mem_ctx,
426 struct iovec **_vector,
427 size_t *count)
429 struct rpc_tstream_trans_state *state =
430 talloc_get_type_abort(private_data,
431 struct rpc_tstream_trans_state);
432 struct iovec *vector;
434 if (state->max_rdata_len == state->rep.iov_len) {
435 *_vector = NULL;
436 *count = 0;
437 return 0;
440 state->rep.iov_base = talloc_array(state, uint8_t,
441 state->max_rdata_len);
442 if (state->rep.iov_base == NULL) {
443 return -1;
445 state->rep.iov_len = state->max_rdata_len;
447 vector = talloc_array(mem_ctx, struct iovec, 1);
448 if (!vector) {
449 return -1;
452 vector[0] = state->rep;
454 *_vector = vector;
455 *count = 1;
456 return 0;
459 static void rpc_tstream_trans_readv_pdu(struct tevent_req *subreq)
461 struct tevent_req *req =
462 tevent_req_callback_data(subreq,
463 struct tevent_req);
464 struct rpc_tstream_trans_state *state =
465 tevent_req_data(req,
466 struct rpc_tstream_trans_state);
467 int ret;
468 int err;
470 ret = tstream_readv_pdu_queue_recv(subreq, &err);
471 TALLOC_FREE(subreq);
472 if (ret == -1) {
473 rpc_tstream_disconnect(state->transp);
474 tevent_req_nterror(req, map_nt_error_from_unix(err));
475 return;
478 tevent_req_done(req);
481 static NTSTATUS rpc_tstream_trans_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
482 uint8_t **prdata, uint32_t *prdata_len)
484 struct rpc_tstream_trans_state *state =
485 tevent_req_data(req,
486 struct rpc_tstream_trans_state);
487 NTSTATUS status;
489 if (tevent_req_is_nterror(req, &status)) {
490 return status;
493 *prdata = (uint8_t *)talloc_move(mem_ctx, &state->rep.iov_base);
494 *prdata_len = state->rep.iov_len;
495 return NT_STATUS_OK;
499 * @brief Initialize a tstream transport facility
500 * NOTE: this function will talloc_steal, the stream and the queues.
502 * @param mem_ctx - memory context used to allocate the transport
503 * @param stream - a ready to use tstream
504 * @param presult - the transport structure
506 * @return - a NT Status error code.
508 NTSTATUS rpc_transport_tstream_init(TALLOC_CTX *mem_ctx,
509 struct tstream_context **stream,
510 struct rpc_cli_transport **presult)
512 struct rpc_cli_transport *result;
513 struct rpc_tstream_state *state;
515 result = talloc(mem_ctx, struct rpc_cli_transport);
516 if (result == NULL) {
517 return NT_STATUS_NO_MEMORY;
519 state = talloc(result, struct rpc_tstream_state);
520 if (state == NULL) {
521 TALLOC_FREE(result);
522 return NT_STATUS_NO_MEMORY;
524 result->priv = state;
526 state->read_queue = tevent_queue_create(state, "read_queue");
527 if (state->read_queue == NULL) {
528 TALLOC_FREE(result);
529 return NT_STATUS_NO_MEMORY;
531 state->write_queue = tevent_queue_create(state, "write_queue");
532 if (state->write_queue == NULL) {
533 TALLOC_FREE(result);
534 return NT_STATUS_NO_MEMORY;
537 state->stream = talloc_move(state, stream);
538 state->timeout = 10000; /* 10 seconds. */
540 if (tstream_is_smbXcli_np(state->stream)) {
541 result->trans_send = rpc_tstream_trans_send;
542 result->trans_recv = rpc_tstream_trans_recv;
543 } else {
544 result->trans_send = NULL;
545 result->trans_recv = NULL;
547 result->write_send = rpc_tstream_write_send;
548 result->write_recv = rpc_tstream_write_recv;
549 result->read_send = rpc_tstream_read_send;
550 result->read_recv = rpc_tstream_read_recv;
551 result->is_connected = rpc_tstream_is_connected;
552 result->set_timeout = rpc_tstream_set_timeout;
554 *presult = result;
555 return NT_STATUS_OK;
558 struct tstream_context *rpc_transport_get_tstream(
559 struct rpc_cli_transport *transport)
561 struct rpc_tstream_state *state = talloc_get_type_abort(
562 transport->priv, struct rpc_tstream_state);
563 return state->stream;