2 # -*- coding: utf-8 -*-
9 sys
.path
.insert(0, "/usr/lib/")
10 sys
.path
.insert(0, "/lib/python2.6/site-packages/")
11 from libelliptics_python
import *
15 def __init__(self
, remotes
=[], group
=0, log
='/dev/stdout', mask
=8, own
=''):
16 self
.log
= elliptics_log_file(log
, mask
)
17 self
.n
= elliptics_node_python(self
.log
)
21 self
.n
.add_remote(r
[0], r
[1])
25 self
.own_group
= group
28 for r
in self
.n
.get_routes():
29 if r
[0].group_id
== group
:
32 if len(self
.routes
) == 0:
33 raise NameError("Route table for group " + str(group
) + " is empty")
35 self
.routes
.sort(key
= lambda x
: x
[0].id)
37 def eid_more(self
, e1
, e2
):
40 def destination(self
, eid
):
43 if self
.eid_more(eid
, r
[0]):
50 def merge(self
, inpath
='', outpath
=''):
51 input = eblob
.blob(inpath
)
57 outb
[r
[1]] = eblob
.blob(outpath
+ '-' + r
[1], data_mode
='ab', index_mode
='ab')
59 for id in input.iterate(want_removed
=False):
60 d
= self
.destination(elliptics_id(list(bytearray(id)), self
.own_group
, -1))
68 old_pos
= input.position
71 idata
, data
= input.get_data()
75 b
.dataf
.write(input.data
[eblob
.blob
.index_size
:])
77 input.position
= old_pos
81 if __name__
== '__main__':
82 # list of tuples of remote addresses to connect and grab route table
83 remotes
= [('elisto19f.dev', 1025)]
85 # when doing merge, only select addresses from this group
88 # when doing merge, do NOT take IDs which belong to this address
89 # it must be an IPv4 address:port
90 # when it is set to an obscure string, we force the merge process to spread all IDs
91 # so that no single key will belong to the host of the input blob
92 # localhost is such an 'obscure string' - if remote nodes are not on the localhost,
93 # then there is no way we may have localaddr in the route table (remember, this address
94 # is announced when a node joins the network, so remote nodes connect to it)
95 except_addr
= '127.0.0.1:1025'
97 # Path to the blob to get objects from. Its index file must be located next to it, with the .index suffix
100 # output path - real output files will have an '-addr:port' suffix added
101 # you may want to copy them to the appropriate elliptics nodes
102 # Then on the remote node you should rename this file to the needed name
103 # (if you have a data.3 file in the elliptics blob dir, this may be data.4)
104 # and then restart ioserv, it will pick up the new blobs and start serving requests
105 # Be careful with different columns - blobs with names like 'data-1'
106 # This tool does not know what column this data has, it will create output
107 # files with this prefix only and described above suffix
108 outdir
='/srv/data/new'
110 # please note that after the merge process has completed you should remove the sorted index
111 # for the appropriate blob, since it is not updated during the merge iteration process,
112 # so the local node will continue to 'think' that some records are not removed
113 # reading them will fail, since it checks data and index headers,
114 # but it is still a performance issue
116 # maximum output blob size in bytes
117 blobsize
= 40*1024*1024*1024
119 for blobtype
in ['data', 'data-1']:
120 # list of blobtype blobs
121 inlist
= map(os
.path
.basename
, glob
.glob(indir
+ '/' + blobtype
+ '.*'))
122 # filter .index, data.stat, etc files
123 inlist
= filter(lambda x
: re
.match('^data(-[0-9]+)?\.[0-9]+$', x
), inlist
)
125 inlist
= sorted(inlist
, key
=lambda x
: int(x
.rsplit('.', 1)[1]))
129 for inblob
in inlist
:
130 inpath
= indir
+ '/' + inblob
133 # output path for blob
134 outpath
= outdir
+ '/' + blobtype
+ '.' + str(outblobnum
)
135 # list of already merged blobs
136 outlist
= glob
.glob(outpath
+ '-*')
137 # filter .index files
138 outlist
= filter(lambda x
: not x
.endswith('.index'), outlist
)
140 # ok if blobs not found or all blob sizes less than blobsize
141 if len(outlist
) < 1 or max(map(os
.path
.getsize
, outlist
)) < blobsize
:
147 print "Doing merge from %s to %s" % (inpath
, outpath
)
148 m
= merge(remotes
=remotes
, group
=want_group
, own
=except_addr
)
149 m
.merge(inpath
=inpath
, outpath
=outpath
)
150 except NameError as e
:
152 print "Processes completed:", e
153 print "Please remove %s now and restart elliptics (it will regenerate sorted index if needed)\n" %\
154 (inpath
+ '.index.sorted')