diff --git a/web_api.py b/web_api.py
index 3a5097d..7fb9493 100644
--- a/web_api.py
+++ b/web_api.py
@@ -2,247 +2,701 @@
# -*- encoding: utf-8 -*-
# @Author: https://github.com/Evil0ctal/
# @Time: 2021/11/06
-# @Update: 2022/10/17
+# @Update: 2022/11/06
+# @Version: 3.0.0
# @Function:
# 创建一个接受提交参数的Flask应用程序。
# 将scraper.py返回的内容以JSON格式返回。
-# 默认运行端口2333, 请自行在config.ini中修改。
-import os
-import re
-import time
-import requests
-import unicodedata
+
import configparser
+import json
+import os
+import threading
+import time
+import zipfile
+
+import requests
+import uvicorn
+from fastapi import FastAPI
+from fastapi.responses import ORJSONResponse, FileResponse
+from pydantic import BaseModel
+from starlette.responses import RedirectResponse
+
+
from scraper import Scraper
-from werkzeug.urls import url_quote
-from flask import Flask, request, jsonify, make_response
-app = Flask(__name__)
-app_config = configparser.ConfigParser()
-app_config.read('config.ini', encoding='utf-8')
-api_config = app_config['Web_API']
-headers = {
- 'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66'
-}
+# 读取配置文件
+config = configparser.ConfigParser()
+config.read('config.ini', encoding='utf-8')
+# 运行端口
+port = int(config["Web_API"]["Port"])
+# 域名
+domain = config["Web_API"]["Domain"]
+
+# 创建FastAPI实例
+title = "Douyin TikTok Download API(api.douyin.wtf)"
+version = '3.0.0'
+update_time = "2022/10/31"
+description = """
+#### Description/说明
+
+点击展开/Click to expand
+> [中文/Chinese]
+- 爬取Douyin以及TikTok的数据并返回,更多功能正在开发中。
+- 如果需要更多接口,请查看[https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)。
+- 本项目开源在[GitHub:Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API)。
+- 全部端点数据均来自抖音以及TikTok的官方接口,如遇到问题或BUG或建议请在[issues](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)中反馈。
+- 本项目仅供学习交流使用,严禁用于违法用途,如有侵权请联系作者。
+> [英文/English]
+- Crawl the data of Douyin and TikTok and return it. More features are under development.
+- If you need more interfaces, please visit [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs).
+- This project is open source on [GitHub: Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API).
+- All endpoint data comes from the official interface of Douyin and TikTok. If you have any questions or BUGs or suggestions, please feedback in [issues](
+- This project is for learning and communication only. It is strictly forbidden to be used for illegal purposes. If there is any infringement, please contact the author.
+
+#### Contact author/联系作者
+
+点击展开/Click to expand
+- WeChat: Evil0ctal
+- Email: [Evil0ctal1985@gmail.com](mailto:Evil0ctal1985@gmail.com)
+- Github: [https://github.com/Evil0ctal](https://github.com/Evil0ctal)
+
+"""
+tags_metadata = [
+ {
+ "name": "Root",
+ "description": "Root path info.",
+ },
+ {
+ "name": "API",
+ "description": "Hybrid interface, automatically determine the input link and return the simplified data/混合接口,自动判断输入链接返回精简后的数据。",
+ },
+ {
+ "name": "Douyin",
+ "description": "All Douyin API Endpoints/所有抖音接口节点",
+ },
+ {
+ "name": "TikTok",
+ "description": "All TikTok API Endpoints/所有TikTok接口节点",
+ },
+ {
+ "name": "Download",
+ "description": "Enter the share link and return the download file response./输入分享链接后返回下载文件响应",
+ },
+ {
+ "name": "iOS_Shortcut",
+ "description": "Get iOS shortcut info/获取iOS快捷指令信息",
+ },
+]
+
+# 创建Scraper对象
+api = Scraper()
+
+# 创建FastAPI实例
+app = FastAPI(
+ title=title,
+ description=description,
+ version=version,
+ openapi_tags=tags_metadata
+)
+
+""" ________________________⬇️端点响应模型(Endpoints Response Model)⬇️________________________"""
-def find_url(string):
- # 解析抖音分享口令中的链接并返回列表
- url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', string)
- return url
+# API Root节点
+class APIRoot(BaseModel):
+ API_status: str
+ Version: str = version
+ Update_time: str = update_time
+ API_V1_Document: str
+ API_V2_Document: str
+ GitHub: str
-def clean_filename(string, author_name):
- # 替换不能用于文件名的字符('/ \ : * ? " < > |')
- rstr = r"[\/\\\:\*\?\"\<\>\|]"
- # 将上述字符替换为下划线
- new_title = re.sub(rstr, "_", string)
- # 新文件名
- filename = (new_title + '_' + author_name).replace('\n', '')
- return filename
+# API获取视频基础模型
+class iOS_Shortcut(BaseModel):
+ version: str = None
+ update: str = None
+ link: str = None
+ link_en: str = None
+ note: str = None
+ note_en: str = None
-@app.route("/", methods=["POST", "GET"])
-def index():
- # 显示基础信息
- index_info = {'API status': 'Running',
- 'GitHub': 'https://github.com/Evil0ctal/Douyin_TikTok_Download_API',
- 'Introduction': 'Free and open source Douyin/TikTok watermark-free video download tool, supports API calls.',
- 'Web interface': 'https://douyin.wtf/',
- 'iOS Shortcuts': 'https://api.douyin.wtf/ios',
- 'API V1': 'https://api.douyin.wtf/',
- 'API V2': 'https://api-v2.douyin.wtf/docs',
- 'Parsing Douyin/TikTok videos': 'https://api.douyin.wtf/api?url=[Douyin/TikTok url]',
- 'Return Video MP4 File Download': 'https://api.douyin.wtf/video?url=[Douyin/TikTok url]',
- 'Return Video MP3 File Download': 'https://api.douyin.wtf/music?url=[Douyin/TikTok url]'}
- return jsonify(index_info)
+# API获取视频基础模型
+class API_Video_Response(BaseModel):
+ status: str = None
+ platform: str = None
+ endpoint: str = None
+ message: str = None
+ total_time: float = None
+ aweme_list: list = None
-@app.route("/api", methods=["POST", "GET"])
-def webapi():
- # 创建一个Flask应用获取POST参数并返回结果
- api = Scraper()
- content = request.args.get("url")
- if content != '':
- try:
- post_content = find_url(content)[0] # 尝试找出提交值中的链接
- except:
- # 返回错误信息
- return jsonify(status='failed', reason="Can not find valid Douyin/TikTok URL", function='webapi()', value=content)
- if api_config['Allow_Logs'] == 'True':
- # 将API记录在API_logs.txt中
- date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- with open('API_logs.txt', 'a') as f:
- f.write(date + " : " + post_content + '\n')
- try:
- # 开始时间
- start = time.time()
- # 校验是否为TikTok链接
- if 'tiktok.com' in post_content:
- result = api.tiktok(post_content)
- # 以JSON格式返回TikTok信息
- return jsonify(result)
- # 如果关键字不存在则判断为抖音链接
- elif 'douyin.com' in post_content:
- result = api.douyin(post_content)
- # 以JSON格式返回返回Douyin信息
- return jsonify(result)
- except Exception as e:
+# 混合解析API基础模型:
+class API_Hybrid_Response(BaseModel):
+ status: str = None
+ message: str = None
+ endpoint: str = None
+ url: str = None
+ type: str = None
+ platform: str = None
+ aweme_id: str = None
+ total_time: float = None
+ official_api_url: dict = None
+ desc: str = None
+ create_time: int = None
+ author: dict = None
+ music: dict = None
+ statistics: dict = None
+ cover_data: dict = None
+ hashtags: list = None
+ video_data: dict = None
+ image_data: dict = None
+
+
+# 混合解析API精简版基础模型:
+class API_Hybrid_Minimal_Response(BaseModel):
+ status: str = None
+ message: str = None
+ platform: str = None
+ type: str = None
+ wm_video_url: str = None
+ wm_video_url_HQ: str = None
+ nwm_video_url: str = None
+ nwm_video_url_HQ: str = None
+ no_watermark_image_list: list or None = None
+ watermark_image_list: list or None = None
+
+
+""" ________________________⬇️端点日志记录(Endpoint logs)⬇️________________________"""
+
+
+# 记录API请求日志
+async def api_logs(start_time, input_data, endpoint, error_data: dict = None):
+ if config["Web_API"]["Allow_Logs"] == "True":
+ time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+ total_time = float(format(time.time() - start_time, '.4f'))
+ file_name = "API_logs.json"
+ # 写入日志内容
+ with open(file_name, "a", encoding="utf-8") as f:
+ data = {
+ "time": time_now,
+ "endpoint": f'/{endpoint}/',
+ "total_time": total_time,
+ "input_data": input_data,
+ "error_data": error_data if error_data else "No error"
+ }
+ f.write(json.dumps(data, ensure_ascii=False) + ",\n")
+ print('日志记录成功!')
+ return 1
+ else:
+ print('日志记录已关闭!')
+ return 0
+
+
+
+""" ________________________⬇️Root端点(Root endpoint)⬇️________________________"""
+
+
+# Root端点
+@app.get("/", response_model=APIRoot, tags=["Root"])
+async def root():
+ """
+ Root path info.
+ """
+ data = {
+ "API_status": "Running",
+ "Version": version,
+ "Update_time": update_time,
+ "API_V1_Document": "https://api.douyin.wtf/docs",
+ "API_V2_Document": "https://api-v2.douyin.wtf/docs",
+ "GitHub": "https://github.com/Evil0ctal/Douyin_TikTok_Download_API",
+ }
+ return ORJSONResponse(data)
+
+
+""" ________________________⬇️混合解析端点(Hybrid parsing endpoints)⬇️________________________"""
+
+
+# 混合解析端点,自动判断输入链接返回精简后的数据
+# Hybrid parsing endpoint, automatically determine the input link and return the simplified data.
+@app.get("/api", tags=["API"], response_model=API_Hybrid_Response)
+async def hybrid_parsing(url: str, minimal: bool = False):
+ """
+ ## 用途/Usage
+ - 获取[抖音|TikTok]单个视频数据,参数是视频链接或分享口令。
+ - Get [Douyin|TikTok] single video data, the parameter is the video link or share code.
+ ## 参数/Parameter
+ #### url(必填/Required)):
+ - 视频链接。| 分享口令
+ - The video link.| Share code
+ - 例子/Example:
+ `https://www.douyin.com/video/7153585499477757192`
+ `https://v.douyin.com/MkmSwy7/`
+ `https://vm.tiktok.com/TTPdkQvKjP/`
+ `https://www.tiktok.com/@tvamii/video/7045537727743380782`
+ #### minimal(选填/Optional Default:False):
+ - 是否返回精简版数据。
+ - Whether to return simplified data.
+ - 例子/Example:
+ `True`
+ `False`
+ ## 返回值/Return
+ - 用户当个视频数据的列表,列表内包含JSON数据。
+ - List of user single video data, list contains JSON data.
+ """
+ print("正在进行混合解析...")
+ # 开始时间
+ start_time = time.time()
+ # 获取数据
+ data = api.hybrid_parsing(url)
+ # 是否精简
+ if minimal:
+ result = api.hybrid_parsing_minimal(data)
+ else:
+ # 更新数据
+ result = {
+ 'url': url,
+ "endpoint": "/api/",
+ "total_time": float(format(time.time() - start_time, '.4f')),
+ }
+ # 合并数据
+ result.update(data)
+ # 记录API调用
+ await api_logs(start_time=start_time,
+ input_data={'url': url},
+ endpoint='api')
+ return ORJSONResponse(result)
+
+
+""" ________________________⬇️抖音视频解析端点(Douyin video parsing endpoint)⬇️________________________"""
+
+
+# 获取抖音单个视频数据/Get Douyin single video data
+@app.get("/douyin_video_data/", response_model=API_Video_Response, tags=["Douyin"])
+async def get_douyin_video_data(douyin_video_url: str = None, video_id: str = None):
+ """
+ ## 用途/Usage
+ - 获取抖音用户单个视频数据,参数是视频链接|分享口令
+ - Get the data of a single video of a Douyin user, the parameter is the video link.
+ ## 参数/Parameter
+ #### douyin_video_url(选填/Optional):
+ - 视频链接。| 分享口令
+ - The video link.| Share code
+ - 例子/Example:
+ `https://www.douyin.com/video/7153585499477757192`
+ `https://v.douyin.com/MkmSwy7/`
+ #### video_id(选填/Optional):
+ - 视频ID,可以从视频链接中获取。
+ - The video ID, can be obtained from the video link.
+ - 例子/Example:
+ `7153585499477757192`
+ #### 备注/Note:
+ - 参数`douyin_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。
+ - The parameters `douyin_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed.
+ ## 返回值/Return
+ - 用户当个视频数据的列表,列表内包含JSON数据。
+ - List of user single video data, list contains JSON data.
+ """
+ if video_id is None or video_id == '':
+ # 获取视频ID
+ video_id = api.get_douyin_video_id(douyin_video_url)
+ if video_id is None:
+ result = {
+ "status": "failed",
+ "platform": "douyin",
+ "message": "video_id获取失败/Failed to get video_id",
+ }
+ return ORJSONResponse(result)
+ if video_id is not None and video_id != '':
+ # 开始时间
+ start_time = time.time()
+ print('获取到的video_id数据:{}'.format(video_id))
+ if video_id is not None:
+ video_data = api.get_douyin_video_data(video_id=video_id)
+ if video_data is None:
+ result = {
+ "status": "failed",
+ "platform": "douyin",
+ "endpoint": "/douyin_video_data/",
+ "message": "视频API数据获取失败/Failed to get video API data",
+ }
+ return ORJSONResponse(result)
+ # print('获取到的video_data:{}'.format(video_data))
+ # 记录API调用
+ await api_logs(start_time=start_time,
+ input_data={'douyin_video_url': douyin_video_url, 'video_id': video_id},
+ endpoint='douyin_video_data')
# 结束时间
- end = time.time()
- # 解析时间
- analyze_time = (format((end - start), '.4f') + 's')
- # 返回错误信息
- return jsonify(status='failed', reason=str(e), time=analyze_time, function='webapi()', value=content)
- else:
- # 返回错误信息
- return jsonify(status='failed', reason='url value cannot be empty', function='webapi()', value=content)
-
-
-@app.route("/ios", methods=["POST", "GET"])
-def ios_shortcut():
- # 用于检查快捷指令更新
- return jsonify(version=api_config['iOS_Shortcut_Version'],
- update=api_config['iOS_Shortcut_Update_Time'],
- link=api_config['iOS_Shortcut_Link'],
- link_en=api_config['iOS_Shortcut_Link_EN'],
- note=api_config['iOS_Shortcut_Update_Note'],
- note_en=api_config['iOS_Shortcut_Update_Note_EN'])
-
-
-@app.route("/video", methods=["POST", "GET"])
-def download_video():
- # 用于返回视频下载请求(返回MP4文件下载请求,面对大量请求时非常吃服务器内存,容易崩,慎用。)
- # 将api_switch的值设定为False可关闭该API
- api_switch = api_config['Video_Download']
- if api_switch == 'True':
- api = Scraper()
- content = request.args.get("url")
- if content == '':
- return jsonify(status='failed', reason='url value cannot be empty', function='download_music()',
- value=content)
+ total_time = float(format(time.time() - start_time, '.4f'))
+ # 返回数据
+ result = {
+ "status": "success",
+ "platform": "douyin",
+ "endpoint": "/douyin_video_data/",
+ "message": "获取视频数据成功/Got video data successfully",
+ "total_time": total_time,
+ "aweme_list": [video_data]
+ }
+ return ORJSONResponse(result)
else:
- post_content = find_url(content)[0]
- try:
- if 'douyin.com' in post_content:
- # 获取视频信息
- result = api.douyin(post_content)
- # 视频链接
- video_url = result['nwm_video_url']
- # 视频标题
- video_title = result['video_title']
- # 作者昵称
- video_author = result['video_author']
- # 清理文件名
- file_name = clean_filename(video_title, video_author)
- elif 'tiktok.com' in post_content:
- # 获取视频信息
- result = api.tiktok(post_content)
- # 无水印地址
- video_url = result['nwm_video_url']
- # 视频标题
- video_title = result['video_title']
- # 作者昵称
- video_author = result['video_author_nickname']
- # 清理文件名
- file_name = clean_filename(video_title, video_author)
- else:
- return jsonify(Status='Failed', Reason='Check submitted parameters!')
- # 获取视频文件字节流
- video_mp4 = requests.get(video_url, headers).content
- # 将字节流封装成返回对象
- response = make_response(video_mp4)
- # 添加响应头部信息
- response.headers['Content-Type'] = "video/mp4"
- # 他妈的,费了我老大劲才解决文件中文名的问题
- try:
- filename = file_name.encode('latin-1')
- except UnicodeEncodeError:
- filenames = {
- 'filename': unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore'),
- 'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp4'),
- }
- else:
- filenames = {'filename': file_name + '.mp4'}
- # attachment表示以附件形式下载
- response.headers.set('Content-Disposition', 'attachment', **filenames)
- return response
- except Exception as e:
- return jsonify(status='failed', reason=str(e), function='download_video()', value=content)
+ print('获取抖音video_id失败')
+ result = {
+ "status": "failed",
+ "platform": "douyin",
+ "endpoint": "/douyin_video_data/",
+ "message": "获取视频ID失败/Failed to get video ID",
+ "total_time": 0,
+ "aweme_list": []
+ }
+ return ORJSONResponse(result)
+
+
+""" ________________________⬇️TikTok视频解析端点(TikTok video parsing endpoint)⬇️________________________"""
+
+
+# 获取TikTok单个视频数据/Get TikTok single video data
+@app.get("/tiktok_video_data/", response_class=ORJSONResponse, response_model=API_Video_Response, tags=["TikTok"])
+async def get_tiktok_video_data(tiktok_video_url: str = None, video_id: str = None):
+ """
+ ## 用途/Usage
+ - 获取单个视频数据,参数是视频链接| 分享口令。
+ - Get single video data, the parameter is the video link.
+ ## 参数/Parameter
+ #### tiktok_video_url(选填/Optional):
+ - 视频链接。| 分享口令
+ - The video link.| Share code
+ - 例子/Example:
+ `https://www.tiktok.com/@evil0ctal/video/7156033831819037994`
+ `https://vm.tiktok.com/TTPdkQvKjP/`
+ #### video_id(选填/Optional):
+ - 视频ID,可以从视频链接中获取。
+ - The video ID, can be obtained from the video link.
+ - 例子/Example:
+ `7156033831819037994`
+ #### 备注/Note:
+ - 参数`tiktok_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。
+ - The parameters `tiktok_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed.
+ ## 返回值/Return
+ - 用户当个视频数据的列表,列表内包含JSON数据。
+ - List of user single video data, list contains JSON data.
+ """
+ # 开始时间
+ start_time = time.time()
+ if video_id is None or video_id == "":
+ video_id = api.get_tiktok_video_id(tiktok_video_url)
+ if video_id is None:
+ return ORJSONResponse({"status": "fail", "platform": "tiktok", "endpoint": "/tiktok_video_data/",
+ "message": "获取视频ID失败/Get video ID failed"})
+ if video_id is not None and video_id != '':
+ print('开始解析单个TikTok视频数据')
+ video_data = api.get_tiktok_video_data(video_id)
+ # TikTok的API数据如果为空或者返回的数据中没有视频数据,就返回错误信息
+ # If the TikTok API data is empty or there is no video data in the returned data, an error message is returned
+ if video_data is None or video_data.get('aweme_id') != video_id:
+ print('视频数据获取失败/Failed to get video data')
+ result = {
+ "status": "failed",
+ "platform": "tiktok",
+ "endpoint": "/tiktok_video_data/",
+ "message": "视频数据获取失败/Failed to get video data"
+ }
+ return ORJSONResponse(result)
+ # 记录API调用
+ await api_logs(start_time=start_time,
+ input_data={'tiktok_video_url': tiktok_video_url, 'video_id': video_id},
+ endpoint='tiktok_video_data')
+ # 结束时间
+ total_time = float(format(time.time() - start_time, '.4f'))
+ # 返回数据
+ result = {
+ "status": "success",
+ "platform": "tiktok",
+ "endpoint": "/tiktok_video_data/",
+ "message": "获取视频数据成功/Got video data successfully",
+ "total_time": total_time,
+ "aweme_list": [video_data]
+ }
+ return ORJSONResponse(result)
else:
- return jsonify(Status='Failed', Reason='This API is disabled. To enable it, set the value of "api_switch" to True.')
+ print('视频链接错误/Video link error')
+ result = {
+ "status": "failed",
+ "platform": "tiktok",
+ "endpoint": "/tiktok_video_data/",
+ "message": "视频链接错误/Video link error"
+ }
+ return ORJSONResponse(result)
-@app.route("/music", methods=["POST", "GET"])
-def download_music():
- # 用于返回视频下载请求(返回MP3文件下载请求,面对大量请求时非常吃服务器内存,容易崩,慎用。)
- # 将api_switch的值设定为False可关闭该API
- api_switch = api_config['Music_Download']
- if api_switch == 'True':
- api = Scraper()
- content = request.args.get("url")
- if content == '':
- return jsonify(status='failed', reason='url value cannot be empty', function='download_music()',
- value=content)
+""" ________________________⬇️iOS快捷指令更新端点(iOS Shortcut update endpoint)⬇️________________________"""
+
+
+@app.get("/ios", response_model=iOS_Shortcut, tags=["iOS_Shortcut"])
+async def Get_Shortcut():
+ data = {
+ 'version': config["Web_API"]["iOS_Shortcut_Version"],
+ 'update': config["Web_API"]['iOS_Shortcut_Update_Time'],
+ 'link': config["Web_API"]['iOS_Shortcut_Link'],
+ 'link_en': config["Web_API"]['iOS_Shortcut_Link_EN'],
+ 'note': config["Web_API"]['iOS_Shortcut_Update_Note'],
+ 'note_en': config["Web_API"]['iOS_Shortcut_Update_Note_EN'],
+ }
+ return ORJSONResponse(data)
+
+
+""" ________________________⬇️下载文件端点/函数(Download file endpoints/functions)⬇️________________________"""
+
+
+# 下载文件端点/Download file endpoint
+@app.get("/download", tags=["Download"])
+async def download_file_hybrid(url: str, prefix: bool = True, watermark: bool = False):
+ """
+ ## 用途/Usage
+ ### [中文]
+ - 将[抖音|TikTok]链接作为参数提交至此端点,返回[视频|图片]文件下载请求。
+ ### [English]
+ - Submit the [Douyin|TikTok] link as a parameter to this endpoint and return the [video|picture] file download request.
+ # 参数/Parameter
+ - url:str -> [Douyin|TikTok] [视频|图片] 链接/ [Douyin|TikTok] [video|image] link
+ - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
+ - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
+ """
+ # 是否开启此端点/Whether to enable this endpoint
+ if config["Web_API"]["Download_Switch"] != "True":
+ return ORJSONResponse({"status": "endpoint closed",
+ "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
+ # 开始时间
+ start_time = time.time()
+ headers = {
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+ }
+ data = api.hybrid_parsing(url)
+ if data is None:
+ return ORJSONResponse(data)
+ else:
+ # 记录API调用
+ await api_logs(start_time=start_time,
+ input_data={'url': url},
+ endpoint='download')
+ url_type = data.get('type')
+ platform = data.get('platform')
+ aweme_id = data.get('aweme_id')
+ file_name_prefix = config["Web_API"]["File_Name_Prefix"] if prefix else ''
+ root_path = config["Web_API"]["Download_Path"]
+ # 查看目录是否存在,不存在就创建
+ if not os.path.exists(root_path):
+ os.makedirs(root_path)
+ if url_type == 'video':
+ file_name = file_name_prefix + platform + '_' + aweme_id + '.mp4' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_watermark' + '.mp4'
+ url = data.get('video_data').get('nwm_video_url_HQ') if not watermark else data.get('video_data').get('wm_video_url')
+ print('url: ', url)
+ file_path = root_path + "/" + file_name
+ print('file_path: ', file_path)
+ # 判断文件是否存在,存在就直接返回、
+ if os.path.exists(file_path):
+ print('文件已存在,直接返回')
+ return FileResponse(path=file_path, media_type='video/mp4', filename=file_name)
+ else:
+ if platform == 'douyin':
+ r = requests.get(url=url, headers=headers, allow_redirects=False).headers
+ cdn_url = r.get('location')
+ r = requests.get(cdn_url).content
+ elif platform == 'tiktok':
+ r = requests.get(url=url, headers=headers).content
+ with open(file_path, 'wb') as f:
+ f.write(r)
+ return FileResponse(path=file_path, media_type='video/mp4', filename=file_name)
+ elif url_type == 'image':
+ url = data.get('image_data').get('no_watermark_image_list') if not watermark else data.get('image_data').get('watermark_image_list')
+ print('url: ', url)
+ zip_file_name = file_name_prefix + platform + '_' + aweme_id + '_images.zip' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_images_watermark.zip'
+ zip_file_path = root_path + "/" + zip_file_name
+ print('zip_file_name: ', zip_file_name)
+ print('zip_file_path: ', zip_file_path)
+ # 判断文件是否存在,存在就直接返回、
+ if os.path.exists(zip_file_path):
+ print('文件已存在,直接返回')
+ return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name)
+ file_path_list = []
+ for i in url:
+ r = requests.get(url=i, headers=headers)
+ content_type = r.headers.get('content-type')
+ file_format = content_type.split('/')[1]
+ r = r.content
+ index = int(url.index(i))
+ file_name = file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '.' + file_format if not watermark else \
+ file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '_watermark' + '.' + file_format
+ file_path = root_path + "/" + file_name
+ file_path_list.append(file_path)
+ print('file_path: ', file_path)
+ with open(file_path, 'wb') as f:
+ f.write(r)
+ if len(url) == len(file_path_list):
+ zip_file = zipfile.ZipFile(zip_file_path, 'w')
+ for f in file_path_list:
+ zip_file.write(os.path.join(f), f, zipfile.ZIP_DEFLATED)
+ zip_file.close()
+ return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name)
else:
- post_content = find_url(content)[0]
- try:
- if 'douyin.com' in post_content:
- # 获取视频信息
- result = api.douyin(post_content)
- bgm_url = result['video_music']
- if bgm_url == "None":
- return jsonify(Status='Failed', Reason='This link has no music to get!')
- else:
- # 视频标题
- bgm_title = result['video_music_title']
- # 作者昵称
- author_name = result['video_music_author']
- # 清理文件名
- file_name = clean_filename(bgm_title, author_name)
- elif 'tiktok.com' in post_content:
- # 获取视频信息
- result = api.tiktok(post_content)
- # BGM链接
- bgm_url = result['video_music_url']
- # 视频标题
- bgm_title = result['video_music_title']
- # 作者昵称
- author_name = result['video_music_author']
- # 清理文件名
- file_name = clean_filename(bgm_title, author_name)
- else:
- return jsonify(Status='Failed', Reason='This link has no music to get!')
- video_bgm = requests.get(bgm_url, headers).content
- # 将bgm字节流封装成response对象
- response = make_response(video_bgm)
- # 添加响应头部信息
- response.headers['Content-Type'] = "video/mp3"
- # 他妈的,费了我老大劲才解决文件中文名的问题
+ return ORJSONResponse(data)
+
+
+# 批量下载文件端点/Batch download file endpoint
+@app.get("/batch_download", tags=["Download"])
+async def batch_download_file(url_list: str, prefix: bool = True):
+ """
+ 批量下载文件端点/Batch download file endpoint
+ 未完工/Unfinished
+ """
+ print('url_list: ', url_list)
+ return ORJSONResponse({"status": "failed",
+ "message": "嘿嘿嘿,这个功能还没做呢,等我有空再做吧/Hehehe, this function hasn't been done yet, I'll do it when I have time"})
+
+
+# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video)
+@app.get("/video/{aweme_id}", tags=["Download"])
+async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False):
+ """
+ ## 用途/Usage
+ ### [中文]
+ - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
+ - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。
+ ### [English]
+ - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request.
+ - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint.
+ # 参数/Parameter
+ - aweme_id:str -> 抖音视频ID/Douyin video ID
+ - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
+ - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
+ """
+ # 是否开启此端点/Whether to enable this endpoint
+ if config["Web_API"]["Download_Switch"] != "True":
+ return ORJSONResponse({"status": "endpoint closed",
+ "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
+ video_url = f"https://www.douyin.com/video/{aweme_id}"
+ download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
+ return RedirectResponse(download_url)
+
+
+# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video)
+@app.get("/note/{aweme_id}", tags=["Download"])
+async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False):
+ """
+ ## 用途/Usage
+ ### [中文]
+ - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
+ - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。
+ ### [English]
+ - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request.
+ - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint.
+ # 参数/Parameter
+ - aweme_id:str -> 抖音视频ID/Douyin video ID
+ - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
+ - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
+ """
+ # 是否开启此端点/Whether to enable this endpoint
+ if config["Web_API"]["Download_Switch"] != "True":
+ return ORJSONResponse({"status": "endpoint closed",
+ "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
+ video_url = f"https://www.douyin.com/video/{aweme_id}"
+ download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
+ return RedirectResponse(download_url)
+
+
+# 抖音链接格式下载端点/Douyin link format download endpoint
+@app.get("/discover", tags=["Download"])
+async def download_douyin_discover(modal_id: str, prefix: bool = True, watermark: bool = False):
+ """
+ ## 用途/Usage
+ ### [中文]
+ - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
+ - 例如原链接:https://www.douyin.com/discover?modal_id=1234567890123456789 改成 https://api.douyin.wtf/discover?modal_id=1234567890123456789 即可调用此端点。
+ ### [English]
+ - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request.
+ - For example, the original link: https://douyin.com/discover?modal_id=1234567890123456789 becomes https://api.douyin.wtf/discover?modal_id=1234567890123456789 to call this endpoint.
+ # 参数/Parameter
+ - modal_id: str -> 抖音视频ID/Douyin video ID
+ - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
+ - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
+ """
+ # 是否开启此端点/Whether to enable this endpoint
+ if config["Web_API"]["Download_Switch"] != "True":
+ return ORJSONResponse({"status": "endpoint closed",
+ "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
+ video_url = f"https://www.douyin.com/discover?modal_id={modal_id}"
+ download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
+ return RedirectResponse(download_url)
+
+
+# Tiktok链接格式下载端点(video)/Tiktok link format download endpoint(video)
+@app.get("/{user_id}/video/{aweme_id}", tags=["Download"])
+async def download_tiktok_video(user_id: str, aweme_id: str, prefix: bool = True, watermark: bool = False):
+ """
+ ## 用途/Usage
+ ### [中文]
+ - 将TikTok域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
+ - 例如原链接:https://www.tiktok.com/@evil0ctal/video/7156033831819037994 改成 https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 即可调用此端点。
+ ### [English]
+ - Change the TikTok domain name to the current server domain name to call this endpoint and return the video file download request.
+ - For example, the original link: https://www.tiktok.com/@evil0ctal/video/7156033831819037994 becomes https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 to call this endpoint.
+ # 参数/Parameter
+ - user_id: str -> TikTok用户ID/TikTok user ID
+ - aweme_id: str -> TikTok视频ID/TikTok video ID
+ - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
+ - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
+ """
+ # 是否开启此端点/Whether to enable this endpoint
+ if config["Web_API"]["Download_Switch"] != "True":
+ return ORJSONResponse({"status": "endpoint closed",
+ "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
+ video_url = f"https://www.tiktok.com/{user_id}/video/{aweme_id}"
+ download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
+ return RedirectResponse(download_url)
+
+
+# 定期清理[Download_Path]文件夹
+# Periodically clean the [Download_Path] folder
+def cleanup_path():
+ while True:
+ root_path = config["Web_API"]["Download_Path"]
+ timer = int(config["Web_API"]["Download_Path_Clean_Timer"])
+ # 查看目录是否存在,不存在就跳过
+ if os.path.exists(root_path):
+ time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+ print(f"{time_now}: Cleaning up the download folder...")
+ for file in os.listdir("./download"):
+ file_path = os.path.join("./download", file)
try:
- filename = file_name.encode('latin-1')
- except UnicodeEncodeError:
- filenames = {
- 'filename': unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore'),
- 'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp3'),
- }
- else:
- filenames = {'filename': file_name + '.mp3'}
- # attachment表示以附件形式下载
- response.headers.set('Content-Disposition', 'attachment', **filenames)
- return response
- except Exception as e:
- return jsonify(status='failed', reason=str(e), function='download_music()', value=content)
- else:
- return jsonify(Status='Failed', Reason='This API is disabled. To enable it, set the value of "api_switch" to True.')
+ if os.path.isfile(file_path):
+ os.remove(file_path)
+ except Exception as e:
+ print(e)
+ else:
+ time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+ print(f"{time_now}: The download folder does not exist, skipping...")
+ time.sleep(timer)
+
+
+""" ________________________⬇️项目启动执行函数(Project start execution function)⬇️________________________"""
+
+
+# 程序启动后执行/Execute after program startup
+@app.on_event("startup")
+async def startup_event():
+ # 创建一个清理下载目录定时器线程并启动
+ # Create a timer thread to clean up the download directory and start it
+ download_path_clean_switches = True if config["Web_API"]["Download_Path_Clean_Switch"] == "True" else False
+ if download_path_clean_switches:
+ # 启动清理线程/Start cleaning thread
+ thread_1 = threading.Thread(target=cleanup_path)
+ thread_1.start()
if __name__ == '__main__':
- # 开启WebAPI
- if os.environ.get('PORT'):
- port = int(os.environ.get('PORT'))
- else:
- # 默认端口
- port = api_config['Port']
- app.run(host='0.0.0.0', port=port)
+ # 建议使用gunicorn启动,使用uvicorn启动时请将debug设置为False
+ # It is recommended to use gunicorn to start, when using uvicorn to start, please set debug to False
+ # uvicorn web_api:app --host '0.0.0.0' --port 8000 --reload
+ uvicorn.run("web_api:app", host='0.0.0.0', port=port, reload=True, access_log=False)