Strip some control commands parameters, as we don't take
[gsh.git] / gsh / pity.py
blob764fa28d56d7e85c9944ec16ac4919e2661c6a2a
1 #!/usr/bin/env python
2 # This program is free software; you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation; either version 2 of the License, or
5 # (at your option) any later version.
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU Library General Public License for more details.
12 # You should have received a copy of the GNU General Public License
13 # along with this program; if not, write to the Free Software
14 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 # See the COPYING file for license information.
18 # Copyright (c) 2007, 2008 Guillaume Chazarain <guichaz@gmail.com>
20 # This file should remain compatible with python-1.5.2
23 import os
24 import popen2
25 import signal
26 import socket
27 import string
28 import sys
29 import termios
30 import time
31 from threading import Event, Thread
32 from Queue import Queue
34 # Somewhat protect the stdin, be sure we read what has been sent by gsh, and
35 # not some garbage entered by the user.
36 STDIN_PREFIX = '!?#%!'
38 UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']
40 def human_unit(size):
41 """Return a string of the form '12.34 MiB' given a size in bytes."""
42 for i in xrange(len(UNITS) - 1, 0, -1):
43 base = 2.0 ** (10 * i)
44 if 2 * base < size:
45 return '%.2f %s' % ((float(size) / base), UNITS[i])
46 return str(size) + ' ' + UNITS[0]
49 def rstrip_char(string, char):
50 while string and string[-1] == char:
51 string = string[:-1]
52 return string
54 class bandwidth_monitor(Thread):
55 def __init__(self):
56 Thread.__init__(self)
57 self.setDaemon(1)
58 self.main_done = Event()
59 self.size = 0
60 self.start()
62 def add_transferred_size(self, size):
63 self.size = self.size + size
65 def finish(self):
66 self.main_done.set()
67 self.join()
69 def run(self):
70 previous_size = 0
71 previous_sampling_time = time.time()
72 previous_bandwidth = 0
73 while not self.main_done.isSet():
74 current_size = self.size
75 current_sampling_time = time.time()
76 current_bandwidth = (current_size - previous_size) / \
77 (current_sampling_time - previous_sampling_time)
78 current_bandwidth = (2*current_bandwidth + previous_bandwidth) / 3.0
79 if current_bandwidth < 1:
80 current_bandwidth = 0
81 print '%s transferred at %s/s' % (human_unit(current_size),
82 human_unit(current_bandwidth))
83 previous_size = current_size
84 previous_sampling_time = current_sampling_time
85 previous_bandwidth = current_bandwidth
86 self.main_done.wait(1.0)
87 print 'Done transferring %d bytes' % (self.size)
89 def write_fully(fd, data):
90 while data:
91 written = os.write(fd, data)
92 data = data[written:]
94 MAX_QUEUE_SIZE = 256 * 1024 * 1024
95 MAX_QUEUE_ITEM_SIZE = 8 * 1024
97 def forward(input_file, output_files, bandwidth=0):
98 if bandwidth:
99 bw = bandwidth_monitor()
101 input_fd = input_file.fileno()
102 output_fds = []
103 for output_file in output_files:
104 output_fds.append(output_file.fileno())
106 while 1:
107 data = os.read(input_fd, MAX_QUEUE_ITEM_SIZE)
108 if not data:
109 break
110 if bandwidth:
111 bw.add_transferred_size(len(data))
112 for output_fd in output_fds:
113 write_fully(output_fd, data)
115 if bandwidth:
116 bw.finish()
118 input_file.close()
119 for output_file in output_files:
120 output_file.close()
122 def init_listening_socket(gsh_prefix):
123 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
124 s.bind(('', 0))
125 s.listen(5)
126 host = socket.gethostname()
127 port = s.getsockname()[1]
128 prefix = string.join(gsh_prefix, '')
129 print '%s%s:%s' % (prefix, host, port)
130 return s
132 def read_line():
133 line = ''
134 while 1:
135 c = os.read(sys.stdin.fileno(), 1)
136 if c == '\n':
137 break
138 line = line + c
139 if len(line) > 1024:
140 print 'Received input is too large'
141 sys.exit(1)
142 return line
144 def get_destination():
145 fd = sys.stdin.fileno()
146 old_settings = termios.tcgetattr(fd)
147 new_settings = termios.tcgetattr(fd)
148 new_settings[3] = new_settings[3] & ~2 # 3:lflags 2:ICANON
149 new_settings[6][6] = '\000' # Set VMIN to zero for lookahead only
150 termios.tcsetattr(fd, 1, new_settings) # 1:TCSADRAIN
151 while 1:
152 line = read_line()
153 start = string.find(line, STDIN_PREFIX)
154 if start >= 0:
155 line = line[start + len(STDIN_PREFIX):]
156 break
158 termios.tcsetattr(fd, 1, old_settings) # 1:TCSADRAIN
159 split = string.split(line, ':', 1)
160 host = split[0]
161 port = int(split[1])
162 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
163 s.connect((host, port))
164 return s.makefile('r+b')
166 def shell_quote(s):
167 return "'" + string.replace(s, "'", "'\\''") + "'"
169 def do_send(path):
170 split = os.path.split(rstrip_char(path, '/'))
171 dirname, basename = split
172 if dirname:
173 os.chdir(dirname)
174 if not basename:
175 basename = '/'
176 stdout, stdin = popen2.popen2('tar c %s' % shell_quote(basename))
177 stdin.close()
178 forward(stdout, [get_destination()])
180 def do_forward(gsh_prefix):
181 listening_socket = init_listening_socket(gsh_prefix)
182 stdout, stdin = popen2.popen2('tar x')
183 stdout.close()
184 conn, addr = listening_socket.accept()
185 forward(conn.makefile(), [get_destination(), stdin])
187 def do_receive(gsh_prefix):
188 listening_socket = init_listening_socket(gsh_prefix)
189 stdout, stdin = popen2.popen2('tar x')
190 stdout.close()
191 conn, addr = listening_socket.accept()
192 # Only the last item in the chain displays the progress information
193 # as it should be the last one to finish.
194 forward(conn.makefile(), [stdin], bandwidth=1)
196 # Usage:
198 # pity.py send PATH
199 # => reads host:port on stdin
201 # pity.py forward [GSH1...]
202 # => reads host:port on stdin and prints listening host:port on stdout
203 # prefixed by GSH1...
205 # pity.py receive [GSH1...]
206 # => prints listening host:port on stdout prefixed by GSH1...
208 def main():
209 signal.signal(signal.SIGINT, lambda sig, frame: os.kill(0, signal.SIGKILL))
210 cmd = sys.argv[1]
211 try:
212 if cmd == 'send' and len(sys.argv) >= 3:
213 do_send(sys.argv[2])
214 elif cmd == 'forward' and len(sys.argv) >= 2:
215 do_forward(sys.argv[2:])
216 elif cmd == 'receive' and len(sys.argv) >= 2:
217 do_receive(sys.argv[2:])
218 else:
219 print 'Unknown command:', sys.argv
220 sys.exit(1)
221 except OSError, e:
222 print e
223 sys.exit(1)
225 if __name__ == '__main__':
226 main()