2 # Copyright (c) 2014-2017 The Bitcoin Core developers
3 # Distributed under the MIT software license, see the accompanying
4 # file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 """Linux network utilities.
7 Roughly based on http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
16 from binascii
import unhexlify
, hexlify
18 # STATE_ESTABLISHED = '01'
19 # STATE_SYN_SENT = '02'
20 # STATE_SYN_RECV = '03'
21 # STATE_FIN_WAIT1 = '04'
22 # STATE_FIN_WAIT2 = '05'
23 # STATE_TIME_WAIT = '06'
25 # STATE_CLOSE_WAIT = '08'
26 # STATE_LAST_ACK = '09'
28 # STATE_CLOSING = '0B'
30 def get_socket_inodes(pid
):
32 Get list of socket inodes for process pid.
34 base
= '/proc/%i/fd' % pid
36 for item
in os
.listdir(base
):
37 target
= os
.readlink(os
.path
.join(base
, item
))
38 if target
.startswith('socket:'):
39 inodes
.append(int(target
[8:-1]))
42 def _remove_empty(array
):
43 return [x
for x
in array
if x
!='']
45 def _convert_ip_port(array
):
46 host
,port
= array
.split(':')
47 # convert host from mangled-per-four-bytes form as used by kernel
48 host
= unhexlify(host
)
50 for x
in range(0, len(host
) // 4):
51 (val
,) = struct
.unpack('=I', host
[x
*4:(x
+1)*4])
52 host_out
+= '%08x' % val
54 return host_out
,int(port
,16)
56 def netstat(typ
='tcp'):
58 Function to return a list with status of tcp connections at linux systems
59 To get pid of all network process running on system, you must run this script
62 with
open('/proc/net/'+typ
,'r',encoding
='utf8') as f
:
63 content
= f
.readlines()
67 line_array
= _remove_empty(line
.split(' ')) # Split lines and remove empty spaces.
68 tcp_id
= line_array
[0]
69 l_addr
= _convert_ip_port(line_array
[1])
70 r_addr
= _convert_ip_port(line_array
[2])
72 inode
= int(line_array
[9]) # Need the inode to match with process pid.
73 nline
= [tcp_id
, l_addr
, r_addr
, state
, inode
]
77 def get_bind_addrs(pid
):
79 Get bind addresses as (host,port) tuples for process pid.
81 inodes
= get_socket_inodes(pid
)
83 for conn
in netstat('tcp') + netstat('tcp6'):
84 if conn
[3] == STATE_LISTEN
and conn
[4] in inodes
:
85 bind_addrs
.append(conn
[1])
88 # from: http://code.activestate.com/recipes/439093/
91 Return all interfaces that are up
93 is_64bits
= sys
.maxsize
> 2**32
94 struct_size
= 40 if is_64bits
else 32
95 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
96 max_possible
= 8 # initial value
98 bytes
= max_possible
* struct_size
99 names
= array
.array('B', b
'\0' * bytes
)
100 outbytes
= struct
.unpack('iL', fcntl
.ioctl(
102 0x8912, # SIOCGIFCONF
103 struct
.pack('iL', bytes
, names
.buffer_info()[0])
105 if outbytes
== bytes
:
109 namestr
= names
.tostring()
110 return [(namestr
[i
:i
+16].split(b
'\0', 1)[0],
111 socket
.inet_ntoa(namestr
[i
+20:i
+24]))
112 for i
in range(0, outbytes
, struct_size
)]
114 def addr_to_hex(addr
):
116 Convert string IPv4 or IPv6 address to binary address as returned by
118 Very naive implementation that certainly doesn't work for all IPv6 variants.
120 if '.' in addr
: # IPv4
121 addr
= [int(x
) for x
in addr
.split('.')]
122 elif ':' in addr
: # IPv6
123 sub
= [[], []] # prefix, suffix
125 addr
= addr
.split(':')
126 for i
,comp
in enumerate(addr
):
128 if i
== 0 or i
== (len(addr
)-1): # skip empty component at beginning or end
130 x
+= 1 # :: skips to suffix
132 else: # two bytes per component
134 sub
[x
].append(val
>> 8)
135 sub
[x
].append(val
& 0xff)
136 nullbytes
= 16 - len(sub
[0]) - len(sub
[1])
137 assert((x
== 0 and nullbytes
== 0) or (x
== 1 and nullbytes
> 0))
138 addr
= sub
[0] + ([0] * nullbytes
) + sub
[1]
140 raise ValueError('Could not parse address %s' % addr
)
141 return hexlify(bytearray(addr
)).decode('ascii')
143 def test_ipv6_local():
145 Check for (local) IPv6 support.
148 # By using SOCK_DGRAM this will not actually make a connection, but it will
149 # fail if there is no route to IPv6 localhost.
152 s
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_DGRAM
)
153 s
.connect(('::1', 0))