From f2b41b573bd9d17dce7ec6b57ded10e46c0b4439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=9F=E6=8C=81=E6=B1=9F?= <129171955+2513502304@users.noreply.github.com> Date: Wed, 15 Jan 2025 18:18:36 +0800 Subject: [PATCH] =?UTF-8?q?Update=20core.py=EF=BC=8C=E4=BB=A5=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E6=8C=89=E7=85=A7=20START=5FDAY=20=E8=87=B3=20END=5FD?= =?UTF-8?q?AY=20=EF=BC=8C=E6=AF=8F=E4=B8=80=E5=A4=A9=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E7=AD=9B=E9=80=89=EF=BC=8C=E8=BF=99=E6=A0=B7=E8=83=BD=E5=A4=9F?= =?UTF-8?q?=E7=AA=81=E7=A0=B4=201000=20=E6=9D=A1=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E7=9A=84=E9=99=90=E5=88=B6=EF=BC=8C=E6=9C=80=E5=A4=A7=E7=A8=8B?= =?UTF-8?q?=E5=BA=A6=E7=88=AC=E5=8F=96=E8=AF=A5=E5=85=B3=E9=94=AE=E8=AF=8D?= =?UTF-8?q?=E4=B8=8B=E7=9A=84=E6=89=80=E6=9C=89=E8=A7=86=E9=A2=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加了`get_pubtime_datetime`函数用以获取`pubtime_begin_s`和`pubtime_end_s`参数,并为`search`函数添加了`ALL_DAY`选项,若`ALL_DAY`未开启,则保留原先的搜索策略,但每个关键词最多返回 1000 条数据,若`ALL_DAY`已开启,则使用新策略,按照 START_DAY 至 END_DAY 按照每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频,新添加的`get_pubtime_datetime`函数仅在`search`中使用,需要用户按安装`datetime`和`pandas`模块。已测试完毕 --- media_platform/bilibili/core.py | 143 +++++++++++++++++++++++--------- 1 file changed, 105 insertions(+), 38 deletions(-) diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py index e698bbd..5fafc42 100644 --- a/media_platform/bilibili/core.py +++ b/media_platform/bilibili/core.py @@ -19,9 +19,10 @@ import os import random from asyncio import Task from typing import Dict, List, Optional, Tuple, Union +from datetime import datetime, timedelta +import pandas as pd -from playwright.async_api import (BrowserContext, BrowserType, Page, - async_playwright) +from playwright.async_api import (BrowserContext, BrowserType, Page, async_playwright) import config from base.base_crawler import AbstractCrawler @@ -95,56 +96,122 @@ class BilibiliCrawler(AbstractCrawler): utils.logger.info( "[BilibiliCrawler.start] Bilibili Crawler finished ...") + async def get_pubtime_datetime(self, start: str = config.START_DAY, end: str = config.END_DAY) -> tuple[str, str]: + """ + 获取 bilibili 作品发布日期起始时间戳 pubtime_begin_s 与发布日期结束时间戳 pubtime_end_s + --- + :param start: 发布日期起始时间,YYYY-MM-DD + :param end: 发布日期结束时间,YYYY-MM-DD + + Note + --- + - 搜索的时间范围为 start 至 end,包含 start 和 end + - 若要搜索同一天的内容,为了包含 start 当天的搜索内容,则 pubtime_end_s 的值应该为 pubtime_begin_s 的值加上一天再减去一秒,即 start 当天的最后一秒 + - 如仅搜索 2024-01-05 的内容,pubtime_begin_s = 1704384000,pubtime_end_s = 1704470399 + 转换为可读的 datetime 对象:pubtime_begin_s = datetime.datetime(2024, 1, 5, 0, 0),pubtime_end_s = datetime.datetime(2024, 1, 5, 23, 59, 59) + - 若要搜索 start 至 end 的内容,为了包含 end 当天的搜索内容,则 pubtime_end_s 的值应该为 pubtime_end_s 的值加上一天再减去一秒,即 end 当天的最后一秒 + - 如搜索 2024-01-05 - 2024-01-06 的内容,pubtime_begin_s = 1704384000,pubtime_end_s = 1704556799 + 转换为可读的 datetime 对象:pubtime_begin_s = datetime.datetime(2024, 1, 5, 0, 0),pubtime_end_s = datetime.datetime(2024, 1, 6, 23, 59, 59) + """ + # 转换 start 与 end 为 datetime 对象 + start_day: datetime = datetime.strptime(start, '%Y-%m-%d') + end_day: datetime = datetime.strptime(end, '%Y-%m-%d') + if start_day > end_day: + raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end') + elif start_day == end_day: # 搜索同一天的内容 + end_day = start_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second + else: # 搜索 start 至 end + end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second + # 将其重新转换为时间戳 + return str(int(start_day.timestamp())), str(int(end_day.timestamp())) + async def search(self): """ search bilibili video with keywords :return: """ - utils.logger.info( - "[BilibiliCrawler.search] Begin search bilibli keywords") + utils.logger.info("[BilibiliCrawler.search] Begin search bilibli keywords") bili_limit_count = 20 # bilibili limit page fixed value if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count: config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count start_page = config.START_PAGE # start page number for keyword in config.KEYWORDS.split(","): source_keyword_var.set(keyword) - utils.logger.info( - f"[BilibiliCrawler.search] Current search keyword: {keyword}") - page = 1 - while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: - if page < start_page: - utils.logger.info( - f"[BilibiliCrawler.search] Skip page: {page}") + utils.logger.info(f"[BilibiliCrawler.search] Current search keyword: {keyword}") + # 每个关键词最多返回 1000 条数据 + if not config.ALL_DAY: + page = 1 + while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: + if page < start_page: + utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}") + page += 1 + continue + + utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, page: {page}") + video_id_list: List[str] = [] + videos_res = await self.bili_client.search_video_by_keyword( + keyword=keyword, + page=page, + page_size=bili_limit_count, + order=SearchOrderType.DEFAULT, + pubtime_begin_s=0, # 作品发布日期起始时间戳 + pubtime_end_s=0 # 作品发布日期结束日期时间戳 + ) + video_list: List[Dict] = videos_res.get("result") + + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] + video_items = await asyncio.gather(*task_list) + for video_item in video_items: + if video_item: + video_id_list.append(video_item.get("View").get("aid")) + await bilibili_store.update_bilibili_video(video_item) + await bilibili_store.update_up_info(video_item) + await self.get_bilibili_video(video_item, semaphore) page += 1 - continue + await self.batch_get_video_comments(video_id_list) + # 按照 START_DAY 至 END_DAY 按照每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频 + else: + for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'): + # 按照每一天进行爬取的时间戳参数 + pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d')) + page = 1 + while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: + # ! Catch any error if response return nothing, go to next day + try: + # ! Don't skip any page, to make sure gather all video in one day + # if page < start_page: + # utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}") + # page += 1 + # continue - utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, page: {page}") - video_id_list: List[str] = [] - videos_res = await self.bili_client.search_video_by_keyword( - keyword=keyword, - page=page, - page_size=bili_limit_count, - order=SearchOrderType.DEFAULT, - pubtime_begin_s=0, # 作品发布日期起始时间戳 - pubtime_end_s=0 # 作品发布日期结束日期时间戳 - ) - video_list: List[Dict] = videos_res.get("result") + utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}") + video_id_list: List[str] = [] + videos_res = await self.bili_client.search_video_by_keyword( + keyword=keyword, + page=page, + page_size=bili_limit_count, + order=SearchOrderType.DEFAULT, + pubtime_begin_s=pubtime_begin_s, # 作品发布日期起始时间戳 + pubtime_end_s=pubtime_end_s # 作品发布日期结束日期时间戳 + ) + video_list: List[Dict] = videos_res.get("result") - semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) - task_list = [ - self.get_video_info_task(aid=video_item.get( - "aid"), bvid="", semaphore=semaphore) - for video_item in video_list - ] - video_items = await asyncio.gather(*task_list) - for video_item in video_items: - if video_item: - video_id_list.append(video_item.get("View").get("aid")) - await bilibili_store.update_bilibili_video(video_item) - await bilibili_store.update_up_info(video_item) - await self.get_bilibili_video(video_item, semaphore) - page += 1 - await self.batch_get_video_comments(video_id_list) + semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list] + video_items = await asyncio.gather(*task_list) + for video_item in video_items: + if video_item: + video_id_list.append(video_item.get("View").get("aid")) + await bilibili_store.update_bilibili_video(video_item) + await bilibili_store.update_up_info(video_item) + await self.get_bilibili_video(video_item, semaphore) + page += 1 + await self.batch_get_video_comments(video_id_list) + # go to next day + except Exception as e: + print(e) + break async def batch_get_video_comments(self, video_id_list: List[str]): """