Minor cleanups
[elliptics.git] / example / ioclient.c
blob93bfddd563d437831b44c287b6967c3a77f02819
/*
 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/syscall.h>

#include <ctype.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <netinet/in.h>

#include "elliptics/packet.h"
#include "elliptics/interface.h"

#include "backends.h"
#include "common.h"
40 #ifndef __unused
41 #define __unused __attribute__ ((unused))
42 #endif
44 static struct dnet_log ioclient_logger;
/*
 * Print the command line help to stderr.
 *
 * @p is the program name substituted into the "Usage:" line
 * (main() passes argv[0]).
 */
static void dnet_usage(char *p)
{
	fprintf(stderr, "Usage: %s\n"
			" -r addr:port:family - adds a route to the given node\n"
			" -W file - write given file to the network storage\n"
			" -s - request IO counter stats from node\n"
			" -z - request VFS IO stats from node\n"
			" -a - request stats from all connected nodes\n"
			" -U status - update server status: 1 - elliptics exits, 2 - goes RO\n"
			" -R file - read given file from the network into the local storage\n"
			" -I id - transaction id (used to read data)\n"
			" -g groups - group IDs to connect\n"
			" -c cmd-event - execute command with given event on the remote node\n"
			" -L file - lookup a storage which hosts given file\n"
			" -l log - log file. Default: disabled\n"
			" -w timeout - wait timeout in seconds used to wait for content sync.\n"
			" ... - parameters can be repeated multiple times\n"
			" each time they correspond to the last added node\n"
			" -m level - log level\n"
			" -M level - set new log level\n"
			" -F flags - change node flags (see @cfg->flags comments in include/elliptics/interface.h)\n"
			" -O offset - read/write offset in the file\n"
			" -S size - read/write transaction size\n"
			" -u file - unlink file\n"
			" -N namespace - use this namespace for operations\n"
			" -D object - read latest data for given object, if -I id is specified, this field is unused\n"
			" -C flags - command flags\n"
			" -t column - column ID to read or write\n"
			" -d - start defragmentation\n"
			/* The closing parenthesis was missing from this line. */
			" -i flags - IO flags (see DNET_IO_FLAGS_* in include/elliptics/packet.h)\n"
			, p);
}
79 int main(int argc, char *argv[])
81 int ch, err, i, have_remote = 0;
82 int io_counter_stat = 0, vfs_stat = 0, single_node_stat = 1;
83 struct dnet_node_status node_status;
84 int update_status = 0;
85 struct dnet_node *n = NULL;
86 struct dnet_session *s = NULL;
87 struct dnet_config cfg, rem, *remotes = NULL;
88 char *logfile = "/dev/stderr", *readf = NULL, *writef = NULL, *cmd = NULL, *lookup = NULL;
89 char *read_data = NULL;
90 char *removef = NULL;
91 unsigned char trans_id[DNET_ID_SIZE], *id = NULL;
92 FILE *log = NULL;
93 uint64_t offset, size;
94 int *groups = NULL, group_num = 0;
95 int type = EBLOB_TYPE_DATA;
96 uint64_t cflags = 0;
97 uint64_t ioflags = 0;
98 int defrag = 0;
99 sigset_t mask;
101 memset(&node_status, 0, sizeof(struct dnet_node_status));
102 memset(&cfg, 0, sizeof(struct dnet_config));
104 node_status.nflags = -1;
105 node_status.status_flags = -1;
106 node_status.log_level = ~0U;
108 size = offset = 0;
110 cfg.sock_type = SOCK_STREAM;
111 cfg.proto = IPPROTO_TCP;
112 cfg.wait_timeout = 60;
113 ioclient_logger.log_level = DNET_LOG_ERROR;
115 memcpy(&rem, &cfg, sizeof(struct dnet_config));
117 while ((ch = getopt(argc, argv, "i:dC:t:A:F:M:N:g:u:O:S:m:zsU:aL:w:l:c:I:r:W:R:D:h")) != -1) {
118 switch (ch) {
119 case 'i':
120 ioflags = strtoull(optarg, NULL, 0);
121 break;
122 case 'd':
123 defrag = 1;
124 break;
125 case 'C':
126 cflags = strtoull(optarg, NULL, 0);
127 break;
128 case 't':
129 type = atoi(optarg);
130 break;
131 case 'F':
132 node_status.nflags = strtol(optarg, NULL, 0);
133 update_status = 1;
134 break;
135 case 'M':
136 node_status.log_level = atoi(optarg);
137 update_status = 1;
138 break;
139 case 'N':
140 cfg.ns = optarg;
141 cfg.nsize = strlen(optarg);
142 break;
143 case 'u':
144 removef = optarg;
145 break;
146 case 'O':
147 offset = strtoull(optarg, NULL, 0);
148 break;
149 case 'S':
150 size = strtoull(optarg, NULL, 0);
151 break;
152 case 'm':
153 ioclient_logger.log_level = atoi(optarg);
154 break;
155 case 's':
156 io_counter_stat = 1;
157 break;
158 case 'U':
159 node_status.status_flags = strtol(optarg, NULL, 0);
160 update_status = 1;
161 break;
162 case 'z':
163 vfs_stat = 1;
164 break;
165 case 'a':
166 single_node_stat = 0;
167 break;
168 case 'L':
169 lookup = optarg;
170 break;
171 case 'w':
172 cfg.check_timeout = cfg.wait_timeout = atoi(optarg);
173 break;
174 case 'l':
175 logfile = optarg;
176 break;
177 case 'c':
178 cmd = optarg;
179 break;
180 case 'I':
181 err = dnet_parse_numeric_id(optarg, trans_id);
182 if (err)
183 return err;
184 id = trans_id;
185 break;
186 case 'g':
187 group_num = dnet_parse_groups(optarg, &groups);
188 if (group_num <= 0)
189 return -1;
190 break;
191 case 'r':
192 err = dnet_parse_addr(optarg, &rem);
193 if (err)
194 return err;
195 have_remote++;
196 remotes = realloc(remotes, sizeof(rem) * have_remote);
197 if (!remotes)
198 return -ENOMEM;
199 memcpy(&remotes[have_remote - 1], &rem, sizeof(rem));
200 break;
201 case 'W':
202 writef = optarg;
203 break;
204 case 'R':
205 readf = optarg;
206 break;
207 case 'D':
208 read_data = optarg;
209 break;
210 case 'h':
211 default:
212 dnet_usage(argv[0]);
213 return -1;
217 log = fopen(logfile, "a");
218 if (!log) {
219 err = -errno;
220 fprintf(stderr, "Failed to open log file %s: %s.\n", logfile, strerror(errno));
221 return err;
224 ioclient_logger.log_private = log;
225 ioclient_logger.log = dnet_common_log;
226 cfg.log = &ioclient_logger;
228 n = dnet_node_create(&cfg);
229 if (!n)
230 return -1;
232 sigemptyset(&mask);
233 sigaddset(&mask, SIGTERM);
234 sigaddset(&mask, SIGINT);
235 sigaddset(&mask, SIGHUP);
236 sigaddset(&mask, SIGCHLD);
237 pthread_sigmask(SIG_UNBLOCK, &mask, NULL);
238 sigprocmask(SIG_UNBLOCK, &mask, NULL);
240 if (have_remote) {
241 int error = -ECONNRESET;
242 for (i=0; i<have_remote; ++i) {
243 if (single_node_stat && (vfs_stat || io_counter_stat))
244 remotes[i].flags = DNET_CFG_NO_ROUTE_LIST;
245 err = dnet_add_state(n, &remotes[i]);
246 if (!err)
247 error = 0;
250 if (error)
251 return error;
254 s = dnet_session_create(n);
255 if (!s)
256 return -1;
258 dnet_session_set_groups(s, groups, group_num);
260 if (defrag)
261 return dnet_start_defrag(s, cflags);
263 if (writef) {
264 if (id) {
265 struct dnet_id raw;
267 dnet_setup_id(&raw, 0, id);
268 raw.type = type;
270 err = dnet_write_file_id(s, writef, &raw, offset, offset, size, cflags, ioflags);
271 } else {
272 err = dnet_write_file(s, writef, writef, strlen(writef), offset, offset, size, cflags, ioflags, type);
275 if (err)
276 return err;
279 if (readf) {
280 if (id) {
281 struct dnet_id raw;
283 dnet_setup_id(&raw, 0, id);
284 raw.type = type;
286 err = dnet_read_file_id(s, readf, &raw, offset, size);
287 } else {
288 err = dnet_read_file(s, readf, readf, strlen(readf), offset, size, type);
290 if (err)
291 return err;
294 if (read_data) {
295 void *data;
296 struct dnet_id raw;
297 struct dnet_io_attr io;
299 if (!id) {
300 dnet_transform(n, read_data, strlen(read_data), &raw);
301 } else {
302 memcpy(&raw.id, id, DNET_ID_SIZE);
304 raw.type = type;
305 raw.group_id = 0; /* unused */
307 memset(&io, 0, sizeof(io));
308 io.type = type;
309 io.flags = ioflags;
310 memcpy(io.id, raw.id, DNET_ID_SIZE);
311 memcpy(io.parent, raw.id, DNET_ID_SIZE);
313 /* number of copies to check to find the latest data */
314 io.num = group_num;
316 err = dnet_read_latest(s, &raw, &io, cflags, &data);
317 if (err)
318 return err;
320 data += sizeof(struct dnet_io_attr);
321 io.size -= sizeof(struct dnet_io_attr);
323 while (io.size) {
324 err = write(1, data, io.size);
325 if (err <= 0) {
326 err = -errno;
327 dnet_log_raw(n, DNET_LOG_ERROR, "%s: can not write data to stdout: %d %s",
328 read_data, err, strerror(-err));
329 return err;
332 io.size -= err;
336 if (removef) {
337 if (id) {
338 struct dnet_id raw;
340 dnet_setup_id(&raw, 0, id);
341 raw.type = type;
342 dnet_remove_object_now(s, &raw, cflags, ioflags);
344 return 0;
347 err = dnet_remove_file(s, removef, strlen(removef), NULL, cflags, ioflags);
348 if (err)
349 return err;
352 if (cmd) {
353 struct dnet_id __did, *did = NULL;
354 struct sph *sph;
355 int len = strlen(cmd);
356 int event_size = len;
357 char *ret = NULL;
358 char *tmp;
360 tmp = strchr(cmd, ' ');
361 if (tmp) {
362 event_size = tmp - cmd;
365 if (id) {
366 did = &__did;
368 dnet_setup_id(did, 0, id);
369 did->type = type;
372 sph = malloc(sizeof(struct sph) + len + 1);
373 if (!sph)
374 return -ENOMEM;
376 memset(sph, 0, sizeof(struct sph));
378 sph->flags = DNET_SPH_FLAGS_SRC_BLOCK;
379 sph->key = -1;
380 sph->binary_size = 0;
381 sph->data_size = len - event_size;
382 sph->event_size = event_size;
384 sprintf(sph->data, "%s", cmd);
386 err = dnet_send_cmd(s, did, sph, (void **)&ret);
387 if (err < 0)
388 return err;
390 free(sph);
392 if (err > 0) {
393 printf("%.*s\n", err, ret);
394 free(ret);
398 if (lookup) {
399 err = dnet_lookup(s, lookup);
400 if (err)
401 return err;
404 if (vfs_stat) {
405 err = dnet_request_stat(s, NULL, DNET_CMD_STAT, 0, NULL, NULL);
406 if (err < 0)
407 return err;
410 if (io_counter_stat) {
411 err = dnet_request_stat(s, NULL, DNET_CMD_STAT_COUNT, DNET_ATTR_CNTR_GLOBAL, NULL, NULL);
412 if (err < 0)
413 return err;
416 if (update_status) {
417 struct dnet_addr addr;
419 for (i=0; i<have_remote; ++i) {
420 memset(&addr, 0, sizeof(addr));
421 addr.addr_len = sizeof(addr.addr);
423 err = dnet_fill_addr(&addr, remotes[i].addr, remotes[i].port,
424 remotes[i].family, remotes[i].sock_type, remotes[i].proto);
425 if (err) {
426 dnet_log_raw(n, DNET_LOG_ERROR, "ioclient: dnet_fill_addr: %s:%s:%d, sock_type: %d, proto: %d: %s %d\n",
427 remotes[i].addr, remotes[i].port,
428 remotes[i].family, remotes[i].sock_type, remotes[i].proto,
429 strerror(-err), err);
432 err = dnet_update_status(s, &addr, NULL, &node_status);
433 if (err) {
434 dnet_log_raw(n, DNET_LOG_ERROR, "ioclient: dnet_update_status: %s:%s:%d, sock_type: %d, proto: %d: update: %d: "
435 "%s %d\n",
436 remotes[i].addr, remotes[i].port,
437 remotes[i].family, remotes[i].sock_type, remotes[i].proto, update_status,
438 strerror(-err), err);
444 dnet_session_destroy(s);
445 dnet_node_destroy(n);
447 return 0;