1 #! /usr/bin/env python3
4 from binascii
import crc32
5 from shorthand
import bitmask
6 from struct
import Struct
7 from io
import SEEK_CUR
, SEEK_END
, SEEK_SET
9 from contextlib
import ExitStack
10 from io
import BytesIO
11 from shorthand
import read_exactly
12 from datetime
import datetime
14 HEADER
= Struct('<HBHH')
16 OPTIONAL_BLOCK
= 0x4000
21 FILE_STRUCT
= Struct('< LBL L BcHL')
24 SPLIT_MASK
= SPLIT_BEFORE^SPLIT_AFTER
26 def main(volname
=None, *files
):
28 with
Archive() as vol_cleanup
, ExitStack() as extract_cleanup
:
30 vol_cleanup
.vol
= stdin
.buffer
31 vol_cleanup
.volname
= ''
32 vol_cleanup
.vol
= ForwardReader(vol_cleanup
.vol
)
34 vol_cleanup
.open(volname
)
35 for vol
in vol_cleanup
:
36 if volname
is not None:
37 print(vol_cleanup
.volname
)
40 print(end
='+{}:'.format(vol
.vol
.tell()))
42 [type, flags
, packed
, block
] = next(blocks
)
46 print(' Marker (signature) block')
49 print(' Archive volume main block')
50 assert flags
& ~
0x100 == 0x11
51 assert block
.read() == bytes(6)
55 split
= flags
& SPLIT_MASK
58 SPLIT_BEFORE
: 'last part',
59 SPLIT_AFTER
: 'first part',
60 SPLIT_BEFORE^SPLIT_AFTER
: 'middle part',
62 print('', part
, end
=',')
65 mtime
, mtime_flags
, mtime_frac
, exttime
] = block
66 print('', repr(name
), end
=',')
68 dict_size
= flags
& 0xE0
73 assert flags
& SPLIT_BEFORE
and name
== extract_name
76 assert not flags
& SPLIT_BEFORE
79 extract_file
= open(basename
, 'xb')
80 extract_cleanup
.callback(extract_file
.close
)
84 chunk
= read_exactly(vol
.vol
, min(packed
, 0x10000))
86 extract_crc
= crc32(chunk
, extract_crc
)
87 extract_file
.write(chunk
)
88 assert extract_crc
== crc
89 if not flags
& SPLIT_AFTER
:
90 extract_cleanup
.close()
92 print(' CRC', format(crc
, '08X'))
94 print(end
=' Archive volume end block;')
95 crc
= int.from_bytes(read_exactly(block
, 4), 'little')
96 print(' CRC', format(crc
, '08X'), end
=',')
97 print(' part', vol
.part
)
99 assert block
.read() == bytes(7)
102 print(' New sub-block')
103 assert flags
== LONG_BLOCK
104 if vol
.vol
.tell() > vol
.vol
.seek(0, SEEK_END
):
105 print(' Block truncated at', vol
.vol
.tell())
109 raise SystemExit(basename
+ 'not completely extracted')
111 class Archive(ExitStack
):
112 def open(vol_cleanup
, volname
):
113 vol_cleanup
.vol
= vol_cleanup
.enter_context(open(volname
, 'rb'))
114 vol_cleanup
.volname
= volname
116 def __iter__(vol_cleanup
):
119 vol
= Volume(vol_cleanup
.vol
)
126 match
= r
'(.+)\.part(0*{})\.rar'.format(1 + vol
.part
)
127 match
= re
.fullmatch(match
, vol_cleanup
.volname
, re
.ASCII^re
.DOTALL
)
129 raise SystemExit('Cannot determine next volume name')
130 [arc
, digits
] = match
.groups()
133 volname
= '{}.part{:0{}}.rar'.format(arc
, 1 + vol
.part
, digits
)
134 vol_cleanup
.open(volname
)
137 def __init__(self
, vol
):
143 header
= self
.vol
.read(7)
146 [crc
, type, flags
, size
] = HEADER
.unpack(header
)
149 block
= read_exactly(self
.vol
, size
)
151 assert header
.startswith(b
'Rar!\x1A') and not size
153 assert crc32(block
, crc32(header
[2:])) & bitmask(16) == crc
154 block
= BytesIO(block
)
155 if flags
& LONG_BLOCK
:
156 packed
= int.from_bytes(read_exactly(block
, 4), 'little')
161 assert flags
& ~
(SPLIT_MASK^
0xE0) == LONG_BLOCK^
0x1000
163 file = read_exactly(block
, FILE_STRUCT
.size
)
164 [unpacked
, os
, crc
, time
, version
, method
,
165 name
, attrib
] = FILE_STRUCT
.unpack(file)
167 assert version
== 20 and method
== b
'0'
168 name
= read_exactly(block
, name
)
170 tflags
= int.from_bytes(read_exactly(block
, 2), 'little')
172 mtime_flags
= tflags
>> MTIME
173 if mtime_flags
& TIME_VALID
:
174 mtime_frac
= read_exactly(block
, mtime_flags
& 3)
184 f
= (read_exactly(block
, 4 + (f
& 3)), f
)
188 assert not block
.read(1)
190 dict_size
= flags
& 0xE0
191 if dict_size
== 0xE0:
192 assert not packed
and not unpacked
and not crc
193 assert attrib
== 0x10
195 assert dict_size
== 0x20
196 assert attrib
& ~
0x200 == 0x20
198 name
= name
.decode('ascii').split('\\')
199 block
= (name
, unpacked
, crc
,
200 time
, mtime_flags
, mtime_frac
, exttime
)
202 self
.nextvol
= flags
& 1
203 assert flags
& ~
1 == OPTIONAL_BLOCK^
0xE
205 self
.part
= int.from_bytes(read_exactly(block
, 2), 'little')
208 end
= self
.vol
.tell() + packed
209 yield (type, flags
, packed
, block
)
212 def unpack_dostime(time
):
214 1980 + (time
>> 25), time
>> 21 & 15, time
>> 16 & 31,
215 time
>> 11 & 31, time
>> 5 & 63, (time
& 15) * 2,
217 return int(time
.timestamp())
219 def unpack_exttime_field(tflags
, fract
):
220 odd
= bool(tflags
& 4)
221 fract
= int.from_bytes(bytes(3 - len(fract
)) + fract
, 'little')
222 return (odd
, fract
* 100)
227 def __init__(self
, reader
):
232 result
= self
.reader
.read(n
)
233 self
.pos
+= len(result
)
238 def seek(self
, pos
, base
=SEEK_SET
):
240 return self
.reader
.seek(pos
, base
)
248 chunk
= min(pos
, 0x10000)
249 data
= self
.read(chunk
)
251 if len(data
) < chunk
:
256 assert base
== SEEK_END
and not pos
262 if __name__
== "__main__":