@staticmethod
def relpath(file):
    """Return the absolute path of *file*, resolved against the caller's source file.

    The base directory is the folder containing the source file of whichever
    function called ``relpath`` (not the process working directory), so
    resources shipped next to that file are found regardless of where the
    program was started from.

    :param file: Path relative to the calling source file's directory.
    :return: The resolved path as a string.
    """
    import pathlib
    import sys

    # Frame 1 is the immediate caller of relpath; frame 0 would be relpath itself.
    caller_source = pathlib.Path(sys._getframe(1).f_code.co_filename)
    base_dir = caller_source.parent
    target = base_dir / file
    return str(target.resolve())
# Get Bilibili video ID
async def get_bilibili_video_id(self, original_url: str) -> Union[str, None]:
    """
    Extract the Bilibili video identifier from a share link or a video page URL.

    The URL is first passed through ``self.convert_share_urls``. The identifier
    is returned together with its path prefix, because
    ``get_bilibili_video_data`` relies on that prefix to choose between a
    ``bvid`` and an ``aid`` lookup.

    :param original_url: Bilibili share link or video page URL.
    :return: ``"video/BV<id>"`` or ``"video/av<id>"``; ``None`` when the link
        could not be converted or contains no video identifier.
    """
    try:
        # Convert link
        resolved_url = await self.convert_share_urls(original_url)
        if not resolved_url:
            raise ValueError('could not resolve URL: {!r}'.format(original_url))
        # Get video ID. BV ids are alphanumeric, av ids are purely numeric.
        match = re.search(r"video/(BV[0-9a-zA-Z]+|av[0-9]+)", resolved_url)
        if match is None:
            raise ValueError('no BV/av video id found in URL: {!r}'.format(resolved_url))
        video_id = 'video/' + match.group(1)
        print('获取到的BiliBili视频ID是{}'.format(video_id))
        # Return video ID
        return video_id
    except Exception as e:
        print('获取BiliBili视频ID出错了:{}'.format(e))
        return None


@retry(stop=stop_after_attempt(4), wait=wait_fixed(7))
async def get_bilibili_video_data(self, video_id: str) -> Union[str, None]:
    """
    Fetch the playback URL of a single Bilibili video.

    Two requests are made: the first (``x/web-interface/view``) returns the
    ``aid`` and ``cid`` of the video, and the second (``x/player/playurl``)
    uses them to obtain the playback address.

    :param video_id: Identifier as produced by ``get_bilibili_video_id``
        (``"video/BV<id>"`` or ``"video/av<id>"``). A value without the
        ``"video/BV"`` prefix is sent unchanged as the ``bvid`` parameter.
    :return: URL of the first entry of the ``durl`` list in the play-URL response.
    :raises Exception: Any failure is printed and re-raised so that the
        ``retry`` decorator can attempt the call again.
    """
    print('正在获取BiliBili视频数据...')
    try:
        if not video_id:
            raise ValueError('video_id is empty')
        # Construct the access link: av ids are looked up by aid, everything else by bvid
        if "video/av" in video_id:
            api_url = f'https://api.bilibili.com/x/web-interface/view?aid={video_id.replace("video/av", "")}'
        else:
            api_url = f'https://api.bilibili.com/x/web-interface/view?bvid={video_id.replace("video/BV", "")}'
        print("正在获取视频数据API: {}".format(api_url))
        # The html5 (mobile) play URL is low resolution and needs two requests:
        # the first fetches the required parameters, the second the final play URL.
        # One session is shared by both requests.
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url, headers=self.bilibili_api_headers, proxy=self.proxies,
                                   timeout=10) as response:
                view_info = await response.json()
            # "data" may be absent or null in the response, so fall back to an empty dict
            view_data = view_info.get("data") or {}
            avid = view_data.get("aid")
            cid = view_data.get("cid")
            if not avid or not cid:
                raise ValueError('aid/cid missing in view response: {}'.format(view_info))
            print('获取视频信息成功!')
            # The platform parameter is required; without it the returned play URL answers 403 (unresolved).
            play_url_api = f"https://api.bilibili.com/x/player/playurl?avid={avid}&cid={cid}&platform=html5"
            async with session.get(play_url_api, headers=self.bilibili_api_headers, proxy=self.proxies,
                                   timeout=10) as response:
                play_info = await response.json()
        durl = (play_info.get("data") or {}).get("durl") or []
        if not durl:
            raise ValueError('no play URL in response: {}'.format(play_info))
        return durl[0]["url"]
    except Exception as e:
        print('获取视频信息失败!原因:{}'.format(e))
        # Bare raise keeps the original traceback for the retry decorator / caller.
        raise
methods(混合方法)⬇️______________________________________""" # 自定义获取数据/Custom data acquisition @@ -625,15 +718,22 @@ class Scraper: """__________________________________________⬇️Test methods(测试方法)⬇️______________________________________""" -async def async_test(_douyin_url: str = None, _tiktok_url: str = None) -> None: +async def async_test(_douyin_url: str = None, _tiktok_url: str = None, _bilibili_url: str = None) -> None: # 异步测试/Async test start_time = time.time() print("正在进行异步测试...") + print("正在测试异步获取哔哩哔哩视频ID方法...") + bilibili_id = await api.get_bilibili_video_id(_bilibili_url) + print("正在测试异步获取哔哩哔哩视频数据方法...") + bilibili_data = await api.get_bilibili_video_data(bilibili_id) + print(bilibili_data) + print("正在测试异步获取抖音视频ID方法...") douyin_id = await api.get_douyin_video_id(_douyin_url) print("正在测试异步获取抖音视频数据方法...") douyin_data = await api.get_douyin_video_data(douyin_id) + print(douyin_data) print("正在测试异步获取TikTok视频ID方法...") tiktok_id = await api.get_tiktok_video_id(_tiktok_url) @@ -656,4 +756,5 @@ if __name__ == '__main__': # api.generate_x_bogus(params) douyin_url = 'https://v.douyin.com/rLyrQxA/6.66' tiktok_url = 'https://www.tiktok.com/@evil0ctal/video/7217027383390555438' - asyncio.run(async_test(_douyin_url=douyin_url, _tiktok_url=tiktok_url)) + bilibili_url = "https://b23.tv/Ya65brl" + asyncio.run(async_test(_douyin_url=douyin_url, _tiktok_url=tiktok_url, _bilibili_url=bilibili_url))