Elliptics version update: 2.19.2.7
[elliptics.git] / example / merge_iterator.py
blob23aafd1c5866d8918d146cdd7a04c28eb6187a4f
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
4 import sys
5 import glob
6 import os
7 import re
9 sys.path.insert(0, "/usr/lib/")
10 sys.path.insert(0, "/lib/python2.6/site-packages/")
11 from libelliptics_python import *
12 import eblob
14 class merge:
15 def __init__(self, remotes=[], group=0, log='/dev/stdout', mask=8, own=''):
16 self.log = elliptics_log_file(log, mask)
17 self.n = elliptics_node_python(self.log)
19 for r in remotes:
20 try:
21 self.n.add_remote(r[0], r[1])
22 except:
23 pass
25 self.own_group = group
26 self.own = own
27 self.routes = []
28 for r in self.n.get_routes():
29 if r[0].group_id == group:
30 self.routes.append(r)
32 if len(self.routes) == 0:
33 raise NameError("Route table for group " + str(group) + " is empty")
35 self.routes.sort(key = lambda x: x[0].id)
37 def eid_more(self, e1, e2):
38 return e1.id >= e2.id
40 def destination(self, eid):
41 ret = self.routes[-1]
42 for r in self.routes:
43 if self.eid_more(eid, r[0]):
44 ret = r
45 else:
46 break
48 return ret
50 def merge(self, inpath='', outpath=''):
51 input = eblob.blob(inpath)
53 outb = {}
54 for r in self.routes:
55 if r[1] != self.own:
56 if not r[1] in outb:
57 outb[r[1]] = eblob.blob(outpath + '-' + r[1], data_mode='ab', index_mode='ab')
59 for id in input.iterate(want_removed=False):
60 d = self.destination(elliptics_id(list(bytearray(id)), self.own_group, -1))
62 if d[1] != self.own:
63 b = outb[d[1]]
65 input.read_data()
67 pos = b.dataf.tell()
68 old_pos = input.position
69 input.position = pos
71 idata, data = input.get_data()
73 b.index.write(idata)
74 b.dataf.write(idata)
75 b.dataf.write(input.data[eblob.blob.index_size:])
77 input.position = old_pos
78 input.mark_removed()
79 input.update()
81 if __name__ == '__main__':
82 # list of tuples of remote addresses to connect and grab route table
83 remotes = [('elisto19f.dev', 1025)]
85 # when doing merge, only select addresses from this group
86 want_group = 1
88 # when doing merge, do NOT get IDs which belong to this address
89 # it must be IPv4 address:port
90 # when setting it to obscure string, we force merge process to spread all IDs
91 # so that no single key will belong to the host of the input blob
92 # localhost is kind of such 'obscure string' - if remote nodes are not on the localhost,
93 # then there is no way we may have localaddr in route table (remember, this address
94 # is announced when node joins network, so remote nodes connect to it)
95 except_addr = '127.0.0.1:1025'
97 # Path to blob to get objects from. Index file must be near with .index suffix
98 indir='/srv/data'
100 # output path - real output files will have '-addr:port' suffix added
101 # you may want to copy them to appropriate elliptics nodes
102 # Then on remote node you should rename this file to needed name
103 # (if you have data.3 file in elliptics blob dir, this may be data.4)
104 # and then restart ioserv, it will catch up new blobs and start serving requests
105 # Be careful with different columns - blobs with names like 'data-1'
106 # This tool does not know what column this data has, it will create output
107 # files with this prefix only and described above suffix
108 outdir='/srv/data/new'
110 # please note that after merge process completed you should remove sorted index
111 # for appropriate blob, since it is not updated during merge iteration process,
112 # so local node will continue to 'think' that some records are not removed
113 # reading will fail, since it checks data and index headers,
114 # but still it is a performance issue
116 # maximum output blob size in bytes
117 blobsize = 40*1024*1024*1024
119 for blobtype in ['data', 'data-1']:
120 # list of blobtype blobs
121 inlist = map(os.path.basename, glob.glob(indir + '/' + blobtype + '.*'))
122 # filter .index, data.stat, etc files
123 inlist = filter(lambda x: re.match('^data(-[0-9]+)?\.[0-9]+$', x), inlist)
124 # sort by blob id
125 inlist = sorted(inlist, key=lambda x: int(x.rsplit('.', 1)[1]))
127 # output blob id
128 outblobnum = 0
129 for inblob in inlist:
130 inpath = indir + '/' + inblob
132 while True:
133 # output path for blob
134 outpath = outdir + '/' + blobtype + '.' + str(outblobnum)
135 # list of already merged blogs
136 outlist = glob.glob(outpath + '-*')
137 # filter .index files
138 outlist = filter(lambda x: not x.endswith('.index'), outlist)
140 # ok if blobs not found or all blob sizes less than blobsize
141 if len(outlist) < 1 or max(map(os.path.getsize, outlist)) < blobsize:
142 break
144 outblobnum += 1
146 try:
147 print "Doing merge from %s to %s" % (inpath, outpath)
148 m = merge(remotes=remotes, group=want_group, own=except_addr)
149 m.merge(inpath=inpath, outpath=outpath)
150 except NameError as e:
151 del m
152 print "Processes completed:", e
153 print "Please remove %s now and restart elliptics (it will regenerate sorted index if needed)\n" %\
154 (inpath + '.index.sorted')