1 /***************************************************************************
3 * Copyright (C) 2007 David Brodsky *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
15 ***************************************************************************/
17 #include <arpa/inet.h>
21 #include <tairon/core/config.h>
22 #include <tairon/core/log.h>
23 #include <tairon/core/thread.h>
24 #include <tairon/net/socket.h>
25 #include <tairon/net/timer.h>
27 #include "azdhtmodule.h"
29 #include "core/bencode.h"
30 #include "main/torrentmanager.h"
// Definition of the static AzDHTModule pointer member; starts out null (0).
// NOTE(review): presumably the singleton instance pointer - nothing visible
// in this file section assigns it, confirm against the header.
38 AzDHTModule
*AzDHTModule::azdhtmodule
= 0;
40 /* {{{ AzDHTModule::AzDHTModule() */
// Constructor. Initializes bufSize to 1024 and readMethod to 0 (which is the
// state isConnected() reports as "not connected"), then:
//  - allocates the receive buffer of bufSize bytes,
//  - creates the timer and routes its timeoutSignal to reconnect(),
//  - creates an IPv4 stream socket and routes connectedSignal/errorSignal to
//    connected()/socketError(),
//  - reads "azdht-address" and "azdht-port" from the configuration, logging a
//    fallback message when a key is missing,
//  - starts connecting to address:port.
// All functors are built with threadMethodDFunctor() for the current thread.
// NOTE(review): presumably that makes the callbacks run in this thread - confirm.
41 AzDHTModule::AzDHTModule() : Tairon::Core::Module(), bufSize(1024), readMethod(0)
43 buffer
= new char[bufSize
];
45 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
// Timer whose timeout triggers reconnect().
47 timer
= new Tairon::Net::Timer();
48 timer
->timeoutSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::reconnect
));
50 socket
= new Tairon::Net::Socket(Tairon::Net::Socket::IPv4
, Tairon::Net::Socket::Stream
);
51 socket
->connectedSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::connected
));
52 socket
->errorSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::socketError
));
// Address lookup; a missing key falls back to 127.0.0.1.
55 address
= (*Tairon::Core::Config::self())["azdht-address"];
56 } catch (const Tairon::Core::KeyException
&) {
57 INFO("Configuration is missing azdht-address record, using default: 127.0.0.1");
58 address
= "127.0.0.1";
// Port lookup; atoi() converts the configured string. The log message below
// names 7994 as the default; the assignment itself is not visible in this view.
62 port
= atoi((*Tairon::Core::Config::self())["azdht-port"]);
63 } catch (const Tairon::Core::KeyException
&) {
64 INFO("Configuration is missing azdht-port record, using default: 7994");
// Initial connection attempt; a SocketException is only logged here.
// NOTE(review): the retry scheduling that the message implies is not visible.
69 socket
->connect(address
, port
);
70 } catch (const Tairon::Net::SocketException
&) {
71 INFO("Cannot connect to AzDHT program, retrying");
79 /* {{{ AzDHTModule::~AzDHTModule() */
// Destructor. Walks every (key, functor pointer) entry of the handlers map.
// NOTE(review): the loop body and the rest of the cleanup are not visible in
// this view; presumably each registered functor is deleted - confirm.
80 AzDHTModule::~AzDHTModule()
87 for (std::map
<String
, Tairon::Core::Functor2
<void, const String
&, const Tairent::Core::BEncode
&> *>::iterator it
= handlers
.begin(); it
!= handlers
.end(); ++it
)
96 /* {{{ AzDHTModule::connected(Tairon::Net::Socket *) */
// Slot for the socket's connectedSignal (the socket argument is unused).
// Logs the event, selects readMessageLength() as the active read step - a
// non-null readMethod is what isConnected() reports as "connected" - drops
// the connectedSignal connections and starts routing readyReadSignal to
// readyRead().
97 void AzDHTModule::connected(Tairon::Net::Socket
*)
99 INFO("Connected to AzDHT");
101 readMethod
= &AzDHTModule::readMessageLength
;
103 socket
->connectedSignal
.clear();
104 socket
->readyReadSignal
.connect(Tairon::Core::threadMethodDFunctor(Tairon::Core::Thread::current(), this, &AzDHTModule::readyRead
));
109 /* {{{ AzDHTModule::connectionError() */
// Handles a failed connection attempt: logs it and arms the timer with a
// 60 * 1000 timeout (one minute, per the inline comment). The timer's
// timeoutSignal is wired to reconnect() in the constructor.
// NOTE(review): meaning of the second start() argument (true) is not visible
// here - presumably "single shot"; confirm in Tairon::Net::Timer.
110 void AzDHTModule::connectionError()
112 INFO("Cannot connect to AzDHT program, retrying");
118 timer
->start(60 * 1000, true); // reconnect in 1 minute
122 /* {{{ AzDHTModule::enlargeBuffer(uint32_t) */
// Ensures the receive buffer can hold at least `size` bytes.
// When bufSize already covers `size` nothing needs to be done; otherwise a
// new array of `size` bytes is allocated and the first bufSize bytes of the
// current buffer are copied into it.
// NOTE(review): releasing the old array and updating buffer/bufSize are not
// visible in this view - confirm they follow the memcpy.
123 void AzDHTModule::enlargeBuffer(uint32_t size
)
125 if (bufSize
>= size
) // nothing to do
128 char *newBuf
= new char[size
];
129 memcpy(newBuf
, buffer
, bufSize
);
135 /* {{{ AzDHTModule::get(const String &key, char flags) */
// Builds a "get" request as a bencoded map (b["request"] = "get").
// There is an early "do nothing" return whose condition is not visible.
// NOTE(review): presumably the guard is !isConnected(), and key/flags are
// added to the map before it is handed to sendQuery() - those lines are not
// visible here, confirm.
136 void AzDHTModule::get(const String
&key
, char flags
)
139 return; // do nothing
141 Tairent::Core::BEncode
b(Tairent::Core::BEncode::MAP
);
142 b
["request"] = String("get");
150 /* {{{ AzDHTModule::isConnected() */
// Reports whether a read step is installed: true when readMethod is non-null.
// readMethod starts as 0 in the constructor and is set in connected().
151 bool AzDHTModule::isConnected()
153 return readMethod
!= 0;
157 /* {{{ AzDHTModule::processMessage(const Tairent::Core::BEncode &) */
// Dispatches one decoded server message.
// Reads the "response" and "key" items as strings and touches "values" so a
// missing item raises BEncodeException, which is logged as a warning.
// If a handler is registered for the key, it is invoked with the key and the
// "values" item.
// NOTE(review): what happens after the warning (presumably an early return)
// is not visible in this view - confirm.
158 void AzDHTModule::processMessage(const Tairent::Core::BEncode
&b
)
164 response
= b
["response"].asString();
165 key
= b
["key"].asString();
166 b
["values"]; // just make sure that there is such item
167 } catch (const Tairent::Core::BEncodeException
&e
) {
168 WARNING((const char *) String("Error while processing server's response: " + (const String
&) e
));
// Only keys with a registered handler are dispatched.
172 if (handlers
.count(key
))
173 (*handlers
[key
])(key
, b
["values"]);
177 /* {{{ AzDHTModule::put(const String &, const String &, char) */
// Builds a "put" request as a bencoded map (b["request"] = "put").
// There is an early "do nothing" return whose condition is not visible.
// NOTE(review): presumably the guard is !isConnected(), and key/value/flags
// are added to the map before it is handed to sendQuery() - those lines are
// not visible here, confirm.
178 void AzDHTModule::put(const String
&key
, const String
&value
, char flags
)
181 return; // do nothing
183 Tairent::Core::BEncode
b(Tairent::Core::BEncode::MAP
);
184 b
["request"] = String("put");
193 /* {{{ AzDHTModule::readMessage() */
// Read step for a message body.
// Reads up to (msgLength - offset) bytes into buffer at the current offset
// and advances offset by the amount read. While offset is still below
// msgLength the message is incomplete. Once complete, the next read step is
// switched back to readMessageLength() and the msgLength bytes in buffer are
// wrapped in a stringstream for bencode decoding into b.
// NOTE(review): the decode call, the hand-off to processMessage() and the
// offset reset are not visible in this view - confirm.
194 void AzDHTModule::readMessage()
196 offset
+= socket
->readTo(buffer
+ offset
, msgLength
- offset
);
198 if (offset
< msgLength
) // incomplete message
202 readMethod
= &AzDHTModule::readMessageLength
;
204 std::stringstream
s(String(buffer
, msgLength
));
205 Tairent::Core::BEncode b
;
// NOTE(review): this handler names Tairent::Core::BEncode, while
// processMessage() catches Tairent::Core::BEncodeException. If decoding
// throws BEncodeException this clause would not match - looks like the
// intended type is BEncodeException; verify against core/bencode.h.
208 } catch (const Tairent::Core::BEncode
&e
) { // shouldn't happen
209 WARNING((const char *) String("AzDHT sent corrupted bencode: " + (const String
&) e
));
218 /* {{{ AzDHTModule::readMessageLength() */
// Read step for the 4-byte message length prefix.
// Reads up to (4 - offset) bytes into buffer at the current offset. Once all
// 4 bytes are present they are converted from network byte order with
// ntohl() into msgLength, the buffer is grown to hold msgLength bytes, and
// the next read step becomes readMessage().
// NOTE(review): msgLength comes straight from the peer and no upper bound is
// visible before enlargeBuffer(); the uint32_t cast also assumes buffer is
// suitably aligned. The offset reset is not visible in this view - confirm.
219 void AzDHTModule::readMessageLength()
221 offset
+= socket
->readTo(buffer
+ offset
, 4 - offset
);
223 if (offset
< 4) // incomplete length
226 msgLength
= ntohl(*(uint32_t *) buffer
);
227 enlargeBuffer(msgLength
);
230 readMethod
= &AzDHTModule::readMessage
;
234 /* {{{ AzDHTModule::readyRead(Tairon::Net::Socket *) */
// Slot for the socket's readyReadSignal (the socket argument is unused).
// Invokes whichever read step readMethod currently points to
// (readMessageLength() or readMessage()). A SocketException is treated as
// "connection has been closed", per the inline comment.
// NOTE(review): the handler body is not visible in this view - confirm what
// cleanup/reconnect it performs.
235 void AzDHTModule::readyRead(Tairon::Net::Socket
*)
238 (this->*readMethod
)();
239 } catch (const Tairon::Net::SocketException
&) { // connection has been closed
245 /* {{{ AzDHTModule::reconnect() */
// Starts a fresh connection attempt (wired to the timer's timeoutSignal in
// the constructor). Creates a new IPv4 stream socket, routes its
// connectedSignal/errorSignal to connected()/socketError() for the current
// thread and connects to the stored address and port.
// NOTE(review): the socket member is overwritten here; whether the previous
// socket was already released is not visible in this view - confirm.
// The SocketException handler body is not visible either.
246 void AzDHTModule::reconnect()
248 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
250 socket
= new Tairon::Net::Socket(Tairon::Net::Socket::IPv4
, Tairon::Net::Socket::Stream
);
251 socket
->connectedSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::connected
));
252 socket
->errorSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::socketError
));
255 socket
->connect(address
, port
);
256 } catch (const Tairon::Net::SocketException
&) {
262 /* {{{ AzDHTModule::registerHandler(const String &, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *) */
// Registers `handler` as the callback for responses carrying `key`
// (processMessage() looks handlers up by key and invokes them).
// If a handler is already stored for the key it is deleted first, then the
// new pointer is stored in its place.
263 void AzDHTModule::registerHandler(const String
&key
, Tairon::Core::Functor2
<void, const String
&, const Tairent::Core::BEncode
&> *handler
)
265 if (handlers
.count(key
))
266 delete handlers
[key
];
267 handlers
[key
] = handler
;
271 /* {{{ AzDHTModule::remove(const String &) */
// Builds a "remove" request as a bencoded map (b["request"] = "remove").
// There is an early "do nothing" return whose condition is not visible.
// NOTE(review): presumably the guard is !isConnected(), and the key is added
// to the map before it is handed to sendQuery() - those lines are not
// visible here, confirm.
272 void AzDHTModule::remove(const String
&key
)
275 return; // do nothing
277 Tairent::Core::BEncode
b(Tairent::Core::BEncode::MAP
);
278 b
["request"] = String("remove");
285 /* {{{ AzDHTModule::sendQuery(const Tairent::Core::BEncode &) */
// Sends one query to the AzDHT peer, framed with a 4-byte length prefix in
// network byte order (htonl) - the same framing readMessageLength() parses.
// NOTE(review): the declarations of `s` and `data`, and the step that appends
// the payload after the prefix, are not visible in this view; presumably `s`
// is a stringstream holding the bencoded `query` - confirm.
286 void AzDHTModule::sendQuery(const Tairent::Core::BEncode
&query
)
291 data
.reserve(4 + s
.str().length());
293 uint32_t length
= htonl(s
.str().length());
// The raw 4 bytes of `length` become the start of the outgoing data.
294 data
= String((const char * ) &length
, 4);
// NOTE(review): meaning of the second write() argument (true) and the body
// of the SocketException handler are not visible here.
298 socket
->write(data
, true);
299 } catch (const Tairon::Net::SocketException
&) {
305 /* {{{ AzDHTModule::socketError(Tairon::Net::Socket *, int) */
// Slot for the socket's errorSignal; receives the socket (unused, unnamed)
// and an error code `err`.
// NOTE(review): the body is not visible in this view; presumably it tears
// down the socket and calls connectionError() - confirm.
306 void AzDHTModule::socketError(Tairon::Net::Socket
*, int err
)
312 }; // namespace AzDHT
314 }; // namespace Tairent
// Invokes the EXPORT_MODULE macro with the name azdht and the module class.
// NOTE(review): presumably this emits the entry point the Tairon module
// loader uses to instantiate AzDHTModule - confirm in the Tairon headers.
316 EXPORT_MODULE(azdht
, Tairent::AzDHT::AzDHTModule
)
318 // vim: ai sw=4 ts=4 noet fdm=marker