update todo
[QuestHelper.git] / Development / build2 / compile_chain.lua
blobe96d1dd0de87f0186fd422ef2a0246be4b3d64e5
2 --[[
4 Okay let's think about this.
6 It takes the form of a series of chain blocks. Each chain block is a "manager" of some sort, in that it just represents the shape of the whole shebang. The actual implementation underlying it doesn't really matter as much, so let's just build the framework first.
8 Parameters to a block consist only of the construction function for the type it's meant to generate, and the blocks it's meant to pass things to.
10 No need for inheritance, honestly.
13 --[[
15 LET'S THINK ABOUT MULTIPROCESSING
17 First off, sharding. Shard by key, hashing. We know how many destination systems we have, so we'll just shard according to that.
19 Okay here's the trivial version.
21 blockname_dstkey_srckey
23 Each block writes all the output that its block will have to the necessary output blocks. Once it's sure it's done, it . . . does what?
25 argh this is hurting my brain
29 OKAY
31 each block takes as input a series of keys, (k, sk, v)
33 as output, it dumps a set of keys, (k, sk, v, id)
35 these are filtered based on id to each block. a key may end up in zero, one, or many output blocks
37 those are split up by k
39 before processing, they are sorted by sk
44 Output files are:
46 (block)/(shard)/(key)_(writer_id)
48 We can just synchronize it from the controller - one block at a time. The blocks are DAGs, we can plug a Lua serialization library in and just append as we go (with some gzip? benchmark this)
50 Each step just drops into the directories for its block/kshard, then processes files in series - grab all the files for one k, process, continue.
52 I'm pretty sure this works, but I'm running out of hard drive space.
55 require("luarocks.require")
56 require("md5")
57 require("pluto")
58 require("gzio")
59 require("bit")
60 zlib = require("zlib") -- lol what
--local intermediate_start_at = "solidity"

--local compress = zlib.compress
--local decompress = zlib.decompress

-- Codec applied to every record this file writes or reads.
-- lzo_compress / lzo_decompress are globals that are not defined in this file.
local compress, decompress = lzo_compress, lzo_decompress

-- Disabled round-trip smoke test for the codec; flip the condition to run it.
if false then
  print("comp loaded")

  print(compress("hello hello hello"))
  print(decompress(compress("hello hello hello")))
  assert(false)
end
--- Call `func(...)` under an error handler that captures a stack trace.
-- If `func` raises, the message and traceback are printed and the failure is
-- turned into a hard assertion, so a broken callback stops the run.
-- Success is decided by xpcall's status flag only; a callback is free to
-- return any value, including truthy ones.
-- @param func function to invoke
-- @param ... arguments forwarded to `func` (embedded and trailing nils are kept)
-- @return every value returned by `func`
local function safety(func, ...)
  local unpack = unpack or table.unpack -- Lua 5.1/LuaJIT vs. 5.2+
  local args = {...}
  local argc = select('#', ...)

  local function traceback(ter)
    -- tostring() so a non-string error object cannot break the handler itself
    return tostring(ter) .. "\n\n" .. debug.traceback()
  end

  -- xpcall yields the status flag first, then either the results of `func`
  -- or whatever the handler produced.
  local function finish(ok, ...)
    if not ok then
      print("safetyout")
      print("RV, ERR", ok, ...)
      assert(false)
    end
    return ...
  end

  -- The closure keeps this working on Lua 5.1, where xpcall cannot forward arguments.
  return finish(xpcall(function () return func(unpack(args, 1, argc)) end, traceback))
end
-- File-id hooks called around item processing. If nothing has defined them
-- by the time this file loads, they fall back to no-ops.
push_file_id = push_file_id or function () end
pop_file_id = pop_file_id or function () end
-- os.time() value that must be exceeded before the next progress line is printed.
local nextProgressTime = 0

--- Tell whether ProgressMessage would print if it were called right now.
-- @return boolean
local function Progressable()
  return nextProgressTime < os.time()
end

--- Print `msg`, rate-limited through nextProgressTime.
-- After a message is printed, further messages are dropped until os.time()
-- has moved past the stored deadline.
-- @param msg value handed to print
local function ProgressMessage(msg)
  if os.time() <= nextProgressTime then
    return
  end
  nextProgressTime = os.time() + 1
  print(msg)
end
--- Open a record file and return an iterator over its records.
-- Layout as decoded here: each record is a 4-byte little-endian length
-- followed by that many payload bytes; a length of 0xFFFFFFFF marks the end
-- of the stream.
-- If the file ends before even one header can be read, the function prints
-- a note, calls sleep(1) (a global not defined in this file) and starts over
-- with a fresh handle.
-- The handle is closed as soon as the end marker is read. A file that holds
-- nothing but the end marker yields an empty iteration.
-- @param filename path of the file to read
-- @return iterator function: next payload string, or nil once the stream is over
function filecontents(filename)
  local END_OF_STREAM = 4294967295 -- 0xFFFFFFFF

  local fil = io.open(filename, "rb")
  if not fil then print("Failed to open filename", filename) end
  assert(fil)

  local fcerror = {} -- unique sentinel: file ended where a record header was expected
  local finished = false

  local function read_record()
    local header = fil:read(4)
    if not header then return fcerror end
    assert(#header == 4)
    local a, b, c, d = header:byte(1, 4)
    local len = a + b * 256 + c * 65536 + d * 16777216
    if len == END_OF_STREAM then
      finished = true
      fil:close()
      return nil
    end
    local payload = fil:read(len)
    -- read() gives nil at end of file, so test it before taking its length
    assert(payload and #payload == len)
    return payload
  end

  local first = read_record()
  if first == fcerror then
    fil:close()
    print("everything is broken", filename)
    sleep(1)
    return filecontents(filename)
  end

  -- Tracked with a flag rather than the truthiness of `first`, because
  -- `first` is nil when the file contains only the end marker.
  local first_pending = true

  return function ()
    if first_pending then
      first_pending = false
      return first
    end
    if finished then return nil end

    local rv = read_record()
    if rv == fcerror then
      print("everything is error", filename)
      fil:close()
      assert(rv ~= fcerror)
    end
    return rv
  end
end
--- Chain the records of several record files into a single iterator.
-- Files are taken from the END of the argument list first (table.remove pops
-- the last entry). Every file is expected to yield at least one record; a
-- file whose first read comes back empty trips the assertion.
-- @param ... file names, each opened with filecontents
-- @return iterator function: next record, or nil when all files are exhausted
local function multifile(...)
  local pending = {...}
  local reader      -- iterator over the file currently being drained
  local readername  -- its name, kept for the diagnostic print
  return function ()
    if reader then
      local record = reader()
      if record then return record end
      reader = nil
    end

    -- No active reader at this point: move on to the next file, if any.
    local nextname = table.remove(pending)
    if not nextname then return nil end
    readername = nextname
    reader = filecontents(nextname)

    local record, err = reader()
    if not record then print(err, readername) end
    assert(record)
    return record
  end
end
-- Child indices for a binary heap stored in a 1-based array.
local function cheap_left(x)
  return x * 2
end

local function cheap_right(x)
  return x * 2 + 1
end
--- Debug aid: dump a heap and verify its ordering.
-- Prints the `.c` field of every entry, one tree level per line, followed by
-- an empty line, then asserts that no entry's `.c` exceeds that of either child.
-- @param heap array of tables carrying a numeric `c` field
local function cheap_sane(heap)
  local level = {}
  local next_level_at = 2 -- index where the following tree level begins
  for i = 1, #heap do
    if i == next_level_at then
      print(table.concat(level))
      level = {}
      next_level_at = next_level_at * 2
    end
    level[#level + 1] = string.format("%f ", heap[i].c)
  end
  print(table.concat(level))
  print("")

  for i = 1, #heap do
    local left, right = heap[cheap_left(i)], heap[cheap_right(i)]
    assert(not left or heap[i].c <= left.c)
    assert(not right or heap[i].c <= right.c)
  end
end
--- Push `item` onto the array-backed binary heap `heap`.
-- The item is appended and then swapped upwards while `pred(item, parent)` holds.
-- @param heap array used as the heap (modified in place)
-- @param item value to add; must be truthy
-- @param pred function(a, b) returning true when `a` has to sit above `b`
local function cheap_insert(heap, item, pred)
  assert(item)
  heap[#heap + 1] = item

  local child = #heap
  while child > 1 do
    local parent = math.floor(child / 2)
    if not pred(heap[child], heap[parent]) then break end
    heap[child], heap[parent] = heap[parent], heap[child]
    child = parent
  end
  --cheap_sane(heap)
end
--- Remove and return the top entry of the array-backed binary heap `heap`.
-- The last entry is moved to the root and swapped downwards with whichever
-- child `pred` ranks first, until neither child outranks it.
-- @param heap array used as the heap (modified in place)
-- @param pred function(a, b) returning true when `a` has to sit above `b`
-- @return the former top entry (nil for an empty heap)
local function cheap_extract(heap, pred)
  local top = heap[1]
  if #heap == 1 then
    table.remove(heap)
    return top
  end

  heap[1] = table.remove(heap)
  local pos = 1
  while pos < #heap do
    local left, right = cheap_left(pos), cheap_right(pos)
    local best = pos
    if heap[left] and pred(heap[left], heap[best]) then best = left end
    if heap[right] and pred(heap[right], heap[best]) then best = right end
    if best == pos then break end
    heap[pos], heap[best] = heap[best], heap[pos]
    pos = best
  end
  --cheap_sane(heap)
  return top
end
--- Merge several record files into one stream ordered by `pred`.
-- Each file is read through filecontents; every record is decompressed and
-- unpersisted with pluto. A heap keyed on each file's next decoded record
-- selects which file to read from. The result is only globally ordered if
-- each input file is already ordered by `pred`.
-- @param pred function(a, b) on decoded records, true when `a` comes first
-- @param ... names of the files to merge
-- @return iterator function: next decoded record, or nil when all files are drained
local function multifilesort(pred, ...)
  local filenames = {...}

  -- Heap entries are {fil = iterator, nxt = decoded record}; order by nxt.
  local function by_next(a, b)
    return pred(a.nxt, b.nxt)
  end

  local heep = {}

  for i = 1, #filenames do
    local reader = filecontents(filenames[i])
    local raw = reader()
    if raw then
      local entry = { fil = reader, nxt = pluto.unpersist({}, decompress(raw)) }
      cheap_insert(heep, entry, by_next)
    end
  end

  return function ()
    if #heep == 0 then return nil end

    local entry = cheap_extract(heep, by_next)
    local result = entry.nxt

    -- Refill from the same file and put it back while it still has records.
    entry.nxt = entry.fil()
    if entry.nxt then
      entry.nxt = pluto.unpersist({}, decompress(entry.nxt))
      cheap_insert(heep, entry, by_next)
    end

    return result
  end
end
--- Open `filename` for writing length-prefixed records.
-- If the first open fails, the directory part of the name is created with
-- `mkdir -p` and the open is retried once.
-- Record layout: 4-byte little-endian length, then the payload. close()
-- appends the 0xFFFFFFFF end marker before closing the handle.
-- @param filename path of the file to create
-- @return table with write(self, dat) and close() functions
local function filewriter(filename)
  local END_OF_STREAM = 4294967295 -- 0xFFFFFFFF, reserved as the end marker

  local fil = io.open(filename, "wb")
  if not fil then
    local dir = string.match(filename, "(.*)/.-")
    assert(dir, filename) -- without a directory part there is nothing to create
    -- Single-quote the path so spaces or shell metacharacters in it are harmless.
    local quoted = "'" .. dir:gsub("'", "'\\''") .. "'"
    local rc = os.execute("mkdir -p " .. quoted)
    assert(rc == 0 or rc == true) -- 0 on Lua 5.1/LuaJIT, true on 5.2+
    fil = io.open(filename, "wb")
  end
  assert(fil)

  return {
    close = function ()
      fil:write(string.char(0xff, 0xff, 0xff, 0xff))
      fil:close()
    end,
    write = function (_, dat)
      local len = #dat
      assert(len < END_OF_STREAM)
      -- Plain arithmetic keeps every byte in 0..255; a signed 32-bit mask
      -- could turn the top byte negative for very large lengths.
      fil:write(string.char(
        len % 256,
        math.floor(len / 256) % 256,
        math.floor(len / 65536) % 256,
        math.floor(len / 16777216) % 256))
      fil:write(dat)
    end
  }
end
-- Writers opened so far, keyed by file name.
local file_cache = {}

--- Return the cached writer for `fname`, opening one with filewriter on first use.
-- @param fname file name, used as the cache key
-- @return the writer object
local function get_file(fname)
  local writer = file_cache[fname]
  if not writer then
    writer = filewriter(fname)
    file_cache[fname] = writer
  end
  return writer
end

--- Close every cached writer and start over with an empty cache.
local function flush_cache()
  for _, writer in pairs(file_cache) do
    writer:close()
  end
  file_cache = {}
end
-- Run modes; ChainBlock_Init picks one from arg[1].
local MODE_MASTER, MODE_SLAVE = 1, 2

local mode        -- MODE_MASTER or MODE_SLAVE once ChainBlock_Init has run

local slaveblock  -- slave mode: id of the block this process runs (arg[2])

local shard, shard_count  -- this process's shard number and the total number of shards
local internal_split = 16 -- not referenced anywhere else in the visible code

local shard_ips = {}      -- master mode: one entry per worker slot, filled from arg[3..]

local block_lookup = {}   -- block id -> block, filled by ChainBlock_Create

local fname, path         -- master mode: script name and directory used to launch slaves
--- Configure this process from the command line (global `arg`).
-- `master <shard_count> <ip>x<n> ...`: stores path/fname, runs init_f, rotates
-- the old temp directory away (unless intermediate_start_at is set), expands
-- each `<ip>x<n>` argument into n entries of shard_ips and prefixes all
-- output with "Master".
-- `slave <block> <shard> <shard_count>`: records which block/shard to run and
-- prefixes all output with a shard tag.
-- With no arguments nothing happens; any other first argument is a hard failure.
-- @param path_f directory slaves `cd` into (master only)
-- @param fname_f script name slaves are launched with (master only)
-- @param init_f callback run once in master mode
function ChainBlock_Init(path_f, fname_f, init_f)
  local role = arg[1]
  if role == "master" then
    mode = MODE_MASTER
    path = path_f
    fname = fname_f
    init_f()

    if not intermediate_start_at then
      print("Removing ancient data . . .")
      os.execute("rm -rf temp_removing")
      print("Shifting old data . . .")
      os.execute("mv temp temp_removing")
      -- NOTE(review): this handle is never read or closed; presumably so the
      -- removal runs in the background - confirm.
      io.popen("rm -rf temp_removing", "w")
      print("Beginning . . .")
    end

    shard = 0

    for k = 3, #arg do
      local ip, ct = arg[k]:match("(.+)x([0-9]+)")
      assert(ip)
      assert(ct)
      -- `ct` is a captured string; convert it instead of relying on the
      -- for-loop coercing its limit.
      for _ = 1, tonumber(ct) do
        table.insert(shard_ips, ip)
      end
    end

    shard_count = tonumber(arg[2])
    assert(shard_count) -- same check the slave branch performs

    local print_bk = print
    print = function(...) print_bk("Master", ...) end

  elseif role == "slave" then
    mode = MODE_SLAVE
    slaveblock = arg[2]
    shard = tonumber(arg[3])
    shard_count = tonumber(arg[4])
    assert(slaveblock)
    assert(shard)
    assert(shard_count)

    local print_bk = print
    local shardid = string.format("Shard %2d/%2d %s", shard, shard_count, slaveblock)
    print = function(...) print_bk(shardid, ...) io.stdout:flush() end

    ProgressMessage("Starting")
  elseif role then
    assert(false)
  end
end
--- Slave entry point: process everything queued for this block and shard.
-- Does nothing (and returns nothing) unless ChainBlock_Init selected slave mode.
-- Steps, all inside temp/<block>/<shard>:
--   1. list the directory and pick the `data_*` input files;
--   2. read all records, and whenever reported memory use exceeds the
--      threshold below, sort the buffered batch by (key, subkey) and spill
--      it to an `intermed_<n>` file;
--   3. replay every record of the `broadcast_*` files into the block;
--   4. merge the sorted intermediates and feed the records to the block,
--      calling Finish each time the key changes and once more at the end.
-- @return true in slave mode
function ChainBlock_Work()
  if mode ~= MODE_SLAVE then return end

  local prefix = string.format("temp/%s/%d", slaveblock, shard)
  local tblock = block_lookup[slaveblock]
  assert(tblock)

  local hnd = io.popen(string.format("ls %s 2> /dev/null", prefix))
  local lines = {}
  for line in hnd:lines() do
    lines[#lines + 1] = line
  end
  hnd:close()

  -- Anchored on purpose: file names embed the writing block's id, so an
  -- unanchored "data_" could also match inside a broadcast file's name.
  local srcfiles = {}
  for _, line in ipairs(lines) do
    if line:match("^data_") then
      srcfiles[#srcfiles + 1] = prefix .. "/" .. line
    end
  end

  -- Order by key first, then by the block's own subkey predicate (if any).
  local function megasortpred(a, b)
    if a.key ~= b.key then return a.key < b.key end
    return tblock.sortpred and tblock.sortpred(a.subkey, b.subkey)
  end

  -- sort step
  local intermediaries = {}
  local sortct = 0

  local datix = {} -- buffered {raw = compressed record, deco = decoded record}

  local function finish_datix()
    if #datix == 0 then return end -- bzzzzt

    local intermedfname = string.format("%s/intermed_%d", prefix, #intermediaries + 1)
    table.insert(intermediaries, intermedfname)

    table.sort(datix, function(a, b)
      return megasortpred(a.deco, b.deco)
    end)

    local out = filewriter(intermedfname)
    if not out then print(intermedfname) end
    assert(out, intermedfname)
    -- The still-compressed form is written back, so nothing is re-encoded.
    for _, v in ipairs(datix) do
      out:write(v.raw)
    end
    out:close()

    datix = {}

    collectgarbage("collect")
  end

  local since_check = 0
  for data in multifile(unpack(srcfiles)) do
    local chunk = pluto.unpersist({}, decompress(data))
    table.insert(datix, {raw = data, deco = chunk})
    since_check = since_check + 1
    sortct = sortct + 1
    -- Memory use is only sampled once per 1000 records.
    if since_check == 1000 then
      since_check = 0
      if collectgarbage("count") > 250000 then
        ProgressMessage(string.format("Dumping intermediate file %d containing %d", #intermediaries + 1, #datix))
        finish_datix()
      end
    end
  end
  finish_datix()

  local bctcount = 0
  for _, line in ipairs(lines) do
    if line:match("^broadcast_") then
      for k in filecontents(prefix .. "/" .. line) do
        local tab = pluto.unpersist({}, decompress(k))
        tblock:Broadcast(tab.id, tab.value)
        bctcount = bctcount + 1
      end
    end
  end

  print(string.format("Processing %d broadcasts, %d data, %d mem", bctcount, sortct, collectgarbage("count")))

  -- merge
  local curkey = nil
  local curct = 0
  for tab in multifilesort(megasortpred, unpack(intermediaries)) do
    if Progressable() then
      ProgressMessage(string.format("Processing %d/%d, %d mem", curct, sortct, collectgarbage("count")))
    end
    curct = curct + 1
    if tab.key ~= curkey then
      --print("finishing")
      tblock:Finish()
      curkey = tab.key
    end

    --print("tbi")
    -- The block's own filter is passed as the identifier so that Insert's
    -- filter check always lets the record through.
    tblock:Insert(tab.key, tab.subkey, tab.value, tblock.filter)
  end

  tblock:Finish()

  flush_cache()
  return true
end
--- Map a key to the number used to choose its shard.
-- A key that parses as a number maps to that number (always returned as a
-- number, never as the original string). Any other key is hashed with
-- md5.sum and the first four digest bytes are combined, most significant first.
-- @param dat key (string or number)
-- @return number
local function md5_value(dat)
  local numeric = tonumber(dat)
  if numeric then return numeric end

  local digest = md5.sum(dat)
  local v = 0
  for k = 1, 4 do
    v = v * 256 + string.byte(digest, k)
  end
  return v
end
516 --[[
517 local function md5_clean(dat)
518 local binny = md5.sum(dat)
519 local rv = ""
520 for k = 1, #binny do
521 rv = rv .. string.format("%02x", string.byte(binny, k))
523 return rv
526 local function shardy(dat, shards)
527 if tonumber(dat) then return math.mod(tonumber(dat), shards) + 1 end
529 local binny = md5.sum(dat)
530 assert(shards)
531 local v = 0
532 for k = 1, 4 do
533 v = v * 256
534 v = v + string.byte(binny, k)
536 return math.mod(v, shards) + 1
537 end]]
-- Method table shared by every chain block, and the metatable exposing it.
local ChainBlock = {}
local ChainBlock_mt = { __index = ChainBlock }

--- Create a chain block and register it in block_lookup under `id`.
-- @param id unique block id; creating a second block with the same id asserts
-- @param linkfrom optional table of upstream blocks; each gets this block
--   added via AddLinkTo, and `unfinished` counts them
-- @param factory function(key) building the per-key item (used by GetItem)
-- @param sortpred optional subkey ordering predicate
-- @param filter optional identifier; Insert drops records tagged otherwise
-- @return the new block
function ChainBlock_Create(id, linkfrom, factory, sortpred, filter)
  local ninst = setmetatable({
    id = id,
    factory = factory,
    sortpred = sortpred,
    filter = filter,
    items = {},
    data = {},
    linkto = {},
    broadcasted = {},
    unfinished = 0,
  }, ChainBlock_mt)

  -- Output callback for items: forwards a record to every downstream block
  -- and insists that at least one of them accepted it.
  function ninst.process(key, subkey, value, identifier)
    local sane = key and value and type(key) == "string"
    if not sane then
      print("Something's wrong with key and value!")
      print("Key: ", type(key), key)
      print("Value: ", type(value), value)
      assert(key and value and type(key) == "string")
    end

    local touched = false
    for _, target in pairs(ninst.linkto) do
      touched = target:Insert(key, subkey, value, identifier) or touched
    end

    if not touched then
      print("Identifier", identifier, "from block", id, "didn't connect to anything!")
      assert(touched, identifier)
    end
  end

  -- Broadcast callback for items: forwards to every downstream block.
  function ninst.broadcast(id, value)
    for _, target in pairs(ninst.linkto) do
      target:Broadcast(id, value)
    end
  end

  for _, source in pairs(linkfrom or {}) do
    source:AddLinkTo(ninst)
    ninst.unfinished = ninst.unfinished + 1
  end

  assert(not block_lookup[id])
  block_lookup[id] = ninst

  return ninst
end
--- Deliver one record to this block.
-- Records whose identifier does not match the block's filter are dropped.
-- If this process is not the slave running this block, the record is
-- serialized into the data file of the shard chosen from the key. Otherwise
-- it is handed to the per-key item's Data method.
-- @param key string key; decides the shard and the item
-- @param subkey secondary key, passed through
-- @param value payload; a table with a `fileid` field sets the current file id
-- @param identifier tag compared against self.filter
-- @return true when the record was accepted, nothing when it was filtered out
function ChainBlock:Insert(key, subkey, value, identifier)
  assert(key)
  assert(type(key) == "string")
  if self.filter and self.filter ~= identifier then return end

  if slaveblock ~= self.id then -- we write
    -- `%` always lands in [0, shard_count). The deprecated math.mod keeps
    -- the sign of a negative numeric key and would name a shard directory
    -- that no slave ever reads.
    local ki = tonumber(md5_value(key))
    local shard_dest_1 = ki % shard_count + 1

    local writer_id = (mode == MODE_MASTER and "master" or slaveblock)
    local f = get_file(string.format("temp/%s/%d/data_%s_%s", self.id, shard_dest_1, writer_id, shard))
    f:write(compress(pluto.persist({}, {key = key, subkey = subkey, value = value})))
  else -- we put to the system
    if type(value) == "table" and value.fileid then
      push_file_id(value.fileid)
    else
      push_file_id(-1)
    end
    local item = self:GetItem(key)
    safety(item.Data, item, key, subkey, value, self.process)
    pop_file_id()
  end

  return true
end
--- Deliver a broadcast (id, value) to this block.
-- If this process is not the slave running this block, the message is
-- appended to the broadcast file of every shard of the block. Otherwise it
-- is queued in self.broadcasted, from where GetItem replays it into each
-- newly created item.
-- @param id broadcast identifier
-- @param value broadcast payload
function ChainBlock:Broadcast(id, value)
  if slaveblock ~= self.id then -- we write
    -- Same bytes for every shard, so serialize and compress only once.
    local payload = compress(pluto.persist({}, {id = id, value = value}))
    local writer_id = (mode == MODE_MASTER and "master" or slaveblock)
    for k = 1, shard_count do
      local f = get_file(string.format("temp/%s/%d/broadcast_%s_%s", self.id, k, writer_id, shard))
      f:write(payload)
    end
  else -- we put to the system
    table.insert(self.broadcasted, {id = id, value = value})
  end
end
-- True until the first master-side Finish call; that call prints the timing
-- table once the whole chain below it has run.
local finish_root_node = true

-- One {id = block id, dur = seconds by os.time()} entry per block run by the master.
local timing = {}

--- Signal that one source of input for this block is done.
-- Slave mode: for the block this slave runs, every item's Finish method (if
-- it has one) is called and the item table is reset; other blocks are ignored.
-- Master mode: pending output is flushed; once every upstream block has
-- reported in, the slaves for this block are launched through the multirun_*
-- helpers and the call cascades to the downstream blocks. sync and the
-- multirun_* functions are globals not defined in this file.
function ChainBlock:Finish()
  if mode == MODE_SLAVE then
    if slaveblock ~= self.id then return end

    for k, item in pairs(self.items) do
      if item.Finish then safety(item.Finish, item, self.process, self.broadcast) end
      self.items[k] = 0
    end
    self.items = {}
    return
  end

  if mode ~= MODE_MASTER then return end

  local frn = finish_root_node
  finish_root_node = false

  flush_cache()

  self.unfinished = self.unfinished - 1
  if self.unfinished > 0 then return end -- NOT . . . FINISHED . . . YET

  sync()

  local start = os.time()

  local run_here = not intermediate_start_at or self.id == intermediate_start_at
  if run_here then
    intermediate_start_at = nil
    multirun_clear()
    for k = 1, shard_count do
      -- so, right now this works because we only have one computer, but if we ever have more than one IP we'll have to put part of this into multirun_complete
      local command = string.format("ssh %s \"cd %s && nice luajit -O2 %s slave %s %d %d\"", shard_ips[1], path, fname, self.id, k, shard_count)
      multirun_add(command)
    end
    multirun_complete(self.id, #shard_ips)
  end

  timing[#timing + 1] = {id = self.id, dur = os.time() - start}

  for _, downstream in pairs(self.linkto) do
    downstream:Finish()
  end

  if frn then
    for i = 1, #timing do
      local entry = timing[i]
      print(string.format("%20s %4d", entry.id, entry.dur))
    end
  end
end
--- Register `item` as a downstream block of this one.
-- @param item block that should receive this block's output
function ChainBlock:AddLinkTo(item)
  local targets = self.linkto
  targets[#targets + 1] = item
end
--- Return the item for `key`, creating it through self.factory on first use.
-- A newly created item is sent every broadcast queued so far, in order,
-- through its Receive method.
-- @param key item key
-- @return the item stored in self.items[key]
function ChainBlock:GetItem(key)
  local items = self.items
  if items[key] then
    return items[key]
  end

  items[key] = self.factory(key)
  for _, msg in ipairs(self.broadcasted) do
    safety(items[key].Receive, items[key], msg.id, msg.value)
  end
  return items[key]
end
--- Return the scratch table kept for `key`, creating it on first use.
-- The item for `key` is created first if it does not exist yet.
-- @param key item key
-- @return table stored in self.data[key]
function ChainBlock:GetData(key)
  self:GetItem(key) -- just to ensure
  local slot = self.data[key]
  if not slot then
    slot = {}
    self.data[key] = slot
  end
  return slot
end