cipher.py
9.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# -*- coding: utf-8 -*-
"""
This module contains all logic necessary to decipher the signature.
YouTube's strategy to restrict downloading videos is to send a ciphered version
of the signature to the client, along with the decryption algorithm obfuscated
in JavaScript. For the clients to play the videos, JavaScript must take the
ciphered version, cycle it through a series of "transform functions," and then
sign the media URL with the output.
This module is responsible for (1) finding and extracting those "transform
functions", (2) mapping them to Python equivalents, and (3) taking the
ciphered signature and decoding it.
"""
import logging
import re
from itertools import chain
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from pytube.exceptions import RegexMatchError
from pytube.helpers import cache
from pytube.helpers import regex_search
logger = logging.getLogger(__name__)
class Cipher:
    """Decipher signatures using transforms extracted from JavaScript.

    Construction extracts the transform plan and the transform map from the
    given JavaScript source; :meth:`get_signature` then replays that plan on
    a ciphered signature.
    """

    def __init__(self, js: str):
        """Extract the transform plan and the transform map from ``js``.

        :param str js:
            The JavaScript source the transforms are extracted from.
        """
        self.transform_plan: List[str] = get_transform_plan(js)
        # A plan entry has the form ``DE.AJ(a,15)``; the text before the dot
        # is the variable holding the transform functions.
        first_step = self.transform_plan[0]
        object_name, _ = first_step.split(".")
        self.transform_map = get_transform_map(js, object_name)
        # Captures the function name and its integer argument.
        self.js_func_regex = re.compile(r"\w+\.(\w+)\(\w,(\d+)\)")

    def get_signature(self, ciphered_signature: str) -> str:
        """Decipher the signature.

        Runs every entry of the transform plan, in order, over the
        characters of the ciphered signature.

        :param str ciphered_signature:
            The ciphered signature sent in the ``player_config``.
        :rtype: str
        :returns:
            Decrypted signature required to download the media content.
        """
        chars = list(ciphered_signature)
        for step in self.transform_plan:
            fn_name, fn_arg = self.parse_function(step)  # type: ignore
            transform = self.transform_map[fn_name]
            chars = transform(chars, fn_arg)
            logger.debug(
                "applied transform function\n"
                "output: %s\n"
                "js_function: %s\n"
                "argument: %d\n"
                "function: %s",
                "".join(chars),
                fn_name,
                fn_arg,
                transform,
            )
        return "".join(chars)

    # NOTE(review): ``cache`` is imported from pytube.helpers; presumably it
    # memoizes results per (self, js_func) -- confirm against that helper.
    @cache
    def parse_function(self, js_func: str) -> Tuple[str, int]:
        """Parse the Javascript transform function.

        Split a JavaScript transform call into a two element ``tuple`` of
        the function name and its integer argument.

        :param str js_func:
            The JavaScript version of the transform function.
        :rtype: tuple
        :returns:
            two element tuple containing the function name and an argument.
        :raises RegexMatchError:
            If ``js_func`` does not match the expected call shape.

        **Example**:

        parse_function('DE.AJ(a,15)')
        ('AJ', 15)
        """
        logger.debug("parsing transform function")
        match = self.js_func_regex.search(js_func)
        if match is None:
            raise RegexMatchError(
                caller="parse_function", pattern="js_func_regex"
            )
        return match.group(1), int(match.group(2))
def get_initial_function_name(js: str) -> str:
    """Extract the name of the function responsible for computing the signature.

    The candidate patterns are tried in order; the first one that matches
    wins and the text captured by its ``sig`` named group is returned.

    :param str js:
        The contents of the base.js asset file.
    :rtype: str
    :returns:
        Function name from regex match
    :raises RegexMatchError:
        If none of the candidate patterns match ``js``.
    """
    function_patterns = [
        r"\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(",  # noqa: E501
        r"\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(",  # noqa: E501
        r'(?:\b|[^a-zA-Z0-9$])(?P<sig>[a-zA-Z0-9$]{2})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)',  # noqa: E501
        r'(?P<sig>[a-zA-Z0-9$]+)\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)',  # noqa: E501
        r'(["\'])signature\1\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r"\.sig\|\|(?P<sig>[a-zA-Z0-9$]+)\(",
        r"yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P<sig>[a-zA-Z0-9$]+)\(",  # noqa: E501
        r"\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(",  # noqa: E501
        r"\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(",  # noqa: E501
        r"\bc\s*&&\s*a\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(",  # noqa: E501
        r"\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(",  # noqa: E501
    ]
    logger.debug("finding initial function name")
    for pattern in function_patterns:
        function_match = re.search(pattern, js)
        if function_match:
            logger.debug("finished regex search, matched: %s", pattern)
            # Read the named group rather than group 1: the quoted
            # "signature" pattern has a quote-capturing group ahead of
            # ``sig``, so group 1 there is the quote character, not the name.
            return function_match.group("sig")
    raise RegexMatchError(
        caller="get_initial_function_name", pattern="multiple"
    )
def get_transform_plan(js: str) -> List[str]:
    """Extract the "transform plan".

    The plan is the ordered sequence of JavaScript calls that the ciphered
    signature is run through to produce the real signature. It is read from
    the body of the function named by ``get_initial_function_name`` and
    split on ``;``.

    :param str js:
        The contents of the base.js asset file.
    :rtype: list
    :returns:
        The transform calls, one string per call.

    **Example**:

    ['DE.AJ(a,15)',
    'DE.VR(a,3)',
    'DE.AJ(a,51)',
    'DE.VR(a,3)',
    'DE.kT(a,51)',
    'DE.kT(a,8)',
    'DE.VR(a,3)',
    'DE.kT(a,21)']
    """
    escaped_name = re.escape(get_initial_function_name(js))
    # Group 1 is everything between the first and the last statement of the
    # function body.
    plan_regex = escaped_name + r"=function\(\w\){[a-z=\.\(\"\)]*;(.*);(?:.+)}"
    logger.debug("getting transform plan")
    plan_body = regex_search(plan_regex, js, group=1)
    return plan_body.split(";")
def get_transform_object(js: str, var: str) -> List[str]:
    """Extract the "transform object".

    The transform object holds the definitions of the functions that the
    transform plan calls. ``var`` is the obfuscated name of that object: for
    a plan entry such as ``DE.AJ(a,15)``, ``var`` would be "DE".

    :param str js:
        The contents of the base.js asset file.
    :param str var:
        The obfuscated variable name that stores an object with all functions
        that descrambles the signature.
    :rtype: list
    :returns:
        The object's body with newlines replaced by spaces, split on ", ".
    :raises RegexMatchError:
        If no ``var <name>={...};`` definition is found in ``js``.

    **Example**:

    >>> get_transform_object(js, 'DE')
    ['AJ:function(a){a.reverse()}',
    'VR:function(a,b){a.splice(0,b)}',
    'kT:function(a,b){var c=a[0];a[0]=a[b%a.length];a[b]=c}']
    """
    pattern = "var " + re.escape(var) + r"={(.*?)};"
    logger.debug("getting transform object")
    # DOTALL lets the lazy group run across line breaks inside the object.
    transform_match = re.search(pattern, js, flags=re.DOTALL)
    if transform_match is None:
        raise RegexMatchError(caller="get_transform_object", pattern=pattern)
    object_body = transform_match.group(1)
    return object_body.replace("\n", " ").split(", ")
def get_transform_map(js: str, var: str) -> Dict:
    """Build a transform function lookup.

    Maps each obfuscated JavaScript function name found in the transform
    object to the Python callable chosen for it by ``map_functions``.

    :param str js:
        The contents of the base.js asset file.
    :param str var:
        The obfuscated variable name that stores an object with all functions
        that descrambles the signature.
    :rtype: dict
    :returns:
        Lookup of JavaScript function name to Python callable.
    """
    entries = get_transform_object(js, var)
    # AJ:function(a){a.reverse()} => AJ, function(a){a.reverse()}
    name_body_pairs = (entry.split(":", 1) for entry in entries)
    return {name: map_functions(body) for name, body in name_body_pairs}
def reverse(arr: List, _: Optional[Any] = None):
    """Reverse elements in a list.

    This function is equivalent to:

    .. code-block:: javascript

        function(a, b) { a.reverse() }

    The second parameter is ignored; it exists because every transform
    function is called with two arguments. It defaults to ``None`` so the
    function can also be called with the list alone, as in the example.

    :param list arr:
        The list to reverse. It is not modified.
    :param _:
        Ignored.
    :rtype: list
    :returns:
        A new list with the elements of ``arr`` in reverse order.

    **Example**:

    >>> reverse([1, 2, 3, 4])
    [4, 3, 2, 1]
    """
    return arr[::-1]
def splice(arr: List, b: int):
    """Drop the first ``b`` items of a list and return what is left.

    This function is equivalent to the state of ``a`` after:

    .. code-block:: javascript

        function(a, b) { a.splice(0, b) }

    :param list arr:
        The list to take items from. It is not modified.
    :param int b:
        Number of leading items to drop.
    :rtype: list
    :returns:
        A new list holding the items of ``arr`` from index ``b`` onwards.

    **Example**:

    >>> splice([1, 2, 3, 4], 2)
    [3, 4]
    """
    remainder = arr[b:]
    return remainder
def swap(arr: List, b: int):
    """Swap positions at b modulus the list length.

    This function is equivalent to:

    .. code-block:: javascript

        function(a, b) { var c=a[0];a[0]=a[b%a.length];a[b%a.length]=c }

    :param list arr:
        The list whose first item is swapped. It is not modified.
    :param int b:
        Index of the other item; reduced modulo ``len(arr)``.
    :rtype: list
    :returns:
        A new list with items ``0`` and ``b % len(arr)`` exchanged. When that
        index is ``0`` the result is an unchanged copy, and an empty list
        yields an empty list.

    **Example**:

    >>> swap([1, 2, 3, 4], 2)
    [3, 2, 1, 4]
    """
    # Nothing to swap, and the modulo below would divide by zero.
    if not arr:
        return list(arr)
    r = b % len(arr)
    # Swap on a copy: stitching slices together duplicated the first item
    # (and grew the list) whenever r was 0.
    swapped = list(arr)
    swapped[0], swapped[r] = swapped[r], swapped[0]
    return swapped
def map_functions(js_func: str) -> Callable:
    """For a given JavaScript transform function, return the Python equivalent.

    The function body is tested against each known shape in order and the
    Python callable paired with the first matching shape is returned.

    :param str js_func:
        The JavaScript version of the transform function.
    :rtype: Callable
    :returns:
        The Python function implementing the same transform.
    :raises RegexMatchError:
        If ``js_func`` matches none of the known shapes.
    """
    known_shapes = (
        # function(a){a.reverse()}
        (r"{\w\.reverse\(\)}", reverse),
        # function(a,b){a.splice(0,b)}
        (r"{\w\.splice\(0,\w\)}", splice),
        # function(a,b){var c=a[0];a[0]=a[b%a.length];a[b]=c}
        (r"{var\s\w=\w\[0\];\w\[0\]=\w\[\w\%\w.length\];\w\[\w\]=\w}", swap),
        # function(a,b){var c=a[0];a[0]=a[b%a.length];a[b%a.length]=c}
        (
            r"{var\s\w=\w\[0\];\w\[0\]=\w\[\w\%\w.length\];\w\[\w\%\w.length\]=\w}",
            swap,
        ),
    )
    # Lazily evaluated, so matching stops at the first hit.
    python_fn = next(
        (
            candidate
            for shape, candidate in known_shapes
            if re.search(shape, js_func)
        ),
        None,
    )
    if python_fn is None:
        raise RegexMatchError(caller="map_functions", pattern="multiple")
    return python_fn