Fix: 1.16.5 is erroneously marked as unsupported
[pyCraft.git] / tests / test_packets.py
bloba59462b4a7ef374696c7f5823c9aa6702bb6c0c8
1 # -*- coding: utf-8 -*-
2 import unittest
3 import string
4 import logging
5 import struct
6 from zlib import decompress
7 from random import choice
9 from minecraft.utility import protocol_earlier
10 from minecraft import (
11 PRE, SUPPORTED_PROTOCOL_VERSIONS, RELEASE_PROTOCOL_VERSIONS,
13 from minecraft.networking.connection import ConnectionContext
14 from minecraft.networking.types import (
15 VarInt, Enum, Vector, PositionAndLook, OriginPoint,
17 from minecraft.networking.packets import (
18 Packet, PacketBuffer, PacketListener, KeepAlivePacket, serverbound,
19 clientbound
22 TEST_VERSIONS = list(RELEASE_PROTOCOL_VERSIONS)
23 if SUPPORTED_PROTOCOL_VERSIONS[-1] not in TEST_VERSIONS:
24 TEST_VERSIONS.append(SUPPORTED_PROTOCOL_VERSIONS[-1])
27 class PacketBufferTest(unittest.TestCase):
28 def test_basic_read_write(self):
29 message = b"hello"
31 packet_buffer = PacketBuffer()
32 packet_buffer.send(message)
34 packet_buffer.reset_cursor()
35 self.assertEqual(packet_buffer.read(), message)
36 packet_buffer.reset_cursor()
37 self.assertEqual(packet_buffer.recv(), message)
39 packet_buffer.reset()
40 self.assertNotEqual(packet_buffer.read(), message)
42 def test_get_writable(self):
43 message = b"hello"
45 packet_buffer = PacketBuffer()
46 packet_buffer.send(message)
48 self.assertEqual(packet_buffer.get_writable(), message)
51 class PacketSerializationTest(unittest.TestCase):
53 def test_packet(self):
54 for protocol_version in TEST_VERSIONS:
55 logging.debug('protocol_version = %r' % protocol_version)
56 context = ConnectionContext(protocol_version=protocol_version)
58 packet = serverbound.play.ChatPacket(context)
59 packet.message = u"κόσμε"
61 packet_buffer = PacketBuffer()
62 packet.write(packet_buffer)
64 packet_buffer.reset_cursor()
65 # Read the length and packet id
66 VarInt.read(packet_buffer)
67 packet_id = VarInt.read(packet_buffer)
68 self.assertEqual(packet_id, packet.id)
70 deserialized = serverbound.play.ChatPacket(context)
71 deserialized.read(packet_buffer)
73 self.assertEqual(packet.message, deserialized.message)
75 def test_compressed_packet(self):
76 for protocol_version in TEST_VERSIONS:
77 logging.debug('protocol_version = %r' % protocol_version)
78 context = ConnectionContext(protocol_version=protocol_version)
80 msg = ''.join(choice(string.ascii_lowercase) for i in range(500))
81 packet = serverbound.play.ChatPacket(context)
82 packet.message = msg
84 self.write_read_packet(packet, 20)
85 self.write_read_packet(packet, -1)
87 def write_read_packet(self, packet, compression_threshold):
88 for protocol_version in TEST_VERSIONS:
89 logging.debug('protocol_version = %r' % protocol_version)
90 context = ConnectionContext(protocol_version=protocol_version)
92 packet_buffer = PacketBuffer()
93 packet.write(packet_buffer, compression_threshold)
95 packet_buffer.reset_cursor()
97 VarInt.read(packet_buffer)
98 compressed_size = VarInt.read(packet_buffer)
100 if compressed_size > 0:
101 decompressed = decompress(packet_buffer.read(compressed_size))
102 packet_buffer.reset()
103 packet_buffer.send(decompressed)
104 packet_buffer.reset_cursor()
106 packet_id = VarInt.read(packet_buffer)
107 self.assertEqual(packet_id, packet.id)
109 deserialized = serverbound.play.ChatPacket(context)
110 deserialized.read(packet_buffer)
112 self.assertEqual(packet.message, deserialized.message)
115 class PacketListenerTest(unittest.TestCase):
117 def test_listener(self):
118 message = "hello world"
120 def test_packet(chat_packet):
121 self.assertEqual(chat_packet.message, message)
123 for protocol_version in TEST_VERSIONS:
124 logging.debug('protocol_version = %r' % protocol_version)
125 context = ConnectionContext(protocol_version=protocol_version)
127 listener = PacketListener(test_packet, serverbound.play.ChatPacket)
129 packet = serverbound.play.ChatPacket(context).set_values(
130 message=message)
131 uncalled_packet = KeepAlivePacket().set_values(keep_alive_id=0)
133 listener.call_packet(packet)
134 listener.call_packet(uncalled_packet)
137 class PacketEnumTest(unittest.TestCase):
138 def test_packet_str(self):
139 class ExamplePacket(Packet):
140 id = 0x00
141 packet_name = 'example'
142 definition = [
143 {'alpha': VarInt},
144 {'beta': VarInt},
145 {'gamma': VarInt}]
147 class Alpha(Enum):
148 ZERO = 0
150 class Beta(Enum):
151 ONE = 1
153 self.assertEqual(
154 str(ExamplePacket(ConnectionContext(), alpha=0, beta=0, gamma=0)),
155 '0x00 ExamplePacket(alpha=ZERO, beta=0, gamma=0)'
159 class TestReadWritePackets(unittest.TestCase):
160 maxDiff = None
162 def test_explosion_packet(self):
163 context = ConnectionContext(protocol_version=TEST_VERSIONS[-1])
164 Record = clientbound.play.ExplosionPacket.Record
165 packet = clientbound.play.ExplosionPacket(
166 position=Vector(787, -37, 0), radius=15,
167 records=[Record(-14, -116, -5), Record(-77, 34, -36),
168 Record(-35, -127, 95), Record(11, 113, -8)],
169 player_motion=Vector(4, 5, 0), context=context)
171 self.assertEqual(
172 str(packet),
173 '0x%02X ExplosionPacket(x=787, y=-37, z=0, radius=15, records=['
174 'Record(-14, -116, -5), Record(-77, 34, -36), '
175 'Record(-35, -127, 95), Record(11, 113, -8)], '
176 'player_motion_x=4, player_motion_y=5, player_motion_z=0)'
177 % packet.id
180 self._test_read_write_packet(packet)
182 def test_combat_event_packet(self):
183 packet = clientbound.play.CombatEventPacket(
184 event=clientbound.play.CombatEventPacket.EnterCombatEvent())
185 self.assertEqual(
186 str(packet),
187 'CombatEventPacket(event=EnterCombatEvent())'
189 self._test_read_write_packet(packet, vmax=PRE | 14)
190 with self.assertRaises(NotImplementedError):
191 self._test_read_write_packet(packet, vmin=PRE | 15)
193 specialised_packet = clientbound.play.EnterCombatEventPacket()
194 self.assertIsInstance(specialised_packet.event, type(packet.event))
195 for field in specialised_packet.fields:
196 value = getattr(packet.event, field)
197 setattr(specialised_packet, field, value)
198 self.assertEqual(getattr(specialised_packet.event, field), value)
200 packet = clientbound.play.CombatEventPacket(
201 event=clientbound.play.CombatEventPacket.EndCombatEvent(
202 duration=415, entity_id=91063502))
203 self.assertEqual(str(packet),
204 'CombatEventPacket(event=EndCombatEvent('
205 'duration=415, entity_id=91063502))')
206 self._test_read_write_packet(packet, vmax=PRE | 14)
207 with self.assertRaises(NotImplementedError):
208 self._test_read_write_packet(packet, vmin=PRE | 15)
210 specialised_packet = clientbound.play.EndCombatEventPacket()
211 self.assertIsInstance(specialised_packet.event, type(packet.event))
212 for field in specialised_packet.fields:
213 value = getattr(packet.event, field)
214 setattr(specialised_packet, field, value)
215 self.assertEqual(getattr(specialised_packet.event, field), value)
217 packet = clientbound.play.CombatEventPacket(
218 event=clientbound.play.CombatEventPacket.EntityDeadEvent(
219 player_id=178, entity_id=36, message='RIP'))
220 self.assertEqual(
221 str(packet),
222 "CombatEventPacket(event=EntityDeadEvent("
223 "player_id=178, entity_id=36, message='RIP'))"
225 self._test_read_write_packet(packet, vmax=PRE | 14)
226 with self.assertRaises(NotImplementedError):
227 self._test_read_write_packet(packet, vmin=PRE | 15)
229 specialised_packet = clientbound.play.DeathCombatEventPacket()
230 self.assertIsInstance(specialised_packet.event, type(packet.event))
231 for field in specialised_packet.fields:
232 value = getattr(packet.event, field)
233 setattr(specialised_packet, field, value)
234 self.assertEqual(getattr(specialised_packet.event, field), value)
236 def test_multi_block_change_packet(self):
237 Record = clientbound.play.MultiBlockChangePacket.Record
239 for protocol_version in TEST_VERSIONS:
240 context = ConnectionContext()
241 context.protocol_version = protocol_version
242 packet = clientbound.play.MultiBlockChangePacket(context)
244 if context.protocol_later_eq(741):
245 packet.chunk_section_pos = Vector(167, 17, 33)
246 packet.invert_trust_edges = False
247 else:
248 packet.chunk_x, packet.chunk_z = 167, 17
249 self.assertEqual(packet.chunk_pos, (167, 17))
251 packet.records = [
252 Record(x=1, y=2, z=3, blockId=56, blockMeta=13),
253 Record(position=Vector(1, 2, 3), block_state_id=909),
254 Record(position=(1, 2, 3), blockStateId=909),
257 for i in range(3):
258 self.assertEqual(packet.records[i].blockId, 56)
259 self.assertEqual(packet.records[i].blockMeta, 13)
260 self.assertEqual(packet.records[i].blockStateId, 909)
261 self.assertEqual(packet.records[i].position, Vector(1, 2, 3))
263 self._test_read_write_packet(packet, context)
265 def test_spawn_object_packet(self):
266 for protocol_version in TEST_VERSIONS:
267 logging.debug('protocol_version = %r' % protocol_version)
268 context = ConnectionContext(protocol_version=protocol_version)
270 EntityType = clientbound.play.SpawnObjectPacket.field_enum(
271 'type_id', context)
273 pos_look = PositionAndLook(
274 position=(Vector(68.0, 38.0, 76.0)
275 if context.protocol_later_eq(100) else
276 Vector(68, 38, 76)),
277 yaw=263.494, pitch=180)
278 velocity = Vector(21, 55, 41)
279 entity_id, type_name, type_id = 49846, 'EGG', EntityType.EGG
281 packet = clientbound.play.SpawnObjectPacket(
282 context=context,
283 x=pos_look.x, y=pos_look.y, z=pos_look.z,
284 yaw=pos_look.yaw, pitch=pos_look.pitch,
285 velocity_x=velocity.x, velocity_y=velocity.y,
286 velocity_z=velocity.z,
287 entity_id=entity_id, type_id=type_id, data=1)
288 if context.protocol_later_eq(49):
289 object_uuid = 'd9568851-85bc-4a10-8d6a-261d130626fa'
290 packet.object_uuid = object_uuid
291 self.assertEqual(packet.objectUUID, object_uuid)
292 self.assertEqual(packet.position_and_look, pos_look)
293 self.assertEqual(packet.position, pos_look.position)
294 self.assertEqual(packet.velocity, velocity)
295 self.assertEqual(packet.type, type_name)
297 self.assertEqual(
298 str(packet),
299 "0x%02X SpawnObjectPacket(entity_id=49846, "
300 "object_uuid='d9568851-85bc-4a10-8d6a-261d130626fa', "
301 "type_id=EGG, x=68.0, y=38.0, z=76.0, pitch=180, yaw=263.494, "
302 "data=1, velocity_x=21, velocity_y=55, velocity_z=41)"
303 % packet.id if context.protocol_later_eq(100) else
304 "0x%02X SpawnObjectPacket(entity_id=49846, "
305 "object_uuid='d9568851-85bc-4a10-8d6a-261d130626fa', "
306 "type_id=EGG, x=68, y=38, z=76, pitch=180, yaw=263.494, "
307 "data=1, velocity_x=21, velocity_y=55, velocity_z=41)"
308 % packet.id if context.protocol_later_eq(49) else
309 "0x%02X SpawnObjectPacket(entity_id=49846, type_id=EGG, "
310 "x=68, y=38, z=76, pitch=180, yaw=263.494, data=1, "
311 "velocity_x=21, velocity_y=55, velocity_z=41)" % packet.id
314 packet2 = clientbound.play.SpawnObjectPacket(
315 context=context, position_and_look=pos_look,
316 velocity=velocity, type=type_name,
317 entity_id=entity_id, data=1)
318 if context.protocol_later_eq(49):
319 packet2.object_uuid = object_uuid
320 self.assertEqual(packet.__dict__, packet2.__dict__)
322 packet2.position = pos_look.position
323 self.assertEqual(packet.position, packet2.position)
325 packet2.data = 0
326 if context.protocol_earlier(49):
327 del packet2.velocity
328 self._test_read_write_packet(packet, context,
329 yaw=360/256, pitch=360/256)
330 self._test_read_write_packet(packet2, context,
331 yaw=360/256, pitch=360/256)
333 def test_sound_effect_packet(self):
334 for protocol_version in TEST_VERSIONS:
335 context = ConnectionContext(protocol_version=protocol_version)
337 packet = clientbound.play.SoundEffectPacket(
338 sound_id=545, effect_position=Vector(0.125, 300.0, 50.5),
339 volume=0.75)
340 if context.protocol_later_eq(201):
341 packet.pitch = struct.unpack('f', struct.pack('f', 1.5))[0]
342 else:
343 packet.pitch = int(1.5 / 63.5) * 63.5
344 if context.protocol_later_eq(95):
345 packet.sound_category = \
346 clientbound.play.SoundEffectPacket.SoundCategory.NEUTRAL
348 self._test_read_write_packet(packet, context)
350 def test_face_player_packet(self):
351 for protocol_version in TEST_VERSIONS:
352 context = ConnectionContext(protocol_version=protocol_version)
354 packet = clientbound.play.FacePlayerPacket(context)
355 packet.target = 1.0, -2.0, 3.5
356 packet.entity_id = None
357 if context.protocol_later_eq(353):
358 packet.origin = OriginPoint.EYES
359 self.assertEqual(
360 str(packet),
361 "0x%02X FacePlayerPacket(origin=EYES, x=1.0, y=-2.0, z=3.5, "
362 "entity_id=None)" % packet.id
363 if context.protocol_later_eq(353) else
364 "0x%02X FacePlayerPacket(entity_id=None, x=1.0, y=-2.0, z=3.5)"
365 % packet.id
367 self._test_read_write_packet(packet, context)
369 packet.entity_id = 123
370 if context.protocol_later_eq(353):
371 packet.entity_origin = OriginPoint.FEET
372 else:
373 del packet.target
374 self.assertEqual(
375 str(packet),
376 "0x%02X FacePlayerPacket(origin=EYES, x=1.0, y=-2.0, z=3.5, "
377 "entity_id=123, entity_origin=FEET)" % packet.id
378 if context.protocol_later_eq(353) else
379 "0x%02X FacePlayerPacket(entity_id=123)" % packet.id
381 self._test_read_write_packet(packet, context)
383 def _test_read_write_packet(
384 self, packet_in, context=None, vmin=None, vmax=None, **kwargs
387 If kwargs are specified, the key will be tested against the
388 respective delta value. Useful for testing FixedPointNumbers
389 where there is precision lost in the resulting value.
391 if context is None:
392 for pv in TEST_VERSIONS:
393 if vmin is not None and protocol_earlier(pv, vmin):
394 continue
395 if vmax is not None and protocol_earlier(vmax, pv):
396 continue
397 logging.debug('protocol_version = %r' % pv)
398 context = ConnectionContext(protocol_version=pv)
399 self._test_read_write_packet(packet_in, context)
400 else:
401 packet_in.context = context
402 packet_buffer = PacketBuffer()
403 packet_in.write(packet_buffer)
404 packet_buffer.reset_cursor()
405 VarInt.read(packet_buffer)
406 packet_id = VarInt.read(packet_buffer)
407 self.assertEqual(packet_id, packet_in.id)
409 packet_out = type(packet_in)(context=context)
410 packet_out.read(packet_buffer)
411 self.assertIs(type(packet_in), type(packet_out))
413 for packet_attr, precision in kwargs.items():
414 packet_attribute_in = packet_in.__dict__.pop(packet_attr)
415 packet_attribute_out = packet_out.__dict__.pop(packet_attr)
416 self.assertAlmostEqual(packet_attribute_in,
417 packet_attribute_out,
418 delta=precision)
420 self.assertEqual(packet_in.__dict__, packet_out.__dict__)