2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #ifdef HAVE_COCAINE_SUPPORT
33 #include <boost/algorithm/string.hpp>
34 #include <boost/lexical_cast.hpp>
35 #include <boost/thread.hpp>
36 #include <boost/thread/condition.hpp>
40 #include <cocaine/context.hpp>
41 #include <cocaine/logging.hpp>
42 #include <cocaine/app.hpp>
43 #include <cocaine/job.hpp>
45 #include <elliptics/interface.h>
46 #include <elliptics/srw.h>
48 #include "elliptics.h"
52 srw_log(struct dnet_node
*node
, int level
, const std::string
&app
, const std::string
&message
) : m_n(node
) {
53 dnet_log(m_n
, level
, "dnet-sink: %s : %s\n", app
.c_str(), message
.c_str());
56 if (!boost::starts_with(app
, "app/") || (level
> node
->log
->log_level
))
59 std::string msg_with_date
;
65 gettimeofday(&tv
, NULL
);
66 localtime_r((time_t *)&tv
.tv_sec
, &tm
);
67 strftime(str
, sizeof(str
), "%F %R:%S", &tm
);
71 int len
= snprintf(tmp
, sizeof(tmp
), "%s.%06lu %ld/%4d %1d: ", str
, tv
.tv_usec
, dnet_get_id(), getpid(), level
);
72 msg_with_date
.assign(tmp
, len
);
73 msg_with_date
+= message
+ "\n";
75 struct dnet_io_control ctl
;
77 memset(&ctl
, 0, sizeof(ctl
));
80 ctl
.data
= msg_with_date
.data();
82 ctl
.io
.flags
= DNET_IO_FLAGS_APPEND
;
83 ctl
.io
.size
= msg_with_date
.size();
85 std::string app_log
= app
+ ".log";
86 dnet_transform(m_n
, app_log
.data(), app_log
.size(), &ctl
.id
);
91 int err
= dnet_write_data_wait(m_n
, &ctl
, (void **)&result
);
93 /* could not find remote node to send data, saving it locally */
95 log_locally(ctl
.id
, msg_with_date
);
99 std::ostringstream string
;
100 string
<< dnet_dump_id(&ctl
.id
) << ": WRITE: log-write-failed: size: " << message
.size() << ", err: " << err
;
101 throw std::runtime_error(string
.str());
108 struct dnet_node
*m_n
;
110 void log_locally(struct dnet_id
&id
, const std::string
&msg
) const {
111 std::vector
<char> data
;
112 struct dnet_cmd
*cmd
;
113 struct dnet_io_attr
*io
;
116 data
.resize(sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
) + msg
.size());
118 cmd
= (struct dnet_cmd
*)data
.data();
119 io
= (struct dnet_io_attr
*)(cmd
+ 1);
120 msg_data
= (char *)(io
+ 1);
123 cmd
->id
.group_id
= m_n
->id
.group_id
;
124 cmd
->cmd
= DNET_CMD_WRITE
;
125 cmd
->size
= data
.size() - sizeof(struct dnet_cmd
);
127 memcpy(io
->parent
, cmd
->id
.id
, DNET_ID_SIZE
);
128 memcpy(io
->id
, cmd
->id
.id
, DNET_ID_SIZE
);
130 io
->flags
= DNET_IO_FLAGS_APPEND
| DNET_IO_FLAGS_SKIP_SENDING
;
131 io
->size
= msg
.size();
133 memcpy(msg_data
, msg
.data(), msg
.size());
135 m_n
->cb
->command_handler(m_n
->st
, m_n
->cb
->command_private
, cmd
, (void *)(cmd
+ 1));
139 class dnet_sink_t
: public cocaine::logging::sink_t
{
141 dnet_sink_t(struct dnet_node
*n
, cocaine::logging::priorities prio
): cocaine::logging::sink_t(prio
), m_n(n
) {
144 virtual void emit(cocaine::logging::priorities prio
, const std::string
&app
, const std::string
& message
) const {
145 int level
= DNET_LOG_NOTICE
;
146 if (prio
== cocaine::logging::debug
)
147 level
= DNET_LOG_DEBUG
;
148 if (prio
== cocaine::logging::info
)
149 level
= DNET_LOG_INFO
;
150 if (prio
== cocaine::logging::warning
)
151 level
= DNET_LOG_INFO
;
152 if (prio
== cocaine::logging::error
)
153 level
= DNET_LOG_ERROR
;
154 if (prio
== cocaine::logging::ignore
)
158 srw_log
log(m_n
, level
, app
, message
);
162 struct dnet_node
*m_n
;
165 class dnet_job_t
: public cocaine::engine::job_t
168 dnet_job_t(struct dnet_node
*n
, const std::string
& event
, const cocaine::blob_t
& blob
):
169 cocaine::engine::job_t(event
, blob
),
175 virtual void react(const cocaine::engine::events::chunk
& event
) {
176 dnet_log(m_n
, DNET_LOG_INFO
, "chunk: %.*s\n", (int)event
.message
.size(), (char *)event
.message
.data());
178 boost::mutex::scoped_lock
guard(m_lock
);
179 m_res
.insert(m_res
.end(), (char *)event
.message
.data(), (char *)event
.message
.data() + event
.message
.size());
182 virtual void react(const cocaine::engine::events::choke
& ) {
183 srw_log
log(m_n
, DNET_LOG_NOTICE
, "app/" + m_name
, "processing completed, data size: " +
184 boost::lexical_cast
<std::string
>(m_res
.size()));
187 reply(true, NULL
, 0);
190 virtual void react(const cocaine::engine::events::error
& event
) {
191 srw_log
log(m_n
, DNET_LOG_ERROR
, "app/" + m_name
, event
.message
+ ": " + boost::lexical_cast
<std::string
>(event
.code
));
194 void reply(bool completed
, const char *reply
, size_t size
) {
195 boost::mutex::scoped_lock
guard(m_lock
);
198 m_res
.insert(m_res
.end(), reply
, reply
+ size
);
200 m_completed
= completed
;
204 bool wait(long timeout
) {
205 boost::system_time
const abs_time
= boost::get_system_time()+ boost::posix_time::seconds(timeout
);
207 while (!m_completed
) {
208 boost::mutex::scoped_lock
guard(m_lock
);
209 if (!m_cond
.timed_wait(guard
, abs_time
))
216 std::vector
<char> &result(void) {
223 struct dnet_node
*m_n
;
224 std::vector
<char> m_res
;
226 boost::condition m_cond
;
229 typedef boost::shared_ptr
<dnet_job_t
> dnet_shared_job_t
;
233 app_watcher(cocaine::context_t
&ctx
, const std::string
&app
) :
235 m_app
.reset(new cocaine::app_t(ctx
, app
));
238 m_thread
= boost::thread(&app_watcher::process
, this);
242 boost::mutex::scoped_lock
guard(m_lock
);
250 void push(dnet_shared_job_t job
) {
251 boost::mutex::scoped_lock
guard(m_lock
);
253 m_jobs
.push_back(job
);
258 return Json::FastWriter().write(m_app
->info());
263 boost::condition m_cond
;
265 std::deque
<dnet_shared_job_t
> m_jobs
;
266 boost::thread m_thread
;
267 std::auto_ptr
<cocaine::app_t
> m_app
;
270 while (!m_need_exit
) {
272 boost::mutex::scoped_lock
guard(m_lock
);
273 if (m_jobs
.empty()) {
280 if (!m_jobs
.empty()) {
281 dnet_shared_job_t job
= m_jobs
.front();
285 m_app
->enqueue(job
, cocaine::engine::mode::blocking
);
292 typedef std::map
<std::string
, boost::shared_ptr
<app_watcher
> > eng_map_t
;
293 typedef std::map
<int, dnet_shared_job_t
> jobs_map_t
;
296 cocaine::logging::priorities
dnet_log_level_to_prio(int level
) {
297 cocaine::logging::priorities prio
= cocaine::logging::ignore
;
298 if (level
== DNET_LOG_DEBUG
)
299 prio
= cocaine::logging::debug
;
300 else if (level
== DNET_LOG_INFO
)
301 prio
= cocaine::logging::info
;
302 else if (level
== DNET_LOG_NOTICE
)
303 prio
= cocaine::logging::info
;
304 else if (level
== DNET_LOG_ERROR
)
305 prio
= cocaine::logging::error
;
313 srw(struct dnet_node
*n
, const std::string
&config
) : m_n(n
),
314 m_ctx(config
, boost::make_shared
<dnet_sink_t
>(n
, dnet_log_level_to_prio(m_n
->log
->log_level
))) {
318 /* no need to iterate over engines, its destructor automatically stops it */
320 for (eng_map_t::iterator it
= m_map
.begin(); it
!= m_map
.end(); ++it
) {
326 int process(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct sph
*sph
) {
327 char *data
= (char *)(sph
+ 1);
328 std::string event
= dnet_get_event(sph
, data
);
330 std::vector
<std::string
> strs
;
331 boost::split(strs
, event
, boost::is_any_of("@"));
333 if (strs
.size() != 2) {
334 dnet_log(m_n
, DNET_LOG_ERROR
, "%s: invalid event name: must be application@event or application@start-task\n",
339 std::string app
= strs
[0];
340 std::string ev
= strs
[1];
342 if (ev
== "start-task") {
343 boost::shared_ptr
<app_watcher
> eng(new app_watcher(m_ctx
, app
));
345 boost::mutex::scoped_lock
guard(m_lock
);
346 m_map
.insert(std::make_pair(app
, eng
));
348 dnet_log(m_n
, DNET_LOG_NOTICE
, "%s: task started: %s\n", event
.c_str(), app
.c_str());
350 } else if (ev
== "stop-task") {
351 boost::mutex::scoped_lock
guard(m_lock
);
352 eng_map_t::iterator it
= m_map
.find(app
);
353 /* destructor stops engine */
354 if (it
!= m_map
.end())
358 dnet_log(m_n
, DNET_LOG_NOTICE
, "%s: task stopped: %s\n", event
.c_str(), app
.c_str());
360 } else if (ev
== "info") {
361 boost::mutex::scoped_lock
guard(m_lock
);
362 eng_map_t::iterator it
= m_map
.find(app
);
363 if (it
== m_map
.end()) {
364 dnet_log(m_n
, DNET_LOG_ERROR
, "%s: no task '%s' started\n", event
.c_str(), app
.c_str());
368 std::string s
= it
->second
->info();
369 dnet_log(m_n
, DNET_LOG_INFO
, "app engine: %s\n", s
.c_str());
370 return dnet_send_reply(st
, cmd
, (void *)s
.data(), s
.size(), 0);
371 } else if (sph
->flags
& (DNET_SPH_FLAGS_REPLY
| DNET_SPH_FLAGS_FINISH
)) {
372 boost::mutex::scoped_lock
guard(m_lock
);
374 jobs_map_t::iterator it
= m_jobs
.find(sph
->src_key
);
375 if (it
== m_jobs
.end()) {
376 dnet_log(m_n
, DNET_LOG_ERROR
, "%s: %s: no job(%x) to complete\n",
377 event
.c_str(), dnet_dump_id(&cmd
->id
), sph
->src_key
);
381 bool final
= sph
->flags
& DNET_SPH_FLAGS_FINISH
;
382 it
->second
->reply(final
, (char *)(sph
+ 1) + sph
->event_size
, sph
->data_size
+ sph
->binary_size
);
383 dnet_log(m_n
, DNET_LOG_INFO
, "%s: task completed(%x), total-data-size: %zd, finish: %d\n",
384 event
.c_str(), sph
->src_key
, total_size(sph
), final
);
391 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) {
392 sph
->src_key
= (unsigned long)sph
;
393 memcpy(sph
->src
.id
, cmd
->id
.id
, sizeof(sph
->src
.id
));
396 boost::mutex::scoped_lock
guard(m_lock
);
397 eng_map_t::iterator it
= m_map
.find(app
);
398 if (it
== m_map
.end()) {
399 dnet_log(m_n
, DNET_LOG_ERROR
, "%s: no task '%s' started\n", event
.c_str(), app
.c_str());
403 dnet_shared_job_t
job(boost::make_shared
<dnet_job_t
>(m_n
, ev
,
404 cocaine::blob_t((const char *)sph
, total_size(sph
) + sizeof(struct sph
))));
406 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
)
407 m_jobs
.insert(std::make_pair(sph
->src_key
, job
));
409 it
->second
->push(job
);
412 dnet_log(m_n
, DNET_LOG_INFO
, "%s: task queued(%x), total-data-size: %zd, block: %d\n",
413 event
.c_str(), sph
->src_key
, total_size(sph
), !!(sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
));
416 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) {
417 bool success
= job
->wait(m_n
->wait_ts
.tv_sec
);
419 throw std::runtime_error("timeout waiting for exec command to complete");
421 std::vector
<char> res
= job
->result();
423 err
= dnet_send_reply(st
, cmd
, res
.data(), res
.size(), 0);
426 dnet_log(m_n
, DNET_LOG_NOTICE
, "%s: %s: blocked task reply: %zd bytes\n",
427 event
.c_str(), dnet_dump_id_str(sph
->src
.id
), res
.size());
435 struct dnet_node
*m_n
;
436 cocaine::context_t m_ctx
;
441 std::string
dnet_get_event(const struct sph
*sph
, const char *data
) {
442 return std::string(data
, sph
->event_size
);
445 size_t total_size(const struct sph
*sph
) {
446 return sph
->event_size
+ sph
->data_size
+ sph
->binary_size
;
450 int dnet_srw_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
454 if (!cfg
->srw
.config
) {
455 dnet_log(n
, DNET_LOG_ERROR
, "srw: no config\n");
460 dnet_node_set_groups(n
, (int *)&n
->id
.group_id
, 1);
461 n
->srw
= (void *)new srw(n
, cfg
->srw
.config
);
462 dnet_log(n
, DNET_LOG_INFO
, "srw: initialized: config: %s\n", cfg
->srw
.config
);
464 } catch (const std::exception
&e
) {
465 dnet_log(n
, DNET_LOG_ERROR
, "srw: init failed: config: %s, exception: %s\n", cfg
->srw
.config
, e
.what());
472 void dnet_srw_cleanup(struct dnet_node
*n
)
476 delete (srw
*)n
->srw
;
484 int dnet_cmd_exec_raw(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct sph
*header
, const void *data
)
486 struct dnet_node
*n
= st
->n
;
487 srw
*s
= (srw
*)n
->srw
;
493 return s
->process(st
, cmd
, header
);
494 } catch (const std::exception
&e
) {
495 dnet_log(n
, DNET_LOG_ERROR
, "%s: srw-processing: event: %.*s, data-size: %lld, binary-size: %lld, exception: %s\n",
496 dnet_dump_id(&cmd
->id
), header
->event_size
, (const char *)data
,
497 (unsigned long long)header
->data_size
, (unsigned long long)header
->binary_size
,
504 int dnet_srw_update(struct dnet_node
*, int )
511 #include "elliptics.h"
513 int dnet_srw_init(struct dnet_node
*, struct dnet_config
*)
518 void dnet_srw_cleanup(struct dnet_node
*)
522 int dnet_cmd_exec_raw(struct dnet_net_state
*, struct dnet_cmd
*, struct sph
*, const void *)
527 int dnet_srw_update(struct dnet_node
*, int)