Elliptics version update: 2.19.2.7
[elliptics.git] / srw / srw.cpp
blob a5f101a4773533db99b3b9c8c8edf7224b53df4b
1 /*
2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/mman.h>
20 #include <sys/wait.h>
22 #include <ctype.h>
23 #include <fcntl.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <unistd.h>
29 #ifdef HAVE_COCAINE_SUPPORT
31 #include <map>
32 #include <vector>
33 #include <boost/algorithm/string.hpp>
34 #include <boost/lexical_cast.hpp>
35 #include <boost/thread.hpp>
36 #include <boost/thread/condition.hpp>
38 #include <zmq.hpp>
40 #include <cocaine/context.hpp>
41 #include <cocaine/logging.hpp>
42 #include <cocaine/app.hpp>
43 #include <cocaine/job.hpp>
45 #include <elliptics/interface.h>
46 #include <elliptics/srw.h>
48 #include "elliptics.h"
50 class srw_log {
51 public:
52 srw_log(struct dnet_session *session, int level, const std::string &app, const std::string &message) : m_s(session) {
53 dnet_log(session->node, level, "srw: %s : %s\n", app.c_str(), message.c_str());
54 return;
56 if (!boost::starts_with(app, "app/") || (level > m_s->node->log->log_level))
57 return;
59 std::string msg_with_date;
61 char str[64];
62 struct tm tm;
63 struct timeval tv;
65 gettimeofday(&tv, NULL);
66 localtime_r((time_t *)&tv.tv_sec, &tm);
67 strftime(str, sizeof(str), "%F %R:%S", &tm);
69 char tmp[128];
71 int len = snprintf(tmp, sizeof(tmp), "%s.%06lu %ld/%4d %1d: ", str, tv.tv_usec, dnet_get_id(), getpid(), level);
72 msg_with_date.assign(tmp, len);
73 msg_with_date += message + "\n";
75 struct dnet_io_control ctl;
77 memset(&ctl, 0, sizeof(ctl));
79 ctl.cflags = 0;
80 ctl.data = msg_with_date.data();
82 ctl.io.flags = DNET_IO_FLAGS_APPEND;
83 ctl.io.size = msg_with_date.size();
85 std::string app_log = app + ".log";
86 dnet_transform(m_s->node, app_log.data(), app_log.size(), &ctl.id);
88 ctl.fd = -1;
90 char *result = NULL;
91 int err = dnet_write_data_wait(m_s, &ctl, (void **)&result);
92 if (err < 0) {
93 /* could not find remote node to send data, saving it locally */
94 if (err == -ENOENT) {
95 log_locally(ctl.id, msg_with_date);
96 return;
99 std::ostringstream string;
100 string << dnet_dump_id(&ctl.id) << ": WRITE: log-write-failed: size: " << message.size() << ", err: " << err;
101 throw std::runtime_error(string.str());
104 free(result);
107 private:
108 struct dnet_session *m_s;
110 void log_locally(struct dnet_id &id, const std::string &msg) const {
111 std::vector<char> data;
112 struct dnet_cmd *cmd;
113 struct dnet_io_attr *io;
114 char *msg_data;
116 data.resize(sizeof(struct dnet_cmd) + sizeof(struct dnet_io_attr) + msg.size());
118 cmd = (struct dnet_cmd *)data.data();
119 io = (struct dnet_io_attr *)(cmd + 1);
120 msg_data = (char *)(io + 1);
122 cmd->id = id;
123 cmd->id.group_id = m_s->node->id.group_id;
124 cmd->cmd = DNET_CMD_WRITE;
125 cmd->size = data.size() - sizeof(struct dnet_cmd);
127 memcpy(io->parent, cmd->id.id, DNET_ID_SIZE);
128 memcpy(io->id, cmd->id.id, DNET_ID_SIZE);
130 io->flags = DNET_IO_FLAGS_APPEND | DNET_IO_FLAGS_SKIP_SENDING;
131 io->size = msg.size();
133 memcpy(msg_data, msg.data(), msg.size());
135 m_s->node->cb->command_handler(m_s->node->st, m_s->node->cb->command_private, cmd, (void *)(cmd + 1));
139 class dnet_sink_t: public cocaine::logging::sink_t {
140 public:
141 dnet_sink_t(struct dnet_session *sess, cocaine::logging::priorities prio): cocaine::logging::sink_t(prio), m_s(sess) {
144 virtual void emit(cocaine::logging::priorities prio, const std::string &app, const std::string& message) const {
145 int level = DNET_LOG_NOTICE;
146 if (prio == cocaine::logging::debug)
147 level = DNET_LOG_DEBUG;
148 if (prio == cocaine::logging::info)
149 level = DNET_LOG_INFO;
150 if (prio == cocaine::logging::warning)
151 level = DNET_LOG_INFO;
152 if (prio == cocaine::logging::error)
153 level = DNET_LOG_ERROR;
154 if (prio == cocaine::logging::ignore)
155 level = -1;
157 if (level != -1)
158 srw_log log(m_s, level, app, message);
161 private:
162 struct dnet_session *m_s;
165 class dnet_job_t: public cocaine::engine::job_t
167 public:
168 dnet_job_t(struct dnet_session *session, const std::string &app, const std::string& event, const cocaine::blob_t& blob):
169 cocaine::engine::job_t(event, blob),
170 m_completed(false),
171 m_name(app + "/" + event),
172 m_s(session) {
175 virtual void react(const cocaine::engine::events::chunk& event) {
176 boost::mutex::scoped_lock guard(m_lock);
177 m_res.insert(m_res.end(), (char *)event.message.data(), (char *)event.message.data() + event.message.size());
179 std::ostringstream msg;
180 msg << "received reply chunk: size: " << event.message.size() << ", data: '" << event.message.data() << "', " <<
181 "accumulated-reply-size: " << m_res.size() << std::endl;
183 srw_log log(m_s, DNET_LOG_NOTICE, "app/" + m_name, msg.str());
186 virtual void react(const cocaine::engine::events::choke& ) {
187 srw_log log(m_s, DNET_LOG_NOTICE, "app/" + m_name, "job completed, data size: " +
188 boost::lexical_cast<std::string>(m_res.size()));
190 if (m_res.size())
191 reply(true, NULL, 0);
194 virtual void react(const cocaine::engine::events::error& event) {
195 srw_log log(m_s, DNET_LOG_ERROR, "app/" + m_name, event.message + ": " + boost::lexical_cast<std::string>(event.code));
198 void reply(bool completed, const char *reply, size_t size) {
199 boost::mutex::scoped_lock guard(m_lock);
201 if (reply && size)
202 m_res.insert(m_res.end(), reply, reply + size);
204 m_completed = completed;
205 m_cond.notify_all();
208 bool wait(long timeout) {
209 boost::system_time const abs_time = boost::get_system_time()+ boost::posix_time::seconds(timeout);
211 while (!m_completed) {
212 boost::mutex::scoped_lock guard(m_lock);
213 if (!m_cond.timed_wait(guard, abs_time))
214 return false;
217 return true;
220 std::vector<char> &result(void) {
221 return m_res;
224 private:
225 bool m_completed;
226 std::string m_name;
227 struct dnet_session *m_s;
228 std::vector<char> m_res;
229 boost::mutex m_lock;
230 boost::condition m_cond;
233 typedef boost::shared_ptr<dnet_job_t> dnet_shared_job_t;
235 class app_watcher {
236 public:
237 app_watcher(cocaine::context_t &ctx, const std::string &app) :
238 m_need_exit(false) {
239 m_app.reset(new cocaine::app_t(ctx, app));
240 m_app->start();
242 m_thread = boost::thread(&app_watcher::process, this);
245 ~app_watcher() {
246 boost::mutex::scoped_lock guard(m_lock);
247 m_need_exit = true;
248 m_cond.notify_one();
249 guard.unlock();
251 m_thread.join();
254 void push(dnet_shared_job_t job) {
255 boost::mutex::scoped_lock guard(m_lock);
257 m_jobs.push_back(job);
258 m_cond.notify_one();
261 std::string info() {
262 return Json::FastWriter().write(m_app->info());
265 private:
266 bool m_need_exit;
267 boost::condition m_cond;
268 boost::mutex m_lock;
269 std::deque<dnet_shared_job_t> m_jobs;
270 boost::thread m_thread;
271 std::auto_ptr<cocaine::app_t> m_app;
273 void process() {
274 while (!m_need_exit) {
276 boost::mutex::scoped_lock guard(m_lock);
277 if (m_jobs.empty()) {
278 m_cond.wait(guard);
281 if (m_need_exit)
282 break;
284 if (!m_jobs.empty()) {
285 dnet_shared_job_t job = m_jobs.front();
286 m_jobs.pop_front();
287 guard.unlock();
289 m_app->enqueue(job, cocaine::engine::mode::blocking);
296 typedef std::map<std::string, boost::shared_ptr<app_watcher> > eng_map_t;
297 typedef std::map<int, dnet_shared_job_t> jobs_map_t;
299 namespace {
300 cocaine::logging::priorities dnet_log_level_to_prio(int level) {
301 cocaine::logging::priorities prio = cocaine::logging::ignore;
302 if (level == DNET_LOG_DEBUG)
303 prio = cocaine::logging::debug;
304 else if (level == DNET_LOG_INFO)
305 prio = cocaine::logging::info;
306 else if (level == DNET_LOG_NOTICE)
307 prio = cocaine::logging::info;
308 else if (level == DNET_LOG_ERROR)
309 prio = cocaine::logging::error;
311 return prio;
315 class srw {
316 public:
317 srw(struct dnet_session *sess, const std::string &config) : m_s(sess),
318 m_ctx(config, boost::make_shared<dnet_sink_t>(m_s, dnet_log_level_to_prio(sess->node->log->log_level))) {
321 ~srw() {
322 /* no need to iterate over engines, its destructor automatically stops it */
323 #if 0
324 for (eng_map_t::iterator it = m_map.begin(); it != m_map.end(); ++it) {
325 it->second->stop();
327 #endif
330 int process(struct dnet_net_state *st, struct dnet_cmd *cmd, struct sph *sph) {
331 char *data = (char *)(sph + 1);
332 std::string event = dnet_get_event(sph, data);
334 std::vector<std::string> strs;
335 boost::split(strs, event, boost::is_any_of("@"));
337 if (strs.size() != 2) {
338 dnet_log(m_s->node, DNET_LOG_ERROR, "%s: invalid event name: must be application@event or application@start-task\n",
339 event.c_str());
340 return -EINVAL;
343 std::string app = strs[0];
344 std::string ev = strs[1];
346 if (ev == "start-task") {
347 boost::shared_ptr<app_watcher> eng(new app_watcher(m_ctx, app));
349 boost::mutex::scoped_lock guard(m_lock);
350 m_map.insert(std::make_pair(app, eng));
352 dnet_log(m_s->node, DNET_LOG_NOTICE, "srw: %s: started\n", app.c_str());
353 return 0;
354 } else if (ev == "stop-task") {
355 boost::mutex::scoped_lock guard(m_lock);
356 eng_map_t::iterator it = m_map.find(app);
357 /* destructor stops engine */
358 if (it != m_map.end())
359 m_map.erase(it);
360 guard.unlock();
362 dnet_log(m_s->node, DNET_LOG_NOTICE, "srw: %s: stopped\n", app.c_str());
363 return 0;
364 } else if (ev == "info") {
365 boost::mutex::scoped_lock guard(m_lock);
366 eng_map_t::iterator it = m_map.find(app);
367 if (it == m_map.end()) {
368 dnet_log(m_s->node, DNET_LOG_ERROR, "srw: %s: no task\n", app.c_str());
369 return -ENOENT;
372 std::string s = it->second->info();
373 dnet_log(m_s->node, DNET_LOG_INFO, "srw: %s: info: %s\n", app.c_str(), s.c_str());
374 return dnet_send_reply(st, cmd, (void *)s.data(), s.size(), 0);
375 } else if (sph->flags & (DNET_SPH_FLAGS_REPLY | DNET_SPH_FLAGS_FINISH)) {
376 boost::mutex::scoped_lock guard(m_lock);
378 jobs_map_t::iterator it = m_jobs.find(sph->src_key);
379 if (it == m_jobs.end()) {
380 dnet_log(m_s->node, DNET_LOG_ERROR, "srw: %s: %s: no job(%x) to complete\n",
381 app.c_str(), dnet_dump_id(&cmd->id), sph->src_key);
382 return -ENOENT;
385 bool final = sph->flags & DNET_SPH_FLAGS_FINISH;
386 it->second->reply(final, (char *)(sph + 1) + sph->event_size, sph->data_size + sph->binary_size);
388 dnet_log(m_s->node, DNET_LOG_INFO, "srw: %s: completed: task: %x, total-data-size: %zd, finish: %d\n",
389 app.c_str(), sph->src_key, total_size(sph), final);
391 if (final)
392 m_jobs.erase(it);
394 return 0;
395 } else {
396 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) {
397 sph->src_key = (unsigned long)sph;
398 memcpy(sph->src.id, cmd->id.id, sizeof(sph->src.id));
401 boost::mutex::scoped_lock guard(m_lock);
402 eng_map_t::iterator it = m_map.find(app);
403 if (it == m_map.end()) {
404 dnet_log(m_s->node, DNET_LOG_ERROR, "srw: %s: no task\n", app.c_str());
405 return -ENOENT;
408 dnet_shared_job_t job(boost::make_shared<dnet_job_t>(m_s, app, ev,
409 cocaine::blob_t((const char *)sph, total_size(sph) + sizeof(struct sph))));
411 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK)
412 m_jobs.insert(std::make_pair(sph->src_key, job));
414 it->second->push(job);
415 guard.unlock();
417 dnet_log(m_s->node, DNET_LOG_INFO, "srw: %s: started: task: %x, total-data-size: %zd, block: %d\n",
418 app.c_str(), sph->src_key, total_size(sph), !!(sph->flags & DNET_SPH_FLAGS_SRC_BLOCK));
420 int err = 0;
421 if (sph->flags & DNET_SPH_FLAGS_SRC_BLOCK) {
422 bool success = job->wait(m_s->node->wait_ts.tv_sec);
423 if (!success)
424 throw std::runtime_error("timeout waiting for exec command to complete");
426 std::vector<char> res = job->result();
427 if (res.size()) {
428 err = dnet_send_reply(st, cmd, res.data(), res.size(), 0);
431 dnet_log(m_s->node, DNET_LOG_NOTICE, "srw: %s: %s: completed blocked task: %zd bytes\n",
432 app.c_str(), dnet_dump_id_str(sph->src.id), res.size());
435 return err;
439 private:
440 struct dnet_session *m_s;
441 cocaine::context_t m_ctx;
442 boost::mutex m_lock;
443 eng_map_t m_map;
444 jobs_map_t m_jobs;
446 std::string dnet_get_event(const struct sph *sph, const char *data) {
447 return std::string(data, sph->event_size);
450 size_t total_size(const struct sph *sph) {
451 return sph->event_size + sph->data_size + sph->binary_size;
455 int dnet_srw_init(struct dnet_node *n, struct dnet_config *cfg)
457 int err = 0;
459 if (!cfg->srw.config) {
460 dnet_log(n, DNET_LOG_ERROR, "srw: no config\n");
461 return -ENOTSUP;
464 try {
465 dnet_session *s = dnet_session_create(n);
466 dnet_session_set_groups(s, (int *)&n->id.group_id, 1);
467 n->srw = (void *)new srw(s, cfg->srw.config);
468 dnet_log(n, DNET_LOG_INFO, "srw: initialized: config: %s\n", cfg->srw.config);
469 return 0;
470 } catch (const std::exception &e) {
471 dnet_log(n, DNET_LOG_ERROR, "srw: init failed: config: %s, exception: %s\n", cfg->srw.config, e.what());
472 err = -ENOMEM;
475 return err;
478 void dnet_srw_cleanup(struct dnet_node *n)
480 if (n->srw) {
481 try {
482 delete (srw *)n->srw;
483 } catch (...) {
486 n->srw = NULL;
490 int dnet_cmd_exec_raw(struct dnet_net_state *st, struct dnet_cmd *cmd, struct sph *header, const void *data)
492 struct dnet_node *n = st->n;
493 srw *s = (srw *)n->srw;
495 if (!s)
496 return -ENOTSUP;
498 try {
499 return s->process(st, cmd, header);
500 } catch (const std::exception &e) {
501 dnet_log(n, DNET_LOG_ERROR, "%s: srw-processing: event: %.*s, data-size: %lld, binary-size: %lld, exception: %s\n",
502 dnet_dump_id(&cmd->id), header->event_size, (const char *)data,
503 (unsigned long long)header->data_size, (unsigned long long)header->binary_size,
504 e.what());
507 return -EINVAL;
/* Nothing to update in the cocaine-based implementation: always returns 0. */
int dnet_srw_update(struct dnet_node * /* n */, int /* unused */)
{
	return 0;
}
514 #else
515 #include <errno.h>
517 #include "elliptics.h"
/* Stub used when cocaine support is compiled out: srw is never available. */
int dnet_srw_init(struct dnet_node * /* n */, struct dnet_config * /* cfg */)
{
	return -ENOTSUP;
}
/* Stub used when cocaine support is compiled out: there is nothing to release. */
void dnet_srw_cleanup(struct dnet_node * /* n */)
{
}
/* Stub used when cocaine support is compiled out: exec commands are rejected. */
int dnet_cmd_exec_raw(struct dnet_net_state * /* st */, struct dnet_cmd * /* cmd */,
		struct sph * /* header */, const void * /* data */)
{
	return -ENOTSUP;
}
/* Stub used when cocaine support is compiled out: always returns 0. */
int dnet_srw_update(struct dnet_node * /* n */, int /* unused */)
{
	return 0;
}
537 #endif