request.py
6.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# -*- coding: utf-8 -*-
"""Implements a simple wrapper around urlopen."""
import logging
from functools import lru_cache
import re
from urllib import parse
from urllib.request import Request
from urllib.request import urlopen
from pytube.exceptions import RegexMatchError
from pytube.helpers import regex_search
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default number of bytes requested per read() call (4 KiB).
default_chunk_size = 4 * 1024
# Default number of bytes requested per HTTP range request (9 MiB).
default_range_size = 9 * 1024 * 1024
def _execute_request(url, method=None, headers=None):
base_headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"}
if headers:
base_headers.update(headers)
if url.lower().startswith("http"):
request = Request(url, headers=base_headers, method=method)
else:
raise ValueError("Invalid URL")
return urlopen(request) # nosec
def get(url, extra_headers=None):
    """Send an http GET request.

    :param str url:
        The URL to perform the GET request for.
    :param dict extra_headers:
        Extra headers to add to the request
    :rtype: str
    :returns:
        Response body decoded as UTF-8
    """
    if extra_headers is None:
        extra_headers = {}
    response = _execute_request(url, headers=extra_headers)
    # Close explicitly so the response is released even when reading or
    # decoding raises.
    try:
        return response.read().decode("utf-8")
    finally:
        response.close()
def seq_stream(url, chunk_size=default_chunk_size, range_size=default_range_size):
    """Read the response in sequence.

    Request ``sq=0`` is streamed first; its body is also scanned for a
    ``Segment-Count`` line, and then requests ``sq=1`` .. ``sq=<count>`` are
    streamed in order.

    :param str url: The URL to perform the GET request for.
    :param int chunk_size: The size in bytes of each chunk. Defaults to 4KB
    :param int range_size: The size in bytes of each range request. Defaults
        to 9MB
    :rtype: Iterable[bytes]
    :raises RegexMatchError: If the ``sq=0`` response contains no
        ``Segment-Count`` line.
    """
    # YouTube expects a request sequence number as part of the parameters.
    split_url = parse.urlsplit(url)
    # urlsplit keeps the leading "/" on the path; strip it so the rebuilt
    # URL does not contain "//" between host and path.
    base_url = '%s://%s/%s?' % (
        split_url.scheme, split_url.netloc, split_url.path.lstrip('/')
    )
    querys = dict(parse.parse_qsl(split_url.query))

    # The 0th sequential request provides the file headers, which tell us
    # information about how the file is segmented.
    querys['sq'] = 0
    url = base_url + parse.urlencode(querys)

    # Collect the pieces in a list and join once, instead of repeatedly
    # re-allocating a growing bytes object.
    header_chunks = []
    for chunk in stream(url, chunk_size=chunk_size, range_size=range_size):
        yield chunk
        header_chunks.append(chunk)
    segment_data = b''.join(header_chunks)

    # We can then parse the header to find the number of segments. If
    # several lines match, the last one wins.
    segment_count_pattern = re.compile(b'Segment-Count: (\\d+)')
    segment_count = None
    for line in segment_data.split(b'\r\n'):
        match = segment_count_pattern.search(line)
        if match:
            segment_count = int(match.group(1).decode('utf-8'))
    if segment_count is None:
        # Without a segment count the remaining data cannot be requested.
        raise RegexMatchError('seq_stream', segment_count_pattern.pattern)

    # We request these segments sequentially to build the file.
    for seq_num in range(1, segment_count + 1):
        querys['sq'] = seq_num
        url = base_url + parse.urlencode(querys)
        yield from stream(url, chunk_size=chunk_size, range_size=range_size)
def stream(url, chunk_size=default_chunk_size, range_size=default_range_size):
    """Read the response in chunks.

    The file is fetched with consecutive HTTP ``Range`` requests of at most
    ``range_size`` bytes, and each response is read ``chunk_size`` bytes at a
    time. The total size is taken from the ``Content-Range`` header; until
    that header has been parsed successfully, ``range_size`` is used as a
    stand-in for the file size.

    :param str url: The URL to perform the GET request for.
    :param int chunk_size: The size in bytes of each chunk. Defaults to 4KB
    :param int range_size: The size in bytes of each range request. Defaults
        to 9MB
    :rtype: Iterable[bytes]
    """
    file_size: int = range_size  # fake filesize to start
    size_known = False
    downloaded = 0
    while downloaded < file_size:
        stop_pos = min(downloaded + range_size, file_size) - 1
        range_header = f"bytes={downloaded}-{stop_pos}"
        response = _execute_request(
            url, method="GET", headers={"Range": range_header}
        )
        received = 0
        try:
            if not size_known:
                try:
                    content_range = response.info()["Content-Range"]
                    file_size = int(content_range.split("/")[1])
                    size_known = True
                # AttributeError covers a missing header: the headers
                # object returns None instead of raising KeyError.
                except (AttributeError, KeyError, IndexError, ValueError) as e:
                    logger.error(e)
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                received += len(chunk)
                downloaded += len(chunk)
                yield chunk
        finally:
            # Also runs when the consumer abandons the generator early.
            response.close()
        if not received:
            # No progress was made; asking for the same range again would
            # loop forever.
            break
@lru_cache()
def filesize(url):
    """Return the size in bytes of the remote file at ``url``.

    The value is read from the ``content-length`` entry of the mapping
    returned by ``head``; results are memoised by ``lru_cache``.

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    """
    response_headers = head(url)
    content_length = response_headers["content-length"]
    return int(content_length)
@lru_cache()
def seq_filesize(url):
    """Fetch size in bytes of file at given URL from sequential requests

    The body of request ``sq=0`` is downloaded and counted in full; the sizes
    of requests ``sq=1`` .. ``sq=<Segment-Count>`` are taken from the
    ``content-length`` reported by ``head``.

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    :raises RegexMatchError: If no non-zero ``Segment-Count`` is found in the
        ``sq=0`` response.
    """
    # YouTube expects a request sequence number as part of the parameters.
    split_url = parse.urlsplit(url)
    # urlsplit keeps the leading "/" on the path; strip it so the rebuilt
    # URL does not contain "//" between host and path.
    base_url = '%s://%s/%s?' % (
        split_url.scheme, split_url.netloc, split_url.path.lstrip('/')
    )
    querys = dict(parse.parse_qsl(split_url.query))

    # The 0th sequential request provides the file headers, which tell us
    # information about how the file is segmented.
    querys['sq'] = 0
    url = base_url + parse.urlencode(querys)
    response = _execute_request(url, method="GET")
    try:
        response_value = response.read()
    finally:
        response.close()

    # The file header must be added to the total filesize
    total_filesize = len(response_value)

    # We can then parse the header to find the number of segments
    segment_count = 0
    segment_regex = b'Segment-Count: (\\d+)'
    for line in response_value.split(b'\r\n'):
        # One of the lines should contain the segment count, but we don't know
        # which, so we need to iterate through the lines to find it
        try:
            segment_count = int(regex_search(segment_regex, line, 1))
        except RegexMatchError:
            # Expected for every line that is not the Segment-Count line.
            pass

    if segment_count == 0:
        raise RegexMatchError('seq_filesize', segment_regex)

    # We make HEAD requests to the segments sequentially to find the total filesize.
    for seq_num in range(1, segment_count + 1):
        querys['sq'] = seq_num
        url = base_url + parse.urlencode(querys)
        total_filesize += int(head(url)['content-length'])
    return total_filesize
def head(url):
    """Fetch headers returned by an http HEAD request.

    :param str url:
        The URL to perform the HEAD request for.
    :rtype: dict
    :returns:
        dictionary of lowercase headers
    """
    response = _execute_request(url, method="HEAD")
    # Only the headers are needed, so release the response as soon as they
    # have been copied into a plain dict.
    try:
        return {k.lower(): v for k, v in response.info().items()}
    finally:
        response.close()