Compare commits
5 Commits
master
...
zig-0.11.x
Author | SHA1 | Date | |
---|---|---|---|
320bef2c63 | |||
28c7890f6c | |||
5939836bec | |||
4f26bf8fca | |||
c4a8ae1a38 |
@ -59,7 +59,7 @@ const examples = [_]Example{
|
|||||||
.{ .name = "pub_bytes", .file = "examples/pub_bytes.zig" },
|
.{ .name = "pub_bytes", .file = "examples/pub_bytes.zig" },
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn add_examples(b: *std.build, options: ExampleOptions) void {
|
pub fn add_examples(b: *std.Build, options: ExampleOptions) void {
|
||||||
const example_step = b.step("examples", "build examples");
|
const example_step = b.step("examples", "build examples");
|
||||||
|
|
||||||
inline for (examples) |example| {
|
inline for (examples) |example| {
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
.{
|
.{
|
||||||
.name = "nats-client",
|
.name = "nats.zig",
|
||||||
.version = "0.0.1",
|
.version = "0.0.1",
|
||||||
.dependencies = .{
|
.dependencies = .{
|
||||||
.libressl = .{
|
.libressl = .{
|
||||||
.url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/2b68369a2b883714cea05357aa378b3a3e8ef2f6.tar.gz",
|
.url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/4bbf9ad43fd5d56c8e15bc2e880aab7c4e49731b.tar.gz",
|
||||||
.hash = "12206b907fcb1dea424d122d29a0549bdc6c83648e0433973388b2efb6813b36a8fa",
|
.hash = "1220282c6f64f531b9d07d5ed1959708822f4f8dc2486a7005be391c8f5cdf2a502a",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -348,7 +348,7 @@ pub const Connection = opaque {
|
|||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
subject.ptr,
|
subject.ptr,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
}
|
}
|
||||||
@ -369,7 +369,7 @@ pub const Connection = opaque {
|
|||||||
subject.ptr,
|
subject.ptr,
|
||||||
timeout,
|
timeout,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
|
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
@ -403,7 +403,7 @@ pub const Connection = opaque {
|
|||||||
subject.ptr,
|
subject.ptr,
|
||||||
queue_group.ptr,
|
queue_group.ptr,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
|
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
@ -427,7 +427,7 @@ pub const Connection = opaque {
|
|||||||
queue_group.ptr,
|
queue_group.ptr,
|
||||||
timeout,
|
timeout,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
|
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
@ -504,7 +504,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
|
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeTokenCallbackThunk(T, callback),
|
makeTokenCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -644,7 +644,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
nats_c.natsOptions_SetCustomReconnectDelay(
|
nats_c.natsOptions_SetCustomReconnectDelay(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeReconnectDelayCallbackThunk(T, callback),
|
makeReconnectDelayCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
),
|
),
|
||||||
).raise();
|
).raise();
|
||||||
}
|
}
|
||||||
@ -671,7 +671,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
nats_c.natsOptions_SetErrorHandler(
|
nats_c.natsOptions_SetErrorHandler(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeErrorHandlerCallbackThunk(T, callback),
|
makeErrorHandlerCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
),
|
),
|
||||||
).raise();
|
).raise();
|
||||||
}
|
}
|
||||||
@ -685,7 +685,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -698,7 +698,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -711,7 +711,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -724,7 +724,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -737,7 +737,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -753,7 +753,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
|
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
@ptrCast(loop),
|
@constCast(@ptrCast(loop)),
|
||||||
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
|
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
|
||||||
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
|
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
|
||||||
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
|
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
|
||||||
@ -822,7 +822,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
retry,
|
retry,
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -838,13 +838,17 @@ pub const ConnectionOptions = opaque {
|
|||||||
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
|
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeJwtHandlerCallbackThunk(T, jwt_callback),
|
makeJwtHandlerCallbackThunk(T, jwt_callback),
|
||||||
jwt_userdata,
|
@constCast(@ptrCast(jwt_userdata)),
|
||||||
makeSignatureHandlerCallbackThunk(U, sig_callback),
|
makeSignatureHandlerCallbackThunk(U, sig_callback),
|
||||||
sig_userdata,
|
@constCast(@ptrCast(sig_userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
|
pub fn setUserCredentialsFromFiles(
|
||||||
|
self: *ConnectionOptions,
|
||||||
|
user_or_chained_file: [:0]const u8,
|
||||||
|
seed_file: [:0]const u8,
|
||||||
|
) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetUserCredentialsFromFiles(
|
nats_c.natsOptions_SetUserCredentialsFromFiles(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
@ -874,7 +878,7 @@ pub const ConnectionOptions = opaque {
|
|||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
pub_key.ptr,
|
pub_key.ptr,
|
||||||
makeSignatureHandlerCallbackThunk(T, sig_callback),
|
makeSignatureHandlerCallbackThunk(T, sig_callback),
|
||||||
sig_userdata,
|
@constCast(@ptrCast(sig_userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,12 +174,12 @@ pub const Subscription = opaque {
|
|||||||
self: *Subscription,
|
self: *Subscription,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
|
comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
|
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
thunk.makeSimpleCallbackThunk(callback),
|
thunk.makeSimpleCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -23,7 +23,7 @@ pub fn checkUserDataType(comptime T: type) void {
|
|||||||
.Optional => |info| switch (@typeInfo(info.child)) {
|
.Optional => |info| switch (@typeInfo(info.child)) {
|
||||||
.Optional => @compileError(
|
.Optional => @compileError(
|
||||||
"nats callbacks can only accept an (optional) single, many," ++
|
"nats callbacks can only accept an (optional) single, many," ++
|
||||||
" or c pointer as userdata, not slices. \"" ++
|
" or c pointer as userdata. \"" ++
|
||||||
@typeName(T) ++ "\" has more than one optional specifier.",
|
@typeName(T) ++ "\" has more than one optional specifier.",
|
||||||
),
|
),
|
||||||
else => checkUserDataType(info.child),
|
else => checkUserDataType(info.child),
|
||||||
@ -38,7 +38,7 @@ pub fn checkUserDataType(comptime T: type) void {
|
|||||||
},
|
},
|
||||||
else => @compileError(
|
else => @compileError(
|
||||||
"nats callbacks can only accept an (optional) single, many," ++
|
"nats callbacks can only accept an (optional) single, many," ++
|
||||||
" or c pointer as userdata, not slices. \"" ++
|
" or c pointer as userdata. \"" ++
|
||||||
@typeName(T) ++ "\" is not a pointer type.",
|
@typeName(T) ++ "\" is not a pointer type.",
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
@ -57,7 +57,7 @@ pub fn makeSimpleCallbackThunk(
|
|||||||
comptime checkUserDataType(T);
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
|
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
callback(data);
|
callback(data);
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
|
@ -99,7 +99,7 @@ test "nats.Connection" {
|
|||||||
connection.drainTimeout(1000) catch {};
|
connection.drainTimeout(1000) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
|
fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, attempts: c_int) i64 {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
_ = connection;
|
_ = connection;
|
||||||
_ = attempts;
|
_ = attempts;
|
||||||
@ -108,7 +108,7 @@ fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts:
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn errorHandler(
|
fn errorHandler(
|
||||||
userdata: *u32,
|
userdata: *const u32,
|
||||||
connection: *nats.Connection,
|
connection: *nats.Connection,
|
||||||
subscription: *nats.Subscription,
|
subscription: *nats.Subscription,
|
||||||
status: nats.Status,
|
status: nats.Status,
|
||||||
@ -119,18 +119,18 @@ fn errorHandler(
|
|||||||
_ = status;
|
_ = status;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
|
fn connectionHandler(userdata: *const u32, connection: *nats.Connection) void {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
_ = connection;
|
_ = connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
|
fn jwtHandler(userdata: *const u32) nats.JwtResponseOrError {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
|
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
|
||||||
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
|
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
|
||||||
}
|
}
|
||||||
|
|
||||||
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
|
fn signatureHandler(userdata: *const u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
_ = nonce;
|
_ = nonce;
|
||||||
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
|
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
|
||||||
@ -144,7 +144,7 @@ test "nats.ConnectionOptions" {
|
|||||||
const options = try nats.ConnectionOptions.create();
|
const options = try nats.ConnectionOptions.create();
|
||||||
defer options.destroy();
|
defer options.destroy();
|
||||||
|
|
||||||
var userdata: u32 = 0;
|
const userdata: u32 = 0;
|
||||||
|
|
||||||
try options.setUrl(nats.default_server_url);
|
try options.setUrl(nats.default_server_url);
|
||||||
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
|
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
|
||||||
@ -164,14 +164,14 @@ test "nats.ConnectionOptions" {
|
|||||||
try options.setMaxReconnect(10);
|
try options.setMaxReconnect(10);
|
||||||
try options.setReconnectWait(500);
|
try options.setReconnectWait(500);
|
||||||
try options.setReconnectJitter(100, 200);
|
try options.setReconnectJitter(100, 200);
|
||||||
try options.setCustomReconnectDelay(*u32, reconnectDelayHandler, &userdata);
|
try options.setCustomReconnectDelay(*const u32, reconnectDelayHandler, &userdata);
|
||||||
try options.setReconnectBufSize(1024);
|
try options.setReconnectBufSize(1024);
|
||||||
try options.setMaxPendingMessages(50);
|
try options.setMaxPendingMessages(50);
|
||||||
try options.setErrorHandler(*u32, errorHandler, &userdata);
|
try options.setErrorHandler(*const u32, errorHandler, &userdata);
|
||||||
try options.setClosedCallback(*u32, connectionHandler, &userdata);
|
try options.setClosedCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.setDisconnectedCallback(*u32, connectionHandler, &userdata);
|
try options.setDisconnectedCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.setDiscoveredServersCallback(*u32, connectionHandler, &userdata);
|
try options.setDiscoveredServersCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.setLameDuckModeCallback(*u32, connectionHandler, &userdata);
|
try options.setLameDuckModeCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.ignoreDiscoveredServers(true);
|
try options.ignoreDiscoveredServers(true);
|
||||||
try options.useGlobalMessageDelivery(false);
|
try options.useGlobalMessageDelivery(false);
|
||||||
try options.ipResolutionOrder(.ipv4_first);
|
try options.ipResolutionOrder(.ipv4_first);
|
||||||
@ -179,8 +179,8 @@ test "nats.ConnectionOptions" {
|
|||||||
try options.useOldRequestStyle(false);
|
try options.useOldRequestStyle(false);
|
||||||
try options.setFailRequestsOnDisconnect(true);
|
try options.setFailRequestsOnDisconnect(true);
|
||||||
try options.setNoEcho(true);
|
try options.setNoEcho(true);
|
||||||
try options.setRetryOnFailedConnect(*u32, connectionHandler, true, &userdata);
|
try options.setRetryOnFailedConnect(*const u32, connectionHandler, true, &userdata);
|
||||||
try options.setUserCredentialsCallbacks(*u32, *u32, jwtHandler, signatureHandler, &userdata, &userdata);
|
try options.setUserCredentialsCallbacks(*const u32, *const u32, jwtHandler, signatureHandler, &userdata, &userdata);
|
||||||
try options.setWriteDeadline(5);
|
try options.setWriteDeadline(5);
|
||||||
try options.disableNoResponders(true);
|
try options.disableNoResponders(true);
|
||||||
try options.setCustomInboxPrefix("_FOOBOX");
|
try options.setCustomInboxPrefix("_FOOBOX");
|
||||||
|
@ -80,18 +80,21 @@ test "nats.Subscription" {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn onMessage(
|
fn onMessage(
|
||||||
userdata: *u32,
|
userdata: *const u32,
|
||||||
connection: *nats.Connection,
|
connection: *nats.Connection,
|
||||||
subscription: *nats.Subscription,
|
subscription: *nats.Subscription,
|
||||||
message: *nats.Message,
|
message: *nats.Message,
|
||||||
) void {
|
) void {
|
||||||
_ = subscription;
|
_ = subscription;
|
||||||
|
_ = userdata;
|
||||||
|
|
||||||
if (message.getReply()) |reply| {
|
if (message.getReply()) |reply| {
|
||||||
connection.publish(reply, "greetings") catch @panic("OH NO");
|
connection.publish(reply, "greetings") catch @panic("OH NO");
|
||||||
} else @panic("HOW");
|
} else @panic("HOW");
|
||||||
|
}
|
||||||
|
|
||||||
userdata.* += 1;
|
fn onClose(userdata: *[]const u8) void {
|
||||||
|
userdata.* = "closed";
|
||||||
}
|
}
|
||||||
|
|
||||||
test "nats.Subscription (async)" {
|
test "nats.Subscription (async)" {
|
||||||
@ -112,21 +115,30 @@ test "nats.Subscription (async)" {
|
|||||||
defer message.destroy();
|
defer message.destroy();
|
||||||
|
|
||||||
{
|
{
|
||||||
var count: u32 = 0;
|
var closed: []const u8 = "test";
|
||||||
const subscription = try connection.subscribe(*u32, message_subject, onMessage, &count);
|
{
|
||||||
|
const count: u32 = 0;
|
||||||
|
const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count);
|
||||||
defer subscription.destroy();
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
try subscription.setCompletionCallback(*[]const u8, onClose, &closed);
|
||||||
|
|
||||||
const response = try connection.requestMessage(message, 1000);
|
const response = try connection.requestMessage(message, 1000);
|
||||||
try std.testing.expectEqualStrings(
|
try std.testing.expectEqualStrings(
|
||||||
"greetings",
|
"greetings",
|
||||||
response.getData() orelse return error.TestUnexpectedResult,
|
response.getData() orelse return error.TestUnexpectedResult,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
// we have to sleep to allow the close callback to run. I am worried this may
|
||||||
|
// still end up being flaky, however.
|
||||||
|
nats.sleep(1);
|
||||||
|
try std.testing.expectEqualStrings("closed", closed);
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
var count: u32 = 0;
|
const count: u32 = 0;
|
||||||
const subscription = try connection.subscribeTimeout(
|
const subscription = try connection.subscribeTimeout(
|
||||||
*u32,
|
*const u32,
|
||||||
message_subject,
|
message_subject,
|
||||||
1000,
|
1000,
|
||||||
onMessage,
|
onMessage,
|
||||||
@ -142,9 +154,9 @@ test "nats.Subscription (async)" {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
var count: u32 = 0;
|
const count: u32 = 0;
|
||||||
const subscription = try connection.queueSubscribe(
|
const subscription = try connection.queueSubscribe(
|
||||||
*u32,
|
*const u32,
|
||||||
message_subject,
|
message_subject,
|
||||||
"queuegroup",
|
"queuegroup",
|
||||||
onMessage,
|
onMessage,
|
||||||
@ -162,7 +174,7 @@ test "nats.Subscription (async)" {
|
|||||||
{
|
{
|
||||||
var count: u32 = 0;
|
var count: u32 = 0;
|
||||||
const subscription = try connection.queueSubscribeTimeout(
|
const subscription = try connection.queueSubscribeTimeout(
|
||||||
*u32,
|
*const u32,
|
||||||
message_subject,
|
message_subject,
|
||||||
"queuegroup",
|
"queuegroup",
|
||||||
1000,
|
1000,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user