From a53b32204dbaf99ad4f6852ec851ff2555867d87 Mon Sep 17 00:00:00 2001 From: torque Date: Sun, 3 Sep 2023 16:28:00 -0700 Subject: [PATCH] tests: add subscription method coverage This resulted in some minor binding fixes. In theory, all the main wrapped API endpoints are now covered. --- src/connection.zig | 4 +- src/subscription.zig | 26 +++--- tests/main.zig | 1 + tests/subscription.zig | 180 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 196 insertions(+), 15 deletions(-) create mode 100644 tests/subscription.zig diff --git a/src/connection.zig b/src/connection.zig index cefde3b..312c29d 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -423,7 +423,7 @@ pub const Connection = opaque { ) Error!*Subscription { var sub: *Subscription = undefined; - const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe( + const status = Status.fromInt(nats_c.natsConnection_QueueSubscribeTimeout( @ptrCast(&sub), @ptrCast(self), subject.ptr, @@ -443,7 +443,7 @@ pub const Connection = opaque { ) Error!*Subscription { var sub: *Subscription = undefined; - const status = Status.fromInt(nats_c.natsConnection_SubscribeSync( + const status = Status.fromInt(nats_c.natsConnection_QueueSubscribeSync( @ptrCast(&sub), @ptrCast(self), subject.ptr, diff --git a/src/subscription.zig b/src/subscription.zig index 15c165e..27c0bfb 100644 --- a/src/subscription.zig +++ b/src/subscription.zig @@ -28,19 +28,19 @@ const Status = err_.Status; const thunk = @import("./thunk.zig"); -pub const MessageCount = struct { - messages: c_int = 0, - bytes: c_int = 0, -}; - -pub const SubscriptionStats = struct { - pending: MessageCount = .{}, - max_pending: MessageCount = .{}, - delivered_messages: i64 = 0, - dropped_messages: i64 = 0, -}; - pub const Subscription = opaque { + pub const MessageCount = struct { + messages: c_int = 0, + bytes: c_int = 0, + }; + + pub const SubscriptionStats = struct { + pending: MessageCount = .{}, + max_pending: MessageCount = .{}, + delivered_messages: i64 = 0, + dropped_messages: i64 = 0, + }; + pub fn isValid(self: *Subscription) bool { return nats_c.natsSubscription_IsValid(@ptrCast(self)); } @@ -84,7 +84,7 @@ pub const Subscription = opaque { // invalid/closed subscriptions return null. should we convert that into an // error? could return error.InvalidSubscription const result = nats_c.natsSubscription_GetSubject(@ptrCast(self)) orelse return null; - return std.mem.spanTo(u8, result, 0); + return std.mem.sliceTo(result, 0); } pub fn setPendingLimits(self: *Subscription, limit: MessageCount) Error!void { diff --git a/tests/main.zig b/tests/main.zig index 34f0ed4..2cf0fb6 100644 --- a/tests/main.zig +++ b/tests/main.zig @@ -5,4 +5,5 @@ test { _ = @import("./nats.zig"); _ = @import("./connection.zig"); _ = @import("./message.zig"); + _ = @import("./subscription.zig"); } diff --git a/tests/subscription.zig b/tests/subscription.zig new file mode 100644 index 0000000..0de237f --- /dev/null +++ b/tests/subscription.zig @@ -0,0 +1,180 @@ +// This file is licensed under the CC0 1.0 license. +// See: https://creativecommons.org/publicdomain/zero/1.0/legalcode + +const std = @import("std"); + +const nats = @import("nats"); + +const util = @import("./util.zig"); + +test "nats.Subscription" { + var server = try util.TestServer.launch(.{}); + defer server.stop(); + + try nats.init(nats.default_spin_count); + defer nats.deinit(); + + const connection = try nats.Connection.connectTo(nats.default_server_url); + defer connection.destroy(); + + const message_subject: [:0]const u8 = "hello"; + const message_reply: [:0]const u8 = "reply"; + const message_data: [:0]const u8 = "world"; + + const message = try nats.Message.create(message_subject, message_reply, message_data); + defer message.destroy(); + + { + const subscription = try connection.subscribeSync(message_subject); + defer subscription.destroy(); + + try subscription.autoUnsubscribe(1); + try subscription.setPendingLimits(.{ .messages = 10, .bytes = 1000 }); + _ = try subscription.getPendingLimits(); + _ = try subscription.getPending(); + _ = try subscription.getMaxPending(); + try subscription.clearMaxPending(); + _ = try subscription.getDelivered(); + _ = try subscription.getDropped(); + _ = try subscription.getStats(); + _ = try subscription.queuedMessageCount(); + _ = subscription.getId(); + const subj = subscription.getSubject() orelse return error.TestUnexpectedResult; + try std.testing.expectEqualStrings(message_subject, subj); + + try connection.publishMessage(message); + const roundtrip = try subscription.nextMessage(1000); + try std.testing.expectEqualStrings( + message_data, + roundtrip.getData() orelse return error.TestUnexpectedResult, + ); + + try std.testing.expect(subscription.isValid() == false); + } + + { + const subscription = try connection.queueSubscribeSync(message_subject, "queuegroup"); + defer subscription.destroy(); + + try subscription.drain(); + try subscription.waitForDrainCompletion(1000); + _ = subscription.drainCompletionStatus(); + } + + { + const subscription = try connection.subscribeSync(message_subject); + defer subscription.destroy(); + + try subscription.drain(); + try subscription.waitForDrainCompletion(1000); + _ = subscription.drainCompletionStatus(); + } + + { + const subscription = try connection.subscribeSync(message_subject); + defer subscription.destroy(); + + try subscription.drainTimeout(1000); + try subscription.waitForDrainCompletion(1000); + } +} + +fn onMessage( + userdata: *u32, + connection: *nats.Connection, + subscription: *nats.Subscription, + message: *nats.Message, +) void { + _ = subscription; + + if (message.getReply()) |reply| { + connection.publish(reply, "greetings") catch @panic("OH NO"); + } else @panic("HOW"); + + userdata.* += 1; +} + +test "nats.Subscription (async)" { + var server = try util.TestServer.launch(.{}); + defer server.stop(); + + try nats.init(nats.default_spin_count); + defer nats.deinit(); + + const connection = try nats.Connection.connectTo(nats.default_server_url); + defer connection.destroy(); + + const message_subject: [:0]const u8 = "hello"; + const message_reply: [:0]const u8 = "reply"; + const message_data: [:0]const u8 = "world"; + + const message = try nats.Message.create(message_subject, message_reply, message_data); + defer message.destroy(); + + { + var count: u32 = 0; + const subscription = try connection.subscribe(u32, message_subject, onMessage, &count); + defer subscription.destroy(); + + const response = try connection.requestMessage(message, 1000); + try std.testing.expectEqualStrings( + "greetings", + response.getData() orelse return error.TestUnexpectedResult, + ); + } + + { + var count: u32 = 0; + const subscription = try connection.subscribeTimeout( + u32, + message_subject, + 1000, + onMessage, + &count, + ); + defer subscription.destroy(); + + const response = try connection.requestMessage(message, 1000); + try std.testing.expectEqualStrings( + "greetings", + response.getData() orelse return error.TestUnexpectedResult, + ); + } + + { + var count: u32 = 0; + const subscription = try connection.queueSubscribe( + u32, + message_subject, + "queuegroup", + onMessage, + &count, + ); + defer subscription.destroy(); + + const response = try connection.requestMessage(message, 1000); + try std.testing.expectEqualStrings( + "greetings", + response.getData() orelse return error.TestUnexpectedResult, + ); + } + + { + var count: u32 = 0; + const subscription = try connection.queueSubscribeTimeout( + u32, + message_subject, + "queuegroup", + 1000, + onMessage, + &count, + ); + defer subscription.destroy(); + + const response = try connection.requestMessage(message, 1000); + try std.testing.expectEqualStrings( + "greetings", + response.getData() orelse return error.TestUnexpectedResult, + ); + } +}