4 Okay let's think about this.
6 It takes the form of a series of chain blocks. Each chain block is a "manager" of some sort, in that it just represents the shape of the whole shebang. The actual implementation underlying it doesn't really matter as much, so let's just build the framework first.
8 Parameters to a block consist only of the construction function for the type it's meant to generate, and the blocks it's meant to pass things to.
10 No need for inheritance, honestly.
15 LET'S THINK ABOUT MULTIPROCESSING
17 First off, sharding. Shard by key, hashing. We know how many destination systems we have, so we'll just shard according to that.
19 Okay here's the trivial version.
21 blockname_dstkey_srckey
23 Each block writes all the output that its block will have to the necessary output blocks. Once it's sure it's done, it . . . does what?
25 argh this is hurting my brain
31 each block takes as input a series of keys, (k, sk, v)
33 as output, it dumps a set of keys, (k, sk, v, id)
35 These are filtered based on id to each block. A key may end up in zero, one, or many output blocks.
37 those are split up by k
39 before processing, they are sorted by sk
46 (block)/(shard)/(key)_(writer_id)
48 We can just synchronize it from the controller - one block at a time. The blocks are DAGs, we can plug a Lua serialization library in and just append as we go (with some gzip? benchmark this)
50 Each step just drops into the directories for its block/kshard, then processes files in series - grab all the files for one k, process, continue.
52 I'm pretty sure this works, but I'm running out of hard drive space.
55 require("luarocks.require")
60 zlib
= require("zlib") -- lol what
-- Optional resume point. ChainBlock_Init only clears the temp/ tree when this
-- is unset, and ChainBlock:Finish compares it against block ids; it is left
-- commented out here.
62 --local intermediate_start_at = "solidity"
-- Codec used for every record this file writes or reads. The zlib pair is
-- kept for reference but disabled; the active pair is lzo_compress /
-- lzo_decompress, which are not defined in the visible part of this file
-- (presumably globals provided by the host runtime - confirm).
64 --local compress = zlib.compress
65 --local decompress = zlib.decompress
67 local compress
= lzo_compress
68 local decompress
= lzo_decompress
-- Start-up smoke test: print the compressed form of a sample string, then the
-- result of a compress/decompress round trip.
73 print(compress("hello hello hello"))
74 print(decompress(compress("hello hello hello")))
-- Calls func under xpcall so that an error raised inside it is caught and
-- annotated with a stack trace.
--   func: the callable to invoke
--   ...:  arguments forwarded to func
-- NOTE(review): `dt` is not assigned in the lines visible here; presumably it
-- holds the packed varargs ({...}) - confirm. What is returned to the caller
-- is likewise not visible.
78 local function safety(func
, ...)
-- select('#', ...) is the true argument count (trailing nils included), so
-- unpack(dt, 1, it) forwards every argument position.
80 local it
= select('#', ...)
-- The message handler appends a blank line and debug.traceback() to the
-- error value.
82 local rv
, err
= xpcall(function () return func(unpack(dt
, 1, it
)) end, function (ter
) return ter
.. "\n\n" .. debug
.traceback() end)
-- Prints xpcall's status and its second result; the condition guarding this
-- print (if any) is not visible here.
85 print("RV, ERR", rv
, err
)
-- Fallbacks for two optional global hooks: when push_file_id / pop_file_id
-- have not been defined already, install empty functions so that later calls
-- (e.g. from ChainBlock:Insert) are harmless no-ops.
92 if not push_file_id
then push_file_id
= function () end end
93 if not pop_file_id
then pop_file_id
= function () end end
-- Earliest os.time() value at which the next progress message may be emitted.
-- Starts at 0, so the first check always passes.
95 local nextProgressTime
= 0
-- Returns true when the current time is past nextProgressTime, i.e. when a
-- progress message would be accepted. Lets callers skip building an
-- expensive message string that would be discarded.
97 local function Progressable()
98 return os
.time() > nextProgressTime
-- Rate-limited progress reporting.
--   msg: the text to report
-- When a message is due, the next allowed time is pushed one second ahead.
-- The statement that actually outputs msg is not visible here.
100 local function ProgressMessage(msg
)
101 if os
.time() > nextProgressTime
then
102 nextProgressTime
= os
.time() + 1
-- Opens a record file for reading and (judging by its use as
-- `for k in filecontents(...)` elsewhere in this file) returns an iterator
-- over its records.
--   filename: path of the file to read
-- Records are length-prefixed: a 4-byte little-endian length followed by the
-- payload. `fcerror` is used as a failure sentinel; its definition is not
-- visible here.
109 function filecontents(filename
)
110 local fil
= io
.open(filename
, "rb")
-- A failed open is only reported; the lines that decide what happens next
-- are not visible here.
111 if not fil
then print("Failed to open filename", filename
) end
-- dofunc reads one record header (and, presumably, its payload).
116 local dofunc
= function ()
117 local red
= fil
:read(4)
-- No header bytes available: signal failure with the sentinel.
118 if not red
then return fcerror
end
-- Decode the four header bytes as a little-endian unsigned integer.
120 local a
, b
, c
, d
= red
:byte(1, 4)
121 local len
= a
+ b
* 256 + c
* 65536 + d
* 16777216
-- 2^32 - 1 is the value of the four 0xff bytes that filewriter emits; how
-- that marker is handled is not visible here.
122 if len
== math
.pow(2, 32) - 1 then
125 local junk
= fil
:read(len
)
-- The first record is read eagerly so a broken file is detected up front.
130 local first
= dofunc()
131 if first
== fcerror
then
133 print("everything is broken", filename
)
-- Retries by re-opening the same file from scratch.
-- NOTE(review): if the failure is persistent this recursion would not
-- terminate, unless an intervening line not visible here prevents it.
135 return filecontents(filename
)
-- A sentinel result at this point is treated as fatal.
145 if rv
== fcerror
then
146 print("everything is error", filename
)
147 assert(rv
~= fcerror
)
-- Chains several record files into one stream; ChainBlock_Work uses it as
-- `for data in multifile(unpack(srcfiles))`.
--   ...: file names to read
-- `files`, `curf` and `curfname` are declared on lines not visible here;
-- presumably `files` holds the varargs - confirm.
152 local function multifile(...)
-- A record from the current reader is returned as-is.
159 if dat
then return dat
end
-- Otherwise advance to the next file. table.remove without an index pops
-- from the end of `files`.
164 local nfil
= table.remove(files
)
-- No files left: end of iteration.
165 if not nfil
then return nil end
167 curf
= filecontents(nfil
)
170 local dat
, err
= curf()
171 if not dat
then print(err
, curfname
) end
-- Index arithmetic for a binary heap stored in a 1-based array: the children
-- of slot x live at 2*x (left) and 2*x + 1 (right).
177 local function cheap_left(x
) return (2*x
) end
178 local function cheap_right(x
) return (2*x
+ 1) end
-- Debug check for a heap whose entries carry a numeric `.c` field: builds a
-- textual dump of the `.c` values and asserts that no parent exceeds either
-- of its children.
--   heap: array-form binary heap
-- The enclosing loop over `i` and the declaration of `dmp` are on lines not
-- visible here.
179 local function cheap_sane(heap
)
-- finishbefore doubles each time i reaches it (2, 4, 8, ...), i.e. at the
-- first index of each heap level; presumably used to break the dump into one
-- line per level.
181 local finishbefore
= 2
183 if i
== finishbefore
then
186 finishbefore
= finishbefore
* 2
188 dmp
= dmp
.. string.format("%f ", heap
[i
].c
)
-- Min-heap invariant: a missing child passes, otherwise parent.c <= child.c.
193 assert(not heap
[cheap_left(i
)] or heap
[i
].c
<= heap
[cheap_left(i
)].c
)
194 assert(not heap
[cheap_right(i
)] or heap
[i
].c
<= heap
[cheap_right(i
)].c
)
-- Inserts item into an array-form binary heap ordered by pred.
--   heap: the heap array (modified in place)
--   item: value to add
--   pred: comparison function, called as pred(child, parent)
-- The loop header and the initialisation of `pt` are not visible here.
197 local function cheap_insert(heap
, item
, pred
)
-- Append at the end, then move the new entry up towards the root.
199 table.insert(heap
, item
)
-- ptd2 is the parent slot of pt.
202 local ptd2
= math
.floor(pt
/ 2)
-- When pred(child, parent) is false the branch body (not visible here) runs;
-- presumably it ends the sift-up because the entry is in place.
203 if not pred(heap
[pt
], heap
[ptd2
]) then
-- Moves the parent down into the child slot (one half of a swap; the other
-- half is not visible here).
207 heap
[pt
] = heap
[ptd2
]
-- Removes and returns the top entry of an array-form binary heap ordered by
-- pred.
--   heap: the heap array (modified in place)
--   pred: comparison function, called as pred(candidate, current_best)
-- The assignment of `rv` and the sift-down loop header are not visible here.
213 local function cheap_extract(heap
, pred
)
-- Single entry: just drop it, nothing to re-order.
215 if #heap
== 1 then table.remove(heap
) return rv
end
-- Move the last entry to the root, then sift it down.
216 heap
[1] = table.remove(heap
)
-- Pick whichever existing child should sit above the current candidate.
220 if heap
[cheap_left(idx
)] and pred(heap
[cheap_left(idx
)], heap
[minix
]) then minix
= cheap_left(idx
) end
221 if heap
[cheap_right(idx
)] and pred(heap
[cheap_right(idx
)], heap
[minix
]) then minix
= cheap_right(idx
) end
-- Start of a swap between slots idx and minix (the remainder is not visible
-- here).
223 local tx
= heap
[minix
]
224 heap
[minix
] = heap
[idx
]
-- Merges several record files into one ordered stream; ChainBlock_Work uses
-- it as `for tab in multifilesort(megasortpred, unpack(intermediaries))`.
--   pred: ordering predicate over decoded records
--   ...:  names of the files to merge (presumably each already sorted by
--         pred - confirm against the writer)
-- A heap (`heep`, declared on a line not visible here) holds one entry per
-- file: {fil = <reader>, nxt = <that file's next decoded record>}.
235 local function multifilesort(pred
, ...)
236 local filenames
= {...}
-- Heap entries are compared by their pending record.
238 local lpred
= function (a
, b
)
239 return pred(a
.nxt
, b
.nxt
)
-- Prime the heap with the first record of every file.
244 for i
= 1, #filenames
do
245 local fil
= filecontents(filenames
[i
])
-- `dt` here is the raw first record of the file (read on a line not visible
-- here); it is decompressed and unpersisted before insertion.
248 cheap_insert(heep
, {fil
= fil
, nxt
= pluto
.unpersist({}, decompress(dt
))}, lpred
)
-- Heap empty: every file is exhausted, end of iteration.
253 if #heep
== 0 then return nil end
255 local dt
= cheap_extract(heep
, lpred
)
-- Decode the reader's following record (assigned to dt.nxt on a line not
-- visible here) and put the reader back on the heap.
261 dt
.nxt
= pluto
.unpersist({}, decompress(dt
.nxt
))
262 cheap_insert(heep
, dt
, lpred
)
-- Opens filename for binary writing and builds a writer for length-prefixed
-- records (the format filecontents reads). Callers in this file use the
-- result as `f:write(data)`.
--   filename: destination path
269 local function filewriter(filename
)
270 local fil
= io
.open(filename
, "wb")
-- Creates the directory part of the path (everything before the last "/")
-- and requires the command to exit with status 0, then opens again.
-- Presumably this only runs when the first open failed - the guard is not
-- visible here.
-- NOTE(review): the path is spliced into a shell command unquoted.
272 assert(os
.execute("mkdir -p " .. string.match(filename
, "(.*)/.-")) == 0)
273 fil
= io
.open(filename
, "wb")
277 local wroteshit
= false
-- Four 0xff bytes: decoded as a length this is 2^32 - 1, the value
-- filecontents tests for. When this is written is not visible here.
281 fil
:write(string.char(0xff, 0xff, 0xff, 0xff))
-- Record writer; the first parameter is ignored so it can be called with
-- method syntax.
284 write = function (_
, dat
)
-- 4-byte little-endian length header for dat.
286 fil
:write(string.char(bit
.band(#dat
, 0xff), bit
.band(#dat
, 0xff00) / 256, bit
.band(#dat
, 0xff0000) / 65536, bit
.band(#dat
, 0xff000000) / 16777216))
-- Open writers keyed by file name, so each output file is opened once.
292 local file_cache
= {}
-- Returns the cached writer for fname, creating it with filewriter on first
-- use.
--   fname: output path
294 local function get_file(fname
)
295 if file_cache
[fname
] then return file_cache
[fname
] end
296 file_cache
[fname
] = filewriter(fname
)
297 return file_cache
[fname
]
-- Visits every cached writer; the per-writer action is not visible here
-- (presumably closes it and empties the cache - confirm).
299 local function flush_cache()
300 for k
, v
in pairs(file_cache
) do
-- Value of `mode` for the coordinating process. MODE_SLAVE is referenced
-- elsewhere in this file but its definition is not visible here.
308 local MODE_MASTER
= 1
-- NOTE(review): not referenced in any visible line; purpose unconfirmed.
317 local internal_split
= 16
-- Registry of blocks by id; ChainBlock_Create fills it and ChainBlock_Work
-- reads it.
321 local block_lookup
= {}
-- Configures this process from the command line (the global `arg` table).
--   path_f, fname_f, init_f: not used in any visible line; presumably
--   path_f / fname_f supply the working directory and script name used when
--   launching slaves, and init_f is a setup callback - confirm.
-- arg[1] selects the role: "master" or "slave".
326 function ChainBlock_Init(path_f
, fname_f
, init_f
)
327 if arg
[1] == "master" then
-- Fresh run only (no resume point): move the old temp/ tree aside and delete
-- it.
333 if not intermediate_start_at
then
334 print("Removing ancient data . . .")
335 os
.execute("rm -rf temp_removing")
336 print("Shifting old data . . .")
337 os
.execute("mv temp temp_removing")
-- io.popen does not wait for the command, unlike os.execute above, so this
-- removal is not awaited. The handle is not kept.
338 io
.popen("rm -rf temp_removing", "w")
339 print("Beginning . . .")
-- Arguments of the form "<ip>x<count>" are split into host and count; the
-- loop over k is not visible here.
345 local ip
, ct
= arg
[k
]:match("(.+)x([0-9]+)")
349 table.insert(shard_ips
, ip
)
-- NOTE(review): the branch this assignment belongs to is not visible here.
353 shard_count
= tonumber(arg
[2])
-- Prefix every print from this process with its role.
355 local print_bk
= print
356 print = function(...) print_bk("Master", ...) end
358 elseif arg
[1] == "slave" then
-- Slave arguments: arg[3] is this shard's number, arg[4] the shard count.
-- `slaveblock` is set on a line not visible here (presumably from arg[2],
-- matching the command built in ChainBlock:Finish).
361 shard
= tonumber(arg
[3])
362 shard_count
= tonumber(arg
[4])
-- Prefix every print with the shard identity and flush stdout after each one.
367 local print_bk
= print
368 local shardid
= string.format("Shard %2d/%2d %s", shard
, shard_count
, slaveblock
)
369 print = function(...) print_bk(shardid
, ...) io
.stdout
:flush() end
371 ProgressMessage("Starting")
-- Slave-side driver for one (block, shard) pair. From the visible lines it:
--   1. lists temp/<block>/<shard>,
--   2. reads every data_* file, decodes the records and spills them to sorted
--      intermed_<n> files when memory use grows,
--   3. replays every broadcast_* file into the block,
--   4. merges the intermediate files in order and feeds each record to
--      tblock:Insert.
-- Several locals (lines, srcfiles, datix, gogo, t, ct, bctcount, sortct,
-- curct, curkey) are declared on lines not visible here.
377 function ChainBlock_Work()
378 if mode
== MODE_SLAVE
then
379 local prefix
= string.format("temp/%s/%d", slaveblock
, shard
)
-- Directory listing via the shell; errors from ls are discarded.
380 local hnd
= io
.popen(string.format("ls %s 2> /dev/null", prefix
))
382 local tblock
= block_lookup
[slaveblock
]
387 for line
in hnd
:lines() do
388 table.insert(lines
, line
)
-- Collect the data files for this shard.
393 for pos
, line
in ipairs(lines
) do
394 if line
:match("data_.*") then
395 table.insert(srcfiles
, prefix
.. "/" .. line
)
-- Ordering used for both the in-memory sort and the merge: by key first,
-- then by the block's optional subkey predicate. Without a sortpred the
-- result for equal keys is nil (falsy).
399 local function megasortpred(a
, b
)
400 if a
.key
~= b
.key
then return a
.key
< b
.key
end
401 return tblock
.sortpred
and tblock
.sortpred(a
.subkey
, b
.subkey
)
-- Names of the sorted spill files written so far.
405 local intermediaries
= {}
-- Sorts the buffered records and writes them to the next intermed_<n> file.
411 local function finish_datix()
412 if #datix
== 0 then return end -- bzzzzt
414 local intermedfname
= string.format("%s/intermed_%d", prefix
, #intermediaries
+ 1)
415 table.insert(intermediaries
, intermedfname
)
-- Buffered entries are {raw = <compressed bytes>, deco = <decoded record>};
-- ordering is decided on the decoded form.
417 table.sort(datix
, function(a
, b
)
418 return megasortpred(a
.deco
, b
.deco
)
421 local out
= filewriter(intermedfname
)
422 if not out
then print(intermedfname
) end
423 assert(out
, intermedfname
)
-- Per-entry write is not visible here (presumably writes v.raw so records are
-- not re-compressed - confirm).
424 for _
, v
in ipairs(datix
) do
431 collectgarbage("collect")
434 for data
in multifile(unpack(srcfiles
)) do
-- Periodic timing probe, entered once per 100000 records. math.mod is the
-- Lua 5.0-era name of math.fmod.
436 if math
.mod(gogo
, 100000) == 50 then
-- Loops while os.time() still equals t; the loop body is not visible here.
440 while os
.time() == t
do
444 print("benchmarking", ct
)
449 local daca
= decompress(data
)
450 local chunk
= pluto
.unpersist({}, daca
)
451 table.insert(datix
, {raw
= data
, deco
= chunk
})
-- collectgarbage("count") reports memory in use in kilobytes; above the
-- threshold the buffer is spilled (the call itself is not visible here,
-- presumably finish_datix()).
455 local garbaj
= collectgarbage("count")
457 if garbaj
> 250000 then
458 ProgressMessage(string.format("Dumping intermediate file %d containing %d", #intermediaries
+ 1, #datix
))
-- NOTE(review): `broadcasts` is not used in any visible line.
466 local broadcasts
= {}
-- Replay every broadcast file into the block before processing data.
467 for pos
, line
in ipairs(lines
) do
468 if line
:match("broadcast_.*") then
469 for k
in filecontents(prefix
.. "/" .. line
) do
470 local tab
= pluto
.unpersist({}, decompress(k
))
471 tblock
:Broadcast(tab
.id
, tab
.value
)
472 bctcount
= bctcount
+ 1
477 print(string.format("Processing %d broadcasts, %d data, %d mem", bctcount
, sortct
, collectgarbage("count")))
-- Ordered merge of all spill files.
482 for tab
in multifilesort(megasortpred
, unpack(intermediaries
)) do
-- Checked first so the message string is only built when it will be shown.
483 if Progressable() then
484 ProgressMessage(string.format("Processing %d/%d, %d mem", curct
, sortct
, collectgarbage("count")))
-- Key boundary; what happens on a key change is not visible here.
487 if tab
.key
~= curkey
then
-- The block's own filter is passed as the identifier so Insert's filter
-- check always passes for replayed records.
494 tblock
:Insert(tab
.key
, tab
.subkey
, tab
.value
, tblock
.filter
)
-- Maps a key to a number for sharding.
--   dat: the key
-- Anything tonumber accepts is returned unchanged (not converted). Otherwise
-- the bytes of md5.sum(dat) are added together; the loop over k, the
-- initialisation of v and the return are not visible here.
505 local function md5_value(dat
)
506 if tonumber(dat
) then return dat
end
508 local binny
= md5
.sum(dat
)
512 v
= v
+ string.byte(binny
, k
)
-- Renders md5.sum(dat) as lowercase hexadecimal, two digits per byte.
--   dat: the value to hash
-- The loop over k, the initialisation of rv and the return are not visible
-- here.
517 local function md5_clean(dat)
518 local binny = md5.sum(dat)
521 rv = rv .. string.format("%02x", string.byte(binny, k))
-- Picks a 1-based shard number for dat.
--   dat:    the key
--   shards: number of shards
-- Numeric keys use their value modulo shards; other keys use the sum of the
-- bytes of md5.sum(dat) modulo shards (the loop over k and the initialisation
-- of v are not visible here). The result lies in 1..shards for non-negative
-- inputs.
526 local function shardy(dat, shards)
527 if tonumber(dat) then return math.mod(tonumber(dat), shards) + 1 end
529 local binny = md5.sum(dat)
534 v = v + string.byte(binny, k)
536 return math.mod(v, shards) + 1
-- Method table for chain blocks, plus the metatable that routes field lookups
-- on instances to it.
540 local ChainBlock
= {}
541 local ChainBlock_mt
= { __index
= ChainBlock
}
-- Builds a chain block and registers it in block_lookup.
--   id:       unique block name (asserted not to be registered already)
--   linkfrom: upstream blocks; `unfinished` is incremented once per entry
--   factory:  called as factory(key) by GetItem to create per-key items
--   sortpred: optional ordering predicate over subkeys
--   filter:   optional identifier; Insert drops records whose identifier
--             differs
-- The creation of `ninst`, the initialisation of its other fields and the
-- return value are on lines not visible here.
543 function ChainBlock_Create(id
, linkfrom
, factory
, sortpred
, filter
)
545 setmetatable(ninst
, ChainBlock_mt
)
547 ninst
.factory
= factory
548 ninst
.sortpred
= sortpred
549 ninst
.filter
= filter
-- Broadcasts received so far; GetItem replays them into items.
553 ninst
.broadcasted
= {}
-- Output callback handed to items: validates the record and forwards it to
-- every downstream block.
555 ninst
.process
= function (key
, subkey
, value
, identifier
)
-- Print diagnostics before failing, so the offending values are visible.
556 if not (key
and value
and type(key
) == "string") then
557 print("Something's wrong with key and value!")
558 print("Key: ", type(key
), key
)
559 print("Value: ", type(value
), value
)
560 assert(key
and value
and type(key
) == "string")
-- `touched` becomes truthy once any downstream Insert returns a truthy value.
563 local touched
= false
564 for _
, v
in pairs(ninst
.linkto
) do
565 touched
= v
:Insert(key
, subkey
, value
, identifier
) or touched
-- A record that no downstream block accepted is treated as an error (the
-- guard around the print is not visible here).
569 print("Identifier", identifier
, "from block", id
, "didn't connect to anything!")
570 assert(touched
, identifier
)
-- Broadcast callback handed to items: forwards to every downstream block.
-- Its `id` parameter shadows the block id inside the function.
573 ninst
.broadcast
= function (id
, value
) for _
, v
in pairs(ninst
.linkto
) do v
:Broadcast(id
, value
) end end
-- One pending upstream per linkfrom entry; Finish counts these down.
575 for k
, v
in pairs(linkfrom
) do
577 ninst
.unfinished
= ninst
.unfinished
+ 1
581 assert(not block_lookup
[id
])
582 block_lookup
[id
] = ninst
-- Delivers one record to this block.
--   key:        string key (asserted)
--   subkey:     secondary sort key
--   value:      payload
--   identifier: output tag; compared against self.filter when one is set
-- Returns nothing when the filter rejects the record. The return value on
-- the other paths is not visible here, although the `process` callback
-- expects a truthy result for accepted records.
588 function ChainBlock
:Insert(key
, subkey
, value
, identifier
)
590 assert(type(key
) == "string")
591 if self
.filter
and self
.filter
~= identifier
then return end
-- This process is not the one running this block: persist the record to the
-- destination shard's directory instead of processing it.
593 if slaveblock
~= self
.id
then -- we write
-- Destination shard is derived from the key, 1-based.
594 local ki
= md5_value(key
)
595 local shard_dest_1
= math
.mod(ki
, shard_count
) + 1
-- File name encodes destination block and shard plus the writer's identity
-- (master, or source block and shard).
597 local f
= get_file(string.format("temp/%s/%d/data_%s_%s", self
.id
, shard_dest_1
, (mode
== MODE_MASTER
and "master" or slaveblock
), shard
))
598 f
:write(compress(pluto
.persist({}, {key
= key
, subkey
= subkey
, value
= value
})))
599 else -- we put to the system
-- Announce the originating file id through the push_file_id hook, or -1 when
-- the value carries none.
600 if type(value
) == "table" and value
.fileid
then push_file_id(value
.fileid
) else push_file_id(-1) end
-- Invoke the item's Data method under safety, passing the item as self and
-- the block's process callback for output.
601 safety(self
:GetItem(key
).Data
, self
:GetItem(key
), key
, subkey
, value
, self
.process
)
-- Delivers a broadcast (a value meant for every key of this block).
--   id:    broadcast identifier
--   value: payload
609 function ChainBlock
:Broadcast(id
, value
)
-- Not the process running this block: write the broadcast into every shard's
-- directory so each shard sees it.
610 if slaveblock
~= self
.id
then -- we write
611 for k
= 1, shard_count
do
612 local f
= get_file(string.format("temp/%s/%d/broadcast_%s_%s", self
.id
, k
, (mode
== MODE_MASTER
and "master" or slaveblock
), shard
))
613 f
:write(compress(pluto
.persist({}, {id
= id
, value
= value
})))
615 else -- we put to the system
-- Running this block: remember the broadcast; GetItem replays the list into
-- items.
616 table.insert(self
.broadcasted
, {id
= id
, value
= value
})
-- True until the first master-side Finish call.
621 local finish_root_node
= true
-- Signals that one upstream of this block has finished.
-- Master: once all upstreams are done, launches one slave per shard for this
-- block, records how long that took, walks the downstream blocks and prints
-- the timing table.
-- Slave running this block: calls Finish on every item that defines it.
-- Slave running another block: nothing is visible for this branch.
625 function ChainBlock
:Finish()
626 if mode
== MODE_MASTER
then
-- frn remembers whether this is the first call; its use is not visible here.
627 local frn
= finish_root_node
628 finish_root_node
= false
632 self
.unfinished
= self
.unfinished
- 1
633 if self
.unfinished
> 0 then return end -- NOT . . . FINISHED . . . YET
637 local start
= os
.time()
-- When resuming, blocks before the resume point are skipped; reaching the
-- resume point clears it so later blocks run normally.
639 if not intermediate_start_at
or self
.id
== intermediate_start_at
then
640 intermediate_start_at
= nil
-- Queue one slave command per shard. multirun_add / multirun_complete and
-- the `path` / `fname` values are defined outside the visible lines.
642 for k
= 1, shard_count
do
643 multirun_add(string.format("ssh %s \"cd %s && nice luajit -O2 %s slave %s %d %d\"", shard_ips
[1], path
, fname
, self
.id
, k
, shard_count
)) -- so, right now this works because we only have one computer, but if we ever have more than one IP we'll have to put part of this into multirun_complete
645 multirun_complete(self
.id
, #shard_ips
)
-- Wall-clock duration of this block, in seconds.
648 table.insert(timing
, {id
= self
.id
, dur
= os
.time() - start
})
-- Per-downstream action is not visible here (presumably v:Finish()).
650 for _
, v
in pairs(self
.linkto
) do
655 for k
, v
in ipairs(timing
) do
656 print(string.format("%20s %4d", v
.id
, v
.dur
))
660 elseif mode
== MODE_SLAVE
and slaveblock
~= self
.id
then
662 elseif mode
== MODE_SLAVE
and slaveblock
== self
.id
then
-- Items get the block's process and broadcast callbacks so they can emit
-- final output.
663 for k
, v
in pairs(self
.items
) do
664 if v
.Finish
then safety(v
.Finish
, v
, self
.process
, self
.broadcast
) end
-- Registers a downstream block: appends item to this block's linkto list.
--   item: the block that should receive this block's output
671 function ChainBlock
:AddLinkTo(item
)
672 table.insert(self
.linkto
, item
)
-- Returns the per-key item for key, creating it with self.factory(key) on
-- first use.
--   key: the record key
675 function ChainBlock
:GetItem(key
)
676 if not self
.items
[key
] then
677 self
.items
[key
] = self
.factory(key
)
-- Replay every broadcast recorded so far into the item's Receive method.
-- Presumably this runs only for a newly created item; the end of the
-- enclosing `if` is not visible here.
679 for _
, v
in ipairs(self
.broadcasted
) do
680 safety(self
.items
[key
].Receive
, self
.items
[key
], v
.id
, v
.value
)
683 return self
.items
[key
]
686 function ChainBlock
:GetData(key
)
687 self
:GetItem(key
) -- just to ensure
688 if not self
.data
[key
] then self
.data
[key
] = {} end
689 return self
.data
[key
]