From a2fe29fa0d15a8c20f084d120c9ef8cf78e499ce Mon Sep 17 00:00:00 2001 From: Evil0ctal Date: Tue, 8 Nov 2022 06:22:00 -0800 Subject: [PATCH] Update web_api.py --- web_api.py | 892 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 673 insertions(+), 219 deletions(-) diff --git a/web_api.py b/web_api.py index 3a5097d..7fb9493 100644 --- a/web_api.py +++ b/web_api.py @@ -2,247 +2,701 @@ # -*- encoding: utf-8 -*- # @Author: https://github.com/Evil0ctal/ # @Time: 2021/11/06 -# @Update: 2022/10/17 +# @Update: 2022/11/06 +# @Version: 3.0.0 # @Function: # 创建一个接受提交参数的Flask应用程序。 # 将scraper.py返回的内容以JSON格式返回。 -# 默认运行端口2333, 请自行在config.ini中修改。 -import os -import re -import time -import requests -import unicodedata + import configparser +import json +import os +import threading +import time +import zipfile + +import requests +import uvicorn +from fastapi import FastAPI +from fastapi.responses import ORJSONResponse, FileResponse +from pydantic import BaseModel +from starlette.responses import RedirectResponse + + from scraper import Scraper -from werkzeug.urls import url_quote -from flask import Flask, request, jsonify, make_response -app = Flask(__name__) -app_config = configparser.ConfigParser() -app_config.read('config.ini', encoding='utf-8') -api_config = app_config['Web_API'] -headers = { - 'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66' -} +# 读取配置文件 +config = configparser.ConfigParser() +config.read('config.ini', encoding='utf-8') +# 运行端口 +port = int(config["Web_API"]["Port"]) +# 域名 +domain = config["Web_API"]["Domain"] + +# 创建FastAPI实例 +title = "Douyin TikTok Download API(api.douyin.wtf)" +version = '3.0.0' +update_time = "2022/10/31" +description = """ +#### Description/说明 +
+点击展开/Click to expand +> [中文/Chinese] +- 爬取Douyin以及TikTok的数据并返回,更多功能正在开发中。 +- 如果需要更多接口,请查看[https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)。 +- 本项目开源在[GitHub:Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API)。 +- 全部端点数据均来自抖音以及TikTok的官方接口,如遇到问题或BUG或建议请在[issues](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)中反馈。 +- 本项目仅供学习交流使用,严禁用于违法用途,如有侵权请联系作者。 +> [英文/English] +- Crawl the data of Douyin and TikTok and return it. More features are under development. +- If you need more interfaces, please visit [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs). +- This project is open source on [GitHub: Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API). +- All endpoint data comes from the official interface of Douyin and TikTok. If you have any questions or BUGs or suggestions, please feedback in [issues]( +- This project is for learning and communication only. It is strictly forbidden to be used for illegal purposes. If there is any infringement, please contact the author. +
+#### Contact author/联系作者 +
+点击展开/Click to expand +- WeChat: Evil0ctal +- Email: [Evil0ctal1985@gmail.com](mailto:Evil0ctal1985@gmail.com) +- Github: [https://github.com/Evil0ctal](https://github.com/Evil0ctal) +
+""" +tags_metadata = [ + { + "name": "Root", + "description": "Root path info.", + }, + { + "name": "API", + "description": "Hybrid interface, automatically determine the input link and return the simplified data/混合接口,自动判断输入链接返回精简后的数据。", + }, + { + "name": "Douyin", + "description": "All Douyin API Endpoints/所有抖音接口节点", + }, + { + "name": "TikTok", + "description": "All TikTok API Endpoints/所有TikTok接口节点", + }, + { + "name": "Download", + "description": "Enter the share link and return the download file response./输入分享链接后返回下载文件响应", + }, + { + "name": "iOS_Shortcut", + "description": "Get iOS shortcut info/获取iOS快捷指令信息", + }, +] + +# 创建Scraper对象 +api = Scraper() + +# 创建FastAPI实例 +app = FastAPI( + title=title, + description=description, + version=version, + openapi_tags=tags_metadata +) + +""" ________________________⬇️端点响应模型(Endpoints Response Model)⬇️________________________""" -def find_url(string): - # 解析抖音分享口令中的链接并返回列表 - url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', string) - return url +# API Root节点 +class APIRoot(BaseModel): + API_status: str + Version: str = version + Update_time: str = update_time + API_V1_Document: str + API_V2_Document: str + GitHub: str -def clean_filename(string, author_name): - # 替换不能用于文件名的字符('/ \ : * ? " < > |') - rstr = r"[\/\\\:\*\?\"\<\>\|]" - # 将上述字符替换为下划线 - new_title = re.sub(rstr, "_", string) - # 新文件名 - filename = (new_title + '_' + author_name).replace('\n', '') - return filename +# API获取视频基础模型 +class iOS_Shortcut(BaseModel): + version: str = None + update: str = None + link: str = None + link_en: str = None + note: str = None + note_en: str = None -@app.route("/", methods=["POST", "GET"]) -def index(): - # 显示基础信息 - index_info = {'API status': 'Running', - 'GitHub': 'https://github.com/Evil0ctal/Douyin_TikTok_Download_API', - 'Introduction': 'Free and open source Douyin/TikTok watermark-free video download tool, supports API calls.', - 'Web interface': 'https://douyin.wtf/', - 'iOS Shortcuts': 'https://api.douyin.wtf/ios', - 'API V1': 'https://api.douyin.wtf/', - 'API V2': 'https://api-v2.douyin.wtf/docs', - 'Parsing Douyin/TikTok videos': 'https://api.douyin.wtf/api?url=[Douyin/TikTok url]', - 'Return Video MP4 File Download': 'https://api.douyin.wtf/video?url=[Douyin/TikTok url]', - 'Return Video MP3 File Download': 'https://api.douyin.wtf/music?url=[Douyin/TikTok url]'} - return jsonify(index_info) +# API获取视频基础模型 +class API_Video_Response(BaseModel): + status: str = None + platform: str = None + endpoint: str = None + message: str = None + total_time: float = None + aweme_list: list = None -@app.route("/api", methods=["POST", "GET"]) -def webapi(): - # 创建一个Flask应用获取POST参数并返回结果 - api = Scraper() - content = request.args.get("url") - if content != '': - try: - post_content = find_url(content)[0] # 尝试找出提交值中的链接 - except: - # 返回错误信息 - return jsonify(status='failed', reason="Can not find valid Douyin/TikTok URL", function='webapi()', value=content) - if api_config['Allow_Logs'] == 'True': - # 将API记录在API_logs.txt中 - date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - with open('API_logs.txt', 'a') as f: - f.write(date + " : " + post_content + '\n') - try: - # 开始时间 - start = time.time() - # 校验是否为TikTok链接 - if 'tiktok.com' in post_content: - result = api.tiktok(post_content) - # 以JSON格式返回TikTok信息 - return jsonify(result) - # 如果关键字不存在则判断为抖音链接 - elif 'douyin.com' in post_content: - result = api.douyin(post_content) - # 以JSON格式返回返回Douyin信息 - return jsonify(result) - except Exception as e: +# 混合解析API基础模型: +class API_Hybrid_Response(BaseModel): + status: str = None + message: str = None + endpoint: str = None + url: str = None + type: str = None + platform: str = None + aweme_id: str = None + total_time: float = None + official_api_url: dict = None + desc: str = None + create_time: int = None + author: dict = None + music: dict = None + statistics: dict = None + cover_data: dict = None + hashtags: list = None + video_data: dict = None + image_data: dict = None + + +# 混合解析API精简版基础模型: +class API_Hybrid_Minimal_Response(BaseModel): + status: str = None + message: str = None + platform: str = None + type: str = None + wm_video_url: str = None + wm_video_url_HQ: str = None + nwm_video_url: str = None + nwm_video_url_HQ: str = None + no_watermark_image_list: list or None = None + watermark_image_list: list or None = None + + +""" ________________________⬇️端点日志记录(Endpoint logs)⬇️________________________""" + + +# 记录API请求日志 +async def api_logs(start_time, input_data, endpoint, error_data: dict = None): + if config["Web_API"]["Allow_Logs"] == "True": + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + total_time = float(format(time.time() - start_time, '.4f')) + file_name = "API_logs.json" + # 写入日志内容 + with open(file_name, "a", encoding="utf-8") as f: + data = { + "time": time_now, + "endpoint": f'/{endpoint}/', + "total_time": total_time, + "input_data": input_data, + "error_data": error_data if error_data else "No error" + } + f.write(json.dumps(data, ensure_ascii=False) + ",\n") + print('日志记录成功!') + return 1 + else: + print('日志记录已关闭!') + return 0 + + + +""" ________________________⬇️Root端点(Root endpoint)⬇️________________________""" + + +# Root端点 +@app.get("/", response_model=APIRoot, tags=["Root"]) +async def root(): + """ + Root path info. + """ + data = { + "API_status": "Running", + "Version": version, + "Update_time": update_time, + "API_V1_Document": "https://api.douyin.wtf/docs", + "API_V2_Document": "https://api-v2.douyin.wtf/docs", + "GitHub": "https://github.com/Evil0ctal/Douyin_TikTok_Download_API", + } + return ORJSONResponse(data) + + +""" ________________________⬇️混合解析端点(Hybrid parsing endpoints)⬇️________________________""" + + +# 混合解析端点,自动判断输入链接返回精简后的数据 +# Hybrid parsing endpoint, automatically determine the input link and return the simplified data. +@app.get("/api", tags=["API"], response_model=API_Hybrid_Response) +async def hybrid_parsing(url: str, minimal: bool = False): + """ + ## 用途/Usage + - 获取[抖音|TikTok]单个视频数据,参数是视频链接或分享口令。 + - Get [Douyin|TikTok] single video data, the parameter is the video link or share code. + ## 参数/Parameter + #### url(必填/Required)): + - 视频链接。| 分享口令 + - The video link.| Share code + - 例子/Example: + `https://www.douyin.com/video/7153585499477757192` + `https://v.douyin.com/MkmSwy7/` + `https://vm.tiktok.com/TTPdkQvKjP/` + `https://www.tiktok.com/@tvamii/video/7045537727743380782` + #### minimal(选填/Optional Default:False): + - 是否返回精简版数据。 + - Whether to return simplified data. + - 例子/Example: + `True` + `False` + ## 返回值/Return + - 用户当个视频数据的列表,列表内包含JSON数据。 + - List of user single video data, list contains JSON data. + """ + print("正在进行混合解析...") + # 开始时间 + start_time = time.time() + # 获取数据 + data = api.hybrid_parsing(url) + # 是否精简 + if minimal: + result = api.hybrid_parsing_minimal(data) + else: + # 更新数据 + result = { + 'url': url, + "endpoint": "/api/", + "total_time": float(format(time.time() - start_time, '.4f')), + } + # 合并数据 + result.update(data) + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'url': url}, + endpoint='api') + return ORJSONResponse(result) + + +""" ________________________⬇️抖音视频解析端点(Douyin video parsing endpoint)⬇️________________________""" + + +# 获取抖音单个视频数据/Get Douyin single video data +@app.get("/douyin_video_data/", response_model=API_Video_Response, tags=["Douyin"]) +async def get_douyin_video_data(douyin_video_url: str = None, video_id: str = None): + """ + ## 用途/Usage + - 获取抖音用户单个视频数据,参数是视频链接|分享口令 + - Get the data of a single video of a Douyin user, the parameter is the video link. + ## 参数/Parameter + #### douyin_video_url(选填/Optional): + - 视频链接。| 分享口令 + - The video link.| Share code + - 例子/Example: + `https://www.douyin.com/video/7153585499477757192` + `https://v.douyin.com/MkmSwy7/` + #### video_id(选填/Optional): + - 视频ID,可以从视频链接中获取。 + - The video ID, can be obtained from the video link. + - 例子/Example: + `7153585499477757192` + #### 备注/Note: + - 参数`douyin_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。 + - The parameters `douyin_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed. + ## 返回值/Return + - 用户当个视频数据的列表,列表内包含JSON数据。 + - List of user single video data, list contains JSON data. + """ + if video_id is None or video_id == '': + # 获取视频ID + video_id = api.get_douyin_video_id(douyin_video_url) + if video_id is None: + result = { + "status": "failed", + "platform": "douyin", + "message": "video_id获取失败/Failed to get video_id", + } + return ORJSONResponse(result) + if video_id is not None and video_id != '': + # 开始时间 + start_time = time.time() + print('获取到的video_id数据:{}'.format(video_id)) + if video_id is not None: + video_data = api.get_douyin_video_data(video_id=video_id) + if video_data is None: + result = { + "status": "failed", + "platform": "douyin", + "endpoint": "/douyin_video_data/", + "message": "视频API数据获取失败/Failed to get video API data", + } + return ORJSONResponse(result) + # print('获取到的video_data:{}'.format(video_data)) + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'douyin_video_url': douyin_video_url, 'video_id': video_id}, + endpoint='douyin_video_data') # 结束时间 - end = time.time() - # 解析时间 - analyze_time = (format((end - start), '.4f') + 's') - # 返回错误信息 - return jsonify(status='failed', reason=str(e), time=analyze_time, function='webapi()', value=content) - else: - # 返回错误信息 - return jsonify(status='failed', reason='url value cannot be empty', function='webapi()', value=content) - - -@app.route("/ios", methods=["POST", "GET"]) -def ios_shortcut(): - # 用于检查快捷指令更新 - return jsonify(version=api_config['iOS_Shortcut_Version'], - update=api_config['iOS_Shortcut_Update_Time'], - link=api_config['iOS_Shortcut_Link'], - link_en=api_config['iOS_Shortcut_Link_EN'], - note=api_config['iOS_Shortcut_Update_Note'], - note_en=api_config['iOS_Shortcut_Update_Note_EN']) - - -@app.route("/video", methods=["POST", "GET"]) -def download_video(): - # 用于返回视频下载请求(返回MP4文件下载请求,面对大量请求时非常吃服务器内存,容易崩,慎用。) - # 将api_switch的值设定为False可关闭该API - api_switch = api_config['Video_Download'] - if api_switch == 'True': - api = Scraper() - content = request.args.get("url") - if content == '': - return jsonify(status='failed', reason='url value cannot be empty', function='download_music()', - value=content) + total_time = float(format(time.time() - start_time, '.4f')) + # 返回数据 + result = { + "status": "success", + "platform": "douyin", + "endpoint": "/douyin_video_data/", + "message": "获取视频数据成功/Got video data successfully", + "total_time": total_time, + "aweme_list": [video_data] + } + return ORJSONResponse(result) else: - post_content = find_url(content)[0] - try: - if 'douyin.com' in post_content: - # 获取视频信息 - result = api.douyin(post_content) - # 视频链接 - video_url = result['nwm_video_url'] - # 视频标题 - video_title = result['video_title'] - # 作者昵称 - video_author = result['video_author'] - # 清理文件名 - file_name = clean_filename(video_title, video_author) - elif 'tiktok.com' in post_content: - # 获取视频信息 - result = api.tiktok(post_content) - # 无水印地址 - video_url = result['nwm_video_url'] - # 视频标题 - video_title = result['video_title'] - # 作者昵称 - video_author = result['video_author_nickname'] - # 清理文件名 - file_name = clean_filename(video_title, video_author) - else: - return jsonify(Status='Failed', Reason='Check submitted parameters!') - # 获取视频文件字节流 - video_mp4 = requests.get(video_url, headers).content - # 将字节流封装成返回对象 - response = make_response(video_mp4) - # 添加响应头部信息 - response.headers['Content-Type'] = "video/mp4" - # 他妈的,费了我老大劲才解决文件中文名的问题 - try: - filename = file_name.encode('latin-1') - except UnicodeEncodeError: - filenames = { - 'filename': unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore'), - 'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp4'), - } - else: - filenames = {'filename': file_name + '.mp4'} - # attachment表示以附件形式下载 - response.headers.set('Content-Disposition', 'attachment', **filenames) - return response - except Exception as e: - return jsonify(status='failed', reason=str(e), function='download_video()', value=content) + print('获取抖音video_id失败') + result = { + "status": "failed", + "platform": "douyin", + "endpoint": "/douyin_video_data/", + "message": "获取视频ID失败/Failed to get video ID", + "total_time": 0, + "aweme_list": [] + } + return ORJSONResponse(result) + + +""" ________________________⬇️TikTok视频解析端点(TikTok video parsing endpoint)⬇️________________________""" + + +# 获取TikTok单个视频数据/Get TikTok single video data +@app.get("/tiktok_video_data/", response_class=ORJSONResponse, response_model=API_Video_Response, tags=["TikTok"]) +async def get_tiktok_video_data(tiktok_video_url: str = None, video_id: str = None): + """ + ## 用途/Usage + - 获取单个视频数据,参数是视频链接| 分享口令。 + - Get single video data, the parameter is the video link. + ## 参数/Parameter + #### tiktok_video_url(选填/Optional): + - 视频链接。| 分享口令 + - The video link.| Share code + - 例子/Example: + `https://www.tiktok.com/@evil0ctal/video/7156033831819037994` + `https://vm.tiktok.com/TTPdkQvKjP/` + #### video_id(选填/Optional): + - 视频ID,可以从视频链接中获取。 + - The video ID, can be obtained from the video link. + - 例子/Example: + `7156033831819037994` + #### 备注/Note: + - 参数`tiktok_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。 + - The parameters `tiktok_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed. + ## 返回值/Return + - 用户当个视频数据的列表,列表内包含JSON数据。 + - List of user single video data, list contains JSON data. + """ + # 开始时间 + start_time = time.time() + if video_id is None or video_id == "": + video_id = api.get_tiktok_video_id(tiktok_video_url) + if video_id is None: + return ORJSONResponse({"status": "fail", "platform": "tiktok", "endpoint": "/tiktok_video_data/", + "message": "获取视频ID失败/Get video ID failed"}) + if video_id is not None and video_id != '': + print('开始解析单个TikTok视频数据') + video_data = api.get_tiktok_video_data(video_id) + # TikTok的API数据如果为空或者返回的数据中没有视频数据,就返回错误信息 + # If the TikTok API data is empty or there is no video data in the returned data, an error message is returned + if video_data is None or video_data.get('aweme_id') != video_id: + print('视频数据获取失败/Failed to get video data') + result = { + "status": "failed", + "platform": "tiktok", + "endpoint": "/tiktok_video_data/", + "message": "视频数据获取失败/Failed to get video data" + } + return ORJSONResponse(result) + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'tiktok_video_url': tiktok_video_url, 'video_id': video_id}, + endpoint='tiktok_video_data') + # 结束时间 + total_time = float(format(time.time() - start_time, '.4f')) + # 返回数据 + result = { + "status": "success", + "platform": "tiktok", + "endpoint": "/tiktok_video_data/", + "message": "获取视频数据成功/Got video data successfully", + "total_time": total_time, + "aweme_list": [video_data] + } + return ORJSONResponse(result) else: - return jsonify(Status='Failed', Reason='This API is disabled. To enable it, set the value of "api_switch" to True.') + print('视频链接错误/Video link error') + result = { + "status": "failed", + "platform": "tiktok", + "endpoint": "/tiktok_video_data/", + "message": "视频链接错误/Video link error" + } + return ORJSONResponse(result) -@app.route("/music", methods=["POST", "GET"]) -def download_music(): - # 用于返回视频下载请求(返回MP3文件下载请求,面对大量请求时非常吃服务器内存,容易崩,慎用。) - # 将api_switch的值设定为False可关闭该API - api_switch = api_config['Music_Download'] - if api_switch == 'True': - api = Scraper() - content = request.args.get("url") - if content == '': - return jsonify(status='failed', reason='url value cannot be empty', function='download_music()', - value=content) +""" ________________________⬇️iOS快捷指令更新端点(iOS Shortcut update endpoint)⬇️________________________""" + + +@app.get("/ios", response_model=iOS_Shortcut, tags=["iOS_Shortcut"]) +async def Get_Shortcut(): + data = { + 'version': config["Web_API"]["iOS_Shortcut_Version"], + 'update': config["Web_API"]['iOS_Shortcut_Update_Time'], + 'link': config["Web_API"]['iOS_Shortcut_Link'], + 'link_en': config["Web_API"]['iOS_Shortcut_Link_EN'], + 'note': config["Web_API"]['iOS_Shortcut_Update_Note'], + 'note_en': config["Web_API"]['iOS_Shortcut_Update_Note_EN'], + } + return ORJSONResponse(data) + + +""" ________________________⬇️下载文件端点/函数(Download file endpoints/functions)⬇️________________________""" + + +# 下载文件端点/Download file endpoint +@app.get("/download", tags=["Download"]) +async def download_file_hybrid(url: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将[抖音|TikTok]链接作为参数提交至此端点,返回[视频|图片]文件下载请求。 + ### [English] + - Submit the [Douyin|TikTok] link as a parameter to this endpoint and return the [video|picture] file download request. + # 参数/Parameter + - url:str -> [Douyin|TikTok] [视频|图片] 链接/ [Douyin|TikTok] [video|image] link + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + # 开始时间 + start_time = time.time() + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + data = api.hybrid_parsing(url) + if data is None: + return ORJSONResponse(data) + else: + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'url': url}, + endpoint='download') + url_type = data.get('type') + platform = data.get('platform') + aweme_id = data.get('aweme_id') + file_name_prefix = config["Web_API"]["File_Name_Prefix"] if prefix else '' + root_path = config["Web_API"]["Download_Path"] + # 查看目录是否存在,不存在就创建 + if not os.path.exists(root_path): + os.makedirs(root_path) + if url_type == 'video': + file_name = file_name_prefix + platform + '_' + aweme_id + '.mp4' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_watermark' + '.mp4' + url = data.get('video_data').get('nwm_video_url_HQ') if not watermark else data.get('video_data').get('wm_video_url') + print('url: ', url) + file_path = root_path + "/" + file_name + print('file_path: ', file_path) + # 判断文件是否存在,存在就直接返回、 + if os.path.exists(file_path): + print('文件已存在,直接返回') + return FileResponse(path=file_path, media_type='video/mp4', filename=file_name) + else: + if platform == 'douyin': + r = requests.get(url=url, headers=headers, allow_redirects=False).headers + cdn_url = r.get('location') + r = requests.get(cdn_url).content + elif platform == 'tiktok': + r = requests.get(url=url, headers=headers).content + with open(file_path, 'wb') as f: + f.write(r) + return FileResponse(path=file_path, media_type='video/mp4', filename=file_name) + elif url_type == 'image': + url = data.get('image_data').get('no_watermark_image_list') if not watermark else data.get('image_data').get('watermark_image_list') + print('url: ', url) + zip_file_name = file_name_prefix + platform + '_' + aweme_id + '_images.zip' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_images_watermark.zip' + zip_file_path = root_path + "/" + zip_file_name + print('zip_file_name: ', zip_file_name) + print('zip_file_path: ', zip_file_path) + # 判断文件是否存在,存在就直接返回、 + if os.path.exists(zip_file_path): + print('文件已存在,直接返回') + return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name) + file_path_list = [] + for i in url: + r = requests.get(url=i, headers=headers) + content_type = r.headers.get('content-type') + file_format = content_type.split('/')[1] + r = r.content + index = int(url.index(i)) + file_name = file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '.' + file_format if not watermark else \ + file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '_watermark' + '.' + file_format + file_path = root_path + "/" + file_name + file_path_list.append(file_path) + print('file_path: ', file_path) + with open(file_path, 'wb') as f: + f.write(r) + if len(url) == len(file_path_list): + zip_file = zipfile.ZipFile(zip_file_path, 'w') + for f in file_path_list: + zip_file.write(os.path.join(f), f, zipfile.ZIP_DEFLATED) + zip_file.close() + return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name) else: - post_content = find_url(content)[0] - try: - if 'douyin.com' in post_content: - # 获取视频信息 - result = api.douyin(post_content) - bgm_url = result['video_music'] - if bgm_url == "None": - return jsonify(Status='Failed', Reason='This link has no music to get!') - else: - # 视频标题 - bgm_title = result['video_music_title'] - # 作者昵称 - author_name = result['video_music_author'] - # 清理文件名 - file_name = clean_filename(bgm_title, author_name) - elif 'tiktok.com' in post_content: - # 获取视频信息 - result = api.tiktok(post_content) - # BGM链接 - bgm_url = result['video_music_url'] - # 视频标题 - bgm_title = result['video_music_title'] - # 作者昵称 - author_name = result['video_music_author'] - # 清理文件名 - file_name = clean_filename(bgm_title, author_name) - else: - return jsonify(Status='Failed', Reason='This link has no music to get!') - video_bgm = requests.get(bgm_url, headers).content - # 将bgm字节流封装成response对象 - response = make_response(video_bgm) - # 添加响应头部信息 - response.headers['Content-Type'] = "video/mp3" - # 他妈的,费了我老大劲才解决文件中文名的问题 + return ORJSONResponse(data) + + +# 批量下载文件端点/Batch download file endpoint +@app.get("/batch_download", tags=["Download"]) +async def batch_download_file(url_list: str, prefix: bool = True): + """ + 批量下载文件端点/Batch download file endpoint + 未完工/Unfinished + """ + print('url_list: ', url_list) + return ORJSONResponse({"status": "failed", + "message": "嘿嘿嘿,这个功能还没做呢,等我有空再做吧/Hehehe, this function hasn't been done yet, I'll do it when I have time"}) + + +# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video) +@app.get("/video/{aweme_id}", tags=["Download"]) +async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。 + ### [English] + - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint. + # 参数/Parameter + - aweme_id:str -> 抖音视频ID/Douyin video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.douyin.com/video/{aweme_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) + + +# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video) +@app.get("/note/{aweme_id}", tags=["Download"]) +async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。 + ### [English] + - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint. + # 参数/Parameter + - aweme_id:str -> 抖音视频ID/Douyin video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.douyin.com/video/{aweme_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) + + +# 抖音链接格式下载端点/Douyin link format download endpoint +@app.get("/discover", tags=["Download"]) +async def download_douyin_discover(modal_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://www.douyin.com/discover?modal_id=1234567890123456789 改成 https://api.douyin.wtf/discover?modal_id=1234567890123456789 即可调用此端点。 + ### [English] + - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://douyin.com/discover?modal_id=1234567890123456789 becomes https://api.douyin.wtf/discover?modal_id=1234567890123456789 to call this endpoint. + # 参数/Parameter + - modal_id: str -> 抖音视频ID/Douyin video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.douyin.com/discover?modal_id={modal_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) + + +# Tiktok链接格式下载端点(video)/Tiktok link format download endpoint(video) +@app.get("/{user_id}/video/{aweme_id}", tags=["Download"]) +async def download_tiktok_video(user_id: str, aweme_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将TikTok域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://www.tiktok.com/@evil0ctal/video/7156033831819037994 改成 https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 即可调用此端点。 + ### [English] + - Change the TikTok domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://www.tiktok.com/@evil0ctal/video/7156033831819037994 becomes https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 to call this endpoint. + # 参数/Parameter + - user_id: str -> TikTok用户ID/TikTok user ID + - aweme_id: str -> TikTok视频ID/TikTok video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.tiktok.com/{user_id}/video/{aweme_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) + + +# 定期清理[Download_Path]文件夹 +# Periodically clean the [Download_Path] folder +def cleanup_path(): + while True: + root_path = config["Web_API"]["Download_Path"] + timer = int(config["Web_API"]["Download_Path_Clean_Timer"]) + # 查看目录是否存在,不存在就跳过 + if os.path.exists(root_path): + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print(f"{time_now}: Cleaning up the download folder...") + for file in os.listdir("./download"): + file_path = os.path.join("./download", file) try: - filename = file_name.encode('latin-1') - except UnicodeEncodeError: - filenames = { - 'filename': unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore'), - 'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp3'), - } - else: - filenames = {'filename': file_name + '.mp3'} - # attachment表示以附件形式下载 - response.headers.set('Content-Disposition', 'attachment', **filenames) - return response - except Exception as e: - return jsonify(status='failed', reason=str(e), function='download_music()', value=content) - else: - return jsonify(Status='Failed', Reason='This API is disabled. To enable it, set the value of "api_switch" to True.') + if os.path.isfile(file_path): + os.remove(file_path) + except Exception as e: + print(e) + else: + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print(f"{time_now}: The download folder does not exist, skipping...") + time.sleep(timer) + + +""" ________________________⬇️项目启动执行函数(Project start execution function)⬇️________________________""" + + +# 程序启动后执行/Execute after program startup +@app.on_event("startup") +async def startup_event(): + # 创建一个清理下载目录定时器线程并启动 + # Create a timer thread to clean up the download directory and start it + download_path_clean_switches = True if config["Web_API"]["Download_Path_Clean_Switch"] == "True" else False + if download_path_clean_switches: + # 启动清理线程/Start cleaning thread + thread_1 = threading.Thread(target=cleanup_path) + thread_1.start() if __name__ == '__main__': - # 开启WebAPI - if os.environ.get('PORT'): - port = int(os.environ.get('PORT')) - else: - # 默认端口 - port = api_config['Port'] - app.run(host='0.0.0.0', port=port) + # 建议使用gunicorn启动,使用uvicorn启动时请将debug设置为False + # It is recommended to use gunicorn to start, when using uvicorn to start, please set debug to False + # uvicorn web_api:app --host '0.0.0.0' --port 8000 --reload + uvicorn.run("web_api:app", host='0.0.0.0', port=port, reload=True, access_log=False)