Compare commits

...

11 Commits

Author SHA1 Message Date
320bef2c63 build.zig.zon: match the project name 2024-04-06 15:36:35 -07:00
28c7890f6c subscription: fix setCompletionCallback
This was not being tested, and it was really broken. Now it is being
tested and it is no longer broken.
2024-04-06 15:36:35 -07:00
5939836bec all: allow passing more types of pointers as callback userdata
By performing more pointer casting gymnastics, additional types of
pointer can be supported. For example, now const qualified pointers
can be passed through thanks to the use of constCast. Also, having
explicit ptrCast allows nested pointers (e.g. a pointer to a slice,
such as `*[]const u8`) to be passed as userdata as well (the compiler
refuses to coerce a pointer to a pointer to `?*anyopaque` for some
reason, I guess because maybe it's ambiguous somehow?) Hopefully this
extra casting does not appreciably reduce the compiler's ability to
catch real bugs (for example, on a 64-bit machine, ptrCast can convert
a *u64 into a **u64 because there is no alignment change).

Also, the `volatile` pointer specifier is still not supported.
`allowzero` pointers probably also have a problem. Those are both
extreme edge cases, however.

This was intended to work before but did not due to an oversight.
Specifically, because the userdata pointers are stored as ?*anyopaque,
which does *not* have the const qualifier, they must have their const
qualifiers also removed. This is safe because the thunk guarantees
that the consumer code never sees the non-const version of the
pointer, and the nats library itself does nothing except pass the
pointer through to the user callback.

The tests have been updated to ensure this case works. The examples
still use a mutable userdata pointer to show that that case also
works. More tests could be added for the sake of increased rigor, but
I don't think it adds much.
2024-04-06 15:36:35 -07:00
4f26bf8fca thunk: fix slightly confusing compile errors
The error messages not on the slice detection code path should not be
talking about slices.
2024-04-06 15:36:35 -07:00
c4a8ae1a38 deps: update libressl dep for minor build system fix
This theoretically caches things better.
2024-01-15 16:26:13 -08:00
3462b3cdb6 all: don't automatically convert userdata types to pointer types
One of the things I have done a better job of internalizing while
working with zig over the last few months is that the less magic that
exists, the better. In the case of parameterized functions, this means
that it is much better to restrict the range of types that are
permitted to be passed than to perform type manipulation. In other
words, it's more confusing to see a function that is parameterized
with `SomeType` taking a pointer to that type than having it be
parameterized directly to take the pointer. Obviously there are
exceptions to this rule, like std.mem.eql taking slices of its
parameterized type.

In fact, this new approach fixes some edge cases. Null userdata may now
be passed in, since the user can now actually specify an optional
pointer type (e.g. `?*void` may be used to provide always-null
userdata). Additionally, a pointer to a constant value can now be
passed in, which wasn't possible before (this could have been worked
around by use of constCast and being careful, but that's an
exceedingly bad option compared to having the type system work for
you).
2024-01-01 15:20:06 -08:00
b28a91b97f tests: add shell script for generating keys
This may be useful later.
2023-11-07 22:55:08 -08:00
c6764fcf60 readme: some minor updates 2023-11-07 20:54:59 -08:00
c18e1eb237 deps: update libressl to 3.8.2 2023-11-07 20:54:00 -08:00
99daf922fd build: enable building protobuf-c and streaming support
This is a simple enough change. However, there are no tests for the
streaming functionality, and I need to decide if I want to actually
try to write bindings for them.
2023-11-06 22:32:22 -08:00
7794532fb4 tests: fix for 3.7.0
Well, at least it seems the update worked.
2023-11-06 22:30:57 -08:00
12 changed files with 182 additions and 118 deletions

View File

@@ -8,17 +8,17 @@ There are three main goals:
2. Provide a native-feeling Zig client API. 2. Provide a native-feeling Zig client API.
3. Support cross-compilation to the platforms that Zig supports. 3. Support cross-compilation to the platforms that Zig supports.
Right now, in service of goal 3, the underlying C library is built without certain features (notably, without streaming support) due to those features requiring managing some transitive dependencies (for streaming, the `protobuf-c` library). Solving this limitation is somewhere on the roadmap, but it's not high priority. `nats.c` is compiled against a copy of LibreSSL that has been wrapped with the zig build system. This appears to work, but it notably is not specifically OpenSSL, so there may be corner cases around encrypted connections. `nats.c` is compiled against a copy of LibreSSL that has been wrapped with the zig build system. This appears to work, but it notably is not specifically OpenSSL, so there may be corner cases around encrypted connections. The `protobuf-c` runtime library is compiled directly in-tree.
# Status # Status
All basic `nats.c` APIs are wrapped. The JetStream APIs are not currently wrapped, and the streaming API is not built or wrapped. It is unlikely I will wrap these as I do not require them for my primary use case. Contributions on this front are welcome. All basic `nats.c` APIs are wrapped. The JetStream APIs are not currently wrapped, and the streaming API is not wrapped. It is unlikely I will wrap these as I do not require them for my primary use case. Contributions on this front are welcome. People who are brave or desperate can use these APIs unwrapped through the exposed `nats.nats_c` object.
In theory, all wrapped APIs are referenced in unit tests so that they are at least checked to compile correctly. The unit tests do not do much in the way of behavioral testing, under the assumption that the underlying C library is well tested. However, there may be some gaps in the test coverage around less-common APIs. In theory, all wrapped APIs are referenced in unit tests so that they are at least checked to compile correctly. The unit tests do not do much in the way of behavioral testing, under the assumption that the underlying C library is well tested. However, there may be some gaps in the test coverage around less-common APIs.
The standard workflows around publishing and subscribing to messages seem to work well and feel (in my opinion) sufficiently Zig-like. Some of the APIs use getter/setter functions more heavily than I think a native Zig implementation would due to the fact that the underlying C library is designed with a very clean opaque handle API style. The standard workflows around publishing and subscribing to messages seem to work well and feel (in my opinion) sufficiently Zig-like. Some of the APIs use getter/setter functions more heavily than I think a native Zig implementation would, due to the fact that the underlying C library is designed with a very clean opaque handle API style.
Only tagged release versions of `nats.c` will be used. The current version of `nats.c` being used is `3.6.1`. Only tagged release versions of `nats.c` will be used. The current version of `nats.c` being used is `3.7.0`.
# Zig Version Support # Zig Version Support

View File

@@ -59,7 +59,7 @@ const examples = [_]Example{
.{ .name = "pub_bytes", .file = "examples/pub_bytes.zig" }, .{ .name = "pub_bytes", .file = "examples/pub_bytes.zig" },
}; };
pub fn add_examples(b: *std.build, options: ExampleOptions) void { pub fn add_examples(b: *std.Build, options: ExampleOptions) void {
const example_step = b.step("examples", "build examples"); const example_step = b.step("examples", "build examples");
inline for (examples) |example| { inline for (examples) |example| {

View File

@@ -1,10 +1,10 @@
.{ .{
.name = "nats-client", .name = "nats.zig",
.version = "0.0.1", .version = "0.0.1",
.dependencies = .{ .dependencies = .{
.libressl = .{ .libressl = .{
.url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/4d1e5c45115b8522561994f10e1ed3d2362efc54.tar.gz", .url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/4bbf9ad43fd5d56c8e15bc2e880aab7c4e49731b.tar.gz",
.hash = "1220686a95159e60d9be002cb5c97508dc27fbcade83fd7ceff35fa0cb355039bc81", .hash = "1220282c6f64f531b9d07d5ed1959708822f4f8dc2486a7005be391c8f5cdf2a502a",
}, },
}, },
} }

View File

@@ -29,10 +29,10 @@ pub fn main() !void {
defer connection.destroy(); defer connection.destroy();
var count: u32 = 0; var count: u32 = 0;
const subscription = try connection.subscribe(u32, "channel", onMessage, &count); const subscription = try connection.subscribe(*u32, "channel", onMessage, &count);
defer subscription.destroy(); defer subscription.destroy();
while (count < 10) : (nats.sleep(1000)) { while (count < 10) : (nats.sleep(100)) {
const reply = try connection.request("channel", "greetings", 1000); const reply = try connection.request("channel", "greetings", 1000);
defer reply.destroy(); defer reply.destroy();

View File

@@ -26,9 +26,12 @@ pub fn nats_c_lib(
lib.linkLibC(); lib.linkLibC();
lib.addCSourceFiles(&common_sources, &cflags); lib.addCSourceFiles(&common_sources, &cflags);
lib.addIncludePath(.{ .path = nats_src_prefix ++ "include" }); lib.addIncludePath(.{ .path = nats_src_prefix ++ "include" });
// if building with streaming support // if building with streaming support (protocol.pb-c.c includes
// lib.addIncludePath(.{ .path = nats_src_prefix ++ "stan" }); // <protobuf-c/protobuf-c.h>, unfortunately)
// lib.addCSourceFiles(&streaming_sources, &cflags); lib.addIncludePath(.{ .path = "deps" });
lib.addIncludePath(.{ .path = nats_src_prefix ++ "stan" });
lib.addCSourceFiles(&streaming_sources, &cflags);
lib.addCSourceFiles(&protobuf_c_sources, &cflags);
const ssl_dep = b.dependency("libressl", .{ const ssl_dep = b.dependency("libressl", .{
.target = options.target, .target = options.target,
@@ -62,6 +65,7 @@ pub fn nats_c_lib(
lib.defineCMacro("NATS_HAS_TLS", null); lib.defineCMacro("NATS_HAS_TLS", null);
lib.defineCMacro("NATS_USE_OPENSSL_1_1", null); lib.defineCMacro("NATS_USE_OPENSSL_1_1", null);
lib.defineCMacro("NATS_FORCE_HOST_VERIFICATION", null); lib.defineCMacro("NATS_FORCE_HOST_VERIFICATION", null);
lib.defineCMacro("NATS_HAS_STREAMING", null);
lib.defineCMacro("_REENTRANT", null); lib.defineCMacro("_REENTRANT", null);
inline for (install_headers) |header| { inline for (install_headers) |header| {
@@ -141,3 +145,7 @@ const streaming_sources = [_][]const u8{
nats_src_prefix ++ "stan/sopts.c", nats_src_prefix ++ "stan/sopts.c",
nats_src_prefix ++ "stan/sub.c", nats_src_prefix ++ "stan/sub.c",
}; };
const protobuf_c_sources = [_][]const u8{
"deps/protobuf-c/protobuf-c.c",
};

View File

@@ -18,24 +18,21 @@ pub const nats_c = @cImport({
@cInclude("nats/nats.h"); @cInclude("nats/nats.h");
}); });
const sub_ = @import("./subscription.zig"); const Subscription = @import("./subscription.zig").Subscription;
const Subscription = sub_.Subscription; const SubscriptionCallbackSignature = @import("./subscription.zig").SubscriptionCallbackSignature;
const SubscriptionCallbackSignature = sub_.SubscriptionCallbackSignature; const makeSubscriptionCallbackThunk = @import("./subscription.zig").makeSubscriptionCallbackThunk;
const makeSubscriptionCallbackThunk = sub_.makeSubscriptionCallbackThunk;
const msg_ = @import("./message.zig"); const Message = @import("./message.zig").Message;
const Message = msg_.Message;
const err_ = @import("./error.zig"); const Error = @import("./error.zig").Error;
const Error = err_.Error; const Status = @import("./error.zig").Status;
const Status = err_.Status; const ErrorInfo = @import("./error.zig").ErrorInfo;
const ErrorInfo = err_.ErrorInfo;
const sta_ = @import("./statistics.zig"); const Statistics = @import("./statistics.zig").Statistics;
const Statistics = sta_.Statistics; const StatsCounts = @import("./statistics.zig").StatsCounts;
const StatsCounts = sta_.StatsCounts;
const thunk = @import("./thunk.zig"); const thunk = @import("./thunk.zig");
const checkUserDataType = @import("./thunk.zig").checkUserDataType;
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL; pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
@@ -343,7 +340,7 @@ pub const Connection = opaque {
comptime T: type, comptime T: type,
subject: [:0]const u8, subject: [:0]const u8,
callback: SubscriptionCallbackSignature(T), callback: SubscriptionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!*Subscription { ) Error!*Subscription {
var sub: *Subscription = undefined; var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_Subscribe( const status = Status.fromInt(nats_c.natsConnection_Subscribe(
@@ -351,7 +348,7 @@ pub const Connection = opaque {
@ptrCast(self), @ptrCast(self),
subject.ptr, subject.ptr,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
} }
@@ -362,7 +359,7 @@ pub const Connection = opaque {
subject: [:0]const u8, subject: [:0]const u8,
timeout: i64, timeout: i64,
callback: SubscriptionCallbackSignature(T), callback: SubscriptionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!*Subscription { ) Error!*Subscription {
var sub: *Subscription = undefined; var sub: *Subscription = undefined;
@@ -372,7 +369,7 @@ pub const Connection = opaque {
subject.ptr, subject.ptr,
timeout, timeout,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
@@ -396,7 +393,7 @@ pub const Connection = opaque {
subject: [:0]const u8, subject: [:0]const u8,
queue_group: [:0]const u8, queue_group: [:0]const u8,
callback: SubscriptionCallbackSignature(T), callback: SubscriptionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!*Subscription { ) Error!*Subscription {
var sub: *Subscription = undefined; var sub: *Subscription = undefined;
@@ -406,7 +403,7 @@ pub const Connection = opaque {
subject.ptr, subject.ptr,
queue_group.ptr, queue_group.ptr,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
@@ -419,7 +416,7 @@ pub const Connection = opaque {
queue_group: [:0]const u8, queue_group: [:0]const u8,
timeout: i64, timeout: i64,
callback: SubscriptionCallbackSignature(T), callback: SubscriptionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!*Subscription { ) Error!*Subscription {
var sub: *Subscription = undefined; var sub: *Subscription = undefined;
@@ -430,7 +427,7 @@ pub const Connection = opaque {
queue_group.ptr, queue_group.ptr,
timeout, timeout,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
@@ -502,12 +499,12 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const TokenCallbackSignature(T), comptime callback: *const TokenCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler( return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self), @ptrCast(self),
makeTokenCallbackThunk(T, callback), makeTokenCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
@@ -641,13 +638,13 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T), comptime callback: *const ReconnectDelayCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt( return Status.fromInt(
nats_c.natsOptions_SetCustomReconnectDelay( nats_c.natsOptions_SetCustomReconnectDelay(
@ptrCast(self), @ptrCast(self),
makeReconnectDelayCallbackThunk(T, callback), makeReconnectDelayCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
), ),
).raise(); ).raise();
} }
@@ -668,13 +665,13 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T), comptime callback: *const ErrorHandlerCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt( return Status.fromInt(
nats_c.natsOptions_SetErrorHandler( nats_c.natsOptions_SetErrorHandler(
@ptrCast(self), @ptrCast(self),
makeErrorHandlerCallbackThunk(T, callback), makeErrorHandlerCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
), ),
).raise(); ).raise();
} }
@@ -683,12 +680,12 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T), comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
@@ -696,12 +693,12 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T), comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
@@ -709,12 +706,12 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T), comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
@@ -722,12 +719,12 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T), comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
@@ -735,12 +732,12 @@ pub const ConnectionOptions = opaque {
self: *ConnectionOptions, self: *ConnectionOptions,
comptime T: type, comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T), comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
@@ -752,11 +749,11 @@ pub const ConnectionOptions = opaque {
comptime read_callback: *const AttachEventLoopCallbackSignature(T), comptime read_callback: *const AttachEventLoopCallbackSignature(T),
comptime write_callback: *const AttachEventLoopCallbackSignature(T), comptime write_callback: *const AttachEventLoopCallbackSignature(T),
comptime detach_callback: *const thunk.SimpleCallbackSignature(T), comptime detach_callback: *const thunk.SimpleCallbackSignature(T),
loop: *L, loop: L,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetEventLoop( return Status.fromInt(nats_c.natsOptions_SetEventLoop(
@ptrCast(self), @ptrCast(self),
@ptrCast(loop), @constCast(@ptrCast(loop)),
makeAttachEventLoopCallbackThunk(T, L, attach_callback), makeAttachEventLoopCallbackThunk(T, L, attach_callback),
makeEventLoopAddRemoveCallbackThunk(T, read_callback), makeEventLoopAddRemoveCallbackThunk(T, read_callback),
makeEventLoopAddRemoveCallbackThunk(T, write_callback), makeEventLoopAddRemoveCallbackThunk(T, write_callback),
@@ -819,13 +816,13 @@ pub const ConnectionOptions = opaque {
comptime T: type, comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T), comptime callback: *const ConnectionCallbackSignature(T),
retry: bool, retry: bool,
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect( return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
@ptrCast(self), @ptrCast(self),
retry, retry,
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
@@ -835,19 +832,23 @@ pub const ConnectionOptions = opaque {
comptime U: type, comptime U: type,
comptime jwt_callback: *const JwtHandlerCallbackSignature(T), comptime jwt_callback: *const JwtHandlerCallbackSignature(T),
comptime sig_callback: *const SignatureHandlerCallbackSignature(U), comptime sig_callback: *const SignatureHandlerCallbackSignature(U),
jwt_userdata: *T, jwt_userdata: T,
sig_userdata: *U, sig_userdata: U,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self), @ptrCast(self),
makeJwtHandlerCallbackThunk(T, jwt_callback), makeJwtHandlerCallbackThunk(T, jwt_callback),
jwt_userdata, @constCast(@ptrCast(jwt_userdata)),
makeSignatureHandlerCallbackThunk(U, sig_callback), makeSignatureHandlerCallbackThunk(U, sig_callback),
sig_userdata, @constCast(@ptrCast(sig_userdata)),
)).raise(); )).raise();
} }
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void { pub fn setUserCredentialsFromFiles(
self: *ConnectionOptions,
user_or_chained_file: [:0]const u8,
seed_file: [:0]const u8,
) Error!void {
return Status.fromInt( return Status.fromInt(
nats_c.natsOptions_SetUserCredentialsFromFiles( nats_c.natsOptions_SetUserCredentialsFromFiles(
@ptrCast(self), @ptrCast(self),
@@ -871,13 +872,13 @@ pub const ConnectionOptions = opaque {
comptime T: type, comptime T: type,
comptime sig_callback: *const SignatureHandlerCallbackSignature(T), comptime sig_callback: *const SignatureHandlerCallbackSignature(T),
pub_key: [:0]const u8, pub_key: [:0]const u8,
sig_userdata: *T, sig_userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self), @ptrCast(self),
pub_key.ptr, pub_key.ptr,
makeSignatureHandlerCallbackThunk(T, sig_callback), makeSignatureHandlerCallbackThunk(T, sig_callback),
sig_userdata, @constCast(@ptrCast(sig_userdata)),
)).raise(); )).raise();
} }
@@ -919,16 +920,17 @@ pub const ConnectionOptions = opaque {
const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8; const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8;
pub fn TokenCallbackSignature(comptime T: type) type { pub fn TokenCallbackSignature(comptime T: type) type {
return fn (*T) [:0]const u8; return fn (T) [:0]const u8;
} }
fn makeTokenCallbackThunk( fn makeTokenCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const TokenCallbackSignature(T), comptime callback: *const TokenCallbackSignature(T),
) *const TokenCallback { ) *const TokenCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 { fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
return callback(data).ptr; return callback(data).ptr;
} }
}.thunk; }.thunk;
@@ -937,17 +939,18 @@ fn makeTokenCallbackThunk(
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void; const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
pub fn ConnectionCallbackSignature(comptime T: type) type { pub fn ConnectionCallbackSignature(comptime T: type) type {
return fn (*T, *Connection) void; return fn (T, *Connection) void;
} }
fn makeConnectionCallbackThunk( fn makeConnectionCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T), comptime callback: *const ConnectionCallbackSignature(T),
) *const ConnectionCallback { ) *const ConnectionCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void { fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data, connection); callback(data, connection);
} }
}.thunk; }.thunk;
@@ -956,13 +959,14 @@ fn makeConnectionCallbackThunk(
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64; const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64;
pub fn ReconnectDelayCallbackSignature(comptime T: type) type { pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, c_int) i64; return fn (T, *Connection, c_int) i64;
} }
fn makeReconnectDelayCallbackThunk( fn makeReconnectDelayCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T), comptime callback: *const ReconnectDelayCallbackSignature(T),
) *const ReconnectDelayCallback { ) *const ReconnectDelayCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk( fn thunk(
conn: ?*nats_c.natsConnection, conn: ?*nats_c.natsConnection,
@@ -970,7 +974,7 @@ fn makeReconnectDelayCallbackThunk(
userdata: ?*anyopaque, userdata: ?*anyopaque,
) callconv(.C) i64 { ) callconv(.C) i64 {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
return callback(data, connection, attempts); return callback(data, connection, attempts);
} }
}.thunk; }.thunk;
@@ -984,13 +988,14 @@ const ErrorHandlerCallback = fn (
) callconv(.C) void; ) callconv(.C) void;
pub fn ErrorHandlerCallbackSignature(comptime T: type) type { pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, *Subscription, Status) void; return fn (T, *Connection, *Subscription, Status) void;
} }
fn makeErrorHandlerCallbackThunk( fn makeErrorHandlerCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T), comptime callback: *const ErrorHandlerCallbackSignature(T),
) *const ErrorHandlerCallback { ) *const ErrorHandlerCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk( fn thunk(
conn: ?*nats_c.natsConnection, conn: ?*nats_c.natsConnection,
@@ -1000,7 +1005,7 @@ fn makeErrorHandlerCallbackThunk(
) callconv(.C) void { ) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable; const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data, connection, subscription, Status.fromInt(status)); callback(data, connection, subscription, Status.fromInt(status));
} }
@@ -1016,7 +1021,7 @@ const AttachEventLoopCallback = fn (
) callconv(.C) nats_c.natsStatus; ) callconv(.C) nats_c.natsStatus;
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type { pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
return fn (*L, *Connection, c_int) anyerror!*T; return fn (L, *Connection, c_int) anyerror!T;
} }
fn makeAttachEventLoopCallbackThunk( fn makeAttachEventLoopCallbackThunk(
@@ -1024,6 +1029,8 @@ fn makeAttachEventLoopCallbackThunk(
comptime L: type, comptime L: type,
comptime callback: *const AttachEventLoopCallbackSignature(T, L), comptime callback: *const AttachEventLoopCallbackSignature(T, L),
) *const ReconnectDelayCallback { ) *const ReconnectDelayCallback {
comptime checkUserDataType(T);
comptime checkUserDataType(L);
return struct { return struct {
fn thunk( fn thunk(
userdata: *?*anyopaque, userdata: *?*anyopaque,
@@ -1032,7 +1039,7 @@ fn makeAttachEventLoopCallbackThunk(
sock: ?*nats_c.natsSock, sock: ?*nats_c.natsSock,
) callconv(.C) nats_c.natsStatus { ) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const ev_loop: *L = if (loop) |l| @ptrCast(l) else unreachable; const ev_loop: L = if (loop) |l| @alignCast(@ptrCast(l)) else unreachable;
userdata.* = callback(ev_loop, connection, sock) catch |err| userdata.* = callback(ev_loop, connection, sock) catch |err|
return Status.fromError(err).toInt(); return Status.fromError(err).toInt();
@@ -1045,13 +1052,14 @@ fn makeAttachEventLoopCallbackThunk(
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus; const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus;
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type { pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, c_int) anyerror!void; return fn (T, *Connection, c_int) anyerror!void;
} }
fn makeEventLoopAddRemoveCallbackThunk( fn makeEventLoopAddRemoveCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const EventLoopAddRemoveCallbackSignature(T), comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
) *const ReconnectDelayCallback { ) *const ReconnectDelayCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk( fn thunk(
conn: ?*nats_c.natsConnection, conn: ?*nats_c.natsConnection,
@@ -1059,7 +1067,7 @@ fn makeEventLoopAddRemoveCallbackThunk(
userdata: ?*anyopaque, userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus { ) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data, connection, attempts) catch |err| callback(data, connection, attempts) catch |err|
return Status.fromError(err).toInt(); return Status.fromError(err).toInt();
@@ -1071,18 +1079,19 @@ fn makeEventLoopAddRemoveCallbackThunk(
const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus; const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus;
pub fn EventLoopDetachCallbackSignature(comptime T: type) type { pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
return fn (*T) anyerror!void; return fn (T) anyerror!void;
} }
fn makeEventLoopDetachCallbackThunk( fn makeEventLoopDetachCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const EventLoopDetachCallbackSignature(T), comptime callback: *const EventLoopDetachCallbackSignature(T),
) *const ReconnectDelayCallback { ) *const ReconnectDelayCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk( fn thunk(
userdata: ?*anyopaque, userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus { ) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data) catch |err| return Status.fromError(err).toInt(); callback(data) catch |err| return Status.fromError(err).toInt();
return nats_c.NATS_OK; return nats_c.NATS_OK;
} }
@@ -1099,20 +1108,21 @@ pub const JwtResponseOrError = union(enum) {
}; };
pub fn JwtHandlerCallbackSignature(comptime T: type) type { pub fn JwtHandlerCallbackSignature(comptime T: type) type {
return fn (*T) JwtResponseOrError; return fn (T) JwtResponseOrError;
} }
fn makeJwtHandlerCallbackThunk( fn makeJwtHandlerCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const JwtHandlerCallbackSignature(T), comptime callback: *const JwtHandlerCallbackSignature(T),
) *const JwtHandlerCallback { ) *const JwtHandlerCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk( fn thunk(
jwt_out_raw: ?*?[*:0]u8, jwt_out_raw: ?*?[*:0]u8,
err_out_raw: ?*?[*:0]u8, err_out_raw: ?*?[*:0]u8,
userdata: ?*anyopaque, userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus { ) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const err_out = err_out_raw orelse unreachable; const err_out = err_out_raw orelse unreachable;
const jwt_out = jwt_out_raw orelse unreachable; const jwt_out = jwt_out_raw orelse unreachable;
@@ -1140,13 +1150,14 @@ pub const SignatureResponseOrError = union(enum) {
}; };
pub fn SignatureHandlerCallbackSignature(comptime T: type) type { pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
return fn (*T, [:0]const u8) SignatureResponseOrError; return fn (T, [:0]const u8) SignatureResponseOrError;
} }
fn makeSignatureHandlerCallbackThunk( fn makeSignatureHandlerCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const SignatureHandlerCallbackSignature(T), comptime callback: *const SignatureHandlerCallbackSignature(T),
) *const SignatureHandlerCallback { ) *const SignatureHandlerCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk( fn thunk(
err_out_raw: ?*?[*:0]u8, err_out_raw: ?*?[*:0]u8,
@@ -1155,7 +1166,7 @@ fn makeSignatureHandlerCallbackThunk(
nonsense: ?[*:0]const u8, nonsense: ?[*:0]const u8,
userdata: ?*anyopaque, userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus { ) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const nonce = nonsense orelse unreachable; const nonce = nonsense orelse unreachable;
const err_out = err_out_raw orelse unreachable; const err_out = err_out_raw orelse unreachable;
const sig_out = sig_out_raw orelse unreachable; const sig_out = sig_out_raw orelse unreachable;

View File

@@ -27,6 +27,7 @@ const Error = err_.Error;
const Status = err_.Status; const Status = err_.Status;
const thunk = @import("./thunk.zig"); const thunk = @import("./thunk.zig");
const checkUserDataType = @import("./thunk.zig").checkUserDataType;
pub const Subscription = opaque { pub const Subscription = opaque {
pub const MessageCount = struct { pub const MessageCount = struct {
@@ -173,12 +174,12 @@ pub const Subscription = opaque {
self: *Subscription, self: *Subscription,
comptime T: type, comptime T: type,
comptime callback: *const thunk.SimpleCallbackThunkSignature(T), comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
userdata: *T, userdata: T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB( return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
@ptrCast(self), @ptrCast(self),
thunk.makeSimpleCallbackThunk(callback), thunk.makeSimpleCallbackThunk(T, callback),
userdata, @constCast(@ptrCast(userdata)),
)).raise(); )).raise();
} }
}; };
@@ -191,13 +192,14 @@ const SubscriptionCallback = fn (
) callconv(.C) void; ) callconv(.C) void;
pub fn SubscriptionCallbackSignature(comptime T: type) type { pub fn SubscriptionCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, *Subscription, *Message) void; return fn (T, *Connection, *Subscription, *Message) void;
} }
pub fn makeSubscriptionCallbackThunk( pub fn makeSubscriptionCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const SubscriptionCallbackSignature(T), comptime callback: *const SubscriptionCallbackSignature(T),
) *const SubscriptionCallback { ) *const SubscriptionCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk( fn thunk(
conn: ?*nats_c.natsConnection, conn: ?*nats_c.natsConnection,
@@ -211,7 +213,7 @@ pub fn makeSubscriptionCallbackThunk(
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable; const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data, connection, subscription, message); callback(data, connection, subscription, message);
} }

View File

@@ -18,19 +18,46 @@ pub const nats_c = @cImport({
@cInclude("nats/nats.h"); @cInclude("nats/nats.h");
}); });
pub fn checkUserDataType(comptime T: type) void {
switch (@typeInfo(T)) {
.Optional => |info| switch (@typeInfo(info.child)) {
.Optional => @compileError(
"nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata. \"" ++
@typeName(T) ++ "\" has more than one optional specifier.",
),
else => checkUserDataType(info.child),
},
.Pointer => |info| switch (info.size) {
.Slice => @compileError(
"nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata, not slices. \"" ++
@typeName(T) ++ "\" appears to be a slice.",
),
else => {},
},
else => @compileError(
"nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata. \"" ++
@typeName(T) ++ "\" is not a pointer type.",
),
}
}
const SimpleCallback = fn (?*anyopaque) callconv(.C) void; const SimpleCallback = fn (?*anyopaque) callconv(.C) void;
pub fn SimpleCallbackThunkSignature(comptime T: type) type { pub fn SimpleCallbackThunkSignature(comptime T: type) type {
return fn (*T) void; return fn (T) void;
} }
pub fn makeSimpleCallbackThunk( pub fn makeSimpleCallbackThunk(
comptime T: type, comptime T: type,
comptime callback: *const SimpleCallbackThunkSignature(T), comptime callback: *const SimpleCallbackThunkSignature(T),
) *const SimpleCallback { ) *const SimpleCallback {
comptime checkUserDataType(T);
return struct { return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) void { fn thunk(userdata: ?*anyopaque) callconv(.C) void {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data); callback(data);
} }
}.thunk; }.thunk;

View File

@@ -99,7 +99,7 @@ test "nats.Connection" {
connection.drainTimeout(1000) catch {}; connection.drainTimeout(1000) catch {};
} }
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 { fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, attempts: c_int) i64 {
_ = userdata; _ = userdata;
_ = connection; _ = connection;
_ = attempts; _ = attempts;
@@ -108,7 +108,7 @@ fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts:
} }
fn errorHandler( fn errorHandler(
userdata: *u32, userdata: *const u32,
connection: *nats.Connection, connection: *nats.Connection,
subscription: *nats.Subscription, subscription: *nats.Subscription,
status: nats.Status, status: nats.Status,
@@ -119,18 +119,18 @@ fn errorHandler(
_ = status; _ = status;
} }
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void { fn connectionHandler(userdata: *const u32, connection: *nats.Connection) void {
_ = userdata; _ = userdata;
_ = connection; _ = connection;
} }
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError { fn jwtHandler(userdata: *const u32) nats.JwtResponseOrError {
_ = userdata; _ = userdata;
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") }; // return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") }; return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
} }
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError { fn signatureHandler(userdata: *const u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
_ = userdata; _ = userdata;
_ = nonce; _ = nonce;
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") }; // return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
@@ -144,7 +144,7 @@ test "nats.ConnectionOptions" {
const options = try nats.ConnectionOptions.create(); const options = try nats.ConnectionOptions.create();
defer options.destroy(); defer options.destroy();
var userdata: u32 = 0; const userdata: u32 = 0;
try options.setUrl(nats.default_server_url); try options.setUrl(nats.default_server_url);
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" }; const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
@@ -164,14 +164,14 @@ test "nats.ConnectionOptions" {
try options.setMaxReconnect(10); try options.setMaxReconnect(10);
try options.setReconnectWait(500); try options.setReconnectWait(500);
try options.setReconnectJitter(100, 200); try options.setReconnectJitter(100, 200);
try options.setCustomReconnectDelay(u32, reconnectDelayHandler, &userdata); try options.setCustomReconnectDelay(*const u32, reconnectDelayHandler, &userdata);
try options.setReconnectBufSize(1024); try options.setReconnectBufSize(1024);
try options.setMaxPendingMessages(50); try options.setMaxPendingMessages(50);
try options.setErrorHandler(u32, errorHandler, &userdata); try options.setErrorHandler(*const u32, errorHandler, &userdata);
try options.setClosedCallback(u32, connectionHandler, &userdata); try options.setClosedCallback(*const u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(u32, connectionHandler, &userdata); try options.setDisconnectedCallback(*const u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(u32, connectionHandler, &userdata); try options.setDiscoveredServersCallback(*const u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(u32, connectionHandler, &userdata); try options.setLameDuckModeCallback(*const u32, connectionHandler, &userdata);
try options.ignoreDiscoveredServers(true); try options.ignoreDiscoveredServers(true);
try options.useGlobalMessageDelivery(false); try options.useGlobalMessageDelivery(false);
try options.ipResolutionOrder(.ipv4_first); try options.ipResolutionOrder(.ipv4_first);
@@ -179,8 +179,8 @@ test "nats.ConnectionOptions" {
try options.useOldRequestStyle(false); try options.useOldRequestStyle(false);
try options.setFailRequestsOnDisconnect(true); try options.setFailRequestsOnDisconnect(true);
try options.setNoEcho(true); try options.setNoEcho(true);
try options.setRetryOnFailedConnect(u32, connectionHandler, true, &userdata); try options.setRetryOnFailedConnect(*const u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(u32, u32, jwtHandler, signatureHandler, &userdata, &userdata); try options.setUserCredentialsCallbacks(*const u32, *const u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setWriteDeadline(5); try options.setWriteDeadline(5);
try options.disableNoResponders(true); try options.disableNoResponders(true);
try options.setCustomInboxPrefix("_FOOBOX"); try options.setCustomInboxPrefix("_FOOBOX");
@@ -200,7 +200,7 @@ test "nats.ConnectionOptions (crypto edition)" {
defer options.destroy(); defer options.destroy();
var userdata: u32 = 0; var userdata: u32 = 0;
try options.setTokenHandler(u32, tokenHandler, &userdata); try options.setTokenHandler(*u32, tokenHandler, &userdata);
try options.setSecure(false); try options.setSecure(false);
try options.setCertificatesChain(rsa_cert, rsa_key); try options.setCertificatesChain(rsa_cert, rsa_key);
try options.setCiphers("-ALL:HIGH"); try options.setCiphers("-ALL:HIGH");

4
tests/data/genkeys.fish Normal file
View File

@@ -0,0 +1,4 @@
openssl req -x509 -newkey rsa:4096 -sha256 -days 36500 -nodes -keyout client-rsa.key -out client-rsa.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"
openssl req -x509 -newkey rsa:4096 -sha256 -days 36500 -nodes -keyout server-rsa.key -out server-rsa.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"
openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:secp384r1 -days 36500 -nodes -keyout client-ecc.key -out client-ecc.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"
openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:secp384r1 -days 36500 -nodes -keyout server-ecc.key -out server-ecc.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"

View File

@@ -9,8 +9,8 @@ test "version" {
const version = nats.getVersion(); const version = nats.getVersion();
const vernum = nats.getVersionNumber(); const vernum = nats.getVersionNumber();
try std.testing.expectEqualStrings("3.6.1", version); try std.testing.expectEqualStrings("3.7.0", version);
try std.testing.expectEqual(@as(u32, 0x03_06_01), vernum); try std.testing.expectEqual(@as(u32, 0x03_07_00), vernum);
try std.testing.expect(nats.checkCompatibility()); try std.testing.expect(nats.checkCompatibility());
} }

View File

@@ -80,18 +80,21 @@ test "nats.Subscription" {
} }
fn onMessage( fn onMessage(
userdata: *u32, userdata: *const u32,
connection: *nats.Connection, connection: *nats.Connection,
subscription: *nats.Subscription, subscription: *nats.Subscription,
message: *nats.Message, message: *nats.Message,
) void { ) void {
_ = subscription; _ = subscription;
_ = userdata;
if (message.getReply()) |reply| { if (message.getReply()) |reply| {
connection.publish(reply, "greetings") catch @panic("OH NO"); connection.publish(reply, "greetings") catch @panic("OH NO");
} else @panic("HOW"); } else @panic("HOW");
}
userdata.* += 1; fn onClose(userdata: *[]const u8) void {
userdata.* = "closed";
} }
test "nats.Subscription (async)" { test "nats.Subscription (async)" {
@@ -112,21 +115,30 @@ test "nats.Subscription (async)" {
defer message.destroy(); defer message.destroy();
{ {
var count: u32 = 0; var closed: []const u8 = "test";
const subscription = try connection.subscribe(u32, message_subject, onMessage, &count); {
const count: u32 = 0;
const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count);
defer subscription.destroy(); defer subscription.destroy();
try subscription.setCompletionCallback(*[]const u8, onClose, &closed);
const response = try connection.requestMessage(message, 1000); const response = try connection.requestMessage(message, 1000);
try std.testing.expectEqualStrings( try std.testing.expectEqualStrings(
"greetings", "greetings",
response.getData() orelse return error.TestUnexpectedResult, response.getData() orelse return error.TestUnexpectedResult,
); );
} }
// we have to sleep to allow the close callback to run. I am worried this may
// still end up being flaky, however.
nats.sleep(1);
try std.testing.expectEqualStrings("closed", closed);
}
{ {
var count: u32 = 0; const count: u32 = 0;
const subscription = try connection.subscribeTimeout( const subscription = try connection.subscribeTimeout(
u32, *const u32,
message_subject, message_subject,
1000, 1000,
onMessage, onMessage,
@@ -142,9 +154,9 @@ test "nats.Subscription (async)" {
} }
{ {
var count: u32 = 0; const count: u32 = 0;
const subscription = try connection.queueSubscribe( const subscription = try connection.queueSubscribe(
u32, *const u32,
message_subject, message_subject,
"queuegroup", "queuegroup",
onMessage, onMessage,
@@ -162,7 +174,7 @@ test "nats.Subscription (async)" {
{ {
var count: u32 = 0; var count: u32 = 0;
const subscription = try connection.queueSubscribeTimeout( const subscription = try connection.queueSubscribeTimeout(
u32, *const u32,
message_subject, message_subject,
"queuegroup", "queuegroup",
1000, 1000,