1 from mod_pywebsocket
import common
2 from mod_pywebsocket
import msgutil
5 def _retrieve_frame(stream
):
6 # FIXME: Use better API.
7 frame
= stream
._receive
_frame
_as
_frame
_object
()
8 for frame_filter
in stream
._options
.incoming_frame_filters
:
9 frame_filter
.filter(frame
)
13 def web_socket_do_extra_handshake(request
):
14 # Disable compression extensions because we handle frame directly
16 request
.ws_extension_processors
= []
19 def web_socket_transfer_data(request
):
20 expected_messages
= ['Hello, world!']
22 for test_number
, expected_message
in enumerate(expected_messages
):
23 frame
= _retrieve_frame(request
.ws_stream
)
24 if frame
.opcode
== common
.OPCODE_BINARY
and frame
.payload
== expected_message
and frame
.fin
:
25 msgutil
.send_message(request
, 'PASS: Message #%d.' % test_number
)
27 msgutil
.send_message(request
, 'FAIL: Message #%d: Received unexpected frame: opcode = %r, payload = %r, final = %r' % (test_number
, frame
.opcode
, frame
.payload
, frame
.fin
))
30 def all_distinct_bytes():
31 return ''.join([chr(i
) for i
in xrange(256)])