Add missing #include to src/main/torrentclient.h.
[tairent.git] / src / azdht / azdhtmodule.cpp
blob843a1da25ad2b7bce0ae96de0ceec55a660e885b
1 /***************************************************************************
2 * *
3 * Copyright (C) 2007 David Brodsky *
4 * *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
14 * *
15 ***************************************************************************/
17 #include <arpa/inet.h>
19 #include <sstream>
21 #include <tairon/core/config.h>
22 #include <tairon/core/log.h>
23 #include <tairon/core/thread.h>
24 #include <tairon/net/socket.h>
25 #include <tairon/net/timer.h>
27 #include "azdhtmodule.h"
29 #include "core/bencode.h"
30 #include "main/torrentmanager.h"
32 namespace Tairent
35 namespace AzDHT
38 AzDHTModule *AzDHTModule::azdhtmodule = 0;
40 /* {{{ AzDHTModule::AzDHTModule() */
41 AzDHTModule::AzDHTModule() : Tairon::Core::Module(), bufSize(1024), readMethod(0)
43 buffer = new char[bufSize];
45 Tairon::Core::Thread *current = Tairon::Core::Thread::current();
47 timer = new Tairon::Net::Timer();
48 timer->timeoutSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::reconnect));
50 socket = new Tairon::Net::Socket(Tairon::Net::Socket::IPv4, Tairon::Net::Socket::Stream);
51 socket->connectedSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::connected));
52 socket->errorSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::socketError));
54 try {
55 address = (*Tairon::Core::Config::self())["azdht-address"];
56 } catch (const Tairon::Core::KeyException &) {
57 INFO("Configuration is missing azdht-address record, using default: 127.0.0.1");
58 address = "127.0.0.1";
61 try {
62 port = atoi((*Tairon::Core::Config::self())["azdht-port"]);
63 } catch (const Tairon::Core::KeyException &) {
64 INFO("Configuration is missing azdht-port record, using default: 7994");
65 port = 7994;
68 try {
69 socket->connect(address, port);
70 } catch (const Tairon::Net::SocketException &) {
71 INFO("Cannot connect to AzDHT program, retrying");
72 connectionError();
75 azdhtmodule = this;
77 /* }}} */
79 /* {{{ AzDHTModule::~AzDHTModule() */
80 AzDHTModule::~AzDHTModule()
82 timer->destroy();
84 if (socket)
85 socket->close();
87 for (std::map<String, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *>::iterator it = handlers.begin(); it != handlers.end(); ++it)
88 delete it->second;
90 delete [] buffer;
92 azdhtmodule = 0;
94 /* }}} */
96 /* {{{ AzDHTModule::connected(Tairon::Net::Socket *) */
97 void AzDHTModule::connected(Tairon::Net::Socket *)
99 INFO("Connected to AzDHT");
100 offset = 0;
101 readMethod = &AzDHTModule::readMessageLength;
103 socket->connectedSignal.clear();
104 socket->readyReadSignal.connect(Tairon::Core::threadMethodDFunctor(Tairon::Core::Thread::current(), this, &AzDHTModule::readyRead));
105 socket->ready();
107 /* }}} */
109 /* {{{ AzDHTModule::connectionError() */
110 void AzDHTModule::connectionError()
112 INFO("Cannot connect to AzDHT program, retrying");
114 socket->close();
115 socket = 0;
116 readMethod = 0;
118 timer->start(60 * 1000, true); // reconnect in 1 minute
120 /* }}} */
122 /* {{{ AzDHTModule::enlargeBuffer(uint32_t) */
123 void AzDHTModule::enlargeBuffer(uint32_t size)
125 if (bufSize >= size) // nothing to do
126 return;
128 char *newBuf = new char[size];
129 memcpy(newBuf, buffer, bufSize);
130 delete [] buffer;
131 buffer = newBuf;
133 /* }}} */
135 /* {{{ AzDHTModule::get(const String &key, char flags) */
136 void AzDHTModule::get(const String &key, char flags)
138 if (!isConnected())
139 return; // do nothing
141 Tairent::Core::BEncode b(Tairent::Core::BEncode::MAP);
142 b["request"] = String("get");
143 b["key"] = key;
144 b["flags"] = flags;
146 sendQuery(b);
148 /* }}} */
150 /* {{{ AzDHTModule::isConnected() */
151 bool AzDHTModule::isConnected()
153 return readMethod != 0;
155 /* }}} */
157 /* {{{ AzDHTModule::processMessage(const Tairent::Core::BEncode &) */
158 void AzDHTModule::processMessage(const Tairent::Core::BEncode &b)
160 String response;
161 String key;
163 try {
164 response = b["response"].asString();
165 key = b["key"].asString();
166 b["values"]; // just make sure that there is such item
167 } catch (const Tairent::Core::BEncodeException &e) {
168 WARNING((const char *) String("Error while processing server's response: " + (const String &) e));
169 return;
172 if (handlers.count(key))
173 (*handlers[key])(key, b["values"]);
175 /* }}} */
177 /* {{{ AzDHTModule::put(const String &, const String &, char) */
178 void AzDHTModule::put(const String &key, const String &value, char flags)
180 if (!isConnected())
181 return; // do nothing
183 Tairent::Core::BEncode b(Tairent::Core::BEncode::MAP);
184 b["request"] = String("put");
185 b["key"] = key;
186 b["value"] = value;
187 b["flags"] = flags;
189 sendQuery(b);
191 /* }}} */
193 /* {{{ AzDHTModule::readMessage() */
194 void AzDHTModule::readMessage()
196 offset += socket->readTo(buffer + offset, msgLength - offset);
198 if (offset < msgLength) // incomplete message
199 return;
201 offset = 0;
202 readMethod = &AzDHTModule::readMessageLength;
204 std::stringstream s(String(buffer, msgLength));
205 Tairent::Core::BEncode b;
206 try {
207 s >> b;
208 } catch (const Tairent::Core::BEncode &e) { // shouldn't happen
209 WARNING((const char *) String("AzDHT sent corrupted bencode: " + (const String &) e));
210 connectionError();
211 return;
214 processMessage(b);
216 /* }}} */
218 /* {{{ AzDHTModule::readMessageLength() */
219 void AzDHTModule::readMessageLength()
221 offset += socket->readTo(buffer + offset, 4 - offset);
223 if (offset < 4) // incomplete length
224 return;
226 msgLength = ntohl(*(uint32_t *) buffer);
227 enlargeBuffer(msgLength);
229 offset = 0;
230 readMethod = &AzDHTModule::readMessage;
232 /* }}} */
234 /* {{{ AzDHTModule::readyRead(Tairon::Net::Socket *) */
235 void AzDHTModule::readyRead(Tairon::Net::Socket *)
237 try {
238 (this->*readMethod)();
239 } catch (const Tairon::Net::SocketException &) { // connection has been closed
240 connectionError();
243 /* }}} */
245 /* {{{ AzDHTModule::reconnect() */
246 void AzDHTModule::reconnect()
248 Tairon::Core::Thread *current = Tairon::Core::Thread::current();
250 socket = new Tairon::Net::Socket(Tairon::Net::Socket::IPv4, Tairon::Net::Socket::Stream);
251 socket->connectedSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::connected));
252 socket->errorSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &AzDHTModule::socketError));
254 try {
255 socket->connect(address, port);
256 } catch (const Tairon::Net::SocketException &) {
257 connectionError();
260 /* }}} */
262 /* {{{ AzDHTModule::registerHandler(const String &, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *) */
263 void AzDHTModule::registerHandler(const String &key, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *handler)
265 if (handlers.count(key))
266 delete handlers[key];
267 handlers[key] = handler;
269 /* }}} */
271 /* {{{ AzDHTModule::remove(const String &) */
272 void AzDHTModule::remove(const String &key)
274 if (!isConnected())
275 return; // do nothing
277 Tairent::Core::BEncode b(Tairent::Core::BEncode::MAP);
278 b["request"] = String("remove");
279 b["key"] = key;
281 sendQuery(b);
283 /* }}} */
285 /* {{{ AzDHTModule::sendQuery(const Tairent::Core::BEncode &) */
286 void AzDHTModule::sendQuery(const Tairent::Core::BEncode &query)
288 std::stringstream s;
289 s << query;
290 String data;
291 data.reserve(4 + s.str().length());
293 uint32_t length = htonl(s.str().length());
294 data = String((const char * ) &length, 4);
295 data += s.str();
297 try {
298 socket->write(data, true);
299 } catch (const Tairon::Net::SocketException &) {
300 connectionError();
303 /* }}} */
305 /* {{{ AzDHTModule::socketError(Tairon::Net::Socket *, int) */
306 void AzDHTModule::socketError(Tairon::Net::Socket *, int err)
308 connectionError();
310 /* }}} */
312 }; // namespace AzDHT
314 }; // namespace Tairent
316 EXPORT_MODULE(azdht, Tairent::AzDHT::AzDHTModule)
318 // vim: ai sw=4 ts=4 noet fdm=marker