More refactoring. Added a trivial config parser to the C++ bindings for the client.
[elliptics.git] / include / elliptics / srw / pipe.hpp
blob97e623f99a3a0cf1ae4c99dcee58216a0434e642
1 #ifndef __ELLIPTICS_SRW_PIPE_HPP
2 #define __ELLIPTICS_SRW_PIPE_HPP
4 #include <iostream>
5 #include <fstream>
6 #include <stdexcept>
7 #include <sstream>
9 #include <boost/thread/mutex.hpp>
10 #include <boost/shared_array.hpp>
11 #include <boost/shared_ptr.hpp>
12 #include <boost/thread/condition.hpp>
14 #include <elliptics/packet.h>
16 namespace ioremap {
17 namespace srw {
19 static inline std::string get_event(const struct sph &header, const char *data) {
20 std::string event;
21 event.assign(data, header.event_size);
23 return event;
26 class pipe {
27 public:
28 pipe(const std::string &pipe_base, bool worker) : base(pipe_base) {
29 create_and_open(worker);
32 virtual ~pipe() {
33 std::string m_p = base + ".w2c";
34 unlink(m_p.c_str());
36 m_p = base + ".c2w";
37 unlink(m_p.c_str());
40 void read(struct sph &header, std::string &data) {
41 rpipe.read((char *)&header, sizeof(struct sph));
43 data.resize(hsize(header));
44 rpipe.read((char *)data.data(), hsize(header));
47 void write(struct sph &header, const char *data) {
48 wpipe.write((char *)&header, sizeof(struct sph));
50 if (header.data_size && data)
51 wpipe.write(data, hsize(header));
53 wpipe.flush();
56 private:
57 std::string base;
58 std::fstream rpipe, wpipe;
60 size_t hsize(struct sph &header) {
61 return header.data_size + header.binary_size + header.event_size;
64 void create_fifo(const std::string &path) {
65 int err;
66 err = mkfifo(path.c_str(), 0644);
67 if (err < 0) {
68 err = -errno;
69 if (err != -EEXIST) {
70 std::ostringstream str;
72 str << "could not create fifo '" << path << "': " << err;
73 throw std::runtime_error(str.str());
78 void create_and_open(int worker) {
79 std::string w2c = base + ".w2c";
80 std::string c2w = base + ".c2w";
82 create_fifo(w2c);
83 create_fifo(c2w);
86 * Order is significant - one side must open read first while another one must open write endpoint first
88 if (worker) {
89 rpipe.open(c2w.c_str(), std::ios_base::in | std::ios_base::binary);
90 wpipe.open(w2c.c_str(), std::ios_base::out | std::ios_base::binary);
91 } else {
92 wpipe.open(c2w.c_str(), std::ios_base::out | std::ios_base::binary);
93 rpipe.open(w2c.c_str(), std::ios_base::in | std::ios_base::binary);
96 rpipe.exceptions(std::ifstream::failbit | std::ifstream::badbit);
97 wpipe.exceptions(std::ifstream::failbit | std::ifstream::badbit);
104 #endif /* __ELLIPTICS_SRW_PIPE_HPP */