1 # -*- coding: utf-8 -*-
6 from zlib
import decompress
7 from random
import choice
9 from minecraft
.utility
import protocol_earlier
10 from minecraft
import (
11 PRE
, SUPPORTED_PROTOCOL_VERSIONS
, RELEASE_PROTOCOL_VERSIONS
,
13 from minecraft
.networking
.connection
import ConnectionContext
14 from minecraft
.networking
.types
import (
15 VarInt
, Enum
, Vector
, PositionAndLook
, OriginPoint
,
17 from minecraft
.networking
.packets
import (
18 Packet
, PacketBuffer
, PacketListener
, KeepAlivePacket
, serverbound
,
22 TEST_VERSIONS
= list(RELEASE_PROTOCOL_VERSIONS
)
23 if SUPPORTED_PROTOCOL_VERSIONS
[-1] not in TEST_VERSIONS
:
24 TEST_VERSIONS
.append(SUPPORTED_PROTOCOL_VERSIONS
[-1])
27 class PacketBufferTest(unittest
.TestCase
):
28 def test_basic_read_write(self
):
31 packet_buffer
= PacketBuffer()
32 packet_buffer
.send(message
)
34 packet_buffer
.reset_cursor()
35 self
.assertEqual(packet_buffer
.read(), message
)
36 packet_buffer
.reset_cursor()
37 self
.assertEqual(packet_buffer
.recv(), message
)
40 self
.assertNotEqual(packet_buffer
.read(), message
)
42 def test_get_writable(self
):
45 packet_buffer
= PacketBuffer()
46 packet_buffer
.send(message
)
48 self
.assertEqual(packet_buffer
.get_writable(), message
)
51 class PacketSerializationTest(unittest
.TestCase
):
53 def test_packet(self
):
54 for protocol_version
in TEST_VERSIONS
:
55 logging
.debug('protocol_version = %r' % protocol_version
)
56 context
= ConnectionContext(protocol_version
=protocol_version
)
58 packet
= serverbound
.play
.ChatPacket(context
)
59 packet
.message
= u
"κόσμε"
61 packet_buffer
= PacketBuffer()
62 packet
.write(packet_buffer
)
64 packet_buffer
.reset_cursor()
65 # Read the length and packet id
66 VarInt
.read(packet_buffer
)
67 packet_id
= VarInt
.read(packet_buffer
)
68 self
.assertEqual(packet_id
, packet
.id)
70 deserialized
= serverbound
.play
.ChatPacket(context
)
71 deserialized
.read(packet_buffer
)
73 self
.assertEqual(packet
.message
, deserialized
.message
)
75 def test_compressed_packet(self
):
76 for protocol_version
in TEST_VERSIONS
:
77 logging
.debug('protocol_version = %r' % protocol_version
)
78 context
= ConnectionContext(protocol_version
=protocol_version
)
80 msg
= ''.join(choice(string
.ascii_lowercase
) for i
in range(500))
81 packet
= serverbound
.play
.ChatPacket(context
)
84 self
.write_read_packet(packet
, 20)
85 self
.write_read_packet(packet
, -1)
87 def write_read_packet(self
, packet
, compression_threshold
):
88 for protocol_version
in TEST_VERSIONS
:
89 logging
.debug('protocol_version = %r' % protocol_version
)
90 context
= ConnectionContext(protocol_version
=protocol_version
)
92 packet_buffer
= PacketBuffer()
93 packet
.write(packet_buffer
, compression_threshold
)
95 packet_buffer
.reset_cursor()
97 VarInt
.read(packet_buffer
)
98 compressed_size
= VarInt
.read(packet_buffer
)
100 if compressed_size
> 0:
101 decompressed
= decompress(packet_buffer
.read(compressed_size
))
102 packet_buffer
.reset()
103 packet_buffer
.send(decompressed
)
104 packet_buffer
.reset_cursor()
106 packet_id
= VarInt
.read(packet_buffer
)
107 self
.assertEqual(packet_id
, packet
.id)
109 deserialized
= serverbound
.play
.ChatPacket(context
)
110 deserialized
.read(packet_buffer
)
112 self
.assertEqual(packet
.message
, deserialized
.message
)
115 class PacketListenerTest(unittest
.TestCase
):
117 def test_listener(self
):
118 message
= "hello world"
120 def test_packet(chat_packet
):
121 self
.assertEqual(chat_packet
.message
, message
)
123 for protocol_version
in TEST_VERSIONS
:
124 logging
.debug('protocol_version = %r' % protocol_version
)
125 context
= ConnectionContext(protocol_version
=protocol_version
)
127 listener
= PacketListener(test_packet
, serverbound
.play
.ChatPacket
)
129 packet
= serverbound
.play
.ChatPacket(context
).set_values(
131 uncalled_packet
= KeepAlivePacket().set_values(keep_alive_id
=0)
133 listener
.call_packet(packet
)
134 listener
.call_packet(uncalled_packet
)
137 class PacketEnumTest(unittest
.TestCase
):
138 def test_packet_str(self
):
139 class ExamplePacket(Packet
):
141 packet_name
= 'example'
154 str(ExamplePacket(ConnectionContext(), alpha
=0, beta
=0, gamma
=0)),
155 '0x00 ExamplePacket(alpha=ZERO, beta=0, gamma=0)'
159 class TestReadWritePackets(unittest
.TestCase
):
162 def test_explosion_packet(self
):
163 context
= ConnectionContext(protocol_version
=TEST_VERSIONS
[-1])
164 Record
= clientbound
.play
.ExplosionPacket
.Record
165 packet
= clientbound
.play
.ExplosionPacket(
166 position
=Vector(787, -37, 0), radius
=15,
167 records
=[Record(-14, -116, -5), Record(-77, 34, -36),
168 Record(-35, -127, 95), Record(11, 113, -8)],
169 player_motion
=Vector(4, 5, 0), context
=context
)
173 '0x%02X ExplosionPacket(x=787, y=-37, z=0, radius=15, records=['
174 'Record(-14, -116, -5), Record(-77, 34, -36), '
175 'Record(-35, -127, 95), Record(11, 113, -8)], '
176 'player_motion_x=4, player_motion_y=5, player_motion_z=0)'
180 self
._test
_read
_write
_packet
(packet
)
182 def test_combat_event_packet(self
):
183 packet
= clientbound
.play
.CombatEventPacket(
184 event
=clientbound
.play
.CombatEventPacket
.EnterCombatEvent())
187 'CombatEventPacket(event=EnterCombatEvent())'
189 self
._test
_read
_write
_packet
(packet
, vmax
=PRE |
14)
190 with self
.assertRaises(NotImplementedError):
191 self
._test
_read
_write
_packet
(packet
, vmin
=PRE |
15)
193 specialised_packet
= clientbound
.play
.EnterCombatEventPacket()
194 self
.assertIsInstance(specialised_packet
.event
, type(packet
.event
))
195 for field
in specialised_packet
.fields
:
196 value
= getattr(packet
.event
, field
)
197 setattr(specialised_packet
, field
, value
)
198 self
.assertEqual(getattr(specialised_packet
.event
, field
), value
)
200 packet
= clientbound
.play
.CombatEventPacket(
201 event
=clientbound
.play
.CombatEventPacket
.EndCombatEvent(
202 duration
=415, entity_id
=91063502))
203 self
.assertEqual(str(packet
),
204 'CombatEventPacket(event=EndCombatEvent('
205 'duration=415, entity_id=91063502))')
206 self
._test
_read
_write
_packet
(packet
, vmax
=PRE |
14)
207 with self
.assertRaises(NotImplementedError):
208 self
._test
_read
_write
_packet
(packet
, vmin
=PRE |
15)
210 specialised_packet
= clientbound
.play
.EndCombatEventPacket()
211 self
.assertIsInstance(specialised_packet
.event
, type(packet
.event
))
212 for field
in specialised_packet
.fields
:
213 value
= getattr(packet
.event
, field
)
214 setattr(specialised_packet
, field
, value
)
215 self
.assertEqual(getattr(specialised_packet
.event
, field
), value
)
217 packet
= clientbound
.play
.CombatEventPacket(
218 event
=clientbound
.play
.CombatEventPacket
.EntityDeadEvent(
219 player_id
=178, entity_id
=36, message
='RIP'))
222 "CombatEventPacket(event=EntityDeadEvent("
223 "player_id=178, entity_id=36, message='RIP'))"
225 self
._test
_read
_write
_packet
(packet
, vmax
=PRE |
14)
226 with self
.assertRaises(NotImplementedError):
227 self
._test
_read
_write
_packet
(packet
, vmin
=PRE |
15)
229 specialised_packet
= clientbound
.play
.DeathCombatEventPacket()
230 self
.assertIsInstance(specialised_packet
.event
, type(packet
.event
))
231 for field
in specialised_packet
.fields
:
232 value
= getattr(packet
.event
, field
)
233 setattr(specialised_packet
, field
, value
)
234 self
.assertEqual(getattr(specialised_packet
.event
, field
), value
)
236 def test_multi_block_change_packet(self
):
237 Record
= clientbound
.play
.MultiBlockChangePacket
.Record
239 for protocol_version
in TEST_VERSIONS
:
240 context
= ConnectionContext()
241 context
.protocol_version
= protocol_version
242 packet
= clientbound
.play
.MultiBlockChangePacket(context
)
244 if context
.protocol_later_eq(741):
245 packet
.chunk_section_pos
= Vector(167, 17, 33)
246 packet
.invert_trust_edges
= False
248 packet
.chunk_x
, packet
.chunk_z
= 167, 17
249 self
.assertEqual(packet
.chunk_pos
, (167, 17))
252 Record(x
=1, y
=2, z
=3, blockId
=56, blockMeta
=13),
253 Record(position
=Vector(1, 2, 3), block_state_id
=909),
254 Record(position
=(1, 2, 3), blockStateId
=909),
258 self
.assertEqual(packet
.records
[i
].blockId
, 56)
259 self
.assertEqual(packet
.records
[i
].blockMeta
, 13)
260 self
.assertEqual(packet
.records
[i
].blockStateId
, 909)
261 self
.assertEqual(packet
.records
[i
].position
, Vector(1, 2, 3))
263 self
._test
_read
_write
_packet
(packet
, context
)
265 def test_spawn_object_packet(self
):
266 for protocol_version
in TEST_VERSIONS
:
267 logging
.debug('protocol_version = %r' % protocol_version
)
268 context
= ConnectionContext(protocol_version
=protocol_version
)
270 EntityType
= clientbound
.play
.SpawnObjectPacket
.field_enum(
273 pos_look
= PositionAndLook(
274 position
=(Vector(68.0, 38.0, 76.0)
275 if context
.protocol_later_eq(100) else
277 yaw
=263.494, pitch
=180)
278 velocity
= Vector(21, 55, 41)
279 entity_id
, type_name
, type_id
= 49846, 'EGG', EntityType
.EGG
281 packet
= clientbound
.play
.SpawnObjectPacket(
283 x
=pos_look
.x
, y
=pos_look
.y
, z
=pos_look
.z
,
284 yaw
=pos_look
.yaw
, pitch
=pos_look
.pitch
,
285 velocity_x
=velocity
.x
, velocity_y
=velocity
.y
,
286 velocity_z
=velocity
.z
,
287 entity_id
=entity_id
, type_id
=type_id
, data
=1)
288 if context
.protocol_later_eq(49):
289 object_uuid
= 'd9568851-85bc-4a10-8d6a-261d130626fa'
290 packet
.object_uuid
= object_uuid
291 self
.assertEqual(packet
.objectUUID
, object_uuid
)
292 self
.assertEqual(packet
.position_and_look
, pos_look
)
293 self
.assertEqual(packet
.position
, pos_look
.position
)
294 self
.assertEqual(packet
.velocity
, velocity
)
295 self
.assertEqual(packet
.type, type_name
)
299 "0x%02X SpawnObjectPacket(entity_id=49846, "
300 "object_uuid='d9568851-85bc-4a10-8d6a-261d130626fa', "
301 "type_id=EGG, x=68.0, y=38.0, z=76.0, pitch=180, yaw=263.494, "
302 "data=1, velocity_x=21, velocity_y=55, velocity_z=41)"
303 % packet
.id if context
.protocol_later_eq(100) else
304 "0x%02X SpawnObjectPacket(entity_id=49846, "
305 "object_uuid='d9568851-85bc-4a10-8d6a-261d130626fa', "
306 "type_id=EGG, x=68, y=38, z=76, pitch=180, yaw=263.494, "
307 "data=1, velocity_x=21, velocity_y=55, velocity_z=41)"
308 % packet
.id if context
.protocol_later_eq(49) else
309 "0x%02X SpawnObjectPacket(entity_id=49846, type_id=EGG, "
310 "x=68, y=38, z=76, pitch=180, yaw=263.494, data=1, "
311 "velocity_x=21, velocity_y=55, velocity_z=41)" % packet
.id
314 packet2
= clientbound
.play
.SpawnObjectPacket(
315 context
=context
, position_and_look
=pos_look
,
316 velocity
=velocity
, type=type_name
,
317 entity_id
=entity_id
, data
=1)
318 if context
.protocol_later_eq(49):
319 packet2
.object_uuid
= object_uuid
320 self
.assertEqual(packet
.__dict
__, packet2
.__dict__
)
322 packet2
.position
= pos_look
.position
323 self
.assertEqual(packet
.position
, packet2
.position
)
326 if context
.protocol_earlier(49):
328 self
._test
_read
_write
_packet
(packet
, context
,
329 yaw
=360/256, pitch
=360/256)
330 self
._test
_read
_write
_packet
(packet2
, context
,
331 yaw
=360/256, pitch
=360/256)
333 def test_sound_effect_packet(self
):
334 for protocol_version
in TEST_VERSIONS
:
335 context
= ConnectionContext(protocol_version
=protocol_version
)
337 packet
= clientbound
.play
.SoundEffectPacket(
338 sound_id
=545, effect_position
=Vector(0.125, 300.0, 50.5),
340 if context
.protocol_later_eq(201):
341 packet
.pitch
= struct
.unpack('f', struct
.pack('f', 1.5))[0]
343 packet
.pitch
= int(1.5 / 63.5) * 63.5
344 if context
.protocol_later_eq(95):
345 packet
.sound_category
= \
346 clientbound
.play
.SoundEffectPacket
.SoundCategory
.NEUTRAL
348 self
._test
_read
_write
_packet
(packet
, context
)
350 def test_face_player_packet(self
):
351 for protocol_version
in TEST_VERSIONS
:
352 context
= ConnectionContext(protocol_version
=protocol_version
)
354 packet
= clientbound
.play
.FacePlayerPacket(context
)
355 packet
.target
= 1.0, -2.0, 3.5
356 packet
.entity_id
= None
357 if context
.protocol_later_eq(353):
358 packet
.origin
= OriginPoint
.EYES
361 "0x%02X FacePlayerPacket(origin=EYES, x=1.0, y=-2.0, z=3.5, "
362 "entity_id=None)" % packet
.id
363 if context
.protocol_later_eq(353) else
364 "0x%02X FacePlayerPacket(entity_id=None, x=1.0, y=-2.0, z=3.5)"
367 self
._test
_read
_write
_packet
(packet
, context
)
369 packet
.entity_id
= 123
370 if context
.protocol_later_eq(353):
371 packet
.entity_origin
= OriginPoint
.FEET
376 "0x%02X FacePlayerPacket(origin=EYES, x=1.0, y=-2.0, z=3.5, "
377 "entity_id=123, entity_origin=FEET)" % packet
.id
378 if context
.protocol_later_eq(353) else
379 "0x%02X FacePlayerPacket(entity_id=123)" % packet
.id
381 self
._test
_read
_write
_packet
(packet
, context
)
383 def _test_read_write_packet(
384 self
, packet_in
, context
=None, vmin
=None, vmax
=None, **kwargs
387 If kwargs are specified, the key will be tested against the
388 respective delta value. Useful for testing FixedPointNumbers
389 where there is precision lost in the resulting value.
392 for pv
in TEST_VERSIONS
:
393 if vmin
is not None and protocol_earlier(pv
, vmin
):
395 if vmax
is not None and protocol_earlier(vmax
, pv
):
397 logging
.debug('protocol_version = %r' % pv
)
398 context
= ConnectionContext(protocol_version
=pv
)
399 self
._test
_read
_write
_packet
(packet_in
, context
)
401 packet_in
.context
= context
402 packet_buffer
= PacketBuffer()
403 packet_in
.write(packet_buffer
)
404 packet_buffer
.reset_cursor()
405 VarInt
.read(packet_buffer
)
406 packet_id
= VarInt
.read(packet_buffer
)
407 self
.assertEqual(packet_id
, packet_in
.id)
409 packet_out
= type(packet_in
)(context
=context
)
410 packet_out
.read(packet_buffer
)
411 self
.assertIs(type(packet_in
), type(packet_out
))
413 for packet_attr
, precision
in kwargs
.items():
414 packet_attribute_in
= packet_in
.__dict
__.pop(packet_attr
)
415 packet_attribute_out
= packet_out
.__dict
__.pop(packet_attr
)
416 self
.assertAlmostEqual(packet_attribute_in
,
417 packet_attribute_out
,
420 self
.assertEqual(packet_in
.__dict
__, packet_out
.__dict
__)