1 #ifndef __ELLIPTICS_SRW_PIPE_HPP
2 #define __ELLIPTICS_SRW_PIPE_HPP
9 #include <boost/thread/mutex.hpp>
10 #include <boost/shared_array.hpp>
11 #include <boost/shared_ptr.hpp>
12 #include <boost/thread/condition.hpp>
14 #include <elliptics/packet.h>
19 static inline std::string
get_event(const struct sph
&header
, const char *data
) {
21 event
.assign(data
, header
.event_size
);
28 pipe(const std::string
&pipe_base
, bool worker
) : base(pipe_base
) {
29 create_and_open(worker
);
// Local path string: the pipe base name with the ".w2c" suffix appended.
33 std::string m_p
= base
+ ".w2c";
40 void read(struct sph
&header
, std::string
&data
) {
41 rpipe
.read((char *)&header
, sizeof(struct sph
));
43 data
.resize(hsize(header
));
44 rpipe
.read((char *)data
.data(), hsize(header
));
47 void write(struct sph
&header
, const char *data
) {
48 wpipe
.write((char *)&header
, sizeof(struct sph
));
50 if (header
.data_size
&& data
)
51 wpipe
.write(data
, hsize(header
));
// Stream endpoints of the pipe: read() takes messages from rpipe, write() sends them through wpipe.
58 std::fstream rpipe
, wpipe
;
60 size_t hsize(struct sph
&header
) {
61 return header
.data_size
+ header
.binary_size
+ header
.event_size
;
// Creates the fifo at path by calling mkfifo() with mode 0644.
// The failure path visible here formats a "could not create fifo" message
// containing path and the err value, then throws std::runtime_error.
// NOTE(review): the declaration of err and the condition guarding the throw
// are not visible in this view - confirm which mkfifo() failures are tolerated.
64 void create_fifo(const std::string
&path
) {
// err receives the mkfifo() result for the requested path.
66 err
= mkfifo(path
.c_str(), 0644);
// Buffer used to assemble the exception text.
70 std::ostringstream str
;
72 str
<< "could not create fifo '" << path
<< "': " << err
;
73 throw std::runtime_error(str
.str());
// Opens both stream endpoints on the two fifos derived from base:
// "<base>.w2c" and "<base>.c2w". Two open sequences are visible: one opens
// rpipe on c2w and then wpipe on w2c, the other opens wpipe on c2w and then
// rpipe on w2c, so the two sides of a pipe pair use mirrored endpoints.
// NOTE(review): the branch on worker that selects between the two sequences is
// not visible in this view - presumably the first sequence is the worker side; confirm.
// NOTE(review): worker is declared int here while the constructor passes a bool.
78 void create_and_open(int worker
) {
79 std::string w2c
= base
+ ".w2c";
80 std::string c2w
= base
+ ".c2w";
86 * Order is significant - one side must open read first while another one must open write endpoint first
89 rpipe
.open(c2w
.c_str(), std::ios_base::in
| std::ios_base::binary
);
90 wpipe
.open(w2c
.c_str(), std::ios_base::out
| std::ios_base::binary
);
92 wpipe
.open(c2w
.c_str(), std::ios_base::out
| std::ios_base::binary
);
93 rpipe
.open(w2c
.c_str(), std::ios_base::in
| std::ios_base::binary
);
// Enable exceptions on both streams for the failbit and badbit states.
// NOTE(review): the constants are spelled via std::ifstream although both
// streams are std::fstream; they are the same std::ios_base members.
96 rpipe
.exceptions(std::ifstream::failbit
| std::ifstream::badbit
);
97 wpipe
.exceptions(std::ifstream::failbit
| std::ifstream::badbit
);
104 #endif /* __ELLIPTICS_SRW_PIPE_HPP */