diff --git a/src/connection.zig b/src/connection.zig index e46e705..c541b30 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -16,7 +16,15 @@ const tls = @import("tls.zig"); const relay = @import("relay.zig"); const utils = @import("utils.zig"); -pub const DeviceId = identity.DeviceId; +// Re-export tipos desde connection/types.zig +pub const types = @import("connection/types.zig"); +pub const DeviceId = types.DeviceId; +pub const ConnectionState = types.ConnectionState; +pub const NatType = types.NatType; +pub const ConnectionMethod = types.ConnectionMethod; +pub const Error = types.Error; +pub const PeerInfo = types.PeerInfo; +pub const PeerAddress = types.PeerAddress; // ============================================================================= // Configuración @@ -24,210 +32,30 @@ pub const DeviceId = identity.DeviceId; /// Configuración del nodo P2P pub const Config = struct { - /// Nombre del dispositivo device_name: []const u8 = "zcatp2p", - - /// Puerto para conexiones entrantes listen_port: u16 = 22000, - - /// Habilitar discovery local local_discovery: bool = true, - - /// Intervalo de anuncio local (ms) local_announce_interval: u32 = 30000, - - /// Servidores de discovery global global_discovery_servers: []const []const u8 = &.{}, - - /// Servidores STUN stun_servers: []const []const u8 = &.{ "stun.l.google.com:19302", "stun.syncthing.net:3478", }, - - /// Servidores relay relay_servers: []const []const u8 = &.{}, - - /// Directorio de datos data_dir: []const u8, - - /// Compresión habilitada compression: bool = true, - - /// Habilitar NAT traversal automático nat_traversal: bool = true, - - /// Timeout de conexión (ms) connect_timeout: u32 = 30000, - - /// Intervalo de keepalive (ms) keepalive_interval: u32 = 60000, - - /// Callbacks on_device_discovered: ?*const fn (DeviceId, []const []const u8) void = null, on_message_received: ?*const fn (*Connection, protocol.Message) void = null, on_connection_state_changed: ?*const fn (*Connection, ConnectionState) void = null, }; // ============================================================================= -// Estados y tipos +// Buffer de recepción // ============================================================================= -/// Estado de conexión -pub const ConnectionState = enum { - disconnected, - resolving, // Buscando direcciones del peer - connecting, // Intentando conectar - handshaking, // TLS handshake en progreso - connected, - disconnecting, - @"error", - - pub fn isActive(self: ConnectionState) bool { - return self == .connecting or self == .handshaking or self == .connected; - } -}; - -/// Tipo de NAT detectado -pub const NatType = enum { - unknown, - none, // Sin NAT (IP pública directa) - full_cone, - restricted, - port_restricted, - symmetric, - blocked, - - pub fn canAcceptIncoming(self: NatType) bool { - return self == .none or self == .full_cone; - } - - pub fn needsHolePunch(self: NatType) bool { - return self == .restricted or self == .port_restricted; - } - - pub fn needsRelay(self: NatType) bool { - return self == .symmetric or self == .blocked; - } -}; - -/// Método de conexión usado -pub const ConnectionMethod = enum { - direct, // Conexión directa TCP - hole_punch, // UDP hole punching - relay, // A través de relay - local, // Red local -}; - -/// Errores del módulo -pub const Error = error{ - AlreadyInitialized, - NotInitialized, - InvalidDeviceId, - ConnectionFailed, - ConnectionTimeout, - ConnectionClosed, - PeerNotFound, - CertificateError, - TlsError, - ProtocolError, - CompressionError, - OutOfMemory, - IoError, - InvalidConfig, - AddressInUse, - NetworkUnreachable, - HandshakeFailed, - InvalidMessage, - RelayFailed, -}; - -// ============================================================================= -// Información de peers -// ============================================================================= - -/// Información de un peer -pub const PeerInfo = struct { - device_id: DeviceId, - device_name: []const u8, - client_name: []const u8, - client_version: []const u8, - addresses: [][]const u8, - connected_at: i64, - is_local: bool, - bytes_sent: u64, - bytes_received: u64, - connection_method: ConnectionMethod, - allocator: std.mem.Allocator, - - pub fn deinit(self: *PeerInfo) void { - self.allocator.free(self.device_name); - self.allocator.free(self.client_name); - self.allocator.free(self.client_version); - for (self.addresses) |addr| { - self.allocator.free(addr); - } - self.allocator.free(self.addresses); - } -}; - -/// Dirección de peer con metadatos -pub const PeerAddress = struct { - address: []const u8, - port: u16, - is_local: bool, - is_relay: bool, - last_seen: i64, - priority: u8, // 0 = más alta prioridad - - pub fn parse(addr_str: []const u8, allocator: std.mem.Allocator) !PeerAddress { - // Formato: "ip:port" o "relay://server/device_id" - if (std.mem.startsWith(u8, addr_str, "relay://")) { - return .{ - .address = try allocator.dupe(u8, addr_str), - .port = 0, - .is_local = false, - .is_relay = true, - .last_seen = utils.timestamp(), - .priority = 100, // Baja prioridad - }; - } - - // IP:puerto - if (std.mem.lastIndexOf(u8, addr_str, ":")) |colon| { - const port = std.fmt.parseInt(u16, addr_str[colon + 1 ..], 10) catch 22000; - const ip = addr_str[0..colon]; - - // Detectar si es local - const is_local = std.mem.startsWith(u8, ip, "192.168.") or - std.mem.startsWith(u8, ip, "10.") or - std.mem.startsWith(u8, ip, "172.16.") or - std.mem.startsWith(u8, ip, "172.17.") or - std.mem.startsWith(u8, ip, "172.18.") or - std.mem.startsWith(u8, ip, "172.19.") or - std.mem.startsWith(u8, ip, "172.2") or - std.mem.startsWith(u8, ip, "172.30.") or - std.mem.startsWith(u8, ip, "172.31."); - - return .{ - .address = try allocator.dupe(u8, ip), - .port = port, - .is_local = is_local, - .is_relay = false, - .last_seen = utils.timestamp(), - .priority = if (is_local) 10 else 50, - }; - } - - return error.InvalidAddress; - } -}; - -// ============================================================================= -// Conexión -// ============================================================================= - -/// Buffer para recepción de mensajes const RecvBuffer = struct { data: []u8, pos: usize, @@ -235,7 +63,7 @@ const RecvBuffer = struct { fn init(allocator: std.mem.Allocator) !RecvBuffer { return .{ - .data = try allocator.alloc(u8, 64 * 1024), // 64KB inicial + .data = try allocator.alloc(u8, 64 * 1024), .pos = 0, .allocator = allocator, }; @@ -250,6 +78,10 @@ const RecvBuffer = struct { } }; +// ============================================================================= +// Conexión +// ============================================================================= + /// Conexión con un peer pub const Connection = struct { io: std.Io, @@ -266,7 +98,7 @@ pub const Connection = struct { last_activity: i64, connection_method: ConnectionMethod, recv_buffer: ?RecvBuffer, - pending_acks: std.AutoHashMapUnmanaged(u32, i64), // message_id -> timestamp + pending_acks: std.AutoHashMapUnmanaged(u32, i64), error_message: ?[]const u8, pub fn init(io: std.Io, allocator: std.mem.Allocator, device_id: DeviceId) Connection { @@ -292,16 +124,10 @@ pub const Connection = struct { pub fn deinit(self: *Connection) void { self.close(); - if (self.peer_info) |*info| { - info.deinit(); - } - if (self.recv_buffer) |*buf| { - buf.deinit(); - } + if (self.peer_info) |*info| info.deinit(); + if (self.recv_buffer) |*buf| buf.deinit(); self.pending_acks.deinit(self.allocator); - if (self.error_message) |msg| { - self.allocator.free(msg); - } + if (self.error_message) |msg| self.allocator.free(msg); } pub fn getDeviceId(self: *Connection) DeviceId { @@ -327,12 +153,9 @@ pub const Connection = struct { self.state = .connecting; self.error_message = null; - // Ordenar direcciones por prioridad (locales primero) var sorted_addrs = std.ArrayList(PeerAddress).init(self.allocator); defer { - for (sorted_addrs.items) |*addr| { - self.allocator.free(addr.address); - } + for (sorted_addrs.items) |*addr| self.allocator.free(addr.address); sorted_addrs.deinit(); } @@ -342,14 +165,12 @@ pub const Connection = struct { } else |_| {} } - // Ordenar por prioridad std.mem.sort(PeerAddress, sorted_addrs.items, {}, struct { fn lessThan(_: void, a: PeerAddress, b: PeerAddress) bool { return a.priority < b.priority; } }.lessThan); - // Intentar cada dirección const deadline = utils.milliTimestamp() + timeout_ms; for (sorted_addrs.items) |peer_addr| { @@ -357,14 +178,11 @@ pub const Connection = struct { if (remaining <= 0) break; if (peer_addr.is_relay) { - // Conexión a través de relay self.connectViaRelay(peer_addr.address) catch continue; } else { - // Conexión directa TCP self.connectDirect(peer_addr.address, peer_addr.port, @intCast(@min(remaining, std.math.maxInt(u32)))) catch continue; } - // Si llegamos aquí, la conexión tuvo éxito self.connection_method = if (peer_addr.is_local) .local else if (peer_addr.is_relay) .relay else .direct; return; } @@ -377,15 +195,9 @@ pub const Connection = struct { fn connectDirect(self: *Connection, host: []const u8, port: u16, timeout_ms: u32) !void { _ = timeout_ms; - // Crear socket TCP - const sock = try std.posix.socket( - std.posix.AF.INET, - std.posix.SOCK.STREAM, - std.posix.IPPROTO.TCP, - ); + const sock = try std.posix.socket(std.posix.AF.INET, std.posix.SOCK.STREAM, std.posix.IPPROTO.TCP); errdefer std.posix.close(sock); - // Parsear dirección var octets: [4]u8 = undefined; var octet_idx: usize = 0; var current: u16 = 0; @@ -411,7 +223,6 @@ pub const Connection = struct { const addr = std.net.Address.initIp4(octets, port); - // Conectar std.posix.connect(sock, &addr.any, addr.getOsSockLen()) catch |err| { return switch (err) { error.ConnectionRefused => Error.ConnectionFailed, @@ -423,36 +234,23 @@ pub const Connection = struct { self.socket = sock; self.state = .handshaking; - // Iniciar TLS handshake try self.performTlsHandshake(); - - // Enviar HELLO try self.sendHello(); - - // Recibir HELLO del peer try self.receiveHello(); self.state = .connected; self.connected_at = utils.timestamp(); self.last_activity = self.connected_at; - - // Inicializar buffer de recepción self.recv_buffer = RecvBuffer.init(self.allocator) catch null; } fn connectViaRelay(self: *Connection, relay_addr: []const u8) !void { - // Parsear la URL del relay: relay://host:port/device_id - if (!std.mem.startsWith(u8, relay_addr, "relay://")) { - return Error.RelayFailed; - } + if (!std.mem.startsWith(u8, relay_addr, "relay://")) return Error.RelayFailed; - const rest = relay_addr[8..]; // Quitar "relay://" - - // Buscar el último / para separar host:port de device_id + const rest = relay_addr[8..]; const path_sep = std.mem.lastIndexOf(u8, rest, "/") orelse return Error.RelayFailed; const host_port = rest[0..path_sep]; - // Parsear host:port var host_end = host_port.len; var port: u16 = relay.RELAY_PORT; @@ -463,7 +261,6 @@ pub const Connection = struct { const host = host_port[0..host_end]; - // Parsear IP del host var octets: [4]u8 = undefined; var octet_idx: usize = 0; var current: u16 = 0; @@ -478,7 +275,6 @@ pub const Connection = struct { current = current * 10 + (c - '0'); if (current > 255) return Error.RelayFailed; } else { - // Hostname - por ahora no soportado (necesita DNS) return Error.RelayFailed; } } @@ -488,59 +284,40 @@ pub const Connection = struct { const addr = std.net.Address.initIp4(octets, port); - // Crear cliente relay var relay_client = relay.RelayClient.init(self.allocator, self.device_id); errdefer relay_client.deinit(); - // Conectar al servidor relay relay_client.connect(addr) catch return Error.RelayFailed; - - // Unirse al pool del relay relay_client.joinRelay() catch return Error.RelayFailed; - - // Solicitar conexión al dispositivo target relay_client.requestConnection(self.device_id) catch return Error.RelayFailed; - // Esperar a que la sesión esté conectada (con timeout) - const deadline = utils.milliTimestamp() + 10000; // 10 segundos + const deadline = utils.milliTimestamp() + 10000; while (utils.milliTimestamp() < deadline) { if (relay_client.state == .connected) break; if (relay_client.state == .@"error") return Error.RelayFailed; - // Recibir y procesar mensajes var recv_buf: [1024]u8 = undefined; if (relay_client.socket) |sock| { - // Non-blocking receive - const tv = std.posix.timeval{ .sec = 0, .usec = 100000 }; // 100ms + const tv = std.posix.timeval{ .sec = 0, .usec = 100000 }; std.posix.setsockopt(sock, std.posix.SOL.SOCKET, std.posix.SO.RCVTIMEO, std.mem.asBytes(&tv)) catch {}; - const n = std.posix.recv(sock, &recv_buf, 0) catch continue; - if (n > 0) { - relay_client.processMessage(recv_buf[0..n]) catch {}; - } + if (n > 0) relay_client.processMessage(recv_buf[0..n]) catch {}; } } - if (relay_client.state != .connected) { - return Error.RelayFailed; - } + if (relay_client.state != .connected) return Error.RelayFailed; - // Guardar socket del relay como socket de conexión self.socket = relay_client.socket; - relay_client.socket = null; // Transferir ownership + relay_client.socket = null; self.state = .connected; self.connected_at = utils.timestamp(); self.last_activity = self.connected_at; self.connection_method = .relay; - - // Inicializar buffer de recepción self.recv_buffer = RecvBuffer.init(self.allocator) catch null; } fn performTlsHandshake(self: *Connection) !void { - // Por ahora, usar conexión sin TLS para simplificar - // En producción, iniciar handshake TLS 1.3 aquí _ = self; } @@ -550,33 +327,24 @@ pub const Connection = struct { .client_name = "zcatp2p", .client_version = "0.1.0", .timestamp = utils.timestamp(), - .capabilities = .{ - .compression_lz4 = true, - .encryption_chacha20 = true, - }, + .capabilities = .{ .compression_lz4 = true, .encryption_chacha20 = true }, }; const payload = hello.encode(self.allocator) catch return Error.OutOfMemory; defer self.allocator.free(payload); - try self.sendRaw(.hello, payload); } fn receiveHello(self: *Connection) !void { const msg = try self.receiveMessage(5000); - defer { - if (msg.payload) |p| self.allocator.free(p); - } + defer if (msg.payload) |p| self.allocator.free(p); - if (msg.header.msg_type != .hello) { - return Error.ProtocolError; - } + if (msg.header.msg_type != .hello) return Error.ProtocolError; if (msg.payload) |payload| { var hello = protocol.HelloMessage.decode(payload, self.allocator) catch return Error.ProtocolError; defer hello.deinit(self.allocator); - // Crear PeerInfo self.peer_info = .{ .device_id = self.device_id, .device_name = self.allocator.dupe(u8, hello.device_name) catch return Error.OutOfMemory, @@ -610,19 +378,14 @@ pub const Connection = struct { defer self.allocator.free(payload); try self.sendRaw(.data, payload); - - // Registrar para ACK self.pending_acks.put(self.allocator, msg_id, utils.milliTimestamp()) catch {}; - return msg_id; } - /// Envía un mensaje raw fn sendRaw(self: *Connection, msg_type: protocol.MessageType, payload: []const u8) !void { const stream = self.socket orelse return Error.ConnectionClosed; var stream_writer = stream.writer(self.io, &.{}); - // Construir header const header = protocol.MessageHeader{ .msg_type = msg_type, .flags = .{}, @@ -630,73 +393,53 @@ pub const Connection = struct { }; const header_bytes = header.encode(); - - // Enviar header try stream_writer.interface.writeAll(&header_bytes); - - // Enviar payload - if (payload.len > 0) { - try stream_writer.interface.writeAll(payload); - } + if (payload.len > 0) try stream_writer.interface.writeAll(payload); self.bytes_sent += protocol.MessageHeader.SIZE + payload.len; self.last_activity = utils.timestamp(); } - /// Estructura para mensaje recibido const ReceivedMessage = struct { header: protocol.MessageHeader, payload: ?[]u8, }; - /// Recibe un mensaje fn receiveMessage(self: *Connection, timeout_ms: u32) !ReceivedMessage { const stream = self.socket orelse return Error.ConnectionClosed; - _ = timeout_ms; // TODO: timeout support in Stream.Reader + _ = timeout_ms; var stream_reader = stream.reader(self.io, &.{}); - // Leer header var header_buf: [protocol.MessageHeader.SIZE]u8 = undefined; try stream_reader.interface.readSliceAll(&header_buf); const header = protocol.MessageHeader.decode(&header_buf); + if (header.length > protocol.MAX_MESSAGE_LEN) return Error.ProtocolError; - // Validar longitud - if (header.length > protocol.MAX_MESSAGE_LEN) { - return Error.ProtocolError; - } - - // Leer payload var payload: ?[]u8 = null; if (header.length > 0) { payload = try self.allocator.alloc(u8, header.length); errdefer if (payload) |p| self.allocator.free(p); - try stream_reader.interface.readSliceAll(payload.?); } return .{ .header = header, .payload = payload }; } - /// Envía un ping pub fn ping(self: *Connection) Error!void { if (self.state != .connected) return Error.ConnectionClosed; try self.sendRaw(.ping, &.{}); } - /// Cierra la conexión pub fn close(self: *Connection) void { self.closeWithReason("connection closed"); } - /// Cierra la conexión con motivo pub fn closeWithReason(self: *Connection, reason: []const u8) void { if (self.state == .disconnected) return; - self.state = .disconnecting; - // Enviar CLOSE si el socket está activo if (self.socket != null) { const close_msg = protocol.CloseMessage{ .reason = reason }; if (close_msg.encode(self.allocator)) |payload| { @@ -705,64 +448,34 @@ pub const Connection = struct { } else |_| {} } - // Cerrar TLS if (self.tls_state) |tls_conn| { tls_conn.deinit(); self.allocator.destroy(tls_conn); self.tls_state = null; } - // Cerrar socket + if (self.socket) |sock| { + sock.close(self.io); + self.socket = null; + } - if (self.socket) |sock| { - - sock.close(self.io); - - self.socket = null; - - } - - - - self.state = .disconnected; - - } - - - - /// Espera hasta que la conexión esté establecida - - pub fn waitConnected(self: *Connection, timeout_ms: u32) Error!void { - - const deadline = utils.milliTimestamp() + timeout_ms; - - while (utils.milliTimestamp() < deadline) { - - switch (self.state) { - - .connected => return, - - .@"error", .disconnected => return Error.ConnectionFailed, - - else => { - - // TODO: better sleep - - const ts = std.posix.timespec{ .sec = 0, .nsec = 10 * std.time.ns_per_ms }; - - _ = std.posix.system.nanosleep(&ts, null); - - }, - - } - - } - - - - return Error.ConnectionTimeout; + self.state = .disconnected; + } + pub fn waitConnected(self: *Connection, timeout_ms: u32) Error!void { + const deadline = utils.milliTimestamp() + timeout_ms; + while (utils.milliTimestamp() < deadline) { + switch (self.state) { + .connected => return, + .@"error", .disconnected => return Error.ConnectionFailed, + else => { + const ts = std.posix.timespec{ .sec = 0, .nsec = 10 * std.time.ns_per_ms }; + _ = std.posix.system.nanosleep(&ts, null); + }, } + } + return Error.ConnectionTimeout; + } }; // ============================================================================= @@ -789,19 +502,14 @@ pub const P2P = struct { const self = allocator.create(P2P) catch return Error.OutOfMemory; errdefer allocator.destroy(self); - // Cargar o generar identidad persistente const key_path = std.fmt.allocPrint(allocator, "{s}/identity.key", .{config.data_dir}) catch return Error.OutOfMemory; defer allocator.free(key_path); - // Asegurar que el directorio existe if (std.Io.Dir.path.dirname(key_path)) |dir| { std.Io.Dir.createDirAbsolute(io, dir, .default_dir) catch {}; } - const ident = identity.Identity.loadOrGenerate(io, key_path) catch { - // Si falla, generar identidad temporal (no persistente) - return Error.CertificateError; - }; + const ident = identity.Identity.loadOrGenerate(io, key_path) catch return Error.CertificateError; const device_id = ident.device_id; self.* = .{ @@ -826,7 +534,6 @@ pub const P2P = struct { pub fn deinit(self: *P2P) void { self.stop(); - // Cerrar todas las conexiones var iter = self.connections.iterator(); while (iter.next()) |entry| { entry.value_ptr.*.deinit(); @@ -834,90 +541,60 @@ pub const P2P = struct { } self.connections.deinit(self.allocator); - // Limpiar componentes self.discovery_manager.deinit(); self.stun_client.deinit(); self.nat_manager.deinit(); - // Limpiar direcciones externas - for (self.external_addresses.items) |addr| { - self.allocator.free(addr); - } + for (self.external_addresses.items) |addr| self.allocator.free(addr); self.external_addresses.deinit(self.allocator); self.allocator.destroy(self); } - /// Inicia el nodo P2P pub fn start(self: *P2P) Error!void { if (self.running) return; - - // 1. Configurar NAT traversal - if (self.config.nat_traversal) { - try self.setupNatTraversal(); - } - - // 2. Iniciar listener TCP + if (self.config.nat_traversal) try self.setupNatTraversal(); try self.startListener(); - - // 3. Iniciar discovery try self.startDiscovery(); - self.running = true; } - /// Detiene el nodo P2P pub fn stop(self: *P2P) void { if (!self.running) return; - // Detener listener if (self.listener_socket) |*sock| { sock.deinit(self.io); self.listener_socket = null; } - // Eliminar port mapping if (self.port_mapping) |mapping| { self.nat_manager.unmapPort(mapping.external_port, mapping.protocol) catch {}; self.port_mapping = null; } - // Detener discovery self.discovery_manager.stopLocalDiscovery(); - // Cerrar todas las conexiones var iter = self.connections.iterator(); - while (iter.next()) |entry| { - entry.value_ptr.*.close(); - } + while (iter.next()) |entry| entry.value_ptr.*.close(); self.running = false; } fn setupNatTraversal(self: *P2P) !void { - // 1. Añadir servidores STUN - for (self.config.stun_servers) |server| { - self.stun_client.addServer(server) catch continue; - } + for (self.config.stun_servers) |server| self.stun_client.addServer(server) catch continue; - // 2. Descubrir IP externa via STUN if (self.stun_client.discoverExternalAddress() catch null) |addr| { var buf: [32]u8 = undefined; const addr_str = addr.format(&buf); if (addr_str.len > 0) { - const owned = self.allocator.dupe(u8, addr_str) catch null; - if (owned) |o| { - self.external_addresses.append(self.allocator, o) catch { - self.allocator.free(o); - }; - } + if (self.allocator.dupe(u8, addr_str)) |owned| { + self.external_addresses.append(self.allocator, owned) catch self.allocator.free(owned); + } else |_| {} } } - // 3. Detectar tipo de NAT self.nat_type = self.mapStunNatType(self.stun_client.detectNatType() catch .unknown); - // 4. Intentar port mapping si hay NAT if (self.nat_type != .none) { _ = self.nat_manager.discover() catch {}; @@ -926,26 +603,20 @@ pub const P2P = struct { self.config.listen_port, .TCP, "zcatp2p", - 3600, // 1 hora + 3600, ) catch .gateway_not_found; switch (result) { .success => |mapping| { self.port_mapping = mapping; - - // Añadir dirección externa con puerto mapeado if (self.nat_manager.getExternalIP() catch null) |ip| { defer self.allocator.free(ip); var addr_buf: [64]u8 = undefined; - const addr_str = std.fmt.bufPrint(&addr_buf, "{s}:{d}", .{ ip, mapping.external_port }) catch null; - if (addr_str) |s| { - const owned = self.allocator.dupe(u8, s) catch null; - if (owned) |o| { - self.external_addresses.append(self.allocator, o) catch { - self.allocator.free(o); - }; - } - } + if (std.fmt.bufPrint(&addr_buf, "{s}:{d}", .{ ip, mapping.external_port })) |s| { + if (self.allocator.dupe(u8, s)) |owned| { + self.external_addresses.append(self.allocator, owned) catch self.allocator.free(owned); + } else |_| {} + } else |_| {} } }, else => {}, @@ -969,19 +640,15 @@ pub const P2P = struct { fn startListener(self: *P2P) !void { const addr = std.Io.net.IpAddress.unspecified(self.config.listen_port); self.listener_socket = std.Io.net.listen(addr, self.io, .{}) catch return Error.IoError; - - // Añadir direcciones locales try self.discoverLocalAddresses(); } fn discoverLocalAddresses(self: *P2P) !void { - // Obtener IP local conectando a una dirección externa const addr = std.Io.net.IpAddress{ .ip4 = std.Io.net.Ip4Address{ .bytes = .{ 8, 8, 8, 8 }, .port = 53 } }; const stream = addr.connect(self.io, .{ .mode = .stream }) catch return; defer stream.close(self.io); - // TODO: get local address from stream/socket - const ip: u32 = 0x0100007f; // 127.0.0.1 fallback + const ip: u32 = 0x0100007f; var buf: [32]u8 = undefined; const addr_str = std.fmt.bufPrint(&buf, "{d}.{d}.{d}.{d}:{d}", .{ @@ -993,65 +660,44 @@ pub const P2P = struct { }) catch return; const owned = self.allocator.dupe(u8, addr_str) catch return; - self.external_addresses.append(self.allocator, owned) catch { - self.allocator.free(owned); - }; + self.external_addresses.append(self.allocator, owned) catch self.allocator.free(owned); } fn startDiscovery(self: *P2P) !void { - // Iniciar discovery local - if (self.config.local_discovery) { - self.discovery_manager.startLocalDiscovery(self.config.listen_port) catch {}; - } - - // Configurar discovery global - for (self.config.global_discovery_servers) |server| { - self.discovery_manager.addGlobalServer(server) catch continue; - } - - // Anunciar en discovery global - if (self.external_addresses.items.len > 0) { - self.discovery_manager.announceGlobal(self.external_addresses.items) catch {}; - } + if (self.config.local_discovery) self.discovery_manager.startLocalDiscovery(self.config.listen_port) catch {}; + for (self.config.global_discovery_servers) |server| self.discovery_manager.addGlobalServer(server) catch continue; + if (self.external_addresses.items.len > 0) self.discovery_manager.announceGlobal(self.external_addresses.items) catch {}; } // ------------------------------------------------------------------------- // API pública // ------------------------------------------------------------------------- - /// Obtiene el Device ID local pub fn getDeviceId(self: *P2P) DeviceId { return self.device_id; } - /// Obtiene el Device ID como string pub fn getDeviceIdString(self: *P2P, buf: []u8) []const u8 { return identity.deviceIdToString(self.device_id, buf); } - /// Parsea un Device ID desde string pub fn parseDeviceId(str: []const u8) Error!DeviceId { return identity.stringToDeviceId(str) catch Error.InvalidDeviceId; } - /// Compara dos Device IDs pub fn deviceIdEquals(a: DeviceId, b: DeviceId) bool { return identity.deviceIdEquals(a, b); } - /// Obtiene el Short ID pub fn getShortId(device_id: DeviceId) identity.ShortId { return identity.getShortId(device_id); } - /// Conecta a un dispositivo pub fn connect(self: *P2P, device_id: DeviceId) Error!*Connection { - // Verificar si ya existe conexión if (self.connections.get(device_id)) |conn| { if (conn.isConnected()) return conn; } - // Crear nueva conexión const conn = self.allocator.create(Connection) catch return Error.OutOfMemory; conn.* = Connection.init(self.allocator, device_id); conn.state = .resolving; @@ -1061,40 +707,29 @@ pub const P2P = struct { return Error.OutOfMemory; }; - // Buscar direcciones del peer const addresses = self.discovery_manager.lookup(device_id) catch null; if (addresses == null or addresses.?.len == 0) { conn.state = .@"error"; return Error.PeerNotFound; } - // Conectar conn.connectTo(addresses.?, self.config.connect_timeout) catch |err| { conn.state = .@"error"; return err; }; - // Notificar cambio de estado - if (self.config.on_connection_state_changed) |callback| { - callback(conn, conn.state); - } - + if (self.config.on_connection_state_changed) |callback| callback(conn, conn.state); return conn; } - /// Desconecta de un peer pub fn disconnect(self: *P2P, device_id: DeviceId) void { - if (self.connections.get(device_id)) |conn| { - conn.close(); - } + if (self.connections.get(device_id)) |conn| conn.close(); } - /// Obtiene una conexión existente pub fn getConnection(self: *P2P, device_id: DeviceId) ?*Connection { return self.connections.get(device_id); } - /// Número de conexiones activas pub fn connectionCount(self: *P2P) usize { var count: usize = 0; var iter = self.connections.iterator(); @@ -1104,38 +739,28 @@ pub const P2P = struct { return count; } - /// Obtiene las direcciones externas pub fn getExternalAddresses(self: *P2P) []const []const u8 { return self.external_addresses.items; } - /// Obtiene el tipo de NAT pub fn getNatType(self: *P2P) NatType { return self.nat_type; } - /// Indica si el nodo está ejecutándose pub fn isRunning(self: *P2P) bool { return self.running; } - /// Añade manualmente direcciones para un peer pub fn addPeerAddresses(self: *P2P, device_id: DeviceId, addresses: []const []const u8) !void { try self.discovery_manager.addKnownPeer(device_id, addresses); } - /// Fuerza un anuncio en discovery global pub fn announce(self: *P2P) !void { - if (self.external_addresses.items.len > 0) { - try self.discovery_manager.announceGlobal(self.external_addresses.items); - } + if (self.external_addresses.items.len > 0) try self.discovery_manager.announceGlobal(self.external_addresses.items); } - // Método para mantener compatibilidad con API anterior pub fn getExternalAddress(self: *P2P) ?[]const u8 { - if (self.external_addresses.items.len > 0) { - return self.external_addresses.items[0]; - } + if (self.external_addresses.items.len > 0) return self.external_addresses.items[0]; return null; } }; @@ -1148,9 +773,7 @@ test "p2p init/deinit" { const allocator = std.testing.allocator; const io = std.testing.io; - const p2p = try P2P.init(io, allocator, .{ - .data_dir = "/tmp/zcatp2p-test", - }); + const p2p = try P2P.init(io, allocator, .{ .data_dir = "/tmp/zcatp2p-test" }); defer p2p.deinit(); try std.testing.expect(p2p.connectionCount() == 0); @@ -1168,44 +791,3 @@ test "connection init" { try std.testing.expect(conn.state == .disconnected); try std.testing.expect(!conn.isConnected()); } - -test "connection state active check" { - try std.testing.expect(!ConnectionState.disconnected.isActive()); - try std.testing.expect(ConnectionState.connecting.isActive()); - try std.testing.expect(ConnectionState.connected.isActive()); - try std.testing.expect(!ConnectionState.@"error".isActive()); -} - -test "nat type capabilities" { - try std.testing.expect(NatType.none.canAcceptIncoming()); - try std.testing.expect(NatType.full_cone.canAcceptIncoming()); - try std.testing.expect(!NatType.restricted.canAcceptIncoming()); - - try std.testing.expect(NatType.restricted.needsHolePunch()); - try std.testing.expect(NatType.port_restricted.needsHolePunch()); - try std.testing.expect(!NatType.symmetric.needsHolePunch()); - - try std.testing.expect(NatType.symmetric.needsRelay()); - try std.testing.expect(NatType.blocked.needsRelay()); -} - -test "peer address parse" { - const allocator = std.testing.allocator; - - // IP local - const local = try PeerAddress.parse("192.168.1.100:22000", allocator); - defer allocator.free(local.address); - try std.testing.expect(local.is_local); - try std.testing.expect(!local.is_relay); - try std.testing.expectEqual(@as(u16, 22000), local.port); - - // IP pública - const public_addr = try PeerAddress.parse("8.8.8.8:22000", allocator); - defer allocator.free(public_addr.address); - try std.testing.expect(!public_addr.is_local); - - // Relay - const relay_addr = try PeerAddress.parse("relay://relay.example.com/device123", allocator); - defer allocator.free(relay_addr.address); - try std.testing.expect(relay_addr.is_relay); -} diff --git a/src/connection/types.zig b/src/connection/types.zig new file mode 100644 index 0000000..463c35c --- /dev/null +++ b/src/connection/types.zig @@ -0,0 +1,200 @@ +//! Tipos y estructuras para el módulo de conexión + +const std = @import("std"); +const identity = @import("../identity.zig"); +const utils = @import("../utils.zig"); + +pub const DeviceId = identity.DeviceId; + +// ============================================================================= +// Estados y tipos +// ============================================================================= + +/// Estado de conexión +pub const ConnectionState = enum { + disconnected, + resolving, + connecting, + handshaking, + connected, + disconnecting, + @"error", + + pub fn isActive(self: ConnectionState) bool { + return self == .connecting or self == .handshaking or self == .connected; + } +}; + +/// Tipo de NAT detectado +pub const NatType = enum { + unknown, + none, + full_cone, + restricted, + port_restricted, + symmetric, + blocked, + + pub fn canAcceptIncoming(self: NatType) bool { + return self == .none or self == .full_cone; + } + + pub fn needsHolePunch(self: NatType) bool { + return self == .restricted or self == .port_restricted; + } + + pub fn needsRelay(self: NatType) bool { + return self == .symmetric or self == .blocked; + } +}; + +/// Método de conexión usado +pub const ConnectionMethod = enum { + direct, + hole_punch, + relay, + local, +}; + +/// Errores del módulo +pub const Error = error{ + AlreadyInitialized, + NotInitialized, + InvalidDeviceId, + ConnectionFailed, + ConnectionTimeout, + ConnectionClosed, + PeerNotFound, + CertificateError, + TlsError, + ProtocolError, + CompressionError, + OutOfMemory, + IoError, + InvalidConfig, + AddressInUse, + NetworkUnreachable, + HandshakeFailed, + InvalidMessage, + RelayFailed, +}; + +// ============================================================================= +// Información de peers +// ============================================================================= + +/// Información de un peer +pub const PeerInfo = struct { + device_id: DeviceId, + device_name: []const u8, + client_name: []const u8, + client_version: []const u8, + addresses: [][]const u8, + connected_at: i64, + is_local: bool, + bytes_sent: u64, + bytes_received: u64, + connection_method: ConnectionMethod, + allocator: std.mem.Allocator, + + pub fn deinit(self: *PeerInfo) void { + self.allocator.free(self.device_name); + self.allocator.free(self.client_name); + self.allocator.free(self.client_version); + for (self.addresses) |addr| { + self.allocator.free(addr); + } + self.allocator.free(self.addresses); + } +}; + +/// Dirección de peer con metadatos +pub const PeerAddress = struct { + address: []const u8, + port: u16, + is_local: bool, + is_relay: bool, + last_seen: i64, + priority: u8, + + pub fn parse(addr_str: []const u8, allocator: std.mem.Allocator) !PeerAddress { + if (std.mem.startsWith(u8, addr_str, "relay://")) { + return .{ + .address = try allocator.dupe(u8, addr_str), + .port = 0, + .is_local = false, + .is_relay = true, + .last_seen = utils.timestamp(), + .priority = 100, + }; + } + + if (std.mem.lastIndexOf(u8, addr_str, ":")) |colon| { + const port = std.fmt.parseInt(u16, addr_str[colon + 1 ..], 10) catch 22000; + const ip = addr_str[0..colon]; + + const is_local = std.mem.startsWith(u8, ip, "192.168.") or + std.mem.startsWith(u8, ip, "10.") or + std.mem.startsWith(u8, ip, "172.16.") or + std.mem.startsWith(u8, ip, "172.17.") or + std.mem.startsWith(u8, ip, "172.18.") or + std.mem.startsWith(u8, ip, "172.19.") or + std.mem.startsWith(u8, ip, "172.2") or + std.mem.startsWith(u8, ip, "172.30.") or + std.mem.startsWith(u8, ip, "172.31."); + + return .{ + .address = try allocator.dupe(u8, ip), + .port = port, + .is_local = is_local, + .is_relay = false, + .last_seen = utils.timestamp(), + .priority = if (is_local) 10 else 50, + }; + } + + return error.InvalidAddress; + } +}; + +// ============================================================================= +// Tests +// ============================================================================= + +test "connection state active check" { + try std.testing.expect(!ConnectionState.disconnected.isActive()); + try std.testing.expect(ConnectionState.connecting.isActive()); + try std.testing.expect(ConnectionState.connected.isActive()); + try std.testing.expect(!ConnectionState.@"error".isActive()); +} + +test "nat type capabilities" { + try std.testing.expect(NatType.none.canAcceptIncoming()); + try std.testing.expect(NatType.full_cone.canAcceptIncoming()); + try std.testing.expect(!NatType.restricted.canAcceptIncoming()); + + try std.testing.expect(NatType.restricted.needsHolePunch()); + try std.testing.expect(NatType.port_restricted.needsHolePunch()); + try std.testing.expect(!NatType.symmetric.needsHolePunch()); + + try std.testing.expect(NatType.symmetric.needsRelay()); + try std.testing.expect(NatType.blocked.needsRelay()); +} + +test "peer address parse" { + const allocator = std.testing.allocator; + + const local = try PeerAddress.parse("192.168.1.100:22000", allocator); + defer allocator.free(local.address); + try std.testing.expect(local.is_local); + try std.testing.expect(!local.is_relay); + try std.testing.expectEqual(@as(u16, 22000), local.port); + + const public_addr = try PeerAddress.parse("8.8.8.8:22000", allocator); + defer allocator.free(public_addr.address); + try std.testing.expect(!public_addr.is_local); + + const relay_addr = try PeerAddress.parse("relay://relay.example.com/device123", allocator); + defer allocator.free(relay_addr.address); + try std.testing.expect(relay_addr.is_relay); +}