[ie/youtube] Add age-gate workaround for some embeddable videos (#11821)
[yt-dlp.git] / yt_dlp / extractor / twitcasting.py
blobbf9c6348cbff0fc555cea67feb858e6384e42a07
1 import base64
2 import itertools
3 import re
5 from .common import InfoExtractor
6 from ..dependencies import websockets
7 from ..utils import (
8 ExtractorError,
9 UserNotLive,
10 clean_html,
11 float_or_none,
12 get_element_by_class,
13 get_element_by_id,
14 parse_duration,
15 qualities,
16 str_to_int,
17 traverse_obj,
18 try_get,
19 unified_timestamp,
20 urlencode_postdata,
21 urljoin,
25 class TwitCastingIE(InfoExtractor):
26 _VALID_URL = r'https?://(?:[^/?#]+\.)?twitcasting\.tv/(?P<uploader_id>[^/?#]+)/(?:movie|twplayer)/(?P<id>\d+)'
27 _M3U8_HEADERS = {
28 'Origin': 'https://twitcasting.tv',
29 'Referer': 'https://twitcasting.tv/',
31 _TESTS = [{
32 'url': 'https://twitcasting.tv/ivetesangalo/movie/2357609',
33 'md5': '745243cad58c4681dc752490f7540d7f',
34 'info_dict': {
35 'id': '2357609',
36 'ext': 'mp4',
37 'title': 'Live #2357609',
38 'uploader_id': 'ivetesangalo',
39 'description': 'Twitter Oficial da cantora brasileira Ivete Sangalo.',
40 'thumbnail': r're:^https?://.*\.jpg$',
41 'upload_date': '20110822',
42 'timestamp': 1313978424,
43 'duration': 32,
44 'view_count': int,
46 'params': {
47 'skip_download': True,
49 }, {
50 'url': 'https://twitcasting.tv/mttbernardini/movie/3689740',
51 'info_dict': {
52 'id': '3689740',
53 'ext': 'mp4',
54 'title': 'Live playing something #3689740',
55 'uploader_id': 'mttbernardini',
56 'description': 'md5:1dc7efa2f1ab932fcd119265cebeec69',
57 'thumbnail': r're:^https?://.*\.jpg$',
58 'upload_date': '20120211',
59 'timestamp': 1328995624,
60 'duration': 681,
61 'view_count': int,
63 'params': {
64 'skip_download': True,
65 'videopassword': 'abc',
67 }, {
68 'url': 'https://twitcasting.tv/loft_heaven/movie/685979292',
69 'info_dict': {
70 'id': '685979292',
71 'ext': 'mp4',
72 'title': '【無料配信】南波一海のhear/here “ナタリー望月哲さんに聞く編集と「渋谷系狂騒曲」”',
73 'uploader_id': 'loft_heaven',
74 'description': 'md5:3a0c7b53019df987ce545c935538bacf',
75 'upload_date': '20210604',
76 'timestamp': 1622802114,
77 'thumbnail': r're:^https?://.*\.jpg$',
78 'duration': 6964,
79 'view_count': int,
81 'params': {
82 'skip_download': True,
86 def _parse_data_movie_playlist(self, dmp, video_id):
87 # attempt 1: parse as JSON directly
88 try:
89 return self._parse_json(dmp, video_id)
90 except ExtractorError:
91 pass
92 # attempt 2: decode reversed base64
93 decoded = base64.b64decode(dmp[::-1])
94 return self._parse_json(decoded, video_id)
96 def _real_extract(self, url):
97 uploader_id, video_id = self._match_valid_url(url).groups()
99 webpage, urlh = self._download_webpage_handle(url, video_id)
100 video_password = self.get_param('videopassword')
101 request_data = None
102 if video_password:
103 request_data = urlencode_postdata({
104 'password': video_password,
105 **self._hidden_inputs(webpage),
106 }, encoding='utf-8')
107 webpage, urlh = self._download_webpage_handle(
108 url, video_id, data=request_data,
109 headers={'Origin': 'https://twitcasting.tv'},
110 note='Trying video password')
111 if urlh.url != url and request_data:
112 webpage = self._download_webpage(
113 urlh.url, video_id, data=request_data,
114 headers={'Origin': 'https://twitcasting.tv'},
115 note='Retrying authentication')
116 # has to check here as the first request can contain password input form even if the password is correct
117 if re.search(r'<form\s+method="POST">\s*<input\s+[^>]+?name="password"', webpage):
118 raise ExtractorError('This video is protected by a password, use the --video-password option', expected=True)
120 title = (clean_html(get_element_by_id('movietitle', webpage))
121 or self._html_search_meta(['og:title', 'twitter:title'], webpage, fatal=True))
123 video_js_data = try_get(
124 webpage,
125 lambda x: self._parse_data_movie_playlist(self._search_regex(
126 r'data-movie-playlist=\'([^\']+?)\'',
127 x, 'movie playlist', default=None), video_id)['2'], list)
129 thumbnail = traverse_obj(video_js_data, (0, 'thumbnailUrl')) or self._og_search_thumbnail(webpage)
130 description = clean_html(get_element_by_id(
131 'authorcomment', webpage)) or self._html_search_meta(
132 ['description', 'og:description', 'twitter:description'], webpage)
133 duration = (try_get(video_js_data, lambda x: sum(float_or_none(y.get('duration')) for y in x) / 1000)
134 or parse_duration(clean_html(get_element_by_class('tw-player-duration-time', webpage))))
135 view_count = str_to_int(self._search_regex(
136 (r'Total\s*:\s*Views\s*([\d,]+)', r'総視聴者\s*:\s*([\d,]+)\s*</'), webpage, 'views', None))
137 timestamp = unified_timestamp(self._search_regex(
138 r'data-toggle="true"[^>]+datetime="([^"]+)"',
139 webpage, 'datetime', None))
141 stream_server_data = self._download_json(
142 f'https://twitcasting.tv/streamserver.php?target={uploader_id}&mode=client', video_id,
143 'Downloading live info', fatal=False)
145 is_live = any(f'data-{x}' in webpage for x in ['is-onlive="true"', 'live-type="live"', 'status="online"'])
146 if not traverse_obj(stream_server_data, 'llfmp4') and is_live:
147 self.raise_login_required(method='cookies')
149 base_dict = {
150 'title': title,
151 'description': description,
152 'thumbnail': thumbnail,
153 'timestamp': timestamp,
154 'uploader_id': uploader_id,
155 'duration': duration,
156 'view_count': view_count,
157 'is_live': is_live,
160 def find_dmu(x):
161 data_movie_url = self._search_regex(
162 r'data-movie-url=(["\'])(?P<url>(?:(?!\1).)+)\1',
163 x, 'm3u8 url', group='url', default=None)
164 if data_movie_url:
165 return [data_movie_url]
167 m3u8_urls = (try_get(webpage, find_dmu, list)
168 or traverse_obj(video_js_data, (..., 'source', 'url'))
169 or ([f'https://twitcasting.tv/{uploader_id}/metastream.m3u8'] if is_live else None))
170 if not m3u8_urls:
171 raise ExtractorError('Failed to get m3u8 playlist')
173 if is_live:
174 m3u8_url = m3u8_urls[0]
175 formats = self._extract_m3u8_formats(
176 m3u8_url, video_id, ext='mp4', m3u8_id='hls',
177 live=True, headers=self._M3U8_HEADERS)
179 if traverse_obj(stream_server_data, ('hls', 'source')):
180 formats.extend(self._extract_m3u8_formats(
181 m3u8_url, video_id, ext='mp4', m3u8_id='source',
182 live=True, query={'mode': 'source'},
183 note='Downloading source quality m3u8',
184 headers=self._M3U8_HEADERS, fatal=False))
186 if websockets:
187 qq = qualities(['base', 'mobilesource', 'main'])
188 streams = traverse_obj(stream_server_data, ('llfmp4', 'streams')) or {}
189 for mode, ws_url in streams.items():
190 formats.append({
191 'url': ws_url,
192 'format_id': f'ws-{mode}',
193 'ext': 'mp4',
194 'quality': qq(mode),
195 'source_preference': -10,
196 # TwitCasting simply sends moof atom directly over WS
197 'protocol': 'websocket_frag',
200 infodict = {
201 'formats': formats,
202 '_format_sort_fields': ('source', ),
204 elif len(m3u8_urls) == 1:
205 formats = self._extract_m3u8_formats(
206 m3u8_urls[0], video_id, 'mp4', headers=self._M3U8_HEADERS)
207 infodict = {
208 # No problem here since there's only one manifest
209 'formats': formats,
210 'http_headers': self._M3U8_HEADERS,
212 else:
213 infodict = {
214 '_type': 'multi_video',
215 'entries': [{
216 'id': f'{video_id}-{num}',
217 'url': m3u8_url,
218 'ext': 'mp4',
219 # Requesting the manifests here will cause download to fail.
220 # So use ffmpeg instead. See: https://github.com/yt-dlp/yt-dlp/issues/382
221 'protocol': 'm3u8',
222 'http_headers': self._M3U8_HEADERS,
223 **base_dict,
224 } for (num, m3u8_url) in enumerate(m3u8_urls)],
227 return {
228 'id': video_id,
229 **base_dict,
230 **infodict,
234 class TwitCastingLiveIE(InfoExtractor):
235 _VALID_URL = r'https?://(?:[^/?#]+\.)?twitcasting\.tv/(?P<id>[^/?#]+)/?(?:[#?]|$)'
236 _TESTS = [{
237 'url': 'https://twitcasting.tv/ivetesangalo',
238 'only_matching': True,
239 }, {
240 'url': 'https://twitcasting.tv/c:unusedlive',
241 'expected_exception': 'UserNotLive',
244 def _real_extract(self, url):
245 uploader_id = self._match_id(url)
246 self.to_screen(
247 f'Downloading live video of user {uploader_id}. '
248 f'Pass "https://twitcasting.tv/{uploader_id}/show" to download the history')
250 is_live = traverse_obj(self._download_json(
251 f'https://frontendapi.twitcasting.tv/watch/user/{uploader_id}',
252 uploader_id, 'Checking live status', data=b'', fatal=False), ('is_live', {bool}))
253 if is_live is False: # only raise here if API response was as expected
254 raise UserNotLive(video_id=uploader_id)
256 # Use /show/ page so that password-protected and members-only livestreams can be found
257 webpage = self._download_webpage(
258 f'https://twitcasting.tv/{uploader_id}/show/', uploader_id, 'Downloading live history')
259 is_live = is_live or self._search_regex(
260 r'(?s)(<span\s*class="tw-movie-thumbnail2-badge"\s*data-status="live">\s*LIVE)',
261 webpage, 'is live?', default=False)
262 # Current live is always the first match
263 current_live = self._search_regex(
264 r'(?s)<a\s+class="tw-movie-thumbnail2"\s+href="/[^/"]+/movie/(?P<video_id>\d+)"',
265 webpage, 'current live ID', default=None, group='video_id')
266 if not is_live or not current_live:
267 raise UserNotLive(video_id=uploader_id)
269 return self.url_result(f'https://twitcasting.tv/{uploader_id}/movie/{current_live}', TwitCastingIE)
272 class TwitCastingUserIE(InfoExtractor):
273 _VALID_URL = r'https?://(?:[^/?#]+\.)?twitcasting\.tv/(?P<id>[^/?#]+)/(?:show|archive)/?(?:[#?]|$)'
274 _TESTS = [{
275 'url': 'https://twitcasting.tv/natsuiromatsuri/archive/',
276 'info_dict': {
277 'id': 'natsuiromatsuri',
278 'title': 'natsuiromatsuri - Live History',
280 'playlist_mincount': 235,
281 }, {
282 'url': 'https://twitcasting.tv/noriyukicas/show',
283 'only_matching': True,
286 def _entries(self, uploader_id):
287 base_url = next_url = f'https://twitcasting.tv/{uploader_id}/show'
288 for page_num in itertools.count(1):
289 webpage = self._download_webpage(
290 next_url, uploader_id, query={'filter': 'watchable'}, note=f'Downloading page {page_num}')
291 matches = re.finditer(
292 r'(?s)<a\s+class="tw-movie-thumbnail2"\s+href="(?P<url>/[^/"]+/movie/\d+)"', webpage)
293 for mobj in matches:
294 yield self.url_result(urljoin(base_url, mobj.group('url')))
296 next_url = self._search_regex(
297 r'<a href="(/%s/show/%d-\d+)[?"]' % (re.escape(uploader_id), page_num),
298 webpage, 'next url', default=None)
299 next_url = urljoin(base_url, next_url)
300 if not next_url:
301 return
303 def _real_extract(self, url):
304 uploader_id = self._match_id(url)
305 return self.playlist_result(
306 self._entries(uploader_id), uploader_id, f'{uploader_id} - Live History')