Add azdht module.
[tairent.git] / src / azdht / azdhtmodule.cpp
blob6bc3f0fa62874c993956d346cf6fa619ce459579
1 /***************************************************************************
2 * *
3 * Copyright (C) 2007 David Brodsky *
4 * *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
14 * *
15 ***************************************************************************/
17 #include <arpa/inet.h>
19 #include <sstream>
21 #include <tairon/core/config.h>
22 #include <tairon/core/log.h>
23 #include <tairon/core/thread.h>
24 #include <tairon/net/socket.h>
25 #include <tairon/net/timer.h>
27 #include "azdhtmodule.h"
29 #include "core/bencode.h"
30 #include "main/torrentmanager.h"
32 namespace Tairent
35 namespace AzDHT
// Pointer to the module instance: the constructor stores `this` here and the
// destructor resets it to 0.
38 AzDHTModule *AzDHTModule::azdhtmodule = 0;
40 /* {{{ AzDHTModule::AzDHTModule() */
41 AzDHTModule::AzDHTModule() : Tairon::Core::Module(), bufSize(1024), readMethod(0)
43 buffer = new char[bufSize];
45 Tairon::Core::Thread *current = Tairon::Core::Thread::current();
47 timer = new Tairon::Net::Timer();
48 timer->timeoutSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::reconnect));
50 socket = new Tairon::Net::Socket(Tairon::Net::Socket::IPv4, Tairon::Net::Socket::Stream);
51 socket->connectedSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::connected));
52 socket->errorSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::socketError));
54 uint16_t port;
55 try {
56 port = atoi((*Tairon::Core::Config::self())["azdht-port"]);
57 } catch (const Tairon::Core::KeyException &) {
58 INFO("Configuration is missing azdht-port record, using default: 7994");
59 port = 7994;
62 try {
63 socket->connect("localhost", port);
64 } catch (const Tairon::Net::SocketException &) {
65 INFO("Cannot connect to AzDHT program, retrying");
66 connectionError();
69 azdhtmodule = this;
71 /* }}} */
73 /* {{{ AzDHTModule::~AzDHTModule */
74 AzDHTModule::~AzDHTModule()
76 timer->destroy();
78 if (socket)
79 socket->close();
81 for (std::map<String, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *>::iterator it = handlers.begin(); it != handlers.end(); ++it)
82 delete it->second;
84 delete [] buffer;
86 azdhtmodule = 0;
88 /* }}} */
90 /* {{{ AzDHTModule::connected(Tairon::Net::Socket *) */
91 void AzDHTModule::connected(Tairon::Net::Socket *)
93 offset = 0;
94 readMethod = &AzDHTModule::readMessageLength;
96 socket->connectedSignal.clear();
97 socket->readyReadSignal.connect(Tairon::Core::threadMethodDFunctor(Tairon::Core::Thread::current(), this, &AzDHTModule::readyRead));
98 socket->ready();
100 /* }}} */
102 /* {{{ AzDHTModule::connectionError() */
103 void AzDHTModule::connectionError()
105 socket->close();
106 socket = 0;
107 readMethod = 0;
109 timer->start(60 * 1000, true); // reconnect in 1 minute
111 /* }}} */
113 /* {{{ AzDHTModule::enlargeBuffer(uint32_t) */
114 void AzDHTModule::enlargeBuffer(uint32_t size)
116 if (bufSize >= size) // nothing to do
117 return;
119 char *newBuf = new char[size];
120 memcpy(newBuf, buffer, bufSize);
121 delete [] buffer;
122 buffer = newBuf;
124 /* }}} */
126 /* {{{ AzDHTModule::get(const String &key, char flags) */
127 void AzDHTModule::get(const String &key, char flags)
129 Tairent::Core::BEncode b(Tairent::Core::BEncode::MAP);
130 b["request"] = String("get");
131 b["key"] = key;
132 b["flags"] = flags;
134 sendQuery(b);
136 /* }}} */
138 /* {{{ AzDHTModule::isConnected() */
139 bool AzDHTModule::isConnected()
141 return readMethod != 0;
143 /* }}} */
145 /* {{{ AzDHTModule::processMessage(const Tairent::Core::BEncode &) */
146 void AzDHTModule::processMessage(const Tairent::Core::BEncode &b)
148 String response;
149 String key;
151 try {
152 response = b["response"].asString();
153 key = b["key"].asString();
154 } catch (const Tairent::Core::BEncodeException &e) {
155 WARNING((const char *) String("Error while processing server's response: " + (const String &) e));
156 return;
159 if (handlers.count(key))
160 (*handlers[key])(key, b["values"]);
162 /* }}} */
164 /* {{{ AzDHTModule::put(const String &, const String &, char) */
165 void AzDHTModule::put(const String &key, const String &value, char flags)
167 Tairent::Core::BEncode b(Tairent::Core::BEncode::MAP);
168 b["request"] = String("put");
169 b["key"] = key;
170 b["value"] = value;
171 b["flags"] = flags;
173 sendQuery(b);
175 /* }}} */
177 /* {{{ AzDHTModule::readMessage() */
178 void AzDHTModule::readMessage()
180 offset += socket->readTo(buffer + offset, msgLength - offset);
182 if (offset < msgLength) // incomplete message
183 return;
185 offset = 0;
186 readMethod = &AzDHTModule::readMessageLength;
188 std::stringstream s(String(buffer, msgLength));
189 Tairent::Core::BEncode b;
190 try {
191 s >> b;
192 } catch (const Tairent::Core::BEncode &e) { // shouldn't happen
193 WARNING((const char *) String("AzDHT sent corrupted bencode: " + (const String &) e));
194 connectionError();
195 return;
198 processMessage(b);
200 /* }}} */
202 /* {{{ AzDHTModule::readMessageLength() */
203 void AzDHTModule::readMessageLength()
205 offset += socket->readTo(buffer + offset, 4 - offset);
207 if (offset < 4) // incomplete length
208 return;
210 msgLength = ntohl(*(uint32_t *) buffer);
211 enlargeBuffer(msgLength);
213 offset = 0;
214 readMethod = &AzDHTModule::readMessage;
216 /* }}} */
218 /* {{{ AzDHTModule::readyRead(Tairon::Net::Socket *) */
219 void AzDHTModule::readyRead(Tairon::Net::Socket *)
221 try {
222 (this->*readMethod)();
223 } catch (const Tairon::Net::SocketException &) { // connection has been closed
224 connectionError();
227 /* }}} */
229 /* {{{ AzDHTModule::reconnect() */
230 void AzDHTModule::reconnect()
232 Tairon::Core::Thread *current = Tairon::Core::Thread::current();
234 socket = new Tairon::Net::Socket(Tairon::Net::Socket::IPv4, Tairon::Net::Socket::Stream);
235 socket->connectedSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::connected));
236 socket->errorSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::socketError));
238 uint16_t port;
239 try {
240 port = atoi((*Tairon::Core::Config::self())["azdht-port"]);
241 } catch (const Tairon::Core::KeyException &) {
242 port = 7994;
245 try {
246 socket->connect("localhost", port);
247 } catch (const Tairon::Net::SocketException &) {
248 INFO("Cannot connect to AzDHT program, retrying");
250 connectionError();
253 /* }}} */
255 /* {{{ AzDHTModule::registerHandler(const String &, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *) */
256 void AzDHTModule::registerHandler(const String &key, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *handler)
258 if (handlers.count(key))
259 delete handlers[key];
260 handlers[key] = handler;
262 /* }}} */
264 /* {{{ AzDHTModule::remove(const String &) */
265 void AzDHTModule::remove(const String &key)
267 Tairent::Core::BEncode b(Tairent::Core::BEncode::MAP);
268 b["request"] = String("remove");
269 b["key"] = key;
271 sendQuery(b);
273 /* }}} */
275 /* {{{ AzDHTModule::sendQuery(const Tairent::Core::BEncode &) */
276 void AzDHTModule::sendQuery(const Tairent::Core::BEncode &query)
278 std::stringstream s;
279 s << query;
280 String data;
281 data.reserve(4 + s.str().length());
283 uint32_t length = htonl(s.str().length());
284 data = String((const char * ) &length, 4);
285 data += s.str();
287 try {
288 socket->write(data, true);
289 } catch (const Tairon::Net::SocketException &) {
290 connectionError();
293 /* }}} */
295 /* {{{ AzDHTModule::socketError(Tairon::Net::Socket *, int) */
296 void AzDHTModule::socketError(Tairon::Net::Socket *, int err)
298 connectionError();
300 /* }}} */
302 }; // namespace AzDHT
304 }; // namespace Tairent
// NOTE(review): EXPORT_MODULE is defined outside this file; presumably it
// exposes AzDHTModule to the module loader under the name "azdht" - confirm.
306 EXPORT_MODULE(azdht, Tairent::AzDHT::AzDHTModule)
308 // vim: ai sw=4 ts=4 noet fdm=marker