生成式AI应用开发指南 1. 生成式AI概述 1.1 什么是生成式AI 生成式AI是指能够生成新内容的AI系统,它学习了大量数据模式后,能够创造出与训练数据相似但全新的内容。与判别式AI不同,生成式AI专注于内容的生成和创造。
核心特点:
1.2 主要类型 文本生成 大语言模型(LLM):如GPT系列、Claude、LLaMA等 对话系统:ChatGPT、Gemini等 文本摘要:自动生成文章摘要 代码生成:辅助编程和代码补全 图像生成 扩散模型:Stable Diffusion、DALL-E 2 GAN网络:StyleGAN、BigGAN 图像编辑:图像修复、风格迁移 音频生成 语音合成:文本转语音 音乐生成:编曲、作曲 音效生成:环境音、音效设计 视频生成 视频编辑:智能剪辑 动画生成:关键帧动画 视频修复:超分辨率、去抖动 2. 技术栈选择 2.1 模型选择策略 开源模型 from transformers import AutoModelForCausalLM, AutoTokenizermodel_name = "meta-llama/Llama-2-7b-chat-hf" tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForCausalLM.from_pretrained(model_name) model.config.max_length = 4096 model.config.temperature = 0.7
云服务API OpenAI API:GPT-4、DALL-E Google AI:Gemini、PaLM Anthropic:Claude系列 国内服务:文心一言、讯飞星火 混合架构 class HybridGenerativeAI : def __init__ (self ): self.local_model = None self.api_clients = {} self.caching_layer = None def route_request (self, request_type, content ): """智能路由请求到最适合的处理方式""" if self._should_use_local(content): return self._process_local(content) else : return self._process_api(content)
2.2 开发框架选择 from transformers import pipelinetext_generator = pipeline( "text-generation" , model="gpt2" , device=0 ) image_generator = pipeline( "text-to-image" , model="stabilityai/stable-diffusion-2-1" )
LangChain框架 from langchain.prompts import PromptTemplatefrom langchain.chains import LLMChainfrom langchain.llms import OpenAIprompt_template = PromptTemplate( input_variables=["topic" , "length" ], template="关于{topic}的{length}字介绍:" ) llm = OpenAI(temperature=0.7 ) chain = LLMChain(llm=llm, prompt=prompt_template) result = chain.run(topic="人工智能" , length="500" )
自定义框架 class GenerativeAIFramework : def __init__ (self, config ): self.config = config self.models = {} self.prompts = {} self.memory = MemorySystem() def register_model (self, name, model_config ): """注册模型""" self.models[name] = self._load_model(model_config) def generate_response (self, prompt, context=None ): """生成响应""" processed_prompt = self._process_prompt(prompt, context) model = self._select_model(processed_prompt) response = model.generate(processed_prompt) return self._post_process(response)
3. 数据处理与预处理 3.1 数据收集与清洗 数据来源 公开数据集:Common Crawl、Wikipedia、GitHub 专业数据:医疗、法律、金融等专业领域数据 用户数据:用户交互、反馈数据 数据清洗流程 import reimport pandas as pdfrom text_preprocessing import TextCleanerclass DataPreprocessor : def __init__ (self, config ): self.config = config self.cleaner = TextCleaner() def clean_text (self, text ): """文本清洗""" text = re.sub(r'<[^>]+>' , '' , text) text = re.sub(r'[^\w\s一-鿿]' , '' , text) text = ' ' .join(text.split()) text = self.cleaner.remove_stopwords(text) return text def filter_quality (self, text, min_length=10 , max_length=512 ): """质量控制""" if len (text) < min_length or len (text) > max_length: return False if self._contains_inappropriate_content(text): return False return True def process_dataset (self, dataset_path ): """处理数据集""" df = pd.read_csv(dataset_path) df['cleaned_text' ] = df['text' ].apply(self.clean_text) df = df[df['cleaned_text' ].apply( lambda x: self.filter_quality(x) )] df = df.drop_duplicates() return df
3.2 数据增强 数据增强技术 import nlpaug.augmenter as naafrom sklearn.utils import shuffleclass DataAugmentor : def __init__ (self ): self.augmenters = { 'synonym' : naa.SynonymAug(), 'insert' : naa.ContextualWordEmbsAug(), 'swap' : naa.RandomWordAug(action="swap" ), 'delete' : naa.RandomWordAug(action="delete" ) } def augment_text (self, text, augment_type='synonym' , n_augment=3 ): """文本增强""" if augment_type not in self.augmenters: return [text] augmenter = self.augmenters[augment_type] augmented_texts = augmenter.augment(text, n=n_augment) return augmented_texts def create_balanced_dataset (self, df, target_ratio=0.8 ): """创建平衡数据集""" class_counts = df['label' ].value_counts() max_count = class_counts.max () balanced_data = [] for label in class_counts.index: class_data = df[df['label' ] == label] current_count = len (class_data) if current_count < max_count * target_ratio: n_augment = int (max_count * target_ratio - current_count) samples_to_augment = class_data.sample(n=min (n_augment, len (class_data))) for _, row in samples_to_augment.iterrows(): augmented_text = self.augment_text(row['text' ]) for aug_text in augmented_text: balanced_data.append({ 'text' : aug_text, 'label' : label }) balanced_df = pd.DataFrame(balanced_data) balanced_df = pd.concat([df, balanced_df], ignore_index=True ) balanced_df = shuffle(balanced_df) return balanced_df
3.3 数据标注 自动标注 class AutoLabeler : def __init__ (self, labeling_model ): self.model = labeling_model self.label_schema = {} def set_label_schema (self, schema ): """设置标签架构""" self.label_schema = schema def predict_labels (self, texts, confidence_threshold=0.8 ): """预测标签""" results = [] for text in texts: predictions = self.model.predict(text) filtered_predictions = { label: score for label, score in predictions.items() if score >= confidence_threshold } results.append({ 'text' : text, 'predictions' : filtered_predictions, 'needs_review' : len (filtered_predictions) == 0 }) return results def active_learning (self, unlabeled_data, n_samples=100 ): """主动学习选择样本""" uncertainties = [] for item in unlabeled_data: predictions = item['predictions' ] entropy = self._calculate_entropy(predictions) uncertainties.append({ 'item' : item, 'entropy' : entropy }) uncertainties.sort(key=lambda x: x['entropy' ], reverse=True ) return [u['item' ] for u in uncertainties[:n_samples]]
4. 模型训练与优化 4.1 模型微调 LoRA微调 import torchfrom transformers import TrainingArguments, Trainerfrom peft import LoraConfig, get_peft_modelclass LoRATrainer : def __init__ (self, model_name, config ): self.model_name = model_name self.config = config self.model = None self.tokenizer = None def setup_lora (self ): """设置LoRA""" self.model = AutoModelForCausalLM.from_pretrained(self.model_name) self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) lora_config = LoraConfig( r=self.config['r' ], lora_alpha=self.config['alpha' ], target_modules=self.config['target_modules' ], lora_dropout=self.config['dropout' ] ) self.model = get_peft_model(self.model, lora_config) for param in self.model.base_model.parameters(): param.requires_grad = False def train (self, train_dataset, eval_dataset ): """训练模型""" training_args = TrainingArguments( output_dir=self.config['output_dir' ], num_train_epochs=self.config['epochs' ], per_device_train_batch_size=self.config['batch_size' ], per_device_eval_batch_size=self.config['eval_batch_size' ], warmup_steps=self.config['warmup_steps' ], weight_decay=self.config['weight_decay' ], logging_dir=self.config['logging_dir' ], logging_steps=self.config['logging_steps' ], save_steps=self.config['save_steps' ], eval_steps=self.config['eval_steps' ], learning_rate=self.config['learning_rate' ] ) trainer = Trainer( model=self.model, args=training_args, train_dataset=train_dataset, eval_dataset=eval_dataset ) trainer.train() return trainer def save_model (self, path ): """保存模型""" self.model.save_pretrained(path) self.tokenizer.save_pretrained(path)
训练数据处理 from torch.utils.data import Dataset, DataLoaderclass GenerativeDataset (Dataset ): def __init__ (self, texts, tokenizer, max_length=512 ): self.texts = texts self.tokenizer = tokenizer self.max_length = max_length def __len__ (self ): return len (self.texts) def __getitem__ (self, idx ): text = self.texts[idx] encoding = self.tokenizer( text, truncation=True , padding='max_length' , max_length=self.max_length, return_tensors='pt' ) return { 'input_ids' : encoding['input_ids' ].squeeze(), 'attention_mask' : encoding['attention_mask' ].squeeze(), 'labels' : encoding['input_ids' ].squeeze() } def create_data_loader (texts, tokenizer, batch_size=8 , max_length=512 ): """创建数据加载器""" dataset = GenerativeDataset(texts, tokenizer, max_length) data_loader = DataLoader( dataset, batch_size=batch_size, shuffle=True ) return data_loader
4.2 模型评估 评估指标 import numpy as npfrom sklearn.metrics import accuracy_score, precision_recall_fscore_supportclass ModelEvaluator : def __init__ (self ): self.metrics = {} def calculate_perplexity (self, model, test_data ): """计算困惑度""" total_loss = 0 total_tokens = 0 for batch in test_data: outputs = model( input_ids=batch['input_ids' ], labels=batch['labels' ] ) loss = outputs.loss tokens = batch['input_ids' ].numel() total_loss += loss.item() * tokens total_tokens += tokens avg_loss = total_loss / total_tokens perplexity = np.exp(avg_loss) return perplexity def evaluate_text_quality (self, generated_texts, reference_texts ): """评估生成文本质量""" bleu_scores = [] rouge_scores = [] similarity_scores = [] for gen_text, ref_text in zip (generated_texts, reference_texts): bleu = self._calculate_bleu(gen_text, ref_text) rouge = self._calculate_rouge(gen_text, ref_text) similarity = self._calculate_similarity(gen_text, ref_text) bleu_scores.append(bleu) rouge_scores.append(rouge) similarity_scores.append(similarity) return { 'bleu' : np.mean(bleu_scores), 'rouge' : np.mean(rouge_scores), 'similarity' : np.mean(similarity_scores) } def benchmark_models (self, models, test_data ): """基准测试多个模型""" results = {} for model_name, model in models.items(): time_scores = self._measure_inference_time(model, test_data) quality_scores = self.evaluate_text_quality( model.generate(test_data), test_data['references' ] ) results[model_name] = { 'performance' : time_scores, 'quality' : quality_scores } return results
4.3 模型优化 量化技术 from transformers import BitsAndBytesConfigclass ModelOptimizer : def __init__ (self ): self.quantization_configs = { 'int8' : BitsAndBytesConfig( load_in_8bit=True , bnb_4bit_use_double_quant=True , bnb_4bit_quant_type='nf4' , bnb_4bit_compute_dtype=torch.bfloat16 ), 'int4' : BitsAndBytesConfig( load_in_4bit=True , bnb_4bit_use_double_quant=True , bnb_4bit_quant_type='nf4' , bnb_4bit_compute_dtype=torch.bfloat16 ) } def quantize_model (self, model, quant_type='int8' ): """量化模型""" if quant_type in self.quantization_configs: config = self.quantization_configs[quant_type] model = model.to('cuda' ) model = quantize_model(model, config) return model else : raise ValueError(f"Unsupported quantization type: {quant_type} " ) def optimize_memory (self, model ): """优化内存使用""" torch.cuda.empty_cache() if torch.cuda.device_count() > 1 : model = torch.nn.DataParallel(model) return model def prune_model (self, model, pruning_ratio=0.1 ): """剪枝模型""" parameters = list (model.named_parameters()) importances = {} for name, param in parameters: if 'weight' in name: importance = torch.abs (param).mean() importances[name] = importance sorted_importances = sorted ( importances.items(), key=lambda x: x[1 ], reverse=True ) pruned_model = copy.deepcopy(model) for name, importance in sorted_importances: if len (sorted_importances) * pruning_ratio < 0 : break param = pruned_model.get_parameter(name) mask = torch.abs (param) > importance * 0.1 param.data[~mask] = 0 return pruned_model
5. 应用架构设计 5.1 微服务架构 API服务设计 from fastapi import FastAPI, HTTPExceptionfrom fastapi.middleware.cors import CORSMiddlewarefrom pydantic import BaseModelimport uvicornclass GenerationRequest (BaseModel ): prompt: str max_length: int = 512 temperature: float = 0.7 top_p: float = 0.9 model: str = "default" class GenerationResponse (BaseModel ): text: str tokens: int time_taken: float model_used: str class GenerativeAIService : def __init__ (self, model_registry ): self.model_registry = model_registry self.cache = {} def generate_text (self, request: GenerationRequest ): """生成文本""" cache_key = self._generate_cache_key(request) if cache_key in self.cache: return self.cache[cache_key] model = self.model_registry.get_model(request.model) if not model: raise HTTPException(status_code=404 , detail="Model not found" ) start_time = time.time() result = model.generate( request.prompt, max_length=request.max_length, temperature=request.temperature, top_p=request.top_p ) end_time = time.time() response = GenerationResponse( text=result, tokens=len (result.split()), time_taken=end_time - start_time, model_used=request.model ) self.cache[cache_key] = response return response def _generate_cache_key (self, request: GenerationRequest ): """生成缓存键""" return hashlib.md5( f"{request.prompt} _{request.temperature} _{request.top_p} " .encode() ).hexdigest() app = FastAPI(title="Generative AI API" ) app.add_middleware( CORSMiddleware, allow_origins=["*" ], allow_credentials=True , allow_methods=["*" ], allow_headers=["*" ], ) model_registry = ModelRegistry() service = GenerativeAIService(model_registry) @app.post("/generate" , response_model=GenerationResponse ) async def generate_text (request: GenerationRequest ): """文本生成接口""" return service.generate_text(request) @app.post("/batch-generate" ) async def batch_generate (requests: List [GenerationRequest] ): """批量生成接口""" results = [] for req in requests: result = await generate_text(req) results.append(result) return results @app.get("/models" ) async def list_models (): """列出可用模型""" return {"models" : model_registry.list_models()}
负载均衡 from load_balancer import LoadBalancerfrom health_checker import HealthCheckerclass AIApplicationService : def __init__ (self, config ): self.config = config self.load_balancer = LoadBalancer(config['servers' ]) self.health_checker = HealthChecker() self.circuit_breaker = CircuitBreaker( failure_threshold=5 , recovery_timeout=30 ) def handle_request (self, request ): """处理请求""" if self.circuit_breaker.is_open(): raise HTTPException(status_code=503 , detail="Service unavailable" ) healthy_servers = self.health_checker.get_healthy_servers() if not healthy_servers: raise HTTPException(status_code=503 , detail="No healthy servers available" ) server = self.load_balancer.select_server(healthy_servers) try : result = self._forward_request(server, request) self.circuit_breaker.record_success() return result except Exception as e: self.circuit_breaker.record_failure() raise HTTPException(status_code=500 , detail="Internal server error" ) def _forward_request (self, server, request ): """转发请求到指定服务器""" pass
5.2 缓存策略 多层缓存设计 import redisfrom cachetools import TTLCacheclass MultiLevelCache : def __init__ (self, config ): self.config = config self.memory_cache = TTLCache( maxsize=config['memory_size' ], ttl=config['memory_ttl' ] ) self.redis_cache = redis.Redis( host=config['redis_host' ], port=config['redis_port' ], db=config['redis_db' ] ) self.disk_cache = DiskCache(config['disk_cache_path' ]) def get (self, key ): """获取缓存数据""" if key in self.memory_cache: return self.memory_cache[key] redis_value = self.redis_cache.get(key) if redis_value: value = json.loads(redis_value) self.memory_cache[key] = value return value disk_value = self.disk_cache.get(key) if disk_value: value = json.loads(disk_value) self.memory_cache[key] = value self.redis_cache.set (key, json.dumps(value)) return value return None def set (self, key, value, ttl=None ): """设置缓存数据""" self.memory_cache[key] = value if ttl: self.redis_cache.setex(key, ttl, json.dumps(value)) else : self.redis_cache.set (key, json.dumps(value)) self.disk_cache.set (key, json.dumps(value)) def invalidate (self, key ): """使缓存失效""" self.memory_cache.pop(key, None ) self.redis_cache.delete(key) self.disk_cache.delete(key) def clear (self ): """清空所有缓存""" self.memory_cache.clear() self.redis_cache.flushdb() self.disk_cache.clear()
5.3 监控与日志 性能监控 import prometheus_clientfrom prometheus_client import Counter, Histogram, Gaugeclass AIMonitor : def __init__ (self ): self.request_counter = Counter( 'ai_requests_total' , 'Total number of AI requests' , ['model' , 'endpoint' ] ) self.response_time = Histogram( 'ai_response_time_seconds' , 'AI response time in seconds' , ['model' , 'endpoint' ] ) self.error_counter = Counter( 'ai_errors_total' , 'Total number of AI errors' , ['error_type' , 'model' ] ) self.active_requests = Gauge( 'ai_active_requests' , 'Number of active AI requests' ) def record_request (self, model, endpoint, duration, success=True ): """记录请求""" self.request_counter.labels(model=model, endpoint=endpoint).inc() if success: self.response_time.labels(model=model, endpoint=endpoint).observe(duration) else : self.error_counter.labels( error_type='generation_failure' , model=model ).inc() def track_active_request (self, delta ): """跟踪活跃请求数""" self.active_requests.inc(delta) class MonitoringMiddleware : def __init__ (self, monitor ): self.monitor = monitor async def __call__ (self, request, call_next ): start_time = time.time() try : self.monitor.track_active_request(1 ) response = await call_next(request) duration = time.time() - start_time self.monitor.record_request( model=request.get('model' , 'default' ), endpoint=request.get('endpoint' , 'unknown' ), duration=duration, success=True ) return response except Exception as e: duration = time.time() - start_time self.monitor.record_request( model=request.get('model' , 'default' ), endpoint=request.get('endpoint' , 'unknown' ), duration=duration, success=False ) raise finally : self.monitor.track_active_request(-1 )
6. 部署与运维 6.1 容器化部署 Docker配置 FROM python:3.9 -slimWORKDIR /app RUN apt-get update && apt-get install -y \ gcc \ g++ \ git \ && rm -rf /var/lib/apt/lists/* COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . ENV PYTHONPATH=/appENV MODEL_PATH=/app/modelsEXPOSE 8000 CMD ["uvicorn" , "main:app" , "--host" , "0.0.0.0" , "--port" , "8000" ]
Docker Compose配置 version: '3.8' services: ai-service: build: . ports: - "8000:8000" environment: - MODEL_PATH=/app/models - REDIS_URL=redis://redis:6379 volumes: - ./models:/app/models - ./logs:/app/logs depends_on: - redis - postgres restart: unless-stopped redis: image: redis:6.2-alpine ports: - "6379:6379" volumes: - redis_data:/data restart: unless-stopped postgres: image: postgres:13 environment: POSTGRES_DB: ai_app POSTGRES_USER: ai_user POSTGRES_PASSWORD: ai_password volumes: - postgres_data:/var/lib/postgresql/data ports: - "5432:5432" restart: unless-stopped nginx: image: nginx:alpine ports: - "80:80" - "443:443" volumes: - ./nginx.conf:/etc/nginx/nginx.conf - ./ssl:/etc/nginx/ssl depends_on: - ai-service restart: unless-stopped volumes: redis_data: postgres_data:
6.2 Kubernetes部署 Kubernetes配置 apiVersion: apps/v1 kind: Deployment metadata: name: ai-service spec: replicas: 3 selector: matchLabels: app: ai-service template: metadata: labels: app: ai-service spec: containers: - name: ai-service image: ai-service:latest ports: - containerPort: 8000 env: - name: MODEL_PATH value: "/models" - name: REDIS_URL value: "redis://redis-service:6379" resources: requests: memory: "4Gi" cpu: "2" limits: memory: "8Gi" cpu: "4" volumeMounts: - name: models mountPath: /models - name: logs mountPath: /app/logs volumes: - name: models persistentVolumeClaim: claimName: models-pvc - name: logs persistentVolumeClaim: claimName: logs-pvc
Helm Chart replicaCount: 3 image: repository: ai-service pullPolicy: IfNotPresent tag: latest service: type: LoadBalancer port: 80 targetPort: 8000 resources: limits: cpu: 4 memory: 8Gi requests: cpu: 2 memory: 4Gi persistence: enabled: true size: 100Gi accessMode: ReadWriteOnce autoscaling: enabled: true minReplicas: 3 maxReplicas: 10 targetCPUUtilizationPercentage: 70 targetMemoryUtilizationPercentage: 80
6.3 CI/CD流程 GitHub Actions工作流 name: AI Service CI/CD on: push: branches: [ main , develop ] pull_request: branches: [ main ] jobs: test: runs-on: ubuntu-latest strategy: matrix: python-version: [3.9 , 3.10 ] steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v3 with: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt pip install -r requirements-dev.txt - name: Run tests run: | pytest tests/ --cov=app --cov-report=xml - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: file: ./coverage.xml build: needs: test runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' steps: - uses: actions/checkout@v3 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - name: Login to DockerHub uses: docker/login-action@v2 with: username: ${{ secrets.DOCKER_USERNAME }} password: ${{ secrets.DOCKER_PASSWORD }} - name: Build and push uses: docker/build-push-action@v4 with: context: . push: true tags: ai-service:latest cache-from: type=gha cache-to: type=gha,mode=max deploy: needs: build runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' steps: - uses: actions/checkout@v3 - name: Setup kubectl uses: azure/setup-kubectl@v2 with: version: 'v1.23.0' - name: Configure kubeconfig run: | echo "${{ secrets.KUBE_CONFIG }}" | base64 -d > ~/.kube/config - name: Update Helm values run: | helm upgrade ai-service ./charts/ai-service -f values.yaml - name: Wait for deployment run: | kubectl rollout status deployment/ai-service
7. 安全与合规 7.1 数据安全 数据加密 from cryptography.fernet import Fernetfrom cryptography.hazmat.primitives import hashesfrom cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMACimport osclass DataEncryptor : def __init__ (self, password, salt=None ): self.password = password.encode() self.salt = salt or os.urandom(16 ) self.fernet = self._generate_key() def _generate_key (self ): """生成加密密钥""" kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32 , salt=self.salt, iterations=100000 , ) key = kdf.derive(self.password) return Fernet(key) def encrypt (self, data ): """加密数据""" if isinstance (data, str ): data = data.encode() return self.fernet.encrypt(data) def decrypt (self, encrypted_data ): """解密数据""" return self.fernet.decrypt(encrypted_data).decode() def encrypt_file (self, input_path, output_path ): """加密文件""" with open (input_path, 'rb' ) as f: data = f.read() encrypted_data = self.encrypt(data) with open (output_path, 'wb' ) as f: f.write(encrypted_data) def decrypt_file (self, input_path, output_path ): """解密文件""" with open (input_path, 'rb' ) as f: encrypted_data = f.read() decrypted_data = self.decrypt(encrypted_data) with open (output_path, 'wb' ) as f: f.write(decrypted_data.encode())
访问控制 from fastapi import HTTPException, Dependsfrom fastapi.security import OAuth2PasswordBearerfrom jose import JWTError, jwtfrom passlib.context import CryptContextclass SecurityManager : def __init__ (self, secret_key, algorithm="HS256" ): self.secret_key = secret_key self.algorithm = algorithm self.password_context = CryptContext(schemes=["bcrypt" ], deprecated="auto" ) self.oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token" ) def verify_password (self, plain_password, hashed_password ): """验证密码""" return self.password_context.verify(plain_password, hashed_password) def get_password_hash (self, password ): """获取密码哈希""" return self.password_context.hash (password) def create_access_token (self, data, expires_delta=None ): """创建访问令牌""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else : expire = datetime.utcnow() + timedelta(minutes=15 ) to_encode.update({"exp" : expire}) encoded_jwt = jwt.encode( to_encode, self.secret_key, algorithm=self.algorithm ) return encoded_jwt def verify_token (self, token: str ): """验证令牌""" try : payload = jwt.decode( token, self.secret_key, algorithms=[self.algorithm] ) username: str = payload.get("sub" ) if username is None : raise HTTPException( status_code=401 , detail="Could not validate credentials" ) return username except JWTError: raise HTTPException( status_code=401 , detail="Could not validate credentials" ) def require_role (self, required_role: str ): """要求特定角色""" def role_checker (current_user: dict = Depends(self.get_current_user ) ): if current_user.get("role" ) != required_role: raise HTTPException( status_code=403 , detail="Not enough permissions" ) return current_user return role_checker
7.2 内容安全 内容过滤 class ContentFilter : def __init__ (self, config ): self.config = config self.filters = { 'profanity' : self._check_profanity, 'hate_speech' : self._check_hate_speech, 'sensitive_info' : self._check_sensitive_info, 'copyright' : self._check_copyright } def filter_content (self, text ): """过滤内容""" results = {} for filter_name, filter_func in self.filters.items(): try : result = filter_func(text) results[filter_name] = result except Exception as e: results[filter_name] = {'error' : str (e)} return results def _check_profanity (self, text ): """检查亵渎内容""" profanity_words = ['bad_word1' , 'bad_word2' ] found_words = [word for word in profanity_words if word in text.lower()] return { 'is_profanity' : len (found_words) > 0 , 'found_words' : found_words, 'severity' : 'high' if len (found_words) > 5 else 'medium' } def _check_hate_speech (self, text ): """检查仇恨言论""" hate_patterns = [ r'hate_pattern1' , r'hate_pattern2' ] found_patterns = [] for pattern in hate_patterns: if re.search(pattern, text, re.IGNORECASE): found_patterns.append(pattern) return { 'is_hate_speech' : len (found_patterns) > 0 , 'found_patterns' : found_patterns, 'severity' : 'high' if len (found_patterns) > 3 else 'medium' } def _check_sensitive_info (self, text ): """检查敏感信息""" sensitive_patterns = { 'phone' : r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b' , 'email' : r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' , 'id_card' : r'\b\d{17}[\dXx]\b' } found_info = {} for info_type, pattern in sensitive_patterns.items(): matches = re.findall(pattern, text) if matches: found_info[info_type] = matches return { 'contains_sensitive_info' : len (found_info) > 0 , 'found_info' : found_info } def _check_copyright (self, text ): """检查版权内容""" return { 'is_copyright_violation' : False , 'similarity_score' : 0.0 }
8. 未来发展趋势 8.1 技术发展趋势 多模态融合 class MultimodalAI : def __init__ (self, text_model, image_model, audio_model ): self.text_model = text_model self.image_model = image_model self.audio_model = audio_model self.fusion_model = self._create_fusion_model() def _create_fusion_model (self ): """创建融合模型""" class FusionModel (nn.Module): def __init__ (self, text_dim, image_dim, audio_dim ): super ().__init__() self.text_encoder = nn.Linear(text_dim, 256 ) self.image_encoder = nn.Linear(image_dim, 256 ) self.audio_encoder = nn.Linear(audio_dim, 256 ) self.fusion_layer = nn.MultiheadAttention( embed_dim=256 , num_heads=8 ) self.output_layer = nn.Linear(256 , 512 ) def forward (self, text, image, audio ): text_emb = self.text_encoder(text) image_emb = self.image_encoder(image) audio_emb = self.audio_encoder(audio) combined = torch.stack([text_emb, image_emb, audio_emb], dim=1 ) fused, _ = self.fusion_layer(combined, combined, combined) output = self.output_layer(fused[:, 0 , :]) return output return FusionModel( text_dim=768 , image_dim=1024 , audio_dim=512 ) def process_multimodal_input (self, text_input, image_input, audio_input ): """处理多模态输入""" text_features = self.text_model.process(text_input) image_features = self.image_model.process(image_input) audio_features = self.audio_model.process(audio_input) fused_features = self.fusion_model( text_features, image_features, audio_features ) return fused_features
自监督学习 class SelfSupervisedLearning : def __init__ (self, model_config ): self.model_config = model_config self.model = self._create_model() def _create_model (self ): """创建自监督学习模型""" class SSLModel (nn.Module): def __init__ (self, vocab_size, hidden_size ): super ().__init__() self.encoder = nn.Embedding(vocab_size, hidden_size) self.mask_lm = nn.Linear(hidden_size, vocab_size) self.contrastive_head = nn.Linear(hidden_size, hidden_size) def forward (self, input_ids, masked_positions ): hidden = self.encoder(input_ids) lm_logits = self.mask_lm(hidden[masked_positions]) contrastive_features = self.contrastive_head(hidden) return { 'lm_logits' : lm_logits, 'contrastive_features' : contrastive_features } return SSLModel( vocab_size=self.model_config['vocab_size' ], hidden_size=self.model_config['hidden_size' ] ) def train_ssl (self, unlabeled_data ): """自监督训练""" for batch in unlabeled_data: masked_positions = self._mask_random_positions(batch['input_ids' ]) outputs = self.model( batch['input_ids' ], masked_positions ) mlm_loss = self._compute_mlm_loss( outputs['lm_logits' ], batch['input_ids' ][masked_positions] ) contrastive_loss = self._compute_contrastive_loss( outputs['contrastive_features' ] ) total_loss = mlm_loss + 0.5 * contrastive_loss total_loss.backward() self.optimizer.step() self.optimizer.zero_grad()
8.2 应用场景扩展 智能客服 class IntelligentCustomerService : def __init__ (self, config ): self.config = config self.nlu_engine = NLUEngine(config['nlu' ]) self.dialog_manager = DialogManager(config['dialog' ]) self.knowledge_base = KnowledgeBase(config['kb' ]) self.llm = LanguageModel(config['llm' ]) def process_user_input (self, user_input, user_context ): """处理用户输入""" intent = self.nlu_engine.detect_intent(user_input) dialog_state = self.dialog_manager.update_state( intent, user_context ) knowledge = self.knowledge_base.search( user_input, intent, dialog_state ) response = self.llm.generate_response( user_input, knowledge, dialog_state ) processed_response = self._post_process_response(response) return { 'response' : processed_response, 'intent' : intent, 'dialog_state' : dialog_state, 'confidence' : self._calculate_confidence(intent, response) } def handle_multiturn_conversation (self, conversation_history ): """处理多轮对话""" dialog_state = self._initialize_dialog_state() for turn in conversation_history: result = self.process_user_input( turn['user_input' ], dialog_state ) dialog_state.update(result['dialog_state' ]) turn['system_response' ] = result['response' ] return conversation_history
创意内容生成 class CreativeContentGenerator : def __init__ (self, config ): self.config = config self.models = { 'text' : TextGenerationModel(config['text' ]), 'image' : ImageGenerationModel(config['image' ]), 'music' : MusicGenerationModel(config['music' ]) } self.creative_prompt_engine = CreativePromptEngine() def generate_creative_content (self, idea, style, constraints=None ): """生成创意内容""" prompt = self.creative_prompt_engine.create_prompt( idea, style, constraints ) text_content = self.models['text' ].generate(prompt['text' ]) image_content = self.models['image' ].generate(prompt['image' ]) music_content = self.models['music' ].generate(prompt['music' ]) optimized_content = self._optimize_content( text_content, image_content, music_content ) return { 'text' : optimized_content['text' ], 'image' : optimized_content['image' ], 'music' : optimized_content['music' ], 'metadata' : { 'idea' : idea, 'style' : style, 'generation_time' : time.time() } } def generate_story (self, genre, characters, plot_elements ): """生成故事""" outline = self._create_story_outline(genre, characters, plot_elements) chapters = [] for chapter_outline in outline['chapters' ]: chapter_content = self.models['text' ].generate( chapter_outline, style='narrative' ) chapters.append(chapter_content) chapter_images = [] for chapter in chapters: image_prompt = self._extract_image_prompt(chapter) image_content = self.models['image' ].generate(image_prompt) chapter_images.append(image_content) return { 'chapters' : chapters, 'images' : chapter_images, 'outline' : outline }
9. 总结与最佳实践 9.1 开发建议 模块化设计 :将系统分解为独立的模块,便于维护和扩展性能优化 :合理使用缓存、批处理和并行计算监控告警 :建立完善的监控体系,及时发现和解决问题安全防护 :实施必要的安全措施,保护数据和用户隐私持续改进 :收集用户反馈,持续优化模型和应用体验9.2 技术选型建议 框架选择 :根据项目规模和团队技术栈选择合适的框架模型选择 :平衡性能、成本和效果,选择合适的模型部署方式 :根据业务需求选择公有云、私有云或混合部署扩展性设计 :考虑未来增长需求,预留扩展空间9.3 未来展望 生成式AI应用开发正处于快速发展阶段,未来将朝着更加智能化、个性化和多模态化的方向发展。开发者需要不断学习新技术,跟上行业发展趋势,才能在这个充满机遇的领域中保持竞争力。
通过本指南的学习,开发者应该能够掌握生成式AI应用开发的核心技术和实践方法,为构建高质量的生成式AI应用打下坚实基础。