Update core.py,以实现按照 START_DAY 至 END_DAY ,每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频

添加了`get_pubtime_datetime`函数用以获取`pubtime_begin_s`和`pubtime_end_s`参数,并为`search`函数添加了`ALL_DAY`选项,若`ALL_DAY`未开启,则保留原先的搜索策略,但每个关键词最多返回 1000 条数据,若`ALL_DAY`已开启,则使用新策略,按照 START_DAY 至 END_DAY 按照每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频,新添加的`get_pubtime_datetime`函数仅在`search`中使用,需要用户按安装`datetime`和`pandas`模块。已测试完毕
This commit is contained in:
翟持江 2025-01-15 18:18:36 +08:00 committed by GitHub
parent bf87821d4b
commit f2b41b573b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -19,9 +19,10 @@ import os
import random
from asyncio import Task
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime, timedelta
import pandas as pd
from playwright.async_api import (BrowserContext, BrowserType, Page,
async_playwright)
from playwright.async_api import (BrowserContext, BrowserType, Page, async_playwright)
import config
from base.base_crawler import AbstractCrawler
@ -95,56 +96,122 @@ class BilibiliCrawler(AbstractCrawler):
utils.logger.info(
"[BilibiliCrawler.start] Bilibili Crawler finished ...")
async def get_pubtime_datetime(self, start: str = config.START_DAY, end: str = config.END_DAY) -> tuple[str, str]:
"""
获取 bilibili 作品发布日期起始时间戳 pubtime_begin_s 与发布日期结束时间戳 pubtime_end_s
---
:param start: 发布日期起始时间YYYY-MM-DD
:param end: 发布日期结束时间YYYY-MM-DD
Note
---
- 搜索的时间范围为 start end包含 start end
- 若要搜索同一天的内容为了包含 start 当天的搜索内容 pubtime_end_s 的值应该为 pubtime_begin_s 的值加上一天再减去一秒 start 当天的最后一秒
- 如仅搜索 2024-01-05 的内容pubtime_begin_s = 1704384000pubtime_end_s = 1704470399
转换为可读的 datetime 对象pubtime_begin_s = datetime.datetime(2024, 1, 5, 0, 0)pubtime_end_s = datetime.datetime(2024, 1, 5, 23, 59, 59)
- 若要搜索 start end 的内容为了包含 end 当天的搜索内容 pubtime_end_s 的值应该为 pubtime_end_s 的值加上一天再减去一秒 end 当天的最后一秒
- 如搜索 2024-01-05 - 2024-01-06 的内容pubtime_begin_s = 1704384000pubtime_end_s = 1704556799
转换为可读的 datetime 对象pubtime_begin_s = datetime.datetime(2024, 1, 5, 0, 0)pubtime_end_s = datetime.datetime(2024, 1, 6, 23, 59, 59)
"""
# 转换 start 与 end 为 datetime 对象
start_day: datetime = datetime.strptime(start, '%Y-%m-%d')
end_day: datetime = datetime.strptime(end, '%Y-%m-%d')
if start_day > end_day:
raise ValueError('Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end')
elif start_day == end_day: # 搜索同一天的内容
end_day = start_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 start_day + 1 day - 1 second
else: # 搜索 start 至 end
end_day = end_day + timedelta(days=1) - timedelta(seconds=1) # 则将 end_day 设置为 end_day + 1 day - 1 second
# 将其重新转换为时间戳
return str(int(start_day.timestamp())), str(int(end_day.timestamp()))
async def search(self):
"""
search bilibili video with keywords
:return:
"""
utils.logger.info(
"[BilibiliCrawler.search] Begin search bilibli keywords")
utils.logger.info("[BilibiliCrawler.search] Begin search bilibli keywords")
bili_limit_count = 20 # bilibili limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count
start_page = config.START_PAGE # start page number
for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword)
utils.logger.info(
f"[BilibiliCrawler.search] Current search keyword: {keyword}")
page = 1
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page:
utils.logger.info(
f"[BilibiliCrawler.search] Skip page: {page}")
utils.logger.info(f"[BilibiliCrawler.search] Current search keyword: {keyword}")
# 每个关键词最多返回 1000 条数据
if not config.ALL_DAY:
page = 1
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page:
utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}")
page += 1
continue
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
page=page,
page_size=bili_limit_count,
order=SearchOrderType.DEFAULT,
pubtime_begin_s=0, # 作品发布日期起始时间戳
pubtime_end_s=0 # 作品发布日期结束日期时间戳
)
video_list: List[Dict] = videos_res.get("result")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili_store.update_bilibili_video(video_item)
await bilibili_store.update_up_info(video_item)
await self.get_bilibili_video(video_item, semaphore)
page += 1
continue
await self.batch_get_video_comments(video_id_list)
# 按照 START_DAY 至 END_DAY 按照每一天进行筛选,这样能够突破 1000 条视频的限制,最大程度爬取该关键词下的所有视频
else:
for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq='D'):
# 按照每一天进行爬取的时间戳参数
pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime('%Y-%m-%d'), end=day.strftime('%Y-%m-%d'))
page = 1
while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
# ! Catch any error if response return nothing, go to next day
try:
# ! Don't skip any page, to make sure gather all video in one day
# if page < start_page:
# utils.logger.info(f"[BilibiliCrawler.search] Skip page: {page}")
# page += 1
# continue
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
page=page,
page_size=bili_limit_count,
order=SearchOrderType.DEFAULT,
pubtime_begin_s=0, # 作品发布日期起始时间戳
pubtime_end_s=0 # 作品发布日期结束日期时间戳
)
video_list: List[Dict] = videos_res.get("result")
utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}")
video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword,
page=page,
page_size=bili_limit_count,
order=SearchOrderType.DEFAULT,
pubtime_begin_s=pubtime_begin_s, # 作品发布日期起始时间戳
pubtime_end_s=pubtime_end_s # 作品发布日期结束日期时间戳
)
video_list: List[Dict] = videos_res.get("result")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [
self.get_video_info_task(aid=video_item.get(
"aid"), bvid="", semaphore=semaphore)
for video_item in video_list
]
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili_store.update_bilibili_video(video_item)
await bilibili_store.update_up_info(video_item)
await self.get_bilibili_video(video_item, semaphore)
page += 1
await self.batch_get_video_comments(video_id_list)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
video_items = await asyncio.gather(*task_list)
for video_item in video_items:
if video_item:
video_id_list.append(video_item.get("View").get("aid"))
await bilibili_store.update_bilibili_video(video_item)
await bilibili_store.update_up_info(video_item)
await self.get_bilibili_video(video_item, semaphore)
page += 1
await self.batch_get_video_comments(video_id_list)
# go to next day
except Exception as e:
print(e)
break
async def batch_get_video_comments(self, video_id_list: List[str]):
"""