diff --git a/src/connection.zig b/src/connection.zig index 6d88c43..cefde3b 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -40,13 +40,13 @@ const thunk = @import("./thunk.zig"); pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL; pub const ConnectionStatus = enum(c_int) { - disconnected = nats_c.DISCONNECTED, - connecting = nats_c.CONNECTING, - connected = nats_c.CONNECTED, - closed = nats_c.CLOSED, - reconnecting = nats_c.RECONNECTING, - draining_subs = nats_c.DRAINING_SUBS, - draining_pubs = nats_c.DRAINING_PUBS, + disconnected = nats_c.NATS_CONN_STATUS_DISCONNECTED, + connecting = nats_c.NATS_CONN_STATUS_CONNECTING, + connected = nats_c.NATS_CONN_STATUS_CONNECTED, + closed = nats_c.NATS_CONN_STATUS_CLOSED, + reconnecting = nats_c.NATS_CONN_STATUS_RECONNECTING, + draining_subs = nats_c.NATS_CONN_STATUS_DRAINING_SUBS, + draining_pubs = nats_c.NATS_CONN_STATUS_DRAINING_PUBS, _, pub fn fromInt(int: c_uint) ConnectionStatus { @@ -57,6 +57,10 @@ pub const ConnectionStatus = enum(c_int) { pub const AddressPort = struct { address: [:0]u8, port: u16, + + pub fn deinit(self: AddressPort) void { + std.heap.raw_c_allocator.free(self.address); + } }; pub const Connection = opaque { @@ -115,7 +119,7 @@ pub const Connection = opaque { return Status.fromInt(nats_c.natsConnection_FlushTimeout(@ptrCast(self), timeout)).raise(); } - pub fn getMaxPayload(self: *Connection) c_int { + pub fn getMaxPayload(self: *Connection) i64 { return nats_c.natsConnection_GetMaxPayload(@ptrCast(self)); } @@ -150,31 +154,47 @@ pub const Connection = opaque { return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0); } - pub fn getServers(self: *Connection) Error![][*:0]u8 { + pub const ServerList = struct { + raw_data: [][*:0]u8, + index: usize = 0, + + pub fn next(self: *ServerList) ?[:0]u8 { + if (self.index >= self.raw_data.len) return null; + + defer self.index += 1; + return std.mem.sliceTo(self.raw_data[self.index], 0); + } + + pub fn deinit(self: *ServerList) void { + std.heap.raw_c_allocator.free(self.raw_data); + } + }; + + pub fn getServers(self: *Connection) Error!ServerList { var servers: [*][*:0]u8 = undefined; var count: c_int = 0; const status = Status.fromInt( - nats_c.natsConnection_GetServers(@ptrCast(self), &servers, &count), + nats_c.natsConnection_GetServers(@ptrCast(self), @ptrCast(&servers), &count), ); - return status.toError() orelse servers[0..@intCast(count)]; + return status.toError() orelse .{ .raw_data = servers[0..@intCast(count)] }; } - pub fn getDiscoveredServers(self: *Connection) Error![][*:0]u8 { + pub fn getDiscoveredServers(self: *Connection) Error!ServerList { var servers: [*][*:0]u8 = undefined; var count: c_int = 0; const status = Status.fromInt( - nats_c.natsConnection_GetDiscoveredServers(@ptrCast(self), &servers, &count), + nats_c.natsConnection_GetDiscoveredServers(@ptrCast(self), @ptrCast(&servers), &count), ); - return status.toError() orelse servers[0..@intCast(count)]; + return status.toError() orelse .{ .raw_data = servers[0..@intCast(count)] }; } pub fn getLastError(self: *Connection) ErrorInfo { - var desc: [*:0]const u8 = 0; - const status = nats_c.natsConnection_GetLastError(@ptrCast(self), &desc); + var desc: [*:0]const u8 = undefined; + const status = nats_c.natsConnection_GetLastError(@ptrCast(self), @ptrCast(&desc)); return .{ .code = Status.fromInt(status).toError(), @@ -204,7 +224,7 @@ pub const Connection = opaque { var sig = [_]u8{0} ** 64; const status = Status.fromInt( - nats_c.natsConnection_Sign(@ptrCast(self), message.ptr, message.len, &sig), + nats_c.natsConnection_Sign(@ptrCast(self), message.ptr, @intCast(message.len), &sig), ); return status.toError() orelse sig; @@ -225,7 +245,7 @@ pub const Connection = opaque { var port: c_int = 0; const status = Status.fromInt( - nats_c.natsConnection_GetLocalIPAndPort(@ptrCast(self), &address, &port), + nats_c.natsConnection_GetLocalIPAndPort(@ptrCast(self), @ptrCast(&address), &port), ); return status.toError() orelse .{ diff --git a/tests/connection.zig b/tests/connection.zig index 738f4c9..5cf5749 100644 --- a/tests/connection.zig +++ b/tests/connection.zig @@ -48,9 +48,57 @@ test "nats.Connection.connectTo" { const connection = try nats.Connection.connectTo("nats://user:password@127.0.0.1:4222"); defer connection.destroy(); + connection.close(); } } +test "nats.Connection" { + var server = try util.TestServer.launch(.{}); + defer server.stop(); + + try nats.init(nats.default_spin_count); + defer nats.deinit(); + + const connection = try nats.Connection.connectTo(nats.default_server_url); + defer connection.destroy(); + + _ = connection.isClosed(); + _ = connection.isReconnecting(); + _ = connection.getStatus(); + _ = connection.bytesBuffered(); + try connection.flush(); + try connection.flushTimeout(100); + _ = connection.getMaxPayload(); + _ = try connection.getStats(); + { + // id is 56 bytes plus terminating zero + var buf = [_]u8{0} ** 57; + _ = try connection.getConnectedUrl(&buf); + _ = try connection.getConnectedServerId(&buf); + } + { + var servers = try connection.getServers(); + defer servers.deinit(); + + var discovered = try connection.getDiscoveredServers(); + defer discovered.deinit(); + } + + _ = connection.getLastError(); + _ = try connection.getClientId(); + // our connection does not have a JWT, so this call will always fail + _ = connection.sign("greetings") catch {}; + _ = try connection.getLocalIpAndPort(); + _ = connection.getRtt() catch {}; + _ = connection.hasHeaderSupport(); + // this closes the connection, but it does not block until the connection is closed, + // which can result in nondeterministic behavior for calls after this one. + try connection.drain(); + // this will return error.ConnectionClosed if the connection is already closed, so + // don't expect this to be error free. + connection.drainTimeout(1000) catch {}; +} + fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 { _ = userdata; _ = connection;