1 from collections
.abc
import MutableMapping
15 ('>>', operator
.rshift
),
16 ('<<', operator
.lshift
),
20 ('/', operator
.truediv
),
# Assignment operators as (token, function) pairs.  Every entry of
# _OPERATORS contributes a compound form ("<op>=") that reuses the operator's
# own function; the final entry is plain "=", whose function ignores the
# current value and yields the right-hand side.
_ASSIGN_OPERATORS = [
    *((op + '=', opfunc) for op, opfunc in _OPERATORS),
    ('=', (lambda cur, right: right)),
]
# Regex fragment matching a name: a letter, "_" or "$", followed by any number
# of letters, digits, "_" or "$".
_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'

# Each opening bracket mapped to the bracket that closes it.
_MATCHING_PARENS = {'(': ')', '{': '}', '[': ']'}
31 class JS_Break(ExtractorError
):
33 ExtractorError
.__init
__(self
, 'Invalid break')
36 class JS_Continue(ExtractorError
):
38 ExtractorError
.__init
__(self
, 'Invalid continue')
41 class LocalNameSpace(MutableMapping
):
42 def __init__(self
, *stack
):
43 self
.stack
= tuple(stack
)
45 def __getitem__(self
, key
):
46 for scope
in self
.stack
:
51 def __setitem__(self
, key
, value
):
52 for scope
in self
.stack
:
57 self
.stack
[0][key
] = value
def __delitem__(self, key):
    """Refuse to delete ``key``: this mapping does not support deletion.

    Raises:
        NotImplementedError: always, whatever ``key`` is.
    """
    reason = 'Deleting is not supported'
    raise NotImplementedError(reason)
64 for scope
in self
.stack
:
def __len__(self, key=None):
    """Return how many items iterating over this namespace yields.

    Args:
        key: Unused.  It was a required parameter, which made the built-in
            ``len(namespace)`` fail with a missing-argument ``TypeError``;
            it is kept, now optional, so existing two-argument calls still
            work.

    Returns:
        The number of items produced by ``iter(self)``.
    """
    # Iterators have no len(), so ``len(iter(self))`` could never succeed;
    # count the iteration instead.
    return sum(1 for _ in self)
71 return f
'LocalNameSpace{self.stack}'
74 class JSInterpreter(object):
75 def __init__(self
, code
, objects
=None):
80 self
._objects
= objects
81 self
.__named
_object
_counter
= 0
83 def _named_object(self
, namespace
, obj
):
84 self
.__named
_object
_counter
+= 1
85 name
= f
'__yt_dlp_jsinterp_obj{self.__named_object_counter}'
90 def _separate(expr
, delim
=',', max_split
=None):
93 counters
= {k
: 0 for k
in _MATCHING_PARENS
.values()}
94 start
, splits
, pos
, delim_len
= 0, 0, 0, len(delim
) - 1
95 for idx
, char
in enumerate(expr
):
96 if char
in _MATCHING_PARENS
:
97 counters
[_MATCHING_PARENS
[char
]] += 1
98 elif char
in counters
:
100 if char
!= delim
[pos
] or any(counters
.values()):
103 elif pos
!= delim_len
:
106 yield expr
[start
: idx
- delim_len
]
107 start
, pos
= idx
+ 1, 0
109 if max_split
and splits
>= max_split
:
def _separate_at_paren(expr, delim):
    """Split ``expr`` once at ``delim`` and return ``(inner, outer)``.

    The split itself is delegated to ``JSInterpreter._separate`` with a
    maximum of one split.  ``inner`` is the first piece with its leading
    character removed (the callers pass text that starts with the opening
    bracket); ``outer`` is the second piece.  Both are stripped of
    surrounding whitespace.

    Raises:
        ExtractorError: if the split yields fewer than two pieces, i.e. no
            terminating ``delim`` was found.
    """
    pieces = list(JSInterpreter._separate(expr, delim, 1))
    if len(pieces) < 2:
        raise ExtractorError(f'No terminating paren {delim} in {expr}')
    head, tail = pieces[:2]
    return head[1:].strip(), tail.strip()
120 def interpret_statement(self
, stmt
, local_vars
, allow_recursion
=100):
121 if allow_recursion
< 0:
122 raise ExtractorError('Recursion limit reached')
124 sub_statements
= list(self
._separate
(stmt
, ';'))
125 stmt
= (sub_statements
or ['']).pop()
126 for sub_stmt
in sub_statements
:
127 ret
, should_abort
= self
.interpret_statement(sub_stmt
, local_vars
, allow_recursion
- 1)
133 stmt_m
= re
.match(r
'var\s', stmt
)
135 expr
= stmt
[len(stmt_m
.group(0)):]
137 return_m
= re
.match(r
'return(?:\s+|$)', stmt
)
139 expr
= stmt
[len(return_m
.group(0)):]
142 # Try interpreting it as an expression
145 v
= self
.interpret_expression(expr
, local_vars
, allow_recursion
)
146 return v
, should_abort
148 def interpret_expression(self
, expr
, local_vars
, allow_recursion
):
150 if expr
== '': # Empty expression
153 if expr
.startswith('{'):
154 inner
, outer
= self
._separate
_at
_paren
(expr
, '}')
155 inner
, should_abort
= self
.interpret_statement(inner
, local_vars
, allow_recursion
- 1)
156 if not outer
or should_abort
:
159 expr
= json
.dumps(inner
) + outer
161 if expr
.startswith('('):
162 inner
, outer
= self
._separate
_at
_paren
(expr
, ')')
163 inner
= self
.interpret_expression(inner
, local_vars
, allow_recursion
)
167 expr
= json
.dumps(inner
) + outer
169 if expr
.startswith('['):
170 inner
, outer
= self
._separate
_at
_paren
(expr
, ']')
171 name
= self
._named
_object
(local_vars
, [
172 self
.interpret_expression(item
, local_vars
, allow_recursion
)
173 for item
in self
._separate
(inner
)])
176 m
= re
.match(r
'try\s*', expr
)
178 if expr
[m
.end()] == '{':
179 try_expr
, expr
= self
._separate
_at
_paren
(expr
[m
.end():], '}')
181 try_expr
, expr
= expr
[m
.end() - 1:], ''
182 ret
, should_abort
= self
.interpret_statement(try_expr
, local_vars
, allow_recursion
- 1)
185 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
187 m
= re
.match(r
'catch\s*\(', expr
)
189 # We ignore the catch block
190 _
, expr
= self
._separate
_at
_paren
(expr
, '}')
191 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
193 m
= re
.match(r
'for\s*\(', expr
)
195 constructor
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
196 if remaining
.startswith('{'):
197 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
199 m
= re
.match(r
'switch\s*\(', remaining
) # FIXME
201 switch_val
, remaining
= self
._separate
_at
_paren
(remaining
[m
.end() - 1:], ')')
202 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
203 body
= 'switch(%s){%s}' % (switch_val
, body
)
205 body
, expr
= remaining
, ''
206 start
, cndn
, increment
= self
._separate
(constructor
, ';')
207 if self
.interpret_statement(start
, local_vars
, allow_recursion
- 1)[1]:
208 raise ExtractorError(
209 f
'Premature return in the initialization of a for loop in {constructor!r}')
211 if not self
.interpret_expression(cndn
, local_vars
, allow_recursion
):
214 ret
, should_abort
= self
.interpret_statement(body
, local_vars
, allow_recursion
- 1)
221 if self
.interpret_statement(increment
, local_vars
, allow_recursion
- 1)[1]:
222 raise ExtractorError(
223 f
'Premature return in the initialization of a for loop in {constructor!r}')
224 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
226 m
= re
.match(r
'switch\s*\(', expr
)
228 switch_val
, remaining
= self
._separate
_at
_paren
(expr
[m
.end() - 1:], ')')
229 switch_val
= self
.interpret_expression(switch_val
, local_vars
, allow_recursion
)
230 body
, expr
= self
._separate
_at
_paren
(remaining
, '}')
231 items
= body
.replace('default:', 'case default:').split('case ')[1:]
232 for default
in (False, True):
235 case
, stmt
= [i
.strip() for i
in self
._separate
(item
, ':', 1)]
237 matched
= matched
or case
== 'default'
239 matched
= case
!= 'default' and switch_val
== self
.interpret_expression(case
, local_vars
, allow_recursion
)
243 ret
, should_abort
= self
.interpret_statement(stmt
, local_vars
, allow_recursion
- 1)
250 return self
.interpret_statement(expr
, local_vars
, allow_recursion
- 1)[0]
252 # Comma separated statements
253 sub_expressions
= list(self
._separate
(expr
))
254 expr
= sub_expressions
.pop().strip() if sub_expressions
else ''
255 for sub_expr
in sub_expressions
:
256 self
.interpret_expression(sub_expr
, local_vars
, allow_recursion
)
258 for m
in re
.finditer(rf
'''(?x)
259 (?P<pre_sign>\+\+|--)(?P<var1>{_NAME_RE})|
260 (?P<var2>{_NAME_RE})(?P<post_sign>\+\+|--)''', expr
):
261 var
= m
.group('var1') or m
.group('var2')
262 start
, end
= m
.span()
263 sign
= m
.group('pre_sign') or m
.group('post_sign')
264 ret
= local_vars
[var
]
265 local_vars
[var
] += 1 if sign
[0] == '+' else -1
266 if m
.group('pre_sign'):
267 ret
= local_vars
[var
]
268 expr
= expr
[:start
] + json
.dumps(ret
) + expr
[end
:]
270 for op
, opfunc
in _ASSIGN_OPERATORS
:
271 m
= re
.match(r
'''(?x)
272 (?P<out>%s)(?:\[(?P<index>[^\]]+?)\])?
274 (?P<expr>.*)$''' % (_NAME_RE
, re
.escape(op
)), expr
)
277 right_val
= self
.interpret_expression(m
.group('expr'), local_vars
, allow_recursion
)
279 if m
.groupdict().get('index'):
280 lvar
= local_vars
[m
.group('out')]
281 idx
= self
.interpret_expression(m
.group('index'), local_vars
, allow_recursion
)
282 if not isinstance(idx
, int):
283 raise ExtractorError(f
'List indices must be integers: {idx}')
285 val
= opfunc(cur
, right_val
)
289 cur
= local_vars
.get(m
.group('out'))
290 val
= opfunc(cur
, right_val
)
291 local_vars
[m
.group('out')] = val
299 elif expr
== 'continue':
303 r
'(?!if|return|true|false|null)(?P<name>%s)$' % _NAME_RE
,
306 return local_vars
[var_m
.group('name')]
309 return json
.loads(expr
)
314 r
'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE
, expr
)
316 val
= local_vars
[m
.group('in')]
317 idx
= self
.interpret_expression(m
.group('idx'), local_vars
, allow_recursion
)
320 for op
, opfunc
in _OPERATORS
:
321 separated
= list(self
._separate
(expr
, op
))
322 if len(separated
) < 2:
324 right_val
= separated
.pop()
325 left_val
= op
.join(separated
)
326 left_val
, should_abort
= self
.interpret_statement(
327 left_val
, local_vars
, allow_recursion
- 1)
329 raise ExtractorError(f
'Premature left-side return of {op} in {expr!r}')
330 right_val
, should_abort
= self
.interpret_statement(
331 right_val
, local_vars
, allow_recursion
- 1)
333 raise ExtractorError(f
'Premature right-side return of {op} in {expr!r}')
334 return opfunc(left_val
or 0, right_val
)
337 r
'(?P<var>%s)(?:\.(?P<member>[^(]+)|\[(?P<member2>[^]]+)\])\s*' % _NAME_RE
,
340 variable
= m
.group('var')
341 member
= remove_quotes(m
.group('member') or m
.group('member2'))
342 arg_str
= expr
[m
.end():]
343 if arg_str
.startswith('('):
344 arg_str
, remaining
= self
._separate
_at
_paren
(arg_str
, ')')
346 arg_str
, remaining
= None, arg_str
348 def assertion(cndn
, msg
):
349 """ assert, but without risk of getting optimized out """
351 raise ExtractorError(f
'{member} {msg}: {expr}')
355 if variable
== 'String':
357 elif variable
in local_vars
:
358 obj
= local_vars
[variable
]
360 if variable
not in self
._objects
:
361 self
._objects
[variable
] = self
.extract_object(variable
)
362 obj
= self
._objects
[variable
]
366 if member
== 'length':
372 self
.interpret_expression(v
, local_vars
, allow_recursion
)
373 for v
in self
._separate
(arg_str
)]
376 if member
== 'fromCharCode':
377 assertion(argvals
, 'takes one or more arguments')
378 return ''.join(map(chr, argvals
))
379 raise ExtractorError(f
'Unsupported string method {member}')
381 if member
== 'split':
382 assertion(argvals
, 'takes one or more arguments')
383 assertion(argvals
== [''], 'with arguments is not implemented')
385 elif member
== 'join':
386 assertion(isinstance(obj
, list), 'must be applied on a list')
387 assertion(len(argvals
) == 1, 'takes exactly one argument')
388 return argvals
[0].join(obj
)
389 elif member
== 'reverse':
390 assertion(not argvals
, 'does not take any arguments')
393 elif member
== 'slice':
394 assertion(isinstance(obj
, list), 'must be applied on a list')
395 assertion(len(argvals
) == 1, 'takes exactly one argument')
396 return obj
[argvals
[0]:]
397 elif member
== 'splice':
398 assertion(isinstance(obj
, list), 'must be applied on a list')
399 assertion(argvals
, 'takes one or more arguments')
400 index
, howMany
= map(int, (argvals
+ [len(obj
)])[:2])
403 add_items
= argvals
[2:]
405 for i
in range(index
, min(index
+ howMany
, len(obj
))):
406 res
.append(obj
.pop(index
))
407 for i
, item
in enumerate(add_items
):
408 obj
.insert(index
+ i
, item
)
410 elif member
== 'unshift':
411 assertion(isinstance(obj
, list), 'must be applied on a list')
412 assertion(argvals
, 'takes one or more arguments')
413 for item
in reversed(argvals
):
416 elif member
== 'pop':
417 assertion(isinstance(obj
, list), 'must be applied on a list')
418 assertion(not argvals
, 'does not take any arguments')
422 elif member
== 'push':
423 assertion(argvals
, 'takes one or more arguments')
426 elif member
== 'forEach':
427 assertion(argvals
, 'takes one or more arguments')
428 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
429 f
, this
= (argvals
+ [''])[:2]
430 return [f((item
, idx
, obj
), this
=this
) for idx
, item
in enumerate(obj
)]
431 elif member
== 'indexOf':
432 assertion(argvals
, 'takes one or more arguments')
433 assertion(len(argvals
) <= 2, 'takes at-most 2 arguments')
434 idx
, start
= (argvals
+ [0])[:2]
436 return obj
.index(idx
, start
)
440 if isinstance(obj
, list):
442 return obj
[member
](argvals
)
445 return self
.interpret_expression(
446 self
._named
_object
(local_vars
, eval_method()) + remaining
,
447 local_vars
, allow_recursion
)
451 m
= re
.match(r
'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]*)\)$' % _NAME_RE
, expr
)
453 fname
= m
.group('func')
455 int(v
) if v
.isdigit() else local_vars
[v
]
456 for v
in self
._separate
(m
.group('args'))])
457 if fname
in local_vars
:
458 return local_vars
[fname
](argvals
)
459 elif fname
not in self
._functions
:
460 self
._functions
[fname
] = self
.extract_function(fname
)
461 return self
._functions
[fname
](argvals
)
464 raise ExtractorError('Unsupported JS expression %r' % expr
)
466 def extract_object(self
, objname
):
467 _FUNC_NAME_RE
= r
'''(?:[a-zA-Z$0-9]+|"[a-zA-Z$0-9]+"|'[a-zA-Z$0-9]+')'''
471 (?<!this\.)%s\s*=\s*{\s*
472 (?P<fields>(%s\s*:\s*function\s*\(.*?\)\s*{.*?}(?:,\s*)?)*)
474 ''' % (re
.escape(objname
), _FUNC_NAME_RE
),
476 fields
= obj_m
.group('fields')
477 # Currently, it only supports function definitions
478 fields_m
= re
.finditer(
480 (?P<key>%s)\s*:\s*function\s*\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}
484 argnames
= f
.group('args').split(',')
485 obj
[remove_quotes(f
.group('key'))] = self
.build_function(argnames
, f
.group('code'))
489 def extract_function_code(self
, funcname
):
490 """ @returns argnames, code """
493 (?:function\s+%s|[{;,]\s*%s\s*=\s*function|var\s+%s\s*=\s*function)\s*
494 \((?P<args>[^)]*)\)\s*
495 (?P<code>\{(?:(?!};)[^"]|"([^"]|\\")*")+\})''' % (
496 re
.escape(funcname
), re
.escape(funcname
), re
.escape(funcname
)),
498 code
, _
= self
._separate
_at
_paren
(func_m
.group('code'), '}') # refine the match
500 raise ExtractorError('Could not find JS function %r' % funcname
)
501 return func_m
.group('args').split(','), code
def extract_function(self, funcname):
    """Look up ``funcname`` and return the result of building it.

    Whatever ``extract_function_code`` returns for ``funcname`` is unpacked
    as the positional arguments of ``extract_function_from_code``.
    """
    code_parts = self.extract_function_code(funcname)
    return self.extract_function_from_code(*code_parts)
506 def extract_function_from_code(self
, argnames
, code
, *global_stack
):
509 mobj
= re
.search(r
'function\((?P<args>[^)]*)\)\s*{', code
)
512 start
, body_start
= mobj
.span()
513 body
, remaining
= self
._separate
_at
_paren
(code
[body_start
- 1:], '}')
514 name
= self
._named
_object
(
516 self
.extract_function_from_code(
517 [str.strip(x
) for x
in mobj
.group('args').split(',')],
518 body
, local_vars
, *global_stack
))
519 code
= code
[:start
] + name
+ remaining
520 return self
.build_function(argnames
, code
, local_vars
, *global_stack
)
def call_function(self, funcname, *args):
    """Extract the function named ``funcname`` and call it.

    The positional ``args`` are handed to the extracted callable as a single
    tuple argument, not unpacked.
    """
    func = self.extract_function(funcname)
    return func(args)
525 def build_function(self
, argnames
, code
, *global_stack
):
526 global_stack
= list(global_stack
) or [{}]
527 local_vars
= global_stack
.pop(0)
529 def resf(args
, **kwargs
):
531 **dict(zip(argnames
, args
)),
534 var_stack
= LocalNameSpace(local_vars
, *global_stack
)
535 for stmt
in self
._separate
(code
.replace('\n', ''), ';'):
536 ret
, should_abort
= self
.interpret_statement(stmt
, var_stack
)