小鹅通 M3U8 视频下载器
| 支持自动解密、多线程并发与浏览器联动的本地命令行下载工具,配合浏览器,能够无缝接管课程下载任务,真正进入高效的“边看边存”状态 核心特性 浏览器联动,一键发送:运行后在本地启动 HTTP 服务(端口 8910),配合浏览器脚本,在课程页面点击“发送到下载”即可自动拉起后台下载,零繁琐链接复制,彻底解放双手。 开箱即用,免配依赖:脚本内置环境自检逻辑。首次运行自动检测并安装缺失的 Python 依赖(requests, pycryptodome, tqdm)。 全自动解密与合并,效率翻倍: 自动解密 → 自动获取并解析 AES-128 密钥与 IV,在内存中完成 TS 切片解密。 智能合并 → 极速调用 FFmpeg 将切片合并为 MP4,内置“Stream Copy”与“重编码”双重回退机制,确保视频完美合并。 多线程并发,极速下载:采用 ThreadPoolExecutor 提供多线程并发下载,搭配 tqdm 进度条,直观展示下载进度与切片完成情况。 稳定可靠,日志追溯:提供“控制台+本地日志(downloader.log)”双重记录,所有下载历史有迹可循。 重点说明: // @match *://*.xiaoeknow.com/* // @match *://*.xet.tech/* // @match *://*.xiaoe-tech.com/* // @match *://*.xet.pomoho.com/* // @match *://*.xet-pc.citv.cn/* 因平台支持自定义域名,因此若脚本中不包含所购买的课程的自定义域名,则需要手动进行添加 使用步骤:一、环境与工具准备安装扩展:为实现网页端联动,用户需要手动在浏览器(如 Chrome、Edge)的扩展商店中安装 Tampermonkey(油猴) 插件。添加脚本:在油猴插件中,添加并启用文件目录下的网页端用户脚本(用于在页面生成交互按钮) ![]()
二、一键发送任务,全自动下载 保持下载器的命令行窗口在后台运行,切勿关闭。 打开课程到播放页面,如需特定清晰度需要手动调整,等获取到视频地址后,直接点击由油猴脚本生成的“发送到下载”按钮。 接收到任务后,下载器将在本地 downloads 文件夹下全自动完成切片下载、密钥解密与最终的 MP4 视频合并导出。 ![]() 下载地址: https://wwamb.lanzoul.com/ijN623u0mxji 最低系统可支持到win7 32位,若存在不兼容问题可以直接替换环境 [Python] """小鹅通 M3U8 视频下载器 — 交互式命令行版"""import os, time, shutil, subprocess, binascii, refrom concurrent.futures import ThreadPoolExecutor, as_completedfrom urllib.parse import urljoinimport requestsfrom Crypto.Cipher import AESfrom Crypto.Util.Padding import unpadfrom tqdm import tqdm# 强制 stdout 行缓冲,确保 Windows CMD 中实时输出if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(line_buffering=True)BASE_DIR = os.path.dirname(os.path.abspath(__file__))OUT = os.path.join(BASE_DIR, 'downloads')os.makedirs(OUT, exist_ok=True)LOG_FILE = os.path.join(BASE_DIR, 'downloader.log')def _console_encoding(): """获取 Windows 控制台实际代码页,避免 UTF-8/GBK 猜错""" try: import ctypes cp = ctypes.windll.kernel32.GetConsoleOutputCP() if cp == 65001: return 'utf-8' elif cp == 936: return 'gbk' else: return f'cp{cp}' except Exception: return (sys.stdout.encoding or 'utf-8').lower()CONSOLE_ENC = _console_encoding()def log(msg, end='\n'): """同时输出到控制台和日志文件。控制台按实际代码页编码,避免乱码""" ts = time.strftime('%H:%M:%S') line = f'[{ts}] {msg}' # 1. 文件日志(始终可靠) try: with open(LOG_FILE, 'a', encoding='utf-8') as f: f.write(line) if end: f.write(end) f.flush() except Exception: pass # 2. 控制台:按真实代码页编码 try: data = (line + end).encode(CONSOLE_ENC, errors='replace') except Exception: data = (line + end).encode('utf-8', errors='replace') try: os.write(1, data) except Exception: try: os.write(2, data) except Exception: passFFMPEG = os.path.join(BASE_DIR, 'ffmpeg.exe')def safe_name(name): """去除文件名中的非法字符""" return re.sub(r'[<>:"/\\|?*]', '_', name).strip()def find_ffmpeg(): for p in [FFMPEG, shutil.which('ffmpeg'), shutil.which('ffmpeg.exe')]: if p and os.path.isfile(p): return p return Nonedef download_one(name, m3u8_url): """下载单个视频""" s = requests.Session() s.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Referer': 'https://xiaoe-tech.com/', }) log(f'\n {name}') log(f' {"─" * 50}') # 1. 下载 M3U8 log(' [1/4] 下载索引...', end=' ') try: resp = s.get(m3u8_url, timeout=30) resp.raise_for_status() except Exception as e: log(f'失败: {e}') return False m3u8 = resp.text base = '/'.join(m3u8_url.split('/')[:-1]) + '/' # 2. 解析 segments = [] key_url = None iv = b'\x00' * 16 for line in m3u8.split('\n'): line = line.strip() if 'URI=' in line and 'AES-128' in line: a = line.find('URI="') + 5 b = line.find('"', a) key_url = line[a:b] if not key_url.startswith('http'): key_url = urljoin(base, key_url) iv_s = line.find('IV=0x') if iv_s != -1: iv = binascii.unhexlify(line[iv_s + 5:iv_s + 37]) elif line and not line.startswith('#'): u = line if line.startswith('http') else urljoin(base, line) segments.append(u) log(f'{len(segments)} 片段') # 3. 获取密钥 log(' [2/4] 获取密钥...', end=' ') try: key = s.get(key_url, timeout=15).content log(f'{len(key)} 字节') except Exception as e: log(f'失败: {e}') return False # 4. 并行下载 tmp = os.path.join(OUT, f'tmp_{int(time.time())}') os.makedirs(tmp, exist_ok=True) def dl_one(url, key, iv, idx): for _ in range(3): try: data = s.get(url, timeout=60).content break except: time.sleep(1) else: return None seg_iv = iv[:12] + idx.to_bytes(4, 'big') dec = AES.new(key, AES.MODE_CBC, iv=seg_iv).decrypt(data) try: dec = unpad(dec, AES.block_size) except: pass fp = os.path.join(tmp, f's_{idx:05d}.ts') with open(fp, 'wb') as f: f.write(dec) return fp t0 = time.time() results = {} with ThreadPoolExecutor(max_workers=6) as ex: fut = {ex.submit(dl_one, u, key, iv, i): i for i, u in enumerate(segments)} with tqdm(total=len(segments), desc=' [3/4] 下载中', unit='片', ncols=60, bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}') as pbar: for f in as_completed(fut): if f.result(): results[fut[f]] = f.result() pbar.update(1) elapsed = time.time() - t0 if not results: log(' 所有片段下载失败') shutil.rmtree(tmp, ignore_errors=True) return False # 5. 合并 log(f' [4/4] 合并 ({elapsed:.1f}s 下载, {len(results)}/{len(segments)} 成功)...', end=' ') files = [results[i] for i in sorted(results)] output = os.path.join(OUT, f'{safe_name(name)}.mp4') # Concat 列表 lst = os.path.join(OUT, '_concat.txt') with open(lst, 'w', encoding='utf-8') as f: for fp in files: f.write(f"file '{fp.replace(os.sep, '/')}'\n") if os.path.exists(output): os.remove(output) cmd = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0', '-i', lst, '-c', 'copy', '-movflags', '+faststart', '-y', output] r = subprocess.run(cmd, capture_output=True, encoding='utf-8', errors='replace', timeout=600) if os.path.exists(lst): os.remove(lst) if r.returncode != 0: # 重编码回退 lst2 = os.path.join(OUT, '_concat2.txt') with open(lst2, 'w', encoding='utf-8') as f: for fp in files: f.write(f"file '{fp.replace(os.sep, '/')}'\n") cmd2 = [find_ffmpeg() or 'ffmpeg', '-f', 'concat', '-safe', '0', '-i', lst2, '-c:v', 'libx264', '-c:a', 'aac', '-movflags', '+faststart', '-y', output] subprocess.run(cmd2, capture_output=True, encoding='utf-8', errors='replace', timeout=600) if os.path.exists(lst2): os.remove(lst2) shutil.rmtree(tmp, ignore_errors=True) if os.path.exists(output): mb = os.path.getsize(output) / 1048576 log(f' {mb:.1f} MB -> {output}') return True else: log(' 合并失败') return False# ============================================================# HTTP 服务 — 接收浏览器发来的下载任务# ============================================================import jsonimport tracebackimport threadingfrom http.server import HTTPServer, BaseHTTPRequestHandlerPORT = 8910total_count = 0count_lock = threading.Lock()# 线程池:支持同时下载最多3个download_pool = ThreadPoolExecutor(max_workers=3)class Handler(BaseHTTPRequestHandler): def log_message(self, *args): pass def do_OPTIONS(self): log(f'HTTP OPTIONS {self.path}') self._cors() def do_GET(self): log(f'HTTP GET {self.path}') self._cors() self._json({'status': 'running', 'port': PORT, 'total': total_count}) def do_POST(self): global total_count log(f'HTTP POST {self.path} 开始处理') try: length = int(self.headers.get('Content-Length', 0)) raw = self.rfile.read(length) data = json.loads(raw) name = data.get('name', '').strip() url = data.get('url', '').strip() log(f'HTTP POST {self.path} body_len={length}') log(f'JSON: name={name!r} url={url[:60]}...') if not name or not url: log('拒绝任务:name 或 url 为空') self._cors() self._json({'status': 'error', 'msg': 'missing name or url'}) return log(f'>>> 收到任务: [{name}]') log(f' URL: {url[:80]}...') self._cors() self._json({'status': 'accepted', 'msg': f'已接收: {name}'}) log(f'已响应 accepted,提交后台下载...') download_pool.submit(self._do_download, name, url) except Exception as e: tb = traceback.format_exc() log(f'HTTP POST 异常: {e}') log(f'异常堆栈: {tb}') self._cors() self._json({'status': 'error', 'msg': str(e)}) def _do_download(self, name, url): """在单独线程中执行下载""" global total_count log(f'开始下载: [{name}]') ok = download_one(name, url) with count_lock: if ok: total_count += 1 log(f'--- 任务结束: [{name}] {"成功" if ok else "失败"} (累计: {total_count}) ---') log('等待新任务...') def _cors(self): self.send_response(200) self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type') self.send_header('Content-Type', 'application/json') self.end_headers() def _json(self, obj): body = json.dumps(obj, ensure_ascii=False).encode('utf-8') self.wfile.write(body)def main(): ffmpeg = find_ffmpeg() if not ffmpeg: log('未找到 ffmpeg.exe,请放到本目录下') sys.exit(1) server = HTTPServer(('127.0.0.1', PORT), Handler) log('=' * 50) log('小鹅通 M3U8 视频下载器 启动') log(f'ffmpeg: {ffmpeg}') log(f'输出: {OUT}') log(f'端口: localhost:{PORT}') log('等待浏览器发送下载任务...') log('在课程页面点击 "发送到下载" 即可') log('按 Ctrl+C 退出') log('=' * 50) try: server.serve_forever() except KeyboardInterrupt: log('\n退出。本次共下载 {} 个视频。'.format(total_count)) server.shutdown()if __name__ == '__main__': main()复制代码 |




