4 Okay let's think about this.
6 It takes the form of a series of chain blocks. Each chain block is a "manager" of sorts, in that it just represents the shape of the whole shebang. The actual implementation underlying it doesn't really matter as much, so let's just build the framework first.
8 Parameters to a block consist only of the construction function for the type it's meant to generate, and the blocks it's meant to pass things to.
10 No need for inheritance, honestly.
15 LET'S THINK ABOUT MULTIPROCESSING
17 First off, sharding. Shard by key, hashing. We know how many destination systems we have, so we'll just shard according to that.
19 Okay here's the trivial version.
21 blockname_dstkey_srckey
23 Each block writes all of its output to the output blocks that need it. Once it's sure it's done, it . . . does what?
25 argh this is hurting my brain
31 each block takes as input a series of keys, (k, sk, v)
33 as output, it dumps a set of keys, (k, sk, v, id)
35 these are filtered based on id to each block. a key may end up in zero, one, or many output blocks
37 those are split up by k
39 before processing, they are sorted by sk
46 (block)/(shard)/(key)_(writer_id)
48 We can just synchronize it from the controller - one block at a time. The blocks form a DAG; we can plug a Lua serialization library in and just append as we go (with some gzip? benchmark this)
50 Each step just drops into the directories for its block/kshard, then processes files in series - grab all the files for one k, process, continue.
52 I'm pretty sure this works, but I'm running out of hard drive space.
55 require("luarocks.require")
-- Fall back to no-op implementations when the file-id hooks have not been
-- defined by the time this file is loaded, so later calls to push_file_id /
-- pop_file_id are always safe.
if not push_file_id then
  push_file_id = function () end
end
if not pop_file_id then
  pop_file_id = function () end
end
64 local nextProgressTime
= 0
66 local function ProgressMessage(msg
)
67 if os
.time() > nextProgressTime
then
68 nextProgressTime
= os
.time() + 1
74 local function persist_dump(dat
)
76 return string.char(bit
.band(#dat
, 0xff), bit
.band(#dat
, 0xff00) / 256, bit
.band(#dat
, 0xff0000) / 65536, bit
.band(#dat
, 0xff000000) / 16777216) .. dat
79 local function persist_split(dat
, dest
)
82 assert(cp
+ 4 <= #dat
)
83 local a
, b
, c
, d
= dat
:byte(cp
, cp
+ 3)
85 local len
= a
+ b
* 256 + c
* 65536 + d
* 16777216
86 table.insert(dest
, dat
:sub(cp
, cp
+ len
))
-- Returns the cached gzio write handle for `fname`, opening the file on first use.
-- When the first open fails, the parent directory of `fname` is created with
-- `mkdir -p` and the open is retried once. A failure of the retry raises an
-- error here instead of handing nil back to the caller.
local function get_file(fname)
  local cached = file_cache[fname]
  if cached then return cached end

  local handle, err = gzio.open(fname, "w")
  if not handle then
    -- Presumably the open failed because the destination directory is missing.
    local dir = string.match(fname, "(.*)/.-")
    assert(dir, "cannot open " .. fname .. " and it has no parent directory to create")

    -- Single-quote the path so spaces or shell metacharacters are passed literally.
    local quoted = "'" .. string.gsub(dir, "'", "'\\''") .. "'"
    local status = os.execute("mkdir -p " .. quoted)
    -- Lua 5.1 / LuaJIT report the exit status (0 on success); builds with
    -- 5.2-style os.execute report true instead, so accept both.
    assert(status == 0 or status == true, "mkdir -p failed for " .. dir)

    handle, err = gzio.open(fname, "w")
    assert(handle, "cannot open " .. fname .. " for writing: " .. tostring(err))
  end

  file_cache[fname] = handle
  return handle
end
103 local function flush_cache()
104 for k
, v
in pairs(file_cache
) do
113 local MODE_MASTER
= 1
125 local block_lookup
= {}
130 function ChainBlock_Init(path_f
, fname_f
, init_f
)
131 if arg
[1] == "master" then
137 os
.execute("rm -rf temp_removing")
138 os
.execute("mv temp temp_removing")
139 io
.popen("rm -rf temp_removing", "w")
144 local ip
, ct
= arg
[k
]:match("(.+)x([0-9]+)")
148 table.insert(shard_ips
, ip
)
151 shard_count
= #shard_ips
153 local print_bk
= print
154 print = function(...) print_bk("Master", ...) end
156 elseif arg
[1] == "slave" then
159 shard
= tonumber(arg
[3])
160 shard_count
= tonumber(arg
[4])
165 local print_bk
= print
166 local shardid
= string.format("Shard %d/%d %s", shard
, shard_count
, slaveblock
)
167 print = function(...) print_bk(shardid
, ...) io
.stdout
:flush() end
169 ProgressMessage("Starting")
178 function ChainBlock_Work()
179 if mode
== MODE_SLAVE
then
180 local prefix
= string.format("temp/%s/%d", slaveblock
, shard
)
181 local hnd
= io
.popen(string.format("ls %s", prefix
))
183 local tblock
= block_lookup
[slaveblock
]
188 for line
in hnd
:lines() do
189 table.insert(lines
, line
)
193 local broadcasts
= {}
194 for pos
, line
in ipairs(lines
) do
195 if line
:match("broadcast_.*") then
196 local fil
= gzio
.open(prefix
.. "/" .. line
, "r")
197 local dat
= fil
:read("*a")
199 os
.execute(string.format("rm %s/%s", prefix
, line
))
201 persist_split(dat
, broadcasts
)
205 for pos
, line
in ipairs(lines
) do
206 ProgressMessage(string.format("Progress, %d/%d", pos
, #lines
))
208 if line
:match("broadcast_.*") then continue
end
210 local tkey
= string.match(line
, "([a-f0-9]*)_.*")
215 for k
, v
in ipairs(broadcasts
) do
216 local tab
= pluto
.unpersist({}, v
)
217 tblock
:Broadcast(tab
.id
, tab
.value
)
221 local fil
= gzio
.open(prefix
.. "/" .. line
, "r")
222 local str
, err
= fil
:read("*a")
224 assert(str
, err
, prefix
.. "/" .. line
)
228 os
.execute(string.format("rm %s/%s", prefix
, line
))
231 persist_split(str
, strs
)
234 for _
, v
in ipairs(strs
) do
235 table.insert(up
, pluto
.unpersist({}, v
))
237 for _
, tab
in ipairs(up
) do
238 tblock
:Insert(tab
.key
, tab
.subkey
, tab
.value
, tblock
.filter
)
251 local function md5_clean(dat
)
252 local binny
= md5
.sum(dat
)
255 rv
= rv
.. string.format("%02x", string.byte(binny
, k
))
260 local function shardy(dat
, shards
)
261 if tonumber(dat
) then return math
.mod(tonumber(dat
), shards
) + 1 end
263 local binny
= md5
.sum(dat
)
268 v
= v
+ string.byte(binny
, k
)
270 return math
.mod(v
, shards
) + 1
274 local ChainBlock
= {}
275 local ChainBlock_mt
= { __index
= ChainBlock
}
277 function ChainBlock_Create(id
, linkfrom
, factory
, sortpred
, filter
)
279 setmetatable(ninst
, ChainBlock_mt
)
281 ninst
.factory
= factory
282 ninst
.sortpred
= sortpred
283 ninst
.filter
= filter
287 ninst
.broadcasted
= {}
289 ninst
.process
= function (key
, subkey
, value
, identifier
)
290 assert(key
and value
and type(key
) == "string")
291 local touched
= false
292 for _
, v
in pairs(ninst
.linkto
) do
293 touched
= v
:Insert(key
, subkey
, value
, identifier
) or touched
295 assert(touched
, identifier
)
297 ninst
.broadcast
= function (id
, value
) for _
, v
in pairs(ninst
.linkto
) do v
:Broadcast(id
, value
) end end
299 for k
, v
in pairs(linkfrom
) do
301 ninst
.unfinished
= ninst
.unfinished
+ 1
305 assert(not block_lookup
[id
])
306 block_lookup
[id
] = ninst
312 function ChainBlock
:Insert(key
, subkey
, value
, identifier
)
314 assert(type(key
) == "string")
315 if self
.filter
and self
.filter
~= identifier
then return end
317 if mode
~= MODE_SOLO
and slaveblock
~= self
.id
then
318 local f
= get_file(string.format("temp/%s/%d/%s_%s_%s", self
.id
, shardy(key
, shard_count
), md5_clean(key
):sub(1,1), (mode
== MODE_MASTER
and "master" or slaveblock
), shard
))
319 f
:write(persist_dump(pluto
.persist({}, {key
= key
, subkey
= subkey
, value
= value
})))
322 if type(value
) == "table" and value
.fileid
then push_file_id(value
.fileid
) else push_file_id(-1) end
323 self
:GetItem(key
):Data(key
, subkey
, value
, self
.process
)
326 table.insert(self
:GetData(key
), {subkey
= subkey
, value
= value
})
334 function ChainBlock
:Broadcast(id
, value
)
335 if mode
~= MODE_SOLO
and slaveblock
~= self
.id
then
336 for k
= 1, shard_count
do
337 local f
= get_file(string.format("temp/%s/%d/broadcast_%s_%s", self
.id
, k
, (mode
== MODE_MASTER
and "master" or slaveblock
), shard
))
338 f
:write(persist_dump(pluto
.persist({}, {id
= id
, value
= value
})))
341 table.insert(self
.broadcasted
, {id
= id
, value
= value
})
346 local finish_root_node
= true
350 function ChainBlock
:Finish()
351 if mode
== MODE_MASTER
then
353 local frn
= finish_root_node
354 finish_root_node
= false
358 self
.unfinished
= self
.unfinished
- 1
359 if self
.unfinished
> 0 then return end -- NOT . . . FINISHED . . . YET
361 local start
= os
.time()
364 for k
= 1, shard_count
do
365 table.insert(pypes
, io
.popen(string.format("ssh %s \"cd %s && nice luajit -O2 %s slave %s %d %d\"", shard_ips
[k
], path
, fname
, self
.id
, k
, shard_count
), "w"))
367 for k
, v
in pairs(pypes
) do
371 table.insert(timing
, {id
= self
.id
, dur
= os
.time() - start
})
373 for _
, v
in pairs(self
.linkto
) do
378 for k
, v
in ipairs(timing
) do
379 print(string.format("%20s %4d", v
.id
, v
.dur
))
383 elseif mode
== MODE_SLAVE
and slaveblock
~= self
.id
then
385 elseif mode
== MODE_SOLO
or (mode
== MODE_SLAVE
and slaveblock
== self
.id
) then
386 self
.unfinished
= self
.unfinished
- 1
387 if self
.unfinished
> 0 and mode
== MODE_SOLO
then return end -- NOT . . . FINISHED . . . YET
389 if mode
== MODE_SOLO
then print("Broadcasting " .. self
.id
) end
392 for k
, v
in pairs(self
.data
) do sdc
= sdc
+ 1 end
395 if #self
.broadcasted
> 0 then
396 for k
, v
in pairs(self
.items
) do
397 for _
, d
in pairs(self
.broadcasted
) do
398 v
:Receive(d
.id
, d
.value
)
402 self
.broadcasted
= {}
404 if mode
== MODE_SOLO
then print("Sorting " .. self
.id
) end
406 for k
, v
in pairs(self
.data
) do
407 ProgressMessage(string.format("Sorting %s, %d/%d", self
.id
, sdcc
, sdc
))
410 if self
.sortpred
then
411 table.sort(v
, function (a
, b
) return self
.sortpred(a
.subkey
, b
.subkey
) end)
413 table.sort(v
, function (a
, b
) return a
.subkey
< b
.subkey
end)
415 local item
= self
:GetItem(k
)
418 for _
, d
in pairs(v
) do
419 ProgressMessage(string.format("Sorting %s, %d/%d + %d/%d", self
.id
, sdcc
, sdc
, ict
, #v
))
421 if d
.value
.fileid
then push_file_id(d
.value
.fileid
) else push_file_id(-1) end
422 item
:Data(k
, d
.subkey
, d
.value
, self
.process
, self
.broadcast
)
426 self
.data
[k
] = 0 -- This is kind of like setting it to nil, but instead of not working, it does work.
429 if mode
== MODE_SOLO
then print("Finishing " .. self
.id
) end
432 for k
, v
in pairs(self
.items
) do
433 if v
.Finish
then v
:Finish(self
.process
, self
.broadcast
) end
438 if mode
== MODE_SOLO
then print("Chaining " .. self
.id
) end
440 for _
, v
in pairs(self
.linkto
) do
444 if mode
== MODE_SOLO
then print("Done " .. self
.id
) end
-- Appends `item` to the end of this block's `linkto` list.
function ChainBlock:AddLinkTo(item)
  local targets = self.linkto
  targets[#targets + 1] = item
end
-- Returns the item stored under `key` in self.items. When no item is stored
-- yet, one is built by calling self.factory(key), stored, and returned.
function ChainBlock:GetItem(key)
  local existing = self.items[key]
  if existing then
    return existing
  end

  local created = self.factory(key)
  self.items[key] = created
  return created
end
459 function ChainBlock
:GetData(key
)
460 self
:GetItem(key
) -- just to ensure
461 if not self
.data
[key
] then self
.data
[key
] = {} end
462 return self
.data
[key
]