package main import ( "context" "encoding/json" "errors" "fmt" "log/slog" "net/http" "os" "os/signal" "sync" "syscall" "time" "github.com/google/uuid" ) // MCPRequest MCP请求结构体 type MCPRequest struct { Data interface{} `json:"data"` Type string `json:"type"` Metadata map[string]string `json:"metadata,omitempty"` Timestamp int64 `json:"timestamp"` } // MCPResponse MCP响应结构体 type MCPResponse struct { Success bool `json:"success"` Message string `json:"message,omitempty"` Data interface{} `json:"data,omitempty"` RequestID string `json:"request_id,omitempty"` Timestamp int64 `json:"timestamp"` } // SSEClient SSE客户端连接管理 type SSEClient struct { ID string Channel chan []byte } // SSEManager SSE管理器 type SSEManager struct { clients map[string]*SSEClient mutex sync.Mutex } // NewSSEManager 创建新的SSE管理器 func NewSSEManager() *SSEManager { return &SSEManager{ clients: make(map[string]*SSEClient), } } // AddClient 添加SSE客户端 func (m *SSEManager) AddClient(clientID string) *SSEClient { m.mutex.Lock() defer m.mutex.Unlock() client := &SSEClient{ ID: clientID, Channel: make(chan []byte, 10), } m.clients[clientID] = client return client } // RemoveClient 移除SSE客户端 func (m *SSEManager) RemoveClient(clientID string) { m.mutex.Lock() defer m.mutex.Unlock() if client, exists := m.clients[clientID]; exists { close(client.Channel) delete(m.clients, clientID) } } // Broadcast 广播消息给所有SSE客户端 func (m *SSEManager) Broadcast(message []byte) { m.mutex.Lock() defer m.mutex.Unlock() for _, client := range m.clients { select { case client.Channel <- message: default: // 如果客户端通道已满,跳过 } } } // 格式化时间,处理可能的格式错误 func formatTime(t time.Time, format string) (string, error) { if format == "" { return t.Format(time.RFC3339), nil } // 尝试使用自定义格式 formatted := t.Format(format) if formatted == format { // 如果格式化后的结果与格式字符串相同,说明格式无效 return "", errors.New("无效的时间格式") } return formatted, nil } // 处理获取当前时间请求 func handleGetCurrentTime(data map[string]interface{}) (map[string]interface{}, error) { // 获取当前时间 now := time.Now() // 获取可选的格式参数 format, _ := data["format"].(string) // 格式化时间 formattedTime, err := formatTime(now, format) if err != nil { // 如果格式无效,使用默认格式 formattedTime = now.Format(time.RFC3339) } // 返回结果 result := map[string]interface{}{ "current_time": formattedTime, "timestamp": now.Unix(), } return result, nil } // 处理订阅时间流请求 func handleSubscribeTimeStream(data map[string]interface{}) (map[string]interface{}, error) { // 生成唯一的流ID streamID := uuid.New().String() // 获取可选的间隔参数(默认1秒) interval := 1.0 if intervalVal, ok := data["interval"].(float64); ok { if intervalVal > 0 { interval = intervalVal } } // 获取可选的格式参数 format, _ := data["format"].(string) // 存储订阅信息(实际项目中可能需要更复杂的存储机制) // 这里我们只是返回流信息,实际的SSE连接会在/sse端点建立 // 返回结果 result := map[string]interface{}{ "stream_id": streamID, "sse_url": "http://localhost:8080/sse", "interval": interval, "format": format, } return result, nil } // 处理MCP请求提交 func handleSubmit(w http.ResponseWriter, r *http.Request) { // 检查请求方法 if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 解析请求体 var request MCPRequest if err := json.NewDecoder(r.Body).Decode(&request); err != nil { response := MCPResponse{ Success: false, Message: "Invalid request body", Timestamp: time.Now().Unix(), } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) return } // 生成请求ID requestID := uuid.New().String() // 根据工具类型处理请求 var result map[string]interface{} var err error switch request.Type { case "get_current_time": // 确保data是map类型 dataMap, ok := request.Data.(map[string]interface{}) if !ok { dataMap = make(map[string]interface{}) } result, err = handleGetCurrentTime(dataMap) case "subscribe_time_stream": // 确保data是map类型 dataMap, ok := request.Data.(map[string]interface{}) if !ok { dataMap = make(map[string]interface{}) } result, err = handleSubscribeTimeStream(dataMap) default: err = errors.New("未知的工具类型") } // 构建响应 response := MCPResponse{ Success: err == nil, Message: func() string { if err != nil { return err.Error() } return "" }(), Data: result, RequestID: requestID, Timestamp: time.Now().Unix(), } // 返回响应 w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) // 记录日志 slog.Debug("处理MCP请求", "request_id", requestID, "tool_type", request.Type, "success", err == nil) } // 处理SSE连接 func handleSSE(w http.ResponseWriter, r *http.Request, sseManager *SSEManager) { // 设置响应头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") // 获取或生成客户端ID clientID := r.URL.Query().Get("client_id") if clientID == "" { clientID = uuid.New().String() } // 添加客户端到管理器 client := sseManager.AddClient(clientID) defer sseManager.RemoveClient(clientID) // 发送初始连接确认事件 fmt.Fprintf(w, "event: connected\ndata: {\"client_id\":\"%s\"}\n\n", clientID) flusher, ok := w.(http.Flusher) if !ok { slog.Error("不支持SSE", "client_id", clientID) return } flusher.Flush() // 获取时间格式参数(如果有) format := r.URL.Query().Get("format") // 获取时间间隔参数(如果有) interval := 1.0 if intervalStr := r.URL.Query().Get("interval"); intervalStr != "" { fmt.Sscanf(intervalStr, "%f", &interval) if interval <= 0 { interval = 1.0 } } // 创建上下文用于取消操作 ctx, cancel := context.WithCancel(r.Context()) defer cancel() // 创建定时器 ticker := time.NewTicker(time.Duration(interval * float64(time.Second))) defer ticker.Stop() slog.Info("SSE连接已建立", "client_id", clientID, "interval", interval, "format", format) // 发送时间更新 for { select { case <-ctx.Done(): // 客户端断开连接 slog.Info("SSE连接已断开", "client_id", clientID) return case now := <-ticker.C: // 格式化时间 var formattedTime string if format == "" { formattedTime = now.Format(time.RFC3339) } else { var err error formattedTime, err = formatTime(now, format) if err != nil { // 如果格式无效,使用默认格式 formattedTime = now.Format(time.RFC3339) } } // 创建时间更新事件 timeData := map[string]interface{}{ "current_time": formattedTime, "timestamp": now.Unix(), "interval": interval, } // 转换为JSON dataJSON, _ := json.Marshal(timeData) // 发送SSE事件 fmt.Fprintf(w, "event: time_update\ndata: %s\n\n", string(dataJSON)) flusher.Flush() slog.Debug("发送SSE时间更新", "client_id", clientID, "time", formattedTime) case message := <-client.Channel: // 发送广播消息 fmt.Fprintf(w, "event: broadcast\ndata: %s\n\n", string(message)) flusher.Flush() } } } // handleHealth 处理健康检查请求 func handleHealth(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) } func main() { // 设置slog日志,仅输出到控制台,不写入文件 logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) slog.SetDefault(logger) logger.Info("MCP Time Server 启动") // 创建SSE管理器 sseManager := NewSSEManager() // 创建路由器 r := http.NewServeMux() // 注册健康检查端点 r.HandleFunc("/health", handleHealth) // 注册MCP提交端点 r.HandleFunc("/mcp/v1/submit", handleSubmit) // 注册SSE端点 r.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) { handleSSE(w, r, sseManager) }) // 创建HTTP服务器 srv := &http.Server{ Addr: ":8080", Handler: r, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 120 * time.Second, } // 启动服务器 go func() { logger.Info("MCP Time Server 启动成功", "address", ":8080") if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error("服务器启动失败", "error", err) os.Exit(1) } }() // 等待中断信号 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit logger.Info("MCP Time Server 正在关闭...") // 创建超时上下文 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 优雅关闭服务器 if err := srv.Shutdown(ctx); err != nil { logger.Error("服务器关闭失败", "error", err) os.Exit(1) } logger.Info("MCP Time Server 已安全关闭") }