2 * 2011+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
18 #include <sys/socket.h>
29 #ifdef HAVE_COCAINE_SUPPORT
33 #include <boost/algorithm/string.hpp>
34 #include <boost/lexical_cast.hpp>
35 #include <boost/thread.hpp>
36 #include <boost/thread/condition.hpp>
40 #include <cocaine/context.hpp>
41 #include <cocaine/logging.hpp>
42 #include <cocaine/app.hpp>
43 #include <cocaine/job.hpp>
45 #include <elliptics/interface.h>
46 #include <elliptics/srw.h>
48 #include "elliptics.h"
/*
 * Logs @message on behalf of application @app at @level.
 *
 * The message is first written to the node log through dnet_log(). The
 * statements that follow build a timestamped copy of the message and issue
 * an append write to the object whose ID is computed from "<app>.log";
 * log_locally() is the fallback for a failed remote write, and
 * std::runtime_error is thrown with the write error code.
 *
 * NOTE(review): the bodies of the conditionals, the early returns and some
 * local declarations (tv, tm, str, tmp, result) are not visible in this
 * view - confirm the exact control flow against the full file.
 */
52 srw_log(struct dnet_node
*node
, int level
, const std::string
&app
, const std::string
&message
) : m_n(node
) {
53 dnet_log(m_n
, level
, "srw: %s : %s\n", app
.c_str(), message
.c_str());
/*
 * True when @app does not start with "app/" or when @level is above the
 * node's configured log level; the statement it guards is not visible here.
 */
56 if (!boost::starts_with(app
, "app/") || (level
> node
->log
->log_level
))
59 std::string msg_with_date
;
/* Prefix: local date/time, microseconds, dnet_get_id(), pid and level. */
65 gettimeofday(&tv
, NULL
);
66 localtime_r((time_t *)&tv
.tv_sec
, &tm
);
67 strftime(str
, sizeof(str
), "%F %R:%S", &tm
);
71 int len
= snprintf(tmp
, sizeof(tmp
), "%s.%06lu %ld/%4d %1d: ", str
, tv
.tv_usec
, dnet_get_id(), getpid(), level
);
72 msg_with_date
.assign(tmp
, len
);
73 msg_with_date
+= message
+ "\n";
/* Zeroed write request that appends the prefixed message. */
75 struct dnet_io_control ctl
;
77 memset(&ctl
, 0, sizeof(ctl
));
80 ctl
.data
= msg_with_date
.data();
82 ctl
.io
.flags
= DNET_IO_FLAGS_APPEND
;
83 ctl
.io
.size
= msg_with_date
.size();
/* The target ID is derived from the name "<app>.log" (presumably written into ctl.id). */
85 std::string app_log
= app
+ ".log";
86 dnet_transform(m_n
, app_log
.data(), app_log
.size(), &ctl
.id
);
91 int err
= dnet_write_data_wait(m_n
, &ctl
, (void **)&result
);
93 /* could not find remote node to send data, saving it locally */
95 log_locally(ctl
.id
, msg_with_date
);
/* Report the failed write together with the object ID, message size and error code. */
99 std::ostringstream string
;
100 string
<< dnet_dump_id(&ctl
.id
) << ": WRITE: log-write-failed: size: " << message
.size() << ", err: " << err
;
101 throw std::runtime_error(string
.str());
108 struct dnet_node
*m_n
;
/*
 * Hands @msg straight to the node's backend callback as a write command.
 *
 * A single contiguous buffer is laid out as
 *   struct dnet_cmd | struct dnet_io_attr | message bytes
 * and passed to m_n->cb->command_handler(). The I/O flags are
 * DNET_IO_FLAGS_APPEND | DNET_IO_FLAGS_SKIP_SENDING.
 *
 * NOTE(review): the declaration of msg_data and any statement that copies
 * @id into cmd->id are not visible in this view; the only visible write to
 * cmd->id is its group_id.
 */
110 void log_locally(struct dnet_id
&id
, const std::string
&msg
) const {
111 std::vector
<char> data
;
112 struct dnet_cmd
*cmd
;
113 struct dnet_io_attr
*io
;
/* Room for both headers plus the payload. */
116 data
.resize(sizeof(struct dnet_cmd
) + sizeof(struct dnet_io_attr
) + msg
.size());
/* Carve the three regions out of the buffer, one right after another. */
118 cmd
= (struct dnet_cmd
*)data
.data();
119 io
= (struct dnet_io_attr
*)(cmd
+ 1);
120 msg_data
= (char *)(io
+ 1);
123 cmd
->id
.group_id
= m_n
->id
.group_id
;
124 cmd
->cmd
= DNET_CMD_WRITE
;
/* Command size is everything that follows the command header. */
125 cmd
->size
= data
.size() - sizeof(struct dnet_cmd
);
/* Both I/O IDs are copied from the command ID. */
127 memcpy(io
->parent
, cmd
->id
.id
, DNET_ID_SIZE
);
128 memcpy(io
->id
, cmd
->id
.id
, DNET_ID_SIZE
);
130 io
->flags
= DNET_IO_FLAGS_APPEND
| DNET_IO_FLAGS_SKIP_SENDING
;
131 io
->size
= msg
.size();
133 memcpy(msg_data
, msg
.data(), msg
.size());
/* The handler receives the command header and a pointer to the data right behind it. */
135 m_n
->cb
->command_handler(m_n
->st
, m_n
->cb
->command_private
, cmd
, (void *)(cmd
+ 1));
/*
 * cocaine logging sink that forwards every emitted message to srw_log
 * for the node it was created with.
 */
139 class dnet_sink_t
: public cocaine::logging::sink_t
{
/* @prio is passed to the cocaine::logging::sink_t base, @n is kept for emit(). */
141 dnet_sink_t(struct dnet_node
*n
, cocaine::logging::priorities prio
): cocaine::logging::sink_t(prio
), m_n(n
) {
/*
 * Translates the cocaine priority into a DNET_LOG_* level and logs
 * @message for @app through srw_log.
 *
 * The level defaults to DNET_LOG_NOTICE; debug, info and error map to
 * their DNET_LOG_* counterparts and warning is logged at DNET_LOG_INFO.
 * NOTE(review): the statement guarded by the `ignore` check is not
 * visible in this view.
 */
144 virtual void emit(cocaine::logging::priorities prio
, const std::string
&app
, const std::string
& message
) const {
145 int level
= DNET_LOG_NOTICE
;
146 if (prio
== cocaine::logging::debug
)
147 level
= DNET_LOG_DEBUG
;
148 if (prio
== cocaine::logging::info
)
149 level
= DNET_LOG_INFO
;
150 if (prio
== cocaine::logging::warning
)
151 level
= DNET_LOG_INFO
;
152 if (prio
== cocaine::logging::error
)
153 level
= DNET_LOG_ERROR
;
154 if (prio
== cocaine::logging::ignore
)
/* Constructing the srw_log object performs the logging. */
158 srw_log
log(m_n
, level
, app
, message
);
/* Node that receives the log messages. */
162 struct dnet_node
*m_n
;
/*
 * cocaine job created for one (@app, @event) request.
 *
 * @event and @blob are passed to the cocaine::engine::job_t base and
 * m_name is set to "<app>/<event>".
 * NOTE(review): the remaining member initializers are not visible in this
 * view.
 */
165 class dnet_job_t
: public cocaine::engine::job_t
168 dnet_job_t(struct dnet_node
*n
, const std::string
&app
, const std::string
& event
, const cocaine::blob_t
& blob
):
169 cocaine::engine::job_t(event
, blob
),
171 m_name(app
+ "/" + event
),
175 virtual void react(const cocaine::engine::events::chunk
& event
) {
176 boost::mutex::scoped_lock
guard(m_lock
);
177 m_res
.insert(m_res
.end(), (char *)event
.message
.data(), (char *)event
.message
.data() + event
.message
.size());
179 std::ostringstream msg
;
180 msg
<< "received reply chunk: size: " << event
.message
.size() << ", data: '" << event
.message
.data() << "', " <<
181 "accumulated-reply-size: " << m_res
.size() << std::endl
;
183 srw_log
log(m_n
, DNET_LOG_NOTICE
, "app/" + m_name
, msg
.str());
/*
 * Handles the end-of-stream event: logs the accumulated result size for
 * "app/<name>" and marks the job completed via reply(true, NULL, 0).
 * NOTE(review): two lines between the log statement and the reply() call
 * are not visible in this view.
 */
186 virtual void react(const cocaine::engine::events::choke
& ) {
187 srw_log
log(m_n
, DNET_LOG_NOTICE
, "app/" + m_name
, "job completed, data size: " +
188 boost::lexical_cast
<std::string
>(m_res
.size()));
/* No extra data, only the completion flag. */
191 reply(true, NULL
, 0);
194 virtual void react(const cocaine::engine::events::error
& event
) {
195 srw_log
log(m_n
, DNET_LOG_ERROR
, "app/" + m_name
, event
.message
+ ": " + boost::lexical_cast
<std::string
>(event
.code
));
/*
 * Appends @size bytes starting at @reply to the accumulated result and
 * stores @completed in m_completed, all with m_lock held.
 *
 * NOTE(review): react(choke) calls this with (true, NULL, 0); any NULL
 * guard around the insert and the notification of m_cond waiters are not
 * visible in this view.
 */
198 void reply(bool completed
, const char *reply
, size_t size
) {
199 boost::mutex::scoped_lock
guard(m_lock
);
202 m_res
.insert(m_res
.end(), reply
, reply
+ size
);
204 m_completed
= completed
;
208 bool wait(long timeout
) {
209 boost::system_time
const abs_time
= boost::get_system_time()+ boost::posix_time::seconds(timeout
);
211 while (!m_completed
) {
212 boost::mutex::scoped_lock
guard(m_lock
);
213 if (!m_cond
.timed_wait(guard
, abs_time
))
220 std::vector
<char> &result(void) {
227 struct dnet_node
*m_n
;
228 std::vector
<char> m_res
;
230 boost::condition m_cond
;
233 typedef boost::shared_ptr
<dnet_job_t
> dnet_shared_job_t
;
/*
 * Creates the cocaine application @app inside context @ctx and starts a
 * thread that runs app_watcher::process() on this object.
 * NOTE(review): the member initializer list is not visible in this view.
 */
237 app_watcher(cocaine::context_t
&ctx
, const std::string
&app
) :
239 m_app
.reset(new cocaine::app_t(ctx
, app
));
242 m_thread
= boost::thread(&app_watcher::process
, this);
246 boost::mutex::scoped_lock
guard(m_lock
);
/*
 * Appends @job to the m_jobs queue with m_lock held.
 * NOTE(review): the rest of the body (e.g. waking the processing thread)
 * is not visible in this view.
 */
254 void push(dnet_shared_job_t job
) {
255 boost::mutex::scoped_lock
guard(m_lock
);
257 m_jobs
.push_back(job
);
262 return Json::FastWriter().write(m_app
->info());
267 boost::condition m_cond
;
269 std::deque
<dnet_shared_job_t
> m_jobs
;
270 boost::thread m_thread
;
271 std::auto_ptr
<cocaine::app_t
> m_app
;
274 while (!m_need_exit
) {
276 boost::mutex::scoped_lock
guard(m_lock
);
277 if (m_jobs
.empty()) {
284 if (!m_jobs
.empty()) {
285 dnet_shared_job_t job
= m_jobs
.front();
289 m_app
->enqueue(job
, cocaine::engine::mode::blocking
);
296 typedef std::map
<std::string
, boost::shared_ptr
<app_watcher
> > eng_map_t
;
297 typedef std::map
<int, dnet_shared_job_t
> jobs_map_t
;
300 cocaine::logging::priorities
dnet_log_level_to_prio(int level
) {
301 cocaine::logging::priorities prio
= cocaine::logging::ignore
;
302 if (level
== DNET_LOG_DEBUG
)
303 prio
= cocaine::logging::debug
;
304 else if (level
== DNET_LOG_INFO
)
305 prio
= cocaine::logging::info
;
306 else if (level
== DNET_LOG_NOTICE
)
307 prio
= cocaine::logging::info
;
308 else if (level
== DNET_LOG_ERROR
)
309 prio
= cocaine::logging::error
;
/*
 * Builds the cocaine context from @config and installs a dnet_sink_t as its
 * logging sink, with the priority derived from the node's log level.
 *
 * The m_ctx initializer reads m_n, so it relies on m_n being declared (and
 * therefore initialized) before m_ctx in the class.
 */
317 srw(struct dnet_node
*n
, const std::string
&config
) : m_n(n
),
318 m_ctx(config
, boost::make_shared
<dnet_sink_t
>(n
, dnet_log_level_to_prio(m_n
->log
->log_level
))) {
322 /* no need to iterate over engines, its destructor automatically stops it */
324 for (eng_map_t::iterator it
= m_map
.begin(); it
!= m_map
.end(); ++it
) {
/*
 * Dispatches one exec request. The event name is stored right after the
 * @sph header and must have the form "application@event".
 *
 * Visible branches:
 *  - "start-task": creates an app_watcher for the application and stores
 *    it in m_map;
 *  - "stop-task": looks the application up in m_map (the statement guarded
 *    by the lookup is not visible in this view);
 *  - "info": sends the application's info string back as the reply;
 *  - DNET_SPH_FLAGS_REPLY / DNET_SPH_FLAGS_FINISH set: feeds the payload to
 *    the job registered in m_jobs under sph->src_key;
 *  - otherwise: wraps the whole packet into a dnet_job_t and pushes it to
 *    the application's watcher; with DNET_SPH_FLAGS_SRC_BLOCK the job is
 *    also registered in m_jobs, waited for, and its accumulated result is
 *    sent back.
 *
 * The visible throw raises std::runtime_error for a timed-out wait.
 *
 * NOTE(review): return statements, closing braces, the declaration of err
 * and some branch headers are not visible in this view - confirm return
 * values and lock scopes against the full file.
 */
330 int process(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct sph
*sph
) {
331 char *data
= (char *)(sph
+ 1);
332 std::string event
= dnet_get_event(sph
, data
);
334 std::vector
<std::string
> strs
;
/* Split "application@event" into its two parts. */
335 boost::split(strs
, event
, boost::is_any_of("@"));
337 if (strs
.size() != 2) {
338 dnet_log(m_n
, DNET_LOG_ERROR
, "%s: invalid event name: must be application@event or application@start-task\n",
343 std::string app
= strs
[0];
344 std::string ev
= strs
[1];
/* Control events are handled here; everything else is treated as a reply or a new job. */
346 if (ev
== "start-task") {
347 boost::shared_ptr
<app_watcher
> eng(new app_watcher(m_ctx
, app
));
349 boost::mutex::scoped_lock
guard(m_lock
);
350 m_map
.insert(std::make_pair(app
, eng
));
352 dnet_log(m_n
, DNET_LOG_NOTICE
, "srw: %s: started\n", app
.c_str());
354 } else if (ev
== "stop-task") {
355 boost::mutex::scoped_lock
guard(m_lock
);
356 eng_map_t::iterator it
= m_map
.find(app
);
357 /* destructor stops engine */
358 if (it
!= m_map
.end())
362 dnet_log(m_n
, DNET_LOG_NOTICE
, "srw: %s: stopped\n", app
.c_str());
364 } else if (ev
== "info") {
365 boost::mutex::scoped_lock
guard(m_lock
);
366 eng_map_t::iterator it
= m_map
.find(app
);
367 if (it
== m_map
.end()) {
368 dnet_log(m_n
, DNET_LOG_ERROR
, "srw: %s: no task\n", app
.c_str());
372 std::string s
= it
->second
->info();
373 dnet_log(m_n
, DNET_LOG_INFO
, "srw: %s: info: %s\n", app
.c_str(), s
.c_str());
374 return dnet_send_reply(st
, cmd
, (void *)s
.data(), s
.size(), 0);
/* Reply for a job that is already registered: look it up by src_key. */
375 } else if (sph
->flags
& (DNET_SPH_FLAGS_REPLY
| DNET_SPH_FLAGS_FINISH
)) {
376 boost::mutex::scoped_lock
guard(m_lock
);
378 jobs_map_t::iterator it
= m_jobs
.find(sph
->src_key
);
379 if (it
== m_jobs
.end()) {
380 dnet_log(m_n
, DNET_LOG_ERROR
, "srw: %s: %s: no job(%x) to complete\n",
381 app
.c_str(), dnet_dump_id(&cmd
->id
), sph
->src_key
);
/* The payload passed to the job starts right after the event name. */
385 bool final
= sph
->flags
& DNET_SPH_FLAGS_FINISH
;
386 it
->second
->reply(final
, (char *)(sph
+ 1) + sph
->event_size
, sph
->data_size
+ sph
->binary_size
);
387 dnet_log(m_n
, DNET_LOG_INFO
, "srw: %s: completed: task: %x, total-data-size: %zd, finish: %d\n",
388 app
.c_str(), sph
->src_key
, total_size(sph
), final
);
/*
 * Blocking request: the header address becomes the job key and the command
 * ID is remembered as the source ID.
 * NOTE(review): jobs_map_t is declared with an int key while src_key is
 * assigned from a pointer-sized value here - confirm the key cannot be
 * truncated.
 */
395 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) {
396 sph
->src_key
= (unsigned long)sph
;
397 memcpy(sph
->src
.id
, cmd
->id
.id
, sizeof(sph
->src
.id
));
400 boost::mutex::scoped_lock
guard(m_lock
);
401 eng_map_t::iterator it
= m_map
.find(app
);
402 if (it
== m_map
.end()) {
403 dnet_log(m_n
, DNET_LOG_ERROR
, "srw: %s: no task\n", app
.c_str());
/* The job blob covers the sph header plus event name, data and binary parts. */
407 dnet_shared_job_t
job(boost::make_shared
<dnet_job_t
>(m_n
, app
, ev
,
408 cocaine::blob_t((const char *)sph
, total_size(sph
) + sizeof(struct sph
))));
410 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
)
411 m_jobs
.insert(std::make_pair(sph
->src_key
, job
));
413 it
->second
->push(job
);
416 dnet_log(m_n
, DNET_LOG_INFO
, "srw: %s: started: task: %x, total-data-size: %zd, block: %d\n",
417 app
.c_str(), sph
->src_key
, total_size(sph
), !!(sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
));
/* Blocking request: wait up to m_n->wait_ts.tv_sec seconds, then send the result back. */
420 if (sph
->flags
& DNET_SPH_FLAGS_SRC_BLOCK
) {
421 bool success
= job
->wait(m_n
->wait_ts
.tv_sec
);
423 throw std::runtime_error("timeout waiting for exec command to complete");
425 std::vector
<char> res
= job
->result();
427 err
= dnet_send_reply(st
, cmd
, res
.data(), res
.size(), 0);
430 dnet_log(m_n
, DNET_LOG_NOTICE
, "srw: %s: %s: completed blocked task: %zd bytes\n",
431 app
.c_str(), dnet_dump_id_str(sph
->src
.id
), res
.size());
439 struct dnet_node
*m_n
;
440 cocaine::context_t m_ctx
;
445 std::string
dnet_get_event(const struct sph
*sph
, const char *data
) {
446 return std::string(data
, sph
->event_size
);
449 size_t total_size(const struct sph
*sph
) {
450 return sph
->event_size
+ sph
->data_size
+ sph
->binary_size
;
/*
 * Creates the srw object for node @n from cfg->srw.config and stores it in
 * n->srw.
 *
 * A missing config is logged as an error. Before the object is created the
 * node's own group is set as its group list. A std::exception raised
 * during initialization is caught and logged.
 *
 * NOTE(review): the return statements and the opening of the try block are
 * not visible in this view - confirm the returned error codes.
 */
454 int dnet_srw_init(struct dnet_node
*n
, struct dnet_config
*cfg
)
458 if (!cfg
->srw
.config
) {
459 dnet_log(n
, DNET_LOG_ERROR
, "srw: no config\n");
/* Use the node's own group as the only entry of its group list. */
464 dnet_node_set_groups(n
, (int *)&n
->id
.group_id
, 1);
465 n
->srw
= (void *)new srw(n
, cfg
->srw
.config
);
466 dnet_log(n
, DNET_LOG_INFO
, "srw: initialized: config: %s\n", cfg
->srw
.config
);
468 } catch (const std::exception
&e
) {
469 dnet_log(n
, DNET_LOG_ERROR
, "srw: init failed: config: %s, exception: %s\n", cfg
->srw
.config
, e
.what());
/*
 * Destroys the srw object stored in n->srw.
 * NOTE(review): any guard or exception handling around the delete is not
 * visible in this view.
 */
476 void dnet_srw_cleanup(struct dnet_node
*n
)
480 delete (srw
*)n
->srw
;
/*
 * C entry point for an exec command: forwards the request to
 * srw::process() of the srw object stored in the state's node and logs any
 * std::exception it throws, together with the event name and the data and
 * binary sizes.
 *
 * NOTE(review): the opening of the try block, the tail of the log call and
 * the value returned after an exception are not visible in this view.
 * NOTE(review): the "%lld" conversions receive unsigned long long
 * arguments, and "%.*s" expects an int precision - confirm the type of
 * header->event_size.
 */
488 int dnet_cmd_exec_raw(struct dnet_net_state
*st
, struct dnet_cmd
*cmd
, struct sph
*header
, const void *data
)
490 struct dnet_node
*n
= st
->n
;
491 srw
*s
= (srw
*)n
->srw
;
497 return s
->process(st
, cmd
, header
);
498 } catch (const std::exception
&e
) {
499 dnet_log(n
, DNET_LOG_ERROR
, "%s: srw-processing: event: %.*s, data-size: %lld, binary-size: %lld, exception: %s\n",
500 dnet_dump_id(&cmd
->id
), header
->event_size
, (const char *)data
,
501 (unsigned long long)header
->data_size
, (unsigned long long)header
->binary_size
,
508 int dnet_srw_update(struct dnet_node
*, int )
515 #include "elliptics.h"
517 int dnet_srw_init(struct dnet_node
*, struct dnet_config
*)
522 void dnet_srw_cleanup(struct dnet_node
*)
526 int dnet_cmd_exec_raw(struct dnet_net_state
*, struct dnet_cmd
*, struct sph
*, const void *)
531 int dnet_srw_update(struct dnet_node
*, int)