4 # Christophe DUMEZ (chris@qbittorrent.org)
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions are met:
9 # * Redistributions of source code must retain the above copyright notice,
10 # this list of conditions and the following disclaimer.
11 # * Redistributions in binary form must reproduce the above copyright
12 # notice, this list of conditions and the following disclaimer in the
13 # documentation and/or other materials provided with the distribution.
14 # * Neither the name of the author nor the names of its contributors may be
15 # used to endorse or promote products derived from this software without
16 # specific prior written permission.
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 # POSSIBILITY OF SUCH DAMAGE.
43 from collections
.abc
import Mapping
44 from typing
import Any
, Optional
def getBrowserUserAgent() -> str:
    """ Disguise as browser to circumvent website blocking

    Returns a Firefox-on-Windows User-Agent string. The advertised major
    version is estimated from today's date so that it keeps increasing over
    time instead of staying pinned to a fixed release.
    """

    # Firefox release calendar
    # https://whattrainisitnow.com/calendar/
    # https://wiki.mozilla.org/index.php?title=Release_Management/Calendar&redirect=no

    # Reference point of the estimate: Firefox 125 was released on 2024-04-16
    baseDate = datetime.date(2024, 4, 16)
    baseVersion = 125

    # Advance the major version by one for every 30 days elapsed since baseDate
    nowDate = datetime.date.today()
    nowVersion = baseVersion + ((nowDate - baseDate).days // 30)

    return f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{nowVersion}.0) Gecko/20100101 Firefox/{nowVersion}.0"
# Default request headers used by the helpers of this module
headers: dict[str, Any] = {'User-Agent': getBrowserUserAgent()}

# SOCKS5 Proxy support
# The proxy is read from the "sock_proxy" environment variable, expected in the
# form "[username:password@]host:port"
if "sock_proxy" in os.environ and len(os.environ["sock_proxy"].strip()) > 0:
    proxy_str = os.environ["sock_proxy"].strip()
    m = re.match(r"^(?:(?P<username>[^:]+):(?P<password>[^@]+)@)?(?P<host>[^:]+):(?P<port>\w+)$",
                 proxy_str)
    # A value that does not match the expected form is ignored rather than
    # crashing the import with an AttributeError on `None.group(...)`
    if m is not None:
        # NOTE(review): the positional `True` is presumably the "rdns" flag
        # (resolve host names on the proxy side) - confirm against PySocks
        socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, m.group('host'),
                              int(m.group('port')), True, m.group('username'), m.group('password'))
        # Replace the standard socket class with the SOCKS-aware one
        socket.socket = socks.socksocket  # type: ignore[misc]


# This is only provided for backward compatibility, new code should not use it
htmlentitydecode = html.unescape
def retrieve_url(url: str, custom_headers: Mapping[str, Any] = {}, request_data: Optional[Any] = None, ssl_context: Optional[ssl.SSLContext] = None, unescape_html_entities: bool = True) -> str:
    """ Return the content of the url page as a string

    url: address to fetch
    custom_headers: extra request headers; they override the module-level
        default `headers` on key collision (the mapping is only read, never
        modified)
    request_data: optional request body handed to urllib.request.Request
    ssl_context: optional SSL context handed to urllib.request.urlopen
    unescape_html_entities: when True, HTML entities in the decoded text are
        replaced by the characters they stand for

    Returns the decoded page content, or an empty string when the connection
    fails (the error is reported on stderr).
    """

    request = urllib.request.Request(url, request_data, {**headers, **custom_headers})
    try:
        response = urllib.request.urlopen(request, context=ssl_context)
    except urllib.error.URLError as errno:
        print(f"Connection error: {errno.reason}", file=sys.stderr)
        # Without a response there is nothing to read or decode
        return ""

    # Make sure the connection is released once the payload has been read
    with response:
        data: bytes = response.read()
        contentType = response.getheader('Content-Type', '')

    # Check if it is gzipped
    if data[:2] == b'\x1f\x8b':
        # Data is gzip encoded, decode it
        with io.BytesIO(data) as compressedStream, gzip.GzipFile(fileobj=compressedStream) as gzipper:
            data = gzipper.read()

    # Take the charset from the Content-Type header when it declares one.
    # Anything after a ';' belongs to another parameter and optional quotes
    # are not part of the charset name.
    _, found, charsetPart = contentType.partition('charset=')
    charset = charsetPart.split(';', 1)[0].strip().strip('"\'') if found else ''
    if not charset:
        # No charset declared: fall back to UTF-8
        charset = 'utf-8'

    try:
        dataStr = data.decode(charset, 'replace')
    except LookupError:
        # The server announced a charset Python does not know
        dataStr = data.decode('utf-8', 'replace')

    if unescape_html_entities:
        dataStr = html.unescape(dataStr)

    return dataStr
def download_file(url: str, referer: Optional[str] = None, ssl_context: Optional[ssl.SSLContext] = None) -> str:
    """ Download file at url and write it to a file, return the path to the file and the url

    url: address of the file to download
    referer: optional value sent as the 'referer' request header
    ssl_context: optional SSL context handed to urllib.request.urlopen

    Returns a single string made of the path of the written temporary file
    and the url, separated by one space. Errors raised by urllib while
    downloading are not caught here.
    """

    # Download url
    request = urllib.request.Request(url, headers=headers)
    if referer is not None:
        request.add_header('referer', referer)
    # Make sure the connection is released once the payload has been read
    with urllib.request.urlopen(request, context=ssl_context) as response:
        data = response.read()

    # Check if it is gzipped
    if data[:2] == b'\x1f\x8b':
        # Data is gzip encoded, decode it
        with io.BytesIO(data) as compressedStream, gzip.GzipFile(fileobj=compressedStream) as gzipper:
            data = gzipper.read()

    # Write it to a file
    # mkstemp() hands back an already opened descriptor, wrap it so that it is
    # closed once the content has been written
    fileHandle, path = tempfile.mkstemp()
    with os.fdopen(fileHandle, "wb") as file:
        file.write(data)

    # return file path
    return f"{path} {url}"