2 # Copyright (c) 2010 ArtForz -- public domain half-a-node
3 # Copyright (c) 2012 Jeff Garzik
4 # Copyright (c) 2010-2017 The Bitcoin Core developers
5 # Distributed under the MIT software license, see the accompanying
6 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
7 """Bitcoin P2P network half-a-node.
9 This python code was modified from ArtForz' public domain half-a-node, as
10 found in the mini-node branch of http://github.com/jgarzik/pynode.
12 P2PConnection: A low-level connection object to a node's P2P interface
13 P2PInterface: A high-level interface object for communicating to a node over P2P"""
15 from collections
import defaultdict
16 from io
import BytesIO
23 from test_framework
.messages
import *
24 from test_framework
.util
import wait_until
# Shared logger for the connection classes and network-thread helpers in
# this module.
logger = logging.getLogger("TestFramework.mininode")
31 b
"blocktxn": msg_blocktxn
,
32 b
"cmpctblock": msg_cmpctblock
,
33 b
"feefilter": msg_feefilter
,
34 b
"getaddr": msg_getaddr
,
35 b
"getblocks": msg_getblocks
,
36 b
"getblocktxn": msg_getblocktxn
,
37 b
"getdata": msg_getdata
,
38 b
"getheaders": msg_getheaders
,
39 b
"headers": msg_headers
,
41 b
"mempool": msg_mempool
,
44 b
"reject": msg_reject
,
45 b
"sendcmpct": msg_sendcmpct
,
46 b
"sendheaders": msg_sendheaders
,
48 b
"verack": msg_verack
,
49 b
"version": msg_version
,
53 "mainnet": b
"\xf9\xbe\xb4\xd9", # mainnet
54 "testnet3": b
"\x0b\x11\x09\x07", # testnet3
55 "regtest": b
"\xfa\xbf\xb5\xda", # regtest
58 class P2PConnection(asyncore
.dispatcher
):
59 """A low-level connection object to a node's P2P interface.
61 This class is responsible for:
63 - opening and closing the TCP connection to the node
64 - reading bytes from and writing bytes to the socket
65 - deserializing and serializing the P2P message header
66 - logging messages as they are sent and received
    This class contains no logic for handling the P2P message payloads. It must be
    sub-classed and the on_message() callback overridden."""
72 # All P2PConnections must be created before starting the NetworkThread.
73 # assert that the network thread is not running.
74 assert not network_thread_running()
76 super().__init
__(map=mininode_socket_map
)
def peer_connect(self, dstaddr, dstport, net="regtest"):
    """Open the TCP connection to a node's P2P port.

    Args:
        dstaddr: address of the node to connect to.
        dstport: P2P port of the node.
        net: name of the network ("mainnet", "testnet3" or "regtest");
            stored as self.network, which the send and receive paths use
            to index MAGIC_BYTES.
    """
    self.dstaddr = dstaddr
    self.dstport = dstport
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    # Start every connection with empty byte buffers: the read and write
    # paths slice and measure these attributes, so they must exist before
    # the first asyncore callback fires.
    self.sendbuf = b""
    self.recvbuf = b""
    self.state = "connecting"
    # Remember the requested network; without this the `net` argument
    # would be silently ignored and MAGIC_BYTES[self.network] would fail.
    self.network = net
    self.disconnect = False

    logger.info('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))

    self.connect((dstaddr, dstport))
def peer_disconnect(self):
    """Request a disconnect, but only while the connection is still up."""
    # The other end may already have closed the connection, in which case
    # there is nothing left to tear down.
    if self.state != "connected":
        return
    self.disconnect_node()
101 # Connection and disconnection methods
def handle_connect(self):
    """asyncore callback when a connection is opened.

    Logs the event and marks the connection as "connected". Does nothing
    if the connection is already in that state.
    """
    if self.state == "connected":
        return
    endpoint = (self.dstaddr, self.dstport)
    logger.debug("Connected & Listening: %s:%d" % endpoint)
    self.state = "connected"
def handle_close(self):
    """asyncore callback when a connection is closed.

    Logs the event and records the new "closed" state.
    """
    endpoint = (self.dstaddr, self.dstport)
    logger.debug("Closing connection to: %s:%d" % endpoint)
    self.state = "closed"
def disconnect_node(self):
    """Flag this p2p connection for disconnection.

    Meant to be called from the test logic thread. It only sets the
    `disconnect` flag; the connection is then disconnected on the next
    iteration of the asyncore loop.
    """
    self.disconnect = True
129 # Socket read methods
131 def handle_read(self
):
132 """asyncore callback when data is read from the socket."""
139 """Try to read P2P messages from the recv buffer.
141 This method reads data from the buffer in a loop. It deserializes,
142 parses and verifies the P2P header, then passes the P2P payload to
143 the on_message callback for processing."""
146 if len(self
.recvbuf
) < 4:
148 if self
.recvbuf
[:4] != MAGIC_BYTES
[self
.network
]:
149 raise ValueError("got garbage %s" % repr(self
.recvbuf
))
150 if len(self
.recvbuf
) < 4 + 12 + 4 + 4:
152 command
= self
.recvbuf
[4:4+12].split(b
"\x00", 1)[0]
153 msglen
= struct
.unpack("<i", self
.recvbuf
[4+12:4+12+4])[0]
154 checksum
= self
.recvbuf
[4+12+4:4+12+4+4]
155 if len(self
.recvbuf
) < 4 + 12 + 4 + 4 + msglen
:
157 msg
= self
.recvbuf
[4+12+4+4:4+12+4+4+msglen
]
160 if checksum
!= h
[:4]:
161 raise ValueError("got bad checksum " + repr(self
.recvbuf
))
162 self
.recvbuf
= self
.recvbuf
[4+12+4+4+msglen
:]
163 if command
not in MESSAGEMAP
:
164 raise ValueError("Received unknown command from %s:%d: '%s' %s" % (self
.dstaddr
, self
.dstport
, command
, repr(msg
)))
166 t
= MESSAGEMAP
[command
]()
168 self
._log
_message
("receive", t
)
170 except Exception as e
:
171 logger
.exception('Error reading message:', repr(e
))
def on_message(self, message):
    """Process one P2P payload.

    This base implementation always raises NotImplementedError; derived
    classes must override it.
    """
    raise NotImplementedError
178 # Socket write methods
181 """asyncore method to determine whether the handle_write() callback should be called on the next loop."""
183 pre_connection
= self
.state
== "connecting"
184 length
= len(self
.sendbuf
)
185 return (length
> 0 or pre_connection
)
187 def handle_write(self
):
188 """asyncore callback when data should be written to the socket."""
190 # asyncore does not expose socket connection, only the first read/write
191 # event, thus we must check connection manually here to know when we
193 if self
.state
== "connecting":
194 self
.handle_connect()
195 if not self
.writable():
199 sent
= self
.send(self
.sendbuf
)
203 self
.sendbuf
= self
.sendbuf
[sent
:]
205 def send_message(self
, message
, pushbuf
=False):
206 """Send a P2P message over the socket.
208 This method takes a P2P payload, builds the P2P header and adds
209 the message to the send buffer to be sent over the socket."""
210 if self
.state
!= "connected" and not pushbuf
:
211 raise IOError('Not connected, no pushbuf')
212 self
._log
_message
("send", message
)
213 command
= message
.command
214 data
= message
.serialize()
215 tmsg
= MAGIC_BYTES
[self
.network
]
217 tmsg
+= b
"\x00" * (12 - len(command
))
218 tmsg
+= struct
.pack("<I", len(data
))
224 if (len(self
.sendbuf
) == 0 and not pushbuf
):
226 sent
= self
.send(tmsg
)
227 self
.sendbuf
= tmsg
[sent
:]
228 except BlockingIOError
:
233 # Class utility methods
def _log_message(self, direction, msg):
    """Log a message being sent or received over the connection.

    Args:
        direction: "send" or "receive".
        msg: the message object; only its repr() is logged, cut to at most
            500 characters.

    Raises:
        ValueError: if direction is neither "send" nor "receive".
    """
    max_repr_len = 500
    if direction == "send":
        prefix = "Send message to "
    elif direction == "receive":
        prefix = "Received message from "
    else:
        # Fail with a clear error instead of an UnboundLocalError below.
        raise ValueError("unknown message direction: %r" % (direction,))
    msg_repr = repr(msg)
    log_message = prefix + "%s:%d: %s" % (self.dstaddr, self.dstport, msg_repr[:max_repr_len])
    # Decide on the repr itself, not on the whole log line: the line also
    # contains the prefix and peer address, so measuring it would label
    # messages as truncated even when their repr was logged in full.
    if len(msg_repr) > max_repr_len:
        log_message += "... (msg truncated)"
    logger.debug(log_message)
247 class P2PInterface(P2PConnection
):
248 """A high-level P2P interface class for communicating with a Bitcoin node.
250 This class provides high-level callbacks for processing P2P message
251 payloads, as well as convenience methods for interacting with the
254 Individual testcases should subclass this and override the on_* methods
255 if they want to alter message handling behaviour."""
259 # Track number of messages of each type received and the most recent
260 # message of each type
261 self
.message_count
= defaultdict(int)
262 self
.last_message
= {}
264 # A count of the number of ping messages we've sent to the node
265 self
.ping_counter
= 1
267 # The network services received from the peer
270 def peer_connect(self
, *args
, services
=NODE_NETWORK|NODE_WITNESS
, send_version
=True, **kwargs
):
271 super().peer_connect(*args
, **kwargs
)
276 vt
.nServices
= services
277 vt
.addrTo
.ip
= self
.dstaddr
278 vt
.addrTo
.port
= self
.dstport
279 vt
.addrFrom
.ip
= "0.0.0.0"
281 self
.send_message(vt
, True)
283 # Message receiving methods
285 def on_message(self
, message
):
286 """Receive message and dispatch message to appropriate callback.
288 We keep a count of how many of each message type has been received
289 and the most recent message of each type."""
292 command
= message
.command
.decode('ascii')
293 self
.message_count
[command
] += 1
294 self
.last_message
[command
] = message
295 getattr(self
, 'on_' + command
)(message
)
297 print("ERROR delivering %s (%s)" % (repr(message
), sys
.exc_info()[0]))
300 # Callback methods. Can be overridden by subclasses in individual test
301 # cases to provide custom message handling behaviour.
def on_addr(self, message):
    """Default `addr` handler: do nothing."""


def on_block(self, message):
    """Default `block` handler: do nothing."""


def on_blocktxn(self, message):
    """Default `blocktxn` handler: do nothing."""


def on_cmpctblock(self, message):
    """Default `cmpctblock` handler: do nothing."""


def on_feefilter(self, message):
    """Default `feefilter` handler: do nothing."""


def on_getaddr(self, message):
    """Default `getaddr` handler: do nothing."""


def on_getblocks(self, message):
    """Default `getblocks` handler: do nothing."""


def on_getblocktxn(self, message):
    """Default `getblocktxn` handler: do nothing."""


def on_getdata(self, message):
    """Default `getdata` handler: do nothing."""


def on_getheaders(self, message):
    """Default `getheaders` handler: do nothing."""


def on_headers(self, message):
    """Default `headers` handler: do nothing."""


def on_mempool(self, message):
    """Default `mempool` handler: do nothing."""


def on_pong(self, message):
    """Default `pong` handler: do nothing."""


def on_reject(self, message):
    """Default `reject` handler: do nothing."""


def on_sendcmpct(self, message):
    """Default `sendcmpct` handler: do nothing."""


def on_sendheaders(self, message):
    """Default `sendheaders` handler: do nothing."""


def on_tx(self, message):
    """Default `tx` handler: do nothing."""
327 def on_inv(self
, message
):
329 for i
in message
.inv
:
333 self
.send_message(want
)
def on_ping(self, message):
    """Answer a ping by sending back a pong carrying the same nonce."""
    reply = msg_pong(message.nonce)
    self.send_message(reply)
def on_verack(self, message):
    """Record that a verack message has been received."""
    self.verack_received = True
def on_version(self, message):
    """Handle the peer's version message.

    Asserts that the advertised protocol version is supported, replies
    with a verack, then stores the services the peer announced.
    """
    peer_version = message.nVersion
    assert peer_version >= MIN_VERSION_SUPPORTED, (
        "Version {} received. Test framework only supports versions greater than {}"
        .format(peer_version, MIN_VERSION_SUPPORTED)
    )
    self.send_message(msg_verack())
    self.nServices = message.nServices
346 # Connection helper methods
def wait_for_disconnect(self, timeout=60):
    """Wait until this connection is no longer in the "connected" state.

    The predicate, timeout and mininode_lock are handed to wait_until.
    """
    def no_longer_connected():
        return self.state != "connected"

    wait_until(no_longer_connected, timeout=timeout, lock=mininode_lock)
352 # Message receiving helper methods
def wait_for_block(self, blockhash, timeout=60):
    """Wait until the most recent block message carries `blockhash`.

    The comparison is made against the rehash() of the block in the last
    "block" message; the predicate, timeout and mininode_lock are handed
    to wait_until.
    """
    def block_arrived():
        latest = self.last_message.get("block")
        return latest and latest.block.rehash() == blockhash

    wait_until(block_arrived, timeout=timeout, lock=mininode_lock)
def wait_for_getdata(self, timeout=60):
    """Wait until a getdata message is present in last_message.

    The predicate, timeout and mininode_lock are handed to wait_until.
    """
    def getdata_seen():
        return self.last_message.get("getdata")

    wait_until(getdata_seen, timeout=timeout, lock=mininode_lock)
def wait_for_getheaders(self, timeout=60):
    """Wait until a getheaders message is present in last_message.

    The predicate, timeout and mininode_lock are handed to wait_until.
    """
    def getheaders_seen():
        return self.last_message.get("getheaders")

    wait_until(getheaders_seen, timeout=timeout, lock=mininode_lock)
def wait_for_inv(self, expected_inv, timeout=60):
    """Wait for an INV message whose first inv object matches the expected one.

    Only the first entry is compared (by type and hash), so passing more
    than one expected inv object raises NotImplementedError.
    """
    if len(expected_inv) > 1:
        raise NotImplementedError("wait_for_inv() will only verify the first inv object")

    def first_inv_matches():
        latest = self.last_message.get("inv")
        if not latest:
            return latest
        received, wanted = latest.inv[0], expected_inv[0]
        return received.type == wanted.type and received.hash == wanted.hash

    wait_until(first_inv_matches, timeout=timeout, lock=mininode_lock)
def wait_for_verack(self, timeout=60):
    """Wait until at least one verack message has been counted.

    The predicate, timeout and mininode_lock are handed to wait_until.
    """
    def verack_counted():
        return self.message_count["verack"]

    wait_until(verack_counted, timeout=timeout, lock=mininode_lock)
379 # Message sending helper functions
def send_and_ping(self, message):
    """Send `message`, then call sync_with_ping()."""
    self.send_message(message)
    self.sync_with_ping()
385 # Sync up with the node
def sync_with_ping(self, timeout=60):
    """Send a ping and wait for the pong that echoes its nonce.

    The current ping_counter is used as the nonce and is incremented once
    the matching pong has been seen, so each call uses a new nonce.
    """
    self.send_message(msg_ping(nonce=self.ping_counter))

    def pong_matches():
        latest = self.last_message.get("pong")
        return latest and latest.nonce == self.ping_counter

    wait_until(pong_matches, timeout=timeout, lock=mininode_lock)
    self.ping_counter += 1
393 # Keep our own socket map for asyncore, so that we can track disconnects
394 # ourselves (to workaround an issue with closing an asyncore socket when
# Private asyncore socket map, passed as `map=` by the code in this module.
mininode_socket_map = {}
# Single lock that synchronizes all data access between the networking
# thread (see NetworkThread below) and the thread running the test logic.
# To keep things simple, P2PConnection takes this lock whenever it delivers
# a message to a P2PInterface and whenever it adds anything to the send
# buffer (in send_message()). Test logic should acquire it as well before
# touching any data shared with a P2PInterface or P2PConnection.
mininode_lock = threading.RLock()
406 class NetworkThread(threading
.Thread
):
408 super().__init
__(name
="NetworkThread")
411 while mininode_socket_map
:
412 # We check for whether to disconnect outside of the asyncore
413 # loop to workaround the behavior of asyncore when using
416 for fd
, obj
in mininode_socket_map
.items():
418 disconnected
.append(obj
)
419 [obj
.handle_close() for obj
in disconnected
]
420 asyncore
.loop(0.1, use_poll
=True, map=mininode_socket_map
, count
=1)
421 logger
.debug("Network thread closing")
def network_thread_start():
    """Start the network thread.

    Asserts first that no network thread is already running, since only
    one may run at a time.
    """
    already_running = network_thread_running()
    assert not already_running

    network_thread = NetworkThread()
    network_thread.start()
def network_thread_running():
    """Return whether the network thread is running.

    A thread counts as the network thread when its name is "NetworkThread".
    """
    # A generator lets any() stop at the first match instead of building a
    # full list of booleans first.
    return any(thread.name == "NetworkThread" for thread in threading.enumerate())
def network_thread_join(timeout=10):
    """Wait timeout seconds for the network thread to terminate.

    Args:
        timeout: maximum number of seconds to wait for the thread.

    Raises:
        AssertionError: if more than one thread is named "NetworkThread",
            or if the network thread is still alive after the wait.
    """
    network_threads = [thread for thread in threading.enumerate() if thread.name == "NetworkThread"]
    assert len(network_threads) <= 1
    for thread in network_threads:
        # Actually wait for the thread: without the join, `timeout` was
        # never used and a thread still shutting down failed immediately.
        thread.join(timeout)
        assert not thread.is_alive()