Net::REPL::Client: Allow read/print portions to be overridden separately.
[thrasher.git] / perl / lib / Thrasher / Plugin / ProxyFileTransfer.pm
blob236312ca9b952bb4e868a5af579117f556ad020c
1 package Thrasher::Plugin::ProxyFileTransfer;
3 use strict;
4 use warnings;
6 =pod
8 =head1 NAME
10 Thrasher::Plugin::ProxyFileTransfer - using a XEP-0065-compliant
11 socket proxy, transfer files to and from the legacy service.
13 =head1 DESCRIPTION
15 This module provides as much code as possible to implement file
16 transfer to and from the legacy service, but legacy Protocols will
17 still need to implement a little bit of stuff to hook everything
18 together. This will be documented at some point, probably when
19 somebody asks for it.
21 Note that this does not permit direct connections; it always uses a
22 proxy. This works out well for what the authors need. If you need to
23 permit direct connections, please feel free to submit a patch; it's
24 certainly possible.
26 Unfortunately, this is a very complicated interaction. If you have
27 any interest in working on this, I strongly recommend popping open
28 something with an XML console (like Psi) and looking at all the XML
29 flying around, rather than trying to figure out what is going
30 on by reading this code. This is event-based code at its worst.
32 =cut
34 # FIXME: Use bytestream query to see if remote client can support file
35 # transfers
36 # FIXME: Verify the account is online when sending a file.
38 use Thrasher::Callbacks qw(:all);
39 use Thrasher::Log qw(:all);
40 use Thrasher::Constants qw(:all);
41 use Thrasher::XML qw(:all);
42 use Thrasher::Plugin qw(:all);
44 use Digest::SHA1 qw(sha1_hex);
46 use Thrasher::Plugin::Socks;
48 use Data::Dumper;
50 # FIXME: It is possible to fire discovery before the proxy is
51 # configured. If someone wants to transfer files and no proxy
52 # was found, try again before giving up.
53 # undef => not yet discovered
54 # 0 => discovery failed; there is no XEP-0065 proxy
55 my $proxy_server_jid = undef;
56 my $proxy_server_port = undef;
57 my $proxy_server_host = undef;
59 # This is keyed by stream id
60 my $file_transfer_data_by_sid = {};
62 # * Key: Thrasher's file id as returned by thrasher_wrapper_send_file.
63 # * Value: Thrasher::Plugin::ProxyFileTransfer::FT object.
64 my $file_transfer_data_by_id = {};
66 # When the server completes the basic discovery sweep it does,
67 # finish discovering the proxy's information.
68 do_when('server_discovery_items',
69 sub {
70 PROXY_SEARCH:
71 for my $item (keys %{$Thrasher::SERVER_INFO}) {
72 my ($identities, undef) =
73 @{$Thrasher::SERVER_INFO->{$item}};
74 for my $identity (@$identities) {
75 my ($type, $category, $name) = @$identity;
76 if ($type eq 'bytestreams' &&
77 $category eq 'proxy') {
78 $proxy_server_jid = $item;
79 succeeded("proxy_server_found");
80 last PROXY_SEARCH;
85 if (!defined($proxy_server_jid)) {
86 log("ERROR: No XEP-0065-compliant SOCKS5 proxy "
87 ."service found on the host server. File "
88 ."transfers WILL NOT WORK.");
89 $proxy_server_jid = 0;
90 failed("proxy_server_found");
91 } else {
92 log("Proxy server for file transfers found: ".
93 "$proxy_server_jid; querying for IP");
94 $Thrasher::Component::COMPONENT->iq_query
95 ([[$NS_COMPONENT, 'iq'],
96 {to => $proxy_server_jid,
97 from => $Thrasher::Component::COMPONENT->{component_name},
98 type => 'get'},
99 [[[$NS_BYTESTREAMS, 'query'], {}, []]]],
100 sub {
101 my $component = shift;
102 my $iq_params = shift;
103 my $iq_packet = shift;
105 if ($iq_params->{type} eq 'error') {
106 log("ERROR: Bytestreams proxy won't"
107 ." tell us the IP it is located on."
108 ." File transfers WILL NOT WORK. "
109 ."You may need to tweak permissions "
110 ."to permit "
111 .$Thrasher::Component::COMPONENT->{component_name}
112 ." to connect to " .
113 $proxy_server_jid . ".");
114 return;
117 # Note that while this query is permitted
118 # to return a zeroconf value, we don't
119 # support that at this time.
120 my $streamhost;
122 local $@;
123 eval {
124 $streamhost = extract
125 ([undef, undef,
126 save_match(
127 [[$NS_BYTESTREAMS,
128 'streamhost'],
129 {'{}host' => save('host'),
130 '{}port' => save('port')},
131 undef])],
132 $iq_params->{query});
134 if ($@) {
135 $streamhost = undef;
138 if ($streamhost) {
139 $proxy_server_port = $streamhost->{port};
140 $proxy_server_host = $streamhost->{host};
141 log("Proxy server fully located, file "
142 . "transfers should work.");
143 succeeded('xep-0065_proxy_server_located');
144 _initialize_the_wrapper();
146 else {
147 log("Error: Proxy server will not "
148 . "identify the ip/port it is "
149 . "on. File transfers WILL NOT "
150 . "WORK.");
156 sub {
157 log("ERROR: No XEP-0065-compliant SOCKS5 proxy service "
158 ."found on the host server, because service discovery "
159 ."against the server failed (which is really weird!). "
160 ."File transfers WILL NOT WORK.");
161 $proxy_server_jid = 0;
162 failed("proxy_server_found");
165 register_plugin({client_iq_handlers =>
166 {$NS_BYTESTREAMS =>
167 {set => \&handle_set_bytestreams},
168 $NS_FILE_TRANSFER =>
169 {set => \&handle_set_file_transfer},
170 $NS_STREAM_INITIATION =>
171 {set => \&handle_set_stream_initiation}
173 features => [$NS_BYTESTREAMS,
174 $NS_STREAM_INITIATION,
175 $NS_FILE_TRANSFER]});
177 #<iq type="set" to="profgilzot@aim.transport" id="ab06a" >
178 #<query xmlns="http://jabber.org/protocol/bytestreams" mode="tcp" sid="s5b_5666fd39c31ac262" >
179 #<streamhost port="7777" host="10.2.68.46" jid="proxy65.imtest.barracuda.com" >
180 #<proxy xmlns="http://affinix.com/jabber/stream"/>
181 #</streamhost>
182 #<fast xmlns="http://affinix.com/jabber/stream"/>
183 #</query>
184 #</iq>
186 sub handle_set_bytestreams {
187 my $component = shift;
188 my $iq_params = shift;
189 my $iq_packet = shift;
191 if (!$proxy_server_jid ||
192 !$proxy_server_port ||
193 !$proxy_server_host) {
194 log('Cannot set bytestreams; no proxy server yet known.');
195 $component->iq_error($iq_params, 'service_unavailable');
196 return;
199 my $query = $iq_params->{query};
201 my $stream_id = $query->[1]->{'{}sid'};
202 if (!defined($stream_id)) {
203 log("While attempting to deal with bytestream negotiation, "
204 ."got a packet with no sid.");
205 $component->iq_error($iq_params, 'bad_request');
206 return;
209 my $file_data = delete $file_transfer_data_by_sid->{$stream_id};
210 if (!defined($file_data)) {
211 log("While attempting to find the file data, couldn't "
212 ."find data about stream id $stream_id.");
213 $component->iq_error($iq_params, 'bad_request');
214 return;
217 # We are looking for a streamhost that isn't using zeroconf
218 my $acceptable_streamhosts = [];
220 for my $child (@{$iq_params->{query}->[2]}) {
221 if (ref($child) &&
222 $child->[0]->[1] eq 'streamhost') {
223 my $atts = $child->[1];
224 if ($atts->{'{}zeroconf'}) {
225 next;
228 if ((my $jid = $atts->{'{}jid'}) &&
229 (my $host = $atts->{'{}host'}) &&
230 (my $port = $atts->{'{}port'})) {
231 push @$acceptable_streamhosts,
232 {jid => $jid,
233 host => $host,
234 port => $port};
239 if (!@$acceptable_streamhosts) {
240 log("Could not negotiate an acceptable streamhost.");
241 $component->iq_error($iq_params, 'not_acceptable');
242 return;
245 # FIXME: We really ought to try these in sequence. However,
246 # I find myself wondering how many clients actually ship
247 # out more than one possibility.
248 $file_transfer_data_by_sid->{streamhosts} = $acceptable_streamhosts;
250 my $streamhost = $file_transfer_data_by_sid->{streamhosts}->[0];
252 log("Streamhost chosen: " . Dumper($streamhost));
254 my $host = sha1_hex($stream_id .
255 $iq_params->{from} .
256 $iq_params->{to});
258 my $ft = Thrasher::Plugin::ProxyFileTransfer::FT->new();
260 # Unfortunately, the raw TCP connection seems to be
261 # synchronous and I don't see how to make it
262 # unsynchronous. Fortunately, it should be local anyhow.
263 $ft->{'socks'} = Thrasher::Plugin::Socks->new(
264 ProxyAddr => $streamhost->{'host'},
265 ProxyPort => $streamhost->{'port'},
266 ConnectAddr => $host,
267 ConnectPort => 0,
268 EventLoop => $component->{'event_loop'},
269 SuccessCallback => sub {
270 my ($data_read_sub) = @_;
271 log('Got SOCKS data reader.');
272 $ft->{'data_reader'} = $data_read_sub;
274 FailureCallback => sub { log('Proxy file transfer failure.'); },
277 if (!defined($ft->{'socks'})) {
278 # FIXME: Try any other streams offered.
279 $component->iq_error($iq_params, 'item_not_found');
280 return;
283 $component->iq_reply
284 ($iq_params,
285 [[$NS_BYTESTREAMS, 'query'], {},
286 [[[$NS_BYTESTREAMS, 'streamhost-used'],
287 {jid => $streamhost->{jid}},
288 []]]]);
290 my $legacy_to_id =
291 $component->xmpp_name_to_legacy($iq_params->{from},
292 $iq_params->{to});
294 log("About to initiate purple-side file transfer:");
295 my $file_transfer_id = THPPW::thrasher_wrapper_send_file(
296 strip_resource($iq_params->{'from'}),
297 $legacy_to_id,
298 $file_data->{'filename'},
299 $file_data->{'size'},
300 $file_data->{'desc'},
302 if (! $file_transfer_id) {
303 log('Failed to set up libpurple xfer!');
304 $component->iq_error($iq_params, 'service_unavailable');
305 return;
307 log("Purple-side init for $file_transfer_id done.");
309 $file_transfer_data_by_id->{$file_transfer_id} = $ft;
310 $ft->{'id'} = $file_transfer_id;
312 $ft->{'socks'}->connect();
315 sub handle_file_transfer {
316 my $component = shift;
317 my $iq_params = shift;
318 my $iq_packet = shift;
320 log("Got a request for a file transfer.");
321 $component->iq_error($iq_params, 'service_unavailable');
324 sub handle_set_stream_initiation {
325 my $component = shift;
326 my $iq_params = shift;
327 my $iq_packet = shift;
329 log("Got a request for a stream initiation.");
331 # Extract the information for the <file>.
332 my $file_results = recursive_extract
333 ($iq_params->{query},
335 # Extract the <file> tag and save the require params
336 [undef, {'{}id' => save('stream_id')},
337 save_match('rec', [[$NS_FILE_TRANSFER, 'file'],
338 {'{}name' => save('filename'),
339 '{}size' => save('size'),
340 '{}date' => save('date', 1),
341 '{}hash' => save('hash', 1)},
342 undef])],
344 # go looking for the description.
345 # Note we don't implement <range> support right now.
346 [undef, undef,
347 save_match('rec', [[undef, 'desc'], undef,
348 save_sub('desc', \&text_extractor)],
349 1)]);
350 $file_results->{desc} ||= [];
351 $file_results->{desc} = join '', @{$file_results->{desc}};
353 # Extract the feature negotiation information.
354 my $negotiation_results = recursive_extract
355 ($iq_params->{query},
357 [undef, undef,
358 save_match('rec', [[$NS_FEATURE_NEG, 'feature']])],
360 [undef, undef, save_match('rec', [[$NS_DATA, 'x']])],
362 [undef, undef, save_match('rec', [[$NS_DATA, 'field'],
363 {type => 'list-single',
364 var =>
365 'stream-method'}])],
367 # For each option, extract the possible values
368 [undef, undef,
369 save_sub('options',
370 sub {
371 my $tag = shift;
372 # "If this is an option, extract the
373 # value's text element."
374 if (ref($tag) && $tag->[0]->[1] eq 'option') {
375 for my $child (@{$tag->[2]}) {
376 if (ref($child) &&
377 $child->[0]->[1] eq 'value') {
378 return join '', @{$child->[2]};
383 return undef;
385 )]);
387 # Validate the basic information we are looking for
388 my $valid = 1;
389 if (!exists($file_results->{filename})) {
390 log("Offered file transfer to legacy user, but no filename given.");
391 $valid = 0;
393 if (!exists($file_results->{size})) {
394 log("Offered file transfer to legacy user, but no size given.");
395 $valid = 0;
397 if (!$negotiation_results->{options}) {
398 log("Offered file transfer to legacy user, but no feature "
399 ."negotiations found.");
400 $valid = 0;
403 if ($valid == 0) {
404 $component->iq_error($iq_params, 'bad_request');
405 return;
408 # Verify the remote server is offering bytestreams
409 my $bytestreams_offered = 0;
410 for my $option (@{$negotiation_results->{options}}) {
411 if ($option eq $NS_BYTESTREAMS) {
412 $bytestreams_offered = 1;
415 if ($bytestreams_offered == 0) {
416 log("Offered a file transfer to legacy user, but no "
417 ."bytestreams offer found.");
418 $component->iq_error($iq_params, 'bad_request');
421 $file_transfer_data_by_sid->{$file_results->{stream_id}} = $file_results;
423 # If we've gotten to here, we accept the file transfer offer.
424 my $response =
425 [[$NS_STREAM_INITIATION, 'si'], {},
426 [#[[$NS_FILE_TRANSFER, 'file'], {}, []],
427 [[$NS_FEATURE_NEG, 'feature'], {},
428 [[[$NS_DATA, 'x'], {type => 'submit'},
429 [[[$NS_DATA, 'field'], {var => 'stream-method'},
430 [[[$NS_DATA, 'value'], {}, [$NS_BYTESTREAMS]
431 ]]]]]]]]];
432 $component->iq_reply($iq_params, $response);
435 sub _initialize_the_wrapper {
436 THPPW::thrasher_wrapper_ft_init(
437 Thrasher::error_wrap("ft_recv_request", \&_ft_recv_request),
438 Thrasher::error_wrap("ft_recv_accept", \&_ft_recv_accept),
439 Thrasher::error_wrap("ft_recv_start", \&_ft_recv_start),
440 Thrasher::error_wrap("ft_recv_cancel", \&_ft_recv_cancel),
441 Thrasher::error_wrap("ft_recv_complete", \&_ft_recv_complete),
442 Thrasher::error_wrap("ft_send_accept", \&_ft_send_accept),
443 Thrasher::error_wrap("ft_send_start", \&_ft_send_start),
444 Thrasher::error_wrap("ft_send_cancel", \&_ft_send_cancel),
445 Thrasher::error_wrap("ft_send_complete", \&_ft_send_complete),
446 Thrasher::error_wrap("ft_read", \&_ft_read),
447 Thrasher::error_wrap("ft_write", \&_ft_write),
448 Thrasher::error_wrap("ft_data_not_sent", \&_ft_data_not_sent),
452 sub _ft_recv_request {
453 my ($id, $jid, $legacy_name, $filename, $size) = @_;
454 debug("_ft_recv_request($id, $jid, $legacy_name"
455 . ", $filename, $size) called");
457 $jid = strip_resource($jid);
458 my $session = $Thrasher::Component::COMPONENT->session_for($jid);
459 my $stream_id = 's_' . $Thrasher::Component::COMPONENT->get_id();
460 my $legacy_jid = $Thrasher::Component::COMPONENT->legacy_name_to_xmpp(
461 $jid,
462 $legacy_name,
464 my $ft = Thrasher::Plugin::ProxyFileTransfer::FT->new(
465 'sid' => $stream_id,
466 'id' => $id,
467 'target_full_jid' => $session->{'full_jid'},
468 'initiator_legacy_jid' => $legacy_jid,
470 $file_transfer_data_by_id->{$ft->{'id'}} = $ft;
472 # Refer to http://xmpp.org/extensions/xep-0065.html &c.
474 # FT setup stage 1: offer the file and get "declined" or feature
475 # negotiation back.
477 # <iq from='c%d@xmpp.transport' to='a@b/R' type='set'>
478 # <si xmlns='http://jabber.org/protocol/si' profile='http://jabber.org/protocol/si/profile/file-transfer' id='s_id937'>
479 # <file xmlns='http://jabber.org/protocol/si/profile/file-transfer' size='694' name='small.file'>
480 # <range/>
481 # </file>
482 # <feature xmlns='http://jabber.org/protocol/feature-neg'>
483 # <x xmlns='jabber:x:data' type='form'>
484 # <field type='list-single' var='stream-method'>
485 # <option>
486 # <value>http://jabber.org/protocol/bytestreams</value>
487 # </option>
488 # </field>
489 # </x>
490 # </feature>
491 # </si>
492 # </iq>
493 $Thrasher::Component::COMPONENT->iq_query(
494 [[$NS_COMPONENT, 'iq'], { 'to' => $ft->{'target_full_jid'},
495 'from' => $ft->{'initiator_legacy_jid'},
496 'type' => 'set',
498 [[[$NS_STREAM_INITIATION, 'si'], { 'profile' => $NS_FILE_TRANSFER,
499 'id' => $stream_id, },
500 [[[$NS_FILE_TRANSFER, 'file'], { 'size' => $size,
501 'name' => $filename, },
502 [ [[$NS_FILE_TRANSFER, 'range'], {}, []] ]], # </file>
503 [[$NS_FEATURE_NEG, 'feature'], {},
504 [[[$NS_DATA, 'x'], { 'type' => 'form' }, [
505 [[$NS_DATA, 'field'], { 'type' => 'list-single',
506 'var' => 'stream-method', },
507 [[[$NS_DATA, 'option'], {},
508 [[[$NS_DATA, 'value'], {},
509 [ $NS_BYTESTREAMS ],
510 []], # </value>
511 ]], # </option
512 ]], # </field>
513 ]], # </x>
514 ]], # </feature>
515 ]], # </si>
516 ]], # </iq>
517 sub { _ft_recv_request2($ft, @_); });
519 return;
522 # FT setup stage 2:
523 # * got error (including "Declined") => tell libpurple FT was refused.
524 # * bytestreams not available => tell libpurple FT was refused.
525 # * feature negotiation has bytestreams => send streamhost and wait
526 # for streamhost-used.
527 sub _ft_recv_request2 {
528 my ($ft, $component, $iq_params, $iq_packet) = @_;
530 if ($iq_params->{'type'} eq 'error') {
531 # including "Declined"
532 THPPW::thrasher_action_ft_recv_request_respond($ft->{'id'}, 0);
533 return;
536 my $neg = recursive_extract(
537 $iq_params->{query},
538 [undef, undef, save_match('rec', [[$NS_FEATURE_NEG, 'feature']])],
539 [undef, undef, save_match('rec', [[$NS_DATA, 'x']])],
540 [undef, undef,
541 save_sub('stream-method',
542 sub {
543 my ($tag) = @_;
544 # "If $tag is the stream-method field, extract
545 # the value's text element."
546 if (ref($tag)
547 && $tag->[0]->[0] eq $NS_DATA
548 && $tag->[0]->[1] eq 'field') {
549 while (my ($attr, $val) = each(%{$tag->[1]})) {
550 if ($attr =~ /}var$/ && $val eq 'stream-method') {
551 for my $child (@{$tag->[2]}) {
552 if (ref($child)
553 && $child->[0]->[1] eq 'value') {
554 return join('', @{$child->[2]});
558 return undef;
561 })],
563 $neg->{'stream-method'} ||= [];
564 if (! grep(/^$NS_BYTESTREAMS/, @{$neg->{'stream-method'}})) {
565 # If bytestreams aren't available, refuse to FT.
566 THPPW::thrasher_action_ft_recv_request_respond($ft->{'id'}, 0);
567 return;
570 # Send streamhost and wait for streamhost-used
572 # <iq from='c%d@xmpp.transport' to='a@b/R' type='set'>
573 # <query xmlns="http://jabber.org/protocol/bytestreams" sid="s_id937" >
574 # <streamhost port="$proxy_server_port" host="$proxy_server_host" jid="$proxy_server_jid">
575 # <proxy xmlns="http://affinix.com/jabber/stream"/>
576 # </streamhost>
577 # </query>
578 # </iq>
579 $Thrasher::Component::COMPONENT->iq_query(
580 [[$NS_COMPONENT, 'iq'], { 'to' => $ft->{'target_full_jid'},
581 'from' => $ft->{'initiator_legacy_jid'},
582 'type' => 'set',
584 [[[$NS_BYTESTREAMS, 'query'], { 'sid' => $ft->{'sid'}, },
585 [[[$NS_BYTESTREAMS, 'streamhost'], { 'port' => $proxy_server_port,
586 'host' => $proxy_server_host,
587 'jid' => $proxy_server_jid,
589 []], # </streamhost>
590 ]], # </query>
591 ]], # </iq>
592 sub { _ft_recv_request3($ft, @_); });
594 return;
597 # FT setup stage 3:
598 # * got streamhost-used => open SOCKS5 connection to streamhost.
599 # * otherwise, or if TCP connection fails => tell libpurple FT was refused.
600 sub _ft_recv_request3 {
601 my ($ft, $component, $iq_params, $iq_packet) = @_;
603 my $used = extract([undef, undef,
604 save_match([[$NS_BYTESTREAMS, 'streamhost-used'],
605 { '{}jid' => save('jid'), },
606 undef])],
607 $iq_params->{query});
608 if (! $used->{'jid'}) {
609 THPPW::thrasher_action_ft_recv_request_respond($ft->{'id'}, 0);
612 # TODO: Currently can only use the one discovered proxy, which was
613 # also the only streamhost _ft_recv_request2 offered. If any was
614 # picked it better be that one! Check it just in case.
615 if ($used->{'jid'} ne $proxy_server_jid) {
616 debug("FT $ft->{id}: Wrong streamhost-used $used->{jid}");
617 THPPW::thrasher_action_ft_recv_request_respond($ft->{'id'}, 0);
620 log("FT $ft->{id}: TCP connecting.");
621 $ft->{'socks'} = Thrasher::Plugin::Socks->new(
622 Debug => 1,
623 ProxyAddr => $proxy_server_host,
624 ProxyPort => $proxy_server_port,
625 ConnectAddr => sha1_hex(join('',
626 $ft->{'sid'},
627 $ft->{'initiator_legacy_jid'},
628 $ft->{'target_full_jid'})),
629 ConnectPort => 0,
630 EventLoop => $component->{'event_loop'},
631 SuccessCallback => sub { _ft_recv_request4($ft); },
632 FailureCallback => sub {
633 debug("FT $ft->{id}: streamhost connection failed");
634 THPPW::thrasher_action_ft_recv_request_respond($ft->{'id'}, 0);
637 if (defined($ft->{'socks'})) {
638 log("FT $ft->{id}: SOCKS5 connecting.");
639 $ft->{'socks'}->connect();
641 else {
642 log("FT $ft->{id}: TCP connection to SOCKS5 proxy failed.");
643 THPPW::thrasher_action_ft_recv_request_respond($ft->{'id'}, 0);
645 return;
648 # FT setup stage 4:
649 # SOCKS5 connection to streamhost succeeded. Activate streamhost.
650 # Upon result IQ, tell libpurple FT was accepted.
651 sub _ft_recv_request4 {
652 my ($ft) = @_;
654 log("FT $ft->{id}: Got write socket from SOCKS.");
655 $ft->{'write_file'} = $ft->{'socks'}->{'socket'};
657 # <iq type="set" to="$used->{'jid'}" >
658 # <query xmlns="http://jabber.org/protocol/bytestreams" sid="s_id937">
659 # <activate>$ft->{'target_full_jid'}</activate>
660 # </query>
661 # </iq>
662 $Thrasher::Component::COMPONENT->iq_query(
663 [[$NS_COMPONENT, 'iq'], { 'to' => $proxy_server_jid,
664 'from' => $ft->{'initiator_legacy_jid'},
665 'type' => 'set',
667 [[[$NS_BYTESTREAMS, 'query'], { 'sid' => $ft->{'sid'}, },
668 [[[$NS_BYTESTREAMS, 'activate'], {},
669 [ $ft->{'target_full_jid'} ]], # </activate>
670 ]], # </query>
671 ]], # </iq>
672 sub {
673 THPPW::thrasher_action_ft_recv_request_respond($ft->{'id'}, 1);
674 # Don't add the FD watch or send ui_ready yet--must wait
675 # for recv_start or write loop. If the public IM side
676 # isn't ready to read, FT gets canceled before it starts.
680 return;
683 sub _ft_recv_accept {
684 debug("_ft_recv_accept called");
685 return 1;
688 sub _ft_recv_start {
689 debug("_ft_recv_start called");
690 return 1;
693 sub _ft_recv_cancel {
694 my ($id) = @_;
695 debug("_ft_recv_cancel($id) called");
697 my $ft = delete($file_transfer_data_by_id->{$id});
698 # FIXME: End transfer correctly.
699 if ($ft->{'socks'}) {
700 $ft->{'socks'}->close();
703 return;
706 sub _ft_recv_complete {
707 my ($id) = @_;
708 debug("_ft_recv_complete($id) called");
709 my $ft = $file_transfer_data_by_id->{$id};
711 # Ensure any buffered data drains into SOCKS eventually.
712 if ($ft->{'leftover_data'}) {
713 # Done with the FD watch that interacts with the protocol.
714 $ft->{'socks'}->remove_fd_watch();
715 # Write until leftovers are gone.
716 $ft->add_fd_watch(sub {
717 my $written = $ft->write('');
718 if (! $ft->{'leftover_data'} || $written < 0) {
719 # Completely written or a permanent error.
720 _ft_recv_complete($id);
721 return 0;
723 else {
724 return 1;
729 # else done with this side of the FT, too.
730 else {
731 delete($file_transfer_data_by_id->{$id});
732 $ft->{'socks'}->close();
734 return;
737 sub _ft_send_accept {
738 debug("_ft_send_accept called");
739 return 1;
742 sub _ft_send_start {
743 my ($id) = @_;
744 debug("_ft_send_start($id) called");
746 my $ft = $file_transfer_data_by_id->{$id};
747 $ft->add_fd_watch();
749 return 1;
752 sub _ft_send_cancel {
753 my ($id) = @_;
754 debug("_ft_send_cancel($id) called");
756 my $ft = delete($file_transfer_data_by_id->{$id});
757 # FIXME: End transfer correctly.
758 if ($ft->{'socks'}) {
759 $ft->{'socks'}->close();
762 return;
765 sub _ft_send_complete {
766 my ($id) = @_;
767 debug("_ft_send_complete($id) called");
769 my $ft = delete($file_transfer_data_by_id->{$id});
770 # FIXME: End transfer correctly.
771 $ft->{'socks'}->close();
773 return;
776 sub _ft_read {
777 my ($id, $size) = @_;
778 debug("_ft_read($id, $size) called");
780 my $ft = $file_transfer_data_by_id->{$id};
781 if (! $ft) {
782 debug("No such FT: $id");
783 return ('', 0);
786 my $data = '';
787 my $need_cancel = 0;
789 # Take from leftover_data, if any.
790 if ($ft->{'leftover_data'}) {
791 $data = $ft->{'leftover_data'};
792 $ft->{'leftover_data'} = '';
793 if (bytes::length($data) > $size) {
794 my $rest = substr($data, $size);
795 $ft->{'leftover_data'} = $rest;
796 $data = substr($data, 0, $size);
798 $size -= bytes::length($data);
801 # Read remainder of chunk
802 if ($size && $ft->{'data_reader'}) {
803 log("FT $ft->{id}: about to read.");
805 my ($eof, $error, $chunk) = $ft->{'data_reader'}->($size);
806 if (defined($chunk) && bytes::length($chunk) > 0) {
807 log("FT $ft->{id}: read " . bytes::length($chunk) . ' bytes.');
808 $data .= $chunk;
810 elsif ($eof) {
811 log("FT $ft->{id}: Reading done.");
812 $ft->{'data_reader'} = undef;
813 $ft->socks_ended();
815 elsif ($error) {
816 if ($!{'EAGAIN'}) {
817 log("FT $ft->{id}: underflow; waiting for SOCKS data.");
819 else {
820 log("FT $ft->{id}: Read error: $!");
821 $ft->{'data_reader'} = undef;
822 $need_cancel = 1;
826 elsif (! $ft->{'data_reader'}) {
827 log("FT $ft->{id}: nowhere to read!");
828 if ($ft->{'socks_ended'}) {
829 $need_cancel = 1;
833 if ($need_cancel) {
834 THPPW::thrasher_action_ft_cancel_local($ft->{'id'});
835 # If read() returns any data after canceling, do_transfer() in
836 # libpurple/ft.c writes it anyway, potentially triggering
837 # another cancel.
839 # An xfer must *never* be canceled twice! Some prpls
840 # (currently at least jabber, msn, and yahoo) aren't
841 # double-cancel safe (e.g. null pointer deference when
842 # jabber_si_xfer_cancel_send() is called twice).
844 # This cancel should probably be moved out of read() entirely.
845 return ('', 0);
847 else {
848 return ($data, bytes::length($data));
852 sub _ft_data_not_sent {
853 my ($id, $rest) = @_;
854 my $rest_sz = bytes::length($rest);
855 debug("_ft_data_not_sent($id) called with $rest_sz bytes.");
857 my $ft = $file_transfer_data_by_id->{$id};
858 $ft->{'leftover_data'} .= $rest;
860 return;
863 sub _ft_write {
864 my ($id, $data) = @_;
865 debug("_ft_write($id) called with " . bytes::length($data) . ' bytes.');
867 my $ft = $file_transfer_data_by_id->{$id};
868 if (! $ft) {
869 debug("No such FT: $id");
870 return 0;
873 my $written = $ft->write($data);
874 if ($written == 0) {
875 $ft->add_fd_watch();
877 elsif ($written < 0) {
878 THPPW::thrasher_action_ft_cancel_local($id);
880 return $written;
885 package Thrasher::Plugin::ProxyFileTransfer::FT;
886 use strict;
887 use warnings;
889 use List::Util qw(max);
891 use Thrasher::Log qw(:all);
893 # Data members:
894 # * "id": Thrasher's file id as returned by thrasher_wrapper_send_file.
895 # * "leftover_data": bytestring left by data_not_sent.
896 # * "socks": Thrasher::Plugin::Socks object connected to streamhost.
897 # * "socks_ended": Set to true after the SOCKS connection is finished
898 # and closed but more events are expected. Distinguishes from the
899 # state where the SOCKS reader/writer is not yet setup but events
900 # are happening.
902 # Data members for sending files:
903 # * "data_reader": if defined, is a function that will
904 # actually do the read. If undef, the stream is done.
905 # $data_reader->($size) returns ($eof, $error, $chunk).
907 # Data members for receiving files:
908 # * "sid": Stream ID.
909 # * "target_full_jid": Recipient's JID.
910 # * "initiator_legacy_jid": legacy JID of sender.
911 # * "write_file": FD to write data to. If undef, the stream is done.
913 sub new {
914 my $class = shift;
916 my $self = {
917 'data_reader' => undef,
918 'write_file' => undef,
919 'data' => [],
922 bless($self, $class);
923 return $self;
926 sub add_fd_watch {
927 my ($self, $callback) = @_;
929 if ($self->{'socks'}->{'watch_id'}) {
930 # Only add one default FD watch at a time so remove_fd_watch() works.
931 # debug("FT $self->{id}: FD watch already added.");
932 return;
935 # When sending a file and the public IM side cancels, this watch
936 # might fire and generate warnings before libpurple figures out
937 # the cancel event.
938 my $protocol = $Thrasher::Component::COMPONENT->{'protocol'};
939 $callback ||= sub {
940 # Logs a LOT.
941 # log("FT $self->{id}: sending local_ready.");
942 return $protocol->ft_local_ready($self->{'id'});
944 my $directions;
945 if ($self->{'data_reader'}) {
946 $directions = $Thrasher::EventLoop::IN;
948 elsif ($self->{'write_file'}) {
949 $directions = $Thrasher::EventLoop::OUT;
951 else {
952 log("FT $self->{id}: Neither a reader nor a writer be?!!");
953 return;
955 # Thrasher must add its own watcher to poke libpurple's
956 # do_transfer. Would be nice if the UI could say "I know I
957 # underflowed this time, but go back to managing the watcher,
958 # please" because if ui_read underflows even once transfer_cb
959 # will be removed and there's no way to set it back.
960 $self->{'socks'}->add_fd_watch($directions, $callback);
963 sub socks_ended {
964 my ($self) = @_;
966 $self->{'socks_ended'} = 1;
967 # Don't close the SOCKS socket yet as that would take away its FD watch.
970 # $ft->write($data_bytes)
972 # Returns:
973 # * > 0 if that number of bytes were actually written successfully.
974 # * == 0 if a temporary condition resulting in no data being written this time.
975 # * < 0 if a permanent error occurred and this FT will never work data again.
976 sub write {
977 my ($self, $data) = @_;
978 my $write_sz = bytes::length($data);
980 my $leftover_sz = bytes::length($self->{'leftover_data'});
981 if ($leftover_sz) {
982 $data = $self->{'leftover_data'} . $data;
983 $self->{'leftover_data'} = '';
984 $write_sz += $leftover_sz;
985 debug("FT $self->{id}: Adding $leftover_sz leftover bytes to write.");
988 my $written = 0;
989 if ($write_sz && $self->{'write_file'}) {
990 # Must do own buffering here, anyway, so might as well syswrite().
991 $written = syswrite($self->{'write_file'}, $data);
992 if (! defined($written)) {
993 $written = 0;
994 if (! $!{'EAGAIN'}) {
995 log("FT $self->{id}: Write error: $!");
996 $self->{'leftover_data'} = undef;
997 $self->{'write_file'} = undef;
998 return -1;
1002 elsif (! $self->{'write_file'}) {
1003 log("FT $self->{id}: nowhere to write!");
1004 if ($self->{'socks_ended'}) {
1005 return -1;
1009 if ($written < $write_sz) {
1010 my $rest = substr($data, $written);
1011 log("FT $self->{id}: "
1012 . bytes::length($rest) . ' bytes yet unwritten.');
1013 $self->{'leftover_data'} = $rest;
1016 # Return the amount written from this request, not including leftovers used.
1017 my $ret = max($written - $leftover_sz,
1019 return $ret;