this is going to be magnificent if I can manage it
[QuestHelper.git] / compile_chain.lua
blob5d046af8999896e1435e7b13a12ef942df44bc1f
2 --[[
4 Okay let's think about this.
6 It takes the form of a series of chain blocks. Each chain block is a "manager" of some sort, in that it just represents the shape of the whole shebang. The actual implementation underlying it doesn't really matter as much, so let's just build the framework first.
8 Parameters to a block consist only of the construction function for the type it's meant to generate, and the blocks it's meant to pass things to.
10 No need for inheritance, honestly.
13 --[[
15 LET'S THINK ABOUT MULTIPROCESSING
17 First off, sharding. Shard by key, hashing. We know how many destination systems we have, so we'll just shard according to that.
19 Okay here's the trivial version.
21 blockname_dstkey_srckey
23 Each block writes all the output that its block will have to the necessary output blocks. Once it's sure it's done, it . . . does what?
25 argh this is hurting my brain
29 OKAY
31 each block takes as input a series of keys, (k, sk, v)
33 as output, it dumps a set of keys, (k, sk, v, id)
35 these are filtered based on id to each block. a key may end up in zero, one, or many output blocks
37 those are split up by k
39 before processing, they are sorted by sk
44 Output files are:
46 (block)/(shard)/(key)_(writer_id)
48 We can just synchronize it from the controller - one block at a time. The blocks are DAGs, we can plug a Lua serialization library in and just append as we go (with some gzip? benchmark this)
50 Each step just drops into the directories for its block/kshard, then processes files in series - grab all the files for one k, process, continue.
52 I'm pretty sure this works, but I'm running out of hard drive space.
55 require("luarocks.require")
56 require("md5")
57 require("pluto")
58 require("gzio")
59 require("bit")
-- Install no-op file-id hooks unless globals of these names already exist.
push_file_id = push_file_id or function () end
pop_file_id = pop_file_id or function () end
-- os.time() value that must be exceeded before the next progress line is printed.
local nextProgressTime = 0

--- Print a progress message, rate-limited by wall-clock time.
-- The message is dropped unless os.time() has moved past the stored threshold;
-- when it is printed, the threshold is pushed to one second after "now".
-- @param msg value handed to print()
local function ProgressMessage(msg)
  if os.time() <= nextProgressTime then
    return
  end

  nextProgressTime = os.time() + 1
  print(msg)
end
--- Frame a serialized record with a 4-byte little-endian length prefix.
-- The length bytes are computed with plain arithmetic so that every byte is
-- guaranteed to be in 0..255 (bit-library results are signed 32-bit values,
-- which makes the top byte negative for very large inputs).
-- @param dat string payload
-- @return string: 4 length bytes (least significant first) followed by `dat`
local function persist_dump(dat)
  local len = #dat
  assert(len < 4294967296, "record too large for a 32-bit length prefix")

  return string.char(
    len % 256,
    math.floor(len / 256) % 256,
    math.floor(len / 65536) % 256,
    math.floor(len / 16777216) % 256
  ) .. dat
end
--- Split a buffer of length-prefixed records and append each payload to `dest`.
-- Each record is a 4-byte little-endian length followed by that many payload
-- bytes (the framing produced by persist_dump).
-- @param dat string containing zero or more concatenated records
-- @param dest array table; payload strings are appended in order
-- Raises an assertion error if a header or payload is truncated.
local function persist_split(dat, dest)
  local total = #dat
  local cp = 1

  while cp <= total do
    -- A header occupies bytes cp .. cp+3.
    assert(cp + 3 <= total, "truncated record header")
    local a, b, c, d = dat:byte(cp, cp + 3)
    cp = cp + 4

    local len = a + b * 256 + c * 65536 + d * 16777216
    assert(cp + len - 1 <= total, "truncated record payload")

    -- string.sub is inclusive on both ends, so the payload is cp .. cp+len-1.
    table.insert(dest, dat:sub(cp, cp + len - 1))
    cp = cp + len
  end
end
-- Open gzip write handles, keyed by file name.
local file_cache = {}

--- Return a gzip write handle for `fname`, opening (and caching) it on first use.
-- If the first open fails, the directory part of `fname` is created with
-- `mkdir -p` and the open is retried once. A failure of the retry raises an
-- error naming the file instead of handing nil back to the caller.
-- @param fname path of the output file; expected to contain a "/" when its
--   directory may be missing
-- @return the handle returned by gzio.open
local function get_file(fname)
  local handle = file_cache[fname]
  if handle then return handle end

  handle = gzio.open(fname, "w")
  if not handle then
    -- Everything before the last "/" is the directory to create.
    local dir = string.match(fname, "(.*)/.-")
    assert(dir, "no directory component in " .. tostring(fname))
    assert(os.execute("mkdir -p " .. dir) == 0)

    local err
    handle, err = gzio.open(fname, "w")
    assert(handle, string.format("cannot open %s for writing: %s", tostring(fname), tostring(err)))
  end

  file_cache[fname] = handle
  return handle
end
--- Close every handle held in file_cache and start over with an empty cache.
local function flush_cache()
  local name, handle = next(file_cache)
  while name ~= nil do
    handle:close()
    name, handle = next(file_cache, name)
  end

  file_cache = {}
end
-- Execution modes; ChainBlock_Init picks one from the command line.
local MODE_SOLO, MODE_MASTER, MODE_SLAVE = 0, 1, 2

-- Current execution mode (one of the MODE_* values once ChainBlock_Init has run).
local mode

-- Slave mode: id of the block this process executes.
local slaveblock

-- Shard index of this process and total number of shards.
local shard, shard_count

-- Master mode: one host entry per shard.
local shard_ips = {}

-- Block id -> ChainBlock instance.
local block_lookup = {}

-- Master mode: script name and working directory used when launching slaves.
local fname, path
--- Choose the execution mode from the command-line arguments and set up state.
--   arg[1] == "master": remember path/fname, run init_f, recycle the temp
--     directory, and build the shard host list from "<host>x<count>" arguments.
--   arg[1] == "slave":  read block id, shard index and shard count from
--     arg[2..4]; init_f is not called.
--   no arg[1]:          solo mode; run init_f.
--   anything else:      assertion failure.
-- In master and slave mode the global print is wrapped to prefix every line.
-- @param path_f  directory stored for later slave launches (master mode)
-- @param fname_f script name stored for later slave launches (master mode)
-- @param init_f  function called with no arguments in master and solo mode
function ChainBlock_Init(path_f, fname_f, init_f)
  local role = arg[1]

  if role == "master" then
    mode = MODE_MASTER
    path, fname = path_f, fname_f
    init_f()

    -- Move the previous temp tree aside, then delete it through a pipe that is
    -- never waited on.
    os.execute("rm -rf temp_removing")
    os.execute("mv temp temp_removing")
    io.popen("rm -rf temp_removing", "w")

    shard = 0

    -- Each remaining argument is "<host>x<count>": add `count` entries for host.
    for idx = 2, #arg do
      local host, copies = arg[idx]:match("(.+)x([0-9]+)")
      assert(host)
      assert(copies)
      for _ = 1, copies do
        shard_ips[#shard_ips + 1] = host
      end
    end
    shard_count = #shard_ips

    local plain_print = print
    print = function(...) plain_print("Master", ...) end
    return
  end

  if role == "slave" then
    mode = MODE_SLAVE
    slaveblock = arg[2]
    shard = tonumber(arg[3])
    shard_count = tonumber(arg[4])
    assert(slaveblock)
    assert(shard)
    assert(shard_count)

    local plain_print = print
    local label = string.format("Shard %d/%d %s", shard, shard_count, slaveblock)
    print = function(...) plain_print(label, ...) io.stdout:flush() end

    ProgressMessage("Starting")
    return
  end

  -- Any other explicit first argument is not a known mode.
  if role then
    assert(false)
  end

  mode = MODE_SOLO
  init_f()
end
-- Read a whole gzip file from the shard directory, delete it, and return its
-- contents. Errors name the offending file.
local function read_and_remove_shard_file(prefix, name)
  local full = prefix .. "/" .. name

  local fil, open_err = gzio.open(full, "r")
  assert(fil, string.format("cannot open %s: %s", full, tostring(open_err)))

  local str, read_err = fil:read("*a")
  assert(str, string.format("cannot read %s: %s", full, tostring(read_err)))
  fil:close()

  os.execute(string.format("rm %s/%s", prefix, name))
  return str
end

--- Slave-side driver: process every input file of this shard for `slaveblock`.
-- Does nothing unless the module is in slave mode. In slave mode it:
--   1. lists temp/<slaveblock>/<shard>,
--   2. loads and deletes all "broadcast_*" files, collecting their records,
--   3. walks the remaining files in listing order; whenever the leading hex
--      prefix of the file name changes it finishes the block and replays the
--      collected broadcasts, then inserts every record of the file,
--   4. finishes the block a last time and flushes the output file cache.
-- @return true in slave mode; nothing otherwise
function ChainBlock_Work()
  if mode ~= MODE_SLAVE then return end

  local prefix = string.format("temp/%s/%d", slaveblock, shard)
  local hnd = io.popen(string.format("ls %s", prefix))

  local tblock = block_lookup[slaveblock]
  assert(tblock)
  local ckey = nil

  local lines = {}
  for line in hnd:lines() do
    table.insert(lines, line)
  end
  hnd:close()

  -- Pass 1: gather the serialized broadcast records.
  local broadcasts = {}
  for _, line in ipairs(lines) do
    if line:match("broadcast_.*") then
      persist_split(read_and_remove_shard_file(prefix, line), broadcasts)
    end
  end

  -- Pass 2: data files. Broadcast files were consumed above and are skipped;
  -- the progress message is still emitted for them.
  for pos, line in ipairs(lines) do
    ProgressMessage(string.format("Progress, %d/%d", pos, #lines))

    if not line:match("broadcast_.*") then
      local tkey = string.match(line, "([a-f0-9]*)_.*")
      if tkey ~= ckey then
        -- New key group: close out the previous one, then replay broadcasts.
        tblock:Finish()
        ckey = tkey

        for _, packed in ipairs(broadcasts) do
          local tab = pluto.unpersist({}, packed)
          tblock:Broadcast(tab.id, tab.value)
        end
      end

      local strs = {}
      persist_split(read_and_remove_shard_file(prefix, line), strs)

      -- Deserialize the whole file before inserting anything from it.
      local up = {}
      for _, packed in ipairs(strs) do
        table.insert(up, pluto.unpersist({}, packed))
      end
      for _, tab in ipairs(up) do
        tblock:Insert(tab.key, tab.subkey, tab.value, tblock.filter)
      end
    end
  end

  tblock:Finish()

  flush_cache()
  return true
end
--- Return md5.sum(dat) rendered as a lowercase hexadecimal string
-- (two hex digits per byte of the digest).
-- @param dat value passed to md5.sum
-- @return hex string
local function md5_clean(dat)
  local digest = md5.sum(dat)

  local hex = {}
  for i = 1, #digest do
    hex[i] = string.format("%02x", digest:byte(i))
  end

  return table.concat(hex)
end
--- Map a key to a shard index in 1..shards.
-- Keys that tonumber() accepts are sharded by their numeric value; every other
-- key is sharded by the first four bytes of its MD5 digest, read as a
-- big-endian unsigned integer.
-- The `%` operator is used rather than the deprecated math.mod: it exists in
-- every Lua 5.1+ and its result takes the sign of the divisor, so negative
-- numeric keys still land on a valid shard.
-- @param dat    key (string or number)
-- @param shards number of shards (required)
-- @return shard index, starting at 1
local function shardy(dat, shards)
  assert(shards)

  local numeric = tonumber(dat)
  if numeric then
    return numeric % shards + 1
  end

  local digest = md5.sum(dat)
  local v = 0
  for k = 1, 4 do
    v = v * 256 + string.byte(digest, k)
  end
  return v % shards + 1
end
-- Method table shared by all chain blocks, and the metatable that routes
-- missing-field lookups on an instance to it.
local ChainBlock = {}
local ChainBlock_mt = {}
ChainBlock_mt.__index = ChainBlock
--- Create a chain block, link it after its upstream blocks, and register it.
-- @param id       unique block id (asserted not to be registered already)
-- @param linkfrom optional table of upstream blocks; each gets this block
--                 added through AddLinkTo and bumps the `unfinished` counter
-- @param factory  function(key) used by GetItem to build per-key items
-- @param sortpred optional comparison applied to subkeys when sorting
-- @param filter   optional identifier; Insert drops records whose identifier
--                 differs
-- @return the new block
function ChainBlock_Create(id, linkfrom, factory, sortpred, filter)
  local ninst = setmetatable({
    id = id,
    factory = factory,
    sortpred = sortpred,
    filter = filter,
    items = {},
    data = {},
    linkto = {},
    broadcasted = {},
    unfinished = 0,
  }, ChainBlock_mt)

  -- Output callback: offer a record to every downstream block and require
  -- that at least one of them accepted it.
  function ninst.process(key, subkey, value, identifier)
    assert(key and value and type(key) == "string")

    local touched = false
    for _, target in pairs(ninst.linkto) do
      if target:Insert(key, subkey, value, identifier) then
        touched = true
      end
    end
    assert(touched, identifier)
  end

  -- Broadcast callback: forward the value to every downstream block.
  function ninst.broadcast(id, value)
    for _, target in pairs(ninst.linkto) do
      target:Broadcast(id, value)
    end
  end

  if linkfrom then
    for _, upstream in pairs(linkfrom) do
      upstream:AddLinkTo(ninst)
      ninst.unfinished = ninst.unfinished + 1
    end
  end

  assert(not block_lookup[id])
  block_lookup[id] = ninst

  return ninst
end
--- Offer one record to this block.
-- Records whose identifier does not match the block's filter (when it has
-- one) are rejected. Otherwise the record is either written to the block's
-- shard file (the block is handled by another process) or handled locally:
-- records without a subkey go straight to the key's item, records with a
-- subkey are queued for Finish.
-- @param key        string key
-- @param subkey     optional sort key
-- @param value      payload
-- @param identifier tag compared against self.filter
-- @return true when the record was accepted, nothing when it was filtered out
function ChainBlock:Insert(key, subkey, value, identifier)
  assert(key)
  assert(type(key) == "string")

  local wanted = self.filter
  if wanted and wanted ~= identifier then return end

  local handled_here = (mode == MODE_SOLO) or (slaveblock == self.id)
  if not handled_here then
    -- File name: <first hex digit of md5(key)>_<writer>_<writer shard>
    local writer = (mode == MODE_MASTER) and "master" or slaveblock
    local target = string.format("temp/%s/%d/%s_%s_%s", self.id, shardy(key, shard_count), md5_clean(key):sub(1,1), writer, shard)
    local f = get_file(target)
    f:write(persist_dump(pluto.persist({}, {key = key, subkey = subkey, value = value})))
    return true
  end

  if subkey then
    table.insert(self:GetData(key), {subkey = subkey, value = value})
    return true
  end

  -- NOTE(review): unlike Finish, no broadcast callback is passed to Data
  -- here - confirm that is intended.
  if type(value) == "table" and value.fileid then
    push_file_id(value.fileid)
  else
    push_file_id(-1)
  end
  self:GetItem(key):Data(key, subkey, value, self.process)
  pop_file_id()

  return true
end
--- Deliver a broadcast value to this block.
-- When the block is handled by this process the value is queued in
-- self.broadcasted; otherwise it is appended to a broadcast file in each of
-- the block's shard directories.
-- @param id    broadcast identifier
-- @param value payload
function ChainBlock:Broadcast(id, value)
  local handled_here = (mode == MODE_SOLO) or (slaveblock == self.id)
  if handled_here then
    table.insert(self.broadcasted, {id = id, value = value})
    return
  end

  local writer = (mode == MODE_MASTER) and "master" or slaveblock
  for k = 1, shard_count do
    local f = get_file(string.format("temp/%s/%d/broadcast_%s_%s", self.id, k, writer, shard))
    f:write(persist_dump(pluto.persist({}, {id = id, value = value})))
  end
end
-- True until the first master-mode Finish call; that call prints the timing
-- table once everything downstream of it has finished.
local finish_root_node = true

-- Master mode: one {id, dur} record per block, in the order blocks were run.
local timing = {}

--- Signal that one upstream source of this block is done.
--
-- Master mode: flush pending output; once every upstream has reported, launch
-- one slave per shard over ssh, wait for all of them, record the elapsed time,
-- and finish the downstream blocks.
--
-- Slave mode: ignored for every block except `slaveblock`. For that block (and
-- for any block in solo mode, once all upstreams have reported) the queued
-- broadcasts are delivered to the existing items, each key's queued records
-- are sorted by subkey and fed to the key's item, the items are finished, and
-- downstream blocks are finished in turn.
function ChainBlock:Finish()
  if mode == MODE_MASTER then
    local frn = finish_root_node
    finish_root_node = false

    flush_cache()

    self.unfinished = self.unfinished - 1
    if self.unfinished > 0 then return end -- NOT . . . FINISHED . . . YET

    local start = os.time()

    -- Start all slaves first, then close each pipe in turn.
    local pypes = {}
    for k = 1, shard_count do
      table.insert(pypes, io.popen(string.format("ssh %s \"cd %s && nice luajit -O2 %s slave %s %d %d\"", shard_ips[k], path, fname, self.id, k, shard_count), "w"))
    end
    for _, pipe in ipairs(pypes) do
      pipe:close()
    end

    table.insert(timing, {id = self.id, dur = os.time() - start})

    for _, v in pairs(self.linkto) do
      v:Finish()
    end

    if frn then
      for _, v in ipairs(timing) do
        print(string.format("%20s %4d", v.id, v.dur))
      end
    end

  elseif mode == MODE_SLAVE and slaveblock ~= self.id then
    return
  elseif mode == MODE_SOLO or (mode == MODE_SLAVE and slaveblock == self.id) then
    self.unfinished = self.unfinished - 1
    if self.unfinished > 0 and mode == MODE_SOLO then return end -- NOT . . . FINISHED . . . YET

    if mode == MODE_SOLO then print("Broadcasting " .. self.id) end

    local sdc = 0
    for _ in pairs(self.data) do sdc = sdc + 1 end
    local sdcc = 0

    -- Deliver queued broadcasts, in arrival order, to every existing item.
    if #self.broadcasted > 0 then
      for _, item in pairs(self.items) do
        for _, d in ipairs(self.broadcasted) do
          item:Receive(d.id, d.value)
        end
      end
    end
    self.broadcasted = {}

    if mode == MODE_SOLO then print("Sorting " .. self.id) end

    for k, v in pairs(self.data) do
      ProgressMessage(string.format("Sorting %s, %d/%d", self.id, sdcc, sdc))
      sdcc = sdcc + 1

      if self.sortpred then
        table.sort(v, function (a, b) return self.sortpred(a.subkey, b.subkey) end)
      else
        table.sort(v, function (a, b) return a.subkey < b.subkey end)
      end
      local item = self:GetItem(k)

      -- ipairs guarantees the records are visited in the order just sorted.
      local ict = 0
      for _, d in ipairs(v) do
        ProgressMessage(string.format("Sorting %s, %d/%d + %d/%d", self.id, sdcc, sdc, ict, #v))
        ict = ict + 1
        -- Same guard as Insert: only table values can carry a fileid.
        if type(d.value) == "table" and d.value.fileid then
          push_file_id(d.value.fileid)
        else
          push_file_id(-1)
        end
        item:Data(k, d.subkey, d.value, self.process, self.broadcast)
        pop_file_id()
      end

      self.data[k] = 0 -- This is kind of like setting it to nil, but instead of not working, it does work.
    end

    if mode == MODE_SOLO then print("Finishing " .. self.id) end

    self.data = {}
    for k, v in pairs(self.items) do
      if v.Finish then v:Finish(self.process, self.broadcast) end
      self.items[k] = 0
    end
    self.items = {}

    if mode == MODE_SOLO then print("Chaining " .. self.id) end

    for _, v in pairs(self.linkto) do
      v:Finish()
    end

    if mode == MODE_SOLO then print("Done " .. self.id) end
  end
end
--- Register `item` as a downstream block of this one.
-- @param item block appended to self.linkto
function ChainBlock:AddLinkTo(item)
  local targets = self.linkto
  targets[#targets + 1] = item
end
--- Return the item stored for `key`, building it with self.factory(key) first
-- when no item is stored yet.
-- @param key item key
-- @return the stored (or freshly created) item
function ChainBlock:GetItem(key)
  local item = self.items[key]
  if not item then
    item = self.factory(key)
    self.items[key] = item
  end
  return item
end
--- Return the list of queued records for `key`, creating an empty list on
-- first use. The key's item is created as well, so every key with queued data
-- also has an item.
-- @param key record key
-- @return array table of queued records
function ChainBlock:GetData(key)
  self:GetItem(key) -- just to ensure

  local queued = self.data[key]
  if not queued then
    queued = {}
    self.data[key] = queued
  end
  return queued
end