Compare commits

..

2 Commits

Author SHA1 Message Date
b17a3fba6c readme: update zig support status 2024-01-15 16:30:26 -08:00
4124b912eb build: update for zig-0.12.0-dev.2208+4debd4338
Incorporate various build API changes. Hopefully there won't be any
other major API changes before the 0.12.0 release.
2024-01-15 16:23:45 -08:00
9 changed files with 81 additions and 86 deletions

View File

@@ -22,7 +22,7 @@ Only tagged release versions of `nats.c` will be used. The current version of `n
# Zig Version Support # Zig Version Support
Since the language is still under active development, any written Zig code is a moving target. The plan is to support Zig `0.11.*` exclusively until the NATS library API has good coverage and is stabilized. At that point, if there are major breaking changes, a maintenance branch will be created, and master will probably move to track Zig master. Since the language is still under active development, any written Zig code is a moving target. The master branch targets zig 0.12 development versions (though it is not guaranteed to work with all versions. Check the commit history for specific version updates). The `zig-0.11.x` branch targets the current stable zig release, 0.11.
# Using # Using

View File

@@ -9,8 +9,9 @@ pub fn build(b: *std.Build) void {
const optimize = b.standardOptimizeOption(.{}); const optimize = b.standardOptimizeOption(.{});
const nats = b.addModule("nats", .{ const nats = b.addModule("nats", .{
.source_file = .{ .path = "src/nats.zig" }, .root_source_file = .{ .path = "src/nats.zig" },
}); });
nats.addIncludePath(.{ .path = b.getInstallPath(.header, "") });
const nats_c = nats_build.nats_c_lib(b, .{ const nats_c = nats_build.nats_c_lib(b, .{
.name = "nats-c", .name = "nats-c",
@@ -25,7 +26,7 @@ pub fn build(b: *std.Build) void {
.optimize = optimize, .optimize = optimize,
}); });
tests.addModule("nats", nats); tests.root_module.addImport("nats", nats);
tests.linkLibrary(nats_c); tests.linkLibrary(nats_c);
const run_main_tests = b.addRunArtifact(tests); const run_main_tests = b.addRunArtifact(tests);
@@ -42,7 +43,7 @@ pub fn build(b: *std.Build) void {
} }
const ExampleOptions = struct { const ExampleOptions = struct {
target: std.zig.CrossTarget, target: std.Build.ResolvedTarget,
optimize: std.builtin.OptimizeMode, optimize: std.builtin.OptimizeMode,
nats_module: *std.Build.Module, nats_module: *std.Build.Module,
nats_c: *std.Build.Step.Compile, nats_c: *std.Build.Step.Compile,
@@ -70,7 +71,7 @@ pub fn add_examples(b: *std.Build, options: ExampleOptions) void {
.optimize = .Debug, .optimize = .Debug,
}); });
ex_exe.addModule("nats", options.nats_module); ex_exe.root_module.addImport("nats", options.nats_module);
ex_exe.linkLibrary(options.nats_c); ex_exe.linkLibrary(options.nats_c);
const install = b.addInstallArtifact(ex_exe, .{}); const install = b.addInstallArtifact(ex_exe, .{});

View File

@@ -1,10 +1,20 @@
.{ .{
.name = "nats.zig", .name = "nats-client",
.version = "0.0.1", .version = "0.0.1",
.paths = .{
"src",
"deps/nats.c/src",
"deps/nats.c/LICENSE",
"deps/protobuf-c",
"build.zig",
"nats-c.build.zig",
"build.zig.zon",
"LICENSE",
},
.dependencies = .{ .dependencies = .{
.libressl = .{ .libressl = .{
.url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/4bbf9ad43fd5d56c8e15bc2e880aab7c4e49731b.tar.gz", .url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/9f74aeb1d2f5db5c375a1040cbd2b9abfa2749d1.tar.gz",
.hash = "1220282c6f64f531b9d07d5ed1959708822f4f8dc2486a7005be391c8f5cdf2a502a", .hash = "122092a200f7e27e90974013d7e5cd5ef27438f67016852b5244ea287018263e78dc",
}, },
}, },
} }

View File

@@ -5,7 +5,7 @@ const std = @import("std");
const NatsCOptions = struct { const NatsCOptions = struct {
name: []const u8, name: []const u8,
target: std.zig.CrossTarget, target: std.Build.ResolvedTarget,
optimize: std.builtin.OptimizeMode, optimize: std.builtin.OptimizeMode,
}; };
@@ -24,36 +24,36 @@ pub fn nats_c_lib(
}; };
lib.linkLibC(); lib.linkLibC();
lib.addCSourceFiles(&common_sources, &cflags); lib.addCSourceFiles(.{ .files = &common_sources, .flags = &cflags });
lib.addIncludePath(.{ .path = nats_src_prefix ++ "include" }); lib.addIncludePath(.{ .path = nats_src_prefix ++ "include" });
// if building with streaming support (protocol.pb-c.c includes // if building with streaming support (protocol.pb-c.c includes
// <protobuf-c/protobuf-c.h>, unfortunately) // <protobuf-c/protobuf-c.h>, unfortunately)
lib.addIncludePath(.{ .path = "deps" }); lib.addIncludePath(.{ .path = "deps" });
lib.addIncludePath(.{ .path = nats_src_prefix ++ "stan" }); lib.addIncludePath(.{ .path = nats_src_prefix ++ "stan" });
lib.addCSourceFiles(&streaming_sources, &cflags); lib.addCSourceFiles(.{ .files = &streaming_sources, .flags = &cflags });
lib.addCSourceFiles(&protobuf_c_sources, &cflags); lib.addCSourceFiles(.{ .files = &protobuf_c_sources, .flags = &cflags });
const ssl_dep = b.dependency("libressl", .{ const ssl_dep = b.dependency("libressl", .{
.target = options.target, .target = options.target,
.optimize = options.optimize, .optimize = options.optimize,
}); });
const tinfo = lib.target_info.target; const tinfo = options.target.result;
switch (tinfo.os.tag) { switch (tinfo.os.tag) {
.windows => { .windows => {
lib.addCSourceFiles(&win_sources, &cflags); lib.addCSourceFiles(.{ .files = &win_sources, .flags = &cflags });
if (tinfo.abi != .msvc) { if (tinfo.abi != .msvc) {
lib.addCSourceFiles(&.{"src/win-crosshack.c"}, &cflags); lib.addCSourceFiles(.{ .files = &.{"src/win-crosshack.c"}, .flags = &cflags });
} }
lib.defineCMacro("_WIN32", null); lib.defineCMacro("_WIN32", null);
lib.linkSystemLibrary("ws2_32"); lib.linkSystemLibrary("ws2_32");
}, },
.macos => { .macos => {
lib.addCSourceFiles(&unix_sources, &cflags); lib.addCSourceFiles(.{ .files = &unix_sources, .flags = &cflags });
lib.defineCMacro("DARWIN", null); lib.defineCMacro("DARWIN", null);
}, },
else => { else => {
lib.addCSourceFiles(&unix_sources, &cflags); lib.addCSourceFiles(.{ .files = &unix_sources, .flags = &cflags });
lib.defineCMacro("_GNU_SOURCE", null); lib.defineCMacro("_GNU_SOURCE", null);
lib.defineCMacro("LINUX", null); lib.defineCMacro("LINUX", null);
// may need to link pthread and rt. Not sure if those are included with linkLibC // may need to link pthread and rt. Not sure if those are included with linkLibC

View File

@@ -348,7 +348,7 @@ pub const Connection = opaque {
@ptrCast(self), @ptrCast(self),
subject.ptr, subject.ptr,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
} }
@@ -369,7 +369,7 @@ pub const Connection = opaque {
subject.ptr, subject.ptr,
timeout, timeout,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
@@ -403,7 +403,7 @@ pub const Connection = opaque {
subject.ptr, subject.ptr,
queue_group.ptr, queue_group.ptr,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
@@ -427,7 +427,7 @@ pub const Connection = opaque {
queue_group.ptr, queue_group.ptr,
timeout, timeout,
makeSubscriptionCallbackThunk(T, callback), makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
@@ -504,7 +504,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler( return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self), @ptrCast(self),
makeTokenCallbackThunk(T, callback), makeTokenCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
@@ -644,7 +644,7 @@ pub const ConnectionOptions = opaque {
nats_c.natsOptions_SetCustomReconnectDelay( nats_c.natsOptions_SetCustomReconnectDelay(
@ptrCast(self), @ptrCast(self),
makeReconnectDelayCallbackThunk(T, callback), makeReconnectDelayCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
), ),
).raise(); ).raise();
} }
@@ -671,7 +671,7 @@ pub const ConnectionOptions = opaque {
nats_c.natsOptions_SetErrorHandler( nats_c.natsOptions_SetErrorHandler(
@ptrCast(self), @ptrCast(self),
makeErrorHandlerCallbackThunk(T, callback), makeErrorHandlerCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
), ),
).raise(); ).raise();
} }
@@ -685,7 +685,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
@@ -698,7 +698,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
@@ -711,7 +711,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
@@ -724,7 +724,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
@@ -737,7 +737,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB( return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self), @ptrCast(self),
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
@@ -753,7 +753,7 @@ pub const ConnectionOptions = opaque {
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsOptions_SetEventLoop( return Status.fromInt(nats_c.natsOptions_SetEventLoop(
@ptrCast(self), @ptrCast(self),
@constCast(@ptrCast(loop)), @ptrCast(loop),
makeAttachEventLoopCallbackThunk(T, L, attach_callback), makeAttachEventLoopCallbackThunk(T, L, attach_callback),
makeEventLoopAddRemoveCallbackThunk(T, read_callback), makeEventLoopAddRemoveCallbackThunk(T, read_callback),
makeEventLoopAddRemoveCallbackThunk(T, write_callback), makeEventLoopAddRemoveCallbackThunk(T, write_callback),
@@ -822,7 +822,7 @@ pub const ConnectionOptions = opaque {
@ptrCast(self), @ptrCast(self),
retry, retry,
makeConnectionCallbackThunk(T, callback), makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
@@ -838,17 +838,13 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self), @ptrCast(self),
makeJwtHandlerCallbackThunk(T, jwt_callback), makeJwtHandlerCallbackThunk(T, jwt_callback),
@constCast(@ptrCast(jwt_userdata)), jwt_userdata,
makeSignatureHandlerCallbackThunk(U, sig_callback), makeSignatureHandlerCallbackThunk(U, sig_callback),
@constCast(@ptrCast(sig_userdata)), sig_userdata,
)).raise(); )).raise();
} }
pub fn setUserCredentialsFromFiles( pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
self: *ConnectionOptions,
user_or_chained_file: [:0]const u8,
seed_file: [:0]const u8,
) Error!void {
return Status.fromInt( return Status.fromInt(
nats_c.natsOptions_SetUserCredentialsFromFiles( nats_c.natsOptions_SetUserCredentialsFromFiles(
@ptrCast(self), @ptrCast(self),
@@ -878,7 +874,7 @@ pub const ConnectionOptions = opaque {
@ptrCast(self), @ptrCast(self),
pub_key.ptr, pub_key.ptr,
makeSignatureHandlerCallbackThunk(T, sig_callback), makeSignatureHandlerCallbackThunk(T, sig_callback),
@constCast(@ptrCast(sig_userdata)), sig_userdata,
)).raise(); )).raise();
} }

View File

@@ -174,12 +174,12 @@ pub const Subscription = opaque {
self: *Subscription, self: *Subscription,
comptime T: type, comptime T: type,
comptime callback: *const thunk.SimpleCallbackThunkSignature(T), comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
userdata: T, userdata: *T,
) Error!void { ) Error!void {
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB( return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
@ptrCast(self), @ptrCast(self),
thunk.makeSimpleCallbackThunk(T, callback), thunk.makeSimpleCallbackThunk(callback),
@constCast(@ptrCast(userdata)), userdata,
)).raise(); )).raise();
} }
}; };

View File

@@ -23,7 +23,7 @@ pub fn checkUserDataType(comptime T: type) void {
.Optional => |info| switch (@typeInfo(info.child)) { .Optional => |info| switch (@typeInfo(info.child)) {
.Optional => @compileError( .Optional => @compileError(
"nats callbacks can only accept an (optional) single, many," ++ "nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata. \"" ++ " or c pointer as userdata, not slices. \"" ++
@typeName(T) ++ "\" has more than one optional specifier.", @typeName(T) ++ "\" has more than one optional specifier.",
), ),
else => checkUserDataType(info.child), else => checkUserDataType(info.child),
@@ -38,7 +38,7 @@ pub fn checkUserDataType(comptime T: type) void {
}, },
else => @compileError( else => @compileError(
"nats callbacks can only accept an (optional) single, many," ++ "nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata. \"" ++ " or c pointer as userdata, not slices. \"" ++
@typeName(T) ++ "\" is not a pointer type.", @typeName(T) ++ "\" is not a pointer type.",
), ),
} }
@@ -57,7 +57,7 @@ pub fn makeSimpleCallbackThunk(
comptime checkUserDataType(T); comptime checkUserDataType(T);
return struct { return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) void { fn thunk(userdata: ?*anyopaque) callconv(.C) void {
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data); callback(data);
} }
}.thunk; }.thunk;

View File

@@ -99,7 +99,7 @@ test "nats.Connection" {
connection.drainTimeout(1000) catch {}; connection.drainTimeout(1000) catch {};
} }
fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, attempts: c_int) i64 { fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
_ = userdata; _ = userdata;
_ = connection; _ = connection;
_ = attempts; _ = attempts;
@@ -108,7 +108,7 @@ fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, att
} }
fn errorHandler( fn errorHandler(
userdata: *const u32, userdata: *u32,
connection: *nats.Connection, connection: *nats.Connection,
subscription: *nats.Subscription, subscription: *nats.Subscription,
status: nats.Status, status: nats.Status,
@@ -119,18 +119,18 @@ fn errorHandler(
_ = status; _ = status;
} }
fn connectionHandler(userdata: *const u32, connection: *nats.Connection) void { fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
_ = userdata; _ = userdata;
_ = connection; _ = connection;
} }
fn jwtHandler(userdata: *const u32) nats.JwtResponseOrError { fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
_ = userdata; _ = userdata;
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") }; // return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") }; return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
} }
fn signatureHandler(userdata: *const u32, nonce: [:0]const u8) nats.SignatureResponseOrError { fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
_ = userdata; _ = userdata;
_ = nonce; _ = nonce;
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") }; // return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
@@ -144,7 +144,7 @@ test "nats.ConnectionOptions" {
const options = try nats.ConnectionOptions.create(); const options = try nats.ConnectionOptions.create();
defer options.destroy(); defer options.destroy();
const userdata: u32 = 0; var userdata: u32 = 0;
try options.setUrl(nats.default_server_url); try options.setUrl(nats.default_server_url);
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" }; const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
@@ -164,14 +164,14 @@ test "nats.ConnectionOptions" {
try options.setMaxReconnect(10); try options.setMaxReconnect(10);
try options.setReconnectWait(500); try options.setReconnectWait(500);
try options.setReconnectJitter(100, 200); try options.setReconnectJitter(100, 200);
try options.setCustomReconnectDelay(*const u32, reconnectDelayHandler, &userdata); try options.setCustomReconnectDelay(*u32, reconnectDelayHandler, &userdata);
try options.setReconnectBufSize(1024); try options.setReconnectBufSize(1024);
try options.setMaxPendingMessages(50); try options.setMaxPendingMessages(50);
try options.setErrorHandler(*const u32, errorHandler, &userdata); try options.setErrorHandler(*u32, errorHandler, &userdata);
try options.setClosedCallback(*const u32, connectionHandler, &userdata); try options.setClosedCallback(*u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(*const u32, connectionHandler, &userdata); try options.setDisconnectedCallback(*u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(*const u32, connectionHandler, &userdata); try options.setDiscoveredServersCallback(*u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(*const u32, connectionHandler, &userdata); try options.setLameDuckModeCallback(*u32, connectionHandler, &userdata);
try options.ignoreDiscoveredServers(true); try options.ignoreDiscoveredServers(true);
try options.useGlobalMessageDelivery(false); try options.useGlobalMessageDelivery(false);
try options.ipResolutionOrder(.ipv4_first); try options.ipResolutionOrder(.ipv4_first);
@@ -179,8 +179,8 @@ test "nats.ConnectionOptions" {
try options.useOldRequestStyle(false); try options.useOldRequestStyle(false);
try options.setFailRequestsOnDisconnect(true); try options.setFailRequestsOnDisconnect(true);
try options.setNoEcho(true); try options.setNoEcho(true);
try options.setRetryOnFailedConnect(*const u32, connectionHandler, true, &userdata); try options.setRetryOnFailedConnect(*u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(*const u32, *const u32, jwtHandler, signatureHandler, &userdata, &userdata); try options.setUserCredentialsCallbacks(*u32, *u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setWriteDeadline(5); try options.setWriteDeadline(5);
try options.disableNoResponders(true); try options.disableNoResponders(true);
try options.setCustomInboxPrefix("_FOOBOX"); try options.setCustomInboxPrefix("_FOOBOX");

View File

@@ -80,21 +80,18 @@ test "nats.Subscription" {
} }
fn onMessage( fn onMessage(
userdata: *const u32, userdata: *u32,
connection: *nats.Connection, connection: *nats.Connection,
subscription: *nats.Subscription, subscription: *nats.Subscription,
message: *nats.Message, message: *nats.Message,
) void { ) void {
_ = subscription; _ = subscription;
_ = userdata;
if (message.getReply()) |reply| { if (message.getReply()) |reply| {
connection.publish(reply, "greetings") catch @panic("OH NO"); connection.publish(reply, "greetings") catch @panic("OH NO");
} else @panic("HOW"); } else @panic("HOW");
}
fn onClose(userdata: *[]const u8) void { userdata.* += 1;
userdata.* = "closed";
} }
test "nats.Subscription (async)" { test "nats.Subscription (async)" {
@@ -115,30 +112,21 @@ test "nats.Subscription (async)" {
defer message.destroy(); defer message.destroy();
{ {
var closed: []const u8 = "test"; var count: u32 = 0;
{ const subscription = try connection.subscribe(*u32, message_subject, onMessage, &count);
const count: u32 = 0; defer subscription.destroy();
const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count);
defer subscription.destroy();
try subscription.setCompletionCallback(*[]const u8, onClose, &closed); const response = try connection.requestMessage(message, 1000);
try std.testing.expectEqualStrings(
const response = try connection.requestMessage(message, 1000); "greetings",
try std.testing.expectEqualStrings( response.getData() orelse return error.TestUnexpectedResult,
"greetings", );
response.getData() orelse return error.TestUnexpectedResult,
);
}
// we have to sleep to allow the close callback to run. I am worried this may
// still end up being flaky, however.
nats.sleep(1);
try std.testing.expectEqualStrings("closed", closed);
} }
{ {
const count: u32 = 0; var count: u32 = 0;
const subscription = try connection.subscribeTimeout( const subscription = try connection.subscribeTimeout(
*const u32, *u32,
message_subject, message_subject,
1000, 1000,
onMessage, onMessage,
@@ -154,9 +142,9 @@ test "nats.Subscription (async)" {
} }
{ {
const count: u32 = 0; var count: u32 = 0;
const subscription = try connection.queueSubscribe( const subscription = try connection.queueSubscribe(
*const u32, *u32,
message_subject, message_subject,
"queuegroup", "queuegroup",
onMessage, onMessage,
@@ -174,7 +162,7 @@ test "nats.Subscription (async)" {
{ {
var count: u32 = 0; var count: u32 = 0;
const subscription = try connection.queueSubscribeTimeout( const subscription = try connection.queueSubscribeTimeout(
*const u32, *u32,
message_subject, message_subject,
"queuegroup", "queuegroup",
1000, 1000,