Fix: 1.16.5 is erroneously marked as unsupported
[pyCraft.git] / tests / test_encryption.py
blobc23433967b3b3cbc6cdd1594dd9520df8478a29d
1 import os
2 import unittest
3 import hashlib
4 from io import BytesIO
5 from minecraft.networking.encryption import (
6 minecraft_sha1_hash_digest,
7 encrypt_token_and_secret,
8 generate_shared_secret,
9 generate_verification_hash,
10 create_AES_cipher,
11 EncryptedFileObjectWrapper,
12 EncryptedSocketWrapper
14 from minecraft.networking.packets import clientbound
15 from tests import test_connection
17 from cryptography.hazmat.backends import default_backend
18 from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
19 from cryptography.hazmat.primitives.serialization import load_der_private_key
# Directory (next to this test module) from which setUpModule reads
# "priv_key.bin" and "pub_key.bin".
KEY_LOCATION = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    "encryption",
)
def setUpModule():
    """Load the key fixtures and create the token shared by the tests.

    Sets three module globals:
      * ``private_key`` -- the object returned by ``load_der_private_key``
        for the contents of ``priv_key.bin``;
      * ``public_key`` -- the raw bytes of ``pub_key.bin``;
      * ``token`` -- a value from ``generate_shared_secret()``.
    """
    global private_key, public_key, token

    private_key_path = os.path.join(KEY_LOCATION, "priv_key.bin")
    with open(private_key_path, "rb") as private_key_file:
        private_key = private_key_file.read()
    # The raw bytes are replaced by the loaded key object; None is passed
    # as the password argument.
    private_key = load_der_private_key(private_key, None, default_backend())

    public_key_path = os.path.join(KEY_LOCATION, "pub_key.bin")
    with open(public_key_path, "rb") as public_key_file:
        # The public key is kept as bytes and handed to the helpers as-is.
        public_key = public_key_file.read()

    token = generate_shared_secret()
38 def tearDownModule():
39 global private_key, public_key, token
40 del private_key, public_key, token
43 class Hashing(unittest.TestCase):
44 test_data = {'Notch': '4ed1f46bbe04bc756bcb17c0c7ce3e4632f06a48',
45 'jeb_': '-7c9d5b0044c130109a5d7b5fb5c317c02b4e28c1',
46 'simon': '88e16a1019277b15d58faf0541e11910eb756f6'}
48 def test_hashing(self):
49 for input_value, result in self.test_data.items():
50 sha1_hash = hashlib.sha1()
51 sha1_hash.update(input_value.encode('utf-8'))
52 self.assertEquals(minecraft_sha1_hash_digest(sha1_hash), result)
55 class Encryption(unittest.TestCase):
57 def test_token_secret_encryption(self):
58 secret = generate_shared_secret()
59 encrypted_token, encrypted_secret = \
60 encrypt_token_and_secret(public_key, token, secret)
61 decrypted_token = private_key.decrypt(encrypted_token, PKCS1v15())
62 decrypted_secret = private_key.decrypt(encrypted_secret, PKCS1v15())
64 self.assertEquals(token, decrypted_token)
65 self.assertEquals(secret, decrypted_secret)
67 def test_generate_hash(self):
68 verification_hash = generate_verification_hash(
69 u"", "secret".encode('utf-8'), public_key)
70 self.assertEquals("1f142e737a84a974a5f2a22f6174a78d80fd97f5",
71 verification_hash)
73 def test_file_object_wrapper(self):
74 cipher = create_AES_cipher(generate_shared_secret())
75 encryptor = cipher.encryptor()
76 decryptor = cipher.decryptor()
78 test_data = "hello".encode('utf-8')
79 io = BytesIO()
80 io.write(encryptor.update(test_data))
81 io.seek(0)
83 file_object_wrapper = EncryptedFileObjectWrapper(io, decryptor)
84 decrypted_data = file_object_wrapper.read(len(test_data))
86 self.assertEqual(test_data, decrypted_data)
88 def test_socket_wrapper(self):
89 secret = generate_shared_secret()
91 cipher = create_AES_cipher(secret)
92 encryptor = cipher.encryptor()
93 decryptor = cipher.decryptor()
95 server_cipher = create_AES_cipher(secret)
96 server_encryptor = server_cipher.encryptor()
97 server_decryptor = server_cipher.decryptor()
99 mock_socket = MockSocket(server_encryptor, server_decryptor)
100 wrapper = EncryptedSocketWrapper(mock_socket, encryptor, decryptor)
102 self.assertEqual(wrapper.fileno(), 0)
104 # Ensure that the 12 bytes we receive are the same as the 12 bytes
105 # sent by the server, after undergoing encryption
106 self.assertEqual(wrapper.recv(12), mock_socket.raw_data[:12])
108 # Ensure that hello reaches the server properly after undergoing
109 # encryption
110 test_data = "hello".encode('utf-8')
111 wrapper.send(test_data)
112 self.assertEqual(test_data, mock_socket.received)
class EncryptedConnection(test_connection.ConnectTest):
    """Run the inherited connection test with the encryption keys supplied."""

    def _start_client(self, client):
        # Listener registered for LoginSuccessPacket: when it runs, the
        # client's socket and file object must be the encrypting wrappers.
        def check_encrypted_wrappers(_packet):
            assert isinstance(client.socket, EncryptedSocketWrapper)
            assert isinstance(client.file_object, EncryptedFileObjectWrapper)

        client.register_packet_listener(
            check_encrypted_wrappers, clientbound.login.LoginSuccessPacket)
        super(EncryptedConnection, self)._start_client(client)

    def test_connect(self):
        # Pass the key object and public key bytes loaded by setUpModule.
        self._test_connect(
            private_key=private_key, public_key_bytes=public_key)
class EncryptedCompressedConnection(
        EncryptedConnection, test_connection.ConnectCompressionLowTest):
    """EncryptedConnection combined with ConnectCompressionLowTest.

    Adds nothing of its own; all behaviour comes from the two base classes.
    """
# Regression test for <https://github.com/ammaraskar/pyCraft/issues/109>.
class EncryptedCompressedReconnect(
        test_connection.ReconnectTest, EncryptedCompressedConnection):
    """ReconnectTest combined with EncryptedCompressedConnection.

    Adds nothing of its own; all behaviour comes from the two base classes.
    """
class MockSocket(object):
    """Socket stand-in that plays the server side of an encrypted link.

    ``encryptor`` and ``decryptor`` only need an ``update(data)`` method.
    ``raw_data`` holds 100 random bytes that ``recv`` serves from;
    ``received`` holds the decrypted result of the latest ``send`` call
    (``None`` until ``send`` is called).
    """

    def __init__(self, encryptor, decryptor):
        self.encryptor, self.decryptor = encryptor, decryptor
        self.received = None
        self.raw_data = os.urandom(100)

    def fileno(self):
        return 0

    def recv(self, length):
        # Data coming from the server is encrypted. Every call serves the
        # leading `length` bytes of raw_data; there is no read position.
        plaintext = self.raw_data[:length]
        return self.encryptor.update(plaintext)

    def send(self, data):
        # Data is decrypted as it reaches the server side and kept for
        # inspection by the test.
        self.received = self.decryptor.update(data)