Add HTTP webserver interface (#6)

This commit is contained in:
15532th 2024-11-17 17:52:07 +03:00 committed by GitHub
parent 8e96931d26
commit d29f302fa4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 385 additions and 56 deletions

3
Dockerfile.webserver Normal file
View file

@ -0,0 +1,3 @@
FROM quay.io/invidious/youtube-trusted-session-generator:latest
COPY docker/scripts/startup-webserver.sh ./

View file

@ -2,7 +2,7 @@
## Description ## Description
This script will output two parameters: po_token and visitor_data. Needed for passing YouTube checks in Invidious. This script will output two parameters: po_token and visitor_data. Needed for passing YouTube checks in Invidious or the program that use the po_token functionality.
## What's po_token ## What's po_token
@ -15,28 +15,49 @@ These identity tokens (po_token and visitor_data) generated using this tool will
- You have to run this command on the same public IP address as the one blocked by YouTube. Not necessarily the same machine, just the same public IP address. - You have to run this command on the same public IP address as the one blocked by YouTube. Not necessarily the same machine, just the same public IP address.
Subsequent usage of this same token will work on the same IP range or even the same ASN. The point is to generate this token on a blocked IP as "unblocked" IP addresses seems to not generate a token valid for passing the checks on a blocked IP. Subsequent usage of this same token will work on the same IP range or even the same ASN. The point is to generate this token on a blocked IP as "unblocked" IP addresses seems to not generate a token valid for passing the checks on a blocked IP.
## Tutorial without Docker ## Tutorials for "oneshot" command: run the program and get the po_token and visitor_data values
1. Install Chromium or Google Chrome.
2. Create a new virtualenv: `virtualenv venv`
3. Activate the virtualenv: `source venv/bin/activate`
4. Install the dependencies: `pip install -r requirements.txt`
5. Run the script: `python index.py`
6. Copy paste the values of these the two parameters (po_token and visitor_data) in config.yaml
```
po_token: XXX
visitor_data: XXX
```
7. Restart Invidious.
## Tutorial with Docker ### Tutorial with Docker
1. Run the script: `docker run quay.io/invidious/youtube-trusted-session-generator` 1. Run the script: `docker run quay.io/invidious/youtube-trusted-session-generator`
2. Copy paste the values of these the two parameters (po_token and visitor_data) in config.yaml 2. Copy paste the values of these the two parameters (po_token and visitor_data) in config.yaml
``` ```
po_token: XXX po_token: XXX
visitor_data: XXX visitor_data: XXX
``` ```
3. Restart Invidious. 3. Restart Invidious or the program that use the po_token functionality.
## Why running as root for Docker? ### Tutorial without Docker
1. Install Chromium or Google Chrome.
2. Create a new virtualenv: `virtualenv venv`
3. Activate the virtualenv: `source venv/bin/activate`
4. Install the dependencies: `pip install -r requirements.txt`
5. Run the script: `python potoken-generator.py --oneshot`
6. Copy paste the values of these the two parameters (po_token and visitor_data) in config.yaml
```
po_token: XXX
visitor_data: XXX
```
7. Restart Invidious or the program that use the po_token functionality.
In "headless: false", Chromium does not support sanboxing when it is not ran by root user.
### Why running as root for Docker?
In "headless: false", Chromium does not support sanboxing when it is not ran by root user.
## Tutorials for "always running" program: Get po_token on demand using HTTP.
### Tutorial with Docker
Run the program: `docker run -p 8080:8080 quay.io/invidious/youtube-trusted-session-generator:webserver`
### Tutorial without Docker
1. Install Chromium or Google Chrome.
2. Create a new virtualenv: `virtualenv venv`
3. Activate the virtualenv: `source venv/bin/activate`
4. Install the dependencies: `pip install -r requirements.txt`
5. Run the program: `python potoken-generator.py`
### Usage of the HTTP API
Send your requests to http://localhost:8080/token in order to obtain your po_token.
You can also force refresh the po_token in the cache by sending a request to http://localhost:8080/update.

View file

@ -0,0 +1,14 @@
#!/bin/sh
echo "[INFO] internally launching GUI (X11 environment)"
XVFB_WHD=${XVFB_WHD:-1280x720x16}
echo "[INFO] starting Xvfb"
Xvfb :99 -ac -screen 0 $XVFB_WHD -nolisten tcp > /dev/null 2>&1 &
sleep 2
echo "[INFO] launching chromium instance"
# Run python script on display 0
DISPLAY=:99 python potoken-generator.py --bind 0.0.0.0

View file

@ -10,5 +10,5 @@ sleep 2
echo "[INFO] launching chromium instance" echo "[INFO] launching chromium instance"
# Run python script on display 99 # Run python script on display 0
DISPLAY=:99 python index.py DISPLAY=:99 python potoken-generator.py --oneshot

View file

@ -1,37 +0,0 @@
import asyncio
from nodriver import start, cdp, loop
import time
import json
import sys
async def main():
browser = await start(headless=False)
print("[INFO] launching browser.")
tab = browser.main_tab
tab.add_handler(cdp.network.RequestWillBeSent, send_handler)
page = await browser.get('https://www.youtube.com/embed/jNQXAC9IVRw')
await tab.wait(cdp.network.RequestWillBeSent)
print("[INFO] waiting 10 seconds for the page to fully load.")
await tab.sleep(10)
button_play = await tab.select("#movie_player")
await button_play.click()
await tab.wait(cdp.network.RequestWillBeSent)
print("[INFO] waiting additional 30 seconds for slower connections.")
await tab.sleep(30)
async def send_handler(event: cdp.network.RequestWillBeSent):
if "/youtubei/v1/player" in event.request.url:
post_data = event.request.post_data
post_data_json = json.loads(post_data)
visitor_data = post_data_json["context"]["client"]["visitorData"]
po_token = post_data_json["serviceIntegrityDimensions"]["poToken"]
print("visitor_data: " + visitor_data)
print("po_token: " + po_token)
if len(po_token) < 160:
print("[WARNING] there is a high chance that the potoken generated won't work. please try again on another internet connection.")
sys.exit(0)
return
if __name__ == '__main__':
loop().run_until_complete(main())

4
potoken-generator.py Normal file
View file

@ -0,0 +1,4 @@
import potoken_generator.main
if __name__ == '__main__':
potoken_generator.main.main()

View file

View file

@ -0,0 +1,150 @@
import asyncio
import dataclasses
import json
import logging
import time
from dataclasses import dataclass
from pathlib import Path
from tempfile import mkdtemp
from typing import Optional
import nodriver
logger = logging.getLogger('extractor')
@dataclass
class TokenInfo:
updated: int
potoken: str
visitor_data: str
def to_json(self) -> str:
as_dict = dataclasses.asdict(self)
as_json = json.dumps(as_dict)
return as_json
class PotokenExtractor:
def __init__(self, loop: asyncio.AbstractEventLoop,
update_interval: float = 3600,
browser_path: Optional[Path] = None) -> None:
self.update_interval: float = update_interval
self.browser_path: Optional[Path] = browser_path
self.profile_path = mkdtemp() # cleaned up on exit by nodriver
self._loop = loop
self._token_info: Optional[TokenInfo] = None
self._ongoing_update: asyncio.Lock = asyncio.Lock()
self._extraction_done: asyncio.Event = asyncio.Event()
self._update_requested: asyncio.Event = asyncio.Event()
def get(self) -> Optional[TokenInfo]:
return self._token_info
async def run_once(self) -> Optional[TokenInfo]:
await self._update()
return self.get()
async def run(self) -> None:
await self._update()
while True:
try:
await asyncio.wait_for(self._update_requested.wait(), timeout=self.update_interval)
logger.debug('initiating force update')
except asyncio.TimeoutError:
logger.debug('initiating scheduled update')
await self._update()
self._update_requested.clear()
def request_update(self) -> bool:
"""Request immediate update, return False if update request is already set"""
if self._ongoing_update.locked():
logger.debug('update process is already running')
return False
if self._update_requested.is_set():
logger.debug('force update has already been requested')
return False
self._loop.call_soon_threadsafe(self._update_requested.set)
logger.debug('force update requested')
return True
@staticmethod
def _extract_token(request: nodriver.cdp.network.Request) -> Optional[TokenInfo]:
post_data = request.post_data
try:
post_data_json = json.loads(post_data)
visitor_data = post_data_json['context']['client']['visitorData']
potoken = post_data_json['serviceIntegrityDimensions']['poToken']
except (json.JSONDecodeError, TypeError, KeyError) as e:
logger.warning(f'failed to extract token from request: {type(e)}, {e}')
return None
token_info = TokenInfo(
updated=int(time.time()),
potoken=potoken,
visitor_data=visitor_data
)
return token_info
async def _update(self) -> None:
try:
await asyncio.wait_for(self._perform_update(), timeout=600)
except asyncio.TimeoutError:
logger.error('update failed: hard limit timeout exceeded. Browser might be failing to start properly')
async def _perform_update(self) -> None:
if self._ongoing_update.locked():
logger.debug('update is already in progress')
return
async with self._ongoing_update:
logger.info('update started')
self._extraction_done.clear()
try:
browser = await nodriver.start(headless=False,
browser_executable_path=self.browser_path,
user_data_dir=self.profile_path)
except FileNotFoundError as e:
msg = "could not find Chromium. Make sure it's installed or provide direct path to the executable"
raise FileNotFoundError(msg) from e
tab = browser.main_tab
tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler)
await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw')
player_clicked = await self._click_on_player(tab)
if player_clicked:
await self._wait_for_handler()
await tab.close()
browser.stop()
@staticmethod
async def _click_on_player(tab: nodriver.Tab) -> bool:
try:
player = await tab.select('#movie_player', 10)
except asyncio.TimeoutError:
logger.warning('update failed: unable to locate video player on the page')
return False
else:
await player.click()
return True
async def _wait_for_handler(self) -> bool:
try:
await asyncio.wait_for(self._extraction_done.wait(), timeout=30)
except asyncio.TimeoutError:
logger.warning('update failed: timeout waiting for outgoing API request')
return False
else:
logger.info('update was succeessful')
return True
async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent) -> None:
if not event.request.method == 'POST':
return
if '/youtubei/v1/player' not in event.request.url:
return
token_info = self._extract_token(event.request)
if token_info is None:
return
logger.info(f'new token: {token_info.to_json()}')
self._token_info = token_info
self._extraction_done.set()

98
potoken_generator/main.py Normal file
View file

@ -0,0 +1,98 @@
import argparse
import asyncio
import logging
import sys
from pathlib import Path
from typing import Optional
import nodriver
from potoken_generator.extractor import PotokenExtractor, TokenInfo
from potoken_generator.server import PotokenServer
logger = logging.getLogger('potoken')
def print_token_and_exit(token_info: Optional[TokenInfo]):
if token_info is None:
logger.warning('failed to extract token')
sys.exit(1)
visitor_data = token_info.visitor_data
po_token = token_info.potoken
print('visitor_data: ' + visitor_data)
print('po_token: ' + po_token)
if len(po_token) < 160:
logger.warning("there is a high chance that the potoken generated won't work. Please try again on another internet connection")
sys.exit(1)
sys.exit(0)
async def run(loop: asyncio.AbstractEventLoop, oneshot: bool,
update_interval: int, bind_address: str, port: int,
browser_path: Optional[Path] = None) -> None:
potoken_extractor = PotokenExtractor(loop, update_interval=update_interval, browser_path=browser_path)
token = await potoken_extractor.run_once()
if oneshot:
print_token_and_exit(token)
extractor_task = loop.create_task(potoken_extractor.run())
potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address)
server_task = loop.create_task(asyncio.to_thread(potoken_server.run))
try:
await asyncio.gather(extractor_task, server_task)
except Exception:
# exceptions raised by the tasks are intentionally propogated
# to ensure process exit code is 1 on error
raise
except (KeyboardInterrupt, asyncio.CancelledError):
logger.info('Stopping...')
finally:
potoken_server.stop()
def set_logging(log_level: int = logging.DEBUG) -> None:
log_format = '%(asctime)s.%(msecs)03d [%(name)s] [%(levelname)s] %(message)s'
datefmt = '%Y/%m/%d %H:%M:%S'
logging.basicConfig(level=log_level, format=log_format, datefmt=datefmt)
logging.getLogger('asyncio').setLevel(logging.INFO)
logging.getLogger('nodriver').setLevel(logging.WARNING)
logging.getLogger('uc').setLevel(logging.WARNING)
logging.getLogger('websockets').setLevel(logging.WARNING)
def args_parse() -> argparse.Namespace:
description = '''
Retrieve potoken using Chromium runned by nodriver, serve it on a json endpoint
Token is generated on startup, and then every UPDATE_INTERVAL seconds.
With web-server running on default port, the token is available on the
http://127.0.0.1:8080/token endpoint. It is possible to request immediate
token regeneration by accessing http://127.0.0.1:8080/update
'''
parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-o', '--oneshot', action='store_true', default=False,
help='Do not start server. Generate token once, print it and exit')
parser.add_argument('--update-interval', '-u', type=int, default=3600,
help='How ofthen new token is generated, in seconds (default: %(default)s)')
parser.add_argument('--port', '-p', type=int, default=8080,
help='Port webserver is listening on (default: %(default)s)')
parser.add_argument('--bind', '-b', default='127.0.0.1',
help='Address webserver binds to (default: %(default)s)')
parser.add_argument('--chrome-path', '-c', type=Path, default=None,
help='Path to the Chromiun executable')
return parser.parse_args()
def main() -> None:
args = args_parse()
set_logging(logging.WARNING if args.oneshot else logging.INFO)
loop = nodriver.loop()
main_task = run(loop, oneshot=args.oneshot,
update_interval=args.update_interval,
bind_address=args.bind,
port=args.port,
browser_path=args.chrome_path
)
loop.run_until_complete(main_task)

View file

@ -0,0 +1,76 @@
import logging
from socketserver import ThreadingMixIn
from typing import Any, Callable, Dict, Optional, Tuple
from wsgiref.simple_server import WSGIServer, make_server
from potoken_generator.extractor import PotokenExtractor
logger = logging.getLogger('server')
class ThreadingWSGIServer(WSGIServer, ThreadingMixIn):
"""Thread per request HTTP server."""
daemon_threads: bool = True
class PotokenServer:
def __init__(self, potoken_extractor: PotokenExtractor, port: int = 8080, bind_address: str = '0.0.0.0') -> None:
self.port = port
self.bind_address = bind_address
self._potoken_extractor = potoken_extractor
self._httpd: Optional[ThreadingWSGIServer] = None
def get_potoken(self) -> Tuple[str, list, str]:
token = self._potoken_extractor.get()
if token is None:
status = '503 Service Unavailable'
headers = [('Content-Type', 'text/plain')]
page = 'Token has not yet been generated, try again later.'
else:
status = '200 OK'
headers = [('Content-Type', 'application/json')]
page = token.to_json()
return status, headers, page
def request_update(self) -> Tuple[str, list, str]:
status = '200 OK'
headers = [('Content-Type', 'text/plain')]
accepted = self._potoken_extractor.request_update()
if accepted:
page = 'Update request accepted, new token will be generated soon.'
else:
page = 'Update has already been requested, new token will be generated soon.'
return status, headers, page
def get_route_handler(self, route: str) -> Callable[[], Tuple[str, list, str]]:
handlers = {
# handler is a function returning a tuple of status, headers, page text
'/404': lambda: ('404 Not Found', [('Content-Type', 'text/plain')], 'Not Found'),
'/': lambda: ('302 Found', [('Location', '/token')], '/token'),
'/token': self.get_potoken,
'/update': self.request_update
}
return handlers.get(route) or handlers['/404']
def app(self, environ: Dict[str, Any], start_response):
route = environ['PATH_INFO']
handler = self.get_route_handler(route)
status, headers, page = handler()
start_response(status, headers)
return [page.encode('utf8')]
def run(self) -> None:
logger.info(f'Starting web-server at {self.bind_address}:{self.port}')
self._httpd = make_server(self.bind_address, self.port, self.app, ThreadingWSGIServer)
with self._httpd:
self._httpd.serve_forever()
def stop(self) -> None:
if self._httpd is None:
return
self._httpd.shutdown()