Compare commits
3 Commits
cba76ae724
...
741af6f976
Author | SHA1 | Date | |
---|---|---|---|
741af6f976 | |||
142021bfd3 | |||
ee803168a3 |
@ -420,7 +420,7 @@ pub const Connection = opaque {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const ConnectionOptions = opaque {
|
pub const ConnectionOptions = opaque {
|
||||||
pub fn create() Error!*ConnectionOptions {
|
pub fn create() Error!*ConnectionOptions {
|
||||||
var self: *ConnectionOptions = undefined;
|
var self: *ConnectionOptions = undefined;
|
||||||
const status = Status.fromInt(nats_c.natsOptions_Create(@ptrCast(&self)));
|
const status = Status.fromInt(nats_c.natsOptions_Create(@ptrCast(&self)));
|
||||||
@ -438,11 +438,11 @@ const ConnectionOptions = opaque {
|
|||||||
).raise();
|
).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setServers(self: *ConnectionOptions, servers: [][:0]const u8) Error!void {
|
pub fn setServers(self: *ConnectionOptions, servers: []const [*:0]const u8) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetServers(
|
nats_c.natsOptions_SetServers(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
servers.ptr,
|
@constCast(@ptrCast(servers.ptr)),
|
||||||
@intCast(servers.len),
|
@intCast(servers.len),
|
||||||
),
|
),
|
||||||
).raise();
|
).raise();
|
||||||
@ -467,12 +467,12 @@ const ConnectionOptions = opaque {
|
|||||||
pub fn setTokenHandler(
|
pub fn setTokenHandler(
|
||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const thunk.SimpleCallbackSignature(T),
|
comptime callback: *const TokenCallbackSignature(T),
|
||||||
userdata: T,
|
userdata: *T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
|
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
thunk.makeSimpleCallbackThunk(callback),
|
makeTokenCallbackThunk(T, callback),
|
||||||
userdata,
|
userdata,
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
@ -527,13 +527,13 @@ const ConnectionOptions = opaque {
|
|||||||
|
|
||||||
pub fn setCiphers(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
|
pub fn setCiphers(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
|
nats_c.natsOptions_SetCiphers(@ptrCast(self), ciphers.ptr),
|
||||||
).raise();
|
).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setCipherSuites(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
|
pub fn setCipherSuites(self: *ConnectionOptions, ciphers: [:0]const u8) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetSecure(@ptrCast(self), ciphers.ptr),
|
nats_c.natsOptions_SetCipherSuites(@ptrCast(self), ciphers.ptr),
|
||||||
).raise();
|
).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -607,7 +607,7 @@ const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
||||||
userdata: T,
|
userdata: *T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetCustomReconnectDelay(
|
nats_c.natsOptions_SetCustomReconnectDelay(
|
||||||
@ -784,10 +784,12 @@ const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
|
retry: bool,
|
||||||
userdata: *T,
|
userdata: *T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
|
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
|
retry,
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
userdata,
|
||||||
)).raise();
|
)).raise();
|
||||||
@ -880,13 +882,31 @@ const ConnectionOptions = opaque {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8;
|
||||||
|
|
||||||
|
pub fn TokenCallbackSignature(comptime T: type) type {
|
||||||
|
return fn (*T) [:0]const u8;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn makeTokenCallbackThunk(
|
||||||
|
comptime T: type,
|
||||||
|
comptime callback: *const TokenCallbackSignature(T),
|
||||||
|
) *const TokenCallback {
|
||||||
|
return struct {
|
||||||
|
fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 {
|
||||||
|
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
|
return callback(data).ptr;
|
||||||
|
}
|
||||||
|
}.thunk;
|
||||||
|
}
|
||||||
|
|
||||||
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
|
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
|
||||||
|
|
||||||
pub fn ConnectionCallbackSignature(comptime T: type) type {
|
pub fn ConnectionCallbackSignature(comptime T: type) type {
|
||||||
return fn (*Connection, *T) void;
|
return fn (*T, *Connection) void;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeConnectionCallbackThunk(
|
fn makeConnectionCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
) *const ConnectionCallback {
|
) *const ConnectionCallback {
|
||||||
@ -894,18 +914,18 @@ pub fn makeConnectionCallbackThunk(
|
|||||||
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
|
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
callback(connection, data);
|
callback(data, connection);
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) i64;
|
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64;
|
||||||
|
|
||||||
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
|
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
|
||||||
return fn (*Connection, c_int, *T) i64;
|
return fn (*T, *Connection, c_int) i64;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeReconnectDelayCallbackThunk(
|
fn makeReconnectDelayCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const ReconnectDelayCallback {
|
||||||
@ -917,7 +937,7 @@ pub fn makeReconnectDelayCallbackThunk(
|
|||||||
) callconv(.C) i64 {
|
) callconv(.C) i64 {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
return callback(connection, attempts, data);
|
return callback(data, connection, attempts);
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
}
|
}
|
||||||
@ -927,13 +947,13 @@ const ErrorHandlerCallback = fn (
|
|||||||
?*nats_c.natsSubscription,
|
?*nats_c.natsSubscription,
|
||||||
nats_c.natsStatus,
|
nats_c.natsStatus,
|
||||||
?*anyopaque,
|
?*anyopaque,
|
||||||
) void;
|
) callconv(.C) void;
|
||||||
|
|
||||||
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
|
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
|
||||||
return fn (*Connection, *Subscription, Status, *T) void;
|
return fn (*T, *Connection, *Subscription, Status) void;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeErrorHandlerCallbackThunk(
|
fn makeErrorHandlerCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ErrorHandlerCallbackSignature(T),
|
comptime callback: *const ErrorHandlerCallbackSignature(T),
|
||||||
) *const ErrorHandlerCallback {
|
) *const ErrorHandlerCallback {
|
||||||
@ -948,7 +968,7 @@ pub fn makeErrorHandlerCallbackThunk(
|
|||||||
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
|
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
|
|
||||||
callback(connection, subscription, Status.fromInt(status), data);
|
callback(data, connection, subscription, Status.fromInt(status));
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
}
|
}
|
||||||
@ -959,13 +979,13 @@ const AttachEventLoopCallback = fn (
|
|||||||
?*anyopaque,
|
?*anyopaque,
|
||||||
?*nats_c.natsConnection,
|
?*nats_c.natsConnection,
|
||||||
nats_c.natsSock,
|
nats_c.natsSock,
|
||||||
) nats_c.natsStatus;
|
) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
|
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
|
||||||
return fn (*L, *Connection, c_int) anyerror!*T;
|
return fn (*L, *Connection, c_int) anyerror!*T;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeAttachEventLoopCallbackThunk(
|
fn makeAttachEventLoopCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime L: type,
|
comptime L: type,
|
||||||
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
|
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
|
||||||
@ -988,13 +1008,13 @@ pub fn makeAttachEventLoopCallbackThunk(
|
|||||||
}.thunk;
|
}.thunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus;
|
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
|
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
|
||||||
return fn (*Connection, c_int, *T) anyerror!void;
|
return fn (*T, *Connection, c_int) anyerror!void;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeEventLoopAddRemoveCallbackThunk(
|
fn makeEventLoopAddRemoveCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
|
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const ReconnectDelayCallback {
|
||||||
@ -1006,7 +1026,7 @@ pub fn makeEventLoopAddRemoveCallbackThunk(
|
|||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
callback(connection, attempts, data) catch |err|
|
callback(data, connection, attempts) catch |err|
|
||||||
return Status.fromError(err).toInt();
|
return Status.fromError(err).toInt();
|
||||||
|
|
||||||
return nats_c.NATS_OK;
|
return nats_c.NATS_OK;
|
||||||
@ -1014,13 +1034,13 @@ pub fn makeEventLoopAddRemoveCallbackThunk(
|
|||||||
}.thunk;
|
}.thunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus;
|
const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
|
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T) anyerror!void;
|
return fn (*T) anyerror!void;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeEventLoopDetachCallbackThunk(
|
fn makeEventLoopDetachCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const EventLoopDetachCallbackSignature(T),
|
comptime callback: *const EventLoopDetachCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const ReconnectDelayCallback {
|
||||||
@ -1037,9 +1057,9 @@ pub fn makeEventLoopDetachCallbackThunk(
|
|||||||
|
|
||||||
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
|
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
|
||||||
// BE ALLOCATED WITH THE C ALLOCATOR
|
// BE ALLOCATED WITH THE C ALLOCATOR
|
||||||
const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus;
|
const JwtHandlerCallback = fn (?*?[*:0]u8, ?*?[*:0]u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
const JwtResponseOrError = union(enum) {
|
pub const JwtResponseOrError = union(enum) {
|
||||||
jwt: [:0]u8,
|
jwt: [:0]u8,
|
||||||
error_message: [:0]u8,
|
error_message: [:0]u8,
|
||||||
};
|
};
|
||||||
@ -1048,17 +1068,19 @@ pub fn JwtHandlerCallbackSignature(comptime T: type) type {
|
|||||||
return fn (*T) JwtResponseOrError;
|
return fn (*T) JwtResponseOrError;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeJwtHandlerCallbackThunk(
|
fn makeJwtHandlerCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const JwtHandlerCallbackSignature(T),
|
comptime callback: *const JwtHandlerCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const JwtHandlerCallback {
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
jwt_out: *?[*:0]u8,
|
jwt_out_raw: ?*?[*:0]u8,
|
||||||
err_out: *?[*:0]u8,
|
err_out_raw: ?*?[*:0]u8,
|
||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
|
const err_out = err_out_raw orelse unreachable;
|
||||||
|
const jwt_out = jwt_out_raw orelse unreachable;
|
||||||
|
|
||||||
switch (callback(data)) {
|
switch (callback(data)) {
|
||||||
.jwt => |jwt| {
|
.jwt => |jwt| {
|
||||||
@ -1076,35 +1098,39 @@ pub fn makeJwtHandlerCallbackThunk(
|
|||||||
|
|
||||||
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
|
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
|
||||||
// BE ALLOCATED WITH THE C ALLOCATOR
|
// BE ALLOCATED WITH THE C ALLOCATOR
|
||||||
const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus;
|
const SignatureHandlerCallback = fn (?*?[*:0]u8, ?*?[*]u8, ?*c_int, ?[*:0]const u8, ?*anyopaque) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
const SignatureResponseOrError = union(enum) {
|
pub const SignatureResponseOrError = union(enum) {
|
||||||
signature: []u8,
|
signature: []u8,
|
||||||
error_message: [:0]u8,
|
error_message: [:0]u8,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
|
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
|
||||||
return fn ([:0]const u8, *T) SignatureResponseOrError;
|
return fn (*T, [:0]const u8) SignatureResponseOrError;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeSignatureHandlerCallbackThunk(
|
fn makeSignatureHandlerCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const SignatureHandlerCallbackSignature(T),
|
comptime callback: *const SignatureHandlerCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const SignatureHandlerCallback {
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
err_out: *?[*:0]u8,
|
err_out_raw: ?*?[*:0]u8,
|
||||||
sig_out: *?[*]u8,
|
sig_out_raw: ?*?[*]u8,
|
||||||
sig_len_out: *c_int,
|
sig_len_out_raw: ?*c_int,
|
||||||
nonce: [*:0]const u8,
|
nonsense: ?[*:0]const u8,
|
||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
|
const nonce = nonsense orelse unreachable;
|
||||||
|
const err_out = err_out_raw orelse unreachable;
|
||||||
|
const sig_out = sig_out_raw orelse unreachable;
|
||||||
|
const sig_len_out = sig_len_out_raw orelse unreachable;
|
||||||
|
|
||||||
switch (callback(std.mem.sliceTo(nonce, 0), data)) {
|
switch (callback(data, std.mem.sliceTo(nonce, 0))) {
|
||||||
.signature => |sig| {
|
.signature => |sig| {
|
||||||
sig_out.* = sig.ptr;
|
sig_out.* = sig.ptr;
|
||||||
sig_len_out.* = sig.len;
|
sig_len_out.* = @intCast(sig.len);
|
||||||
return nats_c.NATS_OK;
|
return nats_c.NATS_OK;
|
||||||
},
|
},
|
||||||
.error_message => |msg| {
|
.error_message => |msg| {
|
||||||
|
@ -27,6 +27,8 @@ const sta_ = @import("./statistics.zig");
|
|||||||
pub const default_server_url = con_.default_server_url;
|
pub const default_server_url = con_.default_server_url;
|
||||||
pub const Connection = con_.Connection;
|
pub const Connection = con_.Connection;
|
||||||
pub const ConnectionOptions = con_.ConnectionOptions;
|
pub const ConnectionOptions = con_.ConnectionOptions;
|
||||||
|
pub const JwtResponseOrError = con_.JwtResponseOrError;
|
||||||
|
pub const SignatureResponseOrError = con_.SignatureResponseOrError;
|
||||||
|
|
||||||
pub const Subscription = sub_.Subscription;
|
pub const Subscription = sub_.Subscription;
|
||||||
|
|
||||||
|
@ -5,14 +5,142 @@ const nats = @import("nats");
|
|||||||
const util = @import("./util.zig");
|
const util = @import("./util.zig");
|
||||||
|
|
||||||
test "nats.Connection.connectTo" {
|
test "nats.Connection.connectTo" {
|
||||||
|
{
|
||||||
var server = try util.TestServer.launch(.{});
|
var server = try util.TestServer.launch(.{});
|
||||||
defer server.stop();
|
defer server.stop();
|
||||||
|
|
||||||
{
|
|
||||||
try nats.init(nats.default_spin_count);
|
try nats.init(nats.default_spin_count);
|
||||||
defer nats.deinit();
|
defer nats.deinit();
|
||||||
|
|
||||||
const connection = try nats.Connection.connectTo(nats.default_server_url);
|
const connection = try nats.Connection.connectTo(nats.default_server_url);
|
||||||
defer connection.destroy();
|
defer connection.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var server = try util.TestServer.launch(.{
|
||||||
|
.auth = .{ .token = "test_token" },
|
||||||
|
});
|
||||||
|
defer server.stop();
|
||||||
|
|
||||||
|
try nats.init(nats.default_spin_count);
|
||||||
|
defer nats.deinit();
|
||||||
|
|
||||||
|
const connection = try nats.Connection.connectTo("nats://test_token@127.0.0.1:4222");
|
||||||
|
defer connection.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var server = try util.TestServer.launch(.{ .auth = .{
|
||||||
|
.password = .{ .user = "user", .pass = "password" },
|
||||||
|
} });
|
||||||
|
defer server.stop();
|
||||||
|
|
||||||
|
try nats.init(nats.default_spin_count);
|
||||||
|
defer nats.deinit();
|
||||||
|
|
||||||
|
const connection = try nats.Connection.connectTo("nats://user:password@127.0.0.1:4222");
|
||||||
|
defer connection.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tokenHandler(userdata: *u32) [:0]const u8 {
|
||||||
|
_ = userdata;
|
||||||
|
return "token";
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
|
||||||
|
_ = userdata;
|
||||||
|
_ = connection;
|
||||||
|
_ = attempts;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn errorHandler(
|
||||||
|
userdata: *u32,
|
||||||
|
connection: *nats.Connection,
|
||||||
|
subscription: *nats.Subscription,
|
||||||
|
status: nats.Status,
|
||||||
|
) void {
|
||||||
|
_ = userdata;
|
||||||
|
_ = connection;
|
||||||
|
_ = subscription;
|
||||||
|
_ = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
|
||||||
|
_ = userdata;
|
||||||
|
_ = connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
|
||||||
|
_ = userdata;
|
||||||
|
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
|
||||||
|
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
|
||||||
|
}
|
||||||
|
|
||||||
|
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
|
||||||
|
_ = userdata;
|
||||||
|
_ = nonce;
|
||||||
|
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
|
||||||
|
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "whoops") catch @panic("no!") };
|
||||||
|
}
|
||||||
|
|
||||||
|
test "nats.ConnectionOptions" {
|
||||||
|
try nats.init(nats.default_spin_count);
|
||||||
|
defer nats.deinit();
|
||||||
|
|
||||||
|
const options = try nats.ConnectionOptions.create();
|
||||||
|
defer options.destroy();
|
||||||
|
|
||||||
|
var userdata: u32 = 0;
|
||||||
|
|
||||||
|
try options.setUrl(nats.default_server_url);
|
||||||
|
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
|
||||||
|
try options.setServers(&servers);
|
||||||
|
try options.setCredentials("user", "password");
|
||||||
|
try options.setToken("test_token");
|
||||||
|
// requires a functioning token handler, which I will not write right now. Also
|
||||||
|
// cannot be called if a token has already been set
|
||||||
|
// try options.setTokenHandler(u32, tokenHandler, &userdata);
|
||||||
|
try options.setNoRandomize(false);
|
||||||
|
try options.setTimeout(1000);
|
||||||
|
try options.setName("name");
|
||||||
|
|
||||||
|
// the following all require a build with openssl
|
||||||
|
// try options.setSecure(false);
|
||||||
|
// try options.setCiphers("-ALL:HIGH");
|
||||||
|
// try options.setCipherSuites("TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256");
|
||||||
|
// try options.setExpectedHostname("host.name");
|
||||||
|
// try options.skipServerVerification(true);
|
||||||
|
try options.setVerbose(true);
|
||||||
|
try options.setPedantic(true);
|
||||||
|
try options.setPingInterval(1000);
|
||||||
|
try options.setMaxPingsOut(100);
|
||||||
|
try options.setIoBufSize(1024);
|
||||||
|
try options.setAllowReconnect(false);
|
||||||
|
try options.setMaxReconnect(10);
|
||||||
|
try options.setReconnectWait(500);
|
||||||
|
try options.setReconnectJitter(100, 200);
|
||||||
|
try options.setCustomReconnectDelay(u32, reconnectDelayHandler, &userdata);
|
||||||
|
try options.setReconnectBufSize(1024);
|
||||||
|
try options.setMaxPendingMessages(50);
|
||||||
|
try options.setErrorHandler(u32, errorHandler, &userdata);
|
||||||
|
try options.setClosedCallback(u32, connectionHandler, &userdata);
|
||||||
|
try options.setDisconnectedCallback(u32, connectionHandler, &userdata);
|
||||||
|
try options.setDiscoveredServersCallback(u32, connectionHandler, &userdata);
|
||||||
|
try options.setLameDuckModeCallback(u32, connectionHandler, &userdata);
|
||||||
|
try options.ignoreDiscoveredServers(true);
|
||||||
|
try options.useGlobalMessageDelivery(false);
|
||||||
|
try options.ipResolutionOrder(.ipv4_first);
|
||||||
|
try options.setSendAsap(true);
|
||||||
|
try options.useOldRequestStyle(false);
|
||||||
|
try options.setFailRequestsOnDisconnect(true);
|
||||||
|
try options.setNoEcho(true);
|
||||||
|
try options.setRetryOnFailedConnect(u32, connectionHandler, true, &userdata);
|
||||||
|
try options.setUserCredentialsCallbacks(u32, u32, jwtHandler, signatureHandler, &userdata, &userdata);
|
||||||
|
try options.setWriteDeadline(5);
|
||||||
|
try options.disableNoResponders(true);
|
||||||
|
try options.setCustomInboxPrefix("_FOOBOX");
|
||||||
|
try options.setMessageBufferPadding(123);
|
||||||
}
|
}
|
||||||
|
@ -40,7 +40,7 @@ pub const TestServer = struct {
|
|||||||
const auth: [4][]const u8 = switch (options.auth) {
|
const auth: [4][]const u8 = switch (options.auth) {
|
||||||
.none => .{""} ** 4,
|
.none => .{""} ** 4,
|
||||||
.token => |tok| .{ "--auth", tok, "", "" },
|
.token => |tok| .{ "--auth", tok, "", "" },
|
||||||
.password => |auth| .{ "--user", auth.user, "--password", auth.pass },
|
.password => |auth| .{ "--user", auth.user, "--pass", auth.pass },
|
||||||
};
|
};
|
||||||
|
|
||||||
break :blk executable ++ listen ++ port ++ auth;
|
break :blk executable ++ listen ++ port ++ auth;
|
||||||
@ -64,6 +64,7 @@ pub const TestServer = struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_ = try child.kill();
|
_ = try child.kill();
|
||||||
|
std.debug.print("output: {s}\n", .{poller.fifo(.stderr).buf});
|
||||||
return error.NoLaunchStringFound;
|
return error.NoLaunchStringFound;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user