Compare commits
3 Commits
d72463ee76
...
a53b32204d
Author | SHA1 | Date | |
---|---|---|---|
a53b32204d | |||
373616f234 | |||
6dc50530f1 |
@ -11,24 +11,25 @@ pub fn main() !void {
|
|||||||
const message = try nats.Message.create("subject", null, "message");
|
const message = try nats.Message.create("subject", null, "message");
|
||||||
defer message.destroy();
|
defer message.destroy();
|
||||||
|
|
||||||
try message.setHeaderValue("My-Key1", "value1");
|
try message.setHeaderValue("foo", "foo-value");
|
||||||
try message.setHeaderValue("My-Key2", "value2");
|
try message.setHeaderValue("bar", "bar-value");
|
||||||
try message.addHeaderValue("My-Key1", "value3");
|
try message.addHeaderValue("foo", "bar-value");
|
||||||
try message.setHeaderValue("My-Key3", "value4");
|
try message.setHeaderValue("baz", "baz-value");
|
||||||
|
try message.addHeaderValue("qux", "qux-value");
|
||||||
|
|
||||||
try message.deleteHeader("My-Key3");
|
try message.deleteHeader("baz");
|
||||||
|
|
||||||
{
|
{
|
||||||
var iter = try message.headerIterator();
|
var iter = try message.getHeaderIterator();
|
||||||
defer iter.destroy();
|
defer iter.destroy();
|
||||||
|
|
||||||
while (iter.next()) |resolv| {
|
while (iter.next()) |header| {
|
||||||
var val_iter = try resolv.getValueIterator();
|
var val_iter = try header.valueIterator();
|
||||||
defer val_iter.destroy();
|
defer val_iter.destroy();
|
||||||
|
|
||||||
std.debug.print("key '{s}' got: ", .{resolv.key});
|
std.debug.print("key '{s}' got: ", .{header.key});
|
||||||
while (val_iter.next()) |value| {
|
while (val_iter.next()) |value| {
|
||||||
std.debug.print("'{s}', ", .{value});
|
std.debug.print("'{s}'{s}", .{ value, if (val_iter.peek()) |_| ", " else "" });
|
||||||
}
|
}
|
||||||
std.debug.print("\n", .{});
|
std.debug.print("\n", .{});
|
||||||
}
|
}
|
||||||
@ -42,12 +43,12 @@ pub fn main() !void {
|
|||||||
defer received.destroy();
|
defer received.destroy();
|
||||||
|
|
||||||
{
|
{
|
||||||
var iter = try received.getHeaderValueIterator("My-Key1");
|
var iter = try received.getHeaderValueIterator("foo");
|
||||||
defer iter.destroy();
|
defer iter.destroy();
|
||||||
|
|
||||||
std.debug.print("For key 'My-Key1' got: ", .{});
|
std.debug.print("For key 'foo' got: ", .{});
|
||||||
while (iter.next()) |value| {
|
while (iter.next()) |value| {
|
||||||
std.debug.print("'{s}', ", .{value});
|
std.debug.print("'{s}'{s}", .{ value, if (iter.peek()) |_| ", " else "" });
|
||||||
}
|
}
|
||||||
std.debug.print("\n", .{});
|
std.debug.print("\n", .{});
|
||||||
}
|
}
|
||||||
|
@ -423,7 +423,7 @@ pub const Connection = opaque {
|
|||||||
) Error!*Subscription {
|
) Error!*Subscription {
|
||||||
var sub: *Subscription = undefined;
|
var sub: *Subscription = undefined;
|
||||||
|
|
||||||
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe(
|
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribeTimeout(
|
||||||
@ptrCast(&sub),
|
@ptrCast(&sub),
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
subject.ptr,
|
subject.ptr,
|
||||||
@ -443,7 +443,7 @@ pub const Connection = opaque {
|
|||||||
) Error!*Subscription {
|
) Error!*Subscription {
|
||||||
var sub: *Subscription = undefined;
|
var sub: *Subscription = undefined;
|
||||||
|
|
||||||
const status = Status.fromInt(nats_c.natsConnection_SubscribeSync(
|
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribeSync(
|
||||||
@ptrCast(&sub),
|
@ptrCast(&sub),
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
subject.ptr,
|
subject.ptr,
|
||||||
|
199
src/message.zig
199
src/message.zig
@ -76,102 +76,11 @@ pub const Message = opaque {
|
|||||||
return status.toError() orelse std.mem.sliceTo(value.?, 0);
|
return status.toError() orelse std.mem.sliceTo(value.?, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn getAllHeaderValues(self: *Message, key: [:0]const u8) Error![][*:0]const u8 {
|
|
||||||
var values: [*c][*c]const u8 = undefined;
|
|
||||||
var count: c_int = 0;
|
|
||||||
|
|
||||||
const status = Status.fromInt(nats_c.natsMsgHeader_Values(@ptrCast(self), key.ptr, &values, &count));
|
|
||||||
|
|
||||||
// the user must use std.mem.spanTo on each item they want to read to get a
|
|
||||||
// slice, since we can't do that automatically without having to allocate.
|
|
||||||
return status.toError() orelse blk: {
|
|
||||||
const coerced: [*][*:0]const u8 = @ptrFromInt(@intFromPtr(values));
|
|
||||||
break :blk coerced[0..@intCast(count)];
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn getHeaderValueIterator(self: *Message, key: [:0]const u8) Error!HeaderValueIterator {
|
pub fn getHeaderValueIterator(self: *Message, key: [:0]const u8) Error!HeaderValueIterator {
|
||||||
return .{ .values = try self.getAllHeaderValues(key) };
|
return .{ .values = try self.getAllHeaderValues(key) };
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn getAllHeaderKeys(self: *Message) Error![][*:0]const u8 {
|
pub fn getHeaderIterator(self: *Message) Error!HeaderIterator {
|
||||||
var keys: [*c][*c]const u8 = undefined;
|
|
||||||
var count: c_int = 0;
|
|
||||||
|
|
||||||
const status = Status.fromInt(nats_c.natsMsgHeader_Keys(@ptrCast(self), &keys, &count));
|
|
||||||
|
|
||||||
// TODO: manually assert no keys are NULL?
|
|
||||||
|
|
||||||
// the user must use std.mem.spanTo on each item they want to read to get a
|
|
||||||
// slice, since we can't do that automatically without having to allocate.
|
|
||||||
// the returned slice
|
|
||||||
return status.toError() orelse blk: {
|
|
||||||
const coerced: [*][*:0]const u8 = @ptrFromInt(@intFromPtr(keys));
|
|
||||||
break :blk coerced[0..@intCast(count)];
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const HeaderValueIterator = struct {
|
|
||||||
values: [][*:0]const u8,
|
|
||||||
index: usize = 0,
|
|
||||||
|
|
||||||
pub fn destroy(self: HeaderValueIterator) void {
|
|
||||||
std.heap.raw_c_allocator.free(self.values);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn next(self: *HeaderValueIterator) ?[:0]const u8 {
|
|
||||||
if (self.index >= self.values.len) return null;
|
|
||||||
defer self.index += 1;
|
|
||||||
|
|
||||||
return std.mem.sliceTo(self.values[self.index], 0);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
pub const HeaderIterator = struct {
|
|
||||||
message: *Message,
|
|
||||||
keys: [][*:0]const u8,
|
|
||||||
index: usize = 0,
|
|
||||||
|
|
||||||
pub const ValueResolver = struct {
|
|
||||||
message: *Message,
|
|
||||||
key: [:0]const u8,
|
|
||||||
|
|
||||||
pub fn getValue(self: ValueResolver) Error![:0]const u8 {
|
|
||||||
// TODO: if we didn't care about the lifecycle of self.message, we
|
|
||||||
// could do catch unreachable here and make this error-free
|
|
||||||
return try self.message.getHeaderValue(self.key);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn getValueIterator(self: ValueResolver) Error!HeaderValueIterator {
|
|
||||||
return .{
|
|
||||||
.values = try self.message.getAllHeaderValues(self.key),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
pub fn destroy(self: *HeaderIterator) void {
|
|
||||||
std.heap.raw_c_allocator.free(self.keys);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn next(self: *HeaderIterator) ?ValueResolver {
|
|
||||||
if (self.index >= self.keys.len) return null;
|
|
||||||
defer self.index += 1;
|
|
||||||
|
|
||||||
const sliced = std.mem.sliceTo(self.keys[self.index], 0);
|
|
||||||
return .{
|
|
||||||
.message = self.message,
|
|
||||||
.key = sliced,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn nextKey(self: *HeaderIterator) ?[:0]const u8 {
|
|
||||||
if (self.index >= self.keys.len) return null;
|
|
||||||
defer self.index += 1;
|
|
||||||
return std.mem.sliceTo(self.keys[self.index], 0);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
pub fn headerIterator(self: *Message) Error!HeaderIterator {
|
|
||||||
return .{
|
return .{
|
||||||
.message = self,
|
.message = self,
|
||||||
.keys = try self.getAllHeaderKeys(),
|
.keys = try self.getAllHeaderKeys(),
|
||||||
@ -186,6 +95,112 @@ pub const Message = opaque {
|
|||||||
pub fn isNoResponders(self: *Message) bool {
|
pub fn isNoResponders(self: *Message) bool {
|
||||||
return nats_c.natsMsg_IsNoResponders(@ptrCast(self));
|
return nats_c.natsMsg_IsNoResponders(@ptrCast(self));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prefer using message.getHeaderValueIterator
|
||||||
|
pub fn getAllHeaderValues(self: *Message, key: [:0]const u8) Error![][*:0]const u8 {
|
||||||
|
var values: [*][*:0]const u8 = undefined;
|
||||||
|
var count: c_int = 0;
|
||||||
|
|
||||||
|
const status = Status.fromInt(
|
||||||
|
nats_c.natsMsgHeader_Values(@ptrCast(self), key.ptr, @ptrCast(&values), &count),
|
||||||
|
);
|
||||||
|
|
||||||
|
// the user must use std.mem.spanTo on each item they want to read to get a
|
||||||
|
// slice, since we can't do that automatically without having to allocate.
|
||||||
|
return status.toError() orelse values[0..@intCast(count)];
|
||||||
|
}
|
||||||
|
|
||||||
|
// prefer using message.getHeaderIterator
|
||||||
|
pub fn getAllHeaderKeys(self: *Message) Error![][*:0]const u8 {
|
||||||
|
var keys: [*][*:0]const u8 = undefined;
|
||||||
|
var count: c_int = 0;
|
||||||
|
|
||||||
|
const status = Status.fromInt(nats_c.natsMsgHeader_Keys(@ptrCast(self), @ptrCast(&keys), &count));
|
||||||
|
|
||||||
|
// the user must use std.mem.spanTo on each item they want to read to get a
|
||||||
|
// slice, since we can't do that automatically without having to allocate.
|
||||||
|
// the returned slice
|
||||||
|
return status.toError() orelse keys[0..@intCast(count)];
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const HeaderValueIterator = struct {
|
||||||
|
values: [][*:0]const u8,
|
||||||
|
index: usize = 0,
|
||||||
|
|
||||||
|
pub fn destroy(self: HeaderValueIterator) void {
|
||||||
|
std.heap.raw_c_allocator.free(self.values);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const deinit = HeaderValueIterator.destroy;
|
||||||
|
|
||||||
|
pub fn next(self: *HeaderValueIterator) ?[:0]const u8 {
|
||||||
|
if (self.index >= self.values.len) return null;
|
||||||
|
defer self.index += 1;
|
||||||
|
|
||||||
|
return std.mem.sliceTo(self.values[self.index], 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn peek(self: *HeaderValueIterator) ?[:0]const u8 {
|
||||||
|
if (self.index >= self.values.len) return null;
|
||||||
|
return std.mem.sliceTo(self.values[self.index], 0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const HeaderIterator = struct {
|
||||||
|
message: *Message,
|
||||||
|
keys: [][*:0]const u8,
|
||||||
|
index: usize = 0,
|
||||||
|
|
||||||
|
pub const ValueResolver = struct {
|
||||||
|
message: *Message,
|
||||||
|
key: [:0]const u8,
|
||||||
|
|
||||||
|
pub fn value(self: ValueResolver) Error![:0]const u8 {
|
||||||
|
// TODO: if we didn't care about the lifecycle of self.message, we
|
||||||
|
// could do catch unreachable here and make this error-free
|
||||||
|
return try self.message.getHeaderValue(self.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn valueIterator(self: ValueResolver) Error!HeaderValueIterator {
|
||||||
|
return try self.message.getHeaderValueIterator(self.key);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pub fn destroy(self: *HeaderIterator) void {
|
||||||
|
std.heap.raw_c_allocator.free(self.keys);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const deinit = HeaderIterator.destroy;
|
||||||
|
|
||||||
|
pub fn next(self: *HeaderIterator) ?ValueResolver {
|
||||||
|
if (self.index >= self.keys.len) return null;
|
||||||
|
defer self.index += 1;
|
||||||
|
|
||||||
|
return .{
|
||||||
|
.message = self.message,
|
||||||
|
.key = std.mem.sliceTo(self.keys[self.index], 0),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn peek(self: *HeaderIterator) ?ValueResolver {
|
||||||
|
if (self.index >= self.keys.len) return null;
|
||||||
|
return .{
|
||||||
|
.message = self.message,
|
||||||
|
.key = std.mem.sliceTo(self.keys[self.index], 0),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn nextKey(self: *HeaderIterator) ?[:0]const u8 {
|
||||||
|
if (self.index >= self.keys.len) return null;
|
||||||
|
defer self.index += 1;
|
||||||
|
return std.mem.sliceTo(self.keys[self.index], 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn peekKey(self: *HeaderIterator) ?[:0]const u8 {
|
||||||
|
if (self.index >= self.keys.len) return null;
|
||||||
|
return std.mem.sliceTo(self.keys[self.index], 0);
|
||||||
|
}
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: not implementing jetstream API right now
|
// TODO: not implementing jetstream API right now
|
||||||
|
@ -28,19 +28,19 @@ const Status = err_.Status;
|
|||||||
|
|
||||||
const thunk = @import("./thunk.zig");
|
const thunk = @import("./thunk.zig");
|
||||||
|
|
||||||
pub const MessageCount = struct {
|
|
||||||
messages: c_int = 0,
|
|
||||||
bytes: c_int = 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub const SubscriptionStats = struct {
|
|
||||||
pending: MessageCount = .{},
|
|
||||||
max_pending: MessageCount = .{},
|
|
||||||
delivered_messages: i64 = 0,
|
|
||||||
dropped_messages: i64 = 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub const Subscription = opaque {
|
pub const Subscription = opaque {
|
||||||
|
pub const MessageCount = struct {
|
||||||
|
messages: c_int = 0,
|
||||||
|
bytes: c_int = 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const SubscriptionStats = struct {
|
||||||
|
pending: MessageCount = .{},
|
||||||
|
max_pending: MessageCount = .{},
|
||||||
|
delivered_messages: i64 = 0,
|
||||||
|
dropped_messages: i64 = 0,
|
||||||
|
};
|
||||||
|
|
||||||
pub fn isValid(self: *Subscription) bool {
|
pub fn isValid(self: *Subscription) bool {
|
||||||
return nats_c.natsSubscription_IsValid(@ptrCast(self));
|
return nats_c.natsSubscription_IsValid(@ptrCast(self));
|
||||||
}
|
}
|
||||||
@ -84,7 +84,7 @@ pub const Subscription = opaque {
|
|||||||
// invalid/closed subscriptions return null. should we convert that into an
|
// invalid/closed subscriptions return null. should we convert that into an
|
||||||
// error? could return error.InvalidSubscription
|
// error? could return error.InvalidSubscription
|
||||||
const result = nats_c.natsSubscription_GetSubject(@ptrCast(self)) orelse return null;
|
const result = nats_c.natsSubscription_GetSubject(@ptrCast(self)) orelse return null;
|
||||||
return std.mem.spanTo(u8, result, 0);
|
return std.mem.sliceTo(result, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setPendingLimits(self: *Subscription, limit: MessageCount) Error!void {
|
pub fn setPendingLimits(self: *Subscription, limit: MessageCount) Error!void {
|
||||||
|
@ -5,4 +5,5 @@ test {
|
|||||||
_ = @import("./nats.zig");
|
_ = @import("./nats.zig");
|
||||||
_ = @import("./connection.zig");
|
_ = @import("./connection.zig");
|
||||||
_ = @import("./message.zig");
|
_ = @import("./message.zig");
|
||||||
|
_ = @import("./subscription.zig");
|
||||||
}
|
}
|
||||||
|
@ -5,56 +5,113 @@ const std = @import("std");
|
|||||||
|
|
||||||
const nats = @import("nats");
|
const nats = @import("nats");
|
||||||
|
|
||||||
test "message: create message" {
|
const util = @import("./util.zig");
|
||||||
const subject = "hello";
|
|
||||||
const reply = "reply";
|
test "nats.Message" {
|
||||||
const data = "world";
|
const message_subject: [:0]const u8 = "hello";
|
||||||
|
const message_reply: [:0]const u8 = "reply";
|
||||||
|
const message_data: [:0]const u8 = "world";
|
||||||
|
|
||||||
// have to initialize the library so the reference counter can correctly destroy
|
// have to initialize the library so the reference counter can correctly destroy
|
||||||
// objects, otherwise we segfault on trying to free the memory.
|
// objects, otherwise we segfault on trying to free the memory.
|
||||||
try nats.init(nats.default_spin_count);
|
try nats.init(nats.default_spin_count);
|
||||||
defer nats.deinit();
|
defer nats.deinit();
|
||||||
|
|
||||||
const message = try nats.Message.create(subject, reply, data);
|
{
|
||||||
|
const message = try nats.Message.create(message_subject, null, message_data);
|
||||||
|
defer message.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const message = try nats.Message.create(message_subject, message_reply, null);
|
||||||
|
defer message.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const message = try nats.Message.create(message_subject, null, null);
|
||||||
|
defer message.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = try nats.Message.create(message_subject, message_reply, message_data);
|
||||||
defer message.destroy();
|
defer message.destroy();
|
||||||
|
|
||||||
const message2 = try nats.Message.create(subject, null, data);
|
const subject = message.getSubject();
|
||||||
defer message2.destroy();
|
try std.testing.expectEqualStrings(message_subject, subject);
|
||||||
|
|
||||||
const message3 = try nats.Message.create(subject, data, null);
|
const reply = message.getReply() orelse return error.TestUnexpectedResult;
|
||||||
defer message3.destroy();
|
try std.testing.expectEqualStrings(message_reply, reply);
|
||||||
|
|
||||||
const message4 = try nats.Message.create(subject, null, null);
|
const data = message.getData() orelse return error.TestUnexpectedResult;
|
||||||
defer message4.destroy();
|
try std.testing.expectEqualStrings(message_data, data);
|
||||||
|
|
||||||
|
try std.testing.expectEqual(message_data.len, message.getDataLength());
|
||||||
|
|
||||||
|
const message_header: [:0]const u8 = "foo";
|
||||||
|
const message_hvalues: []const [:0]const u8 = &.{ "bar", "baz" };
|
||||||
|
try message.setHeaderValue(message_header, message_hvalues[0]);
|
||||||
|
|
||||||
|
try std.testing.expectEqualStrings(message_hvalues[0], try message.getHeaderValue(message_header));
|
||||||
|
try message.addHeaderValue(message_header, message_hvalues[1]);
|
||||||
|
try std.testing.expectEqualStrings(message_hvalues[0], try message.getHeaderValue(message_header));
|
||||||
|
|
||||||
|
{
|
||||||
|
var idx: usize = 0;
|
||||||
|
var val_iter = try message.getHeaderValueIterator(message_header);
|
||||||
|
defer val_iter.destroy();
|
||||||
|
|
||||||
|
while (val_iter.next()) |value| : (idx += 1) {
|
||||||
|
try std.testing.expect(idx < message_hvalues.len);
|
||||||
|
try std.testing.expectEqualStrings(message_hvalues[idx], value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var header_iter = try message.getHeaderIterator();
|
||||||
|
defer header_iter.destroy();
|
||||||
|
|
||||||
|
while (header_iter.next()) |header| {
|
||||||
|
try std.testing.expectEqualStrings(message_header, header.key);
|
||||||
|
try std.testing.expectEqualStrings(message_hvalues[0], try header.value());
|
||||||
|
|
||||||
|
var idx: usize = 0;
|
||||||
|
var val_iter = try header.valueIterator();
|
||||||
|
defer val_iter.destroy();
|
||||||
|
|
||||||
|
while (val_iter.next()) |value| : (idx += 1) {
|
||||||
|
try std.testing.expect(idx < message_hvalues.len);
|
||||||
|
try std.testing.expectEqualStrings(message_hvalues[idx], value);
|
||||||
|
}
|
||||||
|
|
||||||
|
try std.testing.expect(val_iter.peek() == null);
|
||||||
|
}
|
||||||
|
try std.testing.expect(header_iter.peek() == null);
|
||||||
|
}
|
||||||
|
|
||||||
|
try message.deleteHeader(message_header);
|
||||||
|
_ = message.isNoResponders();
|
||||||
}
|
}
|
||||||
|
|
||||||
test "message: get subject" {
|
test "send nats.Message" {
|
||||||
|
var server = try util.TestServer.launch(.{});
|
||||||
|
defer server.stop();
|
||||||
|
|
||||||
try nats.init(nats.default_spin_count);
|
try nats.init(nats.default_spin_count);
|
||||||
defer nats.deinit();
|
defer nats.deinit();
|
||||||
|
|
||||||
const subject = "hello";
|
const connection = try nats.Connection.connectTo(nats.default_server_url);
|
||||||
const message = try nats.Message.create(subject, null, null);
|
defer connection.destroy();
|
||||||
|
|
||||||
|
const message_subject: [:0]const u8 = "hello";
|
||||||
|
const message_reply: [:0]const u8 = "reply";
|
||||||
|
const message_data: [:0]const u8 = "world";
|
||||||
|
const message_header: [:0]const u8 = "foo";
|
||||||
|
const message_hvalues: []const [:0]const u8 = &.{ "bar", "baz" };
|
||||||
|
|
||||||
|
const message = try nats.Message.create(message_subject, message_reply, message_data);
|
||||||
defer message.destroy();
|
defer message.destroy();
|
||||||
|
|
||||||
const received = message.getSubject();
|
try message.setHeaderValue(message_header, message_hvalues[0]);
|
||||||
try std.testing.expectEqualStrings(subject, received);
|
try message.addHeaderValue(message_header, message_hvalues[1]);
|
||||||
}
|
|
||||||
|
try connection.publishMessage(message);
|
||||||
test "message: get reply" {
|
|
||||||
try nats.init(nats.default_spin_count);
|
|
||||||
defer nats.deinit();
|
|
||||||
|
|
||||||
const subject = "hello";
|
|
||||||
const reply = "reply";
|
|
||||||
const message = try nats.Message.create(subject, reply, null);
|
|
||||||
defer message.destroy();
|
|
||||||
|
|
||||||
const received = message.getReply() orelse return error.TestUnexpectedResult;
|
|
||||||
try std.testing.expectEqualStrings(reply, received);
|
|
||||||
|
|
||||||
const message2 = try nats.Message.create(subject, null, null);
|
|
||||||
defer message2.destroy();
|
|
||||||
|
|
||||||
const received2 = message2.getReply();
|
|
||||||
try std.testing.expect(received2 == null);
|
|
||||||
}
|
}
|
||||||
|
180
tests/subscription.zig
Normal file
180
tests/subscription.zig
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
// This file is licensed under the CC0 1.0 license.
|
||||||
|
// See: https://creativecommons.org/publicdomain/zero/1.0/legalcode
|
||||||
|
|
||||||
|
const std = @import("std");
|
||||||
|
|
||||||
|
const nats = @import("nats");
|
||||||
|
|
||||||
|
const util = @import("./util.zig");
|
||||||
|
|
||||||
|
test "nats.Subscription" {
|
||||||
|
var server = try util.TestServer.launch(.{});
|
||||||
|
defer server.stop();
|
||||||
|
|
||||||
|
try nats.init(nats.default_spin_count);
|
||||||
|
defer nats.deinit();
|
||||||
|
|
||||||
|
const connection = try nats.Connection.connectTo(nats.default_server_url);
|
||||||
|
defer connection.destroy();
|
||||||
|
|
||||||
|
const message_subject: [:0]const u8 = "hello";
|
||||||
|
const message_reply: [:0]const u8 = "reply";
|
||||||
|
const message_data: [:0]const u8 = "world";
|
||||||
|
|
||||||
|
const message = try nats.Message.create(message_subject, message_reply, message_data);
|
||||||
|
defer message.destroy();
|
||||||
|
|
||||||
|
{
|
||||||
|
const subscription = try connection.subscribeSync(message_subject);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
try subscription.autoUnsubscribe(1);
|
||||||
|
try subscription.setPendingLimits(.{ .messages = 10, .bytes = 1000 });
|
||||||
|
_ = try subscription.getPendingLimits();
|
||||||
|
_ = try subscription.getPending();
|
||||||
|
_ = try subscription.getMaxPending();
|
||||||
|
try subscription.clearMaxPending();
|
||||||
|
_ = try subscription.getDelivered();
|
||||||
|
_ = try subscription.getDropped();
|
||||||
|
_ = try subscription.getStats();
|
||||||
|
_ = try subscription.queuedMessageCount();
|
||||||
|
_ = subscription.getId();
|
||||||
|
const subj = subscription.getSubject() orelse return error.TestUnexpectedResult;
|
||||||
|
try std.testing.expectEqualStrings(message_subject, subj);
|
||||||
|
|
||||||
|
try connection.publishMessage(message);
|
||||||
|
const roundtrip = try subscription.nextMessage(1000);
|
||||||
|
try std.testing.expectEqualStrings(
|
||||||
|
message_data,
|
||||||
|
roundtrip.getData() orelse return error.TestUnexpectedResult,
|
||||||
|
);
|
||||||
|
|
||||||
|
try std.testing.expect(subscription.isValid() == false);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const subscription = try connection.queueSubscribeSync(message_subject, "queuegroup");
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
try subscription.drain();
|
||||||
|
try subscription.waitForDrainCompletion(1000);
|
||||||
|
_ = subscription.drainCompletionStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const subscription = try connection.subscribeSync(message_subject);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
try subscription.drain();
|
||||||
|
try subscription.waitForDrainCompletion(1000);
|
||||||
|
_ = subscription.drainCompletionStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const subscription = try connection.subscribeSync(message_subject);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
try subscription.drainTimeout(1000);
|
||||||
|
try subscription.waitForDrainCompletion(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn onMessage(
|
||||||
|
userdata: *u32,
|
||||||
|
connection: *nats.Connection,
|
||||||
|
subscription: *nats.Subscription,
|
||||||
|
message: *nats.Message,
|
||||||
|
) void {
|
||||||
|
_ = subscription;
|
||||||
|
|
||||||
|
if (message.getReply()) |reply| {
|
||||||
|
connection.publish(reply, "greetings") catch @panic("OH NO");
|
||||||
|
} else @panic("HOW");
|
||||||
|
|
||||||
|
userdata.* += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
test "nats.Subscription (async)" {
|
||||||
|
var server = try util.TestServer.launch(.{});
|
||||||
|
defer server.stop();
|
||||||
|
|
||||||
|
try nats.init(nats.default_spin_count);
|
||||||
|
defer nats.deinit();
|
||||||
|
|
||||||
|
const connection = try nats.Connection.connectTo(nats.default_server_url);
|
||||||
|
defer connection.destroy();
|
||||||
|
|
||||||
|
const message_subject: [:0]const u8 = "hello";
|
||||||
|
const message_reply: [:0]const u8 = "reply";
|
||||||
|
const message_data: [:0]const u8 = "world";
|
||||||
|
|
||||||
|
const message = try nats.Message.create(message_subject, message_reply, message_data);
|
||||||
|
defer message.destroy();
|
||||||
|
|
||||||
|
{
|
||||||
|
var count: u32 = 0;
|
||||||
|
const subscription = try connection.subscribe(u32, message_subject, onMessage, &count);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
const response = try connection.requestMessage(message, 1000);
|
||||||
|
try std.testing.expectEqualStrings(
|
||||||
|
"greetings",
|
||||||
|
response.getData() orelse return error.TestUnexpectedResult,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var count: u32 = 0;
|
||||||
|
const subscription = try connection.subscribeTimeout(
|
||||||
|
u32,
|
||||||
|
message_subject,
|
||||||
|
1000,
|
||||||
|
onMessage,
|
||||||
|
&count,
|
||||||
|
);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
const response = try connection.requestMessage(message, 1000);
|
||||||
|
try std.testing.expectEqualStrings(
|
||||||
|
"greetings",
|
||||||
|
response.getData() orelse return error.TestUnexpectedResult,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var count: u32 = 0;
|
||||||
|
const subscription = try connection.queueSubscribe(
|
||||||
|
u32,
|
||||||
|
message_subject,
|
||||||
|
"queuegroup",
|
||||||
|
onMessage,
|
||||||
|
&count,
|
||||||
|
);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
const response = try connection.requestMessage(message, 1000);
|
||||||
|
try std.testing.expectEqualStrings(
|
||||||
|
"greetings",
|
||||||
|
response.getData() orelse return error.TestUnexpectedResult,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var count: u32 = 0;
|
||||||
|
const subscription = try connection.queueSubscribeTimeout(
|
||||||
|
u32,
|
||||||
|
message_subject,
|
||||||
|
"queuegroup",
|
||||||
|
1000,
|
||||||
|
onMessage,
|
||||||
|
&count,
|
||||||
|
);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
|
const response = try connection.requestMessage(message, 1000);
|
||||||
|
try std.testing.expectEqualStrings(
|
||||||
|
"greetings",
|
||||||
|
response.getData() orelse return error.TestUnexpectedResult,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user