Compare commits

..

No commits in common. "741af6f976868ead3102f718bfe870d75746a433" and "cba76ae724572fafb6d9c034036a2b33bbec5677" have entirely different histories.

4 changed files with 49 additions and 206 deletions

View File

@ -420,7 +420,7 @@ pub const Connection = opaque {
}
};
pub const ConnectionOptions = opaque {
const ConnectionOptions = opaque {
pub fn create() Error!*ConnectionOptions {
var self: *ConnectionOptions = undefined;
const status = Status.fromInt(nats_c.natsOptions_Create(@ptrCast(&self)));
@ -438,11 +438,11 @@ pub const ConnectionOptions = opaque {
).raise();
}
pub fn setServers(self: *ConnectionOptions, servers: []const [*:0]const u8) Error!void {
pub fn setServers(self: *ConnectionOptions, servers: [][:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetServers(
@ptrCast(self),
@constCast(@ptrCast(servers.ptr)),
servers.ptr,
@intCast(servers.len),
),
).raise();
@ -467,12 +467,12 @@ pub const ConnectionOptions = opaque {
pub fn setTokenHandler(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const TokenCallbackSignature(T),
userdata: *T,
comptime callback: *const thunk.SimpleCallbackSignature(T),
userdata: T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self),
makeTokenCallbackThunk(T, callback),
thunk.makeSimpleCallbackThunk(callback),
userdata,
)).raise();
}
@ -527,13 +527,13 @@ pub const ConnectionOptions = opaque {
pub fn setCiphers(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetCiphers(@ptrCast(self), ciphers.ptr),
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
).raise();
}
pub fn setCipherSuites(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetCipherSuites(@ptrCast(self), ciphers.ptr),
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
).raise();
}
@ -607,7 +607,7 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
userdata: *T,
userdata: T,
) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetCustomReconnectDelay(
@ -784,12 +784,10 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
retry: bool,
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
@ptrCast(self),
retry,
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
@ -882,31 +880,13 @@ pub const ConnectionOptions = opaque {
}
};
const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8;
pub fn TokenCallbackSignature(comptime T: type) type {
return fn (*T) [:0]const u8;
}
fn makeTokenCallbackThunk(
comptime T: type,
comptime callback: *const TokenCallbackSignature(T),
) *const TokenCallback {
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
return callback(data).ptr;
}
}.thunk;
}
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
pub fn ConnectionCallbackSignature(comptime T: type) type {
return fn (*T, *Connection) void;
return fn (*Connection, *T) void;
}
fn makeConnectionCallbackThunk(
pub fn makeConnectionCallbackThunk(
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
) *const ConnectionCallback {
@ -914,18 +894,18 @@ fn makeConnectionCallbackThunk(
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data, connection);
callback(connection, data);
}
}.thunk;
}
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64;
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) i64;
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, c_int) i64;
return fn (*Connection, c_int, *T) i64;
}
fn makeReconnectDelayCallbackThunk(
pub fn makeReconnectDelayCallbackThunk(
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -937,7 +917,7 @@ fn makeReconnectDelayCallbackThunk(
) callconv(.C) i64 {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
return callback(data, connection, attempts);
return callback(connection, attempts, data);
}
}.thunk;
}
@ -947,13 +927,13 @@ const ErrorHandlerCallback = fn (
?*nats_c.natsSubscription,
nats_c.natsStatus,
?*anyopaque,
) callconv(.C) void;
) void;
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, *Subscription, Status) void;
return fn (*Connection, *Subscription, Status, *T) void;
}
fn makeErrorHandlerCallbackThunk(
pub fn makeErrorHandlerCallbackThunk(
comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T),
) *const ErrorHandlerCallback {
@ -968,7 +948,7 @@ fn makeErrorHandlerCallbackThunk(
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data, connection, subscription, Status.fromInt(status));
callback(connection, subscription, Status.fromInt(status), data);
}
}.thunk;
}
@ -979,13 +959,13 @@ const AttachEventLoopCallback = fn (
?*anyopaque,
?*nats_c.natsConnection,
nats_c.natsSock,
) callconv(.C) nats_c.natsStatus;
) nats_c.natsStatus;
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
return fn (*L, *Connection, c_int) anyerror!*T;
}
fn makeAttachEventLoopCallbackThunk(
pub fn makeAttachEventLoopCallbackThunk(
comptime T: type,
comptime L: type,
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
@ -1008,13 +988,13 @@ fn makeAttachEventLoopCallbackThunk(
}.thunk;
}
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus;
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus;
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, c_int) anyerror!void;
return fn (*Connection, c_int, *T) anyerror!void;
}
fn makeEventLoopAddRemoveCallbackThunk(
pub fn makeEventLoopAddRemoveCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -1026,7 +1006,7 @@ fn makeEventLoopAddRemoveCallbackThunk(
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data, connection, attempts) catch |err|
callback(connection, attempts, data) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
@ -1034,13 +1014,13 @@ fn makeEventLoopAddRemoveCallbackThunk(
}.thunk;
}
const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus;
const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus;
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
return fn (*T) anyerror!void;
}
fn makeEventLoopDetachCallbackThunk(
pub fn makeEventLoopDetachCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopDetachCallbackSignature(T),
) *const ReconnectDelayCallback {
@ -1057,9 +1037,9 @@ fn makeEventLoopDetachCallbackThunk(
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const JwtHandlerCallback = fn (?*?[*:0]u8, ?*?[*:0]u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus;
pub const JwtResponseOrError = union(enum) {
const JwtResponseOrError = union(enum) {
jwt: [:0]u8,
error_message: [:0]u8,
};
@ -1068,19 +1048,17 @@ pub fn JwtHandlerCallbackSignature(comptime T: type) type {
return fn (*T) JwtResponseOrError;
}
fn makeJwtHandlerCallbackThunk(
pub fn makeJwtHandlerCallbackThunk(
comptime T: type,
comptime callback: *const JwtHandlerCallbackSignature(T),
) *const JwtHandlerCallback {
) *const ReconnectDelayCallback {
return struct {
fn thunk(
jwt_out_raw: ?*?[*:0]u8,
err_out_raw: ?*?[*:0]u8,
jwt_out: *?[*:0]u8,
err_out: *?[*:0]u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const err_out = err_out_raw orelse unreachable;
const jwt_out = jwt_out_raw orelse unreachable;
switch (callback(data)) {
.jwt => |jwt| {
@ -1098,39 +1076,35 @@ fn makeJwtHandlerCallbackThunk(
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const SignatureHandlerCallback = fn (?*?[*:0]u8, ?*?[*]u8, ?*c_int, ?[*:0]const u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus;
pub const SignatureResponseOrError = union(enum) {
const SignatureResponseOrError = union(enum) {
signature: []u8,
error_message: [:0]u8,
};
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
return fn (*T, [:0]const u8) SignatureResponseOrError;
return fn ([:0]const u8, *T) SignatureResponseOrError;
}
fn makeSignatureHandlerCallbackThunk(
pub fn makeSignatureHandlerCallbackThunk(
comptime T: type,
comptime callback: *const SignatureHandlerCallbackSignature(T),
) *const SignatureHandlerCallback {
) *const ReconnectDelayCallback {
return struct {
fn thunk(
err_out_raw: ?*?[*:0]u8,
sig_out_raw: ?*?[*]u8,
sig_len_out_raw: ?*c_int,
nonsense: ?[*:0]const u8,
err_out: *?[*:0]u8,
sig_out: *?[*]u8,
sig_len_out: *c_int,
nonce: [*:0]const u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const nonce = nonsense orelse unreachable;
const err_out = err_out_raw orelse unreachable;
const sig_out = sig_out_raw orelse unreachable;
const sig_len_out = sig_len_out_raw orelse unreachable;
switch (callback(data, std.mem.sliceTo(nonce, 0))) {
switch (callback(std.mem.sliceTo(nonce, 0), data)) {
.signature => |sig| {
sig_out.* = sig.ptr;
sig_len_out.* = @intCast(sig.len);
sig_len_out.* = sig.len;
return nats_c.NATS_OK;
},
.error_message => |msg| {

View File

@ -27,8 +27,6 @@ const sta_ = @import("./statistics.zig");
pub const default_server_url = con_.default_server_url;
pub const Connection = con_.Connection;
pub const ConnectionOptions = con_.ConnectionOptions;
pub const JwtResponseOrError = con_.JwtResponseOrError;
pub const SignatureResponseOrError = con_.SignatureResponseOrError;
pub const Subscription = sub_.Subscription;

View File

@ -5,142 +5,14 @@ const nats = @import("nats");
const util = @import("./util.zig");
test "nats.Connection.connectTo" {
{
var server = try util.TestServer.launch(.{});
defer server.stop();
var server = try util.TestServer.launch(.{});
defer server.stop();
{
try nats.init(nats.default_spin_count);
defer nats.deinit();
const connection = try nats.Connection.connectTo(nats.default_server_url);
defer connection.destroy();
}
{
var server = try util.TestServer.launch(.{
.auth = .{ .token = "test_token" },
});
defer server.stop();
try nats.init(nats.default_spin_count);
defer nats.deinit();
const connection = try nats.Connection.connectTo("nats://test_token@127.0.0.1:4222");
defer connection.destroy();
}
{
var server = try util.TestServer.launch(.{ .auth = .{
.password = .{ .user = "user", .pass = "password" },
} });
defer server.stop();
try nats.init(nats.default_spin_count);
defer nats.deinit();
const connection = try nats.Connection.connectTo("nats://user:password@127.0.0.1:4222");
defer connection.destroy();
}
}
fn tokenHandler(userdata: *u32) [:0]const u8 {
_ = userdata;
return "token";
}
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
_ = userdata;
_ = connection;
_ = attempts;
return 0;
}
fn errorHandler(
userdata: *u32,
connection: *nats.Connection,
subscription: *nats.Subscription,
status: nats.Status,
) void {
_ = userdata;
_ = connection;
_ = subscription;
_ = status;
}
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
_ = userdata;
_ = connection;
}
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
_ = userdata;
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
}
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
_ = userdata;
_ = nonce;
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "whoops") catch @panic("no!") };
}
test "nats.ConnectionOptions" {
try nats.init(nats.default_spin_count);
defer nats.deinit();
const options = try nats.ConnectionOptions.create();
defer options.destroy();
var userdata: u32 = 0;
try options.setUrl(nats.default_server_url);
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
try options.setServers(&servers);
try options.setCredentials("user", "password");
try options.setToken("test_token");
// requires a functioning token handler, which I will not write right now. Also
// cannot be called if a token has already been set
// try options.setTokenHandler(u32, tokenHandler, &userdata);
try options.setNoRandomize(false);
try options.setTimeout(1000);
try options.setName("name");
// the following all require a build with openssl
// try options.setSecure(false);
// try options.setCiphers("-ALL:HIGH");
// try options.setCipherSuites("TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256");
// try options.setExpectedHostname("host.name");
// try options.skipServerVerification(true);
try options.setVerbose(true);
try options.setPedantic(true);
try options.setPingInterval(1000);
try options.setMaxPingsOut(100);
try options.setIoBufSize(1024);
try options.setAllowReconnect(false);
try options.setMaxReconnect(10);
try options.setReconnectWait(500);
try options.setReconnectJitter(100, 200);
try options.setCustomReconnectDelay(u32, reconnectDelayHandler, &userdata);
try options.setReconnectBufSize(1024);
try options.setMaxPendingMessages(50);
try options.setErrorHandler(u32, errorHandler, &userdata);
try options.setClosedCallback(u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(u32, connectionHandler, &userdata);
try options.ignoreDiscoveredServers(true);
try options.useGlobalMessageDelivery(false);
try options.ipResolutionOrder(.ipv4_first);
try options.setSendAsap(true);
try options.useOldRequestStyle(false);
try options.setFailRequestsOnDisconnect(true);
try options.setNoEcho(true);
try options.setRetryOnFailedConnect(u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(u32, u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setWriteDeadline(5);
try options.disableNoResponders(true);
try options.setCustomInboxPrefix("_FOOBOX");
try options.setMessageBufferPadding(123);
}

View File

@ -40,7 +40,7 @@ pub const TestServer = struct {
const auth: [4][]const u8 = switch (options.auth) {
.none => .{""} ** 4,
.token => |tok| .{ "--auth", tok, "", "" },
.password => |auth| .{ "--user", auth.user, "--pass", auth.pass },
.password => |auth| .{ "--user", auth.user, "--password", auth.pass },
};
break :blk executable ++ listen ++ port ++ auth;
@ -64,7 +64,6 @@ pub const TestServer = struct {
}
_ = try child.kill();
std.debug.print("output: {s}\n", .{poller.fifo(.stderr).buf});
return error.NoLaunchStringFound;
}