[ie/soundcloud] Various fixes (#11820)
[yt-dlp.git] / yt_dlp / extractor / goplay.py
blob32300f75c2f55d14a51ffdb457f51289d070fe89
1 import base64
2 import binascii
3 import datetime as dt
4 import hashlib
5 import hmac
6 import json
7 import os
8 import re
9 import urllib.parse
11 from .common import InfoExtractor
12 from ..utils import (
13 ExtractorError,
14 int_or_none,
15 js_to_json,
16 remove_end,
17 traverse_obj,
class GoPlayIE(InfoExtractor):
    """Extractor for goplay.be video pages; needs an account (Cognito ID token) to query the API."""

    _VALID_URL = r'https?://(www\.)?goplay\.be/video/([^/?#]+/[^/?#]+/|)(?P<id>[^/#]+)'

    _NETRC_MACHINE = 'goplay'

    _TESTS = [{
        'url': 'https://www.goplay.be/video/de-slimste-mens-ter-wereld/de-slimste-mens-ter-wereld-s22/de-slimste-mens-ter-wereld-s22-aflevering-1',
        'info_dict': {
            'id': '2baa4560-87a0-421b-bffc-359914e3c387',
            'ext': 'mp4',
            'title': 'S22 - Aflevering 1',
            'description': r're:In aflevering 1 nemen Daan Alferink, Tess Elst en Xander De Rycke .{66}',
            'series': 'De Slimste Mens ter Wereld',
            'episode': 'Episode 1',
            'season_number': 22,
            'episode_number': 1,
            'season': 'Season 22',
        },
        'params': {'skip_download': True},
        'skip': 'This video is only available for registered users',
    }, {
        'url': 'https://www.goplay.be/video/1917',
        'info_dict': {
            'id': '40cac41d-8d29-4ef5-aa11-75047b9f0907',
            'ext': 'mp4',
            'title': '1917',
            'description': r're:Op het hoogtepunt van de Eerste Wereldoorlog krijgen twee jonge .{94}',
        },
        'params': {'skip_download': True},
        'skip': 'This video is only available for registered users',
    }, {
        'url': 'https://www.goplay.be/video/de-mol/de-mol-s11/de-mol-s11-aflevering-1#autoplay',
        'info_dict': {
            'id': 'ecb79672-92b9-4cd9-a0d7-e2f0250681ee',
            'ext': 'mp4',
            'title': 'S11 - Aflevering 1',
            'description': r're:Tien kandidaten beginnen aan hun verovering van Amerika en ontmoeten .{102}',
            'episode': 'Episode 1',
            'series': 'De Mol',
            'season_number': 11,
            'episode_number': 1,
            'season': 'Season 11',
        },
        'params': {'skip_download': True},
        'skip': 'This video is only available for registered users',
    }]

    # ID token obtained by _perform_login(); stays None when no credentials were supplied
    _id_token = None

    def _perform_login(self, username, password):
        """Authenticate against the site's AWS Cognito user pool and keep the ID token."""
        self.report_login()
        aws = AwsIdp(ie=self, pool_id='eu-west-1_dViSsKM5Y', client_id='6s1h851s8uplco5h6mqh1jac8m')
        # authenticate() returns (id_token, refresh_token); only the ID token is used here
        self._id_token, _ = aws.authenticate(username=username, password=password)

    def _real_initialize(self):
        """Refuse to continue without a token, since the video API call sends it as a Bearer header."""
        if not self._id_token:
            # NOTE(review): raise_login_required() is presumed to raise by itself when called
            # without metadata_available, so no outer `raise` is needed - confirm against common.py
            self.raise_login_required(method='password')

    def _find_json(self, s):
        """Return the JSON array found in *s* after a `key:` style prefix, or None if there is none."""
        return self._search_json(
            r'\w+\s*:\s*', s, 'next js data', None, contains_pattern=r'\[(?s:.+)\]', default=None)

    def _real_extract(self, url):
        """Build the info dict for *url* from the page's Next.js payload and the goplay video API.

        :raises ExtractorError: if the page carries no metadata entry for this URL, or if the
                                API answer offers neither HLS manifests nor an SSAI stream.
        """
        display_id = self._match_id(url)
        webpage = self._download_webpage(url, display_id)

        # Every `self.__next_f.push([...])` script is parsed and searched for embedded JSON arrays
        nextjs_data = traverse_obj(
            re.findall(r'<script[^>]*>\s*self\.__next_f\.push\(\s*(\[.+?\])\s*\);?\s*</script>', webpage),
            (..., {js_to_json}, {json.loads}, ..., {self._find_json}, ...))

        # The metadata entry is matched on the URL path; the path is computed once, outside the filter
        page_path = urllib.parse.urlparse(url).path
        meta = traverse_obj(nextjs_data, (
            ..., lambda _, v: v['meta']['path'] == page_path, 'meta', any))

        # `meta` is None when nothing matched; fail with a readable error instead of a TypeError
        video_id = traverse_obj(meta, 'uuid')
        if not video_id:
            raise ExtractorError('Unable to find video metadata in webpage')

        info_dict = traverse_obj(meta, {
            'title': ('title', {str}),
            'description': ('description', {str.strip}),
        })

        if traverse_obj(meta, ('program', 'subtype')) != 'movie':
            # Look through the season playlists for the one that contains this video
            for season_data in traverse_obj(nextjs_data, (..., 'children', ..., 'playlists', ...)):
                episode_data = traverse_obj(
                    season_data, ('videos', lambda _, v: v['videoId'] == video_id, any))
                if not episode_data:
                    continue

                episode_title = traverse_obj(
                    episode_data, 'contextualTitle', 'episodeTitle', expected_type=str)
                info_dict.update({
                    'title': episode_title or info_dict.get('title'),
                    # the page title ends in ' - <episode title>'; stripping that leaves the series name
                    'series': remove_end(info_dict.get('title'), f' - {episode_title}'),
                    'season_number': traverse_obj(season_data, ('season', {int_or_none})),
                    'episode_number': traverse_obj(episode_data, ('episodeNumber', {int_or_none})),
                })
                break

        api = self._download_json(
            f'https://api.goplay.be/web/v1/videos/long-form/{video_id}',
            video_id, headers={
                'Authorization': f'Bearer {self._id_token}',
                **self.geo_verification_headers(),
            })

        if 'manifestUrls' in api:
            formats, subtitles = self._extract_m3u8_formats_and_subtitles(
                api['manifestUrls']['hls'], video_id, ext='mp4', m3u8_id='HLS')
        elif 'ssai' in api:
            ssai_content_source_id = api['ssai']['contentSourceID']
            ssai_video_id = api['ssai']['videoID']

            dai = self._download_json(
                f'https://dai.google.com/ondemand/dash/content/{ssai_content_source_id}/vid/{ssai_video_id}/streams',
                video_id, data=b'{"api-key":"null"}',
                headers={'content-type': 'application/json'})

            periods = self._extract_mpd_periods(dai['stream_manifest'], video_id)

            # skip pre-roll and mid-roll ads
            periods = [p for p in periods if '-ad-' not in p['id']]

            formats, subtitles = self._merge_mpd_periods(periods)
        else:
            raise ExtractorError('expecting Google SSAI stream')

        info_dict.update({
            'id': video_id,
            'formats': formats,
            'subtitles': subtitles,
        })
        return info_dict
154 # Taken from https://github.com/add-ons/plugin.video.viervijfzes/blob/master/resources/lib/viervijfzes/auth_awsidp.py
155 # Released into Public domain by https://github.com/michaelarnauts
class InvalidLoginException(ExtractorError):
    """Raised when the response to the password challenge carries an error message."""
class AuthenticationException(ExtractorError):
    """Raised when the login flow does not produce the expected password-verifier challenge."""
class AwsIdp:
    """ AWS Identity Provider

    Client for the USER_SRP_AUTH login flow of an AWS Cognito user pool: it sends an
    InitiateAuth request carrying the public SRP value A, then answers the
    PASSWORD_VERIFIER challenge with an HMAC signature derived from the password.
    """

    def __init__(self, ie, pool_id, client_id):
        """
        :param InfoExtractor ie: The extractor that instantiated this class.
        :param str pool_id: The AWS user pool to connect to (format: <region>_<poolid>).
                            E.g.: eu-west-1_aLkOfYN3T
        :param str client_id: The client application ID (the ID of the application connecting)

        :raises ValueError: if pool_id contains no underscore.
        """
        self.ie = ie

        self.pool_id = pool_id
        if '_' not in self.pool_id:
            raise ValueError('Invalid pool_id format. Should be <region>_<poolid>.')

        self.client_id = client_id
        self.region = self.pool_id.split('_')[0]
        self.url = f'https://cognito-idp.{self.region}.amazonaws.com/'

        # Initialize the values
        # https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22
        self.n_hex = (
            'FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1'
            '29024E088A67CC74020BBEA63B139B22514A08798E3404DD'
            'EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245'
            'E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED'
            'EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D'
            'C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F'
            '83655D23DCA3AD961C62F356208552BB9ED529077096966D'
            '670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B'
            'E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9'
            'DE2BCBF6955817183995497CEA956AE515D2261898FA0510'
            '15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64'
            'ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7'
            'ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B'
            'F12FFA06D98A0864D87602733EC86A64521F2B18177B200C'
            'BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31'
            '43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF')

        # https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49
        self.g_hex = '2'
        self.info_bits = bytearray('Caldera Derived Key', 'utf-8')

        self.big_n = self.__hex_to_long(self.n_hex)
        self.g = self.__hex_to_long(self.g_hex)
        # k is the SHA-256 of the zero-padded hex forms of N and g
        self.k = self.__hex_to_long(self.__hex_hash('00' + self.n_hex + '0' + self.g_hex))
        self.small_a_value = self.__generate_random_small_a()
        self.large_a_value = self.__calculate_a()

    def authenticate(self, username, password):
        """ Authenticate with a username and password.

        :param str username: The username to log in with.
        :param str password: The password of that user.

        :return: The ID token and the refresh token.
        :rtype: tuple[str, str]

        :raises AuthenticationException: if the service does not answer with a PASSWORD_VERIFIER
                                         challenge, or returns no authentication result.
        :raises InvalidLoginException: if the answer to the password challenge carries a message.
        """
        # Step 1: First initiate an authentication request
        auth_data_dict = self.__get_authentication_request(username)
        auth_data = json.dumps(auth_data_dict).encode()
        auth_headers = {
            'X-Amz-Target': 'AWSCognitoIdentityProviderService.InitiateAuth',
            'Accept-Encoding': 'identity',
            'Content-Type': 'application/x-amz-json-1.1',
        }
        auth_response_json = self.ie._download_json(
            self.url, None, data=auth_data, headers=auth_headers,
            note='Authenticating username', errnote='Invalid username')
        challenge_parameters = auth_response_json.get('ChallengeParameters')

        challenge_name = auth_response_json.get('ChallengeName')
        if challenge_name != 'PASSWORD_VERIFIER':
            # A different challenge comes without a 'message' key; do not fail with a KeyError
            raise AuthenticationException(self._failure_message(
                auth_response_json, f'Unexpected authentication challenge: {challenge_name}'))

        # Step 2: Respond to the Challenge with a valid ChallengeResponse
        challenge_request = self.__get_challenge_response_request(challenge_parameters, password)
        challenge_data = json.dumps(challenge_request).encode()
        challenge_headers = {
            'X-Amz-Target': 'AWSCognitoIdentityProviderService.RespondToAuthChallenge',
            'Content-Type': 'application/x-amz-json-1.1',
        }
        auth_response_json = self.ie._download_json(
            self.url, None, data=challenge_data, headers=challenge_headers,
            note='Authenticating password', errnote='Invalid password')

        if 'message' in auth_response_json:
            raise InvalidLoginException(auth_response_json['message'])

        tokens = auth_response_json.get('AuthenticationResult')
        if not tokens:
            # e.g. a follow-up challenge was returned instead of tokens
            raise AuthenticationException(
                'No authentication result returned (challenge: {})'.format(
                    auth_response_json.get('ChallengeName')))
        return (
            tokens['IdToken'],
            tokens['RefreshToken'],
        )

    @staticmethod
    def _failure_message(response, fallback):
        """ Pick the text to report for a failed authentication step.

        :param dict response: The decoded JSON response.
        :param str fallback: Text to use when the response has no (non-empty) 'message'.

        :return: response['message'] if present and non-empty, otherwise fallback.
        :rtype: str
        """
        return response.get('message') or fallback

    def __get_authentication_request(self, username):
        """ Create the InitiateAuth request body.

        :param str username: The username to use

        :return: A full Authorization request.
        :rtype: dict
        """
        return {
            'AuthParameters': {
                'USERNAME': username,
                'SRP_A': self.__long_to_hex(self.large_a_value),
            },
            'AuthFlow': 'USER_SRP_AUTH',
            'ClientId': self.client_id,
        }

    def __get_challenge_response_request(self, challenge_parameters, password):
        """ Create a Challenge Response Request object.

        :param dict[str,str|int] challenge_parameters: The parameters for the challenge.
        :param str password: The password.

        :return: A valid and full request data object to use as a response for a challenge.
        :rtype: dict
        """
        user_id = challenge_parameters['USERNAME']
        user_id_for_srp = challenge_parameters['USER_ID_FOR_SRP']
        srp_b = challenge_parameters['SRP_B']
        salt = challenge_parameters['SALT']
        secret_block = challenge_parameters['SECRET_BLOCK']

        timestamp = self.__get_current_timestamp()

        # Get a HKDF key for the password, SrpB and the Salt
        hkdf = self.__get_hkdf_key_for_password(
            user_id_for_srp,
            password,
            self.__hex_to_long(srp_b),
            salt,
        )
        secret_block_bytes = base64.standard_b64decode(secret_block)

        # the message is a combo of the pool_id, provided SRP userId, the Secret and Timestamp
        msg = \
            bytearray(self.pool_id.split('_')[1], 'utf-8') + \
            bytearray(user_id_for_srp, 'utf-8') + \
            bytearray(secret_block_bytes) + \
            bytearray(timestamp, 'utf-8')
        hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256)
        signature_string = base64.standard_b64encode(hmac_obj.digest()).decode('utf-8')
        return {
            'ChallengeResponses': {
                'USERNAME': user_id,
                'TIMESTAMP': timestamp,
                'PASSWORD_CLAIM_SECRET_BLOCK': secret_block,
                'PASSWORD_CLAIM_SIGNATURE': signature_string,
            },
            'ChallengeName': 'PASSWORD_VERIFIER',
            'ClientId': self.client_id,
        }

    def __get_hkdf_key_for_password(self, username, password, server_b_value, salt):
        """ Calculates the final hkdf based on computed S value, and computed U value and the key.

        :param str username: Username.
        :param str password: Password.
        :param int server_b_value: Server B value.
        :param int|str salt: Salt, as a number or a hex string (it is passed through __pad_hex).

        :return Computed HKDF value.
        :rtype: bytes

        :raises ValueError: if B is a multiple of N, or if the computed U value is zero.
        """
        # SRP safeguard (RFC 5054): a server value B = 0 mod N must abort the exchange
        if server_b_value % self.big_n == 0:
            raise ValueError('B cannot be zero.')

        u_value = self.__calculate_u(self.large_a_value, server_b_value)
        if u_value == 0:
            raise ValueError('U cannot be zero.')
        username_password = '{}{}:{}'.format(self.pool_id.split('_')[1], username, password)
        username_password_hash = self.__hash_sha256(username_password.encode())

        x_value = self.__hex_to_long(self.__hex_hash(self.__pad_hex(salt) + username_password_hash))
        g_mod_pow_xn = pow(self.g, x_value, self.big_n)
        int_value2 = server_b_value - self.k * g_mod_pow_xn
        s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n)
        return self.__compute_hkdf(
            bytearray.fromhex(self.__pad_hex(s_value)),
            bytearray.fromhex(self.__pad_hex(self.__long_to_hex(u_value))),
        )

    def __compute_hkdf(self, ikm, salt):
        """ Standard hkdf algorithm

        :param {Buffer} ikm Input key material.
        :param {Buffer} salt Salt value.
        :return {Buffer} Strong key material (the first 16 bytes of the HMAC-SHA256 output).
        """
        prk = hmac.new(salt, ikm, hashlib.sha256).digest()
        info_bits_update = self.info_bits + bytearray(chr(1), 'utf-8')
        hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest()
        return hmac_hash[:16]

    def __calculate_u(self, big_a, big_b):
        """ Calculate the client's value U which is the hash of A and B

        :param int big_a: Large A value.
        :param int big_b: Server B value.

        :return Computed U value.
        :rtype: int
        """
        u_hex_hash = self.__hex_hash(self.__pad_hex(big_a) + self.__pad_hex(big_b))
        return self.__hex_to_long(u_hex_hash)

    def __generate_random_small_a(self):
        """ Helper function to generate a random big integer

        :return a random value (128 random bytes, reduced modulo N).
        :rtype: int
        """
        random_long_int = self.__get_random(128)
        return random_long_int % self.big_n

    def __calculate_a(self):
        """ Calculate the client's public value A = g^a%N with the generated random number a

        :return Computed large A.
        :rtype: int

        :raises ValueError: if A is a multiple of N.
        """
        big_a = pow(self.g, self.small_a_value, self.big_n)
        # safety check
        if (big_a % self.big_n) == 0:
            raise ValueError('Safety check for A failed')
        return big_a

    @staticmethod
    def __long_to_hex(long_num):
        """ Format an integer as lowercase hex without a prefix. """
        return f'{long_num:x}'

    @staticmethod
    def __hex_to_long(hex_string):
        """ Parse a hex string into an integer. """
        return int(hex_string, 16)

    @staticmethod
    def __hex_hash(hex_string):
        """ SHA-256 (as hex) of the bytes that hex_string encodes. """
        return AwsIdp.__hash_sha256(bytearray.fromhex(hex_string))

    @staticmethod
    def __hash_sha256(buf):
        """AuthenticationHelper.hash"""
        digest = hashlib.sha256(buf).hexdigest()
        return (64 - len(digest)) * '0' + digest

    @staticmethod
    def __pad_hex(long_int):
        """ Converts a Long integer (or hex string) to hex format padded with zeroes for hashing

        :param int|str long_int: Number or string to pad.

        :return Padded hex string.
        :rtype: str
        """
        if not isinstance(long_int, str):
            hash_str = AwsIdp.__long_to_hex(long_int)
        else:
            hash_str = long_int
        if len(hash_str) % 2 == 1:
            # odd number of digits: complete the leading byte
            hash_str = f'0{hash_str}'
        elif hash_str[0] in '89ABCDEFabcdef':
            # leading hex digit >= 8: prepend a zero byte
            hash_str = f'00{hash_str}'
        return hash_str

    @staticmethod
    def __get_random(nbytes):
        """ Return an integer built from nbytes bytes of os.urandom(). """
        random_hex = binascii.hexlify(os.urandom(nbytes))
        return AwsIdp.__hex_to_long(random_hex)

    @staticmethod
    def __get_current_timestamp():
        """ Creates a timestamp with the correct English format.

        :return: timestamp in format 'Sun Jan 27 19:00:04 UTC 2019'
        :rtype: str
        """
        # We need US only data, so we cannot just do a strftime:
        # Sun Jan 27 19:00:04 UTC 2019
        months = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']

        time_now = dt.datetime.now(dt.timezone.utc)
        format_string = f'{days[time_now.weekday()]} {months[time_now.month]} {time_now.day} %H:%M:%S UTC %Y'
        return time_now.strftime(format_string)

    def __str__(self):
        return 'AWS IDP Client for:\nRegion: {}\nPoolId: {}\nAppId: {}'.format(
            self.region, self.pool_id.split('_')[1], self.client_id,
        )