Update web_api.py
This commit is contained in:
parent
d4280cdc73
commit
a2fe29fa0d
1 changed files with 673 additions and 219 deletions
892
web_api.py
892
web_api.py
|
|
@ -2,247 +2,701 @@
|
|||
# -*- encoding: utf-8 -*-
|
||||
# @Author: https://github.com/Evil0ctal/
|
||||
# @Time: 2021/11/06
|
||||
# @Update: 2022/10/17
|
||||
# @Update: 2022/11/06
|
||||
# @Version: 3.0.0
|
||||
# @Function:
|
||||
# 创建一个接受提交参数的Flask应用程序。
|
||||
# 将scraper.py返回的内容以JSON格式返回。
|
||||
# 默认运行端口2333, 请自行在config.ini中修改。
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import requests
|
||||
import unicodedata
|
||||
|
||||
import configparser
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import zipfile
|
||||
|
||||
import requests
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import ORJSONResponse, FileResponse
|
||||
from pydantic import BaseModel
|
||||
from starlette.responses import RedirectResponse
|
||||
|
||||
|
||||
from scraper import Scraper
|
||||
from werkzeug.urls import url_quote
|
||||
from flask import Flask, request, jsonify, make_response
|
||||
|
||||
app = Flask(__name__)
|
||||
app_config = configparser.ConfigParser()
|
||||
app_config.read('config.ini', encoding='utf-8')
|
||||
api_config = app_config['Web_API']
|
||||
headers = {
|
||||
'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66'
|
||||
}
|
||||
# 读取配置文件
|
||||
config = configparser.ConfigParser()
|
||||
config.read('config.ini', encoding='utf-8')
|
||||
# 运行端口
|
||||
port = int(config["Web_API"]["Port"])
|
||||
# 域名
|
||||
domain = config["Web_API"]["Domain"]
|
||||
|
||||
# 创建FastAPI实例
|
||||
title = "Douyin TikTok Download API(api.douyin.wtf)"
|
||||
version = '3.0.0'
|
||||
update_time = "2022/10/31"
|
||||
description = """
|
||||
#### Description/说明
|
||||
<details>
|
||||
<summary>点击展开/Click to expand</summary>
|
||||
> [中文/Chinese]
|
||||
- 爬取Douyin以及TikTok的数据并返回,更多功能正在开发中。
|
||||
- 如果需要更多接口,请查看[https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)。
|
||||
- 本项目开源在[GitHub:Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API)。
|
||||
- 全部端点数据均来自抖音以及TikTok的官方接口,如遇到问题或BUG或建议请在[issues](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)中反馈。
|
||||
- 本项目仅供学习交流使用,严禁用于违法用途,如有侵权请联系作者。
|
||||
> [英文/English]
|
||||
- Crawl the data of Douyin and TikTok and return it. More features are under development.
|
||||
- If you need more interfaces, please visit [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs).
|
||||
- This project is open source on [GitHub: Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API).
|
||||
- All endpoint data comes from the official interface of Douyin and TikTok. If you have any questions or BUGs or suggestions, please feedback in [issues](
|
||||
- This project is for learning and communication only. It is strictly forbidden to be used for illegal purposes. If there is any infringement, please contact the author.
|
||||
</details>
|
||||
#### Contact author/联系作者
|
||||
<details>
|
||||
<summary>点击展开/Click to expand</summary>
|
||||
- WeChat: Evil0ctal
|
||||
- Email: [Evil0ctal1985@gmail.com](mailto:Evil0ctal1985@gmail.com)
|
||||
- Github: [https://github.com/Evil0ctal](https://github.com/Evil0ctal)
|
||||
</details>
|
||||
"""
|
||||
tags_metadata = [
|
||||
{
|
||||
"name": "Root",
|
||||
"description": "Root path info.",
|
||||
},
|
||||
{
|
||||
"name": "API",
|
||||
"description": "Hybrid interface, automatically determine the input link and return the simplified data/混合接口,自动判断输入链接返回精简后的数据。",
|
||||
},
|
||||
{
|
||||
"name": "Douyin",
|
||||
"description": "All Douyin API Endpoints/所有抖音接口节点",
|
||||
},
|
||||
{
|
||||
"name": "TikTok",
|
||||
"description": "All TikTok API Endpoints/所有TikTok接口节点",
|
||||
},
|
||||
{
|
||||
"name": "Download",
|
||||
"description": "Enter the share link and return the download file response./输入分享链接后返回下载文件响应",
|
||||
},
|
||||
{
|
||||
"name": "iOS_Shortcut",
|
||||
"description": "Get iOS shortcut info/获取iOS快捷指令信息",
|
||||
},
|
||||
]
|
||||
|
||||
# 创建Scraper对象
|
||||
api = Scraper()
|
||||
|
||||
# 创建FastAPI实例
|
||||
app = FastAPI(
|
||||
title=title,
|
||||
description=description,
|
||||
version=version,
|
||||
openapi_tags=tags_metadata
|
||||
)
|
||||
|
||||
""" ________________________⬇️端点响应模型(Endpoints Response Model)⬇️________________________"""
|
||||
|
||||
|
||||
def find_url(string):
|
||||
# 解析抖音分享口令中的链接并返回列表
|
||||
url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', string)
|
||||
return url
|
||||
# API Root节点
|
||||
class APIRoot(BaseModel):
|
||||
API_status: str
|
||||
Version: str = version
|
||||
Update_time: str = update_time
|
||||
API_V1_Document: str
|
||||
API_V2_Document: str
|
||||
GitHub: str
|
||||
|
||||
|
||||
def clean_filename(string, author_name):
|
||||
# 替换不能用于文件名的字符('/ \ : * ? " < > |')
|
||||
rstr = r"[\/\\\:\*\?\"\<\>\|]"
|
||||
# 将上述字符替换为下划线
|
||||
new_title = re.sub(rstr, "_", string)
|
||||
# 新文件名
|
||||
filename = (new_title + '_' + author_name).replace('\n', '')
|
||||
return filename
|
||||
# API获取视频基础模型
|
||||
class iOS_Shortcut(BaseModel):
|
||||
version: str = None
|
||||
update: str = None
|
||||
link: str = None
|
||||
link_en: str = None
|
||||
note: str = None
|
||||
note_en: str = None
|
||||
|
||||
|
||||
@app.route("/", methods=["POST", "GET"])
|
||||
def index():
|
||||
# 显示基础信息
|
||||
index_info = {'API status': 'Running',
|
||||
'GitHub': 'https://github.com/Evil0ctal/Douyin_TikTok_Download_API',
|
||||
'Introduction': 'Free and open source Douyin/TikTok watermark-free video download tool, supports API calls.',
|
||||
'Web interface': 'https://douyin.wtf/',
|
||||
'iOS Shortcuts': 'https://api.douyin.wtf/ios',
|
||||
'API V1': 'https://api.douyin.wtf/',
|
||||
'API V2': 'https://api-v2.douyin.wtf/docs',
|
||||
'Parsing Douyin/TikTok videos': 'https://api.douyin.wtf/api?url=[Douyin/TikTok url]',
|
||||
'Return Video MP4 File Download': 'https://api.douyin.wtf/video?url=[Douyin/TikTok url]',
|
||||
'Return Video MP3 File Download': 'https://api.douyin.wtf/music?url=[Douyin/TikTok url]'}
|
||||
return jsonify(index_info)
|
||||
# API获取视频基础模型
|
||||
class API_Video_Response(BaseModel):
|
||||
status: str = None
|
||||
platform: str = None
|
||||
endpoint: str = None
|
||||
message: str = None
|
||||
total_time: float = None
|
||||
aweme_list: list = None
|
||||
|
||||
|
||||
@app.route("/api", methods=["POST", "GET"])
|
||||
def webapi():
|
||||
# 创建一个Flask应用获取POST参数并返回结果
|
||||
api = Scraper()
|
||||
content = request.args.get("url")
|
||||
if content != '':
|
||||
try:
|
||||
post_content = find_url(content)[0] # 尝试找出提交值中的链接
|
||||
except:
|
||||
# 返回错误信息
|
||||
return jsonify(status='failed', reason="Can not find valid Douyin/TikTok URL", function='webapi()', value=content)
|
||||
if api_config['Allow_Logs'] == 'True':
|
||||
# 将API记录在API_logs.txt中
|
||||
date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
with open('API_logs.txt', 'a') as f:
|
||||
f.write(date + " : " + post_content + '\n')
|
||||
try:
|
||||
# 开始时间
|
||||
start = time.time()
|
||||
# 校验是否为TikTok链接
|
||||
if 'tiktok.com' in post_content:
|
||||
result = api.tiktok(post_content)
|
||||
# 以JSON格式返回TikTok信息
|
||||
return jsonify(result)
|
||||
# 如果关键字不存在则判断为抖音链接
|
||||
elif 'douyin.com' in post_content:
|
||||
result = api.douyin(post_content)
|
||||
# 以JSON格式返回返回Douyin信息
|
||||
return jsonify(result)
|
||||
except Exception as e:
|
||||
# 混合解析API基础模型:
|
||||
class API_Hybrid_Response(BaseModel):
|
||||
status: str = None
|
||||
message: str = None
|
||||
endpoint: str = None
|
||||
url: str = None
|
||||
type: str = None
|
||||
platform: str = None
|
||||
aweme_id: str = None
|
||||
total_time: float = None
|
||||
official_api_url: dict = None
|
||||
desc: str = None
|
||||
create_time: int = None
|
||||
author: dict = None
|
||||
music: dict = None
|
||||
statistics: dict = None
|
||||
cover_data: dict = None
|
||||
hashtags: list = None
|
||||
video_data: dict = None
|
||||
image_data: dict = None
|
||||
|
||||
|
||||
# 混合解析API精简版基础模型:
|
||||
class API_Hybrid_Minimal_Response(BaseModel):
|
||||
status: str = None
|
||||
message: str = None
|
||||
platform: str = None
|
||||
type: str = None
|
||||
wm_video_url: str = None
|
||||
wm_video_url_HQ: str = None
|
||||
nwm_video_url: str = None
|
||||
nwm_video_url_HQ: str = None
|
||||
no_watermark_image_list: list or None = None
|
||||
watermark_image_list: list or None = None
|
||||
|
||||
|
||||
""" ________________________⬇️端点日志记录(Endpoint logs)⬇️________________________"""
|
||||
|
||||
|
||||
# 记录API请求日志
|
||||
async def api_logs(start_time, input_data, endpoint, error_data: dict = None):
|
||||
if config["Web_API"]["Allow_Logs"] == "True":
|
||||
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
total_time = float(format(time.time() - start_time, '.4f'))
|
||||
file_name = "API_logs.json"
|
||||
# 写入日志内容
|
||||
with open(file_name, "a", encoding="utf-8") as f:
|
||||
data = {
|
||||
"time": time_now,
|
||||
"endpoint": f'/{endpoint}/',
|
||||
"total_time": total_time,
|
||||
"input_data": input_data,
|
||||
"error_data": error_data if error_data else "No error"
|
||||
}
|
||||
f.write(json.dumps(data, ensure_ascii=False) + ",\n")
|
||||
print('日志记录成功!')
|
||||
return 1
|
||||
else:
|
||||
print('日志记录已关闭!')
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
""" ________________________⬇️Root端点(Root endpoint)⬇️________________________"""
|
||||
|
||||
|
||||
# Root端点
|
||||
@app.get("/", response_model=APIRoot, tags=["Root"])
|
||||
async def root():
|
||||
"""
|
||||
Root path info.
|
||||
"""
|
||||
data = {
|
||||
"API_status": "Running",
|
||||
"Version": version,
|
||||
"Update_time": update_time,
|
||||
"API_V1_Document": "https://api.douyin.wtf/docs",
|
||||
"API_V2_Document": "https://api-v2.douyin.wtf/docs",
|
||||
"GitHub": "https://github.com/Evil0ctal/Douyin_TikTok_Download_API",
|
||||
}
|
||||
return ORJSONResponse(data)
|
||||
|
||||
|
||||
""" ________________________⬇️混合解析端点(Hybrid parsing endpoints)⬇️________________________"""
|
||||
|
||||
|
||||
# 混合解析端点,自动判断输入链接返回精简后的数据
|
||||
# Hybrid parsing endpoint, automatically determine the input link and return the simplified data.
|
||||
@app.get("/api", tags=["API"], response_model=API_Hybrid_Response)
|
||||
async def hybrid_parsing(url: str, minimal: bool = False):
|
||||
"""
|
||||
## 用途/Usage
|
||||
- 获取[抖音|TikTok]单个视频数据,参数是视频链接或分享口令。
|
||||
- Get [Douyin|TikTok] single video data, the parameter is the video link or share code.
|
||||
## 参数/Parameter
|
||||
#### url(必填/Required)):
|
||||
- 视频链接。| 分享口令
|
||||
- The video link.| Share code
|
||||
- 例子/Example:
|
||||
`https://www.douyin.com/video/7153585499477757192`
|
||||
`https://v.douyin.com/MkmSwy7/`
|
||||
`https://vm.tiktok.com/TTPdkQvKjP/`
|
||||
`https://www.tiktok.com/@tvamii/video/7045537727743380782`
|
||||
#### minimal(选填/Optional Default:False):
|
||||
- 是否返回精简版数据。
|
||||
- Whether to return simplified data.
|
||||
- 例子/Example:
|
||||
`True`
|
||||
`False`
|
||||
## 返回值/Return
|
||||
- 用户当个视频数据的列表,列表内包含JSON数据。
|
||||
- List of user single video data, list contains JSON data.
|
||||
"""
|
||||
print("正在进行混合解析...")
|
||||
# 开始时间
|
||||
start_time = time.time()
|
||||
# 获取数据
|
||||
data = api.hybrid_parsing(url)
|
||||
# 是否精简
|
||||
if minimal:
|
||||
result = api.hybrid_parsing_minimal(data)
|
||||
else:
|
||||
# 更新数据
|
||||
result = {
|
||||
'url': url,
|
||||
"endpoint": "/api/",
|
||||
"total_time": float(format(time.time() - start_time, '.4f')),
|
||||
}
|
||||
# 合并数据
|
||||
result.update(data)
|
||||
# 记录API调用
|
||||
await api_logs(start_time=start_time,
|
||||
input_data={'url': url},
|
||||
endpoint='api')
|
||||
return ORJSONResponse(result)
|
||||
|
||||
|
||||
""" ________________________⬇️抖音视频解析端点(Douyin video parsing endpoint)⬇️________________________"""
|
||||
|
||||
|
||||
# 获取抖音单个视频数据/Get Douyin single video data
|
||||
@app.get("/douyin_video_data/", response_model=API_Video_Response, tags=["Douyin"])
|
||||
async def get_douyin_video_data(douyin_video_url: str = None, video_id: str = None):
|
||||
"""
|
||||
## 用途/Usage
|
||||
- 获取抖音用户单个视频数据,参数是视频链接|分享口令
|
||||
- Get the data of a single video of a Douyin user, the parameter is the video link.
|
||||
## 参数/Parameter
|
||||
#### douyin_video_url(选填/Optional):
|
||||
- 视频链接。| 分享口令
|
||||
- The video link.| Share code
|
||||
- 例子/Example:
|
||||
`https://www.douyin.com/video/7153585499477757192`
|
||||
`https://v.douyin.com/MkmSwy7/`
|
||||
#### video_id(选填/Optional):
|
||||
- 视频ID,可以从视频链接中获取。
|
||||
- The video ID, can be obtained from the video link.
|
||||
- 例子/Example:
|
||||
`7153585499477757192`
|
||||
#### 备注/Note:
|
||||
- 参数`douyin_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。
|
||||
- The parameters `douyin_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed.
|
||||
## 返回值/Return
|
||||
- 用户当个视频数据的列表,列表内包含JSON数据。
|
||||
- List of user single video data, list contains JSON data.
|
||||
"""
|
||||
if video_id is None or video_id == '':
|
||||
# 获取视频ID
|
||||
video_id = api.get_douyin_video_id(douyin_video_url)
|
||||
if video_id is None:
|
||||
result = {
|
||||
"status": "failed",
|
||||
"platform": "douyin",
|
||||
"message": "video_id获取失败/Failed to get video_id",
|
||||
}
|
||||
return ORJSONResponse(result)
|
||||
if video_id is not None and video_id != '':
|
||||
# 开始时间
|
||||
start_time = time.time()
|
||||
print('获取到的video_id数据:{}'.format(video_id))
|
||||
if video_id is not None:
|
||||
video_data = api.get_douyin_video_data(video_id=video_id)
|
||||
if video_data is None:
|
||||
result = {
|
||||
"status": "failed",
|
||||
"platform": "douyin",
|
||||
"endpoint": "/douyin_video_data/",
|
||||
"message": "视频API数据获取失败/Failed to get video API data",
|
||||
}
|
||||
return ORJSONResponse(result)
|
||||
# print('获取到的video_data:{}'.format(video_data))
|
||||
# 记录API调用
|
||||
await api_logs(start_time=start_time,
|
||||
input_data={'douyin_video_url': douyin_video_url, 'video_id': video_id},
|
||||
endpoint='douyin_video_data')
|
||||
# 结束时间
|
||||
end = time.time()
|
||||
# 解析时间
|
||||
analyze_time = (format((end - start), '.4f') + 's')
|
||||
# 返回错误信息
|
||||
return jsonify(status='failed', reason=str(e), time=analyze_time, function='webapi()', value=content)
|
||||
else:
|
||||
# 返回错误信息
|
||||
return jsonify(status='failed', reason='url value cannot be empty', function='webapi()', value=content)
|
||||
|
||||
|
||||
@app.route("/ios", methods=["POST", "GET"])
|
||||
def ios_shortcut():
|
||||
# 用于检查快捷指令更新
|
||||
return jsonify(version=api_config['iOS_Shortcut_Version'],
|
||||
update=api_config['iOS_Shortcut_Update_Time'],
|
||||
link=api_config['iOS_Shortcut_Link'],
|
||||
link_en=api_config['iOS_Shortcut_Link_EN'],
|
||||
note=api_config['iOS_Shortcut_Update_Note'],
|
||||
note_en=api_config['iOS_Shortcut_Update_Note_EN'])
|
||||
|
||||
|
||||
@app.route("/video", methods=["POST", "GET"])
|
||||
def download_video():
|
||||
# 用于返回视频下载请求(返回MP4文件下载请求,面对大量请求时非常吃服务器内存,容易崩,慎用。)
|
||||
# 将api_switch的值设定为False可关闭该API
|
||||
api_switch = api_config['Video_Download']
|
||||
if api_switch == 'True':
|
||||
api = Scraper()
|
||||
content = request.args.get("url")
|
||||
if content == '':
|
||||
return jsonify(status='failed', reason='url value cannot be empty', function='download_music()',
|
||||
value=content)
|
||||
total_time = float(format(time.time() - start_time, '.4f'))
|
||||
# 返回数据
|
||||
result = {
|
||||
"status": "success",
|
||||
"platform": "douyin",
|
||||
"endpoint": "/douyin_video_data/",
|
||||
"message": "获取视频数据成功/Got video data successfully",
|
||||
"total_time": total_time,
|
||||
"aweme_list": [video_data]
|
||||
}
|
||||
return ORJSONResponse(result)
|
||||
else:
|
||||
post_content = find_url(content)[0]
|
||||
try:
|
||||
if 'douyin.com' in post_content:
|
||||
# 获取视频信息
|
||||
result = api.douyin(post_content)
|
||||
# 视频链接
|
||||
video_url = result['nwm_video_url']
|
||||
# 视频标题
|
||||
video_title = result['video_title']
|
||||
# 作者昵称
|
||||
video_author = result['video_author']
|
||||
# 清理文件名
|
||||
file_name = clean_filename(video_title, video_author)
|
||||
elif 'tiktok.com' in post_content:
|
||||
# 获取视频信息
|
||||
result = api.tiktok(post_content)
|
||||
# 无水印地址
|
||||
video_url = result['nwm_video_url']
|
||||
# 视频标题
|
||||
video_title = result['video_title']
|
||||
# 作者昵称
|
||||
video_author = result['video_author_nickname']
|
||||
# 清理文件名
|
||||
file_name = clean_filename(video_title, video_author)
|
||||
else:
|
||||
return jsonify(Status='Failed', Reason='Check submitted parameters!')
|
||||
# 获取视频文件字节流
|
||||
video_mp4 = requests.get(video_url, headers).content
|
||||
# 将字节流封装成返回对象
|
||||
response = make_response(video_mp4)
|
||||
# 添加响应头部信息
|
||||
response.headers['Content-Type'] = "video/mp4"
|
||||
# 他妈的,费了我老大劲才解决文件中文名的问题
|
||||
try:
|
||||
filename = file_name.encode('latin-1')
|
||||
except UnicodeEncodeError:
|
||||
filenames = {
|
||||
'filename': unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore'),
|
||||
'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp4'),
|
||||
}
|
||||
else:
|
||||
filenames = {'filename': file_name + '.mp4'}
|
||||
# attachment表示以附件形式下载
|
||||
response.headers.set('Content-Disposition', 'attachment', **filenames)
|
||||
return response
|
||||
except Exception as e:
|
||||
return jsonify(status='failed', reason=str(e), function='download_video()', value=content)
|
||||
print('获取抖音video_id失败')
|
||||
result = {
|
||||
"status": "failed",
|
||||
"platform": "douyin",
|
||||
"endpoint": "/douyin_video_data/",
|
||||
"message": "获取视频ID失败/Failed to get video ID",
|
||||
"total_time": 0,
|
||||
"aweme_list": []
|
||||
}
|
||||
return ORJSONResponse(result)
|
||||
|
||||
|
||||
""" ________________________⬇️TikTok视频解析端点(TikTok video parsing endpoint)⬇️________________________"""
|
||||
|
||||
|
||||
# 获取TikTok单个视频数据/Get TikTok single video data
|
||||
@app.get("/tiktok_video_data/", response_class=ORJSONResponse, response_model=API_Video_Response, tags=["TikTok"])
|
||||
async def get_tiktok_video_data(tiktok_video_url: str = None, video_id: str = None):
|
||||
"""
|
||||
## 用途/Usage
|
||||
- 获取单个视频数据,参数是视频链接| 分享口令。
|
||||
- Get single video data, the parameter is the video link.
|
||||
## 参数/Parameter
|
||||
#### tiktok_video_url(选填/Optional):
|
||||
- 视频链接。| 分享口令
|
||||
- The video link.| Share code
|
||||
- 例子/Example:
|
||||
`https://www.tiktok.com/@evil0ctal/video/7156033831819037994`
|
||||
`https://vm.tiktok.com/TTPdkQvKjP/`
|
||||
#### video_id(选填/Optional):
|
||||
- 视频ID,可以从视频链接中获取。
|
||||
- The video ID, can be obtained from the video link.
|
||||
- 例子/Example:
|
||||
`7156033831819037994`
|
||||
#### 备注/Note:
|
||||
- 参数`tiktok_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。
|
||||
- The parameters `tiktok_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed.
|
||||
## 返回值/Return
|
||||
- 用户当个视频数据的列表,列表内包含JSON数据。
|
||||
- List of user single video data, list contains JSON data.
|
||||
"""
|
||||
# 开始时间
|
||||
start_time = time.time()
|
||||
if video_id is None or video_id == "":
|
||||
video_id = api.get_tiktok_video_id(tiktok_video_url)
|
||||
if video_id is None:
|
||||
return ORJSONResponse({"status": "fail", "platform": "tiktok", "endpoint": "/tiktok_video_data/",
|
||||
"message": "获取视频ID失败/Get video ID failed"})
|
||||
if video_id is not None and video_id != '':
|
||||
print('开始解析单个TikTok视频数据')
|
||||
video_data = api.get_tiktok_video_data(video_id)
|
||||
# TikTok的API数据如果为空或者返回的数据中没有视频数据,就返回错误信息
|
||||
# If the TikTok API data is empty or there is no video data in the returned data, an error message is returned
|
||||
if video_data is None or video_data.get('aweme_id') != video_id:
|
||||
print('视频数据获取失败/Failed to get video data')
|
||||
result = {
|
||||
"status": "failed",
|
||||
"platform": "tiktok",
|
||||
"endpoint": "/tiktok_video_data/",
|
||||
"message": "视频数据获取失败/Failed to get video data"
|
||||
}
|
||||
return ORJSONResponse(result)
|
||||
# 记录API调用
|
||||
await api_logs(start_time=start_time,
|
||||
input_data={'tiktok_video_url': tiktok_video_url, 'video_id': video_id},
|
||||
endpoint='tiktok_video_data')
|
||||
# 结束时间
|
||||
total_time = float(format(time.time() - start_time, '.4f'))
|
||||
# 返回数据
|
||||
result = {
|
||||
"status": "success",
|
||||
"platform": "tiktok",
|
||||
"endpoint": "/tiktok_video_data/",
|
||||
"message": "获取视频数据成功/Got video data successfully",
|
||||
"total_time": total_time,
|
||||
"aweme_list": [video_data]
|
||||
}
|
||||
return ORJSONResponse(result)
|
||||
else:
|
||||
return jsonify(Status='Failed', Reason='This API is disabled. To enable it, set the value of "api_switch" to True.')
|
||||
print('视频链接错误/Video link error')
|
||||
result = {
|
||||
"status": "failed",
|
||||
"platform": "tiktok",
|
||||
"endpoint": "/tiktok_video_data/",
|
||||
"message": "视频链接错误/Video link error"
|
||||
}
|
||||
return ORJSONResponse(result)
|
||||
|
||||
|
||||
@app.route("/music", methods=["POST", "GET"])
|
||||
def download_music():
|
||||
# 用于返回视频下载请求(返回MP3文件下载请求,面对大量请求时非常吃服务器内存,容易崩,慎用。)
|
||||
# 将api_switch的值设定为False可关闭该API
|
||||
api_switch = api_config['Music_Download']
|
||||
if api_switch == 'True':
|
||||
api = Scraper()
|
||||
content = request.args.get("url")
|
||||
if content == '':
|
||||
return jsonify(status='failed', reason='url value cannot be empty', function='download_music()',
|
||||
value=content)
|
||||
""" ________________________⬇️iOS快捷指令更新端点(iOS Shortcut update endpoint)⬇️________________________"""
|
||||
|
||||
|
||||
@app.get("/ios", response_model=iOS_Shortcut, tags=["iOS_Shortcut"])
|
||||
async def Get_Shortcut():
|
||||
data = {
|
||||
'version': config["Web_API"]["iOS_Shortcut_Version"],
|
||||
'update': config["Web_API"]['iOS_Shortcut_Update_Time'],
|
||||
'link': config["Web_API"]['iOS_Shortcut_Link'],
|
||||
'link_en': config["Web_API"]['iOS_Shortcut_Link_EN'],
|
||||
'note': config["Web_API"]['iOS_Shortcut_Update_Note'],
|
||||
'note_en': config["Web_API"]['iOS_Shortcut_Update_Note_EN'],
|
||||
}
|
||||
return ORJSONResponse(data)
|
||||
|
||||
|
||||
""" ________________________⬇️下载文件端点/函数(Download file endpoints/functions)⬇️________________________"""
|
||||
|
||||
|
||||
# 下载文件端点/Download file endpoint
|
||||
@app.get("/download", tags=["Download"])
|
||||
async def download_file_hybrid(url: str, prefix: bool = True, watermark: bool = False):
|
||||
"""
|
||||
## 用途/Usage
|
||||
### [中文]
|
||||
- 将[抖音|TikTok]链接作为参数提交至此端点,返回[视频|图片]文件下载请求。
|
||||
### [English]
|
||||
- Submit the [Douyin|TikTok] link as a parameter to this endpoint and return the [video|picture] file download request.
|
||||
# 参数/Parameter
|
||||
- url:str -> [Douyin|TikTok] [视频|图片] 链接/ [Douyin|TikTok] [video|image] link
|
||||
- prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
|
||||
- watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
|
||||
"""
|
||||
# 是否开启此端点/Whether to enable this endpoint
|
||||
if config["Web_API"]["Download_Switch"] != "True":
|
||||
return ORJSONResponse({"status": "endpoint closed",
|
||||
"message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
|
||||
# 开始时间
|
||||
start_time = time.time()
|
||||
headers = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
||||
}
|
||||
data = api.hybrid_parsing(url)
|
||||
if data is None:
|
||||
return ORJSONResponse(data)
|
||||
else:
|
||||
# 记录API调用
|
||||
await api_logs(start_time=start_time,
|
||||
input_data={'url': url},
|
||||
endpoint='download')
|
||||
url_type = data.get('type')
|
||||
platform = data.get('platform')
|
||||
aweme_id = data.get('aweme_id')
|
||||
file_name_prefix = config["Web_API"]["File_Name_Prefix"] if prefix else ''
|
||||
root_path = config["Web_API"]["Download_Path"]
|
||||
# 查看目录是否存在,不存在就创建
|
||||
if not os.path.exists(root_path):
|
||||
os.makedirs(root_path)
|
||||
if url_type == 'video':
|
||||
file_name = file_name_prefix + platform + '_' + aweme_id + '.mp4' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_watermark' + '.mp4'
|
||||
url = data.get('video_data').get('nwm_video_url_HQ') if not watermark else data.get('video_data').get('wm_video_url')
|
||||
print('url: ', url)
|
||||
file_path = root_path + "/" + file_name
|
||||
print('file_path: ', file_path)
|
||||
# 判断文件是否存在,存在就直接返回、
|
||||
if os.path.exists(file_path):
|
||||
print('文件已存在,直接返回')
|
||||
return FileResponse(path=file_path, media_type='video/mp4', filename=file_name)
|
||||
else:
|
||||
if platform == 'douyin':
|
||||
r = requests.get(url=url, headers=headers, allow_redirects=False).headers
|
||||
cdn_url = r.get('location')
|
||||
r = requests.get(cdn_url).content
|
||||
elif platform == 'tiktok':
|
||||
r = requests.get(url=url, headers=headers).content
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(r)
|
||||
return FileResponse(path=file_path, media_type='video/mp4', filename=file_name)
|
||||
elif url_type == 'image':
|
||||
url = data.get('image_data').get('no_watermark_image_list') if not watermark else data.get('image_data').get('watermark_image_list')
|
||||
print('url: ', url)
|
||||
zip_file_name = file_name_prefix + platform + '_' + aweme_id + '_images.zip' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_images_watermark.zip'
|
||||
zip_file_path = root_path + "/" + zip_file_name
|
||||
print('zip_file_name: ', zip_file_name)
|
||||
print('zip_file_path: ', zip_file_path)
|
||||
# 判断文件是否存在,存在就直接返回、
|
||||
if os.path.exists(zip_file_path):
|
||||
print('文件已存在,直接返回')
|
||||
return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name)
|
||||
file_path_list = []
|
||||
for i in url:
|
||||
r = requests.get(url=i, headers=headers)
|
||||
content_type = r.headers.get('content-type')
|
||||
file_format = content_type.split('/')[1]
|
||||
r = r.content
|
||||
index = int(url.index(i))
|
||||
file_name = file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '.' + file_format if not watermark else \
|
||||
file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '_watermark' + '.' + file_format
|
||||
file_path = root_path + "/" + file_name
|
||||
file_path_list.append(file_path)
|
||||
print('file_path: ', file_path)
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(r)
|
||||
if len(url) == len(file_path_list):
|
||||
zip_file = zipfile.ZipFile(zip_file_path, 'w')
|
||||
for f in file_path_list:
|
||||
zip_file.write(os.path.join(f), f, zipfile.ZIP_DEFLATED)
|
||||
zip_file.close()
|
||||
return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name)
|
||||
else:
|
||||
post_content = find_url(content)[0]
|
||||
try:
|
||||
if 'douyin.com' in post_content:
|
||||
# 获取视频信息
|
||||
result = api.douyin(post_content)
|
||||
bgm_url = result['video_music']
|
||||
if bgm_url == "None":
|
||||
return jsonify(Status='Failed', Reason='This link has no music to get!')
|
||||
else:
|
||||
# 视频标题
|
||||
bgm_title = result['video_music_title']
|
||||
# 作者昵称
|
||||
author_name = result['video_music_author']
|
||||
# 清理文件名
|
||||
file_name = clean_filename(bgm_title, author_name)
|
||||
elif 'tiktok.com' in post_content:
|
||||
# 获取视频信息
|
||||
result = api.tiktok(post_content)
|
||||
# BGM链接
|
||||
bgm_url = result['video_music_url']
|
||||
# 视频标题
|
||||
bgm_title = result['video_music_title']
|
||||
# 作者昵称
|
||||
author_name = result['video_music_author']
|
||||
# 清理文件名
|
||||
file_name = clean_filename(bgm_title, author_name)
|
||||
else:
|
||||
return jsonify(Status='Failed', Reason='This link has no music to get!')
|
||||
video_bgm = requests.get(bgm_url, headers).content
|
||||
# 将bgm字节流封装成response对象
|
||||
response = make_response(video_bgm)
|
||||
# 添加响应头部信息
|
||||
response.headers['Content-Type'] = "video/mp3"
|
||||
# 他妈的,费了我老大劲才解决文件中文名的问题
|
||||
return ORJSONResponse(data)
|
||||
|
||||
|
||||
# 批量下载文件端点/Batch download file endpoint
|
||||
@app.get("/batch_download", tags=["Download"])
|
||||
async def batch_download_file(url_list: str, prefix: bool = True):
|
||||
"""
|
||||
批量下载文件端点/Batch download file endpoint
|
||||
未完工/Unfinished
|
||||
"""
|
||||
print('url_list: ', url_list)
|
||||
return ORJSONResponse({"status": "failed",
|
||||
"message": "嘿嘿嘿,这个功能还没做呢,等我有空再做吧/Hehehe, this function hasn't been done yet, I'll do it when I have time"})
|
||||
|
||||
|
||||
# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video)
|
||||
@app.get("/video/{aweme_id}", tags=["Download"])
|
||||
async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False):
|
||||
"""
|
||||
## 用途/Usage
|
||||
### [中文]
|
||||
- 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
|
||||
- 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。
|
||||
### [English]
|
||||
- Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request.
|
||||
- For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint.
|
||||
# 参数/Parameter
|
||||
- aweme_id:str -> 抖音视频ID/Douyin video ID
|
||||
- prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
|
||||
- watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
|
||||
"""
|
||||
# 是否开启此端点/Whether to enable this endpoint
|
||||
if config["Web_API"]["Download_Switch"] != "True":
|
||||
return ORJSONResponse({"status": "endpoint closed",
|
||||
"message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
|
||||
video_url = f"https://www.douyin.com/video/{aweme_id}"
|
||||
download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
|
||||
return RedirectResponse(download_url)
|
||||
|
||||
|
||||
# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video)
|
||||
@app.get("/note/{aweme_id}", tags=["Download"])
|
||||
async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False):
|
||||
"""
|
||||
## 用途/Usage
|
||||
### [中文]
|
||||
- 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
|
||||
- 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。
|
||||
### [English]
|
||||
- Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request.
|
||||
- For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint.
|
||||
# 参数/Parameter
|
||||
- aweme_id:str -> 抖音视频ID/Douyin video ID
|
||||
- prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
|
||||
- watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
|
||||
"""
|
||||
# 是否开启此端点/Whether to enable this endpoint
|
||||
if config["Web_API"]["Download_Switch"] != "True":
|
||||
return ORJSONResponse({"status": "endpoint closed",
|
||||
"message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
|
||||
video_url = f"https://www.douyin.com/video/{aweme_id}"
|
||||
download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
|
||||
return RedirectResponse(download_url)
|
||||
|
||||
|
||||
# 抖音链接格式下载端点/Douyin link format download endpoint
|
||||
@app.get("/discover", tags=["Download"])
|
||||
async def download_douyin_discover(modal_id: str, prefix: bool = True, watermark: bool = False):
|
||||
"""
|
||||
## 用途/Usage
|
||||
### [中文]
|
||||
- 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
|
||||
- 例如原链接:https://www.douyin.com/discover?modal_id=1234567890123456789 改成 https://api.douyin.wtf/discover?modal_id=1234567890123456789 即可调用此端点。
|
||||
### [English]
|
||||
- Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request.
|
||||
- For example, the original link: https://douyin.com/discover?modal_id=1234567890123456789 becomes https://api.douyin.wtf/discover?modal_id=1234567890123456789 to call this endpoint.
|
||||
# 参数/Parameter
|
||||
- modal_id: str -> 抖音视频ID/Douyin video ID
|
||||
- prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
|
||||
- watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
|
||||
"""
|
||||
# 是否开启此端点/Whether to enable this endpoint
|
||||
if config["Web_API"]["Download_Switch"] != "True":
|
||||
return ORJSONResponse({"status": "endpoint closed",
|
||||
"message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
|
||||
video_url = f"https://www.douyin.com/discover?modal_id={modal_id}"
|
||||
download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
|
||||
return RedirectResponse(download_url)
|
||||
|
||||
|
||||
# Tiktok链接格式下载端点(video)/Tiktok link format download endpoint(video)
|
||||
@app.get("/{user_id}/video/{aweme_id}", tags=["Download"])
|
||||
async def download_tiktok_video(user_id: str, aweme_id: str, prefix: bool = True, watermark: bool = False):
|
||||
"""
|
||||
## 用途/Usage
|
||||
### [中文]
|
||||
- 将TikTok域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。
|
||||
- 例如原链接:https://www.tiktok.com/@evil0ctal/video/7156033831819037994 改成 https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 即可调用此端点。
|
||||
### [English]
|
||||
- Change the TikTok domain name to the current server domain name to call this endpoint and return the video file download request.
|
||||
- For example, the original link: https://www.tiktok.com/@evil0ctal/video/7156033831819037994 becomes https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 to call this endpoint.
|
||||
# 参数/Parameter
|
||||
- user_id: str -> TikTok用户ID/TikTok user ID
|
||||
- aweme_id: str -> TikTok视频ID/TikTok video ID
|
||||
- prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix
|
||||
- watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark
|
||||
"""
|
||||
# 是否开启此端点/Whether to enable this endpoint
|
||||
if config["Web_API"]["Download_Switch"] != "True":
|
||||
return ORJSONResponse({"status": "endpoint closed",
|
||||
"message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"})
|
||||
video_url = f"https://www.tiktok.com/{user_id}/video/{aweme_id}"
|
||||
download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}"
|
||||
return RedirectResponse(download_url)
|
||||
|
||||
|
||||
# 定期清理[Download_Path]文件夹
|
||||
# Periodically clean the [Download_Path] folder
|
||||
def cleanup_path():
|
||||
while True:
|
||||
root_path = config["Web_API"]["Download_Path"]
|
||||
timer = int(config["Web_API"]["Download_Path_Clean_Timer"])
|
||||
# 查看目录是否存在,不存在就跳过
|
||||
if os.path.exists(root_path):
|
||||
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
print(f"{time_now}: Cleaning up the download folder...")
|
||||
for file in os.listdir("./download"):
|
||||
file_path = os.path.join("./download", file)
|
||||
try:
|
||||
filename = file_name.encode('latin-1')
|
||||
except UnicodeEncodeError:
|
||||
filenames = {
|
||||
'filename': unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore'),
|
||||
'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp3'),
|
||||
}
|
||||
else:
|
||||
filenames = {'filename': file_name + '.mp3'}
|
||||
# attachment表示以附件形式下载
|
||||
response.headers.set('Content-Disposition', 'attachment', **filenames)
|
||||
return response
|
||||
except Exception as e:
|
||||
return jsonify(status='failed', reason=str(e), function='download_music()', value=content)
|
||||
else:
|
||||
return jsonify(Status='Failed', Reason='This API is disabled. To enable it, set the value of "api_switch" to True.')
|
||||
if os.path.isfile(file_path):
|
||||
os.remove(file_path)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
else:
|
||||
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
print(f"{time_now}: The download folder does not exist, skipping...")
|
||||
time.sleep(timer)
|
||||
|
||||
|
||||
""" ________________________⬇️项目启动执行函数(Project start execution function)⬇️________________________"""
|
||||
|
||||
|
||||
# 程序启动后执行/Execute after program startup
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
# 创建一个清理下载目录定时器线程并启动
|
||||
# Create a timer thread to clean up the download directory and start it
|
||||
download_path_clean_switches = True if config["Web_API"]["Download_Path_Clean_Switch"] == "True" else False
|
||||
if download_path_clean_switches:
|
||||
# 启动清理线程/Start cleaning thread
|
||||
thread_1 = threading.Thread(target=cleanup_path)
|
||||
thread_1.start()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 开启WebAPI
|
||||
if os.environ.get('PORT'):
|
||||
port = int(os.environ.get('PORT'))
|
||||
else:
|
||||
# 默认端口
|
||||
port = api_config['Port']
|
||||
app.run(host='0.0.0.0', port=port)
|
||||
# 建议使用gunicorn启动,使用uvicorn启动时请将debug设置为False
|
||||
# It is recommended to use gunicorn to start, when using uvicorn to start, please set debug to False
|
||||
# uvicorn web_api:app --host '0.0.0.0' --port 8000 --reload
|
||||
uvicorn.run("web_api:app", host='0.0.0.0', port=port, reload=True, access_log=False)
|
||||
|
|
|
|||
Loading…
Reference in a new issue