1 package Thrasher
::Plugin
::ProxyFileTransfer
;
10 Thrasher::Plugin::ProxyFileTransfer - using a XEP-0065-compliant
11 socket proxy, transfer files to and from the legacy service.
15 This module provides as much code as possible to implement file
16 transfer to and from the legacy service, but legacy Protocols will
17 still need to implement a little bit of stuff to hook everything
18 together. This will be documented at some point, probably when
21 Note that this does not permit direct connections; it always uses a
22 proxy. This works out well for what the authors need. If you need to
23 permit direct connections, please feel free to submit a patch; it's
26 Unfortunately, this is a very complicated interaction. If you have
27 any interest in working on this, I strongly recommend popping open
28 something with an XML console (like Psi) and looking at all the XML
29 flying around, rather than trying to figure out what is going
30 on by reading this code. This is event-based code at its worst.
34 # FIXME: Use bytestream query to see if remote client can support file
36 # FIXME: Verify the account is online when sending a file.
38 use Thrasher
::Callbacks
qw(:all);
39 use Thrasher
::Log
qw(:all);
40 use Thrasher
::Constants
qw(:all);
41 use Thrasher
::XML
qw(:all);
42 use Thrasher
::Plugin
qw(:all);
44 use Digest
::SHA1
qw(sha1_hex);
46 use Thrasher
::Plugin
::Socks
;
50 # FIXME: It is possible to fire discovery before the proxy is
51 # configured. If someone wants to transfer files and no proxy
52 # was found, try again before giving up.
53 # undef => not yet discovered
54 # 0 => discovery failed; there is no XEP-0065 proxy
55 my $proxy_server_jid = undef;
56 my $proxy_server_port = undef;
57 my $proxy_server_host = undef;
59 # This is keyed by stream id
60 my $file_transfer_data_by_sid = {};
62 # * Key: Thrasher's file id as returned by thrasher_wrapper_send_file.
63 # * Value: Thrasher::Plugin::ProxyFileTransfer::FT object.
64 my $file_transfer_data_by_id = {};
66 # When the server completes the basic discovery sweep it does,
67 # finish discovering the proxy's information.
68 do_when
('server_discovery_items',
71 for my $item (keys %{$Thrasher::SERVER_INFO
}) {
72 my ($identities, undef) =
73 @
{$Thrasher::SERVER_INFO
->{$item}};
74 for my $identity (@
$identities) {
75 my ($type, $category, $name) = @
$identity;
76 if ($type eq 'bytestreams' &&
77 $category eq 'proxy') {
78 $proxy_server_jid = $item;
79 succeeded
("proxy_server_found");
85 if (!defined($proxy_server_jid)) {
86 log("ERROR: No XEP-0065-compliant SOCKS5 proxy "
87 ."service found on the host server. File "
88 ."transfers WILL NOT WORK.");
89 $proxy_server_jid = 0;
90 failed
("proxy_server_found");
92 log("Proxy server for file transfers found: ".
93 "$proxy_server_jid; querying for IP");
94 $Thrasher::Component
::COMPONENT
->iq_query
95 ([[$NS_COMPONENT, 'iq'],
96 {to
=> $proxy_server_jid,
97 from
=> $Thrasher::Component
::COMPONENT
->{component_name
},
99 [[[$NS_BYTESTREAMS, 'query'], {}, []]]],
101 my $component = shift;
102 my $iq_params = shift;
103 my $iq_packet = shift;
105 if ($iq_params->{type
} eq 'error') {
106 log("ERROR: Bytestreams proxy won't"
107 ." tell us the IP it is located on."
108 ." File transfers WILL NOT WORK. "
109 ."You may need to tweak permissions "
111 .$Thrasher::Component
::COMPONENT
->{component_name
}
113 $proxy_server_jid . ".");
117 # Note that while this query is permitted
118 # to return a zeroconf value, we don't
119 # support that at this time.
124 $streamhost = extract
129 {'{}host' => save
('host'),
130 '{}port' => save
('port')},
132 $iq_params->{query
});
139 $proxy_server_port = $streamhost->{port
};
140 $proxy_server_host = $streamhost->{host
};
141 log("Proxy server fully located, file "
142 . "transfers should work.");
143 succeeded
('xep-0065_proxy_server_located');
144 _initialize_the_wrapper
();
147 log("Error: Proxy server will not "
148 . "identify the ip/port it is "
149 . "on. File transfers WILL NOT "
157 log("ERROR: No XEP-0065-compliant SOCKS5 proxy service "
158 ."found on the host server, because service discovery "
159 ."against the server failed (which is really weird!). "
160 ."File transfers WILL NOT WORK.");
161 $proxy_server_jid = 0;
162 failed
("proxy_server_found");
165 register_plugin
({client_iq_handlers
=>
167 {set
=> \
&handle_set_bytestreams
},
169 {set
=> \
&handle_set_file_transfer
},
170 $NS_STREAM_INITIATION =>
171 {set
=> \
&handle_set_stream_initiation
}
173 features
=> [$NS_BYTESTREAMS,
174 $NS_STREAM_INITIATION,
175 $NS_FILE_TRANSFER]});
177 #<iq type="set" to="profgilzot@aim.transport" id="ab06a" >
178 #<query xmlns="http://jabber.org/protocol/bytestreams" mode="tcp" sid="s5b_5666fd39c31ac262" >
179 #<streamhost port="7777" host="10.2.68.46" jid="proxy65.imtest.barracuda.com" >
180 #<proxy xmlns="http://affinix.com/jabber/stream"/>
182 #<fast xmlns="http://affinix.com/jabber/stream"/>
186 sub handle_set_bytestreams
{
187 my $component = shift;
188 my $iq_params = shift;
189 my $iq_packet = shift;
191 if (!$proxy_server_jid ||
192 !$proxy_server_port ||
193 !$proxy_server_host) {
194 log('Cannot set bytestreams; no proxy server yet known.');
195 $component->iq_error($iq_params, 'service_unavailable');
199 my $query = $iq_params->{query
};
201 my $stream_id = $query->[1]->{'{}sid'};
202 if (!defined($stream_id)) {
203 log("While attempting to deal with bytestream negotiation, "
204 ."got a packet with no sid.");
205 $component->iq_error($iq_params, 'bad_request');
209 my $file_data = delete $file_transfer_data_by_sid->{$stream_id};
210 if (!defined($file_data)) {
211 log("While attempting to find the file data, couldn't "
212 ."find data about stream id $stream_id.");
213 $component->iq_error($iq_params, 'bad_request');
217 # We are looking for a streamhost that isn't using zeroconf
218 my $acceptable_streamhosts = [];
220 for my $child (@
{$iq_params->{query
}->[2]}) {
222 $child->[0]->[1] eq 'streamhost') {
223 my $atts = $child->[1];
224 if ($atts->{'{}zeroconf'}) {
228 if ((my $jid = $atts->{'{}jid'}) &&
229 (my $host = $atts->{'{}host'}) &&
230 (my $port = $atts->{'{}port'})) {
231 push @
$acceptable_streamhosts,
239 if (!@
$acceptable_streamhosts) {
240 log("Could not negotiate an acceptable streamhost.");
241 $component->iq_error($iq_params, 'not_acceptable');
245 # FIXME: We really ought to try these in sequence. However,
246 # I find myself wondering how many clients actually ship
247 # out more than one possibility.
248 $file_transfer_data_by_sid->{streamhosts
} = $acceptable_streamhosts;
250 my $streamhost = $file_transfer_data_by_sid->{streamhosts
}->[0];
252 log("Streamhost chosen: " . Dumper
($streamhost));
254 my $host = sha1_hex
($stream_id .
258 my $ft = Thrasher
::Plugin
::ProxyFileTransfer
::FT
->new();
260 # Unfortunately, the raw TCP connection seems to be
261 # synchronous and I don't see how to make it
262 # unsynchronous. Fortunately, it should be local anyhow.
263 $ft->{'socks'} = Thrasher
::Plugin
::Socks
->new(
264 ProxyAddr
=> $streamhost->{'host'},
265 ProxyPort
=> $streamhost->{'port'},
266 ConnectAddr
=> $host,
268 EventLoop
=> $component->{'event_loop'},
269 SuccessCallback
=> sub {
270 my ($data_read_sub) = @_;
271 log('Got SOCKS data reader.');
272 $ft->{'data_reader'} = $data_read_sub;
274 FailureCallback
=> sub { log('Proxy file transfer failure.'); },
277 if (!defined($ft->{'socks'})) {
278 # FIXME: Try any other streams offered.
279 $component->iq_error($iq_params, 'item_not_found');
285 [[$NS_BYTESTREAMS, 'query'], {},
286 [[[$NS_BYTESTREAMS, 'streamhost-used'],
287 {jid
=> $streamhost->{jid
}},
291 $component->xmpp_name_to_legacy($iq_params->{from
},
294 log("About to initiate purple-side file transfer:");
295 my $file_transfer_id = THPPW
::thrasher_wrapper_send_file
(
296 strip_resource
($iq_params->{'from'}),
298 $file_data->{'filename'},
299 $file_data->{'size'},
300 $file_data->{'desc'},
302 if (! $file_transfer_id) {
303 log('Failed to set up libpurple xfer!');
304 $component->iq_error($iq_params, 'service_unavailable');
307 log("Purple-side init for $file_transfer_id done.");
309 $file_transfer_data_by_id->{$file_transfer_id} = $ft;
310 $ft->{'id'} = $file_transfer_id;
312 $ft->{'socks'}->connect();
315 sub handle_file_transfer
{
316 my $component = shift;
317 my $iq_params = shift;
318 my $iq_packet = shift;
320 log("Got a request for a file transfer.");
321 $component->iq_error($iq_params, 'service_unavailable');
324 sub handle_set_stream_initiation
{
325 my $component = shift;
326 my $iq_params = shift;
327 my $iq_packet = shift;
329 log("Got a request for a stream initiation.");
331 # Extract the information for the <file>.
332 my $file_results = recursive_extract
333 ($iq_params->{query
},
335 # Extract the <file> tag and save the require params
336 [undef, {'{}id' => save
('stream_id')},
337 save_match
('rec', [[$NS_FILE_TRANSFER, 'file'],
338 {'{}name' => save
('filename'),
339 '{}size' => save
('size'),
340 '{}date' => save
('date', 1),
341 '{}hash' => save
('hash', 1)},
344 # go looking for the description.
345 # Note we don't implement <range> support right now.
347 save_match
('rec', [[undef, 'desc'], undef,
348 save_sub
('desc', \
&text_extractor
)],
350 $file_results->{desc
} ||= [];
351 $file_results->{desc
} = join '', @
{$file_results->{desc
}};
353 # Extract the feature negotiation information.
354 my $negotiation_results = recursive_extract
355 ($iq_params->{query
},
358 save_match
('rec', [[$NS_FEATURE_NEG, 'feature']])],
360 [undef, undef, save_match
('rec', [[$NS_DATA, 'x']])],
362 [undef, undef, save_match
('rec', [[$NS_DATA, 'field'],
363 {type
=> 'list-single',
367 # For each option, extract the possible values
372 # "If this is an option, extract the
373 # value's text element."
374 if (ref($tag) && $tag->[0]->[1] eq 'option') {
375 for my $child (@
{$tag->[2]}) {
377 $child->[0]->[1] eq 'value') {
378 return join '', @
{$child->[2]};
387 # Validate the basic information we are looking for
389 if (!exists($file_results->{filename
})) {
390 log("Offered file transfer to legacy user, but no filename given.");
393 if (!exists($file_results->{size
})) {
394 log("Offered file transfer to legacy user, but no size given.");
397 if (!$negotiation_results->{options
}) {
398 log("Offered file transfer to legacy user, but no feature "
399 ."negotiations found.");
404 $component->iq_error($iq_params, 'bad_request');
408 # Verify the remote server is offering bytestreams
409 my $bytestreams_offered = 0;
410 for my $option (@
{$negotiation_results->{options
}}) {
411 if ($option eq $NS_BYTESTREAMS) {
412 $bytestreams_offered = 1;
415 if ($bytestreams_offered == 0) {
416 log("Offered a file transfer to legacy user, but no "
417 ."bytestreams offer found.");
418 $component->iq_error($iq_params, 'bad_request');
421 $file_transfer_data_by_sid->{$file_results->{stream_id
}} = $file_results;
423 # If we've gotten to here, we accept the file transfer offer.
425 [[$NS_STREAM_INITIATION, 'si'], {},
426 [#[[$NS_FILE_TRANSFER, 'file'], {}, []],
427 [[$NS_FEATURE_NEG, 'feature'], {},
428 [[[$NS_DATA, 'x'], {type
=> 'submit'},
429 [[[$NS_DATA, 'field'], {var
=> 'stream-method'},
430 [[[$NS_DATA, 'value'], {}, [$NS_BYTESTREAMS]
432 $component->iq_reply($iq_params, $response);
435 sub _initialize_the_wrapper
{
436 THPPW
::thrasher_wrapper_ft_init
(
437 Thrasher
::error_wrap
("ft_recv_request", \
&_ft_recv_request
),
438 Thrasher
::error_wrap
("ft_recv_accept", \
&_ft_recv_accept
),
439 Thrasher
::error_wrap
("ft_recv_start", \
&_ft_recv_start
),
440 Thrasher
::error_wrap
("ft_recv_cancel", \
&_ft_recv_cancel
),
441 Thrasher
::error_wrap
("ft_recv_complete", \
&_ft_recv_complete
),
442 Thrasher
::error_wrap
("ft_send_accept", \
&_ft_send_accept
),
443 Thrasher
::error_wrap
("ft_send_start", \
&_ft_send_start
),
444 Thrasher
::error_wrap
("ft_send_cancel", \
&_ft_send_cancel
),
445 Thrasher
::error_wrap
("ft_send_complete", \
&_ft_send_complete
),
446 Thrasher
::error_wrap
("ft_read", \
&_ft_read
),
447 Thrasher
::error_wrap
("ft_write", \
&_ft_write
),
448 Thrasher
::error_wrap
("ft_data_not_sent", \
&_ft_data_not_sent
),
452 sub _ft_recv_request
{
453 my ($id, $jid, $legacy_name, $filename, $size) = @_;
454 debug
("_ft_recv_request($id, $jid, $legacy_name"
455 . ", $filename, $size) called");
457 $jid = strip_resource
($jid);
458 my $session = $Thrasher::Component
::COMPONENT
->session_for($jid);
459 my $stream_id = 's_' . $Thrasher::Component
::COMPONENT
->get_id();
460 my $legacy_jid = $Thrasher::Component
::COMPONENT
->legacy_name_to_xmpp(
464 my $ft = Thrasher
::Plugin
::ProxyFileTransfer
::FT
->new(
467 'target_full_jid' => $session->{'full_jid'},
468 'initiator_legacy_jid' => $legacy_jid,
470 $file_transfer_data_by_id->{$ft->{'id'}} = $ft;
472 # Refer to http://xmpp.org/extensions/xep-0065.html &c.
474 # FT setup stage 1: offer the file and get "declined" or feature
477 # <iq from='c%d@xmpp.transport' to='a@b/R' type='set'>
478 # <si xmlns='http://jabber.org/protocol/si' profile='http://jabber.org/protocol/si/profile/file-transfer' id='s_id937'>
479 # <file xmlns='http://jabber.org/protocol/si/profile/file-transfer' size='694' name='small.file'>
482 # <feature xmlns='http://jabber.org/protocol/feature-neg'>
483 # <x xmlns='jabber:x:data' type='form'>
484 # <field type='list-single' var='stream-method'>
486 # <value>http://jabber.org/protocol/bytestreams</value>
493 $Thrasher::Component
::COMPONENT
->iq_query(
494 [[$NS_COMPONENT, 'iq'], { 'to' => $ft->{'target_full_jid'},
495 'from' => $ft->{'initiator_legacy_jid'},
498 [[[$NS_STREAM_INITIATION, 'si'], { 'profile' => $NS_FILE_TRANSFER,
499 'id' => $stream_id, },
500 [[[$NS_FILE_TRANSFER, 'file'], { 'size' => $size,
501 'name' => $filename, },
502 [ [[$NS_FILE_TRANSFER, 'range'], {}, []] ]], # </file>
503 [[$NS_FEATURE_NEG, 'feature'], {},
504 [[[$NS_DATA, 'x'], { 'type' => 'form' }, [
505 [[$NS_DATA, 'field'], { 'type' => 'list-single',
506 'var' => 'stream-method', },
507 [[[$NS_DATA, 'option'], {},
508 [[[$NS_DATA, 'value'], {},
517 sub { _ft_recv_request2
($ft, @_); });
523 # * got error (including "Declined") => tell libpurple FT was refused.
524 # * bytestreams not available => tell libpurple FT was refused.
525 # * feature negotiation has bytestreams => send streamhost and wait
526 # for streamhost-used.
527 sub _ft_recv_request2
{
528 my ($ft, $component, $iq_params, $iq_packet) = @_;
530 if ($iq_params->{'type'} eq 'error') {
531 # including "Declined"
532 THPPW
::thrasher_action_ft_recv_request_respond
($ft->{'id'}, 0);
536 my $neg = recursive_extract
(
538 [undef, undef, save_match
('rec', [[$NS_FEATURE_NEG, 'feature']])],
539 [undef, undef, save_match
('rec', [[$NS_DATA, 'x']])],
541 save_sub
('stream-method',
544 # "If $tag is the stream-method field, extract
545 # the value's text element."
547 && $tag->[0]->[0] eq $NS_DATA
548 && $tag->[0]->[1] eq 'field') {
549 while (my ($attr, $val) = each(%{$tag->[1]})) {
550 if ($attr =~ /}var$/ && $val eq 'stream-method') {
551 for my $child (@
{$tag->[2]}) {
553 && $child->[0]->[1] eq 'value') {
554 return join('', @
{$child->[2]});
563 $neg->{'stream-method'} ||= [];
564 if (! grep(/^$NS_BYTESTREAMS/, @
{$neg->{'stream-method'}})) {
565 # If bytestreams aren't available, refuse to FT.
566 THPPW
::thrasher_action_ft_recv_request_respond
($ft->{'id'}, 0);
570 # Send streamhost and wait for streamhost-used
572 # <iq from='c%d@xmpp.transport' to='a@b/R' type='set'>
573 # <query xmlns="http://jabber.org/protocol/bytestreams" sid="s_id937" >
574 # <streamhost port="$proxy_server_port" host="$proxy_server_host" jid="$proxy_server_jid">
575 # <proxy xmlns="http://affinix.com/jabber/stream"/>
579 $Thrasher::Component
::COMPONENT
->iq_query(
580 [[$NS_COMPONENT, 'iq'], { 'to' => $ft->{'target_full_jid'},
581 'from' => $ft->{'initiator_legacy_jid'},
584 [[[$NS_BYTESTREAMS, 'query'], { 'sid' => $ft->{'sid'}, },
585 [[[$NS_BYTESTREAMS, 'streamhost'], { 'port' => $proxy_server_port,
586 'host' => $proxy_server_host,
587 'jid' => $proxy_server_jid,
592 sub { _ft_recv_request3
($ft, @_); });
598 # * got streamhost-used => open SOCKS5 connection to streamhost.
599 # * otherwise, or if TCP connection fails => tell libpurple FT was refused.
600 sub _ft_recv_request3
{
601 my ($ft, $component, $iq_params, $iq_packet) = @_;
603 my $used = extract
([undef, undef,
604 save_match
([[$NS_BYTESTREAMS, 'streamhost-used'],
605 { '{}jid' => save
('jid'), },
607 $iq_params->{query
});
608 if (! $used->{'jid'}) {
609 THPPW
::thrasher_action_ft_recv_request_respond
($ft->{'id'}, 0);
612 # TODO: Currently can only use the one discovered proxy, which was
613 # also the only streamhost _ft_recv_request2 offered. If any was
614 # picked it better be that one! Check it just in case.
615 if ($used->{'jid'} ne $proxy_server_jid) {
616 debug
("FT $ft->{id}: Wrong streamhost-used $used->{jid}");
617 THPPW
::thrasher_action_ft_recv_request_respond
($ft->{'id'}, 0);
620 log("FT $ft->{id}: TCP connecting.");
621 $ft->{'socks'} = Thrasher
::Plugin
::Socks
->new(
623 ProxyAddr
=> $proxy_server_host,
624 ProxyPort
=> $proxy_server_port,
625 ConnectAddr
=> sha1_hex
(join('',
627 $ft->{'initiator_legacy_jid'},
628 $ft->{'target_full_jid'})),
630 EventLoop
=> $component->{'event_loop'},
631 SuccessCallback
=> sub { _ft_recv_request4
($ft); },
632 FailureCallback
=> sub {
633 debug
("FT $ft->{id}: streamhost connection failed");
634 THPPW
::thrasher_action_ft_recv_request_respond
($ft->{'id'}, 0);
637 if (defined($ft->{'socks'})) {
638 log("FT $ft->{id}: SOCKS5 connecting.");
639 $ft->{'socks'}->connect();
642 log("FT $ft->{id}: TCP connection to SOCKS5 proxy failed.");
643 THPPW
::thrasher_action_ft_recv_request_respond
($ft->{'id'}, 0);
649 # SOCKS5 connection to streamhost succeeded. Activate streamhost.
650 # Upon result IQ, tell libpurple FT was accepted.
651 sub _ft_recv_request4
{
654 log("FT $ft->{id}: Got write socket from SOCKS.");
655 $ft->{'write_file'} = $ft->{'socks'}->{'socket'};
657 # <iq type="set" to="$used->{'jid'}" >
658 # <query xmlns="http://jabber.org/protocol/bytestreams" sid="s_id937">
659 # <activate>$ft->{'target_full_jid'}</activate>
662 $Thrasher::Component
::COMPONENT
->iq_query(
663 [[$NS_COMPONENT, 'iq'], { 'to' => $proxy_server_jid,
664 'from' => $ft->{'initiator_legacy_jid'},
667 [[[$NS_BYTESTREAMS, 'query'], { 'sid' => $ft->{'sid'}, },
668 [[[$NS_BYTESTREAMS, 'activate'], {},
669 [ $ft->{'target_full_jid'} ]], # </activate>
673 THPPW
::thrasher_action_ft_recv_request_respond
($ft->{'id'}, 1);
674 # Don't add the FD watch or send ui_ready yet--must wait
675 # for recv_start or write loop. If the public IM side
676 # isn't ready to read, FT gets canceled before it starts.
683 sub _ft_recv_accept
{
684 debug
("_ft_recv_accept called");
689 debug
("_ft_recv_start called");
693 sub _ft_recv_cancel
{
695 debug
("_ft_recv_cancel($id) called");
697 my $ft = delete($file_transfer_data_by_id->{$id});
698 # FIXME: End transfer correctly.
699 if ($ft->{'socks'}) {
700 $ft->{'socks'}->close();
706 sub _ft_recv_complete
{
708 debug
("_ft_recv_complete($id) called");
709 my $ft = $file_transfer_data_by_id->{$id};
711 # Ensure any buffered data drains into SOCKS eventually.
712 if ($ft->{'leftover_data'}) {
713 # Done with the FD watch that interacts with the protocol.
714 $ft->{'socks'}->remove_fd_watch();
715 # Write until leftovers are gone.
716 $ft->add_fd_watch(sub {
717 my $written = $ft->write('');
718 if (! $ft->{'leftover_data'} || $written < 0) {
719 # Completely written or a permanent error.
720 _ft_recv_complete
($id);
729 # else done with this side of the FT, too.
731 delete($file_transfer_data_by_id->{$id});
732 $ft->{'socks'}->close();
737 sub _ft_send_accept
{
738 debug
("_ft_send_accept called");
744 debug
("_ft_send_start($id) called");
746 my $ft = $file_transfer_data_by_id->{$id};
752 sub _ft_send_cancel
{
754 debug
("_ft_send_cancel($id) called");
756 my $ft = delete($file_transfer_data_by_id->{$id});
757 # FIXME: End transfer correctly.
758 if ($ft->{'socks'}) {
759 $ft->{'socks'}->close();
765 sub _ft_send_complete
{
767 debug
("_ft_send_complete($id) called");
769 my $ft = delete($file_transfer_data_by_id->{$id});
770 # FIXME: End transfer correctly.
771 $ft->{'socks'}->close();
777 my ($id, $size) = @_;
778 debug
("_ft_read($id, $size) called");
780 my $ft = $file_transfer_data_by_id->{$id};
782 debug
("No such FT: $id");
789 # Take from leftover_data, if any.
790 if ($ft->{'leftover_data'}) {
791 $data = $ft->{'leftover_data'};
792 $ft->{'leftover_data'} = '';
793 if (bytes
::length($data) > $size) {
794 my $rest = substr($data, $size);
795 $ft->{'leftover_data'} = $rest;
796 $data = substr($data, 0, $size);
798 $size -= bytes
::length($data);
801 # Read remainder of chunk
802 if ($size && $ft->{'data_reader'}) {
803 log("FT $ft->{id}: about to read.");
805 my ($eof, $error, $chunk) = $ft->{'data_reader'}->($size);
806 if (defined($chunk) && bytes
::length($chunk) > 0) {
807 log("FT $ft->{id}: read " . bytes
::length($chunk) . ' bytes.');
811 log("FT $ft->{id}: Reading done.");
812 $ft->{'data_reader'} = undef;
817 log("FT $ft->{id}: underflow; waiting for SOCKS data.");
820 log("FT $ft->{id}: Read error: $!");
821 $ft->{'data_reader'} = undef;
826 elsif (! $ft->{'data_reader'}) {
827 log("FT $ft->{id}: nowhere to read!");
828 if ($ft->{'socks_ended'}) {
834 THPPW
::thrasher_action_ft_cancel_local
($ft->{'id'});
835 # If read() returns any data after canceling, do_transfer() in
836 # libpurple/ft.c writes it anyway, potentially triggering
839 # An xfer must *never* be canceled twice! Some prpls
840 # (currently at least jabber, msn, and yahoo) aren't
841 # double-cancel safe (e.g. null pointer deference when
842 # jabber_si_xfer_cancel_send() is called twice).
844 # This cancel should probably be moved out of read() entirely.
848 return ($data, bytes
::length($data));
852 sub _ft_data_not_sent
{
853 my ($id, $rest) = @_;
854 my $rest_sz = bytes
::length($rest);
855 debug
("_ft_data_not_sent($id) called with $rest_sz bytes.");
857 my $ft = $file_transfer_data_by_id->{$id};
858 $ft->{'leftover_data'} .= $rest;
864 my ($id, $data) = @_;
865 debug
("_ft_write($id) called with " . bytes
::length($data) . ' bytes.');
867 my $ft = $file_transfer_data_by_id->{$id};
869 debug
("No such FT: $id");
873 my $written = $ft->write($data);
877 elsif ($written < 0) {
878 THPPW
::thrasher_action_ft_cancel_local
($id);
885 package Thrasher
::Plugin
::ProxyFileTransfer
::FT
;
889 use List
::Util
qw(max);
891 use Thrasher
::Log
qw(:all);
894 # * "id": Thrasher's file id as returned by thrasher_wrapper_send_file.
895 # * "leftover_data": bytestring left by data_not_sent.
896 # * "socks": Thrasher::Plugin::Socks object connected to streamhost.
897 # * "socks_ended": Set to true after the SOCKS connection is finished
898 # and closed but more events are expected. Distinguishes from the
899 # state where the SOCKS reader/writer is not yet setup but events
902 # Data members for sending files:
903 # * "data_reader": if defined, is a function that will
904 # actually do the read. If undef, the stream is done.
905 # $data_reader->($size) returns ($eof, $error, $chunk).
907 # Data members for receiving files:
908 # * "sid": Stream ID.
909 # * "target_full_jid": Recipient's JID.
910 # * "initiator_legacy_jid": legacy JID of sender.
911 # * "write_file": FD to write data to. If undef, the stream is done.
917 'data_reader' => undef,
918 'write_file' => undef,
922 bless($self, $class);
927 my ($self, $callback) = @_;
929 if ($self->{'socks'}->{'watch_id'}) {
930 # Only add one default FD watch at a time so remove_fd_watch() works.
931 # debug("FT $self->{id}: FD watch already added.");
935 # When sending a file and the public IM side cancels, this watch
936 # might fire and generate warnings before libpurple figures out
938 my $protocol = $Thrasher::Component
::COMPONENT
->{'protocol'};
941 # log("FT $self->{id}: sending local_ready.");
942 return $protocol->ft_local_ready($self->{'id'});
945 if ($self->{'data_reader'}) {
946 $directions = $Thrasher::EventLoop
::IN
;
948 elsif ($self->{'write_file'}) {
949 $directions = $Thrasher::EventLoop
::OUT
;
952 log("FT $self->{id}: Neither a reader nor a writer be?!!");
955 # Thrasher must add its own watcher to poke libpurple's
956 # do_transfer. Would be nice if the UI could say "I know I
957 # underflowed this time, but go back to managing the watcher,
958 # please" because if ui_read underflows even once transfer_cb
959 # will be removed and there's no way to set it back.
960 $self->{'socks'}->add_fd_watch($directions, $callback);
966 $self->{'socks_ended'} = 1;
967 # Don't close the SOCKS socket yet as that would take away its FD watch.
970 # $ft->write($data_bytes)
973 # * > 0 if that number of bytes were actually written successfully.
974 # * == 0 if a temporary condition resulting in no data being written this time.
975 # * < 0 if a permanent error occurred and this FT will never work data again.
977 my ($self, $data) = @_;
978 my $write_sz = bytes
::length($data);
980 my $leftover_sz = bytes
::length($self->{'leftover_data'});
982 $data = $self->{'leftover_data'} . $data;
983 $self->{'leftover_data'} = '';
984 $write_sz += $leftover_sz;
985 debug
("FT $self->{id}: Adding $leftover_sz leftover bytes to write.");
989 if ($write_sz && $self->{'write_file'}) {
990 # Must do own buffering here, anyway, so might as well syswrite().
991 $written = syswrite($self->{'write_file'}, $data);
992 if (! defined($written)) {
994 if (! $!{'EAGAIN'}) {
995 log("FT $self->{id}: Write error: $!");
996 $self->{'leftover_data'} = undef;
997 $self->{'write_file'} = undef;
1002 elsif (! $self->{'write_file'}) {
1003 log("FT $self->{id}: nowhere to write!");
1004 if ($self->{'socks_ended'}) {
1009 if ($written < $write_sz) {
1010 my $rest = substr($data, $written);
1011 log("FT $self->{id}: "
1012 . bytes
::length($rest) . ' bytes yet unwritten.');
1013 $self->{'leftover_data'} = $rest;
1016 # Return the amount written from this request, not including leftovers used.
1017 my $ret = max
($written - $leftover_sz,