From 94a428139d795b01b41bbe149b379cdb785c9988 Mon Sep 17 00:00:00 2001 From: torque Date: Tue, 15 Aug 2023 22:16:49 -0700 Subject: [PATCH] message: wrap all basic API calls The streaming API is not wrapped because we do not build nats.c with the necessary libraries. The jetstream API is not wrapped because I am extremely lazy and it's outside of the minimum viable scope at the moment. It can happen after the other basic APIs have been wrapped. --- src/message.zig | 85 +++++++++++++++++++++++++++++++++++++++++++++++++ src/nats.zig | 29 ----------------- 2 files changed, 85 insertions(+), 29 deletions(-) diff --git a/src/message.zig b/src/message.zig index a897516..1c9d103 100644 --- a/src/message.zig +++ b/src/message.zig @@ -4,7 +4,24 @@ pub const nats_c = @cImport({ @cInclude("nats/nats.h"); }); +const err_ = @import("./error.zig"); +const Error = err_.Error; +const Status = err_.Status; + pub const Message = opaque { + pub fn create(subject: [:0]const u8, reply: ?[:0]const u8, data: ?[]const u8) Error!*Message { + var self: *Message = undefined; + const status = Status.fromInt(nats_c.natsMsg_Create( + @ptrCast(&self), + subject.ptr, + if (reply) |r| r.ptr else null, + if (data) |d| d.ptr else null, + if (data) |d| @intCast(d.len) else 0, + )); + + return status.toError() orelse self; + } + pub fn destroy(self: *Message) void { nats_c.natsMsg_Destroy(@ptrCast(self)); } @@ -27,4 +44,72 @@ pub const Message = opaque { pub fn getDataLength(self: *Message) usize { return @intCast(nats_c.natsMsg_GetDataLength(@ptrCast(self))); } + + pub fn setHeaderValue(self: *Message, key: [:0]const u8, value: ?[:0]const u8) Error!void { + const status = Status.fromInt(nats_c.natsMsgHeader_Set(@ptrCast(self), key.ptr, value.ptr)); + return status.raise(); + } + + pub fn addHeaderValue(self: *Message, key: [:0]const u8, value: ?[:0]const u8) Error!void { + const status = Status.fromInt(nats_c.natsMsgHeader_Add(@ptrCast(self), key.ptr, value.ptr)); + return status.raise(); + } + + pub fn getHeaderValue(self: *Message, key: [:0]const u8) Error!?[:0]const u8 { + var value: ?[*]u8 = null; + const status = Status.fromInt(nats_c.natsMsgHeader_Get(@ptrCast(self), key.ptr, &value)); + + return status.toError() orelse if (value) |val| std.mem.sliceTo(u8, val, 0) else null; + } + + pub fn getAllHeaderValues(self: *Message, key: [:0]const u8) Error![]?[*]const u8 { + var values: [*]?[*]const u8 = undefined; + var count: c_int = 0; + + const status = Status.fromInt(nats_c.natsMsgHeader_Values(@ptrCast(self), key.ptr, &values, &count)); + + // the user must use std.mem.spanTo on each item they want to read to get a + // slice, since we can't do that automatically without having to allocate. + return status.toError() orelse values[0..@intCast(count)]; + } + + pub fn getAllHeaderKeys(self: *Message) Error![][*]const u8 { + var keys: [*][*]const u8 = undefined; + var count: c_int = 0; + + const status = Status.fromInt(nats_c.natsMsgHeader_Keys(@ptrCast(self), &keys, &count)); + + // TODO: manually assert no keys are NULL? + + // the user must use std.mem.spanTo on each item they want to read to get a + // slice, since we can't do that automatically without having to allocate. + return status.toError() orelse keys[0..@intCast(count)]; + } + + pub fn deleteHeader(self: *Message, key: [:0]const u8) Error!void { + const status = Status.fromInt(nats_c.natsMsgHeader_Delete(@ptrCast(self), key.ptr)); + return status.raise(); + } + + pub fn isNoResponders(self: *Message) bool { + return nats_c.natsMsg_IsNoResponders(@ptrCast(self)); + } }; + +// TODO: not implementing jetstream API right now +// NATS_EXTERN natsStatus natsMsg_Ack(natsMsg *msg, jsOptions *opts); +// NATS_EXTERN natsStatus natsMsg_AckSync(natsMsg *msg, jsOptions *opts, jsErrCode *errCode); +// NATS_EXTERN natsStatus natsMsg_Nak(natsMsg *msg, jsOptions *opts); +// NATS_EXTERN natsStatus natsMsg_NakWithDelay(natsMsg *msg, int64_t delay, jsOptions *opts); +// NATS_EXTERN natsStatus natsMsg_InProgress(natsMsg *msg, jsOptions *opts); +// NATS_EXTERN natsStatus natsMsg_Term(natsMsg *msg, jsOptions *opts); +// NATS_EXTERN uint64_t natsMsg_GetSequence(natsMsg *msg); +// NATS_EXTERN int64_t natsMsg_GetTime(natsMsg *msg); + +// TODO: not implementing streaming API right now +// NATS_EXTERN uint64_t stanMsg_GetSequence(const stanMsg *msg); +// NATS_EXTERN int64_t stanMsg_GetTimestamp(const stanMsg *msg); +// NATS_EXTERN bool stanMsg_IsRedelivered(const stanMsg *msg); +// NATS_EXTERN const char* stanMsg_GetData(const stanMsg *msg); +// NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg); +// NATS_EXTERN void stanMsg_Destroy(stanMsg *msg); diff --git a/src/nats.zig b/src/nats.zig index bf1be17..84ebad1 100644 --- a/src/nats.zig +++ b/src/nats.zig @@ -212,26 +212,6 @@ pub const Statistics = opaque { // NATS_EXTERN void natsInbox_Destroy(natsInbox *inbox); // NATS_EXTERN void natsMsgList_Destroy(natsMsgList *list); -// NATS_EXTERN natsStatus natsMsg_Create(natsMsg **newMsg, const char *subj, const char *reply, const char *data, int dataLen); -// NATS_EXTERN const char* natsMsg_GetSubject(const natsMsg *msg); -// NATS_EXTERN const char* natsMsg_GetReply(const natsMsg *msg); -// NATS_EXTERN const char* natsMsg_GetData(const natsMsg *msg); -// NATS_EXTERN int natsMsg_GetDataLength(const natsMsg *msg); -// NATS_EXTERN natsStatus natsMsgHeader_Set(natsMsg *msg, const char *key, const char *value); -// NATS_EXTERN natsStatus natsMsgHeader_Add(natsMsg *msg, const char *key, const char *value); -// NATS_EXTERN natsStatus natsMsgHeader_Get(natsMsg *msg, const char *key, const char **value); -// NATS_EXTERN natsStatus natsMsgHeader_Values(natsMsg *msg, const char *key, const char* **values, int *count); -// NATS_EXTERN natsStatus natsMsgHeader_Keys(natsMsg *msg, const char* **keys, int *count); -// NATS_EXTERN natsStatus natsMsgHeader_Delete(natsMsg *msg, const char *key); -// NATS_EXTERN bool natsMsg_IsNoResponders(natsMsg *msg); -// NATS_EXTERN void natsMsg_Destroy(natsMsg *msg); -// NATS_EXTERN uint64_t stanMsg_GetSequence(const stanMsg *msg); -// NATS_EXTERN int64_t stanMsg_GetTimestamp(const stanMsg *msg); -// NATS_EXTERN bool stanMsg_IsRedelivered(const stanMsg *msg); -// NATS_EXTERN const char* stanMsg_GetData(const stanMsg *msg); -// NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg); -// NATS_EXTERN void stanMsg_Destroy(stanMsg *msg); - // NATS_EXTERN natsStatus natsSubscription_NoDeliveryDelay(natsSubscription *sub); // NATS_EXTERN natsStatus natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout); // NATS_EXTERN natsStatus natsSubscription_Unsubscribe(natsSubscription *sub); @@ -254,12 +234,3 @@ pub const Statistics = opaque { // NATS_EXTERN natsStatus natsSubscription_DrainCompletionStatus(natsSubscription *sub); // NATS_EXTERN natsStatus natsSubscription_SetOnCompleteCB(natsSubscription *sub, natsOnCompleteCB cb, void *closure); // NATS_EXTERN void natsSubscription_Destroy(natsSubscription *sub); - -// NATS_EXTERN natsStatus natsMsg_Ack(natsMsg *msg, jsOptions *opts); -// NATS_EXTERN natsStatus natsMsg_AckSync(natsMsg *msg, jsOptions *opts, jsErrCode *errCode); -// NATS_EXTERN natsStatus natsMsg_Nak(natsMsg *msg, jsOptions *opts); -// NATS_EXTERN natsStatus natsMsg_NakWithDelay(natsMsg *msg, int64_t delay, jsOptions *opts); -// NATS_EXTERN natsStatus natsMsg_InProgress(natsMsg *msg, jsOptions *opts); -// NATS_EXTERN natsStatus natsMsg_Term(natsMsg *msg, jsOptions *opts); -// NATS_EXTERN uint64_t natsMsg_GetSequence(natsMsg *msg); -// NATS_EXTERN int64_t natsMsg_GetTime(natsMsg *msg);