mirror of
https://github.com/RYDE-WORK/Langchain-Chatchat.git
synced 2026-01-19 21:37:20 +08:00
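* Optimize configs (#1474)
  * remove llm_model_dict
  * optimize configs
  * fix get_model_path
  * Change some default parameters; add a default configuration for Qianfan
  * Update server_config.py.example
* fix merge conflict for #1474 (#1494)
* Fix the ChatGPT api_base_url error; users can override the default api_base_url in the online model section of model_config (#1496)
* Optimize the logic for fetching and switching the LLM model list: (#1497)
  1. Detect available models that are not running more accurately
  2. Optimize the control logic for displaying and switching the model list in the WEBUI
* Update migrate.py and init_database.py to strengthen the knowledge base migration tool: (#1498)
  1. Add the --update-in-db parameter: update the vector store from local files according to the database records
  2. Add the --increament parameter: incrementally update the vector store from local files
  3. Add the --prune-db parameter: clean up the related vector store entries after local files are deleted
  4. Add the --prune-folder parameter: clean up unused local files according to the database records
  5. Remove the --update-info-only parameter; the database already stores vector store information, so the operation adds little
  6. Add the --kb-name parameter: every operation can target a named knowledge base; if omitted, all local knowledge bases are used
  7. Add test cases for knowledge base migration
  8. Delete the save_vector_store method of milvus_kb_service
* feat: support volc fangzhou
* Make Volcano Ark (volc fangzhou) work correctly; add error handling and test cases
* feat: support volc fangzhou (#1501)
  * feat: support volc fangzhou
  Co-authored-by: liunux4odoo <41217877+liunux4odoo@users.noreply.github.com>
  Co-authored-by: liqiankun.1111 <liqiankun.1111@bytedance.com>
* First preliminary agent implementation (#1503)
  * First preliminary agent implementation
  * Add the streaming parameter
  * Modified weather.py
  Co-authored-by: zR <zRzRzRzRzRzRzR>
* Add configs/prompt_config.py so users can customize prompt templates: (#1504)
  1. Two templates are included by default, one for LLM chat and one for knowledge base and search engine chat
  2. server/utils.py provides get_prompt_template, which returns the content of a named prompt template (hot reloading supported)
  3. The chat/knowledge_base_chat/search_engine_chat endpoints in api.py accept a prompt_name parameter
* Add parameter adaptation for other models
* Add loading by a passed-in vector name
* 1. Search engine Q&A supports history; 2. Fix the history parameter error in knowledge base Q&A: user input was passed into history. The cause was the webui fetching history messages twice; the API knowledge base chat endpoint itself was fine.
* langchain logging switch
* move wrap_done & get_ChatOpenAI from server.chat.utils to server.utils (#1506)
* Fix the wrong cache key for faiss_pool knowledge bases (#1507)
* fix ReadMe anchor link (#1500)
* fix : Duplicate variable and function name (#1509)
  Co-authored-by: Jim <zhangpengyi@taijihuabao.com>
* Update README.md
* fix #1519: a bug in the old streamlit-chatbox; the new version has compatibility problems, so work around it in the webui for now and pin the chatbox version (#1525) close #1519
* [New feature] Online LLM models support Alibaba Cloud Tongyi Qianwen (#1534)
  * feat: add qwen-api
  * Make the Qwen API support the temperature parameter; add test cases
  * List the online-api SDKs as optional dependencies
  Co-authored-by: liunux4odoo <liunux@qq.com>
* Handle the logic for serializing to disk
* remove depends on volcengine
* update kb_doc_api: use Form instead of Body when upload file
* Change all httpx requests to use Client, for efficiency and to make proxies easier to configure later. (#1554)
  Change all httpx requests to use Client, for efficiency and to make proxies easier to configure later. Add this project's services to the no-proxy list to avoid request errors from the fastchat servers. (No effect on Windows.)
* update QR code
* update readme_en, readme, requirements_api, requirements, model_config.py.example: test baichuan2-7b; update related documentation
* New features: 1. support the vllm inference acceleration framework; 2. update the list of supported models
* Updated files: 1. startup, model_config.py.example, serve_config.py.example, FAQ
* 1. Finish debugging the vllm acceleration framework; 2. change the vllm dependency in requirements and requirements_api; 3. comment out the setting in serve_config that puts baichuan-7b on the cpu device
* 1. Update the notes on the vllm backend in the config; 2. update requirements and requirements_api
* Add GPT-4-only agent functionality, to be extended over time; the Chinese README has been written (#1611)
* Dev (#1613)
  * Add GPT-4-only agent functionality, to be extended over time; the Chinese README has been written
  * Fix a bug mentioned in an issue
  * Change the minimum temperature to 0; negative values should not be supported
  * Changed the minimum temperature
* fix: set vllm based on platform to avoid error on windows
* fix: langchain warnings for import from root
* Fix knowledge base rebuild and chat UI errors in the webui (#1615)
  * Bug fix: when rebuilding a knowledge base from the webui, an unsupported file made the whole endpoint fail; CHUNK_SIZE was not imported in migrate
  * Fix: the expander in the webui chat view stayed in the running state; simplify how history messages are fetched
* Following the official documentation, add an instruction template for the English bge embedding (#1585)
  Co-authored-by: zR <2448370773@qq.com>
* Dev (#1618)
  * Add GPT-4-only agent functionality, to be extended over time; the Chinese README has been written
  * Fix a bug mentioned in an issue
  * Change the minimum temperature to 0; negative values should not be supported
  * Changed the minimum temperature
  * Add partial Agent support and fix some bugs in the startup file
  * Modified the GPU count configuration file
  * 1 1
  * Fix configuration file errors
  * Update README; stability testing
* Change README 0928 (#1619)
  * Add GPT-4-only agent functionality, to be extended over time; the Chinese README has been written
  * Fix a bug mentioned in an issue
  * Change the minimum temperature to 0; negative values should not be supported
  * Changed the minimum temperature
  * Add partial Agent support and fix some bugs in the startup file
  * Modified the GPU count configuration file
  * 1 1
  * Fix configuration file errors
  * Update README; stability testing
  * Update README
  * fix readme
* Handle the logic for serializing to disk
* update version number to v0.2.5

Co-authored-by: qiankunli <qiankun.li@qq.com>
Co-authored-by: liqiankun.1111 <liqiankun.1111@bytedance.com>
Co-authored-by: zR <2448370773@qq.com>
Co-authored-by: glide-the <2533736852@qq.com>
Co-authored-by: Water Zheng <1499383852@qq.com>
Co-authored-by: Jim Zhang <dividi_z@163.com>
Co-authored-by: Jim <zhangpengyi@taijihuabao.com>
Co-authored-by: imClumsyPanda <littlepanda0716@gmail.com>
Co-authored-by: Leego <leegodev@hotmail.com>
Co-authored-by: hzg0601 <hzg0601@163.com>
Co-authored-by: WilliamChen-luckbob <58684828+WilliamChen-luckbob@users.noreply.github.com>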
408 lines
16 KiB
Python
import os

from transformers import AutoTokenizer

from configs import (
    EMBEDDING_MODEL,
    KB_ROOT_PATH,
    CHUNK_SIZE,
    OVERLAP_SIZE,
    ZH_TITLE_ENHANCE,
    logger,
    log_verbose,
    text_splitter_dict,
    LLM_MODEL,
    TEXT_SPLITTER_NAME,
)
import importlib
from text_splitter import zh_title_enhance as func_zh_title_enhance
import langchain.document_loaders
from langchain.docstore.document import Document
from langchain.text_splitter import TextSplitter
from pathlib import Path
import json
from concurrent.futures import ThreadPoolExecutor
from server.utils import run_in_thread_pool, embedding_device, get_model_worker_config
import io
from typing import List, Union, Callable, Dict, Optional, Tuple, Generator
import chardet


def validate_kb_name(knowledge_base_id: str) -> bool:
    # Check for unexpected characters or path-traversal keywords.
    if "../" in knowledge_base_id:
        return False
    return True


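# Illustrative sketch only, not part of the original module. `validate_kb_name`
# rejects just the literal substring "../". A stricter check could resolve the
# candidate path and confirm that it stays inside the knowledge base root. The
# helper name `is_safe_kb_name` and the default root "knowledge_base" are
# assumptions made for this example.

```python
import os


def is_safe_kb_name(kb_name: str, root: str = "knowledge_base") -> bool:
    """Hypothetical helper: True only if kb_name resolves to a path inside root."""
    if not kb_name or os.path.isabs(kb_name):
        return False
    root_abs = os.path.abspath(root)
    candidate = os.path.abspath(os.path.join(root_abs, kb_name))
    # commonpath equals the root only when candidate is the root or lies below it;
    # the root itself is rejected because it is not a knowledge base name.
    return candidate != root_abs and os.path.commonpath([root_abs, candidate]) == root_abs
```

# Resolving the path first also catches names such as "a/../../b", which only
# leave the root once they are normalized.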
def get_kb_path(knowledge_base_name: str):
    return os.path.join(KB_ROOT_PATH, knowledge_base_name)


def get_doc_path(knowledge_base_name: str):
    return os.path.join(get_kb_path(knowledge_base_name), "content")


def get_vs_path(knowledge_base_name: str, vector_name: str):
    return os.path.join(get_kb_path(knowledge_base_name), vector_name)


def get_file_path(knowledge_base_name: str, doc_name: str):
    return os.path.join(get_doc_path(knowledge_base_name), doc_name)


def list_kbs_from_folder():
    return [f for f in os.listdir(KB_ROOT_PATH)
            if os.path.isdir(os.path.join(KB_ROOT_PATH, f))]


def list_files_from_folder(kb_name: str):
    doc_path = get_doc_path(kb_name)
    return [file for file in os.listdir(doc_path)
            if os.path.isfile(os.path.join(doc_path, file))]


def load_embeddings(model: str = EMBEDDING_MODEL, device: str = embedding_device()):
    '''
    Load embeddings from the cache, which avoids competing loads across threads.
    '''
    from server.knowledge_base.kb_cache.base import embeddings_pool
    return embeddings_pool.load_embeddings(model=model, device=device)


LOADER_DICT = {"UnstructuredHTMLLoader": ['.html'],
               "UnstructuredMarkdownLoader": ['.md'],
               "CustomJSONLoader": [".json"],
               "CSVLoader": [".csv"],
               "RapidOCRPDFLoader": [".pdf"],
               "RapidOCRLoader": ['.png', '.jpg', '.jpeg', '.bmp'],
               "UnstructuredFileLoader": ['.eml', '.msg', '.rst',
                                          '.rtf', '.txt', '.xml',
                                          '.docx', '.epub', '.odt',
                                          '.ppt', '.pptx', '.tsv'],
               }
SUPPORTED_EXTS = [ext for sublist in LOADER_DICT.values() for ext in sublist]


class CustomJSONLoader(langchain.document_loaders.JSONLoader):
    '''
    langchain's JSONLoader requires jq, which is inconvenient on Windows, so this
    class replaces it. Targets langchain==0.0.286.
    '''

    def __init__(
            self,
            file_path: Union[str, Path],
            content_key: Optional[str] = None,
            metadata_func: Optional[Callable[[Dict, Dict], Dict]] = None,
            text_content: bool = True,
            json_lines: bool = False,
    ):
        """Initialize the JSONLoader.

        Args:
            file_path (Union[str, Path]): The path to the JSON or JSON Lines file.
            content_key (str): The key to use to extract the content from the JSON if
                the data is a list of objects (dict).
            metadata_func (Callable[Dict, Dict]): A function that takes in the JSON
                object and the default metadata and returns
                a dict of the updated metadata.
            text_content (bool): Boolean flag to indicate whether the content is in
                string format, default to True.
            json_lines (bool): Boolean flag to indicate whether the input is in
                JSON Lines format.
        """
        self.file_path = Path(file_path).resolve()
        self._content_key = content_key
        self._metadata_func = metadata_func
        self._text_content = text_content
        self._json_lines = json_lines

    def _parse(self, content: str, docs: List[Document]) -> None:
        """Convert given content to documents."""
        data = json.loads(content)

        # Perform some validation
        # This is not a perfect validation, but it should catch most cases
        # and prevent the user from getting a cryptic error later on.
        if self._content_key is not None:
            self._validate_content_key(data)
        if self._metadata_func is not None:
            self._validate_metadata_func(data)

        for i, sample in enumerate(data, len(docs) + 1):
            text = self._get_text(sample=sample)
            metadata = self._get_metadata(
                sample=sample, source=str(self.file_path), seq_num=i
            )
            docs.append(Document(page_content=text, metadata=metadata))


# Register the class so get_loader can look it up by name on langchain.document_loaders.
langchain.document_loaders.CustomJSONLoader = CustomJSONLoader


def get_LoaderClass(file_extension):
    for LoaderClass, extensions in LOADER_DICT.items():
        if file_extension in extensions:
            return LoaderClass


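# Illustrative sketch only, not part of the original module. `get_LoaderClass`
# scans every entry of the loader table on each call and implicitly returns
# None for an unknown extension. One alternative, sketched here with a reduced
# copy of the table, is to invert the mapping once. The names `LOADERS`,
# `EXT_TO_LOADER` and `loader_name_for` are assumptions made for this example.

```python
import os
from typing import Dict, List, Optional

# Reduced copy of the loader table, for illustration only.
LOADERS: Dict[str, List[str]] = {
    "UnstructuredMarkdownLoader": [".md"],
    "CSVLoader": [".csv"],
    "RapidOCRLoader": [".png", ".jpg"],
}

# Invert the table once so that each lookup is a single dict access.
EXT_TO_LOADER: Dict[str, str] = {
    ext: name for name, exts in LOADERS.items() for ext in exts
}


def loader_name_for(filename: str) -> Optional[str]:
    """Hypothetical helper: loader name for a file, or None if unsupported."""
    ext = os.path.splitext(filename)[-1].lower()
    return EXT_TO_LOADER.get(ext)
```

# Lower-casing the extension mirrors what KnowledgeFile.__init__ does before it
# checks SUPPORTED_EXTS.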
# Shared vectorization logic extracted from KnowledgeFile; once langchain supports
# in-memory files, files that are not on disk can be vectorized as well.
def get_loader(loader_name: str, file_path_or_content: Union[str, bytes, io.StringIO, io.BytesIO]):
    '''
    Return a document loader based on loader_name and the file path or content.
    '''
    try:
        if loader_name in ["RapidOCRPDFLoader", "RapidOCRLoader"]:
            document_loaders_module = importlib.import_module('document_loaders')
        else:
            document_loaders_module = importlib.import_module('langchain.document_loaders')
        DocumentLoader = getattr(document_loaders_module, loader_name)
    except Exception as e:
        msg = f"Error finding loader {loader_name} for file {file_path_or_content}: {e}"
        logger.error(f'{e.__class__.__name__}: {msg}',
                     exc_info=e if log_verbose else None)
        # Fall back to the generic loader when the requested one cannot be found.
        document_loaders_module = importlib.import_module('langchain.document_loaders')
        DocumentLoader = getattr(document_loaders_module, "UnstructuredFileLoader")

    if loader_name == "UnstructuredFileLoader":
        loader = DocumentLoader(file_path_or_content, autodetect_encoding=True)
    elif loader_name == "CSVLoader":
        # Detect the file encoding automatically so the langchain loader
        # does not raise encoding errors.
        with open(file_path_or_content, 'rb') as struct_file:
            encode_detect = chardet.detect(struct_file.read())
        if encode_detect:
            loader = DocumentLoader(file_path_or_content, encoding=encode_detect["encoding"])
        else:
            loader = DocumentLoader(file_path_or_content, encoding="utf-8")

    elif loader_name == "JSONLoader":
        loader = DocumentLoader(file_path_or_content, jq_schema=".", text_content=False)
    elif loader_name == "CustomJSONLoader":
        loader = DocumentLoader(file_path_or_content, text_content=False)
    elif loader_name == "UnstructuredMarkdownLoader":
        loader = DocumentLoader(file_path_or_content, mode="elements")
    elif loader_name == "UnstructuredHTMLLoader":
        loader = DocumentLoader(file_path_or_content, mode="elements")
    else:
        loader = DocumentLoader(file_path_or_content)
    return loader


def make_text_splitter(
        splitter_name: str = TEXT_SPLITTER_NAME,
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = OVERLAP_SIZE,
        llm_model: str = LLM_MODEL,
):
    """
    Return a specific text splitter based on the given parameters.
    """
    splitter_name = splitter_name or "SpacyTextSplitter"
    try:
        if splitter_name == "MarkdownHeaderTextSplitter":  # special case for MarkdownHeaderTextSplitter
            headers_to_split_on = text_splitter_dict[splitter_name]['headers_to_split_on']
            text_splitter = langchain.text_splitter.MarkdownHeaderTextSplitter(
                headers_to_split_on=headers_to_split_on)
        else:

            try:  # prefer a user-defined text_splitter
                text_splitter_module = importlib.import_module('text_splitter')
                TextSplitter = getattr(text_splitter_module, splitter_name)
            except Exception:  # otherwise use langchain's text_splitter
                text_splitter_module = importlib.import_module('langchain.text_splitter')
                TextSplitter = getattr(text_splitter_module, splitter_name)

            if text_splitter_dict[splitter_name]["source"] == "tiktoken":  # load from tiktoken
                try:
                    text_splitter = TextSplitter.from_tiktoken_encoder(
                        encoding_name=text_splitter_dict[splitter_name]["tokenizer_name_or_path"],
                        pipeline="zh_core_web_sm",
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
                except Exception:
                    # Retry without the spaCy pipeline argument for splitters that do not accept it.
                    text_splitter = TextSplitter.from_tiktoken_encoder(
                        encoding_name=text_splitter_dict[splitter_name]["tokenizer_name_or_path"],
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
            elif text_splitter_dict[splitter_name]["source"] == "huggingface":  # load from huggingface
                if text_splitter_dict[splitter_name]["tokenizer_name_or_path"] == "":
                    config = get_model_worker_config(llm_model)
                    text_splitter_dict[splitter_name]["tokenizer_name_or_path"] = \
                        config.get("model_path")

                if text_splitter_dict[splitter_name]["tokenizer_name_or_path"] == "gpt2":
                    from transformers import GPT2TokenizerFast
                    tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
                else:  # load by character length
                    tokenizer = AutoTokenizer.from_pretrained(
                        text_splitter_dict[splitter_name]["tokenizer_name_or_path"],
                        trust_remote_code=True)
                text_splitter = TextSplitter.from_huggingface_tokenizer(
                    tokenizer=tokenizer,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap
                )
            else:
                try:
                    text_splitter = TextSplitter(
                        pipeline="zh_core_web_sm",
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
                except Exception:
                    # Retry without the spaCy pipeline argument for splitters that do not accept it.
                    text_splitter = TextSplitter(
                        chunk_size=chunk_size,
                        chunk_overlap=chunk_overlap
                    )
    except Exception as e:
        print(e)
        # Fall back to a generic splitter with fixed sizes if anything above fails.
        text_splitter_module = importlib.import_module('langchain.text_splitter')
        TextSplitter = getattr(text_splitter_module, "RecursiveCharacterTextSplitter")
        text_splitter = TextSplitter(chunk_size=250, chunk_overlap=50)
    return text_splitter


class KnowledgeFile:
    def __init__(
            self,
            filename: str,
            knowledge_base_name: str
    ):
        '''
        Represents a file in the knowledge base directory; it must exist on disk
        before vectorization and similar operations can be performed.
        '''
        self.kb_name = knowledge_base_name
        self.filename = filename
        self.ext = os.path.splitext(filename)[-1].lower()
        if self.ext not in SUPPORTED_EXTS:
            raise ValueError(f"Unsupported file format {self.ext}")
        self.filepath = get_file_path(knowledge_base_name, filename)
        self.docs = None
        self.splited_docs = None
        self.document_loader_name = get_LoaderClass(self.ext)
        self.text_splitter_name = TEXT_SPLITTER_NAME

    def file2docs(self, refresh: bool = False):
        if self.docs is None or refresh:
            logger.info(f"{self.document_loader_name} used for {self.filepath}")
            loader = get_loader(self.document_loader_name, self.filepath)
            self.docs = loader.load()
        return self.docs

    def docs2texts(
            self,
            docs: List[Document] = None,
            zh_title_enhance: bool = ZH_TITLE_ENHANCE,
            refresh: bool = False,
            chunk_size: int = CHUNK_SIZE,
            chunk_overlap: int = OVERLAP_SIZE,
            text_splitter: TextSplitter = None,
    ):
        docs = docs or self.file2docs(refresh=refresh)
        if not docs:
            return []
        if self.ext not in [".csv"]:
            if text_splitter is None:
                text_splitter = make_text_splitter(splitter_name=self.text_splitter_name,
                                                   chunk_size=chunk_size,
                                                   chunk_overlap=chunk_overlap)
            if self.text_splitter_name == "MarkdownHeaderTextSplitter":
                docs = text_splitter.split_text(docs[0].page_content)
                for doc in docs:
                    # If the document has metadata
                    if doc.metadata:
                        doc.metadata["source"] = os.path.basename(self.filepath)
            else:
                docs = text_splitter.split_documents(docs)

        print(f"Document split example: {docs[0]}")
        if zh_title_enhance:
            docs = func_zh_title_enhance(docs)
        self.splited_docs = docs
        return self.splited_docs

    def file2text(
            self,
            zh_title_enhance: bool = ZH_TITLE_ENHANCE,
            refresh: bool = False,
            chunk_size: int = CHUNK_SIZE,
            chunk_overlap: int = OVERLAP_SIZE,
            text_splitter: TextSplitter = None,
    ):
        if self.splited_docs is None or refresh:
            docs = self.file2docs()
            self.splited_docs = self.docs2texts(docs=docs,
                                                zh_title_enhance=zh_title_enhance,
                                                refresh=refresh,
                                                chunk_size=chunk_size,
                                                chunk_overlap=chunk_overlap,
                                                text_splitter=text_splitter)
        return self.splited_docs

    def file_exist(self):
        return os.path.isfile(self.filepath)

    def get_mtime(self):
        return os.path.getmtime(self.filepath)

    def get_size(self):
        return os.path.getsize(self.filepath)


def files2docs_in_thread(
        files: List[Union[KnowledgeFile, Tuple[str, str], Dict]],
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = OVERLAP_SIZE,
        zh_title_enhance: bool = ZH_TITLE_ENHANCE,
        pool: ThreadPoolExecutor = None,
) -> Generator:
    '''
    Convert disk files to langchain Documents in batches using multiple threads.
    If an item is a Tuple, its form is (filename, kb_name).
    The generator yields status, (kb_name, file_name, docs | error).
    '''
    def file2docs(*, file: KnowledgeFile, **kwargs) -> Tuple[bool, Tuple[str, str, List[Document]]]:
        try:
            return True, (file.kb_name, file.filename, file.file2text(**kwargs))
        except Exception as e:
            msg = f"Error loading documents from file {file.kb_name}/{file.filename}: {e}"
            logger.error(f'{e.__class__.__name__}: {msg}',
                         exc_info=e if log_verbose else None)
            return False, (file.kb_name, file.filename, msg)

    kwargs_list = []
    for i, file in enumerate(files):
        kwargs = {}
        try:
            if isinstance(file, tuple) and len(file) >= 2:
                filename = file[0]
                kb_name = file[1]
                file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)
            elif isinstance(file, dict):
                filename = file.pop("filename")
                kb_name = file.pop("kb_name")
                # Any remaining keys are forwarded to file2text as keyword arguments.
                kwargs.update(file)
                file = KnowledgeFile(filename=filename, knowledge_base_name=kb_name)
            kwargs["file"] = file
            kwargs["chunk_size"] = chunk_size
            kwargs["chunk_overlap"] = chunk_overlap
            kwargs["zh_title_enhance"] = zh_title_enhance
            kwargs_list.append(kwargs)
        except Exception as e:
            yield False, (kb_name, filename, str(e))

    for result in run_in_thread_pool(func=file2docs, params=kwargs_list, pool=pool):
        yield result


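# Illustrative sketch only, not part of the original module. The body of
# `run_in_thread_pool` is not shown in this file, so the stand-in below is an
# assumption about the general pattern rather than the project's actual
# implementation: submit one task per kwargs dict, yield results as they
# complete, and have each worker report failure as data in the same
# (status, payload) shape that files2docs_in_thread yields. The names
# `run_tasks` and `parse` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Generator, List


def run_tasks(func: Callable, params: List[Dict], max_workers: int = 4) -> Generator:
    """Hypothetical stand-in: call func(**kwargs) for each kwargs dict in a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, **kwargs) for kwargs in params]
        # Results arrive in completion order, not submission order.
        for future in as_completed(futures):
            yield future.result()


def parse(*, name: str, text: str):
    """Toy worker that returns (status, payload) instead of raising."""
    try:
        return True, (name, text.split())
    except Exception as e:
        return False, (name, str(e))


# Sort by file name so the outcome does not depend on thread scheduling.
results = sorted(
    run_tasks(parse, [
        {"name": "a.txt", "text": "hello world"},
        {"name": "b.txt", "text": None},  # deliberately invalid input
    ]),
    key=lambda r: r[1][0],
)
```

# Because each worker catches its own exception, one bad file does not abort
# the batch; the caller inspects the status flag of every result instead.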
if __name__ == "__main__":
    from pprint import pprint

    kb_file = KnowledgeFile(filename="test.txt", knowledge_base_name="samples")
    # kb_file.text_splitter_name = "RecursiveCharacterTextSplitter"
    docs = kb_file.file2docs()
    pprint(docs[-1])

    docs = kb_file.file2text()
    pprint(docs[-1])