Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  ControlChannel.swift   Sprache: unbekannt

 
Spracherkennung für: .swift vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

import Foundation
import Observation
import OpenClawKit
import OpenClawProtocol
import SwiftUI

struct ControlHeartbeatEvent: Codable {
    let ts: Double
    let status: String
    let to: String?
    let preview: String?
    let durationMs: Double?
    let hasMedia: Bool?
    let reason: String?
}

struct ControlAgentEvent: Codable, Identifiable {
    var id: String {
        "\(self.runId)-\(self.seq)"
    }

    let runId: String
    let seq: Int
    let stream: String
    let ts: Double
    let data: [String: OpenClawProtocol.AnyCodable]
    let summary: String?
}

enum ControlChannelError: Error, LocalizedError {
    case disconnected
    case badResponse(String)

    var errorDescription: String? {
        switch self {
        case .disconnected: "Control channel disconnected"
        case let .badResponse(msg): msg
        }
    }
}

@MainActor
@Observable
final class ControlChannel {
    static let shared = ControlChannel()

    enum Mode {
        case local
        case remote(target: String, identity: String)
    }

    enum ConnectionState: Equatable {
        case disconnected
        case connecting
        case connected
        case degraded(String)
    }

    private(set) var state: ConnectionState = .disconnected {
        didSet {
            CanvasManager.shared.refreshDebugStatus()
            guard oldValue != self.state else { return }
            switch self.state {
            case .connected:
                self.logger.info("control channel state -> connected")
            case .connecting:
                self.logger.info("control channel state -> connecting")
            case .disconnected:
                self.logger.info("control channel state -> disconnected")
                self.scheduleRecovery(reason: "disconnected")
            case let .degraded(message):
                let detail = message.isEmpty ? "degraded" : "degraded: \(message)"
                self.logger.info("control channel state -> \(detail, privacy: .public)")
                self.scheduleRecovery(reason: message)
            }
        }
    }

    private(set) var lastPingMs: Double?
    private(set) var authSourceLabel: String?

    private let logger = Logger(subsystem: "ai.openclaw", category: "control")

    private var eventTask: Task<Void, Never>?
    private var recoveryTask: Task<Void, Never>?
    private var lastRecoveryAt: Date?

    private init() {
        self.startEventStream()
    }

    func configure() async {
        self.logger.info("control channel configure mode=local")
        await self.refreshEndpoint(reason: "configure")
    }

    func configure(mode: Mode = .local) async throws {
        switch mode {
        case .local:
            await self.configure()
        case let .remote(target, identity):
            do {
                _ = (target, identity)
                let idSet = !identity.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty
                self.logger.info(
                    "control channel configure mode=remote " +
                        "target=\(target, privacy: .public) identitySet=\(idSet, privacy: .public)")
                self.state = .connecting
                _ = try await GatewayEndpointStore.shared.ensureRemoteControlTunnel()
                await self.refreshEndpoint(reason: "configure")
            } catch {
                self.state = .degraded(error.localizedDescription)
                throw error
            }
        }
    }

    func refreshEndpoint(reason: String) async {
        self.logger.info("control channel refresh endpoint reason=\(reason, privacy: .public)")
        self.state = .connecting
        do {
            try await self.establishGatewayConnection()
            self.state = .connected
            PresenceReporter.shared.sendImmediate(reason: "connect")
        } catch {
            let message = self.friendlyGatewayMessage(error)
            self.state = .degraded(message)
        }
    }

    func disconnect() async {
        await GatewayConnection.shared.shutdown()
        self.state = .disconnected
        self.lastPingMs = nil
        self.authSourceLabel = nil
    }

    func health(timeout: TimeInterval? = nil) async throws -> Data {
        do {
            let start = Date()
            var params: [String: AnyHashable]?
            if let timeout {
                params = ["timeout": AnyHashable(Int(timeout * 1000))]
            }
            let timeoutMs = (timeout ?? 15) * 1000
            let payload = try await self.request(method: "health", params: params, timeoutMs: timeoutMs)
            let ms = Date().timeIntervalSince(start) * 1000
            self.lastPingMs = ms
            self.state = .connected
            return payload
        } catch {
            let message = self.friendlyGatewayMessage(error)
            self.state = .degraded(message)
            throw ControlChannelError.badResponse(message)
        }
    }

    func lastHeartbeat() async throws -> ControlHeartbeatEvent? {
        let data = try await self.request(method: "last-heartbeat")
        return try JSONDecoder().decode(ControlHeartbeatEvent?.self, from: data)
    }

    func request(
        method: String,
        params: [String: AnyHashable]? = nil,
        timeoutMs: Double? = nil) async throws -> Data
    {
        do {
            let rawParams = params?.reduce(into: [String: OpenClawKit.AnyCodable]()) {
                $0[$1.key] = OpenClawKit.AnyCodable($1.value.base)
            }
            let data = try await GatewayConnection.shared.request(
                method: method,
                params: rawParams,
                timeoutMs: timeoutMs)
            self.state = .connected
            return data
        } catch {
            let message = self.friendlyGatewayMessage(error)
            self.state = .degraded(message)
            throw ControlChannelError.badResponse(message)
        }
    }

    private func friendlyGatewayMessage(_ error: Error) -> String {
        // Map URLSession/WS errors into user-facing, actionable text.
        if let ctrlErr = error as? ControlChannelError, let desc = ctrlErr.errorDescription {
            return desc
        }

        if let authIssue = RemoteGatewayAuthIssue(error: error) {
            return authIssue.statusMessage
        }

        // If the gateway explicitly rejects the hello (e.g., auth/token mismatch), surface it.
        if let urlErr = error as? URLError,
           urlErr.code == .dataNotAllowed // used for WS close 1008 auth failures
        {
            let reason = urlErr.failureURLString ?? urlErr.localizedDescription
            let tokenKey = CommandResolver.connectionModeIsRemote()
                ? "gateway.remote.token"
                : "gateway.auth.token"
            return
                "Gateway rejected token; set \(tokenKey) or clear it on the gateway. Reason: \(reason)"
        }

        // Common misfire: we connected to the configured localhost port but it is occupied
        // by some other process (e.g. a local dev gateway or a stuck SSH forward).
        // The gateway handshake returns something we can't parse, which currently
        // surfaces as "hello failed (unexpected response)". Give the user a pointer
        // to free the port instead of a vague message.
        let nsError = error as NSError
        if nsError.domain == "Gateway",
           nsError.localizedDescription.contains("hello failed (unexpected response)")
        {
            let port = GatewayEnvironment.gatewayPort()
            return """
            Gateway handshake got non-gateway data on localhost:\(port).
            Another process is using that port or the SSH forward failed.
            Stop the local gateway/port-forward on \(port) and retry Remote mode.
            """
        }

        if let urlError = error as? URLError {
            let port = GatewayEnvironment.gatewayPort()
            switch urlError.code {
            case .cancelled:
                return "Gateway connection was closed; start the gateway (localhost:\(port)) and retry."
            case .cannotFindHost, .cannotConnectToHost:
                let isRemote = CommandResolver.connectionModeIsRemote()
                if isRemote {
                    return """
                    Cannot reach gateway at localhost:\(port).
                    Remote mode uses an SSH tunnel—check the SSH target and that the tunnel is running.
                    """
                }
                return "Cannot reach gateway at localhost:\(port); ensure the gateway is running."
            case .networkConnectionLost:
                return "Gateway connection dropped; gateway likely restarted—retry."
            case .timedOut:
                return "Gateway request timed out; check gateway on localhost:\(port)."
            case .notConnectedToInternet:
                return "No network connectivity; cannot reach gateway."
            default:
                break
            }
        }

        if nsError.domain == "Gateway", nsError.code == 5 {
            let port = GatewayEnvironment.gatewayPort()
            return "Gateway request timed out; check the gateway process on localhost:\(port)."
        }

        let detail = nsError.localizedDescription.isEmpty ? "unknown gateway error" : nsError.localizedDescription
        let trimmed = detail.trimmingCharacters(in: .whitespacesAndNewlines)
        if trimmed.lowercased().hasPrefix("gateway error:") { return trimmed }
        return "Gateway error: \(trimmed)"
    }

    private func scheduleRecovery(reason: String) {
        let now = Date()
        if let last = self.lastRecoveryAt, now.timeIntervalSince(last) < 10 { return }
        guard self.recoveryTask == nil else { return }
        self.lastRecoveryAt = now

        self.recoveryTask = Task { [weak self] in
            guard let self else { return }
            let mode = await MainActor.run { AppStateStore.shared.connectionMode }
            guard mode != .unconfigured else {
                self.recoveryTask = nil
                return
            }

            let trimmedReason = reason.trimmingCharacters(in: .whitespacesAndNewlines)
            let reasonText = trimmedReason.isEmpty ? "unknown" : trimmedReason
            self.logger.info(
                "control channel recovery starting " +
                    "mode=\(String(describing: mode), privacy: .public) " +
                    "reason=\(reasonText, privacy: .public)")
            if mode == .local {
                GatewayProcessManager.shared.setActive(true)
            }
            if mode == .remote {
                do {
                    let port = try await GatewayEndpointStore.shared.ensureRemoteControlTunnel()
                    self.logger.info("control channel recovery ensured SSH tunnel port=\(port, privacy: .public)")
                } catch {
                    self.logger.error(
                        "control channel recovery tunnel failed \(error.localizedDescription, privacy: .public)")
                }
            }

            await self.refreshEndpoint(reason: "recovery:\(reasonText)")
            if case .connected = self.state {
                self.logger.info("control channel recovery finished")
            } else if case let .degraded(message) = self.state {
                self.logger.error("control channel recovery failed \(message, privacy: .public)")
            }

            self.recoveryTask = nil
        }
    }

    private func establishGatewayConnection(timeoutMs: Int = 5000) async throws {
        try await GatewayConnection.shared.refresh()
        let ok = try await GatewayConnection.shared.healthOK(timeoutMs: timeoutMs)
        if ok == false {
            throw NSError(
                domain: "Gateway",
                code: 0,
                userInfo: [NSLocalizedDescriptionKey: "gateway health not ok"])
        }
        await self.refreshAuthSourceLabel()
    }

    private func refreshAuthSourceLabel() async {
        let isRemote = CommandResolver.connectionModeIsRemote()
        let authSource = await GatewayConnection.shared.authSource()
        self.authSourceLabel = Self.formatAuthSource(authSource, isRemote: isRemote)
    }

    private static func formatAuthSource(_ source: GatewayAuthSource?, isRemote: Bool) -> String? {
        guard let source else { return nil }
        switch source {
        case .deviceToken:
            return "Auth: device token (paired device)"
        case .bootstrapToken:
            return "Auth: bootstrap token (setup code)"
        case .sharedToken:
            return "Auth: shared token (\(isRemote ? "gateway.remote.token" : "gateway.auth.token"))"
        case .password:
            return "Auth: password (\(isRemote ? "gateway.remote.password" : "gateway.auth.password"))"
        case .none:
            return "Auth: none"
        }
    }

    func sendSystemEvent(_ text: String, params: [String: AnyHashable] = [:]) async throws {
        var merged = params
        merged["text"] = AnyHashable(text)
        _ = try await self.request(method: "system-event", params: merged)
    }

    private func startEventStream() {
        GatewayPushSubscription.restartTask(task: &self.eventTask) { [weak self] push in
            self?.handle(push: push)
        }
    }

    private func handle(push: GatewayPush) {
        switch push {
        case let .event(evt) where evt.event == "agent":
            if let payload = evt.payload,
               let agent = try? GatewayPayloadDecoding.decode(payload, as: ControlAgentEvent.self)
            {
                AgentEventStore.shared.append(agent)
                self.routeWorkActivity(from: agent)
            }
        case let .event(evt) where evt.event == "heartbeat":
            if let payload = evt.payload,
               let heartbeat = try? GatewayPayloadDecoding.decode(payload, as: ControlHeartbeatEvent.self),
               let data = try? JSONEncoder().encode(heartbeat)
            {
                NotificationCenter.default.post(name: .controlHeartbeat, object: data)
            }
        case let .event(evt) where evt.event == "shutdown":
            self.state = .degraded("gateway shutdown")
        case .snapshot:
            self.state = .connected
        default:
            break
        }
    }

    private func routeWorkActivity(from event: ControlAgentEvent) {
        // We currently treat VoiceWake as the "main" session for UI purposes.
        // In the future, the gateway can include a sessionKey to distinguish runs.
        let sessionKey = (event.data["sessionKey"]?.value as? String) ?? "main"

        switch event.stream.lowercased() {
        case "job":
            if let state = event.data["state"]?.value as? String {
                WorkActivityStore.shared.handleJob(sessionKey: sessionKey, state: state)
            }
        case "tool":
            let phase = event.data["phase"]?.value as? String ?? ""
            let name = event.data["name"]?.value as? String
            let meta = event.data["meta"]?.value as? String
            let args = Self.bridgeToProtocolArgs(event.data["args"])
            WorkActivityStore.shared.handleTool(
                sessionKey: sessionKey,
                phase: phase,
                name: name,
                meta: meta,
                args: args)
        default:
            break
        }
    }

    private static func bridgeToProtocolArgs(
        _ value: OpenClawProtocol.AnyCodable?) -> [String: OpenClawProtocol.AnyCodable]?
    {
        guard let value else { return nil }
        if let dict = value.value as? [String: OpenClawProtocol.AnyCodable] {
            return dict
        }
        if let dict = value.value as? [String: OpenClawKit.AnyCodable],
           let data = try? JSONEncoder().encode(dict),
           let decoded = try? JSONDecoder().decode([String: OpenClawProtocol.AnyCodable].self, from: data)
        {
            return decoded
        }
        if let data = try? JSONEncoder().encode(value),
           let decoded = try? JSONDecoder().decode([String: OpenClawProtocol.AnyCodable].self, from: data)
        {
            return decoded
        }
        return nil
    }
}

extension Notification.Name {
    static let controlHeartbeat = Notification.Name("openclaw.control.heartbeat")
    static let controlAgentEvent = Notification.Name("openclaw.control.agent")
}

[Dauer der Verarbeitung: 0.34 Sekunden, vorverarbeitet 2026-04-27]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge