A.I. Smart Router: Expert-Score-Based Intelligent AI Model Routing
v0.1.2

A.I. Smart Router is an intelligent AI model routing system based on expert scoring. It automatically selects the optimal AI model using semantic-domain scoring, context-overflow protection, and safe editing. It supports Claude, GPT, Gemini, Grok, and other models, with automatic fallback chains, human-intervention gates, and cost optimization. The system routes requests through a three-tier decision process (intent detection, complexity estimation, and special-case overrides) to ensure efficient, safe use of AI models.
Version
Smart Router 0.1.2 introduces state management and improved context handling: it adds cross-session state documentation, implements context-guard logic, updates the routing logic to integrate state and context checks, and updates the documentation to reflect the new features and version bump.
Skill Documentation
Automatically routes requests to the best AI model using tiered classification, with automatic fallback handling and cost optimization.
How It Works (Silent by Default)
The router operates transparently: users send messages as usual and get a response from the model best suited to the task. No special commands are needed.
Optional visibility: include [show routing] in any message to see the routing decision.
Tiered Classification System
The router uses a three-tier decision process:
┌─────────────────────────────────────────────────────────────────┐
│ TIER 1: INTENT DETECTION │
│ Classify the primary purpose of the request │
├─────────────────────────────────────────────────────────────────┤
│ CODE │ ANALYSIS │ CREATIVE │ REALTIME │ GENERAL │
│ write/debug │ research │ writing │ news/live │ Q&A/chat │
│ refactor │ explain │ stories │ X/Twitter │ translate │
│ review │ compare │ brainstorm │ prices │ summarize │
└──────┬───────┴──────┬──────┴─────┬──────┴─────┬─────┴─────┬─────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ TIER 2: COMPLEXITY ESTIMATION │
├─────────────────────────────────────────────────────────────────┤
│ SIMPLE (Tier $) │ MEDIUM (Tier $$) │ COMPLEX (Tier $$$)│
│ • One-step task │ • Multi-step task │ • Deep reasoning │
│ • Short response OK │ • Some nuance │ • Extensive output│
│ • Factual lookup │ • Moderate context │ • Critical task │
│ → Haiku/Flash │ → Sonnet/Grok/GPT │ → Opus/GPT-5 │
└──────────────────────────┴─────────────────────┴───────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ TIER 3: SPECIAL CASE OVERRIDES │
├─────────────────────────────────────────────────────────────────┤
│ CONDITION │ OVERRIDE TO │
│ ─────────────────────────────────────┼─────────────────────────│
│ Context >100K tokens │ → Gemini Pro (1M ctx) │
│ Context >500K tokens │ → Gemini Pro ONLY │
│ Needs real-time data │ → Grok (regardless) │
│ Image/vision input │ → Opus or Gemini Pro │
│ User explicit override │ → Requested model │
└──────────────────────────────────────┴──────────────────────────┘
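The three tiers above can be sketched end-to-end as a single function. This is a minimal illustration with deliberately simplified keyword and length heuristics; the thresholds and short model names are assumptions for the sketch, not the production router:

```python
def route(request: str, token_count: int = 0, has_image: bool = False) -> str:
    """Minimal sketch of the three-tier decision flow."""
    # Tier 1: intent detection (keyword-based, heavily simplified)
    intent = "REALTIME" if "latest" in request.lower() else "GENERAL"
    # Tier 2: complexity estimation (length-based, heavily simplified)
    complexity = "SIMPLE" if len(request.split()) < 50 else "MEDIUM"
    # Tier 3: special-case overrides take precedence over the matrix
    if token_count > 100_000:
        return "gemini-pro"  # only family with a 1M-token context window
    if has_image:
        return "opus"
    # Otherwise fall back to a (partial) routing matrix
    matrix = {
        ("REALTIME", "SIMPLE"): "grok-2",
        ("REALTIME", "MEDIUM"): "grok-2",
        ("GENERAL", "SIMPLE"): "flash",
        ("GENERAL", "MEDIUM"): "sonnet",
    }
    return matrix.get((intent, complexity), "sonnet")
```

Note that the override tier runs after classification but before the matrix lookup, which is what lets context size or vision input trump the intent-based choice.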
Intent Detection Patterns
CODE intent
- Keywords: write, code, debug, fix, refactor, implement, function, class, script, API, bug, error, compile, test, PR, commit
- File extensions mentioned: .py, .js, .ts, .go, .rs, .java, etc.
- Code blocks in the input
ANALYSIS intent
- Keywords: analyze, explain, compare, research, understand, why, how does, evaluate, assess, review, investigate, examine
- Long-form questions
- "Help me understand..."
CREATIVE intent
- Keywords: write (story/poem/essay), create, brainstorm, imagine, design, draft, compose
- Fiction/narrative requests
- Marketing/copywriting requests
REALTIME intent
- Keywords: now, today, current, latest, trending, news, happening, live, price, score, weather
- X/Twitter mentions
- Stock/crypto tickers
- Sports scores
GENERAL intent (default)
- Simple Q&A
- Translation
- Summarization
- Conversational
MIXED intent (multiple intents detected)
When a request contains multiple clear intents (e.g., "write code to analyze this data and explain it creatively"):
- Identify the primary intent — what is the main deliverable?
- Route to the most capable model — mixed tasks demand versatility
- Default to COMPLEX complexity — multiple intents mean multiple steps
Examples:
- "Write code AND explain how it works" → CODE (primary) + ANALYSIS → route to Opus
- "Summarize this AND what's the latest news on it" → REALTIME takes priority → Grok
- "Creative story using real current events" → REALTIME + CREATIVE → Grok (real-time first)
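The keyword patterns above can be sketched as a small classifier. This is a minimal illustration with a trimmed keyword list; the function name `classify_intent` matches the identifier used in the routing code later in this document, but the keyword table here is an abbreviated assumption:

```python
# Abbreviated keyword table (the full lists appear in the patterns above)
INTENT_KEYWORDS = {
    "CODE": ["write code", "debug", "refactor", "implement", "compile"],
    "ANALYSIS": ["analyze", "explain", "compare", "research", "evaluate"],
    "CREATIVE": ["story", "poem", "brainstorm", "compose", "draft"],
    "REALTIME": ["today", "latest", "trending", "news", "price", "weather"],
}

def classify_intent(request: str) -> str:
    """Return the matching intent; REALTIME wins mixed cases, GENERAL is default."""
    text = request.lower()
    matches = [i for i, kws in INTENT_KEYWORDS.items()
               if any(k in text for k in kws)]
    if "REALTIME" in matches:  # real-time data needs take priority (see MIXED rules)
        return "REALTIME"
    return matches[0] if matches else "GENERAL"
```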
Language Handling
Non-English requests are handled normally — all supported models are multilingual:
| Model | Non-English Support |
|---|---|
| Opus/Sonnet/Haiku | Excellent (100+ languages) |
| GPT-5 | Excellent (100+ languages) |
| Gemini Pro/Flash | Excellent (100+ languages) |
| Grok | Good (major languages) |
- Keyword patterns include common non-English equivalents
- CODE intent is detected via file extensions and code blocks (language-agnostic)
- Complexity is estimated from query length (works across languages)
Edge case: if the intent is unclear because of the language, default to GENERAL intent and MEDIUM complexity.
Complexity Signals
SIMPLE complexity ($)
- Short queries (<50 words)
- A single question mark
- "Quick question", "Just tell me", "Briefly"
- Yes/no format
- Unit conversions, definitions
MEDIUM complexity ($$)
- Medium-length queries (50-200 words)
- Multiple aspects to address
- "Explain", "Describe", "Compare"
- Some context provided
COMPLEX complexity ($$$)
- Long queries (>200 words) or complex tasks
- "Step by step", "Thoroughly", "In detail"
- Multi-part questions
- Critical/important qualifiers
- Research, analysis, or creative work
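The signals above reduce to a short heuristic. A minimal sketch follows, using the word-count thresholds and phrase cues listed; the function name `estimate_complexity` matches the identifier used in the routing code later in this document:

```python
def estimate_complexity(request: str) -> str:
    """Word-count plus phrase signals, per the thresholds listed above."""
    words = len(request.split())
    text = request.lower()
    # COMPLEX: long queries or explicit depth requests
    if words > 200 or any(p in text for p in ("step by step", "thoroughly", "in detail")):
        return "COMPLEX"
    # MEDIUM: mid-length queries or explanatory verbs
    if words >= 50 or any(p in text for p in ("explain", "describe", "compare")):
        return "MEDIUM"
    return "SIMPLE"
```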
Routing Matrix
| Intent | Simple | Medium | Complex |
|---|---|---|---|
| CODE | Sonnet | Opus | Opus |
| ANALYSIS | Flash | GPT-5 | Opus |
| CREATIVE | Sonnet | Opus | Opus |
| REALTIME | Grok | Grok | Grok-3 |
| GENERAL | Flash | Sonnet | Opus |
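The matrix above maps directly onto a dictionary keyed by (intent, complexity). The short lowercase model names are illustrative identifiers, not official API model IDs:

```python
ROUTING_MATRIX = {
    ("CODE", "SIMPLE"): "sonnet",   ("CODE", "MEDIUM"): "opus",     ("CODE", "COMPLEX"): "opus",
    ("ANALYSIS", "SIMPLE"): "flash", ("ANALYSIS", "MEDIUM"): "gpt-5", ("ANALYSIS", "COMPLEX"): "opus",
    ("CREATIVE", "SIMPLE"): "sonnet", ("CREATIVE", "MEDIUM"): "opus", ("CREATIVE", "COMPLEX"): "opus",
    ("REALTIME", "SIMPLE"): "grok-2", ("REALTIME", "MEDIUM"): "grok-2", ("REALTIME", "COMPLEX"): "grok-3",
    ("GENERAL", "SIMPLE"): "flash",  ("GENERAL", "MEDIUM"): "sonnet", ("GENERAL", "COMPLEX"): "opus",
}

def lookup(intent: str, complexity: str) -> str:
    """Matrix lookup with a mid-tier default for unknown combinations."""
    return ROUTING_MATRIX.get((intent, complexity), "sonnet")
```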
Token Exhaustion and Automatic Model Switching
When a model becomes unavailable mid-session (token quota exhausted, rate limit hit, API error), the router automatically switches to the next-best available model and notifies the user.
Notification Format
When a model switch occurs due to exhaustion, the user receives a notice:
┌─────────────────────────────────────────────────────────────────┐
│ ⚠️ MODEL SWITCH NOTICE │
│ │
│ Your request could not be completed on claude-opus-4-5 │
│ (reason: token quota exhausted). │
│ │
│ ✅ Request completed using: anthropic/claude-sonnet-4-5 │
│ │
│ The response below was generated by the fallback model. │
└─────────────────────────────────────────────────────────────────┘
Switch Reasons
| Reason | Description |
|---|---|
| token quota exhausted | Daily/monthly token limit reached |
| rate limit exceeded | Requests sent too frequently |
| context window exceeded | Input too large for the model |
| API timeout | Model took too long to respond |
| API error | Provider returned an error |
| model unavailable | Model temporarily offline |
Implementation
def execute_with_fallback(primary_model: str, fallback_chain: list[str], request: str) -> Response:
    """
    Execute request with automatic fallback and user notification.
    """
    attempted_models = []
    switch_reason = None

    # Try primary model first
    models_to_try = [primary_model] + fallback_chain

    for model in models_to_try:
        try:
            response = call_model(model, request)

            # If we switched models, prepend notification
            if attempted_models:
                notification = build_switch_notification(
                    failed_model=attempted_models[0],
                    reason=switch_reason,
                    success_model=model
                )
                return Response(
                    content=notification + "\n\n---\n\n" + response.content,
                    model_used=model,
                    switched=True
                )
            return Response(content=response.content, model_used=model, switched=False)

        except TokenQuotaExhausted:
            attempted_models.append(model)
            switch_reason = "token quota exhausted"
            log_fallback(model, switch_reason)
            continue
        except RateLimitExceeded:
            attempted_models.append(model)
            switch_reason = "rate limit exceeded"
            log_fallback(model, switch_reason)
            continue
        except ContextWindowExceeded:
            attempted_models.append(model)
            switch_reason = "context window exceeded"
            log_fallback(model, switch_reason)
            continue
        except APITimeout:
            attempted_models.append(model)
            switch_reason = "API timeout"
            log_fallback(model, switch_reason)
            continue
        except APIError as e:
            attempted_models.append(model)
            switch_reason = f"API error: {e.code}"
            log_fallback(model, switch_reason)
            continue

    # All models exhausted
    return build_exhaustion_error(attempted_models)


def build_switch_notification(failed_model: str, reason: str, success_model: str) -> str:
    """Build user-facing notification when model switch occurs."""
    return f"""⚠️ MODEL SWITCH NOTICE

Your request could not be completed on {failed_model} (reason: {reason}).

✅ Request completed using: {success_model}

The response below was generated by the fallback model."""


def build_exhaustion_error(attempted_models: list[str]) -> Response:
    """Build error when all models are exhausted."""
    models_tried = ", ".join(attempted_models)
    return Response(
        content=f"""❌ REQUEST FAILED

Unable to complete your request. All available models have been exhausted.

Models attempted: {models_tried}

What you can do:
- Wait — Token quotas typically reset hourly or daily
- Simplify — Try a shorter or simpler request
- Check status — Run /router status to see model availability

If this persists, your human may need to check API quotas or add additional providers.""",
        model_used=None,
        switched=False,
        failed=True
    )
Fallback Priority on Token Exhaustion
When a model is exhausted, the router picks the next-best model for the same task type:
| Original Model | Fallback Priority (same capability) |
|---|---|
| Opus | Sonnet → GPT-5 → Grok-3 → Gemini Pro |
| Sonnet | GPT-5 → Grok-3 → Opus → Haiku |
| GPT-5 | Sonnet → Opus → Grok-3 → Gemini Pro |
| Gemini Pro | Flash → GPT-5 → Opus → Sonnet |
| Grok-2/3 | (warn: no real-time fallback available) |
User Confirmation
After a model switch, the agent should note in its response:
- That the original model was unavailable
- Which model actually completed the request
- That response quality may differ from the original model's typical output
This ensures transparency and sets appropriate expectations.
Streaming Responses and Fallback
When using streaming responses, fallback handling needs special care:
async def execute_with_streaming_fallback(primary_model: str, fallback_chain: list[str], request: str):
    """
    Handle streaming responses with mid-stream fallback.

    If a model fails DURING streaming (not before), the partial response is lost.
    Strategy: Don't start streaming until first chunk received successfully.
    """
    models_to_try = [primary_model] + fallback_chain

    for model in models_to_try:
        try:
            # Test with non-streaming ping first (optional, adds latency)
            # await test_model_availability(model)

            # Start streaming
            stream = await call_model_streaming(model, request)
            first_chunk = await stream.get_first_chunk(timeout=10_000)  # 10s timeout for first chunk

            # If we got here, model is responding — continue streaming
            yield first_chunk
            async for chunk in stream:
                yield chunk
            return  # Success

        except (FirstChunkTimeout, StreamError) as e:
            log_fallback(model, str(e))
            continue  # Try next model

    # All models failed
    yield build_exhaustion_error(models_to_try)
Key insight: wait for the first chunk before committing to a model. If the first chunk times out, fall back before showing the user any partial response.
Retry Timing Configuration
RETRY_CONFIG = {
    "initial_timeout_ms": 30_000,        # 30s for first attempt
    "fallback_timeout_ms": 20_000,       # 20s for fallback attempts (faster fail)
    "max_retries_per_model": 1,          # Don't retry same model
    "backoff_multiplier": 1.5,           # Not used (no same-model retry)
    "circuit_breaker_threshold": 3,      # Failures before skipping model entirely
    "circuit_breaker_reset_ms": 300_000  # 5 min before trying failed model again
}
Circuit breaker: if a model fails 3 times within 5 minutes, skip it entirely for the next 5 minutes. This prevents repeatedly hitting a downed service.
Fallback Chains
When the preferred model fails (rate limit, API down, error), cascade to the next option:
Code tasks
Opus → Sonnet → GPT-5 → Gemini Pro
Analysis tasks
Opus → GPT-5 → Gemini Pro → Sonnet
Creative tasks
Opus → GPT-5 → Sonnet → Gemini Pro
Realtime tasks
Grok-2 → Grok-3 → (warn: no real-time fallback)
General tasks
Flash → Haiku → Sonnet → GPT-5
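The chains above can be captured in one table. The name MASTER_FALLBACK_CHAINS mirrors the identifier used by the routing code later in this document; the short lowercase model names and the `fallback_for` helper are illustrative:

```python
MASTER_FALLBACK_CHAINS = {
    "code":     ["opus", "sonnet", "gpt-5", "gemini-pro"],
    "analysis": ["opus", "gpt-5", "gemini-pro", "sonnet"],
    "creative": ["opus", "gpt-5", "sonnet", "gemini-pro"],
    "realtime": ["grok-2", "grok-3"],  # warn: no real-time fallback after these
    "general":  ["flash", "haiku", "sonnet", "gpt-5"],
}

def fallback_for(task_type: str, failed: set[str]) -> list[str]:
    """Remaining chain after removing models that already failed."""
    return [m for m in MASTER_FALLBACK_CHAINS.get(task_type, []) if m not in failed]
```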
Long context (tiered by size)
┌─────────────────────────────────────────────────────────────────┐
│ LONG CONTEXT FALLBACK CHAIN │
├─────────────────────────────────────────────────────────────────┤
│ TOKEN COUNT │ FALLBACK CHAIN │
│ ───────────────────┼───────────────────────────────────────────│
│ 128K - 200K │ Opus (200K) → Sonnet (200K) → Gemini Pro │
│ 200K - 1M │ Gemini Pro → Flash (1M) → ERROR_MESSAGE │
│ > 1M │ ERROR_MESSAGE (no model supports this) │
└─────────────────────┴───────────────────────────────────────────┘
Implementation:
def handle_long_context(token_count: int, available_models: dict) -> str | ErrorMessage:
    """Route long-context requests with graceful degradation."""
    # Tier 1: 128K - 200K tokens (Opus/Sonnet can handle)
    if token_count <= 200_000:
        for model in ["opus", "sonnet", "haiku", "gemini-pro", "flash"]:
            if model in available_models and get_context_limit(model) >= token_count:
                return model
    # Tier 2: 200K - 1M tokens (only Gemini)
    elif token_count <= 1_000_000:
        for model in ["gemini-pro", "flash"]:
            if model in available_models:
                return model
    # Tier 3: > 1M tokens (nothing available)
    # Fall through to error

    # No suitable model found — return helpful error
    return build_context_error(token_count, available_models)


def build_context_error(token_count: int, available_models: dict) -> ErrorMessage:
    """Build a helpful error message when no model can handle the input."""
    # Find the largest available context window
    max_available = max(
        (get_context_limit(m) for m in available_models),
        default=0
    )

    # Determine what's missing
    missing_models = []
    if "gemini-pro" not in available_models and "flash" not in available_models:
        missing_models.append("Gemini Pro/Flash (1M context)")
    if token_count <= 200_000 and "opus" not in available_models:
        missing_models.append("Opus (200K context)")

    # Format token count for readability
    if token_count >= 1_000_000:
        token_display = f"{token_count / 1_000_000:.1f}M"
    else:
        token_display = f"{token_count // 1000}K"

    return ErrorMessage(
        title="Context Window Exceeded",
        message=f"""Your input is approximately {token_display} tokens, which exceeds the context window of all currently available models.

Required: Gemini Pro (1M context) {"— currently unavailable" if "gemini-pro" not in available_models else ""}
Your max available: {max_available // 1000}K tokens

Options:
- Wait and retry — Gemini may be temporarily down
- Reduce input size — Remove unnecessary content to fit within {max_available // 1000}K tokens
- Split into chunks — I can process your input sequentially in smaller pieces

Would you like me to help split this into manageable chunks?""",
        recoverable=True,
        suggested_action="split_chunks"
    )
Example error output:
⚠️ Context Window Exceeded

Your input is approximately 340K tokens, which exceeds the context
window of all currently available models.
Required: Gemini Pro (1M context) — currently unavailable
Your max available: 200K tokens
Options:
- Wait and retry — Gemini may be temporarily down
- Reduce input size — Remove unnecessary content to fit within 200K tokens
- Split into chunks — I can process your input sequentially in smaller pieces
Would you like me to help split this into manageable chunks?
Dynamic Model Discovery
The router detects available providers automatically at runtime:
1. Check configured auth profiles
2. Build available model list from authenticated providers
3. Construct routing table using ONLY available models
4. If preferred model unavailable, use best available alternative
Example: if only Anthropic and Google are configured:
- Code tasks → Opus (Anthropic available ✓)
- Realtime tasks → ⚠️ no Grok → fall back to Opus + warn the user
- Long documents → Gemini Pro (Google available ✓)
Cost Optimization
The router factors in cost when complexity is SIMPLE:
| Model | Cost Tier | Use When |
|---|---|---|
| Gemini Flash | $ | Simple tasks, high volume |
| Claude Haiku | $ | Simple tasks, quick responses |
| Claude Sonnet | $$ | Medium complexity |
| Grok 2 | $$ | Real-time needs only |
| GPT-5 | $$ | General fallback |
| Gemini Pro | $$$ | Long context needs |
| Claude Opus | $$$$ | Complex/critical tasks |
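The cost tiers above can be expressed as a lookup table. The name COST_TIERS mirrors the identifier used by the routing code later in this document, and `select_cheapest` sketches the "cheapest available" fallback it relies on; both use illustrative short model names:

```python
COST_TIERS = {
    "flash": "$", "haiku": "$",
    "sonnet": "$$", "grok-2": "$$", "gpt-5": "$$",
    "gemini-pro": "$$$",
    "opus": "$$$$",
}

def select_cheapest(models: set[str]) -> str:
    """Pick the lowest-cost model; tier string length ('$' < '$$' < ...) is the sort key.
    Ties break alphabetically; unknown models are treated as top tier."""
    return min(models, key=lambda m: (len(COST_TIERS.get(m, "$$$$")), m))
```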
User Controls
Show the Routing Decision
Add [show routing] to any message:
[show routing] What's the weather in NYC?
The output includes:
[Routed → xai/grok-2-latest | Reason: REALTIME intent detected | Fallback: none available]
Force a Specific Model
Explicit overrides:
- "use grok: ..." → force Grok
- "use claude: ..." → force Opus
- "use gemini: ..." → force Gemini Pro
- "use flash: ..." → force Gemini Flash
- "use gpt: ..." → force GPT-5
Check Router Status
Ask "router status" or "/router" to see:
- Available providers
- Configured models
- The current routing table
- Recent routing decisions
Implementation Notes
Agent Implementation
When handling a request:
1. DETECT available models (check auth profiles)
2. CLASSIFY intent (code/analysis/creative/realtime/general)
3. ESTIMATE complexity (simple/medium/complex)
4. CHECK special cases (context size, vision, explicit override)
5. FILTER by cost tier based on complexity ← BEFORE model selection
6. SELECT model from filtered pool using routing matrix
7. VERIFY model available, else use fallback chain (also cost-filtered)
8. EXECUTE request with selected model
9. IF failure, try next in fallback chain
10. LOG routing decision (for debugging)
Cost-Aware Routing Flow (Order Is Critical)
def route_with_fallback(request):
    """
    Main routing function with CORRECT execution order.
    Cost filtering MUST happen BEFORE routing table lookup.
    """
    # Step 1: Discover available models
    available_models = discover_providers()

    # Step 2: Classify intent
    intent = classify_intent(request)

    # Step 3: Estimate complexity
    complexity = estimate_complexity(request)

    # Step 4: Check special-case overrides (these bypass cost filtering)
    if user_override := get_user_model_override(request):
        return execute_with_fallback(user_override, [], request)  # No cost filter for explicit override

    token_count = estimate_tokens(request)  # helper assumed, like needs_realtime() below
    if token_count > 128_000:
        return handle_long_context(token_count, available_models)  # Special handling

    if needs_realtime(request):
        return execute_with_fallback("grok-2", ["grok-3"], request)  # Realtime bypasses cost

    # ┌─────────────────────────────────────────────────────────────┐
    # │ STEP 5: FILTER BY COST TIER — THIS MUST COME FIRST!         │
    # │                                                             │
    # │ Cost filtering happens BEFORE the routing table lookup,     │
    # │ NOT after. This ensures "what's 2+2?" never considers       │
    # │ Opus even momentarily.                                      │
    # └─────────────────────────────────────────────────────────────┘
    allowed_tiers = get_allowed_tiers(complexity)
    # SIMPLE  → ["$"]
    # MEDIUM  → ["$", "$$"]
    # COMPLEX → ["$", "$$", "$$$", "$$$$"]

    cost_filtered_models = {
        model: meta for model, meta in available_models.items()
        if COST_TIERS.get(model) in allowed_tiers
    }

    # Step 6: NOW select from cost-filtered pool using routing preferences
    preferences = ROUTING_PREFERENCES.get((intent, complexity), [])
    for model in preferences:
        if model in cost_filtered_models:  # Only consider cost-appropriate models
            selected_model = model
            break
    else:
        # No preferred model in cost-filtered pool — use cheapest available
        selected_model = select_cheapest(cost_filtered_models)

    # Step 7: Build cost-filtered fallback chain
    task_type = get_task_type(intent, complexity)
    full_chain = MASTER_FALLBACK_CHAINS.get(task_type, [])
    filtered_chain = [m for m in full_chain if m in cost_filtered_models and m != selected_model]

    # Step 8-10: Execute with fallback + logging
    return execute_with_fallback(selected_model, filtered_chain, request)


def get_allowed_tiers(complexity: str) -> list[str]:
    """Return allowed cost tiers for a given complexity level."""
    return {
        "SIMPLE": ["$"],                        # Budget only — no exceptions
        "MEDIUM": ["$", "$$"],                  # Budget + standard
        "COMPLEX": ["$", "$$", "$$$", "$$$$"],  # All tiers — complex tasks deserve the best
    }.get(complexity, ["$", "$$"])
# Example flow for "what's 2+2?":
#
# 1. available_models = {opus, sonnet, haiku, flash, grok-2, ...}
# 2. intent = GENERAL
# 3. complexity = SIMPLE
# 4. (no special cases)
# 5. allowed_tiers = ["$"] ← SIMPLE means $ only
# cost_filtered_models = {haiku, flash, grok-2} ← Opus/Sonnet EXCLUDED
# 6. preferences for (GENERAL, SIMPLE) = [flash, haiku, grok-2, sonnet]
# first match in cost_filtered = flash ✓
# 7. fallback_chain = [haiku, grok-2] ← Also cost-filtered
# 8. execute with flash
#
# Result: Opus is NEVER considered, not even momentarily.
Cost Optimization: Two Approaches
┌─────────────────────────────────────────────────────────────────┐
│ COST OPTIMIZATION IMPLEMENTATION OPTIONS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ APPROACH 1: Explicit filter_by_cost() (shown above) │
│ ───────────────────────────────────────────────────────────── │
│ • Calls get_allowed_tiers(complexity) explicitly │
│ • Filters available_models BEFORE routing table lookup │
│ • Most defensive — impossible to route wrong tier │
│ • Recommended for security-critical deployments │
│ │
│ APPROACH 2: Preference ordering (implicit) │
│ ───────────────────────────────────────────────────────────── │
│ • ROUTING_PREFERENCES lists cheapest capable models first │
│ • For SIMPLE tasks: [flash, haiku, grok-2, sonnet] │
│ • First available match wins → naturally picks cheapest │
│ • Simpler code, relies on correct preference ordering │
│ │
│ This implementation uses BOTH for defense-in-depth: │
│ • Preference ordering provides first line of cost awareness │
│ • Explicit filter_by_cost() guarantees tier enforcement │
│ │
│ For alternative implementations that rely solely on │
│ preference ordering, see references/models.md for the │
│ filter_by_cost() function if explicit enforcement is needed. │
│ │
└─────────────────────────────────────────────────────────────────┘
Spawning With a Different Model
Use sessions_spawn for model routing:
sessions_spawn(
    task: "user's request",
    model: "selected/model-id",
    label: "task-type-query"
)
Security
- Never send sensitive data to untrusted models
- API keys are handled only via environment/auth profiles
- See references/security.md for the full security guide
Model Details
See references/models.md for detailed capabilities and pricing.