Compare commits

..

No commits in common. "2e06f44aa8f1df879de9e7948d339b225810ef27" and "ebd3e641119d8b5d256f802a49b48c5d06656eb5" have entirely different histories.

8 changed files with 67 additions and 81 deletions

View File

@ -54,7 +54,6 @@ const Example = struct {
const examples = [_]Example{ const examples = [_]Example{
.{ .name = "request_reply", .file = "examples/request_reply.zig" }, .{ .name = "request_reply", .file = "examples/request_reply.zig" },
.{ .name = "headers", .file = "examples/headers.zig" }, .{ .name = "headers", .file = "examples/headers.zig" },
.{ .name = "pub_bytes", .file = "examples/pub_bytes.zig" },
}; };
pub fn add_examples(b: *std.build, options: ExampleOptions) void { pub fn add_examples(b: *std.build, options: ExampleOptions) void {
@ -65,7 +64,7 @@ pub fn add_examples(b: *std.build, options: ExampleOptions) void {
.name = example.name, .name = example.name,
.root_source_file = .{ .path = example.file }, .root_source_file = .{ .path = example.file },
.target = options.target, .target = options.target,
.optimize = .Debug, .optimize = options.optimize,
}); });
ex_exe.addModule("nats", options.nats_module); ex_exe.addModule("nats", options.nats_module);

View File

@ -15,20 +15,14 @@ pub fn main() !void {
try message.deleteHeader("My-Key3"); try message.deleteHeader("My-Key3");
{
var iter = try message.headerIterator(); var iter = try message.headerIterator();
defer iter.destroy(); defer iter.destroy();
while (iter.next()) |resolv| { while (try iter.next()) |pair| {
var val_iter = try resolv.getValueIterator(); std.debug.print(
defer val_iter.destroy(); "Key: '{s}', Value: '{s}'\n",
.{ pair.key, pair.value orelse "null" },
std.debug.print("key '{s}' got: ", .{resolv.key}); );
while (val_iter.next()) |value| {
std.debug.print("'{s}', ", .{value});
}
std.debug.print("\n", .{});
}
} }
const subscription = try connection.subscribeSync("subject"); const subscription = try connection.subscribeSync("subject");
@ -39,12 +33,13 @@ pub fn main() !void {
defer received.destroy(); defer received.destroy();
{ {
var iter = try received.getHeaderValueIterator("My-Key1"); const vals1 = try received.getAllHeaderValues("My-Key1");
defer iter.destroy(); defer std.heap.raw_c_allocator.free(vals1);
std.debug.print("For key 'My-Key1' got: ", .{}); std.debug.print("For key 'My-Key1' got: ", .{});
while (iter.next()) |value| { for (vals1) |value| {
std.debug.print("'{s}', ", .{value}); const val = std.mem.sliceTo(value, 0);
std.debug.print("'{s}', ", .{val});
} }
std.debug.print("\n", .{}); std.debug.print("\n", .{});
} }

View File

@ -1,10 +0,0 @@
const std = @import("std");
const nats = @import("nats");
pub fn main() !void {
const connection = try nats.Connection.connectTo(nats.default_server_url);
defer connection.destroy();
const data = [_]u8{ 104, 101, 108, 108, 111, 33 };
try connection.publish("subject", &data);
}

View File

@ -15,7 +15,7 @@ fn onMessage(
}); });
if (message.getReply()) |reply| { if (message.getReply()) |reply| {
connection.publish(reply, "salutations") catch @panic("HELP"); connection.publishString(reply, "salutations") catch @panic("HELP");
} }
userdata.* = true; userdata.* = true;
@ -30,7 +30,7 @@ pub fn main() !void {
defer subscription.destroy(); defer subscription.destroy();
while (!done) { while (!done) {
const reply = try connection.request("channel", "greetings", 1000); const reply = try connection.requestString("channel", "greetings", 1000);
defer reply.destroy(); defer reply.destroy();
std.debug.print("Reply \"{s}\" got message: {s}\n", .{ std.debug.print("Reply \"{s}\" got message: {s}\n", .{
@ -38,10 +38,4 @@ pub fn main() !void {
reply.getData() orelse "[null]", reply.getData() orelse "[null]",
}); });
} }
const stats = try connection.getStats();
std.debug.print(
"Server stats => {{\n\tmessages_in: {d} ({d} B),\n\tmessages_out: {d} ({d} B),\n\treconnects: {d}\n}}\n",
.{ stats.messages_in, stats.bytes_in, stats.messages_out, stats.bytes_out, stats.reconnects },
);
} }

View File

@ -244,6 +244,16 @@ pub const Connection = opaque {
).raise(); ).raise();
} }
pub fn publishString(
self: *Connection,
subject: [:0]const u8,
message: [:0]const u8,
) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishString(@ptrCast(self), subject.ptr, message.ptr),
).raise();
}
pub fn publishMessage(self: *Connection, message: *Message) Error!void { pub fn publishMessage(self: *Connection, message: *Message) Error!void {
return Status.fromInt( return Status.fromInt(
nats_c.natsConnection_PublishMsg(@ptrCast(self), @ptrCast(message)), nats_c.natsConnection_PublishMsg(@ptrCast(self), @ptrCast(message)),
@ -267,6 +277,22 @@ pub const Connection = opaque {
).raise(); ).raise();
} }
pub fn publishRequestString(
self: *Connection,
subject: [:0]const u8,
reply: [:0]const u8,
message: [:0]const u8,
) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishRequestString(
@ptrCast(self),
subject.ptr,
reply.ptr,
message.ptr,
),
).raise();
}
pub fn request( pub fn request(
self: *Connection, self: *Connection,
subject: [:0]const u8, subject: [:0]const u8,
@ -287,6 +313,25 @@ pub const Connection = opaque {
return status.toError() orelse response; return status.toError() orelse response;
} }
pub fn requestString(
self: *Connection,
subject: [:0]const u8,
req: [:0]const u8,
timeout: i64,
) Error!*Message {
var response: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_RequestString(
@ptrCast(&response),
@ptrCast(self),
subject.ptr,
req.ptr,
timeout,
));
return status.toError() orelse response;
}
pub fn requestMessage( pub fn requestMessage(
self: *Connection, self: *Connection,
req: *Message, req: *Message,

View File

@ -76,10 +76,6 @@ pub const Message = opaque {
}; };
} }
pub fn getHeaderValueIterator(self: *Message, key: [:0]const u8) Error!HeaderValueIterator {
return .{ .values = try self.getAllHeaderValues(key) };
}
pub fn getAllHeaderKeys(self: *Message) Error![][*:0]const u8 { pub fn getAllHeaderKeys(self: *Message) Error![][*:0]const u8 {
var keys: [*c][*c]const u8 = undefined; var keys: [*c][*c]const u8 = undefined;
var count: c_int = 0; var count: c_int = 0;
@ -97,56 +93,23 @@ pub const Message = opaque {
}; };
} }
pub const HeaderValueIterator = struct {
values: [][*:0]const u8,
index: usize = 0,
pub fn destroy(self: HeaderValueIterator) void {
std.heap.raw_c_allocator.free(self.values);
}
pub fn next(self: *HeaderValueIterator) ?[:0]const u8 {
if (self.index >= self.values.len) return null;
defer self.index += 1;
return std.mem.sliceTo(self.values[self.index], 0);
}
};
pub const HeaderIterator = struct { pub const HeaderIterator = struct {
message: *Message, message: *Message,
keys: [][*:0]const u8, keys: [][*:0]const u8,
index: usize = 0, index: usize = 0,
pub const ValueResolver = struct {
message: *Message,
key: [:0]const u8,
pub fn getValue(self: ValueResolver) Error![:0]const u8 {
// TODO: if we didn't care about the lifecycle of self.message, we
// could do catch unreachable here and make this error-free
return try self.message.getHeaderValue(self.key);
}
pub fn getValueIterator(self: ValueResolver) Error!HeaderValueIterator {
return .{
.values = try self.message.getAllHeaderValues(self.key),
};
}
};
pub fn destroy(self: *HeaderIterator) void { pub fn destroy(self: *HeaderIterator) void {
std.heap.raw_c_allocator.free(self.keys); std.heap.raw_c_allocator.free(self.keys);
} }
pub fn next(self: *HeaderIterator) ?ValueResolver { pub fn next(self: *HeaderIterator) Error!?struct { key: [:0]const u8, value: ?[:0]const u8 } {
if (self.index >= self.keys.len) return null; if (self.index >= self.keys.len) return null;
defer self.index += 1; defer self.index += 1;
const sliced = std.mem.sliceTo(self.keys[self.index], 0); const sliced = std.mem.sliceTo(self.keys[self.index], 0);
return .{ return .{
.message = self.message,
.key = sliced, .key = sliced,
.value = try self.message.getHeaderValue(sliced),
}; };
} }

View File

@ -35,7 +35,7 @@ pub const Message = msg_.Message;
pub const Statistics = sta_.Statistics; pub const Statistics = sta_.Statistics;
pub const StatsCounts = sta_.StatsCounts; pub const StatsCounts = sta_.StatsCounts;
pub const Status = err_.Status; const Status = err_.Status;
pub const Error = err_.Error; pub const Error = err_.Error;
pub fn getVersion() [:0]const u8 { pub fn getVersion() [:0]const u8 {

View File

@ -29,14 +29,14 @@ pub const Statistics = opaque {
pub fn getCounts(self: *Statistics) Error!StatsCounts { pub fn getCounts(self: *Statistics) Error!StatsCounts {
var counts: StatsCounts = .{}; var counts: StatsCounts = .{};
const status = Status.fromInt(nats_c.natsStatistics_GetCounts( const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
@ptrCast(self), self,
&counts.messages_in, &counts.messages_in,
&counts.bytes_in, &counts.bytes_in,
&counts.messages_out, &counts.messages_out,
&counts.bytes_out, &counts.bytes_out,
&counts.reconnects, &counts.reconnects,
)); );
return status.toError() orelse counts; return status.toError() orelse counts;
} }
}; };