5 from minecraft
.networking
.encryption
import (
6 minecraft_sha1_hash_digest
,
7 encrypt_token_and_secret
,
8 generate_shared_secret
,
9 generate_verification_hash
,
11 EncryptedFileObjectWrapper
,
12 EncryptedSocketWrapper
14 from minecraft
.networking
.packets
import clientbound
15 from tests
import test_connection
17 from cryptography
.hazmat
.backends
import default_backend
18 from cryptography
.hazmat
.primitives
.asymmetric
.padding
import PKCS1v15
19 from cryptography
.hazmat
.primitives
.serialization
import load_der_private_key
21 KEY_LOCATION
= os
.path
.join(os
.path
.dirname(os
.path
.realpath(__file__
)),
26 global private_key
, public_key
, token
28 with
open(os
.path
.join(KEY_LOCATION
, "priv_key.bin"), "rb") as f
:
29 private_key
= f
.read()
30 private_key
= load_der_private_key(private_key
, None, default_backend())
32 with
open(os
.path
.join(KEY_LOCATION
, "pub_key.bin"), "rb") as f
:
35 token
= generate_shared_secret()
39 global private_key
, public_key
, token
40 del private_key
, public_key
, token
class Hashing(unittest.TestCase):
    """Check ``minecraft_sha1_hash_digest`` against known digest strings."""

    # Maps an input string to the digest string expected for the SHA-1 of
    # its UTF-8 encoding.  As the entries show, an expected value may carry
    # a leading '-' sign and is not zero-padded to a fixed width.
    test_data = {'Notch': '4ed1f46bbe04bc756bcb17c0c7ce3e4632f06a48',
                 'jeb_': '-7c9d5b0044c130109a5d7b5fb5c317c02b4e28c1',
                 'simon': '88e16a1019277b15d58faf0541e11910eb756f6'}

    def test_hashing(self):
        """Hash every key of ``test_data`` and compare with its value."""
        for input_value, result in self.test_data.items():
            sha1_hash = hashlib.sha1()
            sha1_hash.update(input_value.encode('utf-8'))
            # assertEqual replaces the deprecated assertEquals alias, which
            # no longer exists on Python 3.12+.
            self.assertEqual(minecraft_sha1_hash_digest(sha1_hash), result,
                             "unexpected digest for %r" % (input_value,))
class Encryption(unittest.TestCase):
    """Tests for the encryption helpers and the encrypted I/O wrappers.

    The tests read the module-level ``public_key``, ``private_key`` and
    ``token`` globals, so those must be populated before the tests run.
    """

    def test_token_secret_encryption(self):
        """Encrypt token and secret, then decrypt both with the private key."""
        secret = generate_shared_secret()
        encrypted_token, encrypted_secret = \
            encrypt_token_and_secret(public_key, token, secret)
        decrypted_token = private_key.decrypt(encrypted_token, PKCS1v15())
        decrypted_secret = private_key.decrypt(encrypted_secret, PKCS1v15())

        # assertEqual replaces the deprecated assertEquals alias, which no
        # longer exists on Python 3.12+.
        self.assertEqual(token, decrypted_token)
        self.assertEqual(secret, decrypted_secret)

    def test_generate_hash(self):
        """Compare the verification hash for fixed inputs to a known value."""
        verification_hash = generate_verification_hash(
            u"", "secret".encode('utf-8'), public_key)
        # NOTE(review): the second argument of this assertion was cut off in
        # the extracted source; ``verification_hash`` is the only value left
        # to compare against -- confirm against upstream.
        self.assertEqual("1f142e737a84a974a5f2a22f6174a78d80fd97f5",
                         verification_hash)

    def test_file_object_wrapper(self):
        """Read encrypted bytes back through ``EncryptedFileObjectWrapper``."""
        # Imported locally so this test does not depend on a module-level
        # import that is not visible here.
        from io import BytesIO

        cipher = create_AES_cipher(generate_shared_secret())
        encryptor = cipher.encryptor()
        decryptor = cipher.decryptor()

        test_data = "hello".encode('utf-8')
        # NOTE(review): the lines creating and rewinding the buffer were
        # missing from the extracted source; an in-memory BytesIO rewound to
        # the start is assumed -- confirm against upstream.
        buffer = BytesIO()
        buffer.write(encryptor.update(test_data))
        buffer.seek(0)

        file_object_wrapper = EncryptedFileObjectWrapper(buffer, decryptor)
        decrypted_data = file_object_wrapper.read(len(test_data))

        self.assertEqual(test_data, decrypted_data)

    def test_socket_wrapper(self):
        """Exchange data between ``EncryptedSocketWrapper`` and a mock socket.

        Client and server ciphers are both built from the same shared
        secret.
        """
        secret = generate_shared_secret()

        cipher = create_AES_cipher(secret)
        encryptor = cipher.encryptor()
        decryptor = cipher.decryptor()

        server_cipher = create_AES_cipher(secret)
        server_encryptor = server_cipher.encryptor()
        server_decryptor = server_cipher.decryptor()

        mock_socket = MockSocket(server_encryptor, server_decryptor)
        wrapper = EncryptedSocketWrapper(mock_socket, encryptor, decryptor)

        self.assertEqual(wrapper.fileno(), 0)

        # Ensure that the 12 bytes we receive are the same as the 12 bytes
        # sent by the server, after undergoing encryption
        self.assertEqual(wrapper.recv(12), mock_socket.raw_data[:12])

        # Ensure that hello reaches the server properly after undergoing
        # encryption
        test_data = "hello".encode('utf-8')
        wrapper.send(test_data)
        self.assertEqual(test_data, mock_socket.received)
class EncryptedConnection(test_connection.ConnectTest):
    """Inherited connection test, run with the module-level key pair."""

    def test_connect(self):
        """Call ``_test_connect`` with the module's private and public key."""
        key_arguments = {
            'private_key': private_key,
            'public_key_bytes': public_key,
        }
        self._test_connect(**key_arguments)

    def _start_client(self, client):
        """Register a login-success check on ``client``, then defer upward.

        The listener asserts that, when a ``LoginSuccessPacket`` is
        handled, the client's socket and file object are the encrypted
        wrapper types.
        """
        success_packet = clientbound.login.LoginSuccessPacket

        def verify_encrypted_io(_packet):
            # The packet itself is irrelevant; only the client's I/O
            # objects are inspected.
            assert isinstance(client.socket, EncryptedSocketWrapper)
            assert isinstance(client.file_object, EncryptedFileObjectWrapper)

        client.register_packet_listener(verify_encrypted_io, success_packet)
        super(EncryptedConnection, self)._start_client(client)
class EncryptedCompressedConnection(EncryptedConnection,
                                    test_connection.ConnectCompressionLowTest):
    """Combine ``EncryptedConnection`` with ``ConnectCompressionLowTest``.

    Adds no members of its own; all behaviour comes from the two bases.
    """
# Regression test for <https://github.com/ammaraskar/pyCraft/issues/109>.
class EncryptedCompressedReconnect(test_connection.ReconnectTest,
                                   EncryptedCompressedConnection):
    """Combine ``ReconnectTest`` with ``EncryptedCompressedConnection``.

    Adds no members of its own; all behaviour comes from the two bases.
    """
140 class MockSocket(object):
142 def __init__(self
, encryptor
, decryptor
):
143 self
.raw_data
= os
.urandom(100)
144 self
.encryptor
= encryptor
145 self
.decryptor
= decryptor
148 # when we receive data from the server
150 def recv(self
, length
):
151 return self
.encryptor
.update(self
.raw_data
[:length
])
153 # decrypt the data as it reaches
155 def send(self
, data
):
156 self
.received
= self
.decryptor
.update(data
)