Merge remote-tracking branch 'kvai/main' into server-prefix-cache

This commit is contained in:
ceerrep 2025-02-18 17:51:22 +08:00
commit db61375f1a
8 changed files with 403 additions and 3 deletions

.gitignore

@@ -23,3 +23,8 @@ tmp1.txt
test_65_300_1536.txt
test.txt
book
ktransformers/tests/mmlu_result_silicon.json
ktransformers/tests/chat_txt.txt
mmlu_result_q4km.json
mmlu_result_q4km.log
ktransformers/tests/mmlu_result_silicon.log


@@ -18,4 +18,8 @@ dev_install:
echo "Installing ktransformers"
KTRANSFORMERS_FORCE_BUILD=TRUE pip install -e . -v --no-build-isolation
echo "Installation completed successfully"
echo "Installation completed successfully"
install_numa:
USE_NUMA=1 make dev_install
install_no_numa:
env -u USE_NUMA make dev_install
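The two new targets only toggle the `USE_NUMA` environment variable around `dev_install`. As a hypothetical illustration of the mechanism, a build helper could branch on that variable as sketched below (the project's real flag handling lives in its own build scripts and may differ):

```python
# Hypothetical sketch only: shows how a build helper could read USE_NUMA.
# The actual ktransformers build scripts may handle the flag differently.
import os

def numa_enabled() -> bool:
    # `make install_numa` runs with USE_NUMA=1; `install_no_numa` unsets it.
    return os.environ.get("USE_NUMA", "").strip().lower() in ("1", "true", "yes")

extra_compile_args = ["-DUSE_NUMA"] if numa_enabled() else []
print("NUMA build:", numa_enabled(), extra_compile_args)
```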


@@ -21,6 +21,7 @@ KTransformers is a flexible, Python-centric framework with extensibility at its core
<h2 id="Updates">🔥 Updates</h2>
* **Feb 15, 2025**: KTransformers V0.2.1: longer context (from 4K to 8K for 24GB VRAM) & slightly faster speed (+15%, up to 16 tokens/s). See the documentation [here](./doc/en/DeepseekR1_V3_tutorial.md) and the [online guide](https://kvcache-ai.github.io/ktransformers/).
* **Feb 10, 2025**: Support running Deepseek-R1 and V3 on a single GPU (24GB VRAM) or multiple GPUs with 382G DRAM, with up to 3~28x speedup. For a detailed tutorial, see [here](./doc/en/DeepseekR1_V3_tutorial.md).
* **Aug 28, 2024**: Support 1M context under InternLM2.5-7B-Chat-1M, using 24GB of VRAM and 150GB of DRAM. For a detailed tutorial, see [here](./doc/en/long_context_tutorial.md).
* **Aug 28, 2024**: Reduce the VRAM required by DeepseekV2 from 21G to 11G.

Binary image file changed (preview not shown); size before: 829 KiB, after: 258 KiB.


@@ -171,7 +171,7 @@ Attention! If you are testing R1, it may skip thinking, so you can add the arg: `
#### Dual socket version (64 cores)
Make suer before you install (use install.sh or `make dev_install`), setting the env var `USE_NUMA=1` by `export USE_NUMA=1` (if already installed, reinstall it with this env var set). You may check the doc [here](./install.md) for install details. <br>
Make sure before you install (use install.sh or `make dev_install`), setting the env var `USE_NUMA=1` by `export USE_NUMA=1` (if already installed, reinstall it with this env var set). You may check the doc [here](./install.md) for install details. <br>
Test Command:
``` shell


@@ -68,4 +68,4 @@ Make sure you:
The detailed error:
>ImportError: /mnt/data/miniconda3/envs/xxx/bin/../lib/libstdc++.so.6: version `GLIBCXX_3.4.30' not found (required by /home/xxx/xxx/ktransformers/./cpuinfer_ext.cpython-312-x86_64-linux-gnu.so)
It may be because your conda env does not include this version. You can first exit your conda env with `conda deactivate` and use `whereis libstdc++.so.6` to find the system path, then re-enter your conda env and copy the .so with `cp <path of outer libstdc++> <path of your conda env libstdc++>`.
Running `conda install -c conda-forge libstdcxx-ng` can solve the problem.
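As a quick check of which case you are in, here is a small hedged helper (not part of the repo; the path is derived from the active environment and may differ on your machine) that verifies whether the conda env's `libstdc++.so.6` actually exports `GLIBCXX_3.4.30`:

```python
# Hedged helper, not from the repo: check whether the libstdc++ shipped with
# the active (conda) environment exports GLIBCXX_3.4.30.
import os
import sys

def has_glibcxx_3_4_30(lib_path: str) -> bool:
    # Version strings are stored as plain bytes inside the shared object,
    # so a simple byte search is enough for a quick check.
    with open(lib_path, "rb") as f:
        return b"GLIBCXX_3.4.30" in f.read()

conda_libstdcxx = os.path.join(sys.prefix, "lib", "libstdc++.so.6")
if os.path.exists(conda_libstdcxx):
    print(conda_libstdcxx, "->", has_glibcxx_3_4_30(conda_libstdcxx))
else:
    print("No libstdc++.so.6 found under", sys.prefix)
```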


@@ -0,0 +1,195 @@
import argparse
import random
import time
import json
import requests
import pandas as pd
from datasets import load_dataset
import os
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
os.environ['https_proxy'] = ''
os.environ['http_proxy'] = ''
hint = 'There is a single choice question. Answer the question by replying A, B, C, D, E, F, G, H, I, J. No other answers are accepted. Just the letter.'
class DataEvaluator:
def __init__(self):
# self.template_prompt = template_prompt
self.data = []
def load_data(self, file_path):
"""
Load data from a Parquet file into a list.
Each record in the Parquet file should represent an individual record.
"""
# 读取 Parquet 文件
# dataset = load_dataset('parquet', data_files=file_path)
ds = load_dataset("TIGER-Lab/MMLU-Pro")
df = pd.DataFrame(ds['test'])
# print(ds)
# # ds_1 = ds['train']
# ds_2 = ds['validation']
# ds_3 = ds['test']
        # # Convert the dataset to a Pandas DataFrame
# df_test = pd.DataFrame(ds['test'])
# df_val = pd.DataFrame(ds['validation'])
# for _, row in df.iterrows():
# self.data.append(row.to_dict())
# df = pd.read_parquet(file_path)
for _, row in df.iterrows():
self.data.append(row.to_dict())
def get_prompt(self, record):
"""
Combine fields from a record with the template prompt to create a full prompt.
:param record: Dictionary containing fields to populate the template.
:return: A formatted prompt string.
"""
        # Build the lettered option list (A, B, C, ...)
options_str = "\n".join([f"{chr(65+i)}. {opt}" for i, opt in enumerate(record['options'])])
prompt = hint + "\nQuestion: " + record['question'] + "\n" + options_str + "\nAnswer: '"
return prompt
def post_processing(self, text):
"""
Perform post-processing on the prediction string.
:param text: The raw prediction string.
:return: Processed prediction string.
"""
text = text.lstrip('\n').split('\n')[0]
return text[:1]
def score(self, pred, answers):
"""
Calculate scores between the prediction and the answer.
Uses ROUGE scores as the evaluation metric.
:param pred: The predicted string.
:param answer: The reference answer string.
:return: A dictionary containing ROUGE scores.
"""
for answer in answers:
if pred == answer:
return 1
return 0
# Function to generate text using API
def generate_text(api_url, question, model_name, stream=False):
headers = {
'accept': 'application/json',
'Content-Type': 'application/json',
        # Put your API key here if the endpoint requires one
'Authorization' : 'Bearer '
}
data = {
"messages": [{"content": question, "role": "user"}],
"model": model_name,
"stream": stream,
# "temperature": 0.0
}
print("POST data:", data)
response = requests.post(api_url, headers=headers, json=data)
if response.status_code == 200:
result = response.json()
return result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()
else:
print(f"API Request failed with status code {response.status_code}")
return None
# Main function to handle multiple evaluations
def main(concurrent_requests, data_evaluator: DataEvaluator, result_file, log_file, api_url, model_name):
start_total_time = time.time()
total_score = 0
results = []
    # Fix the random seed so the sampled questions are reproducible
random.seed(42)
random.shuffle(data_evaluator.data)
for i in range(min(concurrent_requests, len(data_evaluator.data))):
        # Take the next item from the shuffled data
data_item = data_evaluator.data[i]
question = data_evaluator.get_prompt(data_item)
# print(question)
# Start the timer for this evaluation
start_time = time.time()
try:
# Generate prediction using the API
prediction = generate_text(api_url, question, model_name)
if prediction is None:
raise Exception(f"Failed to get prediction for {question}")
answer = data_item['answer']
# Compute score
score = data_evaluator.score(data_evaluator.post_processing(prediction), answer)
# Calculate the time taken
elapsed_time = time.time() - start_time
# Collect the result data
result_data = {
"question_id": data_item['question_id'],
"answer": answer,
"prediction": data_evaluator.post_processing(prediction),
"score": score,
"time": elapsed_time
}
# Write results to result.json with each field on a new line
with open(result_file, 'a', encoding='utf-8') as f:
json.dump(result_data, f, ensure_ascii=False, indent=4)
f.write("\n") # Ensure each JSON object is on a new line
results.append(result_data)
# Aggregate scores
total_score += score
except Exception as e:
print(f"Error processing request {i}: {e}")
# Calculate total time and throughput
total_time = time.time() - start_total_time
throughput = concurrent_requests / total_time
    # Log the total time, throughput, and average score
with open(log_file, 'a', encoding='utf-8') as log_f:
log_f.write(f"Total Time: {total_time:.2f} seconds\n")
log_f.write(f"Throughput: {throughput:.2f} requests per second\n")
log_f.write(f"Average Scores: {total_score / concurrent_requests}\n")
log_f.write('-' * 40 + '\n')
print(f"Results saved to {result_file}")
print(f"Log saved to {log_file}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="API Generate Tester")
parser.add_argument("--concurrent", type=int, default=1000, help="Number of concurrent evaluations")
parser.add_argument("--file", type=str, default="TIGER-Lab/MMLU-Pro", help="Path to the mmlu.jsonl file")
parser.add_argument("--result", type=str, default="./mmlu_pro.json", help="Path to save the result JSON file")
parser.add_argument("--log", type=str, default="./mmlu_pro.log", help="Path to save the log file")
parser.add_argument("--model", type=str, default="Pro/deepseek-ai/DeepSeek-V3", help="Model name or path")
parser.add_argument("--api_url", type=str, default="http://localhost:10002/v1/chat/completions", help="API URL")
# parser.add_argument("--api_url", type=str, default="https://api.siliconflow.cn/v1/chat/completions", help="API URL")
args = parser.parse_args()
# Load the data from the provided file
# template_prompt = hint + "\nQuestion: {question}\nA. {options}\nB. {option_b}\nC. {option_c}\nD. {option_d}\nAnswer: '"
# template_prompt_pro = hint + "\nQuestion: {question}\nA. {options[0]}\nB. {options[1]}\nC. {options[2]}\nD. {options[3]}\nE. {options[4]}\nF. {options[5]}\nG. \
# {options[6]}\nH. {options[7]}\nI. {options[8]}\nJ. {options[9]}\nAnswer: '"
# Load the data from the provided file
data_evaluator = DataEvaluator()
data_evaluator.load_data(args.file)
# Run the main function with the specified number of concurrent evaluations
main(args.concurrent, data_evaluator, args.result, args.log, args.api_url, args.model)
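Note that the result file written above is a stream of pretty-printed JSON objects (`json.dump` with `indent=4` followed by a newline), not strict JSON Lines. A hedged sketch of how the accuracy could be tallied from such a file afterwards (the helper and path are illustrative, not part of the script):

```python
# Illustrative helper (not part of the test script): walk the concatenated,
# pretty-printed JSON objects in the result file and compute the mean score.
import json

def aggregate(result_path: str) -> float:
    decoder = json.JSONDecoder()
    with open(result_path, encoding="utf-8") as f:
        buf = f.read()
    scores, idx = [], 0
    while idx < len(buf):
        # Skip whitespace between concatenated JSON objects
        while idx < len(buf) and buf[idx].isspace():
            idx += 1
        if idx >= len(buf):
            break
        obj, idx = decoder.raw_decode(buf, idx)
        scores.append(obj["score"])
    return sum(scores) / len(scores) if scores else 0.0

# Example: print(aggregate("./mmlu_pro.json"))
```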


@@ -0,0 +1,195 @@
import argparse
import random
import time
import json
import requests
import pandas as pd
from datasets import load_dataset
import os
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
os.environ['https_proxy'] = ''
os.environ['http_proxy'] = ''
hint = 'There is a single choice question. Answer the question by replying A, B, C, D. No other answers are accepted. Just the letter.'
class DataEvaluator:
def __init__(self):
# self.template_prompt = template_prompt
self.data = []
def load_data(self, file_path):
"""
Load data from a Parquet file into a list.
Each record in the Parquet file should represent an individual record.
"""
# 读取 Parquet 文件
# dataset = load_dataset('parquet', data_files=file_path)
ds = load_dataset(file_path,"all")
df = pd.DataFrame(ds['test'])
# print(ds)
# # ds_1 = ds['train']
# ds_2 = ds['validation']
# ds_3 = ds['test']
        # # Convert the dataset to a Pandas DataFrame
# df_test = pd.DataFrame(ds['test'])
# df_val = pd.DataFrame(ds['validation'])
# for _, row in df.iterrows():
# self.data.append(row.to_dict())
# df = pd.read_parquet(file_path)
for _, row in df.iterrows():
self.data.append(row.to_dict())
def get_prompt(self, record):
"""
Combine fields from a record with the template prompt to create a full prompt.
:param record: Dictionary containing fields to populate the template.
:return: A formatted prompt string.
"""
        # Build the lettered option list (A, B, C, D)
options_str = "\n".join([f"{chr(65 + i)}. {opt}" for i, opt in enumerate(record['choices'])])
prompt = hint + "\nQuestion: " + record['question'] + "\n" + options_str + "\nAnswer: '"
return prompt
def post_processing(self, text):
"""
Perform post-processing on the prediction string.
:param text: The raw prediction string.
:return: Processed prediction string.
"""
text = text.lstrip('\n').split('\n')[0]
return text[:1]
def score(self, pred, answers):
"""
Calculate scores between the prediction and the answer.
Uses ROUGE scores as the evaluation metric.
:param pred: The predicted string.
:param answer: The reference answer string.
:return: A dictionary containing ROUGE scores.
"""
for answer in answers:
if pred == answer:
return 1
return 0
# Function to generate text using API
def generate_text(api_url, question, model_name, stream=False):
headers = {
'accept': 'application/json',
'Content-Type': 'application/json',
        # Put your API key here if the endpoint requires one
'Authorization' : 'Bearer '
}
data = {
"messages": [{"content": question, "role": "user"}],
"model": model_name,
"stream": stream,
# "temperature": 0.0
}
print("POST data:", data)
response = requests.post(api_url, headers=headers, json=data)
if response.status_code == 200:
result = response.json()
return result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()
else:
print(f"API Request failed with status code {response.status_code}")
return None
# Main function to handle multiple evaluations
def main(concurrent_requests, data_evaluator: DataEvaluator, result_file, log_file, api_url, model_name):
start_total_time = time.time()
total_score = 0
results = []
    # Fix the random seed so the sampled questions are reproducible
random.seed(42)
random.shuffle(data_evaluator.data)
for i in range(min(concurrent_requests, len(data_evaluator.data))):
        # Take the next item from the shuffled data
data_item = data_evaluator.data[i]
question = data_evaluator.get_prompt(data_item)
# print(question)
# Start the timer for this evaluation
start_time = time.time()
try:
# Generate prediction using the API
prediction = generate_text(api_url, question, model_name)
if prediction is None:
raise Exception(f"Failed to get prediction for {question}")
answer = chr(data_item['answer'] + 65)
# Compute score
score = data_evaluator.score(data_evaluator.post_processing(prediction), answer)
# Calculate the time taken
elapsed_time = time.time() - start_time
# Collect the result data
result_data = {
"question_id": i,
"answer": answer,
"prediction": data_evaluator.post_processing(prediction),
"score": score,
"time": elapsed_time
}
# Write results to result.json with each field on a new line
with open(result_file, 'a', encoding='utf-8') as f:
json.dump(result_data, f, ensure_ascii=False, indent=4)
f.write("\n") # Ensure each JSON object is on a new line
results.append(result_data)
# Aggregate scores
total_score += score
except Exception as e:
print(f"Error processing request {i}: {e}")
# Calculate total time and throughput
total_time = time.time() - start_total_time
throughput = concurrent_requests / total_time
    # Log the total time, throughput, and average score
with open(log_file, 'a', encoding='utf-8') as log_f:
log_f.write(f"Total Time: {total_time:.2f} seconds\n")
log_f.write(f"Throughput: {throughput:.2f} requests per second\n")
log_f.write(f"Average Scores: {total_score / concurrent_requests}\n")
log_f.write('-' * 40 + '\n')
print(f"Results saved to {result_file}")
print(f"Log saved to {log_file}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="API Generate Tester")
parser.add_argument("--concurrent", type=int, default=1000, help="Number of concurrent evaluations")
parser.add_argument("--file", type=str, default="cais/mmlu", help="Path to the mmlu.jsonl file")
parser.add_argument("--result", type=str, default="./mmlu_result_silicon.json", help="Path to save the result JSON file")
parser.add_argument("--log", type=str, default="./mmlu_result_silicon.log", help="Path to save the log file")
parser.add_argument("--model", type=str, default="Pro/deepseek-ai/DeepSeek-V3", help="Model name or path")
parser.add_argument("--api_url", type=str, default="http://localhost:10003/v1/chat/completions", help="API URL")
# parser.add_argument("--api_url", type=str, default="https://api.siliconflow.cn/v1/chat/completions", help="API URL")
args = parser.parse_args()
# Load the data from the provided file
# template_prompt = hint + "\nQuestion: {question}\nA. {options}\nB. {option_b}\nC. {option_c}\nD. {option_d}\nAnswer: '"
# template_prompt_pro = hint + "\nQuestion: {question}\nA. {options[0]}\nB. {options[1]}\nC. {options[2]}\nD. {options[3]}\nE. {options[4]}\nF. {options[5]}\nG. \
# {options[6]}\nH. {options[7]}\nI. {options[8]}\nJ. {options[9]}\nAnswer: '"
# Load the data from the provided file
data_evaluator = DataEvaluator()
data_evaluator.load_data(args.file)
# Run the main function with the specified number of concurrent evaluations
main(args.concurrent, data_evaluator, args.result, args.log, args.api_url, args.model)
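For reference, this script maps the gold answer through `chr(data_item['answer'] + 65)` because `cais/mmlu` stores the answer as an integer index into `choices` rather than a letter. A tiny illustration with a made-up record:

```python
# Made-up record, for illustration only: cais/mmlu stores the gold answer as
# an index into `choices`, so it must be converted to a letter before scoring.
record = {"question": "...", "choices": ["foo", "bar", "baz", "qux"], "answer": 2}
letter = chr(record["answer"] + 65)  # 2 -> "C"
assert letter == "C"
```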