diff --git a/README.md b/README.md index 9258ceb..d96de9b 100644 --- a/README.md +++ b/README.md @@ -24,15 +24,26 @@ 通过使用此方式,免去了复现核心加密JS代码,逆向难度大大降低 # 功能列表 -| 平台 | 关键词搜索 | 指定帖子ID爬取 | 二级评论 | 指定创作者主页 | 登录态缓存 | IP代理池 | 生成评论词云图 | -|-----|-------|---------|-----|--------|-------|-------|-------| -| 小红书 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -| 抖音 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -| 快手 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -| B 站 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -| 微博 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -| 贴吧 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | -| 知乎 | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | +| 平台 | 关键词搜索 | 指定帖子ID爬取 | 二级评论 | 指定创作者主页 | 登录态缓存 | IP代理池 | 生成评论词云图 | +| ------ | ---------- | -------------- | -------- | -------------- | ---------- | -------- | -------------- | +| 小红书 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| 抖音 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| 快手 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| B 站 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| 微博 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| 贴吧 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | +| 知乎 | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | + +### MediaCrawlerPro重磅发布啦!!! +> 主打学习成熟项目的架构设计,不仅仅是爬虫,Pro中的其他代码设计思路也是值得学习,欢迎大家关注!!! + +[MediaCrawlerPro](https://github.com/MediaCrawlerPro) 版本已经重构出来了,相较于开源版本的优势: +- 多账号+IP代理支持(重点!) +- 去除Playwright依赖,使用更加简单 +- 支持linux部署(Docker docker-compose) +- 代码重构优化,更加易读易维护(解耦JS签名逻辑) +- 代码质量更高,对于构建更大型的爬虫项目更加友好 +- 完美的架构设计,更加易扩展,源码学习的价值更大 # 安装部署方法 @@ -92,18 +103,6 @@ - 支持保存到csv中(data/目录下) - 支持保存到json中(data/目录下) -### MediaCrawlerPro重磅发布啦!!! -> 主打学习成熟项目的架构设计,不仅仅是爬虫,Pro中的其他代码设计思路也是值得学习,欢迎大家关注!!! -> -> 订阅Pro源代码访问权限,可以加我微信:yzglan,备注:Pro(有一定的门槛💰) - -[MediaCrawlerPro](https://github.com/MediaCrawlerPro) 版本已经重构出来了,相较于开源版本的优势: -- 多账号+IP代理支持(重点!) -- 去除Playwright依赖,使用更加简单 -- 支持linux部署(Docker docker-compose) -- 代码重构优化,更加易读易维护(解耦JS签名逻辑) -- 代码质量更高,对于构建更大型的爬虫项目更加友好 -- 完美的架构设计,更加易扩展,源码学习的价值更大 # 其他常见问题可以查看在线文档 diff --git a/config/base_config.py b/config/base_config.py index a78ab5e..6d6d8b8 100644 --- a/config/base_config.py +++ b/config/base_config.py @@ -1,24 +1,26 @@ -# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: -# 1. 不得用于任何商业用途。 -# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 -# 3. 不得进行大规模爬取或对平台造成运营干扰。 -# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 +# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: +# 1. 不得用于任何商业用途。 +# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 +# 3. 不得进行大规模爬取或对平台造成运营干扰。 +# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 5. 不得用于任何非法或不当的用途。 -# -# 详细许可条款请参阅项目根目录下的LICENSE文件。 -# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 +# +# 详细许可条款请参阅项目根目录下的LICENSE文件。 +# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 基础配置 PLATFORM = "xhs" -KEYWORDS = "编程副业,编程兼职" # 关键词搜索配置,以英文逗号分隔 +KEYWORDS = "编程副业,编程兼职" # 关键词搜索配置,以英文逗号分隔 LOGIN_TYPE = "qrcode" # qrcode or phone or cookie COOKIES = "" # 具体值参见media_platform.xxx.field下的枚举值,暂时只支持小红书 SORT_TYPE = "popularity_descending" # 具体值参见media_platform.xxx.field下的枚举值,暂时只支持抖音 PUBLISH_TIME_TYPE = 0 -CRAWLER_TYPE = "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) +CRAWLER_TYPE = ( + "search" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) +) # 是否开启 IP 代理 ENABLE_IP_PROXY = False @@ -63,7 +65,6 @@ ENABLE_GET_COMMENTS = True CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES = 10 - # 是否开启爬二级评论模式, 默认不开启爬二级评论 # 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段 ENABLE_GET_SUB_COMMENTS = False @@ -85,15 +86,12 @@ XHS_SPECIFIED_NOTE_URL_LIST = [ # 指定抖音需要爬取的ID列表 DY_SPECIFIED_ID_LIST = [ "7280854932641664319", - "7202432992642387233" + "7202432992642387233", # ........................ 
] # 指定快手平台需要爬取的ID列表 -KS_SPECIFIED_ID_LIST = [ - "3xf8enb8dbj6uig", - "3x6zz972bchmvqe" -] +KS_SPECIFIED_ID_LIST = ["3xf8enb8dbj6uig", "3x6zz972bchmvqe"] # 指定B站平台需要爬取的视频bvid列表 BILI_SPECIFIED_ID_LIST = [ @@ -116,9 +114,7 @@ WEIBO_CREATOR_ID_LIST = [ ] # 指定贴吧需要爬取的帖子列表 -TIEBA_SPECIFIED_ID_LIST = [ - -] +TIEBA_SPECIFIED_ID_LIST = [] # 指定贴吧名称列表,爬取该贴吧下的帖子 TIEBA_NAME_LIST = [ @@ -167,8 +163,8 @@ ENABLE_GET_WORDCLOUD = False # 自定义词语及其分组 # 添加规则:xx:yy 其中xx为自定义添加的词组,yy为将xx该词组分到的组名。 CUSTOM_WORDS = { - '零几': '年份', # 将“零几”识别为一个整体 - '高频词': '专业术语' # 示例自定义词 + "零几": "年份", # 将“零几”识别为一个整体 + "高频词": "专业术语", # 示例自定义词 } # 停用(禁用)词文件路径 diff --git a/media_platform/xhs/client.py b/media_platform/xhs/client.py index f61323e..43f47e9 100644 --- a/media_platform/xhs/client.py +++ b/media_platform/xhs/client.py @@ -1,12 +1,12 @@ -# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: -# 1. 不得用于任何商业用途。 -# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 -# 3. 不得进行大规模爬取或对平台造成运营干扰。 -# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 +# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: +# 1. 不得用于任何商业用途。 +# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 +# 3. 不得进行大规模爬取或对平台造成运营干扰。 +# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 5. 不得用于任何非法或不当的用途。 -# -# 详细许可条款请参阅项目根目录下的LICENSE文件。 -# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 +# +# 详细许可条款请参阅项目根目录下的LICENSE文件。 +# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 import asyncio @@ -31,13 +31,13 @@ from .help import get_search_id, sign class XiaoHongShuClient(AbstractApiClient): def __init__( - self, - timeout=10, - proxies=None, - *, - headers: Dict[str, str], - playwright_page: Page, - cookie_dict: Dict[str, str], + self, + timeout=10, + proxies=None, + *, + headers: Dict[str, str], + playwright_page: Page, + cookie_dict: Dict[str, str], ): self.proxies = proxies self.timeout = timeout @@ -61,20 +61,22 @@ class XiaoHongShuClient(AbstractApiClient): Returns: """ - encrypt_params = await self.playwright_page.evaluate("([url, data]) => window._webmsxyw(url,data)", [url, data]) + encrypt_params = await self.playwright_page.evaluate( + "([url, data]) => window._webmsxyw(url,data)", [url, data] + ) local_storage = await self.playwright_page.evaluate("() => window.localStorage") signs = sign( a1=self.cookie_dict.get("a1", ""), b1=local_storage.get("b1", ""), x_s=encrypt_params.get("X-s", ""), - x_t=str(encrypt_params.get("X-t", "")) + x_t=str(encrypt_params.get("X-t", "")), ) headers = { "X-S": signs["x-s"], "X-T": signs["x-t"], "x-S-Common": signs["x-s-common"], - "X-B3-Traceid": signs["x-b3-traceid"] + "X-B3-Traceid": signs["x-b3-traceid"], } self.headers.update(headers) return self.headers @@ -92,20 +94,18 @@ class XiaoHongShuClient(AbstractApiClient): """ # return response.text - return_response = kwargs.pop('return_response', False) + return_response = kwargs.pop("return_response", False) async with httpx.AsyncClient(proxies=self.proxies) as client: - response = await client.request( - method, url, timeout=self.timeout, - **kwargs - ) + response = await client.request(method, url, timeout=self.timeout, **kwargs) if response.status_code == 471 or response.status_code == 461: # someday someone maybe will bypass captcha - verify_type = response.headers['Verifytype'] - verify_uuid = response.headers['Verifyuuid'] + verify_type = response.headers["Verifytype"] + verify_uuid = response.headers["Verifyuuid"] raise Exception( - f"出现验证码,请求失败,Verifytype: {verify_type},Verifyuuid: {verify_uuid}, Response: {response}") + f"出现验证码,请求失败,Verifytype: {verify_type},Verifyuuid: {verify_uuid}, Response: {response}" + ) if return_response: return response.text @@ -129,10 +129,11 @@ class XiaoHongShuClient(AbstractApiClient): """ 
final_uri = uri if isinstance(params, dict): - final_uri = (f"{uri}?" - f"{urlencode(params)}") + final_uri = f"{uri}?" f"{urlencode(params)}" headers = await self._pre_headers(final_uri) - return await self.request(method="GET", url=f"{self._host}{final_uri}", headers=headers) + return await self.request( + method="GET", url=f"{self._host}{final_uri}", headers=headers + ) async def post(self, uri: str, data: dict, **kwargs) -> Dict: """ @@ -145,15 +146,22 @@ class XiaoHongShuClient(AbstractApiClient): """ headers = await self._pre_headers(uri, data) - json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) - return await self.request(method="POST", url=f"{self._host}{uri}", - data=json_str, headers=headers, **kwargs) + json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False) + return await self.request( + method="POST", + url=f"{self._host}{uri}", + data=json_str, + headers=headers, + **kwargs, + ) async def get_note_media(self, url: str) -> Union[bytes, None]: async with httpx.AsyncClient(proxies=self.proxies) as client: response = await client.request("GET", url, timeout=self.timeout) if not response.reason_phrase == "OK": - utils.logger.error(f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}") + utils.logger.error( + f"[XiaoHongShuClient.get_note_media] request {url} err, res:{response.text}" + ) return None else: return response.content @@ -172,7 +180,9 @@ class XiaoHongShuClient(AbstractApiClient): if note_card.get("items"): ping_flag = True except Exception as e: - utils.logger.error(f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again...") + utils.logger.error( + f"[XiaoHongShuClient.pong] Ping xhs failed: {e}, and try to login again..." + ) ping_flag = False return ping_flag @@ -190,11 +200,13 @@ class XiaoHongShuClient(AbstractApiClient): self.cookie_dict = cookie_dict async def get_note_by_keyword( - self, keyword: str, - search_id: str = get_search_id(), - page: int = 1, page_size: int = 20, - sort: SearchSortType = SearchSortType.GENERAL, - note_type: SearchNoteType = SearchNoteType.ALL + self, + keyword: str, + search_id: str = get_search_id(), + page: int = 1, + page_size: int = 20, + sort: SearchSortType = SearchSortType.GENERAL, + note_type: SearchNoteType = SearchNoteType.ALL, ) -> Dict: """ 根据关键词搜索笔记 @@ -215,11 +227,13 @@ class XiaoHongShuClient(AbstractApiClient): "page_size": page_size, "search_id": search_id, "sort": sort.value, - "note_type": note_type.value + "note_type": note_type.value, } return await self.post(uri, data) - async def get_note_by_id(self, note_id: str, xsec_source: str, xsec_token: str) -> Dict: + async def get_note_by_id( + self, note_id: str, xsec_source: str, xsec_token: str + ) -> Dict: """ 获取笔记详情API Args: @@ -238,7 +252,7 @@ class XiaoHongShuClient(AbstractApiClient): "image_formats": ["jpg", "webp", "avif"], "extra": {"need_body_topic": 1}, "xsec_source": xsec_source, - "xsec_token": xsec_token + "xsec_token": xsec_token, } uri = "/api/sns/web/v1/feed" res = await self.post(uri, data) @@ -246,7 +260,9 @@ class XiaoHongShuClient(AbstractApiClient): res_dict: Dict = res["items"][0]["note_card"] return res_dict # 爬取频繁了可能会出现有的笔记能有结果有的没有 - utils.logger.error(f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}") + utils.logger.error( + f"[XiaoHongShuClient.get_note_by_id] get note id:{note_id} empty and res:{res}" + ) return dict() async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict: @@ -264,11 +280,13 @@ class 
XiaoHongShuClient(AbstractApiClient): "note_id": note_id, "cursor": cursor, "top_comment_id": "", - "image_formats": "jpg,webp,avif" + "image_formats": "jpg,webp,avif", } return await self.get(uri, params) - async def get_note_sub_comments(self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = ""): + async def get_note_sub_comments( + self, note_id: str, root_comment_id: str, num: int = 10, cursor: str = "" + ): """ 获取指定父评论下的子评论的API Args: @@ -289,9 +307,13 @@ class XiaoHongShuClient(AbstractApiClient): } return await self.get(uri, params) - async def get_note_all_comments(self, note_id: str, crawl_interval: float = 1.0, - callback: Optional[Callable] = None, - max_count: int = 10) -> List[Dict]: + async def get_note_all_comments( + self, + note_id: str, + crawl_interval: float = 1.0, + callback: Optional[Callable] = None, + max_count: int = 10, + ) -> List[Dict]: """ 获取指定笔记下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息 Args: @@ -311,21 +333,28 @@ class XiaoHongShuClient(AbstractApiClient): comments_cursor = comments_res.get("cursor", "") if "comments" not in comments_res: utils.logger.info( - f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}") + f"[XiaoHongShuClient.get_note_all_comments] No 'comments' key found in response: {comments_res}" + ) break comments = comments_res["comments"] if len(result) + len(comments) > max_count: - comments = comments[:max_count - len(result)] + comments = comments[: max_count - len(result)] if callback: await callback(note_id, comments) await asyncio.sleep(crawl_interval) result.extend(comments) - sub_comments = await self.get_comments_all_sub_comments(comments, crawl_interval, callback) + sub_comments = await self.get_comments_all_sub_comments( + comments, crawl_interval, callback + ) result.extend(sub_comments) return result - async def get_comments_all_sub_comments(self, comments: List[Dict], crawl_interval: float = 1.0, - callback: Optional[Callable] = None) -> List[Dict]: + async def get_comments_all_sub_comments( + self, + comments: List[Dict], + crawl_interval: float = 1.0, + callback: Optional[Callable] = None, + ) -> List[Dict]: """ 获取指定一级评论下的所有二级评论, 该方法会一直查找一级评论下的所有二级评论信息 Args: @@ -334,11 +363,12 @@ class XiaoHongShuClient(AbstractApiClient): callback: 一次评论爬取结束后 Returns: - + """ if not config.ENABLE_GET_SUB_COMMENTS: utils.logger.info( - f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled") + f"[XiaoHongShuCrawler.get_comments_all_sub_comments] Crawling sub_comment mode is not enabled" + ) return [] result = [] @@ -356,12 +386,15 @@ class XiaoHongShuClient(AbstractApiClient): sub_comment_cursor = comment.get("sub_comment_cursor") while sub_comment_has_more: - comments_res = await self.get_note_sub_comments(note_id, root_comment_id, 10, sub_comment_cursor) + comments_res = await self.get_note_sub_comments( + note_id, root_comment_id, 10, sub_comment_cursor + ) sub_comment_has_more = comments_res.get("has_more", False) sub_comment_cursor = comments_res.get("cursor", "") if "comments" not in comments_res: utils.logger.info( - f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}") + f"[XiaoHongShuClient.get_comments_all_sub_comments] No 'comments' key found in response: {comments_res}" + ) break comments = comments_res["comments"] if callback: @@ -377,21 +410,23 @@ class XiaoHongShuClient(AbstractApiClient): eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217 """ uri = f"/user/profile/{user_id}" - 
html_content = await self.request("GET", self._domain + uri, return_response=True, headers=self.headers) - match = re.search(r'", html)[ 0 @@ -518,32 +578,4 @@ class XiaoHongShuClient(AbstractApiClient): try: return get_note_dict(html) except: - href = re.findall(r'href="(.*?)"', html)[0] - href = unescape(href) - - utils.logger.info( - f"[XiaoHongShuClient.get_note_by_id_from_html] 出现验证码: {href}, 请手动验证" - ) - await self.playwright_page.goto(href) - # 等待用户完成操作页面重定向 - if await self.check_redirect(): - utils.logger.info( - f"[XiaoHongShuClient.get_note_by_id_from_html] 用户完成验证, 重定向到笔记详情页" - ) - - html = await self.playwright_page.content() - return get_note_dict(html) - else: - raise DataFetchError(html) - - @retry( - stop=stop_after_attempt(100), - wait=wait_fixed(5), - retry=retry_if_result(lambda value: value is False), - ) - async def check_redirect(self): - url = self.playwright_page.url - if url.startswith("https://www.xiaohongshu.com/explore"): - return True - return False - + return None diff --git a/media_platform/xhs/core.py b/media_platform/xhs/core.py index 6e1da30..8a69086 100644 --- a/media_platform/xhs/core.py +++ b/media_platform/xhs/core.py @@ -1,12 +1,12 @@ -# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: -# 1. 不得用于任何商业用途。 -# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 -# 3. 不得进行大规模爬取或对平台造成运营干扰。 -# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 +# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: +# 1. 不得用于任何商业用途。 +# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 +# 3. 不得进行大规模爬取或对平台造成运营干扰。 +# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 5. 不得用于任何非法或不当的用途。 -# -# 详细许可条款请参阅项目根目录下的LICENSE文件。 -# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 +# +# 详细许可条款请参阅项目根目录下的LICENSE文件。 +# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 import asyncio @@ -15,8 +15,7 @@ import random from asyncio import Task from typing import Dict, List, Optional, Tuple -from playwright.async_api import (BrowserContext, BrowserType, Page, - async_playwright) +from playwright.async_api import BrowserContext, BrowserType, Page, async_playwright from tenacity import RetryError import config @@ -48,28 +47,33 @@ class XiaoHongShuCrawler(AbstractCrawler): async def start(self) -> None: playwright_proxy_format, httpx_proxy_format = None, None if config.ENABLE_IP_PROXY: - ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True) + ip_proxy_pool = await create_ip_pool( + config.IP_PROXY_POOL_COUNT, enable_validate_ip=True + ) ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() - playwright_proxy_format, httpx_proxy_format = self.format_proxy_info(ip_proxy_info) + playwright_proxy_format, httpx_proxy_format = self.format_proxy_info( + ip_proxy_info + ) async with async_playwright() as playwright: # Launch a browser context. chromium = playwright.chromium self.browser_context = await self.launch_browser( - chromium, - None, - self.user_agent, - headless=config.HEADLESS + chromium, None, self.user_agent, headless=config.HEADLESS ) # stealth.min.js is a js script to prevent the website from detecting the crawler. 
await self.browser_context.add_init_script(path="libs/stealth.min.js") # add a cookie attribute webId to avoid the appearance of a sliding captcha on the webpage - await self.browser_context.add_cookies([{ - 'name': "webId", - 'value': "xxx123", # any value - 'domain': ".xiaohongshu.com", - 'path': "/" - }]) + await self.browser_context.add_cookies( + [ + { + "name": "webId", + "value": "xxx123", # any value + "domain": ".xiaohongshu.com", + "path": "/", + } + ] + ) self.context_page = await self.browser_context.new_page() await self.context_page.goto(self.index_url) @@ -81,10 +85,12 @@ class XiaoHongShuCrawler(AbstractCrawler): login_phone="", # input your phone number browser_context=self.browser_context, context_page=self.context_page, - cookie_str=config.COOKIES + cookie_str=config.COOKIES, ) await login_obj.begin() - await self.xhs_client.update_cookies(browser_context=self.browser_context) + await self.xhs_client.update_cookies( + browser_context=self.browser_context + ) crawler_type_var.set(config.CRAWLER_TYPE) if config.CRAWLER_TYPE == "search": @@ -103,33 +109,47 @@ class XiaoHongShuCrawler(AbstractCrawler): async def search(self) -> None: """Search for notes and retrieve their comment information.""" - utils.logger.info("[XiaoHongShuCrawler.search] Begin search xiaohongshu keywords") + utils.logger.info( + "[XiaoHongShuCrawler.search] Begin search xiaohongshu keywords" + ) xhs_limit_count = 20 # xhs limit page fixed value if config.CRAWLER_MAX_NOTES_COUNT < xhs_limit_count: config.CRAWLER_MAX_NOTES_COUNT = xhs_limit_count start_page = config.START_PAGE for keyword in config.KEYWORDS.split(","): source_keyword_var.set(keyword) - utils.logger.info(f"[XiaoHongShuCrawler.search] Current search keyword: {keyword}") + utils.logger.info( + f"[XiaoHongShuCrawler.search] Current search keyword: {keyword}" + ) page = 1 search_id = get_search_id() - while (page - start_page + 1) * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: + while ( + page - start_page + 1 + ) * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT: if page < start_page: utils.logger.info(f"[XiaoHongShuCrawler.search] Skip page {page}") page += 1 continue try: - utils.logger.info(f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}") + utils.logger.info( + f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}" + ) note_id_list: List[str] = [] notes_res = await self.xhs_client.get_note_by_keyword( keyword=keyword, search_id=search_id, page=page, - sort=SearchSortType(config.SORT_TYPE) if config.SORT_TYPE != '' else SearchSortType.GENERAL, + sort=( + SearchSortType(config.SORT_TYPE) + if config.SORT_TYPE != "" + else SearchSortType.GENERAL + ), ) - utils.logger.info(f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}") - if not notes_res or not notes_res.get('has_more', False): + utils.logger.info( + f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}" + ) + if not notes_res or not notes_res.get("has_more", False): utils.logger.info("No more content!") break semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) @@ -138,10 +158,10 @@ class XiaoHongShuCrawler(AbstractCrawler): note_id=post_item.get("id"), xsec_source=post_item.get("xsec_source"), xsec_token=post_item.get("xsec_token"), - semaphore=semaphore + semaphore=semaphore, ) for post_item in notes_res.get("items", {}) - if post_item.get('model_type') not in ('rec_query', 'hot_query') + if post_item.get("model_type") not in ("rec_query", "hot_query") ] note_details = await asyncio.gather(*task_list) for 
note_detail in note_details: @@ -150,18 +170,26 @@ class XiaoHongShuCrawler(AbstractCrawler): await self.get_notice_media(note_detail) note_id_list.append(note_detail.get("note_id")) page += 1 - utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}") + utils.logger.info( + f"[XiaoHongShuCrawler.search] Note details: {note_details}" + ) await self.batch_get_note_comments(note_id_list) except DataFetchError: - utils.logger.error("[XiaoHongShuCrawler.search] Get note detail error") + utils.logger.error( + "[XiaoHongShuCrawler.search] Get note detail error" + ) break async def get_creators_and_notes(self) -> None: """Get creator's notes and retrieve their comment information.""" - utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators") + utils.logger.info( + "[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators" + ) for user_id in config.XHS_CREATOR_ID_LIST: # get creator detail info from web html content - createor_info: Dict = await self.xhs_client.get_creator_info(user_id=user_id) + createor_info: Dict = await self.xhs_client.get_creator_info( + user_id=user_id + ) if createor_info: await xhs_store.save_creator(user_id, creator=createor_info) @@ -169,7 +197,7 @@ class XiaoHongShuCrawler(AbstractCrawler): all_notes_list = await self.xhs_client.get_all_notes_by_creator( user_id=user_id, crawl_interval=random.random(), - callback=self.fetch_creator_notes_detail + callback=self.fetch_creator_notes_detail, ) note_ids = [note_item.get("note_id") for note_item in all_notes_list] @@ -185,7 +213,7 @@ class XiaoHongShuCrawler(AbstractCrawler): note_id=post_item.get("note_id"), xsec_source=post_item.get("xsec_source"), xsec_token=post_item.get("xsec_token"), - semaphore=semaphore + semaphore=semaphore, ) for post_item in note_list ] @@ -205,12 +233,14 @@ class XiaoHongShuCrawler(AbstractCrawler): get_note_detail_task_list = [] for full_note_url in config.XHS_SPECIFIED_NOTE_URL_LIST: note_url_info: NoteUrlInfo = parse_note_info_from_note_url(full_note_url) - utils.logger.info(f"[XiaoHongShuCrawler.get_specified_notes] Parse note url info: {note_url_info}") + utils.logger.info( + f"[XiaoHongShuCrawler.get_specified_notes] Parse note url info: {note_url_info}" + ) crawler_task = self.get_note_detail_async_task( note_id=note_url_info.note_id, xsec_source=note_url_info.xsec_source, xsec_token=note_url_info.xsec_token, - semaphore=asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) + semaphore=asyncio.Semaphore(config.MAX_CONCURRENCY_NUM), ) get_note_detail_task_list.append(crawler_task) @@ -222,56 +252,103 @@ class XiaoHongShuCrawler(AbstractCrawler): await xhs_store.update_xhs_note(note_detail) await self.batch_get_note_comments(need_get_comment_note_ids) + async def get_note_detail_async_task( + self, + note_id: str, + xsec_source: str, + xsec_token: str, + semaphore: asyncio.Semaphore, + ) -> Optional[Dict]: + """Get note detail - async def get_note_detail_async_task(self, note_id: str, xsec_source: str, xsec_token: str, semaphore: asyncio.Semaphore) -> \ - Optional[Dict]: - """Get note detail""" + Args: + note_id: + xsec_source: + xsec_token: + semaphore: + + Returns: + Dict: note detail + """ + note_detail_from_html, note_detail_from_api = None, None async with semaphore: try: - note_detail: Dict = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token) - # note_detail: Dict = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token) - if not note_detail: + # 尝试直接获取网页版笔记详情,不携带cookie + 
note_detail_from_html: Dict = ( + await self.xhs_client.get_note_by_id_from_html( + note_id, xsec_source, xsec_token, enable_cookie=False + ) + ) + if not note_detail_from_html: + # 如果网页版笔记详情获取失败,则尝试使用cookie获取 + note_detail_from_html = ( + await self.xhs_client.get_note_by_id_from_html( + note_id, xsec_source, xsec_token, enable_cookie=True + ) + ) utils.logger.error( - f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error, note_id: {note_id}") + f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error, note_id: {note_id}" + ) return None - note_detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source}) - return note_detail + if not note_detail_from_html: + # 如果网页版笔记详情获取失败,则尝试API获取 + note_detail_from_api: Dict = await self.xhs_client.get_note_by_id( + note_id, xsec_source, xsec_token + ) + note_detail = note_detail_from_html or note_detail_from_api + if note_detail: + note_detail.update( + {"xsec_token": xsec_token, "xsec_source": xsec_source} + ) + return note_detail except DataFetchError as ex: - utils.logger.error(f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error: {ex}") + utils.logger.error( + f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error: {ex}" + ) return None except KeyError as ex: utils.logger.error( - f"[XiaoHongShuCrawler.get_note_detail_async_task] have not fund note detail note_id:{note_id}, err: {ex}") + f"[XiaoHongShuCrawler.get_note_detail_async_task] have not fund note detail note_id:{note_id}, err: {ex}" + ) return None async def batch_get_note_comments(self, note_list: List[str]): """Batch get note comments""" if not config.ENABLE_GET_COMMENTS: - utils.logger.info(f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled") + utils.logger.info( + f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled" + ) return utils.logger.info( - f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}") + f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}" + ) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) task_list: List[Task] = [] for note_id in note_list: - task = asyncio.create_task(self.get_comments(note_id, semaphore), name=note_id) + task = asyncio.create_task( + self.get_comments(note_id, semaphore), name=note_id + ) task_list.append(task) await asyncio.gather(*task_list) async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore): """Get note comments with keyword filtering and quantity limitation""" async with semaphore: - utils.logger.info(f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}") + utils.logger.info( + f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}" + ) await self.xhs_client.get_note_all_comments( note_id=note_id, crawl_interval=random.random(), callback=xhs_store.batch_update_xhs_note_comments, - max_count=CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES + max_count=CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES, ) @staticmethod - def format_proxy_info(ip_proxy_info: IpInfoModel) -> Tuple[Optional[Dict], Optional[Dict]]: + def format_proxy_info( + ip_proxy_info: IpInfoModel, + ) -> Tuple[Optional[Dict], Optional[Dict]]: """format proxy info for playwright and httpx""" playwright_proxy = { "server": f"{ip_proxy_info.protocol}{ip_proxy_info.ip}:{ip_proxy_info.port}", @@ -285,8 +362,12 @@ class XiaoHongShuCrawler(AbstractCrawler): async def create_xhs_client(self, httpx_proxy: 
Optional[str]) -> XiaoHongShuClient: """Create xhs client""" - utils.logger.info("[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ...") - cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies()) + utils.logger.info( + "[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ..." + ) + cookie_str, cookie_dict = utils.convert_cookies( + await self.browser_context.cookies() + ) xhs_client_obj = XiaoHongShuClient( proxies=httpx_proxy, headers={ @@ -294,7 +375,7 @@ class XiaoHongShuCrawler(AbstractCrawler): "Cookie": cookie_str, "Origin": "https://www.xiaohongshu.com", "Referer": "https://www.xiaohongshu.com", - "Content-Type": "application/json;charset=UTF-8" + "Content-Type": "application/json;charset=UTF-8", }, playwright_page=self.context_page, cookie_dict=cookie_dict, @@ -302,33 +383,35 @@ class XiaoHongShuCrawler(AbstractCrawler): return xhs_client_obj async def launch_browser( - self, - chromium: BrowserType, - playwright_proxy: Optional[Dict], - user_agent: Optional[str], - headless: bool = True + self, + chromium: BrowserType, + playwright_proxy: Optional[Dict], + user_agent: Optional[str], + headless: bool = True, ) -> BrowserContext: """Launch browser and create browser context""" - utils.logger.info("[XiaoHongShuCrawler.launch_browser] Begin create browser context ...") + utils.logger.info( + "[XiaoHongShuCrawler.launch_browser] Begin create browser context ..." + ) if config.SAVE_LOGIN_STATE: # feat issue #14 # we will save login state to avoid login every time - user_data_dir = os.path.join(os.getcwd(), "browser_data", - config.USER_DATA_DIR % config.PLATFORM) # type: ignore + user_data_dir = os.path.join( + os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM + ) # type: ignore browser_context = await chromium.launch_persistent_context( user_data_dir=user_data_dir, accept_downloads=True, headless=headless, proxy=playwright_proxy, # type: ignore viewport={"width": 1920, "height": 1080}, - user_agent=user_agent + user_agent=user_agent, ) return browser_context else: browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser_context = await browser.new_context( - viewport={"width": 1920, "height": 1080}, - user_agent=user_agent + viewport={"width": 1920, "height": 1080}, user_agent=user_agent ) return browser_context @@ -339,7 +422,9 @@ class XiaoHongShuCrawler(AbstractCrawler): async def get_notice_media(self, note_detail: Dict): if not config.ENABLE_GET_IMAGES: - utils.logger.info(f"[XiaoHongShuCrawler.get_notice_media] Crawling image mode is not enabled") + utils.logger.info( + f"[XiaoHongShuCrawler.get_notice_media] Crawling image mode is not enabled" + ) return await self.get_note_images(note_detail) await self.get_notice_video(note_detail) @@ -356,8 +441,8 @@ class XiaoHongShuCrawler(AbstractCrawler): image_list: List[Dict] = note_item.get("image_list", []) for img in image_list: - if img.get('url_default') != '': - img.update({'url': img.get('url_default')}) + if img.get("url_default") != "": + img.update({"url": img.get("url_default")}) if not image_list: return
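
The README's claim that the project avoids re-implementing the core encryption JS comes down to the `_pre_headers` path shown in `media_platform/xhs/client.py` above: the signed headers are produced by evaluating the site's own `window._webmsxyw` inside the logged-in Playwright page and feeding the result to the project's `sign()` helper. A condensed sketch of that flow, assuming a live Playwright `Page` on xiaohongshu.com and the `sign` helper imported from `media_platform/xhs/help.py` as in the diff:

```python
# Condensed sketch of the signing logic in XiaoHongShuClient._pre_headers (see client.py above).
# Assumes `page` is a Playwright Page already on xiaohongshu.com and `cookie_dict`
# holds the browser cookies; the import path mirrors the package layout in this repo.
from typing import Dict, Optional

from playwright.async_api import Page

from media_platform.xhs.help import sign  # helper shown in the diff


async def build_signed_headers(
    page: Page, cookie_dict: Dict[str, str], url: str, data: Optional[dict] = None
) -> Dict[str, str]:
    # The page already ships the encryption JS, so evaluate it instead of porting it.
    encrypt_params = await page.evaluate(
        "([url, data]) => window._webmsxyw(url, data)", [url, data]
    )
    local_storage = await page.evaluate("() => window.localStorage")
    signs = sign(
        a1=cookie_dict.get("a1", ""),
        b1=local_storage.get("b1", ""),
        x_s=encrypt_params.get("X-s", ""),
        x_t=str(encrypt_params.get("X-t", "")),
    )
    # These four headers are what the web API checks on every signed request.
    return {
        "X-S": signs["x-s"],
        "X-T": signs["x-t"],
        "x-S-Common": signs["x-s-common"],
        "X-B3-Traceid": signs["x-b3-traceid"],
    }
```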
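
Beyond the Black reformatting, the main behavioral change in this diff is the fallback order in `XiaoHongShuCrawler.get_note_detail_async_task`: first try the note's HTML page without cookies, retry with cookies if that comes back empty, and only then fall back to the signed JSON API. A simplified sketch of that order, assuming an `XiaoHongShuClient` exposing the `get_note_by_id_from_html(..., enable_cookie=...)` and `get_note_by_id(...)` methods shown above:

```python
# Simplified sketch (not part of the patch) of the note-detail fallback order
# introduced in XiaoHongShuCrawler.get_note_detail_async_task.
import asyncio
from typing import Dict, Optional


async def fetch_note_detail(
    xhs_client,
    note_id: str,
    xsec_source: str,
    xsec_token: str,
    semaphore: asyncio.Semaphore,
) -> Optional[Dict]:
    async with semaphore:
        # 1. Try the web HTML page anonymously (no cookie attached).
        detail = await xhs_client.get_note_by_id_from_html(
            note_id, xsec_source, xsec_token, enable_cookie=False
        )
        # 2. Retry the HTML page with cookies if the anonymous request returned nothing.
        if not detail:
            detail = await xhs_client.get_note_by_id_from_html(
                note_id, xsec_source, xsec_token, enable_cookie=True
            )
        # 3. Fall back to the signed JSON API as a last resort.
        if not detail:
            detail = await xhs_client.get_note_by_id(note_id, xsec_source, xsec_token)
        if detail:
            # Keep the xsec token/source so later requests (comments, media) can reuse them.
            detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source})
        return detail
```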