One of the things I have, if not newly learned, then at least done a better job of internalizing while working with Zig over the last few months is that the less magic there is, the better. In the case of parameterized functions, this means that it is much better to restrict the range of types that may be passed than to perform type manipulation. In other words, it is more confusing to see a function that is parameterized with `SomeType` but takes a pointer to that type than to have it parameterized directly on the pointer type. Obviously there are exceptions to this rule, like `std.mem.eql` taking slices of its parameterized type.
181 lines
5.3 KiB
Zig
181 lines
5.3 KiB
Zig
// This file is licensed under the CC0 1.0 license.
|
|
// See: https://creativecommons.org/publicdomain/zero/1.0/legalcode
|
|
|
|
const std = @import("std");
|
|
|
|
const nats = @import("nats");
|
|
|
|
const util = @import("./util.zig");
|
|
|
|
// Exercises the synchronous subscription API against a freshly launched test
// server: the accessor/limit calls, a publish -> nextMessage round trip, and the
// drain variants for plain and queue-group subscriptions.
test "nats.Subscription" {
    var server = try util.TestServer.launch(.{});
    defer server.stop();

    try nats.init(nats.default_spin_count);
    defer nats.deinit();

    const connection = try nats.Connection.connectTo(server.url);
    defer connection.destroy();

    const message_subject: [:0]const u8 = "hello";
    const message_reply: [:0]const u8 = "reply";
    const message_data: [:0]const u8 = "world";

    const message = try nats.Message.create(message_subject, message_reply, message_data);
    defer message.destroy();

    // Accessors plus a single-message round trip.
    {
        const subscription = try connection.subscribeSync(message_subject);
        defer subscription.destroy();

        // Limit the subscription to one message; the validity check at the end
        // of this block relies on that.
        try subscription.autoUnsubscribe(1);
        try subscription.setPendingLimits(.{ .messages = 10, .bytes = 1000 });

        // The calls below are smoke tests: only success is checked, the
        // returned values are discarded.
        _ = try subscription.getPendingLimits();
        _ = try subscription.getPending();
        _ = try subscription.getMaxPending();
        try subscription.clearMaxPending();
        _ = try subscription.getDelivered();
        _ = try subscription.getDropped();
        _ = try subscription.getStats();
        _ = try subscription.queuedMessageCount();
        _ = subscription.getId();

        const subj = subscription.getSubject() orelse return error.TestUnexpectedResult;
        try std.testing.expectEqualStrings(message_subject, subj);

        try connection.publishMessage(message);

        // 1000 is the receive timeout (presumably milliseconds -- confirm
        // against the binding).
        const roundtrip = try subscription.nextMessage(1000);
        // The received message is a separate object from `message` and was
        // previously never released. NOTE(review): assumes the caller owns the
        // message returned by nextMessage, as with nats.c's
        // natsSubscription_NextMsg.
        defer roundtrip.destroy();

        try std.testing.expectEqualStrings(
            message_data,
            roundtrip.getData() orelse return error.TestUnexpectedResult,
        );

        // After the one permitted message has been received, the subscription
        // is expected to report itself as no longer valid.
        try std.testing.expect(!subscription.isValid());
    }

    // Drain a queue-group subscription.
    {
        const subscription = try connection.queueSubscribeSync(message_subject, "queuegroup");
        defer subscription.destroy();

        try subscription.drain();
        try subscription.waitForDrainCompletion(1000);
        _ = subscription.drainCompletionStatus();
    }

    // Drain a plain subscription.
    {
        const subscription = try connection.subscribeSync(message_subject);
        defer subscription.destroy();

        try subscription.drain();
        try subscription.waitForDrainCompletion(1000);
        _ = subscription.drainCompletionStatus();
    }

    // Drain with an explicit timeout.
    {
        const subscription = try connection.subscribeSync(message_subject);
        defer subscription.destroy();

        try subscription.drainTimeout(1000);
        try subscription.waitForDrainCompletion(1000);
    }
}
|
|
|
|
/// Subscription callback used by the async tests.
///
/// Increments the `u32` counter behind `userdata`, then publishes the payload
/// "greetings" to the incoming message's reply subject. Panics if the message
/// carries no reply subject or if publishing fails. `subscription` is unused.
fn onMessage(
    userdata: *u32,
    connection: *nats.Connection,
    subscription: *nats.Subscription,
    message: *nats.Message,
) void {
    _ = subscription;

    const reply = message.getReply() orelse @panic("HOW");

    // Bump the counter BEFORE replying. The tests pass a pointer to a
    // block-local variable and return as soon as the reply arrives, so a write
    // performed after the publish could land after that variable's scope has
    // ended. NOTE(review): this assumes the callback runs on a library-owned
    // thread, concurrently with the requester -- confirm against the binding.
    userdata.* += 1;

    connection.publish(reply, "greetings") catch @panic("OH NO");
}
|
|
|
|
// Exercises the callback-based subscription API. Each block registers
// `onMessage` through a different subscribe variant, sends `message` as a
// request, and checks that the reply payload is "greetings".
test "nats.Subscription (async)" {
    var server = try util.TestServer.launch(.{});
    defer server.stop();

    try nats.init(nats.default_spin_count);
    defer nats.deinit();

    const connection = try nats.Connection.connectTo(server.url);
    defer connection.destroy();

    const message_subject: [:0]const u8 = "hello";
    const message_reply: [:0]const u8 = "reply";
    const message_data: [:0]const u8 = "world";

    const message = try nats.Message.create(message_subject, message_reply, message_data);
    defer message.destroy();

    // In every block below, `response` is released with a deferred destroy; it
    // was previously never released. NOTE(review): assumes the caller owns the
    // message returned by requestMessage, as with nats.c's
    // natsConnection_RequestMsg.

    // subscribe
    {
        var count: u32 = 0;
        const subscription = try connection.subscribe(*u32, message_subject, onMessage, &count);
        defer subscription.destroy();

        const response = try connection.requestMessage(message, 1000);
        defer response.destroy();

        try std.testing.expectEqualStrings(
            "greetings",
            response.getData() orelse return error.TestUnexpectedResult,
        );
    }

    // subscribeTimeout
    {
        var count: u32 = 0;
        const subscription = try connection.subscribeTimeout(
            *u32,
            message_subject,
            1000,
            onMessage,
            &count,
        );
        defer subscription.destroy();

        const response = try connection.requestMessage(message, 1000);
        defer response.destroy();

        try std.testing.expectEqualStrings(
            "greetings",
            response.getData() orelse return error.TestUnexpectedResult,
        );
    }

    // queueSubscribe
    {
        var count: u32 = 0;
        const subscription = try connection.queueSubscribe(
            *u32,
            message_subject,
            "queuegroup",
            onMessage,
            &count,
        );
        defer subscription.destroy();

        const response = try connection.requestMessage(message, 1000);
        defer response.destroy();

        try std.testing.expectEqualStrings(
            "greetings",
            response.getData() orelse return error.TestUnexpectedResult,
        );
    }

    // queueSubscribeTimeout
    {
        var count: u32 = 0;
        const subscription = try connection.queueSubscribeTimeout(
            *u32,
            message_subject,
            "queuegroup",
            1000,
            onMessage,
            &count,
        );
        defer subscription.destroy();

        const response = try connection.requestMessage(message, 1000);
        defer response.destroy();

        try std.testing.expectEqualStrings(
            "greetings",
            response.getData() orelse return error.TestUnexpectedResult,
        );
    }
}
|