2 # This program is free software; you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation; either version 2 of the License, or
5 # (at your option) any later version.
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU Library General Public License for more details.
12 # You should have received a copy of the GNU General Public License
13 # along with this program; if not, write to the Free Software
14 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 # See the COPYING file for license information.
18 # Copyright (c) 2007, 2008 Guillaume Chazarain <guichaz@gmail.com>
20 # This file should remain compatible with python-1.5.2
30 from threading
import Event
, Thread
31 from Queue
import Queue
33 # Somewhat protect the stdin, be sure we read what has been sent by gsh, and
34 # not some garbage entered by the user.
35 STDIN_PREFIX
= '!?#%!'
37 UNITS
= ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']
39 def long_to_str(number
):
46 """Return a string of the form '12.34 MiB' given a size in bytes."""
47 for i
in xrange(len(UNITS
) - 1, 0, -1):
48 base
= 2.0 ** (10 * i
)
50 return '%.2f %s' % ((float(size
) / base
), UNITS
[i
])
51 return long_to_str(size
) + ' ' + UNITS
[0]
54 def rstrip_char(string
, char
):
55 while string
and string
[-1] == char
:
59 class bandwidth_monitor(Thread
):
63 self
.main_done
= Event()
67 def add_transferred_size(self
, size
):
68 self
.size
= self
.size
+ size
76 previous_sampling_time
= time
.time()
77 previous_bandwidth
= 0L
78 while not self
.main_done
.isSet():
79 current_size
= self
.size
80 current_sampling_time
= time
.time()
81 current_bandwidth
= (current_size
- previous_size
) / \
82 (current_sampling_time
- previous_sampling_time
)
83 current_bandwidth
= (2*current_bandwidth
+ previous_bandwidth
) / 3.0
84 if current_bandwidth
< 1:
85 current_bandwidth
= 0L
86 print '%s transferred at %s/s' % (human_unit(current_size
),
87 human_unit(current_bandwidth
))
88 previous_size
= current_size
89 previous_sampling_time
= current_sampling_time
90 previous_bandwidth
= current_bandwidth
91 self
.main_done
.wait(1.0)
92 print 'Done transferring %s bytes (%s)' % (long_to_str(self
.size
),
93 human_unit(self
.size
))
95 def write_fully(fd
, data
):
97 written
= os
.write(fd
, data
)
100 MAX_QUEUE_ITEM_SIZE
= 8 * 1024
102 def forward(input_file
, output_files
, bandwidth
=0):
104 bw
= bandwidth_monitor()
106 input_fd
= input_file
.fileno()
108 for output_file
in output_files
:
109 output_fds
.append(output_file
.fileno())
112 data
= os
.read(input_fd
, MAX_QUEUE_ITEM_SIZE
)
116 bw
.add_transferred_size(len(data
))
117 for output_fd
in output_fds
:
118 write_fully(output_fd
, data
)
124 for output_file
in output_files
:
127 def init_listening_socket(gsh_prefix
):
128 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
131 host
= socket
.gethostname()
132 port
= s
.getsockname()[1]
133 prefix
= string
.join(gsh_prefix
, '')
134 print '%s%s:%s' % (prefix
, host
, port
)
140 c
= os
.read(sys
.stdin
.fileno(), 1)
145 print 'Received input is too large'
149 def get_destination():
150 fd
= sys
.stdin
.fileno()
151 old_settings
= termios
.tcgetattr(fd
)
152 new_settings
= termios
.tcgetattr(fd
)
153 new_settings
[3] = new_settings
[3] & ~
2 # 3:lflags 2:ICANON
154 new_settings
[6][6] = '\000' # Set VMIN to zero for lookahead only
155 termios
.tcsetattr(fd
, 1, new_settings
) # 1:TCSADRAIN
158 start
= string
.find(line
, STDIN_PREFIX
)
160 line
= line
[start
+ len(STDIN_PREFIX
):]
163 termios
.tcsetattr(fd
, 1, old_settings
) # 1:TCSADRAIN
164 split
= string
.split(line
, ':', 1)
167 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
168 s
.connect((host
, port
))
169 return s
.makefile('r+b')
172 return "'" + string
.replace(s
, "'", "'\\''") + "'"
177 def pipe_to_process(cmdline
):
179 return popen2
.popen2(cmdline
)
181 def pipe_to_process(cmdline
):
182 p
= subprocess
.Popen([cmdline
],
184 stdin
=subprocess
.PIPE
,
185 stdout
=subprocess
.PIPE
,
188 return p
.stdout
, p
.stdin
191 split
= os
.path
.split(rstrip_char(path
, '/'))
192 dirname
, basename
= split
197 stdout
, stdin
= pipe_to_process('tar c %s' % shell_quote(basename
))
199 forward(stdout
, [get_destination()])
201 def do_forward(gsh_prefix
):
202 listening_socket
= init_listening_socket(gsh_prefix
)
203 stdout
, stdin
= pipe_to_process('tar x')
205 conn
, addr
= listening_socket
.accept()
206 forward(conn
.makefile(), [get_destination(), stdin
])
208 def do_receive(gsh_prefix
):
209 listening_socket
= init_listening_socket(gsh_prefix
)
210 stdout
, stdin
= pipe_to_process('tar x')
212 conn
, addr
= listening_socket
.accept()
213 # Only the last item in the chain displays the progress information
214 # as it should be the last one to finish.
215 forward(conn
.makefile(), [stdin
], bandwidth
=1)
220 # => reads host:port on stdin
222 # pity.py forward [GSH1...]
223 # => reads host:port on stdin and prints listening host:port on stdout
224 # prefixed by GSH1...
226 # pity.py receive [GSH1...]
227 # => prints listening host:port on stdout prefixed by GSH1...
230 signal
.signal(signal
.SIGINT
, lambda sig
, frame
: os
.kill(0, signal
.SIGKILL
))
233 if cmd
== 'send' and len(sys
.argv
) >= 3:
235 elif cmd
== 'forward' and len(sys
.argv
) >= 2:
236 do_forward(sys
.argv
[2:])
237 elif cmd
== 'receive' and len(sys
.argv
) >= 2:
238 do_receive(sys
.argv
[2:])
240 print 'Unknown command:', sys
.argv
246 if __name__
== '__main__':