𝑻𝒆𝒏𝑪𝒍𝒂𝒘正在头脑风暴···
𝑻𝒆𝒏𝑲𝒊𝑺𝒆𝒀𝒂の𝑨𝒈𝒆𝒏𝒕助手
𝑻𝒆𝒏-𝒇𝒍𝒂𝒔𝒉

大语言模型微调实战

1. 大语言模型微调概述

1.1 什么是微调

大语言模型微调(Fine-tuning)是在预训练模型的基础上,使用特定任务的数据继续训练的过程。通过微调,可以让预训练模型更好地适应特定领域、特定语言或特定任务的特性。

微调的优势:

  • 减少对大量标注数据的需求
  • 加速模型收敛
  • 提高任务特定性能
  • 降低训练成本

1.2 微调策略分类

全参数微调(Full Fine-tuning)

  • 微调所有模型参数
  • 适用于拥有大量数据和计算资源的场景
  • 效果最好但成本最高

部分参数微调(Partial Fine-tuning)

  • 只微调特定层的参数
  • 平衡效果和成本
  • 常用于中间层和输出层

提示工程(Prompt Engineering)

  • 不修改模型参数
  • 通过精心设计的提示词引导模型
  • 最简单但效果有限

参数高效微调(PEFT)

  • LoRA、AdaLoRA等方法
  • 只训练少量参数
  • 在有限资源下获得较好效果

1.3 微调应用场景

领域适应

  • 医疗:病历分析、医学问答
  • 法律:法律文书分析、案例检索
  • 金融:财务报告分析、风险评估

任务定制

  • 文本生成:特定风格写作、创意文案
  • 对话系统:客服对话、聊天助手
  • 信息抽取:实体识别、关系抽取

多语言支持

  • 中文优化:提升中文处理能力
  • 语言转换:跨语言任务适配
  • 方言支持:地区语言特色

2. 数据准备与预处理

2.1 数据收集策略

公开数据集

# Load datasets through the Hugging Face `datasets` library.
from datasets import load_dataset

# General-purpose example: the "mrpc" configuration of "glue".
dataset = load_dataset(path="glue", name="mrpc")
print(dataset)

# Chinese example: the "tnews" configuration of "clue".
chinese_dataset = load_dataset(path="clue", name="tnews")
print(chinese_dataset)

# Custom data: read a local JSON file through the generic "json" builder.
custom_dataset = load_dataset(path="json", data_files="my_data.json")

领域特定数据

# Medical-domain samples, one dict per record (instruction / input / output).
medical_data = {
    "train": [
        {"instruction": "分析以下病历", "input": "患者主诉...", "output": "诊断结果..."},
        {"instruction": "解释医学术语", "input": "心肌梗死", "output": "心脏血管阻塞导致的心肌缺血坏死"},
    ]
}

# Legal-domain samples in the same record layout.
legal_data = {
    "train": [
        {"instruction": "分析合同条款", "input": "合同条款内容...", "output": "风险分析..."},
        {"instruction": "生成法律意见书", "input": "案件事实...", "output": "法律建议..."},
    ]
}

# Convert to a Hugging Face Dataset.
from datasets import Dataset

# Dataset.from_dict expects a {column: [values]} mapping; passing the whole
# {"train": [...]} dict would create a single column called "train".
# The records are row dicts, so build the dataset from the list of rows.
train_dataset = Dataset.from_list(medical_data["train"])

数据格式标准化

class DataFormatter:
    """Convert instruction/input/output records into a training layout.

    Supported ``format_type`` values are ``"instruction"``, ``"chat"`` and
    ``"completion"``.
    """

    SUPPORTED_FORMATS = ("instruction", "chat", "completion")

    def __init__(self, format_type="instruction"):
        self.format_type = format_type

    @staticmethod
    def _prompt_text(data):
        """Join the instruction and the optional input with a newline."""
        instruction = data["instruction"]
        # A record without an "input" key is treated like one with an empty input.
        input_text = data["input"] if "input" in data else ""
        return instruction, input_text

    def format_conversation(self, data):
        """Format one record according to ``self.format_type``.

        Args:
            data: Mapping with "instruction" and "output" keys and an
                optional "input" key.

        Returns:
            A dict whose keys depend on the format: prompt/completion,
            messages, or text.

        Raises:
            ValueError: If ``self.format_type`` is not a supported format.
        """
        instruction, input_text = self._prompt_text(data)
        output = data["output"]

        if self.format_type == "instruction":
            return {
                "prompt": instruction + "\n" + input_text,
                "completion": output,
            }

        if self.format_type == "chat":
            # Keep the input in the user turn; dropping it would lose the
            # actual content the instruction refers to.
            user_content = instruction + "\n" + input_text if input_text else instruction
            return {
                "messages": [
                    {"role": "user", "content": user_content},
                    {"role": "assistant", "content": output},
                ]
            }

        if self.format_type == "completion":
            return {"text": instruction + "\n" + input_text + "\n" + output}

        raise ValueError(
            f"Unsupported format_type {self.format_type!r}; "
            f"expected one of {self.SUPPORTED_FORMATS}"
        )

    def batch_format(self, dataset):
        """Format every record of ``dataset`` and return the results as a list."""
        return [self.format_conversation(item) for item in dataset]

2.2 数据质量保证

数据清洗

import re
import pandas as pd
from text_preprocessing import TextCleaner

class DataCleaner:
    """Clean raw text and screen it with simple quality heuristics."""

    def __init__(self):
        # TextCleaner comes from the file's text_preprocessing import.
        self.cleaner = TextCleaner()

    def clean_text(self, text):
        """Return ``text`` with tags, extra whitespace and stray symbols removed."""
        # Drop anything that looks like an HTML/XML tag.
        text = re.sub(r'<[^>]+>', '', text)

        # Collapse every run of whitespace into a single space.
        text = ' '.join(text.split())

        # Keep word characters, whitespace and the listed punctuation only.
        text = re.sub(r'[^\w\s一-鿿,。!?;:""''()【】、…—]', '', text)

        # Punctuation normalisation.
        # NOTE(review): as written both replacements map a character to
        # itself, so this line looks like a no-op — confirm the intended mapping.
        text = text.replace(',', ',').replace('。', '。')

        return text

    def validate_content(self, text):
        """Return True when ``text`` passes the length, repetition and quality checks."""
        # Length window: 10 to 2000 characters inclusive.
        if len(text) < 10 or len(text) > 2000:
            return False

        if self._is_repetitive(text):
            return False

        if not self._check_language_quality(text):
            return False

        return True

    def _is_repetitive(self, text):
        """Return True when fewer than 10% of the whitespace-separated words are unique.

        Text without any word (empty or whitespace only) is reported as
        repetitive so that ``validate_content`` rejects it instead of
        failing with a division by zero.
        """
        words = text.split()
        if not words:
            return True
        return len(set(words)) / len(words) < 0.1

    def _check_language_quality(self, text):
        """Return False for runs of sentence-ending punctuation or unexpected characters."""
        # Two or more consecutive sentence terminators.
        if re.search(r'[。!?]{2,}', text):
            return False

        # Anything outside ASCII, the CJK ideograph range and the CJK
        # punctuation that clean_text keeps is treated as garbled. The
        # punctuation has to be allowed here, otherwise every sentence
        # ending in "。" would be rejected.
        if re.search(r'[^\x00-\x7F一-鿿。【】、…—]', text):
            return False

        return True

数据标注

class DataAnnotator:
    """Attach annotations to records, automatically or through a manual UI."""

    def __init__(self, annotation_schema):
        self.schema = annotation_schema

    def automatic_annotation(self, data, model):
        """Annotate each record with ``model.predict`` and keep the valid ones.

        Returns a list of new dicts: the original record's items plus an
        'annotation' key. Records whose annotation fails validation are
        left out.
        """
        accepted = []
        for record in data:
            predicted = model.predict(record)
            if not self._validate_annotation(predicted):
                continue
            accepted.append({**record, 'annotation': predicted})
        return accepted

    def manual_annotation_interface(self, data):
        """Build the annotation UI for ``data`` and return its processed results."""
        ui = self._create_annotation_ui(data)
        return self._process_annotation_results(ui)

    def _validate_annotation(self, annotation):
        """Return True when the annotation has every schema key and is consistent."""
        # Completeness: every key of the schema must be present.
        for key in self.schema.keys():
            if key not in annotation:
                return False

        # Consistency is delegated to a helper that is not defined in this class.
        if self._check_annotation_consistency(annotation):
            return True
        return False

2.3 数据增强

文本增强

import nlpaug.augmenter as naa
from sklearn.utils import shuffle

class DataAugmentor:
    """Text augmentation helpers built on nlpaug word-level augmenters."""

    def __init__(self):
        # The word-level augmenters live in nlpaug.augmenter.word; imported
        # here so the class does not depend on the alias used at file level.
        import nlpaug.augmenter.word as naw

        self.augmenters = {
            'synonym': naw.SynonymAug(),
            # ContextualWordEmbsAug substitutes by default; request insertion
            # explicitly so the behaviour matches the key.
            'insert': naw.ContextualWordEmbsAug(action="insert"),
            'swap': naw.RandomWordAug(action="swap"),
            'delete': naw.RandomWordAug(action="delete"),
        }

    def augment_text(self, text, augment_type='synonym', n_augment=3):
        """Return a list of augmented variants of ``text``.

        Args:
            text: Source text.
            augment_type: Key into ``self.augmenters``.
            n_augment: Number of variants requested from the augmenter.

        Returns:
            A list of strings. An unknown ``augment_type`` yields ``[text]``.
        """
        augmenter = self.augmenters.get(augment_type)
        if augmenter is None:
            return [text]

        result = augmenter.augment(text, n=n_augment)
        # Normalise a bare string result into a list so callers can always iterate.
        if isinstance(result, str):
            return [result]
        return list(result)

    def create_balanced_dataset(self, dataset, target_ratio=0.8):
        """Top up under-represented classes with augmented samples.

        Args:
            dataset: pandas DataFrame with 'text' and 'label' columns.
            target_ratio: Each class is raised to at least
                ``target_ratio * size_of_largest_class`` rows.

        Returns:
            A shuffled DataFrame holding the original rows plus the new ones.
        """
        class_counts = dataset['label'].value_counts()
        target_count = class_counts.max() * target_ratio

        new_rows = []
        for label, current_count in class_counts.items():
            deficit = int(target_count - current_count)
            if deficit <= 0:
                continue

            class_data = dataset[dataset['label'] == label]
            # Draw exactly `deficit` source rows (with replacement when the
            # class is smaller than the deficit) and create one variant per
            # row, so the class lands on the target instead of overshooting.
            picked = class_data.sample(n=deficit, replace=deficit > len(class_data))
            for text in picked['text']:
                variants = self.augment_text(text, n_augment=1)
                if variants:
                    new_rows.append({'text': variants[0], 'label': label})

        if new_rows:
            balanced_dataset = pd.concat(
                [dataset, pd.DataFrame(new_rows)], ignore_index=True
            )
        else:
            balanced_dataset = dataset.reset_index(drop=True)

        return shuffle(balanced_dataset)

对话数据增强

class ConversationAugmentor:
    """Create rephrased copies of instruction-style conversation records."""

    def __init__(self):
        # Alternative phrasings; "{topic}" is replaced by the original instruction.
        self.dialog_templates = [
            "你能否详细解释一下{topic}?",
            "关于{topic},有什么需要注意的吗?",
            "请告诉我{topic}的相关信息。",
            "我想了解更多关于{topic}的内容。"
        ]

    def augment_conversation(self, conversation, n_augment=2):
        """Return the original record followed by ``n_augment`` rephrased copies.

        Each copy uses a different phrasing (cycling through the available
        ones), so the result does not contain identical duplicates.
        """
        augmented_conversations = [conversation]
        for variant in range(n_augment):
            augmented_conversations.append(
                self._create_augmented_version(conversation, variant=variant)
            )
        return augmented_conversations

    def _create_augmented_version(self, conversation, variant=0):
        """Return a shallow copy of ``conversation`` with a rephrased instruction.

        Args:
            conversation: Dict-like record; only its top level is copied.
            variant: Index of the phrasing to use. ``0`` is the default
                "请详细说明…" wording; higher values cycle through
                ``self.dialog_templates``.
        """
        augmented = conversation.copy()

        if 'instruction' in augmented:
            phrasings = ["请详细说明{topic}", *self.dialog_templates]
            template = phrasings[variant % len(phrasings)]
            augmented['instruction'] = template.format(topic=augmented['instruction'])

        # Add a generic background note when the record has none.
        if 'context' not in augmented:
            augmented['context'] = "这是在专业领域内的问答。"

        return augmented

3. 模型选择与准备

3.1 预训练模型选择

开源模型

from transformers import AutoModelForCausalLM, AutoTokenizer

class ModelSelector:
    """Pick a pretrained model from a small catalogue by scoring requirements."""

    def __init__(self):
        self.available_models = {
            'llama2': {
                'name': 'meta-llama/Llama-2-7b-chat-hf',
                'size': '7B',
                'language': 'multilingual',
                'license': 'LLaMA 2 Community License'
            },
            'chatglm': {
                'name': 'THUDM/chatglm3-6b',
                'size': '6B',
                'language': 'Chinese',
                'license': 'Apache 2.0'
            },
            'baichuan': {
                'name': 'baichuan-inc/Baichuan2-7B-Chat',
                'size': '7B',
                'language': 'Chinese',
                'license': 'Apache 2.0'
            },
            'qwen': {
                'name': 'Qwen/Qwen-7B-Chat',
                'size': '7B',
                'language': 'multilingual',
                'license': 'Apache 2.0'
            }
        }

    def select_model(self, requirements):
        """Return the catalogue key with the highest score, or None.

        Args:
            requirements: Dict with optional 'language', 'size' and
                'license' entries.

        Returns:
            The key of the best model. ``None`` when no model scores above 0.
            Ties keep the model listed first.
        """
        best_model = None
        best_score = 0

        for model_name, model_info in self.available_models.items():
            score = self._calculate_score(requirements, model_info)
            if score > best_score:
                best_score = score
                best_model = model_name

        return best_model

    def _calculate_score(self, requirements, model_info):
        """Add up language (30), size (up to 50) and license (20) points."""
        score = 0

        # Language: substring match against the model's language field.
        if 'language' in requirements:
            if requirements['language'] in model_info['language']:
                score += 30

        if 'size' in requirements:
            score += self._size_match_score(requirements['size'], model_info['size'])

        # License: exact string match.
        if 'license' in requirements:
            if requirements['license'] == model_info['license']:
                score += 20

        return score

    @staticmethod
    def _parse_size(size):
        """Parse a size label such as '7B' or '1.5b' into a number, else None."""
        text = str(size).strip().upper()
        if not text.endswith('B'):
            return None
        try:
            value = float(text[:-1])
        except ValueError:
            return None
        return value if value > 0 else None

    def _size_match_score(self, required_size, model_size):
        """Return 0-50 points based on how close the two sizes are.

        Sizes are parsed from their '<number>B' labels rather than looked
        up in a fixed table, so catalogue entries such as '6B' are scored
        too. Unparseable labels score 0.
        """
        required_val = self._parse_size(required_size)
        model_val = self._parse_size(model_size)
        if required_val is None or model_val is None:
            return 0

        diff = abs(required_val - model_val)
        max_diff = max(required_val, model_val)
        return (1 - diff / max_diff) * 50

模型加载

class ModelLoader:
    """Load causal-LM checkpoints with caching, plus simple optimisations."""

    def __init__(self):
        # model_name -> {'model': ..., 'tokenizer': ...}
        self.loaded_models = {}

    def load_model(self, model_name, device='cuda'):
        """Load (or fetch from cache) a model and its tokenizer.

        Returns:
            A ``(model, tokenizer)`` tuple, for cache hits and misses alike.
        """
        cached = self.loaded_models.get(model_name)
        if cached is not None:
            return cached['model'], cached['tokenizer']

        # NOTE: trust_remote_code executes code shipped with the checkpoint;
        # the tokenizer uses the same setting as the model below.
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map=device,
            trust_remote_code=True
        )

        self.loaded_models[model_name] = {
            'model': model,
            'tokenizer': tokenizer
        }

        return model, tokenizer

    def optimize_model(self, model, optimization_type='quantization', **kwargs):
        """Apply one optimisation to ``model`` and return the result.

        Args:
            model: The model to optimise.
            optimization_type: 'quantization', 'pruning' or 'distillation'.
            **kwargs: Forwarded to the selected helper, e.g.
                ``pruning_ratio=...`` or ``student_model=...``.

        Raises:
            ValueError: If ``optimization_type`` is not one of the above.
        """
        if optimization_type == 'quantization':
            return self._quantize_model(model, **kwargs)
        if optimization_type == 'pruning':
            return self._prune_model(model, **kwargs)
        if optimization_type == 'distillation':
            return self._distill_model(model, **kwargs)
        raise ValueError(f"Unknown optimization_type: {optimization_type!r}")

    def _quantize_model(self, model):
        """Return an 8-bit reload of ``model``.

        bitsandbytes quantisation is applied while loading, so the
        checkpoint is loaded again from ``model.name_or_path`` with a
        quantisation config instead of converting the instance in place.

        Raises:
            ValueError: If the model does not expose its checkpoint path.
        """
        from transformers import BitsAndBytesConfig

        model_path = getattr(model, 'name_or_path', None)
        if not model_path:
            raise ValueError(
                "Cannot quantize: the model does not expose name_or_path to reload from"
            )

        quantization_config = BitsAndBytesConfig(load_in_8bit=True)
        return AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=quantization_config,
            device_map='auto',
            trust_remote_code=True
        )

    def _prune_model(self, model, pruning_ratio=0.1):
        """Zero the smallest-magnitude weights of every weighted module.

        ``pruning_ratio`` is passed to ``l1_unstructured`` as ``amount``.
        The mask is made permanent afterwards so the parameter is again
        named ``weight``.
        """
        from torch import nn
        from torch.nn.utils import prune

        # l1_unstructured works on modules, so collect the modules that own
        # a `weight` parameter rather than the parameters themselves.
        targets = [
            module for module in model.modules()
            if isinstance(getattr(module, 'weight', None), nn.Parameter)
        ]

        for module in targets:
            prune.l1_unstructured(module, name='weight', amount=pruning_ratio)
            prune.remove(module, 'weight')

        return model

    def _distill_model(self, teacher_model, student_model=None):
        """Placeholder for knowledge distillation; returns ``student_model``.

        Raises:
            ValueError: If no student model is supplied.
        """
        if student_model is None:
            raise ValueError("Distillation requires a student_model")
        # No distillation logic is implemented here yet.
        return student_model

3.2 模型准备

模型配置

class ModelConfig:
    """Load a base model and apply generation settings and extra tokens."""

    # (attribute on model.config, default used when the key is absent)
    _GENERATION_DEFAULTS = (
        ('max_length', 2048),
        ('temperature', 0.7),
        ('top_p', 0.9),
        ('top_k', 50),
    )

    def __init__(self, model_name, config):
        self.model_name = model_name
        self.config = config
        self.model = None
        self.tokenizer = None

    def prepare_model(self):
        """Load the model, apply the configuration and return (model, tokenizer)."""
        self.model, self.tokenizer = self._load_base_model()
        self._apply_model_config()
        self._setup_special_tokens()
        return self.model, self.tokenizer

    def _load_base_model(self):
        """Load tokenizer and model; config 'model_name'/'device' override the defaults."""
        model_name = self.config.get('model_name', self.model_name)
        device = self.config.get('device', 'cuda')

        # NOTE: trust_remote_code executes code shipped with the checkpoint;
        # the tokenizer uses the same setting as the model below.
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)

        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            device_map=device,
            trust_remote_code=True
        )

        return model, tokenizer

    def _apply_model_config(self):
        """Copy generation settings onto model.config where the attribute exists."""
        for attribute, default in self._GENERATION_DEFAULTS:
            if hasattr(self.model.config, attribute):
                setattr(self.model.config, attribute, self.config.get(attribute, default))

    def _setup_special_tokens(self):
        """Add configured tokens missing from the vocabulary and resize embeddings.

        Iterates over ``config['special_tokens']`` as given (for a dict this
        means its keys). Duplicates are added once.
        """
        special_tokens = self.config.get('special_tokens', {})
        if not special_tokens:
            return

        # get_vocab() is used rather than the `vocab` attribute.
        vocab = self.tokenizer.get_vocab()
        new_tokens = []
        for token in special_tokens:
            if token not in vocab and token not in new_tokens:
                new_tokens.append(token)

        if new_tokens:
            self.tokenizer.add_tokens(new_tokens)
            # Size the embeddings from the tokenizer itself so the two cannot
            # drift apart if fewer tokens were added than requested.
            self.model.resize_token_embeddings(len(self.tokenizer))

模型适配

class ModelAdapter:
    """Adapt a model/tokenizer pair to a prompt format and render prompts."""

    _ALPACA_TEMPLATE = (
        "Below is an instruction that describes a task, paired with an input "
        "that provides further context. Write a response that appropriately "
        "completes the request.\n"
        "\n"
        "### Instruction:\n"
        "{instruction}\n"
        "\n"
        "### Input:\n"
        "{input}\n"
        "\n"
        "### Response:\n"
        "{response}"
    )

    _VICUNA_TEMPLATE = (
        "A chat between a curious user and an artificial intelligence "
        "assistant. The assistant gives helpful, detailed, and polite answers "
        "to the user's questions.\n"
        "\n"
        "USER: {instruction}\n"
        "ASSISTANT: {response}"
    )

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        # Name of the format selected most recently; None until one is set up.
        self._active_format = None

    def adapt_for_instruction(self, instruction_format="chatml"):
        """Set up one of 'chatml', 'alpaca' or 'vicuna'; other values are ignored."""
        if instruction_format == "chatml":
            self._setup_chatml_format()
        elif instruction_format == "alpaca":
            self._setup_alpaca_format()
        elif instruction_format == "vicuna":
            self._setup_vicuna_format()

    def _setup_chatml_format(self):
        """Register bos/eos/unk/pad special tokens and resize the embeddings."""
        # NOTE(review): this replaces the tokenizer's existing bos/eos/unk
        # tokens — confirm that is intended for a pretrained checkpoint.
        special_tokens = {
            "bos_token": "<|startoftext|>",
            "eos_token": "<|endoftext|>",
            "unk_token": "<|unknown|>",
            "pad_token": "<|pad|>"
        }

        self.tokenizer.add_special_tokens(special_tokens)
        self.model.resize_token_embeddings(len(self.tokenizer))
        self._active_format = "chatml"

    def _setup_alpaca_format(self):
        """Store the Alpaca prompt template and make it the active format."""
        self.alpaca_template = self._ALPACA_TEMPLATE
        self._active_format = "alpaca"

    def _setup_vicuna_format(self):
        """Store the Vicuna prompt template and make it the active format."""
        self.vicuna_template = self._VICUNA_TEMPLATE
        self._active_format = "vicuna"

    def format_instruction(self, instruction, input_text="", response=""):
        """Render a prompt with the most recently selected format.

        The format chosen by the latest setup call wins. When no setup call
        was recorded, a stored Alpaca template is preferred over a stored
        Vicuna template; with neither, a plain
        "Instruction/Input/Response" layout is used.
        """
        active = getattr(self, '_active_format', None)

        if active == "alpaca":
            template = self.alpaca_template
        elif active == "vicuna":
            template = self.vicuna_template
        elif active is not None:
            template = None
        elif hasattr(self, 'alpaca_template'):
            template = self.alpaca_template
        elif hasattr(self, 'vicuna_template'):
            template = self.vicuna_template
        else:
            template = None

        if template is None:
            return f"Instruction: {instruction}\nInput: {input_text}\nResponse: {response}"

        # The Vicuna template has no {input} slot; the extra keyword is ignored.
        return template.format(
            instruction=instruction,
            input=input_text,
            response=response
        )

3.3 参数高效微调准备

LoRA配置

from peft import LoraConfig, get_peft_model, TaskType

class LoRAConfig:
    """Build a peft LoraConfig from a settings dict and attach it to a model."""

    def __init__(self, config):
        self.config = config
        self.lora_config = None

    def create_lora_config(self):
        """Create the LoraConfig, remember it on the instance and return it."""
        settings = self.config
        default_targets = [
            'q_proj', 'k_proj', 'v_proj', 'o_proj',
            'gate_proj', 'up_proj', 'down_proj'
        ]

        built = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            inference_mode=False,
            r=settings.get('r', 64),
            lora_alpha=settings.get('alpha', 16),
            lora_dropout=settings.get('dropout', 0.1),
            target_modules=settings.get('target_modules', default_targets),
            modules_to_save=settings.get('modules_to_save', []),
            bias=settings.get('bias', 'none')
        )
        self.lora_config = built
        return built

    def apply_lora(self, model):
        """Wrap ``model`` with a freshly created LoRA configuration."""
        return get_peft_model(model, self.create_lora_config())

    def print_trainable_parameters(self, model):
        """Print trainable vs. total parameter counts and the trainable percentage."""
        counts = [
            (param.numel(), param.requires_grad)
            for _, param in model.named_parameters()
        ]
        all_param = sum(size for size, _ in counts)
        trainable_params = sum(size for size, trainable in counts if trainable)

        summary = (
            f"trainable params: {trainable_params} || all params: {all_param} || "
            f"trainable%: {100 * trainable_params / all_param}"
        )
        print(summary)

其他PEFT方法

class PEFTConfig:
    """Factory for AdaLoRA, QLoRA (bitsandbytes) and prefix-tuning configs."""

    def __init__(self, config):
        self.config = config

    def create_adalora_config(self):
        """Create an AdaLoraConfig from the settings dict.

        AdaLoRA sets its rank through ``init_r`` / ``target_r``; the
        settings key 'r' is used as the default initial rank. 'alpha' is
        accepted as a fallback for 'lora_alpha'. ``total_step`` is
        forwarded only when present in the settings.
        """
        from peft import AdaLoraConfig

        init_r = self.config.get('init_r', self.config.get('r', 64))
        kwargs = {
            'task_type': TaskType.CAUSAL_LM,
            'inference_mode': False,
            'init_r': init_r,
            # The target rank must not exceed the initial rank.
            'target_r': self.config.get('target_r', min(8, init_r)),
            'target_modules': self.config.get('target_modules', [
                'q_proj', 'k_proj', 'v_proj', 'o_proj'
            ]),
            'beta1': self.config.get('beta1', 0.3),
            'beta2': self.config.get('beta2', 0.3),
            'orth_reg_weight': self.config.get('orth_reg_weight', 0.5),
            'lora_alpha': self.config.get('lora_alpha', self.config.get('alpha', 16)),
            'lora_dropout': self.config.get('lora_dropout', 0.1),
        }
        # NOTE(review): some peft releases require total_step to be set —
        # confirm against the installed version.
        if 'total_step' in self.config:
            kwargs['total_step'] = self.config['total_step']

        return AdaLoraConfig(**kwargs)

    def create_qlora_config(self):
        """Create the 4-bit NF4 BitsAndBytesConfig used to load a QLoRA base model."""
        from transformers import BitsAndBytesConfig

        return BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type='nf4',
            bnb_4bit_compute_dtype=torch.bfloat16
        )

    def create_prefix_tuning_config(self):
        """Create a PrefixTuningConfig for causal language modelling."""
        from peft import PrefixTuningConfig

        return PrefixTuningConfig(
            task_type=TaskType.CAUSAL_LM,
            num_virtual_tokens=self.config.get('num_virtual_tokens', 20),
            encoder_hidden_size=self.config.get('encoder_hidden_size', 512)
        )

4. 微调训练实现

4.1 训练参数配置

基础训练配置

from transformers import TrainingArguments, Trainer

class TrainingConfig:
    """Translate a settings dict into ``transformers.TrainingArguments``."""

    # (argument name, default used when the settings dict lacks the key).
    # Each argument is listed once; the invented names (jax_*,
    # kwargs_handlers, dataloader_sort_fn, metric_for_best_model_value,
    # torch_empty_cache_freq) are left out.
    _ARGUMENT_DEFAULTS = (
        ('output_dir', './fine-tuned-model'),
        ('num_train_epochs', 3),
        ('per_device_train_batch_size', 8),
        ('per_device_eval_batch_size', 8),
        ('gradient_accumulation_steps', 1),
        ('optim', 'adamw_torch'),
        ('save_steps', 500),
        ('logging_steps', 10),
        ('learning_rate', 2e-5),
        ('weight_decay', 0.001),
        ('fp16', False),
        ('bf16', True),
        ('max_grad_norm', 1.0),
        ('max_steps', -1),
        ('warmup_ratio', 0.03),
        ('group_by_length', True),
        ('length_column_name', 'length'),
        ('eval_steps', 500),
        ('eval_accumulation_steps', 1),
        ('report_to', 'tensorboard'),
        ('ddp_timeout', 1800),
        ('logging_first_step', True),
        ('metric_for_best_model', 'eval_loss'),
        ('greater_is_better', False),
        ('save_total_limit', 2),
        ('seed', 42),
        ('data_seed', 42),
        ('torchdynamo', None),
        ('dataloader_drop_last', False),
        ('dataloader_num_workers', 0),
        ('dataloader_pin_memory', True),
        # None lets the DataLoader pick its own default; setting a prefetch
        # factor together with 0 workers is rejected.
        ('dataloader_prefetch_factor', None),
        ('label_smoothing_factor', 0.0),
        ('adafactor', False),
        # None keeps NEFTune switched off.
        ('neftune_noise_alpha', None),
        ('hub_model_id', None),
        ('hub_private_repo', False),
        ('push_to_hub', False),
        ('hub_strategy', 'checkpoint'),
        ('hub_always_push', False),
        ('hub_revision', 'main'),
        ('hub_token', None),
        ('deepspeed', None),
        ('fsdp', ''),
        ('fsdp_config', None),
        ('fsdp_transformer_layer_cls_to_wrap', None),
        ('accelerator_config', None),
        ('do_train', True),
        ('do_eval', True),
        ('do_predict', False),
        ('prediction_loss_only', False),
        ('ignore_data_skip', False),
        ('skip_memory_metrics', True),
        ('push_to_hub_token', None),
        ('resume_from_checkpoint', None),
        ('include_inputs_for_metrics', False),
        ('fp16_full_eval', False),
        ('ddp_find_unused_parameters', None),
    )

    def __init__(self, config):
        self.config = config

    def _collect_arguments(self):
        """Return the keyword arguments as a plain dict (settings over defaults)."""
        kwargs = {
            name: self.config.get(name, default)
            for name, default in self._ARGUMENT_DEFAULTS
        }
        kwargs['remove_unused_columns'] = False

        # Accept either spelling of the evaluation strategy key.
        strategy = self.config.get(
            'eval_strategy', self.config.get('evaluation_strategy', 'no')
        )
        kwargs['evaluation_strategy'] = strategy

        # Loading the best checkpoint needs evaluation to run, so the
        # default follows the strategy unless the caller set it explicitly.
        kwargs['load_best_model_at_end'] = self.config.get(
            'load_best_model_at_end', strategy != 'no'
        )
        return kwargs

    def create_training_arguments(self):
        """Build the TrainingArguments object.

        Arguments the installed transformers version does not declare are
        dropped, and the evaluation strategy is passed under whichever
        field name that version provides.
        """
        import dataclasses

        kwargs = self._collect_arguments()
        supported = {field.name for field in dataclasses.fields(TrainingArguments)}

        if 'eval_strategy' in supported:
            kwargs['eval_strategy'] = kwargs.pop('evaluation_strategy')

        return TrainingArguments(
            **{name: value for name, value in kwargs.items() if name in supported}
        )

自定义训练配置

class CustomTrainingConfig:
    """A base training configuration with custom overrides layered on top."""

    REQUIRED_KEYS = (
        'output_dir', 'num_train_epochs', 'learning_rate',
        'per_device_train_batch_size', 'per_device_eval_batch_size'
    )

    def __init__(self, base_config, custom_settings):
        self.base_config = base_config
        self.custom_settings = custom_settings

    def get_config(self):
        """Return a copy of the base config updated with the custom settings.

        Custom values replace base values with the same key and add new
        keys; neither input mapping is modified.
        """
        config = self.base_config.copy()
        config.update(self.custom_settings)
        return config

    def validate_config(self):
        """Validate the merged configuration.

        The check runs on the result of ``get_config`` so that overrides
        are validated too.

        Raises:
            ValueError: If a required key is missing or a numeric setting
                is not positive.
        """
        config = self.get_config()

        for key in self.REQUIRED_KEYS:
            if key not in config:
                raise ValueError(f"Missing required config: {key}")

        if config['learning_rate'] <= 0:
            raise ValueError("Learning rate must be positive")

        if config['num_train_epochs'] <= 0:
            raise ValueError("Number of epochs must be positive")

        if (config['per_device_train_batch_size'] <= 0
                or config['per_device_eval_batch_size'] <= 0):
            raise ValueError("Batch size must be positive")

4.2 数据加载与预处理

数据集类

from torch.utils.data import Dataset
import torch

class FineTuningDataset(Dataset):
    """Torch dataset that tokenizes text records for causal-LM fine-tuning."""

    # Label value assigned to padded positions so they are not scored.
    IGNORE_INDEX = -100

    def __init__(self, dataset, tokenizer, max_length=512, is_instruction=True):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.is_instruction = is_instruction

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Tokenize record ``idx`` and return input_ids, attention_mask and labels.

        Labels are an independent copy of ``input_ids`` in which every
        position with ``attention_mask == 0`` is set to ``IGNORE_INDEX``.
        """
        item = self.dataset[idx]

        if self.is_instruction:
            text = self._format_instruction(item)
        else:
            text = item['text']

        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        # Remove only the batch dimension added by return_tensors='pt'.
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)

        # Clone so that masking the labels does not modify input_ids.
        labels = input_ids.clone()
        labels[attention_mask == 0] = self.IGNORE_INDEX

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels
        }

    def _format_instruction(self, item):
        """Render a record as training text.

        Supported layouts, in order: instruction + response (or output),
        with an optional non-empty input placed between them;
        prompt + completion; a plain 'text' field. Anything else falls
        back to ``str(item)``.
        """
        if 'instruction' in item and ('response' in item or 'output' in item):
            answer = item['response'] if 'response' in item else item['output']
            parts = [f"Instruction: {item['instruction']}"]
            if 'input' in item and item['input']:
                parts.append(f"Input: {item['input']}")
            parts.append(f"Response: {answer}")
            return "\n\n".join(parts)
        if 'prompt' in item and 'completion' in item:
            return f"Prompt: {item['prompt']}\n\nCompletion: {item['completion']}"
        if 'text' in item:
            return item['text']
        return str(item)

    def collate_fn(self, batch):
        """Stack the per-item tensors of ``batch`` along a new first dimension."""
        return {
            key: torch.stack([item[key] for item in batch])
            for key in ('input_ids', 'attention_mask', 'labels')
        }

数据加载器

from torch.utils.data import DataLoader

class DataLoaderFactory:
    """Wrap raw datasets in FineTuningDataset and return torch DataLoaders."""

    def __init__(self, tokenizer, max_length=512):
        self.tokenizer = tokenizer
        self.max_length = max_length

    def create_dataloader(self, dataset, batch_size=8, shuffle=True, is_instruction=True):
        """Return a DataLoader over ``dataset`` using the factory's tokenizer settings."""
        wrapped = FineTuningDataset(
            dataset, self.tokenizer, self.max_length, is_instruction
        )
        return DataLoader(
            wrapped,
            batch_size=batch_size,
            shuffle=shuffle,
            collate_fn=wrapped.collate_fn,
        )

    def create_train_eval_dataloaders(self, train_dataset, eval_dataset, batch_size=8):
        """Return ``(train_loader, eval_loader)``; only the training loader shuffles."""
        splits = ((train_dataset, True), (eval_dataset, False))
        train_dataloader, eval_dataloader = (
            self.create_dataloader(split, batch_size=batch_size, shuffle=do_shuffle)
            for split, do_shuffle in splits
        )
        return train_dataloader, eval_dataloader

4.3 训练实现

自定义Trainer

from transformers import Trainer, TrainingArguments
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

class CustomTrainer(Trainer):
    """Trainer subclass with explicit loss, prediction and save hooks."""

    def __init__(self, model=None, args=None, data_collator=None,
                 train_dataset=None, eval_dataset=None,
                 tokenizer=None, compute_metrics=None, **kwargs):
        # Extra keyword arguments are forwarded unchanged to Trainer.
        # NOTE(review): newer transformers releases rename `tokenizer` to
        # `processing_class` — confirm against the installed version.
        super().__init__(
            model=model,
            args=args,
            data_collator=data_collator,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            tokenizer=tokenizer,
            compute_metrics=compute_metrics,
            **kwargs
        )

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run a forward pass and return the model's own loss.

        Additional keyword arguments supplied by the caller are accepted
        and ignored.
        """
        outputs = model(**inputs)
        loss = outputs.loss
        return (loss, outputs) if return_outputs else loss

    def prediction_step(self, model, inputs, prediction_loss_only=False, ignore_keys=None):
        """Evaluate one batch and return ``(loss, predictions, labels)``.

        Predictions are the argmax token ids of the logits. With
        ``prediction_loss_only`` the last two entries are ``None``.
        """
        inputs = self._prepare_inputs(inputs)

        with torch.no_grad():
            outputs = model(**inputs)

        loss = outputs.loss
        if loss is not None:
            loss = loss.detach()

        if prediction_loss_only:
            return (loss, None, None)

        predictions = torch.argmax(outputs.logits, dim=-1)
        return (loss, predictions, inputs.get('labels'))

    def save_model(self, output_dir=None, _internal_call=False):
        """Save through Trainer.save_model, defaulting to ``args.output_dir``."""
        if output_dir is None:
            output_dir = self.args.output_dir

        # Delegate to the base implementation only; the previous additional
        # save_pretrained calls wrote to the same directory a second time.
        super().save_model(output_dir, _internal_call)
训练函数

class FineTuningTrainer:
    """Run fine-tuning and evaluation through CustomTrainer."""

    def __init__(self, model, tokenizer, config):
        self.model = model
        self.tokenizer = tokenizer
        self.config = config
        self.trainer = None

    def _settings(self):
        """Return the dict-like settings behind ``self.config``.

        ``self.config`` may itself offer ``get`` or may wrap a dict in a
        ``config`` attribute; an empty dict is returned otherwise.
        """
        if hasattr(self.config, 'get'):
            return self.config
        inner = getattr(self.config, 'config', None)
        return inner if hasattr(inner, 'get') else {}

    def train(self, train_dataset, eval_dataset=None):
        """Fine-tune the model, save it and return it.

        Raises:
            TypeError: If ``self.config`` has no ``create_training_arguments``.
        """
        if not hasattr(self.config, 'create_training_arguments'):
            raise TypeError(
                "config must provide create_training_arguments()"
            )
        training_args = self.config.create_training_arguments()

        use_metrics = self._settings().get('compute_metrics', True)
        compute_metrics = self._compute_metrics if use_metrics else None

        self.trainer = CustomTrainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            tokenizer=self.tokenizer,
            compute_metrics=compute_metrics
        )

        print("Starting fine-tuning...")
        self.trainer.train()
        print("Fine-tuning completed!")

        self.trainer.save_model()

        return self.model

    @staticmethod
    def _align_predictions(predictions, labels):
        """Return flat ``(predicted_ids, label_ids)`` arrays ready for scoring.

        * Logits (one dimension more than the labels) are reduced with argmax.
        * For sequence labels (2-D or more), predictions at position t are
          compared with the label at t+1. This assumes a causal language
          model whose labels are a copy of its inputs.
        * Positions labelled -100 are dropped.
        """
        predictions = np.asarray(predictions)
        labels = np.asarray(labels)

        if predictions.ndim == labels.ndim + 1:
            predictions = predictions.argmax(axis=-1)

        if labels.ndim >= 2:
            predictions = predictions[..., :-1]
            labels = labels[..., 1:]

        keep = labels != -100
        return predictions[keep], labels[keep]

    def _compute_metrics(self, eval_preds):
        """Compute accuracy and weighted precision/recall/F1 over scored tokens."""
        raw_predictions, raw_labels = eval_preds
        predictions, labels = self._align_predictions(raw_predictions, raw_labels)

        accuracy = accuracy_score(labels, predictions)
        precision, recall, f1, _ = precision_recall_fscore_support(
            labels, predictions, average='weighted', zero_division=0
        )

        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1
        }

    def evaluate(self, eval_dataset):
        """Evaluate with the trainer created by ``train`` and print the results.

        Raises:
            ValueError: If ``train`` has not been called yet.
        """
        if self.trainer is None:
            raise ValueError("Trainer not initialized. Call train() first.")

        results = self.trainer.evaluate(eval_dataset)

        print("Evaluation results:")
        for key, value in results.items():
            # Only real numbers take the fixed-point format.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                print(f"{key}: {value:.4f}")
            else:
                print(f"{key}: {value}")

        return results

混合精度训练

class MixedPrecisionTrainer:
    """Minimal hand-written training loop with optional CUDA autocast."""

    def __init__(self, model, tokenizer, config):
        self.model = model
        self.tokenizer = tokenizer
        self.config = config

    def train_mixed_precision(self, train_dataset, eval_dataset=None):
        """Train the model and return the average loss of every epoch.

        Autocast is used only when the model lives on a CUDA device. Loss
        scaling is enabled only for float32 weights; models already held
        in bfloat16 autocast to bfloat16 without a scaler.

        Args:
            train_dataset: Items providing 'input_ids', 'attention_mask'
                and 'labels' tensors of equal length.
            eval_dataset: Accepted for interface compatibility; not used.

        Returns:
            A list with one average loss per epoch.

        Raises:
            ValueError: If the data loader yields no batches.
        """
        import contextlib

        first_param = next(self.model.parameters())
        device = first_param.device
        use_amp = device.type == 'cuda'
        is_bf16 = first_param.dtype == torch.bfloat16
        amp_dtype = torch.bfloat16 if is_bf16 else torch.float16

        scaler = torch.cuda.amp.GradScaler(
            enabled=use_amp and first_param.dtype == torch.float32
        )

        optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=self.config.get('learning_rate', 2e-5)
        )

        dataloader = DataLoader(
            train_dataset,
            batch_size=self.config.get('per_device_train_batch_size', 8),
            shuffle=True,
            collate_fn=self._collate_fn
        )
        if len(dataloader) == 0:
            raise ValueError("train_dataset produced no batches")

        epoch_losses = []
        self.model.train()
        for epoch in range(self.config.get('num_train_epochs', 3)):
            epoch_loss = 0.0

            for batch in dataloader:
                # The batch must sit on the same device as the model.
                batch = {name: tensor.to(device) for name, tensor in batch.items()}

                optimizer.zero_grad()

                if use_amp:
                    context = torch.autocast(device_type='cuda', dtype=amp_dtype)
                else:
                    context = contextlib.nullcontext()

                with context:
                    outputs = self.model(
                        input_ids=batch['input_ids'],
                        attention_mask=batch['attention_mask'],
                        labels=batch['labels']
                    )
                    loss = outputs.loss

                # With the scaler disabled these calls reduce to a plain
                # backward pass and optimizer step.
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()

                epoch_loss += loss.item()

            avg_loss = epoch_loss / len(dataloader)
            epoch_losses.append(avg_loss)
            print(f"Epoch {epoch + 1}, Average Loss: {avg_loss:.4f}")

        return epoch_losses

    def _collate_fn(self, batch):
        """Stack the per-item tensors of ``batch`` along a new first dimension."""
        return {
            key: torch.stack([item[key] for item in batch])
            for key in ('input_ids', 'attention_mask', 'labels')
        }

5. 评估与优化

5.1 评估指标

自动评估

class ModelEvaluator:
    """Automatic metrics (perplexity, accuracy, BLEU, ROUGE, BERTScore)."""

    # Label value that marks positions excluded from scoring.
    IGNORE_INDEX = -100

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.evaluation_metrics = {}

    def evaluate_model(self, test_dataset, metrics=None):
        """Compute the requested metrics over ``test_dataset``.

        Args:
            test_dataset: Iterable of items with 'input_ids',
                'attention_mask' and 'labels' tensors.
            metrics: Metric names; defaults to perplexity, accuracy and bleu.

        Returns:
            Dict mapping each metric name to its value.

        Raises:
            ValueError: If an unknown metric name is requested.
        """
        if metrics is None:
            metrics = ['perplexity', 'accuracy', 'bleu']

        calculators = {
            'perplexity': self._calculate_perplexity,
            'accuracy': self._calculate_accuracy,
            'bleu': self._calculate_bleu_score,
            'rouge': self._calculate_rouge_score,
            'bert_score': self._calculate_bert_score,
        }

        unknown = [name for name in metrics if name not in calculators]
        if unknown:
            raise ValueError(f"Unknown metrics: {unknown}")

        results = {name: calculators[name](test_dataset) for name in metrics}
        self.evaluation_metrics = results
        return results

    def _model_inputs(self, item, with_labels=False):
        """Add a batch dimension and move tensors to ``model.device`` if exposed."""
        keys = ['input_ids', 'attention_mask']
        if with_labels:
            keys.append('labels')

        device = getattr(self.model, 'device', None)
        batch = {}
        for key in keys:
            tensor = item[key].unsqueeze(0)
            batch[key] = tensor.to(device) if device is not None else tensor
        return batch

    def _generate_pair(self, item):
        """Generate from an item and return ``(prediction, reference)`` text."""
        with torch.no_grad():
            outputs = self.model.generate(
                **self._model_inputs(item),
                max_length=self.tokenizer.model_max_length
            )

        prediction = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Ignored label positions are not token ids and cannot be decoded.
        label_ids = item['labels']
        label_ids = label_ids[label_ids != self.IGNORE_INDEX]
        reference = self.tokenizer.decode(label_ids, skip_special_tokens=True)

        return prediction, reference

    def _calculate_perplexity(self, dataset):
        """Return exp of the token-weighted mean loss; NaN when nothing is scored."""
        total_loss = 0.0
        total_tokens = 0

        for item in dataset:
            with torch.no_grad():
                outputs = self.model(**self._model_inputs(item, with_labels=True))

            # Weight each item by its scored next-token targets.
            tokens = int((item['labels'][1:] != self.IGNORE_INDEX).sum())
            if tokens == 0:
                continue

            total_loss += outputs.loss.item() * tokens
            total_tokens += tokens

        if total_tokens == 0:
            return float('nan')

        avg_loss = total_loss / total_tokens
        return torch.exp(torch.tensor(avg_loss)).item()

    def _calculate_accuracy(self, dataset):
        """Return next-token accuracy over non-ignored, non-padding targets."""
        correct = 0
        total = 0
        pad_token_id = getattr(self.tokenizer, 'pad_token_id', None)

        for item in dataset:
            with torch.no_grad():
                outputs = self.model(**self._model_inputs(item))

            targets = item['labels'][1:]
            # The logits at position t are compared with the token at t+1.
            predictions = outputs.logits[0, :-1].argmax(dim=-1).to(targets.device)

            mask = targets != self.IGNORE_INDEX
            if pad_token_id is not None:
                mask = mask & (targets != pad_token_id)

            correct += ((predictions == targets) & mask).sum().item()
            total += mask.sum().item()

        return correct / total if total > 0 else 0

    def _calculate_bleu_score(self, dataset):
        """Return the mean sentence-level BLEU over whitespace tokens."""
        import warnings

        from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

        smoothie = SmoothingFunction().method4
        total_bleu = 0
        count = 0

        for item in dataset:
            prediction, reference = self._generate_pair(item)

            try:
                bleu = sentence_bleu(
                    [reference.split()],
                    prediction.split(),
                    smoothing_function=smoothie
                )
            except Exception as exc:
                # Best effort: skip samples BLEU cannot score, but report them.
                warnings.warn(f"Skipping sample in BLEU computation: {exc}")
                continue

            total_bleu += bleu
            count += 1

        return total_bleu / count if count > 0 else 0

    def _calculate_rouge_score(self, dataset):
        """Return mean ROUGE-1/2/L F-scores (zeros when the dataset is empty)."""
        from rouge import Rouge

        rouge = Rouge()
        names = ('rouge-1', 'rouge-2', 'rouge-l')
        total_scores = {name: 0 for name in names}
        count = 0

        for item in dataset:
            prediction, reference = self._generate_pair(item)
            scores = rouge.get_scores(prediction, reference)

            for name in names:
                total_scores[name] += scores[0][name]['f']
            count += 1

        if count == 0:
            return total_scores

        return {name: total_scores[name] / count for name in names}

    def _calculate_bert_score(self, dataset):
        """Return mean BERTScore precision/recall/F1 (zeros when the dataset is empty)."""
        from bert_score import score

        predictions = []
        references = []

        for item in dataset:
            prediction, reference = self._generate_pair(item)
            predictions.append(prediction)
            references.append(reference)

        if not predictions:
            return {'precision': 0, 'recall': 0, 'f1': 0}

        P, R, F1 = score(predictions, references, lang='zh')

        return {
            'precision': P.mean().item(),
            'recall': R.mean().item(),
            'f1': F1.mean().item()
        }

人工评估

class HumanEvaluator:
    """Collect and aggregate per-criterion scores for model outputs.

    The individual scores produced by ``_single_evaluation`` are simulated
    with random numbers; no real annotation interface is wired in.
    """

    def __init__(self):
        self.evaluation_results = []
        self.evaluation_criteria = {
            'fluency': '流畅度',
            'relevance': '相关性',
            'accuracy': '准确性',
            'completeness': '完整性',
            'helpfulness': '有用性'
        }

    def conduct_evaluation(self, model, test_dataset, evaluators=3,
                           tokenizer=None, max_samples=10):
        """Generate outputs for the first samples and gather their evaluations.

        Args:
            model: Model exposing ``generate`` and ``config.max_length``.
            test_dataset: Indexable collection of items with 'input_ids'
                and 'attention_mask' tensors.
            evaluators: Number of evaluations gathered per sample.
            tokenizer: Tokenizer used to decode the output; falls back to
                ``model.tokenizer`` when omitted.
            max_samples: Upper bound on the number of samples evaluated.

        Returns:
            A list with one result dict per evaluated sample.
        """
        results = []

        for i in range(min(len(test_dataset), max_samples)):
            item = test_dataset[i]

            model_output = self._generate_model_output(model, item, tokenizer=tokenizer)
            evaluations = self._collect_evaluations(item, model_output, evaluators)

            results.append({
                'sample_id': i,
                'model_output': model_output,
                'evaluations': evaluations,
                'average_score': self._calculate_average_score(evaluations)
            })

        self.evaluation_results = results
        return results

    def _generate_model_output(self, model, item, tokenizer=None):
        """Generate from ``item`` and return the decoded text.

        Raises:
            ValueError: If no tokenizer is passed and the model has none attached.
        """
        if tokenizer is None:
            tokenizer = getattr(model, 'tokenizer', None)
        if tokenizer is None:
            raise ValueError(
                "A tokenizer is required to decode the model output"
            )

        with torch.no_grad():
            outputs = model.generate(
                input_ids=item['input_ids'].unsqueeze(0),
                attention_mask=item['attention_mask'].unsqueeze(0),
                max_length=model.config.max_length
            )

        return tokenizer.decode(outputs[0], skip_special_tokens=True)

    def _collect_evaluations(self, item, model_output, num_evaluators):
        """Return one evaluation per evaluator id in ``range(num_evaluators)``."""
        return [
            self._single_evaluation(item, model_output, evaluator_id)
            for evaluator_id in range(num_evaluators)
        ]

    def _single_evaluation(self, item, model_output, evaluator_id):
        """Return a simulated evaluation with a random score in [0.5, 1.0) per criterion."""
        # Placeholder: a real annotation interface would be called here.
        return {
            'evaluator_id': evaluator_id,
            'criteria_scores': {
                criterion: np.random.uniform(0.5, 1.0)
                for criterion in self.evaluation_criteria
            },
            'comments': f"Evaluator {evaluator_id} comments..."
        }

    def _calculate_average_score(self, evaluations):
        """Average the scores per criterion and overall.

        Returns:
            ``0`` when there are no evaluations; otherwise a dict with the
            'overall' mean and a 'by_criteria' mapping.
        """
        if not evaluations:
            return 0

        criteria_scores = {criterion: [] for criterion in self.evaluation_criteria}

        for evaluation in evaluations:
            for criterion, score in evaluation['criteria_scores'].items():
                # Criteria missing from evaluation_criteria are averaged too.
                criteria_scores.setdefault(criterion, []).append(score)

        average_scores = {
            criterion: np.mean(scores)
            for criterion, scores in criteria_scores.items()
        }

        return {
            'overall': np.mean(list(average_scores.values())),
            'by_criteria': average_scores
        }

5.2 模型优化

超参数调优

import optuna
from transformers import Trainer, TrainingArguments

class HyperparameterTuner:
    """Searches fine-tuning hyperparameters with Optuna, minimizing eval loss."""

    def __init__(self, model, tokenizer, train_dataset, eval_dataset):
        """Store the base model, tokenizer and datasets used by every trial."""
        self.model = model
        self.tokenizer = tokenizer
        self.train_dataset = train_dataset
        self.eval_dataset = eval_dataset
        self.study = None

    def tune_hyperparameters(self, n_trials=50, timeout=3600):
        """Run the Optuna study and return the best parameter set.

        Args:
            n_trials: Maximum number of trials.
            timeout: Time budget in seconds.

        Returns:
            dict of the best hyperparameters found.
        """
        self.study = optuna.create_study(
            # The objective returns eval_loss, so lower is better. The
            # original used 'maximize', which selected the worst trial.
            direction='minimize',
            # NOTE: the pruner only acts if the objective reports
            # intermediate values; `_objective` currently does not.
            pruner=optuna.pruners.MedianPruner(),
            study_name='llm_finetuning_tuning'
        )

        self.study.optimize(
            self._objective,
            n_trials=n_trials,
            timeout=timeout,
            show_progress_bar=True
        )

        return self.study.best_params

    def _objective(self, trial):
        """Train one trial with sampled hyperparameters and return its eval loss."""
        import copy

        # Search space.
        params = {
            'learning_rate': trial.suggest_float('learning_rate', 1e-6, 1e-4, log=True),
            'num_train_epochs': trial.suggest_int('num_train_epochs', 1, 10),
            'per_device_train_batch_size': trial.suggest_categorical('per_device_train_batch_size', [2, 4, 8, 16]),
            'per_device_eval_batch_size': trial.suggest_categorical('per_device_eval_batch_size', [2, 4, 8, 16]),
            'warmup_ratio': trial.suggest_float('warmup_ratio', 0.0, 0.3),
            'weight_decay': trial.suggest_float('weight_decay', 0.0, 0.1),
            'max_grad_norm': trial.suggest_float('max_grad_norm', 0.1, 1.0),
            'adam_epsilon': trial.suggest_float('adam_epsilon', 1e-8, 1e-6, log=True),
            'lr_scheduler_type': trial.suggest_categorical('lr_scheduler_type', ['linear', 'cosine', 'cosine_with_restarts'])
        }

        training_args = TrainingArguments(
            output_dir='./tuning_results',
            logging_dir='./logs',
            save_strategy='no',
            evaluation_strategy='epoch',
            load_best_model_at_end=False,
            metric_for_best_model='eval_loss',
            greater_is_better=False,
            **params
        )

        trainer = Trainer(
            # Each trial starts from a fresh copy of the base weights;
            # reusing self.model would make later trials continue training
            # from earlier ones and invalidate the comparison.
            model=copy.deepcopy(self.model),
            args=training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.eval_dataset,
            tokenizer=self.tokenizer
        )

        trainer.train()

        eval_results = trainer.evaluate()
        return eval_results['eval_loss']

    def get_best_params(self):
        """Return the best parameters of the finished study.

        Raises:
            ValueError: If ``tune_hyperparameters()`` has not been run.
        """
        if self.study is None:
            raise ValueError("No study conducted. Run tune_hyperparameters() first.")

        return self.study.best_params

    def save_tuning_results(self, path):
        """Write best params, best value and every trial's params to ``path`` as JSON.

        Raises:
            ValueError: If ``tune_hyperparameters()`` has not been run.
        """
        if self.study is None:
            raise ValueError("No study conducted. Run tune_hyperparameters() first.")

        import json
        results = {
            'best_params': self.study.best_params,
            'best_value': self.study.best_value,
            'trials': [trial.params for trial in self.study.trials]
        }

        with open(path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

模型压缩

class ModelCompressor:
    """Quantization, pruning and distillation helpers around a PyTorch model."""

    def __init__(self, model):
        """Store the model to compress."""
        self.model = model

    def quantize_model(self, quantization_type='dynamic', calibration_data=None,
                       train_fn=None):
        """Quantize the model with the requested strategy.

        Args:
            quantization_type: ``'dynamic'``, ``'static'`` or ``'aware'``.
            calibration_data: Iterable of batches for static quantization.
            train_fn: Optional callable ``train_fn(prepared_model)`` that runs
                the fine-tuning loop for quantization-aware training.

        Returns:
            The quantized model.

        Raises:
            ValueError: For an unknown ``quantization_type`` (the original
                silently returned ``None``).
        """
        if quantization_type == 'dynamic':
            return self._dynamic_quantization()
        if quantization_type == 'static':
            return self._static_quantization(calibration_data)
        if quantization_type == 'aware':
            return self._quantization_aware_training(train_fn)
        raise ValueError(f"Unsupported quantization_type: {quantization_type!r}")

    @staticmethod
    def _forward(model, batch):
        """Run one forward pass; dict batches are expanded as keyword arguments."""
        if isinstance(batch, dict):
            return model(**batch)
        return model(batch)

    def _dynamic_quantization(self):
        """Return a copy with ``nn.Linear`` layers dynamically quantized to int8."""
        import torch.quantization

        return torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )

    def _static_quantization(self, calibration_data=None):
        """Post-training static quantization: prepare, calibrate, convert.

        Args:
            calibration_data: Iterable of representative batches. If omitted,
                a subclass-provided ``_get_calibration_data()`` is used.

        Raises:
            ValueError: If no calibration data is available.
        """
        import copy
        import torch.quantization

        if calibration_data is None:
            provider = getattr(self, '_get_calibration_data', None)
            if provider is None:
                raise ValueError(
                    "Static quantization requires calibration_data."
                )
            calibration_data = provider()

        # Work on a copy so the caller's float model stays usable.
        model = copy.deepcopy(self.model)
        model.eval()

        # Fuse conv/bn/relu only when the model actually has such submodules;
        # the original fused unconditionally and failed on other models.
        if all(hasattr(model, name) for name in ('conv', 'bn', 'relu')):
            model = torch.quantization.fuse_modules(model, [['conv', 'bn', 'relu']])

        model.qconfig = torch.quantization.get_default_qconfig()

        # Insert observers, then feed calibration batches so they can record
        # activation ranges. The original converted without either step.
        # NOTE(review): eager-mode static quantization also expects
        # QuantStub/DeQuantStub in the model — confirm for the target model.
        prepared = torch.quantization.prepare(model)
        with torch.no_grad():
            for batch in calibration_data:
                self._forward(prepared, batch)

        return torch.quantization.convert(prepared)

    def _quantization_aware_training(self, train_fn=None):
        """Quantization-aware training: prepare in train mode, train, convert.

        Args:
            train_fn: Optional callable receiving the prepared model and
                training it in place. Without it no training happens.
        """
        import copy
        import torch.quantization

        model = copy.deepcopy(self.model)
        # Prepare in training mode, as QAT preparation expects.
        model.train()
        model.qconfig = torch.quantization.get_default_qat_qconfig()

        prepared = torch.quantization.prepare_qat(model)

        if train_fn is not None:
            train_fn(prepared)

        # Convert from eval mode.
        prepared.eval()
        return torch.quantization.convert(prepared)

    def prune_model(self, pruning_ratio=0.1, make_permanent=False):
        """Apply L1 unstructured pruning to every ``nn.Linear`` weight in place.

        Args:
            pruning_ratio: Fraction of each layer's weights to zero out.
            make_permanent: If True, remove the pruning re-parametrization so
                the zeros are baked into ``weight``.

        Returns:
            The same (now pruned) model instance.
        """
        import torch.nn.utils.prune as prune

        linear_layers = [
            module for module in self.model.modules()
            if isinstance(module, torch.nn.Linear)
        ]

        for layer in linear_layers:
            prune.l1_unstructured(layer, name='weight', amount=pruning_ratio)
            if make_permanent:
                prune.remove(layer, 'weight')

        return self.model

    def distill_model(self, teacher_model, temperature=4.0, student_model=None,
                      train_dataloader=None, epochs=10, alpha=0.5):
        """Train a student to match a teacher (knowledge distillation).

        Args:
            teacher_model: Frozen teacher providing soft targets.
            temperature: Softmax temperature for the soft-target loss.
            student_model: Model to train. If omitted, a subclass-provided
                ``_create_student_model()`` is used.
            train_dataloader: Iterable of batches with ``input_ids`` and
                ``labels``. If omitted, ``self.train_dataloader`` is used.
            epochs: Number of passes over the data.
            alpha: Weight of the soft loss; the hard loss gets ``1 - alpha``.

        Returns:
            The trained student model.

        Raises:
            ValueError: If no student model or dataloader is available.
        """
        import torch.nn.functional as F

        if student_model is None:
            factory = getattr(self, '_create_student_model', None)
            if factory is None:
                raise ValueError("distill_model requires student_model.")
            student_model = factory()

        if train_dataloader is None:
            train_dataloader = getattr(self, 'train_dataloader', None)
            if train_dataloader is None:
                raise ValueError("distill_model requires train_dataloader.")

        def logits_of(outputs):
            # Accept either raw logit tensors or outputs with a `.logits` field.
            return getattr(outputs, 'logits', outputs)

        def distillation_loss(student_logits, teacher_logits, targets):
            # Flatten to (N, classes) so sequence-shaped logits work too.
            num_classes = student_logits.size(-1)
            student_flat = student_logits.reshape(-1, num_classes)
            teacher_flat = teacher_logits.reshape(-1, num_classes)

            soft_targets = F.softmax(teacher_flat / temperature, dim=-1)
            soft_student = F.log_softmax(student_flat / temperature, dim=-1)
            # Scale by T^2 so soft-loss gradients stay comparable to the
            # hard loss as the temperature changes.
            soft_loss = F.kl_div(
                soft_student, soft_targets, reduction='batchmean'
            ) * (temperature ** 2)

            hard_loss = F.cross_entropy(student_flat, targets.reshape(-1))

            return alpha * soft_loss + (1 - alpha) * hard_loss

        teacher_model.eval()
        student_model.train()
        optimizer = torch.optim.Adam(student_model.parameters())

        for _ in range(epochs):
            for batch in train_dataloader:
                optimizer.zero_grad()

                student_logits = logits_of(student_model(batch['input_ids']))
                # The teacher is frozen: no graph, no gradients.
                with torch.no_grad():
                    teacher_logits = logits_of(teacher_model(batch['input_ids']))

                loss = distillation_loss(
                    student_logits,
                    teacher_logits,
                    batch['labels']
                )

                loss.backward()
                optimizer.step()

        return student_model

6. 部署与应用

6.1 模型部署

FastAPI部署

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
from typing import List, Optional

from pydantic import Field


class PredictionRequest(BaseModel):
    """Request body for text generation.

    Defaults are unchanged; lower bounds reject values that cannot be used
    for sampling (non-positive lengths, temperature or sequence counts).
    """

    # Prompt text to continue.
    text: str
    # Token budget forwarded to the tokenizer and the generate call.
    max_length: int = Field(512, ge=1)
    # Sampling temperature; must be strictly positive.
    temperature: float = Field(0.7, gt=0)
    # Nucleus sampling threshold in (0, 1].
    top_p: float = Field(0.9, gt=0, le=1)
    # Top-k sampling cutoff; 0 disables the filter.
    top_k: int = Field(50, ge=0)
    # Number of sequences to return.
    # NOTE(review): consider an upper bound to cap per-request compute.
    num_return_sequences: int = Field(1, ge=1)

from typing import List, Union


class PredictionResponse(BaseModel):
    """Response body for text generation."""

    # A single string, or a list of strings when several sequences were
    # requested. The original `str`-only annotation failed validation in
    # that case.
    generated_text: Union[str, List[str]]
    # Number of prompt tokens.
    input_length: int
    # Number of tokens in the first returned sequence.
    output_length: int
    # Generation time in seconds.
    generation_time: float
    # Name of the serving model.
    model_name: str

class LLMInferenceAPI:
    """Thin inference wrapper that turns a PredictionRequest into generated text."""

    def __init__(self, model, tokenizer):
        """Store model and tokenizer and record the model's device."""
        self.model = model
        self.tokenizer = tokenizer
        self.model_name = "fine-tuned-llm"
        # Device of the first parameter; inputs are moved there before generating.
        self.device = next(model.parameters()).device

    def generate_text(self, request: PredictionRequest):
        """Generate text for ``request`` with sampling.

        Args:
            request: Prompt and sampling settings.

        Returns:
            PredictionResponse whose ``generated_text`` is the single decoded
            string when one sequence is requested, otherwise the list of
            decoded strings.
        """
        import time

        # Monotonic clock: durations are unaffected by wall-clock adjustments.
        start_time = time.perf_counter()

        # Encode the prompt, truncated to at most `max_length` tokens.
        inputs = self.tokenizer(
            request.text,
            return_tensors="pt",
            max_length=request.max_length,
            truncation=True
        ).to(self.device)

        generation_params = {
            "input_ids": inputs["input_ids"],
            "attention_mask": inputs["attention_mask"],
            # Budget for NEW tokens. The original passed this as the total
            # length cap, so a prompt already truncated to `max_length`
            # tokens left no room to generate anything.
            "max_new_tokens": request.max_length,
            "temperature": request.temperature,
            "top_p": request.top_p,
            "top_k": request.top_k,
            "num_return_sequences": request.num_return_sequences,
            "do_sample": True,
            "pad_token_id": self.tokenizer.eos_token_id
        }

        with torch.no_grad():
            outputs = self.model.generate(**generation_params)

        generated_texts = self.tokenizer.batch_decode(
            outputs,
            skip_special_tokens=True
        )

        elapsed = time.perf_counter() - start_time

        # Unwrap the list when exactly one sequence was requested.
        if request.num_return_sequences == 1:
            generated_text = generated_texts[0]
        else:
            generated_text = generated_texts

        return PredictionResponse(
            generated_text=generated_text,
            input_length=len(inputs["input_ids"][0]),
            output_length=len(outputs[0]),
            generation_time=elapsed,
            model_name=self.model_name
        )

# FastAPI application
app = FastAPI(title="LLM Fine-tuned Inference API")

# CORS middleware. Wildcard origins must not be combined with credentials:
# that would let any website issue credentialed requests to this API.
# To support cookies/auth headers, list explicit origins instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Inference backend; must be set to a loaded LLMInferenceAPI before serving.
llm_api = None

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Generate text for a single request.

    Raises:
        HTTPException: 503 while no model is loaded (a temporary
            unavailability rather than an internal error).
    """
    from fastapi.concurrency import run_in_threadpool

    api = llm_api
    if api is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    # Generation is blocking; run it off the event loop so other requests
    # (e.g. /health) stay responsive.
    return await run_in_threadpool(api.generate_text, request)

@app.post("/batch-predict")
async def batch_predict(requests: List[PredictionRequest]):
    """Generate text for each request in order and return all results.

    Raises:
        HTTPException: 503 while no model is loaded.
    """
    from fastapi.concurrency import run_in_threadpool

    api = llm_api
    if api is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    def run_all():
        # Requests are processed sequentially, one generate call each.
        return [api.generate_text(item) for item in requests]

    # Blocking work runs in a worker thread so the event loop is not stalled.
    # NOTE(review): the batch size is unbounded; consider capping it.
    results = await run_in_threadpool(run_all)

    return {"results": results}

@app.get("/health")
async def health_check():
    """Report liveness and whether an inference backend is attached."""
    backend_ready = llm_api is not None
    payload = {"status": "healthy", "model_loaded": backend_ready}
    return payload

@app.get("/models")
async def list_models():
    """Describe the served model; both fields are None when no backend is set."""
    if not llm_api:
        return {"model_name": None, "device": None}

    return {
        "model_name": llm_api.model_name,
        "device": str(llm_api.device)
    }

if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")

TGI部署

# text-generation-inference部署配置
from text_generation_server import Server

class TGIDeployer:
    """Holds text-generation-inference deployment settings and starts the server."""

    def __init__(self, model_path, device="auto"):
        """Record the model location and target device; nothing is started yet."""
        self.model_path = model_path
        self.device = device
        self.server = None
        # Effective serving settings; `deploy()` overwrites the first two so
        # `get_config()` always reflects what was actually requested.
        self._settings = {
            "num_shard": 1,
            "max_batch_size": 32,
            "max_input_length": 2048,
            "max_total_tokens": 4096
        }

    def deploy(self, num_shard=1, max_batch_size=32):
        """Create the server with the given sharding/batch settings and run it.

        Args:
            num_shard: Number of shards to serve the model with.
            max_batch_size: Maximum batch size accepted by the server.
        """
        self._settings["num_shard"] = num_shard
        self._settings["max_batch_size"] = max_batch_size

        # NOTE(review): confirm that the installed text_generation_server
        # package exposes a `Server` class with this constructor.
        self.server = Server(
            model_path=self.model_path,
            device=self.device,
            **self._settings
        )

        # Start serving.
        self.server.run()

    def get_config(self):
        """Return a copy of the current deployment configuration.

        The original always reported ``num_shard=1`` and
        ``max_batch_size=32`` regardless of what ``deploy()`` was given.
        """
        return {
            "model_path": self.model_path,
            "device": self.device,
            **self._settings
        }

6.2 性能优化

推理优化

class InferenceOptimizer:
    """Inference helpers: dynamic quantization, padded batching and an LRU cache."""

    def __init__(self, model, tokenizer):
        """Store model/tokenizer and initialise the batching and cache state."""
        self.model = model
        self.tokenizer = tokenizer
        self.optimized_model = None
        self.collate_fn = None
        # Initialised here so `cached_generation` works even when
        # `optimize_inference()` has not been called yet.
        self.cache = {}
        self.cache_size = 1000

    def optimize_inference(self):
        """Quantize the model, install the collate function and reset the cache.

        Returns:
            The dynamically quantized model (also kept on ``optimized_model``).
        """
        self.optimized_model = self._quantize_model()
        self._optimize_batch_processing()
        self._optimize_caching()
        return self.optimized_model

    def _quantize_model(self):
        """Return a copy of the model with ``nn.Linear`` layers quantized to int8."""
        import torch.quantization

        return torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )

    def _optimize_batch_processing(self):
        """Install ``self.collate_fn``, which pads a batch sorted by length."""
        from torch.nn.utils.rnn import pad_sequence

        def collate_fn(batch):
            """Pad ``input_ids``/``attention_mask`` of a batch, longest first."""
            # Sort a copy: the original sorted the caller's list in place.
            ordered = sorted(
                batch, key=lambda sample: len(sample['input_ids']), reverse=True
            )

            # Tokenizers without a pad token report None; fall back to the
            # EOS id, then 0, because padding needs a concrete value.
            pad_id = self.tokenizer.pad_token_id
            if pad_id is None:
                pad_id = getattr(self.tokenizer, 'eos_token_id', None)
            if pad_id is None:
                pad_id = 0

            input_ids = pad_sequence(
                [sample['input_ids'] for sample in ordered],
                batch_first=True,
                padding_value=pad_id
            )
            attention_mask = pad_sequence(
                [sample['attention_mask'] for sample in ordered],
                batch_first=True,
                padding_value=0
            )

            return {
                'input_ids': input_ids,
                'attention_mask': attention_mask
            }

        self.collate_fn = collate_fn

    def _optimize_caching(self):
        """Reset the generation cache and its capacity (1000 entries)."""
        self.cache = {}
        self.cache_size = 1000

    def _generate(self, text, **kwargs):
        """Generate and decode text for ``text``; ``kwargs`` go to ``generate``.

        Uses the quantized model when available, otherwise the original one.
        """
        model = self.optimized_model if self.optimized_model is not None else self.model
        inputs = self.tokenizer(text, return_tensors="pt")

        # Move inputs next to the weights when the model exposes parameters.
        first_param = next(model.parameters(), None)
        if first_param is not None:
            inputs = inputs.to(first_param.device)

        with torch.no_grad():
            outputs = model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                **kwargs
            )

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def cached_generation(self, text, **kwargs):
        """Return the generation for ``(text, kwargs)``, reusing cached results.

        The cache is least-recently-used: a hit refreshes the entry, and the
        stalest entry is evicted once ``cache_size`` is reached.
        """
        # Key on the actual values rather than their hash, so two different
        # requests can never collide; sorting makes kwarg order irrelevant.
        cache_key = (text, repr(sorted(kwargs.items())))

        if cache_key in self.cache:
            # Re-insert to mark as most recently used (dicts keep insertion order).
            result = self.cache.pop(cache_key)
            self.cache[cache_key] = result
            return result

        result = self._generate(text, **kwargs)

        if self.cache_size > 0:
            while len(self.cache) >= self.cache_size:
                # The first key is the least recently used entry.
                del self.cache[next(iter(self.cache))]
            self.cache[cache_key] = result

        return result

内存优化

class MemoryOptimizer:
    """Applies memory-related settings to a model before training."""

    def __init__(self, model):
        """Store the model to configure."""
        self.model = model

    def optimize_memory(self):
        """Apply the memory settings and return the (possibly wrapped) model.

        Steps: release cached CUDA memory, enable gradient checkpointing,
        cap the CUDA memory fraction and build an AdamW optimizer, then wrap
        the model in ``DataParallel`` when several GPUs are visible.
        """
        cuda_available = torch.cuda.is_available()

        # 1. Release cached allocator blocks (CUDA only).
        if cuda_available:
            torch.cuda.empty_cache()

        # 2. Enable gradient checkpointing BEFORE wrapping: DataParallel does
        #    not forward this method, so the original order failed on
        #    multi-GPU hosts.
        self._use_gradient_checkpointing()

        # 3. Memory-fraction cap and optimizer.
        self._optimize_memory_allocation()

        # 4. Replicate across GPUs. This is data parallelism, not model
        #    parallelism: every GPU holds a full copy of the model.
        if cuda_available and torch.cuda.device_count() > 1:
            self.model = torch.nn.DataParallel(self.model)

        return self.model

    def _optimize_memory_allocation(self):
        """Cap this process at 80% of GPU memory and create the AdamW optimizer."""
        # The cap is a CUDA setting; skip it on CPU-only hosts.
        if torch.cuda.is_available():
            torch.cuda.set_per_process_memory_fraction(0.8)

        self.optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=2e-5,
            weight_decay=0.01,
            betas=(0.9, 0.999),
            eps=1e-8
        )

    def _use_gradient_checkpointing(self):
        """Enable gradient checkpointing when the model supports it."""
        # Not every nn.Module exposes these hooks; skip the ones that do not.
        if hasattr(self.model, 'gradient_checkpointing_enable'):
            self.model.gradient_checkpointing_enable()

        if hasattr(self.model, 'enable_input_require_grads'):
            self.model.enable_input_require_grads()

6.3 监控与日志

性能监控

import prometheus_client
from prometheus_client import Counter, Histogram, Gauge

class LLMMonitor:
    """Prometheus metrics for an LLM service: requests, latency, errors, GPU state."""

    def __init__(self):
        """Create and register the metrics.

        NOTE(review): metrics go to the default registry, so building a
        second LLMMonitor in the same process will likely raise a
        duplicate-metric error — confirm single-instance usage.
        """
        self.request_counter = Counter(
            'llm_requests_total',
            'Total number of LLM requests',
            ['model', 'endpoint']
        )

        self.response_time = Histogram(
            'llm_response_time_seconds',
            'LLM response time in seconds',
            ['model', 'endpoint']
        )

        self.error_counter = Counter(
            'llm_errors_total',
            'Total number of LLM errors',
            ['error_type', 'model']
        )

        self.active_requests = Gauge(
            'llm_active_requests',
            'Number of active LLM requests'
        )

        self.memory_usage = Gauge(
            'llm_memory_usage_bytes',
            'LLM memory usage in bytes',
            ['device']
        )

        self.gpu_usage = Gauge(
            'llm_gpu_usage_percent',
            'LLM GPU usage percentage',
            ['device']
        )

    def record_request(self, model, endpoint, duration, success=True):
        """Count a request; record latency on success, an error otherwise."""
        self.request_counter.labels(model=model, endpoint=endpoint).inc()

        if success:
            self.response_time.labels(model=model, endpoint=endpoint).observe(duration)
        else:
            self.error_counter.labels(
                error_type='generation_failure',
                model=model
            ).inc()

    def track_active_request(self, delta):
        """Adjust the in-flight request gauge by ``delta`` (negative to decrease)."""
        self.active_requests.inc(delta)

    def update_memory_usage(self, device='cuda'):
        """Publish allocated CUDA memory and GPU utilization for ``device``.

        Does nothing when CUDA is unavailable.
        """
        if not torch.cuda.is_available():
            return

        device_label = str(device)

        # Labelled gauges are addressed through `.labels(...)`; the original
        # passed a `labels=` keyword to `set()`, which is not a valid call.
        self.memory_usage.labels(device=device_label).set(
            torch.cuda.memory_allocated(device)
        )
        self.gpu_usage.labels(device=device_label).set(
            self._get_gpu_utilization()
        )

    def _get_gpu_utilization(self):
        """Return utilization of the first GPU in percent, or 0.0 if unknown.

        Best effort: a missing GPUtil package or a failing query yields 0.0
        so monitoring never breaks serving.
        """
        try:
            import GPUtil
            gpus = GPUtil.getGPUs()
        except Exception:
            return 0.0

        if not gpus:
            return 0.0

        # GPUtil reports utilization as `load`, a fraction in [0, 1]. The
        # original read a non-existent `utilization` attribute, and the bare
        # except hid the error, so the metric was always 0.
        return float(gpus[0].load) * 100.0

日志记录

import logging
from logging.handlers import RotatingFileHandler
import json
import time

class LLMLogger:
    """Structured JSON logging for generation requests, errors and metrics.

    Records go to a rotating ``llm.log`` file in ``log_dir`` and to the
    console through the ``'LLM'`` logger.
    """

    def __init__(self, log_dir='./logs'):
        """Remember the log directory and configure the handlers."""
        self.log_dir = log_dir
        self.setup_logging()

    def setup_logging(self):
        """Create the log directory and attach file/console handlers."""
        import os

        os.makedirs(self.log_dir, exist_ok=True)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # UTF-8 so non-ASCII prompts and outputs are stored readably.
        file_handler = RotatingFileHandler(
            os.path.join(self.log_dir, 'llm.log'),
            maxBytes=10*1024*1024,  # 10MB
            backupCount=5,
            encoding='utf-8'
        )
        file_handler.setFormatter(formatter)

        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)

        logger = logging.getLogger('LLM')
        logger.setLevel(logging.INFO)

        # Attach handlers to the named logger instead of calling
        # logging.basicConfig(): basicConfig does nothing once the root
        # logger has handlers, so the file handler could silently be
        # dropped, and it reconfigures logging for the whole process.
        for handler in list(logger.handlers):
            if getattr(handler, '_llm_logger_owned', False):
                # Replace handlers from a previous LLMLogger so records are
                # not written twice.
                logger.removeHandler(handler)
                handler.close()

        for handler in (file_handler, console_handler):
            handler._llm_logger_owned = True
            logger.addHandler(handler)

        # Avoid duplicate console lines through root-logger handlers.
        logger.propagate = False

        self.logger = logger

    @staticmethod
    def _json_default(obj):
        """Fallback serializer for objects ``json`` cannot encode natively."""
        for attr in ('model_dump', 'dict'):
            exporter = getattr(obj, attr, None)
            if callable(exporter):
                return exporter()
        return str(obj)

    def _dump(self, entry):
        """Serialize a log entry, keeping non-ASCII text human-readable."""
        return json.dumps(entry, ensure_ascii=False, default=self._json_default)

    def log_generation(self, request, response, duration):
        """Log one successful generation with its request settings and duration."""
        log_entry = {
            'timestamp': time.time(),
            'request': {
                'text': request.text,
                'max_length': request.max_length,
                'temperature': request.temperature,
                'top_p': request.top_p
            },
            'response': {
                'generated_text': response.generated_text,
                'output_length': response.output_length
            },
            'duration': duration,
            'success': True
        }

        self.logger.info("Generation: %s", self._dump(log_entry))

    def log_error(self, error, request=None):
        """Log an error and, if given, the request that triggered it.

        ``request`` may be any object; values JSON cannot encode are exported
        through their own dict exporter or stringified, so logging an error
        never raises a serialization error itself.
        """
        log_entry = {
            'timestamp': time.time(),
            'error': str(error),
            'request': request if request else None
        }

        self.logger.error("Error: %s", self._dump(log_entry))

    def log_performance(self, metrics):
        """Log a dict of performance metrics."""
        log_entry = {
            'timestamp': time.time(),
            'metrics': metrics
        }

        self.logger.info("Performance: %s", self._dump(log_entry))

7. 案例实践

7.1 中文医疗问答微调

数据准备

import pandas as pd

class MedicalDataProcessor:
    """Loads a medical Q&A CSV and turns it into instruction-style train/eval splits."""

    def __init__(self, data_path):
        """Remember the CSV path; nothing is loaded until processing is requested."""
        self.data_path = data_path
        self.processed_data = None

    def process_medical_data(self):
        """Load, clean, format and split the data.

        Returns:
            ``{'train': [...], 'eval': [...]}`` of instruction dicts; also
            stored on ``self.processed_data``.
        """
        data = pd.read_csv(self.data_path)
        data = self._clean_medical_data(data)
        formatted_data = self._format_medical_data(data)
        train_data, eval_data = self._split_dataset(formatted_data)

        self.processed_data = {
            'train': train_data,
            'eval': eval_data
        }

        return self.processed_data

    def _clean_medical_data(self, data):
        """Drop incomplete, duplicate and out-of-range rows.

        Args:
            data: DataFrame with ``question`` and ``answer`` columns.

        Returns:
            A new DataFrame; the input frame is not modified.
        """
        # Explicit copy: assigning columns on a derived frame would otherwise
        # trigger pandas' chained-assignment warning.
        data = data.dropna(subset=['question', 'answer']).copy()

        # Normalise to stripped strings (also covers non-string cells).
        data['question'] = data['question'].astype(str).str.strip()
        data['answer'] = data['answer'].astype(str).str.strip()

        # Keep the first occurrence of each question.
        data = data.drop_duplicates(subset=['question'])

        # Length filter: question 11-499 chars, answer 21-1999 chars.
        question_len = data['question'].str.len()
        answer_len = data['answer'].str.len()
        keep = (
            (question_len > 10) &
            (question_len < 500) &
            (answer_len > 20) &
            (answer_len < 2000)
        )

        return data[keep]

    def _format_medical_data(self, data):
        """Convert each row into an instruction/input/output dict."""
        return [
            {
                'instruction': f"医疗问题:{question}",
                'input': "",
                'output': f"医疗回答:{answer}"
            }
            for question, answer in zip(data['question'], data['answer'])
        ]

    def _split_dataset(self, data, train_ratio=0.8, seed=42):
        """Shuffle deterministically and split into train/eval lists.

        Args:
            data: Sequence of samples.
            train_ratio: Fraction of samples placed in the training split.
            seed: Seed of the private random generator (default 42 yields the
                same permutation as the original global seeding).

        Returns:
            ``(train_data, eval_data)``.
        """
        import numpy as np

        # A private generator keeps the split reproducible without resetting
        # the process-wide NumPy random state.
        rng = np.random.RandomState(seed)
        indices = rng.permutation(len(data))

        train_size = int(len(data) * train_ratio)
        train_data = [data[i] for i in indices[:train_size]]
        eval_data = [data[i] for i in indices[train_size:]]

        return train_data, eval_data

微调训练

class MedicalLLMTrainer:
    """Fine-tunes a causal language model on instruction-style medical data."""

    def __init__(self, model_name, medical_data):
        """Store the model id and the ``{'train': [...], 'eval': [...]}`` data."""
        self.model_name = model_name
        self.medical_data = medical_data
        self.model = None
        self.tokenizer = None

    def setup_model(self):
        """Load model and tokenizer and register the medical special tokens."""
        from transformers import AutoModelForCausalLM, AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            # SECURITY: this executes code shipped with the model repository;
            # only use it with sources you trust.
            trust_remote_code=True
        )

        # Domain-specific markers.
        special_tokens = {
            "additional_special_tokens": [
                "[MEDICAL]",
                "[DIAGNOSIS]",
                "[TREATMENT]",
                "[PRESCRIPTION]"
            ]
        }

        self.tokenizer.add_special_tokens(special_tokens)
        # The embedding matrix must grow to cover the added tokens.
        self.model.resize_token_embeddings(len(self.tokenizer))

    @staticmethod
    def _build_text(example):
        """Join instruction, input and output into one training string."""
        return "\n".join((
            example.get('instruction', ''),
            example.get('input', ''),
            example.get('output', ''),
        ))

    def _prepare_dataset(self, examples, max_length=512):
        """Tokenize instruction examples into fixed-length training samples.

        This helper was referenced by the training method but not defined.

        Args:
            examples: Iterable of dicts with ``instruction``/``input``/``output``.
            max_length: Length every sample is truncated or padded to.

        Returns:
            List of dicts with ``input_ids``, ``attention_mask`` and ``labels``
            tensors; padded label positions are set to -100.

        Raises:
            RuntimeError: If the tokenizer has not been loaded yet.
        """
        if self.tokenizer is None:
            raise RuntimeError("Call setup_model() before preparing datasets.")

        # Padding needs a pad token; reuse EOS when the tokenizer has none.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        eos = self.tokenizer.eos_token or ""

        samples = []
        for example in examples:
            encoded = self.tokenizer(
                self._build_text(example) + eos,
                truncation=True,
                max_length=max_length,
                padding="max_length",
                return_tensors="pt"
            )
            input_ids = encoded["input_ids"].squeeze(0)
            attention_mask = encoded["attention_mask"].squeeze(0)

            # Causal-LM labels are the inputs; padding is excluded from the loss.
            labels = input_ids.clone()
            labels[attention_mask == 0] = -100

            samples.append({
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "labels": labels
            })

        return samples

    def train_medical_llm(self, epochs=3, batch_size=8):
        """Fine-tune the model and save the result to ``./medical_llm``.

        Args:
            epochs: Number of training epochs.
            batch_size: Per-device batch size for training and evaluation.

        Returns:
            The trained model.
        """
        from transformers import Trainer, TrainingArguments

        # Load lazily so calling this method first still works.
        if self.model is None or self.tokenizer is None:
            self.setup_model()

        train_dataset = self._prepare_dataset(self.medical_data['train'])
        eval_dataset = self._prepare_dataset(self.medical_data['eval'])

        training_args = TrainingArguments(
            output_dir="./medical_llm",
            num_train_epochs=epochs,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            warmup_steps=100,
            weight_decay=0.01,
            logging_dir="./logs",
            logging_steps=50,
            evaluation_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            # The weights are loaded in bfloat16, so train in bf16; the
            # original requested fp16 mixed precision on bf16 weights.
            bf16=True
        )

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            tokenizer=self.tokenizer
        )

        trainer.train()
        trainer.save_model()

        return self.model

7.2 法律文档生成微调

法律数据准备

class LegalDataProcessor:
    """Loads legal case records (JSON Lines) and builds instruction-style splits."""

    def __init__(self, data_path):
        """Remember the JSONL path; nothing is loaded until processing is requested."""
        self.data_path = data_path
        self.processed_data = None

    def process_legal_data(self):
        """Load, clean, format and split the data.

        Returns:
            ``{'train': [...], 'eval': [...]}`` of instruction dicts; also
            stored on ``self.processed_data``.
        """
        data = pd.read_json(self.data_path, lines=True)
        data = self._clean_legal_data(data)
        formatted_data = self._format_legal_data(data)
        train_data, eval_data = self._split_dataset(formatted_data)

        self.processed_data = {
            'train': train_data,
            'eval': eval_data
        }

        return self.processed_data

    def _clean_legal_data(self, data):
        """Drop incomplete rows, strip text and de-duplicate on case facts.

        Returns:
            A new DataFrame; the input frame is not modified.
        """
        text_columns = ['case_facts', 'legal_analysis', 'judgment']

        # Explicit copy so the column assignments below act on our own frame.
        data = data.dropna(subset=text_columns).copy()

        for column in text_columns:
            data[column] = data[column].str.strip()

        # Keep the first record for each distinct set of case facts.
        return data.drop_duplicates(subset=['case_facts'])

    def _format_legal_data(self, data):
        """Convert each row into an instruction/input/output dict."""
        rows = zip(data['case_facts'], data['legal_analysis'], data['judgment'])
        return [
            {
                'instruction': "请根据以下案件事实生成法律分析意见:",
                'input': f"案件事实:{case_facts}",
                'output': f"法律分析:{legal_analysis}\n\n判决结果:{judgment}"
            }
            for case_facts, legal_analysis, judgment in rows
        ]

    def _split_dataset(self, data, train_ratio=0.8, seed=42):
        """Shuffle deterministically and split into train/eval lists.

        The processing pipeline calls this method, but the class did not
        define it, so processing failed with an AttributeError.

        Args:
            data: Sequence of samples.
            train_ratio: Fraction of samples placed in the training split.
            seed: Seed of the private random generator.

        Returns:
            ``(train_data, eval_data)``.
        """
        import numpy as np

        # Private generator: reproducible without touching global RNG state.
        rng = np.random.RandomState(seed)
        indices = rng.permutation(len(data))

        train_size = int(len(data) * train_ratio)
        train_data = [data[i] for i in indices[:train_size]]
        eval_data = [data[i] for i in indices[train_size:]]

        return train_data, eval_data

法律文档生成训练

class LegalDocumentGenerator:
    """Fine-tunes a causal language model and generates legal documents with it."""

    # Document skeleton; the analysis and judgment slots are left empty in
    # the prompt for the model to produce.
    DOCUMENT_TEMPLATE = (
        "法律文书生成\n"
        "\n"
        "案件类型:{case_type}\n"
        "审理法院:{court_name}\n"
        "案号:{case_number}\n"
        "\n"
        "案件事实:\n"
        "{case_facts}\n"
        "\n"
        "争议焦点:\n"
        "{disputes}\n"
        "\n"
        "法律分析:\n"
        "{legal_analysis}\n"
        "\n"
        "判决结果:\n"
        "{judgment}\n"
        "\n"
        "审判长:{judge}\n"
        "审判日期:{trial_date}"
    )

    def __init__(self, model_name, legal_data):
        """Store the model id and the ``{'train': [...], 'eval': [...]}`` data."""
        self.model_name = model_name
        self.legal_data = legal_data
        self.model = None
        self.tokenizer = None
        # Available immediately, so prompt building does not depend on
        # the model having been loaded first.
        self.document_template = self.DOCUMENT_TEMPLATE

    def setup_model(self):
        """Load the model and tokenizer."""
        from transformers import AutoModelForCausalLM, AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            # SECURITY: this executes code shipped with the model repository;
            # only use it with sources you trust.
            trust_remote_code=True
        )

        self.document_template = self.DOCUMENT_TEMPLATE

    @staticmethod
    def _build_text(example):
        """Join instruction, input and output into one training string."""
        return "\n".join((
            example.get('instruction', ''),
            example.get('input', ''),
            example.get('output', ''),
        ))

    def _prepare_dataset(self, examples, max_length=512):
        """Tokenize instruction examples into fixed-length training samples.

        This helper was referenced by the training method but not defined.

        Returns:
            List of dicts with ``input_ids``, ``attention_mask`` and ``labels``
            tensors; padded label positions are set to -100.

        Raises:
            RuntimeError: If the tokenizer has not been loaded yet.
        """
        if self.tokenizer is None:
            raise RuntimeError("Call setup_model() before preparing datasets.")

        # Padding needs a pad token; reuse EOS when the tokenizer has none.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        eos = self.tokenizer.eos_token or ""

        samples = []
        for example in examples:
            encoded = self.tokenizer(
                self._build_text(example) + eos,
                truncation=True,
                max_length=max_length,
                padding="max_length",
                return_tensors="pt"
            )
            input_ids = encoded["input_ids"].squeeze(0)
            attention_mask = encoded["attention_mask"].squeeze(0)

            # Causal-LM labels are the inputs; padding is excluded from the loss.
            labels = input_ids.clone()
            labels[attention_mask == 0] = -100

            samples.append({
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "labels": labels
            })

        return samples

    def train_legal_generator(self, epochs=5, batch_size=4):
        """Fine-tune the model and save it to ``./legal_document_generator``.

        Args:
            epochs: Number of training epochs.
            batch_size: Per-device batch size for training and evaluation.

        Returns:
            The trained model.
        """
        from transformers import Trainer, TrainingArguments

        # Load lazily so calling this method first still works.
        if self.model is None or self.tokenizer is None:
            self.setup_model()

        train_dataset = self._prepare_dataset(self.legal_data['train'])
        eval_dataset = self._prepare_dataset(self.legal_data['eval'])

        training_args = TrainingArguments(
            output_dir="./legal_document_generator",
            num_train_epochs=epochs,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            warmup_steps=200,
            weight_decay=0.01,
            logging_dir="./logs",
            logging_steps=100,
            evaluation_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            # The weights are loaded in bfloat16, so train in bf16; the
            # original requested fp16 mixed precision on bf16 weights.
            bf16=True
        )

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            tokenizer=self.tokenizer
        )

        trainer.train()
        trainer.save_model()

        return self.model

    def _build_prompt(self, case_facts, case_type, court_name, case_number,
                      disputes, judge, trial_date):
        """Fill the document template, leaving analysis and judgment empty."""
        return self.document_template.format(
            case_type=case_type,
            court_name=court_name,
            case_number=case_number,
            case_facts=case_facts,
            disputes=disputes,
            legal_analysis="",  # produced by the model
            judgment="",  # produced by the model
            judge=judge,
            trial_date=trial_date
        )

    def generate_legal_document(self, case_facts, case_type, court_name,
                                case_number, disputes, judge, trial_date,
                                max_length=2000):
        """Generate a legal document from the case details.

        Args:
            case_facts, case_type, court_name, case_number, disputes, judge,
            trial_date: Values substituted into the document template.
            max_length: Upper bound on total sequence length (prompt plus
                generated tokens).

        Returns:
            The decoded text of the first generated sequence.

        Raises:
            RuntimeError: If the model has not been loaded.
        """
        if self.model is None or self.tokenizer is None:
            raise RuntimeError("Call setup_model() before generating documents.")

        # NOTE(review): the prompt ends after the judge/date footer, so the
        # model continues after the form instead of filling the empty
        # analysis/judgment slots — confirm this is the intended prompt shape.
        prompt = self._build_prompt(
            case_facts, case_type, court_name, case_number,
            disputes, judge, trial_date
        )

        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        with torch.no_grad():
            outputs = self.model.generate(
                input_ids=inputs["input_ids"],
                # An explicit mask avoids ambiguity when pad and EOS share an id.
                attention_mask=inputs["attention_mask"],
                max_length=max_length,
                num_return_sequences=1,
                # temperature/top_p only take effect when sampling is on;
                # the original passed them to a greedy decode.
                do_sample=True,
                temperature=0.7,
                top_p=0.9
            )

        document = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return document

8. 总结与展望

8.1 微调最佳实践

  1. 数据质量是关键:高质量、多样化的训练数据是成功微调的基础
  2. 模型选择要合适:根据任务需求和资源条件选择合适的预训练模型
  3. 参数调整要谨慎:学习率、批大小等参数对训练效果影响很大
  4. 评估要全面:结合自动评估和人工评估,确保模型质量
  5. 优化要持续:不断迭代优化模型,提高生成质量

8.2 未来发展方向

  1. 参数高效微调技术:发展更高效的微调方法,减少计算资源需求
  2. 多任务微调:支持同时学习多个任务,提高模型通用性
  3. 持续学习:实现模型的持续学习和适应,保持知识更新
  4. 个性化微调:根据用户偏好进行个性化微调
  5. 伦理安全:在微调过程中注重伦理和安全问题

8.3 学习资源推荐

  1. 官方文档:Hugging Face、PyTorch官方文档
  2. 开源项目:GitHub上的开源LLM微调项目
  3. 学术论文:关注最新的微调技术和方法论文
  4. 社区交流:参与相关技术社区的讨论和分享
  5. 实践项目:通过实际项目积累经验

通过本指南的学习,开发者应能掌握大语言模型微调的核心技术与实践方法,为构建高质量的生成式AI应用打下坚实基础。在实际应用中,还需要持续学习与实践、勇于探索和创新,才能在快速发展的AI领域中保持竞争力。