diff --git a/src/connection.zig b/src/connection.zig index a7243cf..e881a09 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -420,7 +420,7 @@ pub const Connection = opaque { } }; -const ConnectionOptions = opaque { +pub const ConnectionOptions = opaque { pub fn create() Error!*ConnectionOptions { var self: *ConnectionOptions = undefined; const status = Status.fromInt(nats_c.natsOptions_Create(@ptrCast(&self))); @@ -438,11 +438,11 @@ const ConnectionOptions = opaque { ).raise(); } - pub fn setServers(self: *ConnectionOptions, servers: [][:0]const u8) Error!void { + pub fn setServers(self: *ConnectionOptions, servers: []const [*:0]const u8) Error!void { return Status.fromInt( nats_c.natsOptions_SetServers( @ptrCast(self), - servers.ptr, + @constCast(@ptrCast(servers.ptr)), @intCast(servers.len), ), ).raise(); @@ -467,12 +467,12 @@ const ConnectionOptions = opaque { pub fn setTokenHandler( self: *ConnectionOptions, comptime T: type, - comptime callback: *const thunk.SimpleCallbackSignature(T), - userdata: T, + comptime callback: *const TokenCallbackSignature(T), + userdata: *T, ) Error!void { return Status.fromInt(nats_c.natsOptions_SetTokenHandler( @ptrCast(self), - thunk.makeSimpleCallbackThunk(callback), + makeTokenCallbackThunk(T, callback), userdata, )).raise(); } @@ -527,13 +527,13 @@ const ConnectionOptions = opaque { pub fn setCiphers(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void { return Status.fromInt( - nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr), + nats_c.natsOptions_SetCiphers(@ptrCast(self), ciphers.ptr), ).raise(); } pub fn setCipherSuites(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void { return Status.fromInt( - nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr), + nats_c.natsOptions_SetCipherSuites(@ptrCast(self), ciphers.ptr), ).raise(); } @@ -607,7 +607,7 @@ const ConnectionOptions = opaque { self: *ConnectionOptions, comptime T: type, comptime callback: *const ReconnectDelayCallbackSignature(T), - userdata: T, + userdata: *T, ) Error!void { return Status.fromInt( nats_c.natsOptions_SetCustomReconnectDelay( @@ -784,10 +784,12 @@ const ConnectionOptions = opaque { self: *ConnectionOptions, comptime T: type, comptime callback: *const ConnectionCallbackSignature(T), + retry: bool, userdata: *T, ) Error!void { return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect( @ptrCast(self), + retry, makeConnectionCallbackThunk(T, callback), userdata, )).raise(); @@ -880,13 +882,31 @@ const ConnectionOptions = opaque { } }; +const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8; + +pub fn TokenCallbackSignature(comptime T: type) type { + return fn (*T) [:0]const u8; +} + +fn makeTokenCallbackThunk( + comptime T: type, + comptime callback: *const TokenCallbackSignature(T), +) *const TokenCallback { + return struct { + fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 { + const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; + return callback(data).ptr; + } + }.thunk; +} + const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void; pub fn ConnectionCallbackSignature(comptime T: type) type { return fn (*T, *Connection) void; } -pub fn makeConnectionCallbackThunk( +fn makeConnectionCallbackThunk( comptime T: type, comptime callback: *const ConnectionCallbackSignature(T), ) *const ConnectionCallback { @@ -899,13 +919,13 @@ pub fn makeConnectionCallbackThunk( }.thunk; } -const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) i64; +const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64; pub fn ReconnectDelayCallbackSignature(comptime T: type) type { return fn (*T, *Connection, c_int) i64; } -pub fn makeReconnectDelayCallbackThunk( +fn makeReconnectDelayCallbackThunk( comptime T: type, comptime callback: *const ReconnectDelayCallbackSignature(T), ) *const ReconnectDelayCallback { @@ -927,13 +947,13 @@ const ErrorHandlerCallback = fn ( ?*nats_c.natsSubscription, nats_c.natsStatus, ?*anyopaque, -) void; +) callconv(.C) void; pub fn ErrorHandlerCallbackSignature(comptime T: type) type { return fn (*T, *Connection, *Subscription, Status) void; } -pub fn makeErrorHandlerCallbackThunk( +fn makeErrorHandlerCallbackThunk( comptime T: type, comptime callback: *const ErrorHandlerCallbackSignature(T), ) *const ErrorHandlerCallback { @@ -959,13 +979,13 @@ const AttachEventLoopCallback = fn ( ?*anyopaque, ?*nats_c.natsConnection, nats_c.natsSock, -) nats_c.natsStatus; +) callconv(.C) nats_c.natsStatus; pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type { return fn (*L, *Connection, c_int) anyerror!*T; } -pub fn makeAttachEventLoopCallbackThunk( +fn makeAttachEventLoopCallbackThunk( comptime T: type, comptime L: type, comptime callback: *const AttachEventLoopCallbackSignature(T, L), @@ -988,13 +1008,13 @@ pub fn makeAttachEventLoopCallbackThunk( }.thunk; } -const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus; +const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus; pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type { return fn (*T, *Connection, c_int) anyerror!void; } -pub fn makeEventLoopAddRemoveCallbackThunk( +fn makeEventLoopAddRemoveCallbackThunk( comptime T: type, comptime callback: *const EventLoopAddRemoveCallbackSignature(T), ) *const ReconnectDelayCallback { @@ -1014,13 +1034,13 @@ pub fn makeEventLoopAddRemoveCallbackThunk( }.thunk; } -const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus; +const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus; pub fn EventLoopDetachCallbackSignature(comptime T: type) type { return fn (*T) anyerror!void; } -pub fn makeEventLoopDetachCallbackThunk( +fn makeEventLoopDetachCallbackThunk( comptime T: type, comptime callback: *const EventLoopDetachCallbackSignature(T), ) *const ReconnectDelayCallback { @@ -1037,9 +1057,9 @@ pub fn makeEventLoopDetachCallbackThunk( // THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST // BE ALLOCATED WITH THE C ALLOCATOR -const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus; +const JwtHandlerCallback = fn (?*?[*:0]u8, ?*?[*:0]u8, ?*anyopaque) callconv(.C) nats_c.natsStatus; -const JwtResponseOrError = union(enum) { +pub const JwtResponseOrError = union(enum) { jwt: [:0]u8, error_message: [:0]u8, }; @@ -1048,17 +1068,19 @@ pub fn JwtHandlerCallbackSignature(comptime T: type) type { return fn (*T) JwtResponseOrError; } -pub fn makeJwtHandlerCallbackThunk( +fn makeJwtHandlerCallbackThunk( comptime T: type, comptime callback: *const JwtHandlerCallbackSignature(T), -) *const ReconnectDelayCallback { +) *const JwtHandlerCallback { return struct { fn thunk( - jwt_out: *?[*:0]u8, - err_out: *?[*:0]u8, + jwt_out_raw: ?*?[*:0]u8, + err_out_raw: ?*?[*:0]u8, userdata: ?*anyopaque, ) callconv(.C) nats_c.natsStatus { const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; + const err_out = err_out_raw orelse unreachable; + const jwt_out = jwt_out_raw orelse unreachable; switch (callback(data)) { .jwt => |jwt| { @@ -1076,9 +1098,9 @@ pub fn makeJwtHandlerCallbackThunk( // THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST // BE ALLOCATED WITH THE C ALLOCATOR -const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus; +const SignatureHandlerCallback = fn (?*?[*:0]u8, ?*?[*]u8, ?*c_int, ?[*:0]const u8, ?*anyopaque) callconv(.C) nats_c.natsStatus; -const SignatureResponseOrError = union(enum) { +pub const SignatureResponseOrError = union(enum) { signature: []u8, error_message: [:0]u8, }; @@ -1087,24 +1109,28 @@ pub fn SignatureHandlerCallbackSignature(comptime T: type) type { return fn (*T, [:0]const u8) SignatureResponseOrError; } -pub fn makeSignatureHandlerCallbackThunk( +fn makeSignatureHandlerCallbackThunk( comptime T: type, comptime callback: *const SignatureHandlerCallbackSignature(T), -) *const ReconnectDelayCallback { +) *const SignatureHandlerCallback { return struct { fn thunk( - err_out: *?[*:0]u8, - sig_out: *?[*]u8, - sig_len_out: *c_int, - nonce: [*:0]const u8, + err_out_raw: ?*?[*:0]u8, + sig_out_raw: ?*?[*]u8, + sig_len_out_raw: ?*c_int, + nonsense: ?[*:0]const u8, userdata: ?*anyopaque, ) callconv(.C) nats_c.natsStatus { const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; + const nonce = nonsense orelse unreachable; + const err_out = err_out_raw orelse unreachable; + const sig_out = sig_out_raw orelse unreachable; + const sig_len_out = sig_len_out_raw orelse unreachable; switch (callback(data, std.mem.sliceTo(nonce, 0))) { .signature => |sig| { sig_out.* = sig.ptr; - sig_len_out.* = sig.len; + sig_len_out.* = @intCast(sig.len); return nats_c.NATS_OK; }, .error_message => |msg| { diff --git a/src/nats.zig b/src/nats.zig index edbacdf..bace72f 100644 --- a/src/nats.zig +++ b/src/nats.zig @@ -27,6 +27,8 @@ const sta_ = @import("./statistics.zig"); pub const default_server_url = con_.default_server_url; pub const Connection = con_.Connection; pub const ConnectionOptions = con_.ConnectionOptions; +pub const JwtResponseOrError = con_.JwtResponseOrError; +pub const SignatureResponseOrError = con_.SignatureResponseOrError; pub const Subscription = sub_.Subscription; diff --git a/tests/connection.zig b/tests/connection.zig index 6341134..107a72f 100644 --- a/tests/connection.zig +++ b/tests/connection.zig @@ -42,3 +42,105 @@ test "nats.Connection.connectTo" { defer connection.destroy(); } } + +fn tokenHandler(userdata: *u32) [:0]const u8 { + _ = userdata; + return "token"; +} + +fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 { + _ = userdata; + _ = connection; + _ = attempts; + + return 0; +} + +fn errorHandler( + userdata: *u32, + connection: *nats.Connection, + subscription: *nats.Subscription, + status: nats.Status, +) void { + _ = userdata; + _ = connection; + _ = subscription; + _ = status; +} + +fn connectionHandler(userdata: *u32, connection: *nats.Connection) void { + _ = userdata; + _ = connection; +} + +fn jwtHandler(userdata: *u32) nats.JwtResponseOrError { + _ = userdata; + // return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") }; + return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") }; +} + +fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError { + _ = userdata; + _ = nonce; + // return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") }; + return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "whoops") catch @panic("no!") }; +} + +test "nats.ConnectionOptions" { + try nats.init(nats.default_spin_count); + defer nats.deinit(); + + const options = try nats.ConnectionOptions.create(); + defer options.destroy(); + + var userdata: u32 = 0; + + try options.setUrl(nats.default_server_url); + const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" }; + try options.setServers(&servers); + try options.setCredentials("user", "password"); + try options.setToken("test_token"); + // requires a functioning token handler, which I will not write right now. Also + // cannot be called if a token has already been set + // try options.setTokenHandler(u32, tokenHandler, &userdata); + try options.setNoRandomize(false); + try options.setTimeout(1000); + try options.setName("name"); + + // the following all require a build with openssl + // try options.setSecure(false); + // try options.setCiphers("-ALL:HIGH"); + // try options.setCipherSuites("TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256"); + // try options.setExpectedHostname("host.name"); + // try options.skipServerVerification(true); + try options.setVerbose(true); + try options.setPedantic(true); + try options.setPingInterval(1000); + try options.setMaxPingsOut(100); + try options.setIoBufSize(1024); + try options.setAllowReconnect(false); + try options.setMaxReconnect(10); + try options.setReconnectWait(500); + try options.setReconnectJitter(100, 200); + try options.setCustomReconnectDelay(u32, reconnectDelayHandler, &userdata); + try options.setReconnectBufSize(1024); + try options.setMaxPendingMessages(50); + try options.setErrorHandler(u32, errorHandler, &userdata); + try options.setClosedCallback(u32, connectionHandler, &userdata); + try options.setDisconnectedCallback(u32, connectionHandler, &userdata); + try options.setDiscoveredServersCallback(u32, connectionHandler, &userdata); + try options.setLameDuckModeCallback(u32, connectionHandler, &userdata); + try options.ignoreDiscoveredServers(true); + try options.useGlobalMessageDelivery(false); + try options.ipResolutionOrder(.ipv4_first); + try options.setSendAsap(true); + try options.useOldRequestStyle(false); + try options.setFailRequestsOnDisconnect(true); + try options.setNoEcho(true); + try options.setRetryOnFailedConnect(u32, connectionHandler, true, &userdata); + try options.setUserCredentialsCallbacks(u32, u32, jwtHandler, signatureHandler, &userdata, &userdata); + try options.setWriteDeadline(5); + try options.disableNoResponders(true); + try options.setCustomInboxPrefix("_FOOBOX"); + try options.setMessageBufferPadding(123); +}