:export_rank has been renamed to :export_vars as it now also exports the
[gsh.git] / gsh / pity.py
blobdb6b15c7ff2e5e7dc8ae53eb2154268bf601ec22
1 #!/usr/bin/env python
2 # This program is free software; you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation; either version 2 of the License, or
5 # (at your option) any later version.
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU Library General Public License for more details.
12 # You should have received a copy of the GNU General Public License
13 # along with this program; if not, write to the Free Software
14 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 # See the COPYING file for license information.
18 # Copyright (c) 2007, 2008 Guillaume Chazarain <guichaz@gmail.com>
20 # This file should remain compatible with python-1.5.2
23 import os
24 import signal
25 import socket
26 import string
27 import sys
28 import termios
29 import time
30 from threading import Event, Thread
31 from Queue import Queue
33 # Somewhat protect the stdin, be sure we read what has been sent by gsh, and
34 # not some garbage entered by the user.
35 STDIN_PREFIX = '!?#%!'
37 UNITS = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']
39 def long_to_str(number):
40 s = str(number)
41 if s[-1] == 'L':
42 s = s[:-1]
43 return s
45 def human_unit(size):
46 """Return a string of the form '12.34 MiB' given a size in bytes."""
47 for i in xrange(len(UNITS) - 1, 0, -1):
48 base = 2.0 ** (10 * i)
49 if 2 * base < size:
50 return '%.2f %s' % ((float(size) / base), UNITS[i])
51 return long_to_str(size) + ' ' + UNITS[0]
54 def rstrip_char(string, char):
55 while string and string[-1] == char:
56 string = string[:-1]
57 return string
59 class bandwidth_monitor(Thread):
60 def __init__(self):
61 Thread.__init__(self)
62 self.setDaemon(1)
63 self.main_done = Event()
64 self.size = 0L
65 self.start()
67 def add_transferred_size(self, size):
68 self.size = self.size + size
70 def finish(self):
71 self.main_done.set()
72 self.join()
74 def run(self):
75 previous_size = 0L
76 previous_sampling_time = time.time()
77 previous_bandwidth = 0L
78 while not self.main_done.isSet():
79 current_size = self.size
80 current_sampling_time = time.time()
81 current_bandwidth = (current_size - previous_size) / \
82 (current_sampling_time - previous_sampling_time)
83 current_bandwidth = (2*current_bandwidth + previous_bandwidth) / 3.0
84 if current_bandwidth < 1:
85 current_bandwidth = 0L
86 print '%s transferred at %s/s' % (human_unit(current_size),
87 human_unit(current_bandwidth))
88 previous_size = current_size
89 previous_sampling_time = current_sampling_time
90 previous_bandwidth = current_bandwidth
91 self.main_done.wait(1.0)
92 print 'Done transferring %s bytes (%s)' % (long_to_str(self.size),
93 human_unit(self.size))
95 def write_fully(fd, data):
96 while data:
97 written = os.write(fd, data)
98 data = data[written:]
100 MAX_QUEUE_ITEM_SIZE = 8 * 1024
102 def forward(input_file, output_files, bandwidth=0):
103 if bandwidth:
104 bw = bandwidth_monitor()
106 input_fd = input_file.fileno()
107 output_fds = []
108 for output_file in output_files:
109 output_fds.append(output_file.fileno())
111 while 1:
112 data = os.read(input_fd, MAX_QUEUE_ITEM_SIZE)
113 if not data:
114 break
115 if bandwidth:
116 bw.add_transferred_size(len(data))
117 for output_fd in output_fds:
118 write_fully(output_fd, data)
120 if bandwidth:
121 bw.finish()
123 input_file.close()
124 for output_file in output_files:
125 output_file.close()
127 def init_listening_socket(gsh_prefix):
128 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
129 s.bind(('', 0))
130 s.listen(5)
131 host = socket.gethostname()
132 port = s.getsockname()[1]
133 prefix = string.join(gsh_prefix, '')
134 print '%s%s:%s' % (prefix, host, port)
135 return s
137 def read_line():
138 line = ''
139 while 1:
140 c = os.read(sys.stdin.fileno(), 1)
141 if c == '\n':
142 break
143 line = line + c
144 if len(line) > 1024:
145 print 'Received input is too large'
146 sys.exit(1)
147 return line
149 def get_destination():
150 fd = sys.stdin.fileno()
151 old_settings = termios.tcgetattr(fd)
152 new_settings = termios.tcgetattr(fd)
153 new_settings[3] = new_settings[3] & ~2 # 3:lflags 2:ICANON
154 new_settings[6][6] = '\000' # Set VMIN to zero for lookahead only
155 termios.tcsetattr(fd, 1, new_settings) # 1:TCSADRAIN
156 while 1:
157 line = read_line()
158 start = string.find(line, STDIN_PREFIX)
159 if start >= 0:
160 line = line[start + len(STDIN_PREFIX):]
161 break
163 termios.tcsetattr(fd, 1, old_settings) # 1:TCSADRAIN
164 split = string.split(line, ':', 1)
165 host = split[0]
166 port = int(split[1])
167 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
168 s.connect((host, port))
169 return s.makefile('r+b')
171 def shell_quote(s):
172 return "'" + string.replace(s, "'", "'\\''") + "'"
174 try:
175 import subprocess
176 except ImportError:
177 def pipe_to_process(cmdline):
178 import popen2
179 return popen2.popen2(cmdline)
180 else:
181 def pipe_to_process(cmdline):
182 p = subprocess.Popen([cmdline],
183 shell=True,
184 stdin=subprocess.PIPE,
185 stdout=subprocess.PIPE,
186 close_fds=True)
188 return p.stdout, p.stdin
190 def do_send(path):
191 split = os.path.split(rstrip_char(path, '/'))
192 dirname, basename = split
193 if dirname:
194 os.chdir(dirname)
195 if not basename:
196 basename = '/'
197 stdout, stdin = pipe_to_process('tar c %s' % shell_quote(basename))
198 stdin.close()
199 forward(stdout, [get_destination()])
201 def do_forward(gsh_prefix):
202 listening_socket = init_listening_socket(gsh_prefix)
203 stdout, stdin = pipe_to_process('tar x')
204 stdout.close()
205 conn, addr = listening_socket.accept()
206 forward(conn.makefile(), [get_destination(), stdin])
208 def do_receive(gsh_prefix):
209 listening_socket = init_listening_socket(gsh_prefix)
210 stdout, stdin = pipe_to_process('tar x')
211 stdout.close()
212 conn, addr = listening_socket.accept()
213 # Only the last item in the chain displays the progress information
214 # as it should be the last one to finish.
215 forward(conn.makefile(), [stdin], bandwidth=1)
217 # Usage:
219 # pity.py send PATH
220 # => reads host:port on stdin
222 # pity.py forward [GSH1...]
223 # => reads host:port on stdin and prints listening host:port on stdout
224 # prefixed by GSH1...
226 # pity.py receive [GSH1...]
227 # => prints listening host:port on stdout prefixed by GSH1...
229 def main():
230 signal.signal(signal.SIGINT, lambda sig, frame: os.kill(0, signal.SIGKILL))
231 cmd = sys.argv[1]
232 try:
233 if cmd == 'send' and len(sys.argv) >= 3:
234 do_send(sys.argv[2])
235 elif cmd == 'forward' and len(sys.argv) >= 2:
236 do_forward(sys.argv[2:])
237 elif cmd == 'receive' and len(sys.argv) >= 2:
238 do_receive(sys.argv[2:])
239 else:
240 print 'Unknown command:', sys.argv
241 sys.exit(1)
242 except OSError, e:
243 print e
244 sys.exit(1)
246 if __name__ == '__main__':
247 main()