1 /***************************************************************************
3 * Copyright (C) 2007 David Brodsky *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
15 ***************************************************************************/
17 #include <arpa/inet.h>
21 #include <tairon/core/config.h>
22 #include <tairon/core/log.h>
23 #include <tairon/core/thread.h>
24 #include <tairon/net/socket.h>
25 #include <tairon/net/timer.h>
27 #include "azdhtmodule.h"
29 #include "core/bencode.h"
30 #include "main/torrentmanager.h"
// Definition of the static class member AzDHTModule::azdhtmodule, a pointer to
// an AzDHTModule that starts out null (0). Nothing in the visible code assigns
// it; presumably it is used as a singleton-style accessor -- TODO confirm.
38 AzDHTModule
*AzDHTModule::azdhtmodule
= 0;
40 /* {{{ AzDHTModule::AzDHTModule() */
// Constructor. Initialises bufSize to 1024 and readMethod to 0, allocates the
// receive buffer, creates the timer and an IPv4 stream socket, hooks their
// signals up to this object's member functions, reads the "azdht-port"
// configuration entry and starts a connection to "localhost".
// NOTE(review): this listing appears to have lost lines (braces, `try`, the
// declaration and default assignment of `port`, whatever follows the failed
// connect). The comments below describe only the statements that are visible.
41 AzDHTModule::AzDHTModule() : Tairon::Core::Module(), bufSize(1024), readMethod(0)
// Receive buffer of bufSize (1024) chars.
43 buffer
= new char[bufSize
];
// Thread handle passed to every threadMethodDFunctor() call below.
45 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
// Timer whose timeoutSignal invokes reconnect(); it is started in connectionError().
47 timer
= new Tairon::Net::Timer();
48 timer
->timeoutSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::reconnect
));
// Socket used to talk to the AzDHT program; connectedSignal -> connected(),
// errorSignal -> socketError().
50 socket
= new Tairon::Net::Socket(Tairon::Net::Socket::IPv4
, Tairon::Net::Socket::Stream
);
51 socket
->connectedSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::connected
));
52 socket
->errorSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::socketError
));
// Port comes from the "azdht-port" config record, parsed with atoi(). A missing
// record raises KeyException, which is logged; the log text names 7994 as the
// default, but the assignment of that default is not visible here.
56 port
= atoi((*Tairon::Core::Config::self())["azdht-port"]);
57 } catch (const Tairon::Core::KeyException
&) {
58 INFO("Configuration is missing azdht-port record, using default: 7994");
// Start connecting; a SocketException from connect() is logged. The retry
// itself is not visible (presumably connectionError() is called -- TODO confirm).
63 socket
->connect("localhost", port
);
64 } catch (const Tairon::Net::SocketException
&) {
65 INFO("Cannot connect to AzDHT program, retrying");
73 /* {{{ AzDHTModule::~AzDHTModule */
// Destructor. The only visible statement is the header of a loop that walks
// every (key, handler functor pointer) entry of `handlers`.
// NOTE(review): the loop body and the rest of the destructor are not visible in
// this listing. registerHandler() deletes a handler it replaces, so the loop
// presumably deletes the stored functors -- TODO confirm, and confirm that
// buffer, timer and socket are released as well.
74 AzDHTModule::~AzDHTModule()
81 for (std::map
<String
, Tairon::Core::Functor2
<void, const String
&, const Tairent::Core::BEncode
&> *>::iterator it
= handlers
.begin(); it
!= handlers
.end(); ++it
)
90 /* {{{ AzDHTModule::connected(Tairon::Net::Socket *) */
// Handler wired to socket->connectedSignal. The socket argument is unnamed and
// unused in the visible code.
// NOTE(review): some lines of this function are missing from the listing; only
// the visible statements are described.
91 void AzDHTModule::connected(Tairon::Net::Socket
*)
// Install the first read step. A non-null readMethod is also what
// isConnected() reports as "connected".
94 readMethod
= &AzDHTModule::readMessageLength
;
// Drop every connectedSignal connection, then route readyReadSignal to readyRead().
96 socket
->connectedSignal
.clear();
97 socket
->readyReadSignal
.connect(Tairon::Core::threadMethodDFunctor(Tairon::Core::Thread::current(), this, &AzDHTModule::readyRead
));
102 /* {{{ AzDHTModule::connectionError() */
// Schedules a reconnection attempt by starting `timer`, whose timeoutSignal is
// connected to reconnect() in the constructor.
// NOTE(review): lines between the signature and the timer call are missing from
// this listing (presumably socket cleanup -- TODO confirm).
103 void AzDHTModule::connectionError()
// 60 * 1000 is 60000; the original comment reads this as one minute, i.e. the
// unit is milliseconds. The meaning of the `true` argument is not visible here
// (possibly single-shot -- confirm against Tairon::Net::Timer).
109 timer
->start(60 * 1000, true); // reconnect in 1 minute
113 /* {{{ AzDHTModule::enlargeBuffer(uint32_t) */
// Grows the receive buffer so that it can hold at least `size` bytes.
//   size - required capacity in bytes.
// NOTE(review): the early exit after the size check and the statements that
// release the old buffer and update `buffer`/`bufSize` are not visible in this
// listing -- confirm they exist.
114 void AzDHTModule::enlargeBuffer(uint32_t size
)
// Already large enough.
116 if (bufSize
>= size
) // nothing to do
// Allocate the larger buffer and carry over all bufSize bytes of the old one.
119 char *newBuf
= new char[size
];
120 memcpy(newBuf
, buffer
, bufSize
);
126 /* {{{ AzDHTModule::get(const String &key, char flags) */
// Builds a "get" request for the AzDHT program.
//   key   - key to look up.
//   flags - request flags.
// NOTE(review): only the creation of the request map and its "request" entry
// are visible; how `key` and `flags` are stored and how the request is sent
// (presumably via sendQuery()) is missing from this listing.
127 void AzDHTModule::get(const String
&key
, char flags
)
// The request is a bencoded map whose "request" entry is the string "get".
129 Tairent::Core::BEncode
b(Tairent::Core::BEncode::MAP
);
130 b
["request"] = String("get");
138 /* {{{ AzDHTModule::isConnected() */
// Returns true when a read method is installed. readMethod is 0 after
// construction and is set in connected(), so a non-null value is used as the
// "connected" flag.
139 bool AzDHTModule::isConnected()
141 return readMethod
!= 0;
145 /* {{{ AzDHTModule::processMessage(const Tairent::Core::BEncode &) */
// Dispatches one decoded server message to the handler registered for its key.
//   b - decoded message; "response" and "key" are read as strings and "values"
//       is passed on to the handler.
// NOTE(review): the declarations of `response` and `key`, the `try` line and
// the statement following the warning (presumably a return) are missing from
// this listing. No use of `response` is visible.
146 void AzDHTModule::processMessage(const Tairent::Core::BEncode
&b
)
152 response
= b
["response"].asString();
153 key
= b
["key"].asString();
// A malformed message is logged as a warning; the exception is converted to
// String to build the log text.
154 } catch (const Tairent::Core::BEncodeException
&e
) {
155 WARNING((const char *) String("Error while processing server's response: " + (const String
&) e
));
// Invoke the handler only when one is registered. Checking count() first keeps
// operator[] from inserting an entry for an unknown key.
159 if (handlers
.count(key
))
160 (*handlers
[key
])(key
, b
["values"]);
164 /* {{{ AzDHTModule::put(const String &, const String &, char) */
// Builds a "put" request for the AzDHT program.
//   key   - key to store under.
//   value - value to store.
//   flags - request flags.
// NOTE(review): only the creation of the request map and its "request" entry
// are visible; how `key`, `value` and `flags` are stored and how the request is
// sent (presumably via sendQuery()) is missing from this listing.
165 void AzDHTModule::put(const String
&key
, const String
&value
, char flags
)
// The request is a bencoded map whose "request" entry is the string "put".
167 Tairent::Core::BEncode
b(Tairent::Core::BEncode::MAP
);
168 b
["request"] = String("put");
177 /* {{{ AzDHTModule::readMessage() */
// Read step for the message body: appends received bytes to `buffer` until
// msgLength bytes are present, then switches the read step back to
// readMessageLength() and decodes the bytes as bencode.
// NOTE(review): this listing is missing lines (braces, the early exit for an
// incomplete message, the `try` line with the decode statement, and whatever
// follows a successful decode -- presumably processMessage(b)). Those lines
// are not reproduced or changed here.
// FIX: the handler used to catch `const Tairent::Core::BEncode &`, i.e. the
// value type rather than the exception type. processMessage() catches
// Tairent::Core::BEncodeException for the same class of failure, so a decode
// error raised for corrupted input would not have been caught here.
178 void AzDHTModule::readMessage()
// Read at most the bytes still missing from the current message.
180 offset
+= socket
->readTo(buffer
+ offset
, msgLength
- offset
);
182 if (offset
< msgLength
) // incomplete message
// Whole message received: the next readyRead starts with a length prefix again.
186 readMethod
= &AzDHTModule::readMessageLength
;
// Wrap the msgLength received bytes in a stream for the bencode decoder.
188 std::stringstream
s(String(buffer
, msgLength
));
189 Tairent::Core::BEncode b
;
// Corrupted input is logged as a warning; the exception converts to String.
192 } catch (const Tairent::Core::BEncodeException
&e
) { // shouldn't happen
193 WARNING((const char *) String("AzDHT sent corrupted bencode: " + (const String
&) e
));
202 /* {{{ AzDHTModule::readMessageLength() */
// Read step for the 4-byte length prefix: collects 4 bytes at the start of
// `buffer`, converts them from network byte order into msgLength, makes sure
// the buffer can hold the message and switches the read step to readMessage().
// NOTE(review): the early exit for an incomplete prefix and any reset of
// `offset` before the body is read are missing from this listing -- confirm.
// NOTE(review): msgLength comes straight from the peer and no upper bound is
// visible before enlargeBuffer() allocates that many bytes -- consider a cap.
203 void AzDHTModule::readMessageLength()
// Read at most the bytes still missing from the 4-byte prefix.
205 offset
+= socket
->readTo(buffer
+ offset
, 4 - offset
);
207 if (offset
< 4) // incomplete length
// The prefix is a 32-bit big-endian (network order) integer.
210 msgLength
= ntohl(*(uint32_t *) buffer
);
211 enlargeBuffer(msgLength
);
214 readMethod
= &AzDHTModule::readMessage
;
218 /* {{{ AzDHTModule::readyRead(Tairon::Net::Socket *) */
// Handler wired to socket->readyReadSignal in connected(). Runs whichever read
// step readMethod currently points to (readMessageLength or readMessage). The
// socket argument is unnamed and unused in the visible code.
// NOTE(review): the `try` line and the body of the catch handler are missing
// from this listing (presumably connectionError() is called -- TODO confirm).
219 void AzDHTModule::readyRead(Tairon::Net::Socket
*)
// Call through the pointer-to-member for the current read step.
222 (this->*readMethod
)();
223 } catch (const Tairon::Net::SocketException
&) { // connection has been closed
229 /* {{{ AzDHTModule::reconnect() */
// Invoked through timer->timeoutSignal (connected in the constructor). Creates
// a fresh IPv4 stream socket, wires its connected/error signals, re-reads the
// "azdht-port" configuration entry and starts connecting to "localhost" again.
// NOTE(review): this listing is missing lines (braces, `try`, the declaration
// and default assignment of `port`, whatever follows the failed connect).
// NOTE(review): `socket` is overwritten with a new object here, and no release
// of the previous socket is visible in this function -- confirm it is deleted
// elsewhere before this runs, otherwise it leaks.
230 void AzDHTModule::reconnect()
232 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
// Same socket setup as in the constructor.
234 socket
= new Tairon::Net::Socket(Tairon::Net::Socket::IPv4
, Tairon::Net::Socket::Stream
);
235 socket
->connectedSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::connected
));
236 socket
->errorSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &AzDHTModule::socketError
));
// Port from configuration; handling of the missing-key case is not visible.
240 port
= atoi((*Tairon::Core::Config::self())["azdht-port"]);
241 } catch (const Tairon::Core::KeyException
&) {
// Start connecting; a SocketException from connect() is logged.
246 socket
->connect("localhost", port
);
247 } catch (const Tairon::Net::SocketException
&) {
248 INFO("Cannot connect to AzDHT program, retrying");
255 /* {{{ AzDHTModule::registerHandler(const String &, Tairon::Core::Functor2<void, const String &, const Tairent::Core::BEncode &> *) */
// Registers the functor to be called by processMessage() for messages whose
// "key" equals `key`.
//   key     - key the handler is interested in.
//   handler - functor taking (key, values). A handler already stored for `key`
//             is deleted when it is replaced, so the module treats stored
//             handlers as owned; callers should not delete them themselves.
256 void AzDHTModule::registerHandler(const String
&key
, Tairon::Core::Functor2
<void, const String
&, const Tairent::Core::BEncode
&> *handler
)
// Replace semantics: delete the old functor before overwriting its slot.
258 if (handlers
.count(key
))
259 delete handlers
[key
];
260 handlers
[key
] = handler
;
264 /* {{{ AzDHTModule::remove(const String &) */
// Builds a "remove" request for the AzDHT program.
//   key - key to remove.
// NOTE(review): only the creation of the request map and its "request" entry
// are visible; how `key` is stored and how the request is sent (presumably via
// sendQuery()) is missing from this listing.
265 void AzDHTModule::remove(const String
&key
)
// The request is a bencoded map whose "request" entry is the string "remove".
267 Tairent::Core::BEncode
b(Tairent::Core::BEncode::MAP
);
268 b
["request"] = String("remove");
275 /* {{{ AzDHTModule::sendQuery(const Tairent::Core::BEncode &) */
// Sends one request to the AzDHT program, framed with a 4-byte length prefix in
// network byte order.
//   query - request to send.
// NOTE(review): the declarations of `s` (presumably a std::stringstream holding
// the encoded query) and `data`, the statement appending the payload to `data`,
// the `try` line and the body of the catch handler are missing from this
// listing -- only the visible statements are described.
276 void AzDHTModule::sendQuery(const Tairent::Core::BEncode
&query
)
// Room for prefix plus payload.
// NOTE(review): `data` is assigned a new String two statements below, which may
// discard this reservation -- confirm it has any effect.
281 data
.reserve(4 + s
.str().length());
// Length prefix: payload size converted to network byte order.
283 uint32_t length
= htonl(s
.str().length());
284 data
= String((const char * ) &length
, 4);
// The meaning of the `true` argument to write() is not visible here -- see
// Tairon::Net::Socket.
288 socket
->write(data
, true);
289 } catch (const Tairon::Net::SocketException
&) {
295 /* {{{ AzDHTModule::socketError(Tairon::Net::Socket *, int) */
// Handler wired to socket->errorSignal in the constructor and in reconnect().
//   err - error code reported with the signal (presumably an errno-style value
//         -- TODO confirm). The socket argument is unnamed.
// NOTE(review): the body of this function is missing from this listing.
296 void AzDHTModule::socketError(Tairon::Net::Socket
*, int err
)
302 }; // namespace AzDHT
304 }; // namespace Tairent
// Invokes the EXPORT_MODULE macro with the name `azdht` and the class
// Tairent::AzDHT::AzDHTModule. The macro is defined outside this file;
// presumably it emits the entry points the module loader looks up -- TODO confirm.
306 EXPORT_MODULE(azdht
, Tairent::AzDHT::AzDHTModule
)
308 // vim: ai sw=4 ts=4 noet fdm=marker