connection.ConnectionOptions: fix a lot of issues

Various copypasta and typing errors around the thunks, some other
copypasta errors, etc. Also add a test which hits a decent chunk of
the API surface of ConnectionOptions, which is how these issues were
caught in the first place. This is an ugly test, but it has clearly
served its purpose.
This commit is contained in:
torque 2023-08-28 22:46:14 -07:00
parent 142021bfd3
commit 741af6f976
Signed by: torque
SSH Key Fingerprint: SHA256:nCrXefBNo6EbjNSQhv0nXmEg/VuNq3sMF5b8zETw3Tk
3 changed files with 165 additions and 35 deletions

View File

@ -420,7 +420,7 @@ pub const Connection = opaque {
}
};
const ConnectionOptions = opaque {
pub const ConnectionOptions = opaque {
pub fn create() Error!*ConnectionOptions {
var self: *ConnectionOptions = undefined;
const status = Status.fromInt(nats_c.natsOptions_Create(@ptrCast(&self)));
@ -438,11 +438,11 @@ const ConnectionOptions = opaque {
).raise();
}
pub fn setServers(self: *ConnectionOptions, servers: [][:0]const u8) Error!void {
pub fn setServers(self: *ConnectionOptions, servers: []const [*:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetServers(
@ptrCast(self),
servers.ptr,
@constCast(@ptrCast(servers.ptr)),
@intCast(servers.len),
),
).raise();
@ -467,12 +467,12 @@ const ConnectionOptions = opaque {
pub fn setTokenHandler(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const thunk.SimpleCallbackSignature(T),
userdata: T,
comptime callback: *const TokenCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self),
thunk.makeSimpleCallbackThunk(callback),
makeTokenCallbackThunk(T, callback),
userdata,
)).raise();
}
@ -527,13 +527,13 @@ const ConnectionOptions = opaque {
pub fn setCiphers(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
nats_c.natsOptions_SetCiphers(@ptrCast(self), ciphers.ptr),
).raise();
}
pub fn setCipherSuites(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
nats_c.natsOptions_SetCipherSuites(@ptrCast(self), ciphers.ptr),
).raise();
}
@ -607,7 +607,7 @@ const ConnectionOptions = opaque {
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
userdata: T,
userdata: *T,
) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetCustomReconnectDelay(
@ -784,10 +784,12 @@ const ConnectionOptions = opaque {
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
retry: bool,
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
@ptrCast(self),
retry,
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
@ -880,13 +882,31 @@ const ConnectionOptions = opaque {
}
};
const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8;
pub fn TokenCallbackSignature(comptime T: type) type {
return fn (*T) [:0]const u8;
}
fn makeTokenCallbackThunk(
comptime T: type,
comptime callback: *const TokenCallbackSignature(T),
) *const TokenCallback {
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
return callback(data).ptr;
}
}.thunk;
}
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
pub fn ConnectionCallbackSignature(comptime T: type) type {
return fn (*T, *Connection) void;
}
pub fn makeConnectionCallbackThunk(
fn makeConnectionCallbackThunk(
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
) *const ConnectionCallback {
@ -899,13 +919,13 @@ pub fn makeConnectionCallbackThunk(
}.thunk;
}
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) i64;
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64;
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, c_int) i64;
}
pub fn makeReconnectDelayCallbackThunk(
fn makeReconnectDelayCallbackThunk(
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -927,13 +947,13 @@ const ErrorHandlerCallback = fn (
?*nats_c.natsSubscription,
nats_c.natsStatus,
?*anyopaque,
) void;
) callconv(.C) void;
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, *Subscription, Status) void;
}
pub fn makeErrorHandlerCallbackThunk(
fn makeErrorHandlerCallbackThunk(
comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T),
) *const ErrorHandlerCallback {
@ -959,13 +979,13 @@ const AttachEventLoopCallback = fn (
?*anyopaque,
?*nats_c.natsConnection,
nats_c.natsSock,
) nats_c.natsStatus;
) callconv(.C) nats_c.natsStatus;
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
return fn (*L, *Connection, c_int) anyerror!*T;
}
pub fn makeAttachEventLoopCallbackThunk(
fn makeAttachEventLoopCallbackThunk(
comptime T: type,
comptime L: type,
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
@ -988,13 +1008,13 @@ pub fn makeAttachEventLoopCallbackThunk(
}.thunk;
}
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus;
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus;
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, c_int) anyerror!void;
}
pub fn makeEventLoopAddRemoveCallbackThunk(
fn makeEventLoopAddRemoveCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -1014,13 +1034,13 @@ pub fn makeEventLoopAddRemoveCallbackThunk(
}.thunk;
}
const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus;
const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus;
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
return fn (*T) anyerror!void;
}
pub fn makeEventLoopDetachCallbackThunk(
fn makeEventLoopDetachCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopDetachCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -1037,9 +1057,9 @@ pub fn makeEventLoopDetachCallbackThunk(
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus;
const JwtHandlerCallback = fn (?*?[*:0]u8, ?*?[*:0]u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
const JwtResponseOrError = union(enum) {
pub const JwtResponseOrError = union(enum) {
jwt: [:0]u8,
error_message: [:0]u8,
};
@ -1048,17 +1068,19 @@ pub fn JwtHandlerCallbackSignature(comptime T: type) type {
return fn (*T) JwtResponseOrError;
}
pub fn makeJwtHandlerCallbackThunk(
fn makeJwtHandlerCallbackThunk(
comptime T: type,
comptime callback: *const JwtHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
) *const JwtHandlerCallback {
return struct {
fn thunk(
jwt_out: *?[*:0]u8,
err_out: *?[*:0]u8,
jwt_out_raw: ?*?[*:0]u8,
err_out_raw: ?*?[*:0]u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const err_out = err_out_raw orelse unreachable;
const jwt_out = jwt_out_raw orelse unreachable;
switch (callback(data)) {
.jwt => |jwt| {
@ -1076,9 +1098,9 @@ pub fn makeJwtHandlerCallbackThunk(
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus;
const SignatureHandlerCallback = fn (?*?[*:0]u8, ?*?[*]u8, ?*c_int, ?[*:0]const u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
const SignatureResponseOrError = union(enum) {
pub const SignatureResponseOrError = union(enum) {
signature: []u8,
error_message: [:0]u8,
};
@ -1087,24 +1109,28 @@ pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
return fn (*T, [:0]const u8) SignatureResponseOrError;
}
pub fn makeSignatureHandlerCallbackThunk(
fn makeSignatureHandlerCallbackThunk(
comptime T: type,
comptime callback: *const SignatureHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
) *const SignatureHandlerCallback {
return struct {
fn thunk(
err_out: *?[*:0]u8,
sig_out: *?[*]u8,
sig_len_out: *c_int,
nonce: [*:0]const u8,
err_out_raw: ?*?[*:0]u8,
sig_out_raw: ?*?[*]u8,
sig_len_out_raw: ?*c_int,
nonsense: ?[*:0]const u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const nonce = nonsense orelse unreachable;
const err_out = err_out_raw orelse unreachable;
const sig_out = sig_out_raw orelse unreachable;
const sig_len_out = sig_len_out_raw orelse unreachable;
switch (callback(data, std.mem.sliceTo(nonce, 0))) {
.signature => |sig| {
sig_out.* = sig.ptr;
sig_len_out.* = sig.len;
sig_len_out.* = @intCast(sig.len);
return nats_c.NATS_OK;
},
.error_message => |msg| {

View File

@ -27,6 +27,8 @@ const sta_ = @import("./statistics.zig");
pub const default_server_url = con_.default_server_url;
pub const Connection = con_.Connection;
pub const ConnectionOptions = con_.ConnectionOptions;
pub const JwtResponseOrError = con_.JwtResponseOrError;
pub const SignatureResponseOrError = con_.SignatureResponseOrError;
pub const Subscription = sub_.Subscription;

View File

@ -42,3 +42,105 @@ test "nats.Connection.connectTo" {
defer connection.destroy();
}
}
fn tokenHandler(userdata: *u32) [:0]const u8 {
_ = userdata;
return "token";
}
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
_ = userdata;
_ = connection;
_ = attempts;
return 0;
}
fn errorHandler(
userdata: *u32,
connection: *nats.Connection,
subscription: *nats.Subscription,
status: nats.Status,
) void {
_ = userdata;
_ = connection;
_ = subscription;
_ = status;
}
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
_ = userdata;
_ = connection;
}
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
_ = userdata;
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
}
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
_ = userdata;
_ = nonce;
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "whoops") catch @panic("no!") };
}
test "nats.ConnectionOptions" {
try nats.init(nats.default_spin_count);
defer nats.deinit();
const options = try nats.ConnectionOptions.create();
defer options.destroy();
var userdata: u32 = 0;
try options.setUrl(nats.default_server_url);
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
try options.setServers(&servers);
try options.setCredentials("user", "password");
try options.setToken("test_token");
// requires a functioning token handler, which I will not write right now. Also
// cannot be called if a token has already been set
// try options.setTokenHandler(u32, tokenHandler, &userdata);
try options.setNoRandomize(false);
try options.setTimeout(1000);
try options.setName("name");
// the following all require a build with openssl
// try options.setSecure(false);
// try options.setCiphers("-ALL:HIGH");
// try options.setCipherSuites("TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256");
// try options.setExpectedHostname("host.name");
// try options.skipServerVerification(true);
try options.setVerbose(true);
try options.setPedantic(true);
try options.setPingInterval(1000);
try options.setMaxPingsOut(100);
try options.setIoBufSize(1024);
try options.setAllowReconnect(false);
try options.setMaxReconnect(10);
try options.setReconnectWait(500);
try options.setReconnectJitter(100, 200);
try options.setCustomReconnectDelay(u32, reconnectDelayHandler, &userdata);
try options.setReconnectBufSize(1024);
try options.setMaxPendingMessages(50);
try options.setErrorHandler(u32, errorHandler, &userdata);
try options.setClosedCallback(u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(u32, connectionHandler, &userdata);
try options.ignoreDiscoveredServers(true);
try options.useGlobalMessageDelivery(false);
try options.ipResolutionOrder(.ipv4_first);
try options.setSendAsap(true);
try options.useOldRequestStyle(false);
try options.setFailRequestsOnDisconnect(true);
try options.setNoEcho(true);
try options.setRetryOnFailedConnect(u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(u32, u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setWriteDeadline(5);
try options.disableNoResponders(true);
try options.setCustomInboxPrefix("_FOOBOX");
try options.setMessageBufferPadding(123);
}