Compare commits

...

3 Commits

Author SHA1 Message Date
741af6f976
connection.ConnectionOptions: fix a lot of issues
Various copypasta and typing errors around the thunks, some other
copypasta errors, etc. Also add a test which hits a decent chunk of
the API surface of ConnectionOptions, which is how these issues were
caught in the first place. This is an ugly test, but it has clearly
served its purpose.
2023-08-28 22:46:14 -07:00
142021bfd3
connection: make callback thunks pass userdata as first parameter
This makes the thunks compatible with standard struct methods, which
seems desirable.
2023-08-28 21:50:58 -07:00
ee803168a3
tests.connection: test auth varieties
Also fix password auth in the nats-server launcher.
2023-08-28 21:27:59 -07:00
4 changed files with 206 additions and 49 deletions

View File

@ -420,7 +420,7 @@ pub const Connection = opaque {
}
};
const ConnectionOptions = opaque {
pub const ConnectionOptions = opaque {
pub fn create() Error!*ConnectionOptions {
var self: *ConnectionOptions = undefined;
const status = Status.fromInt(nats_c.natsOptions_Create(@ptrCast(&self)));
@ -438,11 +438,11 @@ const ConnectionOptions = opaque {
).raise();
}
pub fn setServers(self: *ConnectionOptions, servers: [][:0]const u8) Error!void {
pub fn setServers(self: *ConnectionOptions, servers: []const [*:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetServers(
@ptrCast(self),
servers.ptr,
@constCast(@ptrCast(servers.ptr)),
@intCast(servers.len),
),
).raise();
@ -467,12 +467,12 @@ const ConnectionOptions = opaque {
pub fn setTokenHandler(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const thunk.SimpleCallbackSignature(T),
userdata: T,
comptime callback: *const TokenCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self),
thunk.makeSimpleCallbackThunk(callback),
makeTokenCallbackThunk(T, callback),
userdata,
)).raise();
}
@ -527,13 +527,13 @@ const ConnectionOptions = opaque {
pub fn setCiphers(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
nats_c.natsOptions_SetCiphers(@ptrCast(self), ciphers.ptr),
).raise();
}
pub fn setCipherSuites(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
nats_c.natsOptions_SetCipherSuites(@ptrCast(self), ciphers.ptr),
).raise();
}
@ -607,7 +607,7 @@ const ConnectionOptions = opaque {
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
userdata: T,
userdata: *T,
) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetCustomReconnectDelay(
@ -784,10 +784,12 @@ const ConnectionOptions = opaque {
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
retry: bool,
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
@ptrCast(self),
retry,
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
@ -880,13 +882,31 @@ const ConnectionOptions = opaque {
}
};
const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8;
pub fn TokenCallbackSignature(comptime T: type) type {
return fn (*T) [:0]const u8;
}
fn makeTokenCallbackThunk(
comptime T: type,
comptime callback: *const TokenCallbackSignature(T),
) *const TokenCallback {
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
return callback(data).ptr;
}
}.thunk;
}
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
pub fn ConnectionCallbackSignature(comptime T: type) type {
return fn (*Connection, *T) void;
return fn (*T, *Connection) void;
}
pub fn makeConnectionCallbackThunk(
fn makeConnectionCallbackThunk(
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
) *const ConnectionCallback {
@ -894,18 +914,18 @@ pub fn makeConnectionCallbackThunk(
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(connection, data);
callback(data, connection);
}
}.thunk;
}
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) i64;
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64;
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
return fn (*Connection, c_int, *T) i64;
return fn (*T, *Connection, c_int) i64;
}
pub fn makeReconnectDelayCallbackThunk(
fn makeReconnectDelayCallbackThunk(
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -917,7 +937,7 @@ pub fn makeReconnectDelayCallbackThunk(
) callconv(.C) i64 {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
return callback(connection, attempts, data);
return callback(data, connection, attempts);
}
}.thunk;
}
@ -927,13 +947,13 @@ const ErrorHandlerCallback = fn (
?*nats_c.natsSubscription,
nats_c.natsStatus,
?*anyopaque,
) void;
) callconv(.C) void;
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
return fn (*Connection, *Subscription, Status, *T) void;
return fn (*T, *Connection, *Subscription, Status) void;
}
pub fn makeErrorHandlerCallbackThunk(
fn makeErrorHandlerCallbackThunk(
comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T),
) *const ErrorHandlerCallback {
@ -948,7 +968,7 @@ pub fn makeErrorHandlerCallbackThunk(
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(connection, subscription, Status.fromInt(status), data);
callback(data, connection, subscription, Status.fromInt(status));
}
}.thunk;
}
@ -959,13 +979,13 @@ const AttachEventLoopCallback = fn (
?*anyopaque,
?*nats_c.natsConnection,
nats_c.natsSock,
) nats_c.natsStatus;
) callconv(.C) nats_c.natsStatus;
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
return fn (*L, *Connection, c_int) anyerror!*T;
}
pub fn makeAttachEventLoopCallbackThunk(
fn makeAttachEventLoopCallbackThunk(
comptime T: type,
comptime L: type,
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
@ -988,13 +1008,13 @@ pub fn makeAttachEventLoopCallbackThunk(
}.thunk;
}
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus;
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus;
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
return fn (*Connection, c_int, *T) anyerror!void;
return fn (*T, *Connection, c_int) anyerror!void;
}
pub fn makeEventLoopAddRemoveCallbackThunk(
fn makeEventLoopAddRemoveCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -1006,7 +1026,7 @@ pub fn makeEventLoopAddRemoveCallbackThunk(
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(connection, attempts, data) catch |err|
callback(data, connection, attempts) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
@ -1014,13 +1034,13 @@ pub fn makeEventLoopAddRemoveCallbackThunk(
}.thunk;
}
const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus;
const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus;
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
return fn (*T) anyerror!void;
}
pub fn makeEventLoopDetachCallbackThunk(
fn makeEventLoopDetachCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopDetachCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -1037,9 +1057,9 @@ pub fn makeEventLoopDetachCallbackThunk(
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus;
const JwtHandlerCallback = fn (?*?[*:0]u8, ?*?[*:0]u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
const JwtResponseOrError = union(enum) {
pub const JwtResponseOrError = union(enum) {
jwt: [:0]u8,
error_message: [:0]u8,
};
@ -1048,17 +1068,19 @@ pub fn JwtHandlerCallbackSignature(comptime T: type) type {
return fn (*T) JwtResponseOrError;
}
pub fn makeJwtHandlerCallbackThunk(
fn makeJwtHandlerCallbackThunk(
comptime T: type,
comptime callback: *const JwtHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
) *const JwtHandlerCallback {
return struct {
fn thunk(
jwt_out: *?[*:0]u8,
err_out: *?[*:0]u8,
jwt_out_raw: ?*?[*:0]u8,
err_out_raw: ?*?[*:0]u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const err_out = err_out_raw orelse unreachable;
const jwt_out = jwt_out_raw orelse unreachable;
switch (callback(data)) {
.jwt => |jwt| {
@ -1076,35 +1098,39 @@ pub fn makeJwtHandlerCallbackThunk(
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus;
const SignatureHandlerCallback = fn (?*?[*:0]u8, ?*?[*]u8, ?*c_int, ?[*:0]const u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
const SignatureResponseOrError = union(enum) {
pub const SignatureResponseOrError = union(enum) {
signature: []u8,
error_message: [:0]u8,
};
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
return fn ([:0]const u8, *T) SignatureResponseOrError;
return fn (*T, [:0]const u8) SignatureResponseOrError;
}
pub fn makeSignatureHandlerCallbackThunk(
fn makeSignatureHandlerCallbackThunk(
comptime T: type,
comptime callback: *const SignatureHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
) *const SignatureHandlerCallback {
return struct {
fn thunk(
err_out: *?[*:0]u8,
sig_out: *?[*]u8,
sig_len_out: *c_int,
nonce: [*:0]const u8,
err_out_raw: ?*?[*:0]u8,
sig_out_raw: ?*?[*]u8,
sig_len_out_raw: ?*c_int,
nonsense: ?[*:0]const u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const nonce = nonsense orelse unreachable;
const err_out = err_out_raw orelse unreachable;
const sig_out = sig_out_raw orelse unreachable;
const sig_len_out = sig_len_out_raw orelse unreachable;
switch (callback(std.mem.sliceTo(nonce, 0), data)) {
switch (callback(data, std.mem.sliceTo(nonce, 0))) {
.signature => |sig| {
sig_out.* = sig.ptr;
sig_len_out.* = sig.len;
sig_len_out.* = @intCast(sig.len);
return nats_c.NATS_OK;
},
.error_message => |msg| {

View File

@ -27,6 +27,8 @@ const sta_ = @import("./statistics.zig");
pub const default_server_url = con_.default_server_url;
pub const Connection = con_.Connection;
pub const ConnectionOptions = con_.ConnectionOptions;
pub const JwtResponseOrError = con_.JwtResponseOrError;
pub const SignatureResponseOrError = con_.SignatureResponseOrError;
pub const Subscription = sub_.Subscription;

View File

@ -5,14 +5,142 @@ const nats = @import("nats");
const util = @import("./util.zig");
test "nats.Connection.connectTo" {
var server = try util.TestServer.launch(.{});
defer server.stop();
{
var server = try util.TestServer.launch(.{});
defer server.stop();
try nats.init(nats.default_spin_count);
defer nats.deinit();
const connection = try nats.Connection.connectTo(nats.default_server_url);
defer connection.destroy();
}
{
var server = try util.TestServer.launch(.{
.auth = .{ .token = "test_token" },
});
defer server.stop();
try nats.init(nats.default_spin_count);
defer nats.deinit();
const connection = try nats.Connection.connectTo("nats://test_token@127.0.0.1:4222");
defer connection.destroy();
}
{
var server = try util.TestServer.launch(.{ .auth = .{
.password = .{ .user = "user", .pass = "password" },
} });
defer server.stop();
try nats.init(nats.default_spin_count);
defer nats.deinit();
const connection = try nats.Connection.connectTo("nats://user:password@127.0.0.1:4222");
defer connection.destroy();
}
}
fn tokenHandler(userdata: *u32) [:0]const u8 {
_ = userdata;
return "token";
}
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
_ = userdata;
_ = connection;
_ = attempts;
return 0;
}
fn errorHandler(
userdata: *u32,
connection: *nats.Connection,
subscription: *nats.Subscription,
status: nats.Status,
) void {
_ = userdata;
_ = connection;
_ = subscription;
_ = status;
}
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
_ = userdata;
_ = connection;
}
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
_ = userdata;
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
}
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
_ = userdata;
_ = nonce;
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "whoops") catch @panic("no!") };
}
test "nats.ConnectionOptions" {
try nats.init(nats.default_spin_count);
defer nats.deinit();
const options = try nats.ConnectionOptions.create();
defer options.destroy();
var userdata: u32 = 0;
try options.setUrl(nats.default_server_url);
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
try options.setServers(&servers);
try options.setCredentials("user", "password");
try options.setToken("test_token");
// requires a functioning token handler, which I will not write right now. Also
// cannot be called if a token has already been set
// try options.setTokenHandler(u32, tokenHandler, &userdata);
try options.setNoRandomize(false);
try options.setTimeout(1000);
try options.setName("name");
// the following all require a build with openssl
// try options.setSecure(false);
// try options.setCiphers("-ALL:HIGH");
// try options.setCipherSuites("TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256");
// try options.setExpectedHostname("host.name");
// try options.skipServerVerification(true);
try options.setVerbose(true);
try options.setPedantic(true);
try options.setPingInterval(1000);
try options.setMaxPingsOut(100);
try options.setIoBufSize(1024);
try options.setAllowReconnect(false);
try options.setMaxReconnect(10);
try options.setReconnectWait(500);
try options.setReconnectJitter(100, 200);
try options.setCustomReconnectDelay(u32, reconnectDelayHandler, &userdata);
try options.setReconnectBufSize(1024);
try options.setMaxPendingMessages(50);
try options.setErrorHandler(u32, errorHandler, &userdata);
try options.setClosedCallback(u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(u32, connectionHandler, &userdata);
try options.ignoreDiscoveredServers(true);
try options.useGlobalMessageDelivery(false);
try options.ipResolutionOrder(.ipv4_first);
try options.setSendAsap(true);
try options.useOldRequestStyle(false);
try options.setFailRequestsOnDisconnect(true);
try options.setNoEcho(true);
try options.setRetryOnFailedConnect(u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(u32, u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setWriteDeadline(5);
try options.disableNoResponders(true);
try options.setCustomInboxPrefix("_FOOBOX");
try options.setMessageBufferPadding(123);
}

View File

@ -40,7 +40,7 @@ pub const TestServer = struct {
const auth: [4][]const u8 = switch (options.auth) {
.none => .{""} ** 4,
.token => |tok| .{ "--auth", tok, "", "" },
.password => |auth| .{ "--user", auth.user, "--password", auth.pass },
.password => |auth| .{ "--user", auth.user, "--pass", auth.pass },
};
break :blk executable ++ listen ++ port ++ auth;
@ -64,6 +64,7 @@ pub const TestServer = struct {
}
_ = try child.kill();
std.debug.print("output: {s}\n", .{poller.fifo(.stderr).buf});
return error.NoLaunchStringFound;
}