Return correct reply size: data+binary, not data+event
[elliptics.git] / srw / srw.cpp
blob80cca72023e9e0e3b4a4e5736dcb8b5751793430
1 /*
2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/wait.h>

#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#ifdef HAVE_COCAINE_SUPPORT

#include <deque>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>

#include <zmq.hpp>

#include <cocaine/context.hpp>
#include <cocaine/logging.hpp>
#include <cocaine/app.hpp>
#include <cocaine/job.hpp>

#include <elliptics/interface.h>
#include <elliptics/srw.h>

#include "elliptics.h"
50 class srw_log {
51 public:
52 srw_log(struct dnet_node *node, int level, const std::string &app, const std::string &message) : m_n(node) {
53 dnet_log(m_n, level, "dnet-sink: %s : %s\n", app.c_str(), message.c_str());
54 return;
56 if (!boost::starts_with(app, "app/") || (level > node->log->log_level))
57 return;
59 std::string msg_with_date;
61 char str[64];
62 struct tm tm;
63 struct timeval tv;
65 gettimeofday(&tv, NULL);
66 localtime_r((time_t *)&tv.tv_sec, &tm);
67 strftime(str, sizeof(str), "%F %R:%S", &tm);
69 char tmp[128];
71 int len = snprintf(tmp, sizeof(tmp), "%s.%06lu %ld/%4d %1d: ", str, tv.tv_usec, dnet_get_id(), getpid(), level);
72 msg_with_date.assign(tmp, len);
73 msg_with_date += message + "\n";
75 struct dnet_io_control ctl;
77 memset(&ctl, 0, sizeof(ctl));
79 ctl.cflags = 0;
80 ctl.data = msg_with_date.data();
82 ctl.io.flags = DNET_IO_FLAGS_APPEND;
83 ctl.io.size = msg_with_date.size();
85 std::string app_log = app + ".log";
86 dnet_transform(m_n, app_log.data(), app_log.size(), &ctl.id);
88 ctl.fd = -1;
90 char *result = NULL;
91 int err = dnet_write_data_wait(m_n, &ctl, (void **)&result);
92 if (err < 0) {
93 /* could not find remote node to send data, saving it locally */
94 if (err == -ENOENT) {
95 log_locally(ctl.id, msg_with_date);
96 return;
99 std::ostringstream string;
100 string << dnet_dump_id(&ctl.id) << ": WRITE: log-write-failed: size: " << message.size() << ", err: " << err;
101 throw std::runtime_error(string.str());
104 free(result);
107 private:
108 struct dnet_node *m_n;
110 void log_locally(struct dnet_id &id, const std::string &msg) const {
111 std::vector<char> data;
112 struct dnet_cmd *cmd;
113 struct dnet_io_attr *io;
114 char *msg_data;
116 data.resize(sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr) + msg.size());
118 cmd = (struct dnet_cmd *)data.data();
119 io = (struct dnet_io_attr *)(cmd + 1);
120 msg_data = (char *)(io + 1);
122 cmd->id = id;
123 cmd->id.group_id = m_n->id.group_id;
124 cmd->cmd = DNET_CMD_WRITE;
125 cmd->size = data.size() - sizeof(struct dnet_cmd);
127 memcpy(io->parent, cmd->id.id, DNET_ID_SIZE);
128 memcpy(io->id, cmd->id.id, DNET_ID_SIZE);
130 io->flags = DNET_IO_FLAGS_APPEND | DNET_IO_FLAGS_SKIP_SENDING;
131 io->size = msg.size();
133 memcpy(msg_data, msg.data(), msg.size());
135 m_n->cb->command_handler(m_n->st, m_n->cb->command_private, cmd, (void *)(cmd + 1));
139 class dnet_sink_t: public cocaine::logging::sink_t {
140 public:
141 dnet_sink_t(struct dnet_node *n, cocaine::logging::priorities prio): cocaine::logging::sink_t(prio), m_n(n) {
144 virtual void emit(cocaine::logging::priorities prio, const std::string &app, const std::string& message) const {
145 int level = DNET_LOG_NOTICE;
146 if (prio == cocaine::logging::debug)
147 level = DNET_LOG_DEBUG;
148 if (prio == cocaine::logging::info)
149 level = DNET_LOG_INFO;
150 if (prio == cocaine::logging::warning)
151 level = DNET_LOG_INFO;
152 if (prio == cocaine::logging::error)
153 level = DNET_LOG_ERROR;
154 if (prio == cocaine::logging::ignore)
155 level = -1;
157 if (level != -1)
158 srw_log log(m_n, level, app, message);
161 private:
162 struct dnet_node *m_n;
165 class dnet_job_t: public cocaine::engine::job_t
167 public:
168 dnet_job_t(struct dnet_node *n, const std::string& event, const cocaine::blob_t& blob):
169 cocaine::engine::job_t(event, blob),
170 m_completed(false),
171 m_name(event),
172 m_n(n) {
175 virtual void react(const cocaine::engine::events::chunk& event) {
176 dnet_log(m_n, DNET_LOG_INFO, "chunk: %.*s\n", (int)event.message.size(), (char *)event.message.data());
178 boost::mutex::scoped_lock guard(m_lock);
179 m_res.insert(m_res.end(), (char *)event.message.data(), (char *)event.message.data() + event.message.size());
182 virtual void react(const cocaine::engine::events::choke& ) {
183 srw_log log(m_n, DNET_LOG_NOTICE, "app/" + m_name, "processing completed, data size: " +
184 boost::lexical_cast<std::string>(m_res.size()));
186 if (m_res.size())
187 reply(true, NULL, 0);
190 virtual void react(const cocaine::engine::events::error& event) {
191 srw_log log(m_n, DNET_LOG_ERROR, "app/" + m_name, event.message + ": " + boost::lexical_cast<std::string>(event.code));
194 void reply(bool completed, const char *reply, size_t size) {
195 boost::mutex::scoped_lock guard(m_lock);
197 if (reply && size)
198 m_res.insert(m_res.end(), reply, reply + size);
200 m_completed = completed;
201 m_cond.notify_all();
204 bool wait(long timeout) {
205 boost::system_time const abs_time = boost::get_system_time()+ boost::posix_time::seconds(timeout);
207 while (!m_completed) {
208 boost::mutex::scoped_lock guard(m_lock);
209 if (!m_cond.timed_wait(guard, abs_time))
210 return false;
213 return true;
216 std::vector<char> &result(void) {
217 return m_res;
220 private:
221 bool m_completed;
222 std::string m_name;
223 struct dnet_node *m_n;
224 std::vector<char> m_res;
225 boost::mutex m_lock;
226 boost::condition m_cond;
229 typedef boost::shared_ptr<dnet_job_t> dnet_shared_job_t;
231 class app_watcher {
232 public:
233 app_watcher(cocaine::context_t &ctx, const std::string &app) :
234 m_need_exit(false) {
235 m_app.reset(new cocaine::app_t(ctx, app));
236 m_app->start();
238 m_thread = boost::thread(&app_watcher::process, this);
241 ~app_watcher() {
242 boost::mutex::scoped_lock guard(m_lock);
243 m_need_exit = true;
244 m_cond.notify_one();
245 guard.unlock();
247 m_thread.join();
250 void push(dnet_shared_job_t job) {
251 boost::mutex::scoped_lock guard(m_lock);
253 m_jobs.push_back(job);
254 m_cond.notify_one();
257 std::string info() {
258 return Json::FastWriter().write(m_app->info());
261 private:
262 bool m_need_exit;
263 boost::condition m_cond;
264 boost::mutex m_lock;
265 std::deque<dnet_shared_job_t> m_jobs;
266 boost::thread m_thread;
267 std::auto_ptr<cocaine::app_t> m_app;
269 void process() {
270 while (!m_need_exit) {
272 boost::mutex::scoped_lock guard(m_lock);
273 if (m_jobs.empty()) {
274 m_cond.wait(guard);
277 if (m_need_exit)
278 break;
280 if (!m_jobs.empty()) {
281 dnet_shared_job_t job = m_jobs.front();
282 m_jobs.pop_front();
283 guard.unlock();
285 m_app->enqueue(job, cocaine::engine::mode::blocking);
292 typedef std::map<std::string, boost::shared_ptr<app_watcher> > eng_map_t;
293 typedef std::map<int, dnet_shared_job_t> jobs_map_t;
295 namespace {
296 cocaine::logging::priorities dnet_log_level_to_prio(int level) {
297 cocaine::logging::priorities prio = cocaine::logging::ignore;
298 if (level == DNET_LOG_DEBUG)
299 prio = cocaine::logging::debug;
300 else if (level == DNET_LOG_INFO)
301 prio = cocaine::logging::info;
302 else if (level == DNET_LOG_NOTICE)
303 prio = cocaine::logging::info;
304 else if (level == DNET_LOG_ERROR)
305 prio = cocaine::logging::error;
307 return prio;
311 class srw {
312 public:
313 srw(struct dnet_node *n, const std::string &config) : m_n(n),
314 m_ctx(config, boost::make_shared<dnet_sink_t>(n, dnet_log_level_to_prio(m_n->log->log_level))) {
317 ~srw() {
318 /* no need to iterate over engines, its destructor automatically stops it */
319 #if 0
320 for (eng_map_t::iterator it = m_map.begin(); it != m_map.end(); ++it) {
321 it->second->stop();
323 #endif
326 int process(struct dnet_net_state *st, struct dnet_cmd *cmd, struct sph *sph) {
327 char *data = (char *)(sph + 1);
328 std::string event = dnet_get_event(sph, data);
330 std::vector<std::string> strs;
331 boost::split(strs, event, boost::is_any_of("@"));
333 if (strs.size() != 2) {
334 dnet_log(m_n, DNET_LOG_ERROR, "%s: invalid event name: must be application@event or application@start-task\n",
335 event.c_str());
336 return -EINVAL;
339 std::string app = strs[0];
340 std::string ev = strs[1];
342 if (ev == "start-task") {
343 boost::shared_ptr<app_watcher> eng(new app_watcher(m_ctx, app));
345 boost::mutex::scoped_lock guard(m_lock);
346 m_map.insert(std::make_pair(app, eng));
348 dnet_log(m_n, DNET_LOG_NOTICE, "%s: task started: %s\n", event.c_str(), app.c_str());
349 return 0;
350 } else if (ev == "stop-task") {
351 boost::mutex::scoped_lock guard(m_lock);
352 eng_map_t::iterator it = m_map.find(app);
353 /* destructor stops engine */
354 if (it != m_map.end())
355 m_map.erase(it);
356 guard.unlock();
358 dnet_log(m_n, DNET_LOG_NOTICE, "%s: task stopped: %s\n", event.c_str(), app.c_str());
359 return 0;
360 } else if (ev == "info") {
361 boost::mutex::scoped_lock guard(m_lock);
362 eng_map_t::iterator it = m_map.find(app);
363 if (it == m_map.end()) {
364 dnet_log(m_n, DNET_LOG_ERROR, "%s: no task '%s' started\n", event.c_str(), app.c_str());
365 return -ENOENT;
368 std::string s = it->second->info();
369 dnet_log(m_n, DNET_LOG_INFO, "app engine: %s\n", s.c_str());
370 return dnet_send_reply(st, cmd, (void *)s.data(), s.size(), 0);
371 } else if (sph->flags & (DNET_SPH_FLAGS_REPLY | DNET_SPH_FLAGS_FINISH)) {
372 boost::mutex::scoped_lock guard(m_lock);
374 jobs_map_t::iterator it = m_jobs.find(sph->src_key);
375 if (it == m_jobs.end()) {
376 dnet_log(m_n, DNET_LOG_ERROR, "%s: %s: no job(%x) to complete\n",
377 event.c_str(), dnet_dump_id(&cmd->id), sph->src_key);
378 return -ENOENT;
381 bool final = sph->flags & DNET_SPH_FLAGS_FINISH;
382 it->second->reply(final, (char *)(sph + 1) + sph->event_size, sph->data_size + sph->binary_size);
383 dnet_log(m_n, DNET_LOG_INFO, "%s: task completed(%x), total-data-size: %zd, finish: %d\n",
384 event.c_str(), sph->src_key, total_size(sph), final);
386 if (final)
387 m_jobs.erase(it);
389 return 0;
390 } else {
391 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) {
392 sph->src_key = (unsigned long)sph;
393 memcpy(sph->src.id, cmd->id.id, sizeof(sph->src.id));
396 boost::mutex::scoped_lock guard(m_lock);
397 eng_map_t::iterator it = m_map.find(app);
398 if (it == m_map.end()) {
399 dnet_log(m_n, DNET_LOG_ERROR, "%s: no task '%s' started\n", event.c_str(), app.c_str());
400 return -ENOENT;
403 dnet_shared_job_t job(boost::make_shared<dnet_job_t>(m_n, ev,
404 cocaine::blob_t((const char *)sph, total_size(sph) + sizeof(struct sph))));
406 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK)
407 m_jobs.insert(std::make_pair(sph->src_key, job));
409 it->second->push(job);
410 guard.unlock();
412 dnet_log(m_n, DNET_LOG_INFO, "%s: task queued(%x), total-data-size: %zd, block: %d\n",
413 event.c_str(), sph->src_key, total_size(sph), !!(sph->flags & DNET_SPH_FLAGS_SRC_BLOCK));
415 int err = 0;
416 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) {
417 bool success = job->wait(m_n->wait_ts.tv_sec);
418 if (!success)
419 throw std::runtime_error("timeout waiting for exec command to complete");
421 std::vector<char> res = job->result();
422 if (res.size()) {
423 err = dnet_send_reply(st, cmd, res.data(), res.size(), 0);
426 dnet_log(m_n, DNET_LOG_NOTICE, "%s: %s: blocked task reply: %zd bytes\n",
427 event.c_str(), dnet_dump_id_str(sph->src.id), res.size());
430 return err;
434 private:
435 struct dnet_node *m_n;
436 cocaine::context_t m_ctx;
437 boost::mutex m_lock;
438 eng_map_t m_map;
439 jobs_map_t m_jobs;
441 std::string dnet_get_event(const struct sph *sph, const char *data) {
442 return std::string(data, sph->event_size);
445 size_t total_size(const struct sph *sph) {
446 return sph->event_size + sph->data_size + sph->binary_size;
450 int dnet_srw_init(struct dnet_node *n, struct dnet_config *cfg)
452 int err = 0;
454 if (!cfg->srw.config) {
455 dnet_log(n, DNET_LOG_ERROR, "srw: no config\n");
456 return -ENOTSUP;
459 try {
460 dnet_node_set_groups(n, (int *)&n->id.group_id, 1);
461 n->srw = (void *)new srw(n, cfg->srw.config);
462 dnet_log(n, DNET_LOG_INFO, "srw: initialized: config: %s\n", cfg->srw.config);
463 return 0;
464 } catch (const std::exception &e) {
465 dnet_log(n, DNET_LOG_ERROR, "srw: init failed: config: %s, exception: %s\n", cfg->srw.config, e.what());
466 err = -ENOMEM;
469 return err;
472 void dnet_srw_cleanup(struct dnet_node *n)
474 if (n->srw) {
475 try {
476 delete (srw *)n->srw;
477 } catch (...) {
480 n->srw = NULL;
484 int dnet_cmd_exec_raw(struct dnet_net_state *st, struct dnet_cmd *cmd, struct sph *header, const void *data)
486 struct dnet_node *n = st->n;
487 srw *s = (srw *)n->srw;
489 if (!s)
490 return -ENOTSUP;
492 try {
493 return s->process(st, cmd, header);
494 } catch (const std::exception &e) {
495 dnet_log(n, DNET_LOG_ERROR, "%s: srw-processing: event: %.*s, data-size: %lld, binary-size: %lld, exception: %s\n",
496 dnet_dump_id(&cmd->id), header->event_size, (const char *)data,
497 (unsigned long long)header->data_size, (unsigned long long)header->binary_size,
498 e.what());
501 return -EINVAL;
504 int dnet_srw_update(struct dnet_node *, int )
506 return 0;
508 #else
509 #include <errno.h>
511 #include "elliptics.h"
513 int dnet_srw_init(struct dnet_node *, struct dnet_config *)
515 return -ENOTSUP;
518 void dnet_srw_cleanup(struct dnet_node *)
522 int dnet_cmd_exec_raw(struct dnet_net_state *, struct dnet_cmd *, struct sph *, const void *)
524 return -ENOTSUP;
527 int dnet_srw_update(struct dnet_node *, int)
529 return 0;
531 #endif