diff --git a/scraper.py b/scraper.py index c2fcc61..4016b61 100644 --- a/scraper.py +++ b/scraper.py @@ -2,7 +2,8 @@ # -*- encoding: utf-8 -*- # @Author: https://github.com/Evil0ctal/ # @Time: 2021/11/06 -# @Update: 2022/10/08 +# @Update: 2022/11/06 +# @Version: 3.0.0 # @Function: # 核心代码,估值1块(๑•̀ㅂ•́)و✧ # 用于爬取Douyin/TikTok数据并以字典形式返回。 @@ -10,37 +11,57 @@ import re -import json -import time -import random +import orjson import requests +import traceback import configparser from tenacity import * class Scraper: """ - Scraper.douyin(link): - 输入参数为抖音视频/图集链接,完成解析后返回字典。 + 简介/Introduction - Scraper.tiktok(link): - 输入参数为TikTok视频/图集链接,完成解析后返回字典。 + Scraper.get_url(text: str) -> str or None + 用于检索出文本中的链接并返回/Used to retrieve the link in the text and return it. + + Scraper.convert_share_urls(self, url: str) -> str or None\n + 用于转换分享链接为原始链接/Convert share links to original links + + Scraper.get_douyin_video_id(self, original_url: str) -> str or None\n + 用于获取抖音视频ID/Get Douyin video ID + + Scraper.get_douyin_video_data(self, video_id: str) -> dict or None\n + 用于获取抖音视频数据/Get Douyin video data + + Scraper.get_douyin_live_video_data(self, original_url: str) -> str or None\n + 用于获取抖音直播视频数据/Get Douyin live video data + + Scraper.get_tiktok_video_id(self, original_url: str) -> str or None\n + 用于获取TikTok视频ID/Get TikTok video ID + + Scraper.get_tiktok_video_data(self, video_id: str) -> dict or None\n + 用于获取TikTok视频数据/Get TikTok video data + + Scraper.hybrid_parsing(self, video_url: str) -> dict\n + 用于混合解析/ Hybrid parsing + + Scraper.hybrid_parsing_minimal(data: dict) -> dict\n + 用于混合解析最小化/Hybrid parsing minimal """ + """__________________________________________⬇️initialization(初始化)⬇️______________________________________""" + + # 初始化/initialization def __init__(self): self.headers = { - 'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66' + 'User-Agent': "Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66" } - self.tiktok_headers = { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", - "authority": "www.tiktok.com", - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - "Host": "www.tiktok.com", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) coc_coc_browser/86.0.170 Chrome/80.0.3987.170 Safari/537.36", + self.douyin_cookies = { + 'Cookie': 'msToken=tsQyL2_m4XgtIij2GZfyu8XNXBfTGELdreF1jeIJTyktxMqf5MMIna8m1bv7zYz4pGLinNP2TvISbrzvFubLR8khwmAVLfImoWo3Ecnl_956MgOK9kOBdwM=; odin_tt=6db0a7d68fd2147ddaf4db0b911551e472d698d7b84a64a24cf07c49bdc5594b2fb7a42fd125332977218dd517a36ec3c658f84cebc6f806032eff34b36909607d5452f0f9d898810c369cd75fd5fb15; ttwid=1%7CfhiqLOzu_UksmD8_muF_TNvFyV909d0cw8CSRsmnbr0%7C1662368529%7C048a4e969ec3570e84a5faa3518aa7e16332cfc7fbcb789780135d33a34d94d2' } self.tiktok_api_headers = { - 'user-agent': 'com.ss.android.ugc.trill/2613 (Linux; U; Android 10; en_US; Pixel 4; Build/QQ3A.200805.001; Cronet/58.0.2991.0)' + 'User-Agent': 'com.ss.android.ugc.trill/2613 (Linux; U; Android 10; en_US; Pixel 4; Build/QQ3A.200805.001; Cronet/58.0.2991.0)' } self.app_config = configparser.ConfigParser() self.app_config.read('config.ini', encoding='utf-8') @@ -60,530 +81,436 @@ class Scraper: else: self.proxies = None - @retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2)) - def douyin(self, original_url): - """ - 利用官方接口解析抖音链接信息 - :param original_url: 抖音/TikTok链接(支持长/短链接) - :return:包含信息的字典 - """ - headers = self.headers - try: - # 开始时间 - start = time.time() - # 判断是否为个人主页链接 - if 'user' in original_url: - return {'status': 'failed', 'reason': '暂不支持个人主页批量解析', 'function': 'Scraper.douyin()', - 'value': original_url} - else: - # 原视频链接 - r = requests.get(url=original_url, headers=headers, allow_redirects=False, proxies=self.proxies) - try: - # 2021/12/11 发现抖音做了限制,会自动重定向网址,但是可以从回执头中获取 - long_url = r.headers['Location'] - # 判断是否为个人主页链接 - if 'user' in long_url: - return {'status': 'failed', 'reason': '暂不支持个人主页批量解析', - 'function': 'Scraper.douyin()', - 'value': original_url} - except: - # 报错后判断为长链接,直接截取视频id - long_url = original_url - # 正则匹配出视频ID - try: - # 第一种链接类型 - # https://www.douyin.com/video/7086770907674348841 - key = re.findall('/video/(\d+)?', long_url)[0] - print('视频ID为: {}'.format(key)) - except Exception: - # 第二种链接类型 - # https://www.douyin.com/discover?modal_id=7086770907674348841 - key = re.findall('modal_id=(\d+)', long_url)[0] - print('视频ID为: {}'.format(key)) - # 构造抖音API链接 - api_url = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={key}' - print("正在请求抖音API链接: " + '\n' + api_url) - # 将回执以JSON格式处理 - js = json.loads(requests.get(url=api_url, headers=headers, proxies=self.proxies).text) - aweme_id = str(js['item_list'][0]['aweme_id']) - share_url = re.sub("/\\?.*", "", js['item_list'][0]['share_url']) - if share_url is None: - share_url = ( - "https://www.iesdouyin.com/share/video/" + aweme_id) if aweme_id is not None else original_url; - try: - music_share_url = "https://www.iesdouyin.com/share/music/" + str(js['item_list'][0]['music']['mid']) - except: - music_share_url = None - # 判断是否为图集 - if js['item_list'][0]['images'] is not None: - print("类型 = 图集") - # 类型为图集 - url_type = 'album' - # 图集标题 - album_title = str(js['item_list'][0]['desc']) - # 图集作者昵称 - album_author = str(js['item_list'][0]['author']['nickname']) - # 图集作者签名 - album_author_signature = str(js['item_list'][0]['author']['signature']) - # 图集作者UID - album_author_uid = str(js['item_list'][0]['author']['uid']) - # 图集作者抖音号 - album_author_id = str(js['item_list'][0]['author']['unique_id']) - if album_author_id == "": - # 如果作者未修改过抖音号,应使用此值以避免无法获取其抖音ID - album_author_id = str(js['item_list'][0]['author']['short_id']) - # 尝试获取图集BGM信息 - for key in js['item_list'][0]: - if key == 'music': - # 图集BGM链接 - album_music = str(js['item_list'][0]['music']['play_url']['url_list'][0] if len( - js['item_list'][0]['music']['play_url']['url_list']) > 0 else 'No BGM found') - # 图集BGM标题 - album_music_title = str(js['item_list'][0]['music']['title']) - # 图集BGM作者 - album_music_author = str(js['item_list'][0]['music']['author']) - # 图集BGM ID - album_music_id = str(js['item_list'][0]['music']['id']) - # 图集BGM MID - album_music_mid = str(js['item_list'][0]['music']['mid']) - break; - else: - # 图集BGM链接 - album_music = album_music_title = album_music_author = album_music_id = album_music_mid = 'No BGM found ' - # 图集ID - album_aweme_id = str(js['item_list'][0]['statistics']['aweme_id']) - # 评论数量 - album_comment_count = str(js['item_list'][0]['statistics']['comment_count']) - # 获赞数量 - album_digg_count = str(js['item_list'][0]['statistics']['digg_count']) - # 播放次数 - album_play_count = str(js['item_list'][0]['statistics']['play_count']) - # 分享次数 - album_share_count = str(js['item_list'][0]['statistics']['share_count']) - # 上传时间戳 - album_create_time = str(js['item_list'][0]['create_time']) - # 将话题保存在列表中 - album_hashtags = [] - for tag in js['item_list'][0]['text_extra']: - album_hashtags.append(tag['hashtag_name']) - # 将无水印图片链接保存在列表中 - images_list = [] - for data in js['item_list'][0]['images']: - images_list.append(data['url_list'][0]) - # 结束时间 - end = time.time() - # 解析时间 - analyze_time = format((end - start), '.4f') - # 将信息储存在字典中 - album_data = {'status': 'success', - 'analyze_time': (analyze_time + 's'), - 'url_type': url_type, - 'platform': 'douyin', - 'original_url': original_url, - 'share_url': share_url, - 'music_share_url': music_share_url, - 'api_url': api_url, - 'album_aweme_id': album_aweme_id, - 'album_title': album_title, - 'album_author': album_author, - 'album_author_signature': album_author_signature, - 'album_author_uid': album_author_uid, - 'album_author_id': album_author_id, - 'album_music': album_music, - 'album_music_title': album_music_title, - 'album_music_author': album_music_author, - 'album_music_id': album_music_id, - 'album_music_mid': album_music_mid, - 'album_comment_count': album_comment_count, - 'album_digg_count': album_digg_count, - 'album_play_count': album_play_count, - 'album_share_count': album_share_count, - 'album_create_time': album_create_time, - 'album_list': images_list, - 'album_hashtags': album_hashtags} - return album_data - else: - print("类型 = 视频") - # 类型为视频 - url_type = 'video' - # 视频标题 - video_title = str(js['item_list'][0]['desc']) - # 视频作者昵称 - video_author = str(js['item_list'][0]['author']['nickname']) - # 视频作者抖音号 - video_author_id = str(js['item_list'][0]['author']['unique_id']) - if video_author_id == "": - # 如果作者未修改过抖音号,应使用此值以避免无法获取其抖音ID - video_author_id = str(js['item_list'][0]['author']['short_id']) - # vid - vid = str(js['item_list'][0]['video']['vid']) - # 无水印1080p视频链接 - try: - r = requests.get( - "https://aweme.snssdk.com/aweme/v1/play/?video_id={}&radio=1080p&line=0".format(vid), - headers=headers, allow_redirects=False, proxies=self.proxies) - nwm_video_url_1080p = r.headers['Location'] - except: - nwm_video_url_1080p = "None" - # 有水印视频链接 - wm_video_url = str(js['item_list'][0]['video']['play_addr']['url_list'][0]) - # 无水印视频链接 (在回执JSON中将关键字'playwm'替换为'play'即可获得无水印地址) - nwm_video_url = str(js['item_list'][0]['video']['play_addr']['url_list'][0]).replace('playwm', - 'play') - # 去水印后视频链接(2022年1月1日抖音APi获取到的URL会进行跳转,需要在Location中获取直链) - r = requests.get(url=nwm_video_url, headers=headers, allow_redirects=False, proxies=self.proxies) - video_url = r.headers['Location'] - # 视频作者签名 - video_author_signature = str(js['item_list'][0]['author']['signature']) - # 视频作者UID - video_author_uid = str(js['item_list'][0]['author']['uid']) - # 尝试获取视频背景音乐 - for key in js['item_list'][0]: - if key == 'music': - if len(js['item_list'][0]['music']['play_url']['url_list']) != 0: - # 视频BGM链接 - video_music = str(js['item_list'][0]['music']['play_url']['url_list'][0]) - else: - video_music = 'No BGM found' - # 视频BGM标题 - video_music_title = str(js['item_list'][0]['music']['title']) - # 视频BGM作者 - video_music_author = str(js['item_list'][0]['music']['author']) - # 视频BGM ID - video_music_id = str(js['item_list'][0]['music']['id']) - # 视频BGM MID - video_music_mid = str(js['item_list'][0]['music']['mid']) - break; - else: - video_music = video_music_title = video_music_author = video_music_id = video_music_mid = 'No BGM found' - # 视频ID - video_aweme_id = str(js['item_list'][0]['statistics']['aweme_id']) - # 评论数量 - video_comment_count = str(js['item_list'][0]['statistics']['comment_count']) - # 获赞数量 - video_digg_count = str(js['item_list'][0]['statistics']['digg_count']) - # 播放次数 - video_play_count = str(js['item_list'][0]['statistics']['play_count']) - # 分享次数 - video_share_count = str(js['item_list'][0]['statistics']['share_count']) - # 上传时间戳 - video_create_time = str(js['item_list'][0]['create_time']) - # 视频封面 - video_cover = js['item_list'][0]['video']['cover']['url_list'][0] - # 视频动态封面 - video_dynamic_cover = js['item_list'][0]['video']['dynamic_cover']['url_list'][0] - # 视频原始封面 - video_origin_cover = js['item_list'][0]['video']['origin_cover']['url_list'][0] - # 将话题保存在列表中 - video_hashtags = [] - for tag in js['item_list'][0]['text_extra']: - video_hashtags.append(tag['hashtag_name']) - # 结束时间 - end = time.time() - # 解析时间 - analyze_time = format((end - start), '.4f') - # 返回包含数据的字典 - video_data = {'status': 'success', - 'analyze_time': (analyze_time + 's'), - 'url_type': url_type, - 'platform': 'douyin', - 'original_url': original_url, - 'share_url': share_url, - 'music_share_url': music_share_url, - 'api_url': api_url, - 'video_title': video_title, - 'nwm_video_url': video_url, - 'nwm_video_url_1080p': nwm_video_url_1080p, - 'wm_video_url': wm_video_url, - 'video_aweme_id': video_aweme_id, - 'video_author': video_author, - 'video_author_signature': video_author_signature, - 'video_author_uid': video_author_uid, - 'video_author_id': video_author_id, - 'video_music': video_music, - 'video_music_title': video_music_title, - 'video_music_author': video_music_author, - 'video_music_id': video_music_id, - 'video_music_mid': video_music_mid, - 'video_comment_count': video_comment_count, - 'video_digg_count': video_digg_count, - 'video_play_count': video_play_count, - 'video_share_count': video_share_count, - 'video_create_time': video_create_time, - 'video_cover': video_cover, - 'video_dynamic_cover': video_dynamic_cover, - 'video_origin_cover': video_origin_cover, - 'video_hashtags': video_hashtags} - return video_data - except Exception as e: - # 返回异常 - return {'status': 'failed', 'reason': e, 'function': 'Scraper.douyin()', 'value': original_url} + """__________________________________________⬇️utils(实用程序)⬇️______________________________________""" - @retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2)) - def tiktok(self, original_url): - """ - 解析TikTok链接 - :param original_url:TikTok链接 - :return:包含信息的字典 - """ - headers = self.headers - # 开始时间 - start = time.time() - # 校验TikTok链接 - if '@' in original_url: - print("目标链接: ", original_url) - else: - # 从请求头中获取原始链接 - response = requests.get(url=original_url, headers=headers, allow_redirects=False, proxies=self.proxies) - true_link = response.headers['Location'].split("?")[0] - original_url = true_link - # TikTok请求头返回的第二种链接类型 - if '.html' in true_link: - response = requests.get(url=true_link, headers=headers, allow_redirects=False, proxies=self.proxies) - original_url = response.headers['Location'].split("?")[0] - print("目标链接: ", original_url) + # 检索字符串中的链接 + @staticmethod + def get_url(text: str) -> str or None: try: + # 从输入文字中提取索引链接存入列表/Extract index links from input text and store in list + url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text) + # 判断是否有链接/Check if there is a link + if len(url) > 0: + return url[0] + except Exception as e: + print(e) + return None + + # 转换链接/convert url + @retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2)) + def convert_share_urls(self, url: str) -> str or None: + """ + 用于从短链接中获取长链接 + :return: 长链接 + """ + # 检索字符串中的链接/Retrieve links from string + url = self.get_url(url) + # 判断是否有链接/Check if there is a link + if url is None: + print('无法检索到链接/Unable to retrieve link') + return None + # 判断是否为抖音分享链接/judge if it is a douyin share link + if 'douyin' in url: + """ + 抖音视频链接类型(不全): + 1. https://v.douyin.com/MuKhKn3/ + 2. https://www.douyin.com/video/7157519152863890719 + 3. https://www.iesdouyin.com/share/video/7157519152863890719/?region=CN&mid=7157519152863890719&u_code=ffe6jgjg&titleType=title×tamp=1600000000&utm_source=copy_link&utm_campaign=client_share&utm_medium=android&app=aweme&iid=123456789&share_id=123456789 + 抖音用户链接类型(不全): + 1. https://www.douyin.com/user/MS4wLjABAAAAbLMPpOhVk441et7z7ECGcmGrK42KtoWOuR0_7pLZCcyFheA9__asY-kGfNAtYqXR?relation=0&vid=7157519152863890719 + 2. https://v.douyin.com/MuKoFP4/ + 抖音直播链接类型(不全): + 1. https://live.douyin.com/88815422890 + """ + if 'v.douyin' in url: + print('正在通过抖音分享链接获取原始链接...') + try: + response = requests.get(url, headers=self.headers, allow_redirects=False, + proxies=self.proxies) + if response.status_code == 302: + # 视频链接302重定向'Location'字段 + # https://www.iesdouyin.com/share/video/7148345687535570206/ + # 用户主页链接302重定向'Location'字段 + # https://www.iesdouyin.com/share/user/MS4wLjABAAAAbLMPpOhVk441et7z7ECGcmGrK42KtoWOuR0_7pLZCcyFheA9__asY-kGfNAtYqXR + url = response.headers['Location'].split('?')[0] if '?' in response.headers['Location'] else \ + response.headers['Location'] + print('获取原始链接成功, 原始链接为: {}'.format(url)) + return url + except Exception as e: + print('获取原始链接失败!') + print(e) + return None + elif 'www.douyin' in url: + print('该链接为原始链接,无需转换,原始链接为: {}'.format(url)) + return url + elif 'live.douyin' in url: + print('该链接为直播链接,无需转换,原始链接为: {}'.format(url)) + return url + # 判断是否为TikTok分享链接/judge if it is a TikTok share link + elif 'tiktok' in url: + """ + TikTok视频链接类型(不全): + 1. https://www.tiktok.com/@tiktok/video/6950000000000000000 + 2. https://www.tiktok.com/t/ZTRHcXS2C/ + TikTok用户链接类型(不全): + 1. https://www.tiktok.com/@tiktok + """ + if '@' in url: + print('该链接为原始链接,无需转换,原始链接为: {}'.format(url)) + return url + else: + print('正在通过TikTok分享链接获取原始链接...') + try: + response = requests.get(url, headers=self.headers, allow_redirects=False, + proxies=self.proxies) + if response.status_code == 301: + # 视频链接302重定向'Location'字段 + # https://www.tiktok.com/@tiktok/video/6950000000000000000 + # 用户主页链接302重定向'Location'字段 + # https://www.tiktok.com/@tiktok + url = response.headers['Location'].split('?')[0] if '?' in response.headers['Location'] else \ + response.headers['Location'] + print('获取原始链接成功, 原始链接为: {}'.format(url)) + return url + except Exception as e: + print('获取原始链接失败!') + print(e) + return None + + """__________________________________________⬇️Douyin methods(抖音方法)⬇️______________________________________""" + + # 获取抖音视频ID/Get Douyin video ID + def get_douyin_video_id(self, original_url: str) -> dict or None: + """ + 获取视频id + :param original_url: 视频链接 + :return: 视频id + """ + # 正则匹配出视频ID + try: + video_url = self.convert_share_urls(original_url) + # 链接类型: + # 视频页 https://www.douyin.com/video/7086770907674348841 + if '/video/' in video_url: + key = re.findall('/video/(\d+)?', video_url)[0] + print('获取到的抖音视频ID为: {}'.format(key)) + return key + # 发现页 https://www.douyin.com/discover?modal_id=7086770907674348841 + elif 'discover?' in video_url: + key = re.findall('modal_id=(\d+)', video_url)[0] + print('获取到的抖音视频ID为: {}'.format(key)) + return key + # 直播页 + if 'live.douyin' in video_url: + # https://live.douyin.com/1000000000000000000 + key = video_url.replace('https://live.douyin.com/', '') + print('获取到的抖音直播ID为: {}'.format(key)) + return key + except Exception as e: + print('获取抖音视频ID出错了:{}'.format(e)) + return None + + # 获取单个抖音视频数据/Get single Douyin video data + @retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2)) + def get_douyin_video_data(self, video_id: str) -> dict or None: + """ + :param video_id: str - 抖音视频id + :return:dict - 包含信息的字典 + """ + print('正在获取抖音视频数据...') + try: + # 构造访问链接/Construct the access link + api_url = f"https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={video_id}" + # 访问API/Access API + print("正在获取视频数据API: {}".format(api_url)) + response = requests.get(url=api_url, headers=self.headers, proxies=self.proxies, timeout=5).text + # 获取返回的json数据/Get the returned json data + data = orjson.loads(response) + # 获取视频数据/Get video data + video_data = data['item_list'][0] + print('获取视频数据成功!') + # print("抖音API返回数据: {}".format(video_data)) + return video_data + except Exception as e: + print('获取抖音视频数据失败!原因:{}'.format(e)) + return None + + # 获取单个抖音直播视频数据/Get single Douyin Live video data + @retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2)) + def get_douyin_live_video_data(self, web_rid: str) -> dict or None: + print('正在获取抖音视频数据...') + try: + # 构造访问链接/Construct the access link + api_url = f"https://live.douyin.com/webcast/web/enter/?aid=6383&web_rid={web_rid}" + # 访问API/Access API + print("正在获取视频数据API: {}".format(api_url)) + response = requests.get(url=api_url, headers=self.douyin_cookies, proxies=self.proxies, timeout=5).text + # 获取返回的json数据/Get the returned json data + data = orjson.loads(response) + # 获取视频数据/Get video data + video_data = data['data'] + print('获取视频数据成功!') + # print("抖音API返回数据: {}".format(video_data)) + return video_data + except Exception as e: + print('获取抖音视频数据失败!原因:{}'.format(e)) + return None + + """__________________________________________⬇️TikTok methods(TikTok方法)⬇️______________________________________""" + + # 获取TikTok视频ID/Get TikTok video ID + def get_tiktok_video_id(self, original_url: str) -> str or None: + """ + 获取视频id + :param original_url: 视频链接 + :return: 视频id + """ + try: + # 转换链接/Convert link + original_url = self.convert_share_urls(original_url) # 获取视频ID video_id = re.findall('/video/(\d+)?', original_url)[0] print('获取到的TikTok视频ID是{}'.format(video_id)) - # 尝试从TikTok网页获取部分视频数据 - try: - tiktok_headers = self.tiktok_headers - html = requests.get(url=original_url, headers=tiktok_headers, proxies=self.proxies, timeout=1) - # 正则检索网页中存在的JSON信息 - resp = re.search('"ItemModule":{(.*)},"UserModule":', html.text).group(1) - resp_info = ('{"ItemModule":{' + resp + '}}') - result = json.loads(resp_info) - # 从网页中获得的视频JSON数据 - video_info = result["ItemModule"][video_id] - except: - video_info = None - # 准备API参数 - openudid = ''.join(random.sample('0123456789abcdef',16)) - uuid = ''.join(random.sample('01234567890123456',16)) - ts = int(time.time()) - # 构建API链接 - tiktok_api_link = 'https://api-h2.tiktokv.com/aweme/v1/feed/?aweme_id={}&version_name=26.1.3&version_code=2613&build_number=26.1.3&manifest_version_code=2613&update_version_code=2613&openudid={}&uuid={}&_rticket={}&ts={}&device_brand=Google&device_type=Pixel%204&device_platform=android&resolution=1080*1920&dpi=420&os_version=10&os_api=29&carrier_region=US&sys_region=US%C2%AEion=US&app_name=trill&app_language=en&language=en&timezone_name=America/New_York&timezone_offset=-14400&channel=googleplay&ac=wifi&mcc_mnc=310260&is_my_cn=0&aid=1180&ssmix=a&as=a1qwert123&cp=cbfhckdckkde1'.format( - video_id, openudid, uuid, ts*1000, ts) - print('正在请求API链接:{}'.format(tiktok_api_link)) - response = requests.get(url=tiktok_api_link, headers=self.tiktok_api_headers, proxies=self.proxies).text - # 将API获取到的内容格式化为JSON - result = json.loads(response) - # print(result) - if 'image_post_info' in response: - # 判断链接是图集链接 - url_type = 'album' - print('类型为图集/type album') - # 视频标题 - album_title = result["aweme_list"][0]["desc"] - # 视频作者昵称 - album_author_nickname = result["aweme_list"][0]['author']["nickname"] - # 视频作者ID - album_author_id = result["aweme_list"][0]['author']["unique_id"] - # 视频作者SEC_UID - album_author_sec_uid = result["aweme_list"][0]['author']["sec_uid"] - # 作者uid - album_author_uid = result["aweme_list"][0]['author']["uid"] - # 上传时间戳 - album_create_time = result["aweme_list"][0]['create_time'] - # 视频ID - album_aweme_id = result["aweme_list"][0]['statistics']['aweme_id'] - try: - # 视频BGM标题 - album_music_title = result["aweme_list"][0]['music']['title'] - # 视频BGM作者 - album_music_author = result["aweme_list"][0]['music']['author'] - # 视频BGM ID - album_music_id = result["aweme_list"][0]['music']['id'] - # 视频BGM链接 - album_music_url = result["aweme_list"][0]['music']['play_url']['url_list'][0] - except: - album_music_title, album_music_author, album_music_id, album_music_url = "None", "None", "None", "None" - # 评论数量 - album_comment_count = result["aweme_list"][0]['statistics']['comment_count'] - # 获赞数量 - album_digg_count = result["aweme_list"][0]['statistics']['digg_count'] - # 播放次数 - album_play_count = result["aweme_list"][0]['statistics']['play_count'] - # 下载次数 - album_download_count = result["aweme_list"][0]['statistics']['download_count'] - # 分享次数 - album_share_count = result["aweme_list"][0]['statistics']['share_count'] - # 无水印图集 - album_list = [] - for i in result["aweme_list"][0]['image_post_info']['images']: - album_list.append(i['display_image']['url_list'][0]) - # 结束时间 - end = time.time() - # 解析时间 - analyze_time = format((end - start), '.4f') - # 储存数据 - album_data = {'status': 'success', - 'analyze_time': (analyze_time + 's'), - 'url_type': url_type, - 'api_url': tiktok_api_link, - 'original_url': original_url, - 'platform': 'tiktok', - 'album_title': album_title, - 'album_list': album_list, - 'album_author_nickname': album_author_nickname, - 'album_author_id': album_author_id, - 'album_author_sec_uid': album_author_sec_uid, - 'album_author_uid': album_author_uid, - 'album_create_time': album_create_time, - 'album_aweme_id': album_aweme_id, - 'album_music_title': album_music_title, - 'album_music_author': album_music_author, - 'album_music_id': album_music_id, - 'album_music_url': album_music_url, - 'album_comment_count': album_comment_count, - 'album_digg_count': album_digg_count, - 'album_play_count': album_play_count, - 'album_share_count': album_share_count, - 'album_download_count': album_download_count - } - # 返回包含数据的字典 - return album_data - else: - # 类型为视频 - url_type = 'video' - print('类型为视频/type video') - # 无水印视频链接 - nwm_video_url = result["aweme_list"][0]["video"]["play_addr"]["url_list"][0] - try: - # 有水印视频链接 - wm_video_url = result["aweme_list"][0]["video"]['download_addr']['url_list'][0] - except Exception: - # 有水印视频链接 - wm_video_url = 'None' - # 视频标题 - video_title = result["aweme_list"][0]["desc"] - # 视频作者昵称 - video_author_nickname = result["aweme_list"][0]['author']["nickname"] - # 视频作者ID - video_author_id = result["aweme_list"][0]['author']["unique_id"] - # 视频作者SEC_UID - video_author_sec_uid = result["aweme_list"][0]['author']["sec_uid"] - # 视频作者UID - video_author_uid = result["aweme_list"][0]['author']["uid"] - # 上传时间戳 - video_create_time = result["aweme_list"][0]['create_time'] - # 视频ID - video_aweme_id = result["aweme_list"][0]['statistics']['aweme_id'] - try: - # 视频BGM标题 - video_music_title = result["aweme_list"][0]['music']['title'] - # 视频BGM作者 - video_music_author = result["aweme_list"][0]['music']['author'] - # 视频BGM ID - video_music_id = result["aweme_list"][0]['music']['id'] - # 视频BGM链接 - video_music_url = result["aweme_list"][0]['music']['play_url']['url_list'][0] - except: - video_music_title, video_music_author, video_music_id, video_music_url = "None", "None", "None", "None" - # 评论数量 - video_comment_count = result["aweme_list"][0]['statistics']['comment_count'] - # 获赞数量 - video_digg_count = result["aweme_list"][0]['statistics']['digg_count'] - # 播放次数 - video_play_count = result["aweme_list"][0]['statistics']['play_count'] - # 下载次数 - video_download_count = result["aweme_list"][0]['statistics']['download_count'] - # 分享次数 - video_share_count = result["aweme_list"][0]['statistics']['share_count'] - # 视频封面 - video_cover = result["aweme_list"][0]['video']['cover']['url_list'][0] - # 视频动态封面 - video_dynamic_cover = result["aweme_list"][0]['video']['dynamic_cover']['url_list'][0] - # 视频原始封面 - video_origin_cover = result["aweme_list"][0]['video']['origin_cover']['url_list'][0] - # 将话题保存在列表中 - video_hashtags = [] - for tag in result["aweme_list"][0]['text_extra']: - if 'hashtag_name' in tag: - video_hashtags.append(tag['hashtag_name']) - else: - continue - if video_info != None: - # 作者粉丝数量 - video_author_followerCount = video_info['authorStats']['followerCount'] - # 作者关注数量 - video_author_followingCount = video_info['authorStats']['followingCount'] - # 作者获赞数量 - video_author_heartCount = video_info['authorStats']['heartCount'] - # 作者视频数量 - video_author_videoCount = video_info['authorStats']['videoCount'] - # 作者已赞作品数量 - video_author_diggCount = video_info['authorStats']['diggCount'] - else: - # 作者粉丝数量 - video_author_followerCount = 'None' - # 作者关注数量 - video_author_followingCount = 'None' - # 作者获赞数量 - video_author_heartCount = 'None' - # 作者视频数量 - video_author_videoCount = 'None' - # 作者已赞作品数量 - video_author_diggCount = 'None' - # 结束时间 - end = time.time() - # 解析时间 - analyze_time = format((end - start), '.4f') - # 储存数据 - video_data = {'status': 'success', - 'analyze_time': (analyze_time + 's'), - 'url_type': url_type, - 'api_url': tiktok_api_link, - 'original_url': original_url, - 'platform': 'tiktok', - 'video_title': video_title, - 'nwm_video_url': nwm_video_url, - 'wm_video_url': wm_video_url, - 'video_author_nickname': video_author_nickname, - 'video_author_id': video_author_id, - 'video_author_sec_uid': video_author_sec_uid, - 'video_author_uid': video_author_uid, - 'video_create_time': video_create_time, - 'video_aweme_id': video_aweme_id, - 'video_music_title': video_music_title, - 'video_music_author': video_music_author, - 'video_music_id': video_music_id, - 'video_music_url': video_music_url, - 'video_comment_count': video_comment_count, - 'video_digg_count': video_digg_count, - 'video_play_count': video_play_count, - 'video_share_count': video_share_count, - 'video_download_count': video_download_count, - 'video_author_followerCount': video_author_followerCount, - 'video_author_followingCount': video_author_followingCount, - 'video_author_heartCount': video_author_heartCount, - 'video_author_videoCount': video_author_videoCount, - 'video_author_diggCount': video_author_diggCount, - 'video_cover': video_cover, - 'video_dynamic_cover': video_dynamic_cover, - 'video_origin_cover': video_origin_cover, - 'video_hashtags': video_hashtags - } - # 返回包含数据的字典 + # 返回视频ID/Return video ID + return video_id + except Exception as e: + print('获取TikTok视频ID出错了:{}'.format(e)) + return None + + @retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2)) + def get_tiktok_video_data(self, video_id: str) -> dict or None: + """ + 获取单个视频信息 + :param video_id: 视频id + :return: 视频信息 + """ + print('正在获取TikTok视频数据...') + try: + api_url = f'https://api-h2.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}&version_code=2613&aid=1180' + print("正在获取视频数据API: {}".format(api_url)) + response = requests.get(api_url, headers=self.tiktok_api_headers, + proxies=self.proxies) + if response.content != '': + data = orjson.loads(response.text) + video_data = data['aweme_list'][0] + print('获取视频信息成功!') return video_data except Exception as e: - # 异常捕获 - return {'status': 'failed', 'reason': e, 'function': 'Scraper.tiktok()', 'value': original_url} + print('获取视频信息失败!原因:{}'.format(e)) + return None + + """__________________________________________⬇️Hybrid methods(混合方法)⬇️______________________________________""" + + # 自定义获取数据/Custom data acquisition + def hybrid_parsing(self, video_url: str) -> dict: + # URL平台判断/Judge URL platform + url_platform = 'douyin' if 'douyin' in video_url else 'tiktok' + print('当前链接平台为:{}'.format(url_platform)) + # 获取视频ID/Get video ID + print("正在获取视频ID...") + video_id = self.get_douyin_video_id(video_url) if url_platform == 'douyin' else self.get_tiktok_video_id( + video_url) + if video_id: + print("获取视频ID成功,视频ID为:{}".format(video_id)) + # 获取视频数据/Get video data + print("正在获取视频数据...") + data = self.get_douyin_video_data(video_id) if url_platform == 'douyin' else self.get_tiktok_video_data( + video_id) + if data: + print("获取视频数据成功,正在判断数据类型...") + url_type_code = data['aweme_type'] + # 抖音类型代码例子 {'2': 'image', '4': 'video'} / TikTok type code example {'0': 'video', '150': 'image'} + print("数据类型代码: {}".format(url_type_code)) + # 判断链接类型/Judge link type + url_type = 'video' if url_type_code == 4 or url_type_code == 0 else 'image' + print("数据类型: {}".format(url_type)) + print("准备开始判断并处理数据...") + + """ + 以下为(视频||图片)数据处理的四个方法,如果你需要自定义数据处理请在这里修改. + The following are four methods of (video || image) data processing. + If you need to customize data processing, please modify it here. + """ + + """ + 创建已知数据字典(索引相同),稍后使用.update()方法更新数据 + Create a known data dictionary (index the same), + and then use the .update() method to update the data + """ + + result_data = { + 'status': 'success', + 'message': "更多接口请查看(More API see): https://api-v2.douyin.wtf/docs", + 'type': url_type, + 'platform': url_platform, + 'aweme_id': video_id, + 'official_api_url': + { + "User-Agent": self.headers["User-Agent"], + "api_url": f"https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={video_id}" + } if url_platform == 'douyin' + else + { + "User-Agent": self.tiktok_api_headers["User-Agent"], + "api_url": f"https://api-h2.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}&version_code=2613&aid=1180" + }, + 'desc': data["desc"], + 'create_time': data['create_time'], + 'author': data['author'], + 'music': data['music'], + 'statistics': data['statistics'], + 'cover_data': { + 'cover': data['video']['cover'], + 'origin_cover': data['video']['origin_cover'], + 'dynamic_cover': data['video']['dynamic_cover'] if url_type == 'video' else None + }, + 'hashtags': data['text_extra'] + } + # 创建一个空变量,稍后使用.update()方法更新数据/Create an empty variable and use the .update() method to update the data + api_data = None + # 判断链接类型并处理数据/Judge link type and process data + try: + # 抖音数据处理/Douyin data processing + if url_platform == 'douyin': + # 抖音视频数据处理/Douyin video data processing + if url_type == 'video': + print("正在处理抖音视频数据...") + # 将信息储存在字典中/Store information in a dictionary + uri = data['video']['play_addr']['uri'] + wm_video_url = data['video']['play_addr']['url_list'][0] + wm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/playwm/?video_id={uri}&radio=1080p&line=0" + nwm_video_url = wm_video_url.replace('playwm', 'play') + nwm_video_url_HQ = f"https://aweme.snssdk.com/aweme/v1/play/?video_id={uri}&ratio=1080p&line=0" + api_data = { + 'video_data': + { + 'wm_video_url': wm_video_url, + 'wm_video_url_HQ': wm_video_url_HQ, + 'nwm_video_url': nwm_video_url, + 'nwm_video_url_HQ': nwm_video_url_HQ + } + } + # 抖音图片数据处理/Douyin image data processing + elif url_type == 'image': + print("正在处理抖音图片数据...") + # 无水印图片列表/No watermark image list + no_watermark_image_list = [] + # 有水印图片列表/With watermark image list + watermark_image_list = [] + # 遍历图片列表/Traverse image list + for i in data['images']: + no_watermark_image_list.append(i['url_list'][0]) + watermark_image_list.append(i['download_url_list'][0]) + api_data = { + 'image_data': + { + 'no_watermark_image_list': no_watermark_image_list, + 'watermark_image_list': watermark_image_list + } + } + # TikTok数据处理/TikTok data processing + elif url_platform == 'tiktok': + # TikTok视频数据处理/TikTok video data processing + if url_type == 'video': + print("正在处理TikTok视频数据...") + # 将信息储存在字典中/Store information in a dictionary + wm_video = data['video']['download_addr']['url_list'][0] + api_data = { + 'video_data': + { + 'wm_video_url': wm_video, + 'wm_video_url_HQ': wm_video, + 'nwm_video_url': data['video']['play_addr']['url_list'][0], + 'nwm_video_url_HQ': data['video']['bit_rate'][0]['play_addr']['url_list'][0] + } + } + # TikTok图片数据处理/TikTok image data processing + elif url_type == 'image': + print("正在处理TikTok图片数据...") + # 无水印图片列表/No watermark image list + no_watermark_image_list = [] + # 有水印图片列表/With watermark image list + watermark_image_list = [] + for i in data['image_post_info']['images']: + no_watermark_image_list.append(i['display_image']['url_list'][0]) + watermark_image_list.append(i['owner_watermark_image']['url_list'][0]) + api_data = { + 'image_data': + { + 'no_watermark_image_list': no_watermark_image_list, + 'watermark_image_list': watermark_image_list + } + } + # 更新数据/Update data + result_data.update(api_data) + # print("数据处理完成,最终数据: \n{}".format(result_data)) + # 返回数据/Return data + return result_data + except Exception as e: + traceback.print_exc() + print("数据处理失败!") + return {'status': 'failed', 'message': '数据处理失败!/Data processing failed!'} + else: + print("[抖音|TikTok方法]返回数据为空,无法处理!") + return {'status': 'failed', + 'message': '返回数据为空,无法处理!/Return data is empty and cannot be processed!'} + else: + print('获取视频ID失败!') + return {'status': 'failed', 'message': '获取视频ID失败!/Failed to get video ID!'} + + # 处理数据方便快捷指令使用/Process data for easy-to-use shortcuts + @staticmethod + def hybrid_parsing_minimal(data: dict) -> dict: + # 如果数据获取成功/If the data is successfully obtained + if data['status'] == 'success': + result = { + 'status': 'success', + 'message': data.get('message'), + 'platform': data.get('platform'), + 'type': data.get('type'), + 'desc': data.get('desc'), + 'wm_video_url': data['video_data']['wm_video_url'] if data['type'] == 'video' else None, + 'wm_video_url_HQ': data['video_data']['wm_video_url_HQ'] if data['type'] == 'video' else None, + 'nwm_video_url': data['video_data']['nwm_video_url'] if data['type'] == 'video' else None, + 'nwm_video_url_HQ': data['video_data']['nwm_video_url_HQ'] if data['type'] == 'video' else None, + 'no_watermark_image_list': data['image_data']['no_watermark_image_list'] if data[ + 'type'] == 'image' else None, + 'watermark_image_list': data['image_data']['watermark_image_list'] if data['type'] == 'image' else None + } + return result + else: + return data + + +"""__________________________________________⬇️Test methods(测试方法)⬇️______________________________________""" + + +# 测试/Test +def main_test(): + while True: + url = input("Enter your Douyin/TikTok url here to test: ") + if 'douyin.com' in url: + video_id = api.get_douyin_video_id(url) + if video_id: + video_data = api.get_douyin_video_data(video_id) + print(video_data) + else: + video_id = api.get_tiktok_video_id(url) + if video_id: + tiktok_data = api.get_tiktok_video_data(video_id) + print(tiktok_data) + + +def hybrid_test(): + while True: + url = input("Enter your Douyin/TikTok url here to test: ") + # 混合解析/Hybrid parsing + data = api.hybrid_parsing(url) + # 精简数据/Minimal data + minimal_data = api.hybrid_parsing_minimal(data) if __name__ == '__main__': + api = Scraper() # 测试类 - scraper = Scraper() - while True: - url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', - input("Enter your Douyin/TikTok url here to test: "))[0] - try: - if 'douyin.com' in url: - douyin_date = scraper.douyin(url) - print(douyin_date) - else: - tiktok_date = scraper.tiktok(url) - print(tiktok_date) - except Exception as e: - print("Error: " + str(e)) + hybrid_test()