message: wrap all basic API calls
The streaming API is not wrapped because we do not build nats.c with the necessary libraries. The jetstream API is not wrapped because I am extremely lazy and it's outside of the minimum viable scope at the moment. It can happen after the other basic APIs have been wrapped.
This commit is contained in:
parent
b55dfe0732
commit
94a428139d
@ -4,7 +4,24 @@ pub const nats_c = @cImport({
|
||||
@cInclude("nats/nats.h");
|
||||
});
|
||||
|
||||
const err_ = @import("./error.zig");
|
||||
const Error = err_.Error;
|
||||
const Status = err_.Status;
|
||||
|
||||
pub const Message = opaque {
|
||||
pub fn create(subject: [:0]const u8, reply: ?[:0]const u8, data: ?[]const u8) Error!*Message {
|
||||
var self: *Message = undefined;
|
||||
const status = Status.fromInt(nats_c.natsMsg_Create(
|
||||
@ptrCast(&self),
|
||||
subject.ptr,
|
||||
if (reply) |r| r.ptr else null,
|
||||
if (data) |d| d.ptr else null,
|
||||
if (data) |d| @intCast(d.len) else 0,
|
||||
));
|
||||
|
||||
return status.toError() orelse self;
|
||||
}
|
||||
|
||||
pub fn destroy(self: *Message) void {
|
||||
nats_c.natsMsg_Destroy(@ptrCast(self));
|
||||
}
|
||||
@ -27,4 +44,72 @@ pub const Message = opaque {
|
||||
pub fn getDataLength(self: *Message) usize {
|
||||
return @intCast(nats_c.natsMsg_GetDataLength(@ptrCast(self)));
|
||||
}
|
||||
|
||||
pub fn setHeaderValue(self: *Message, key: [:0]const u8, value: ?[:0]const u8) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Set(@ptrCast(self), key.ptr, value.ptr));
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
pub fn addHeaderValue(self: *Message, key: [:0]const u8, value: ?[:0]const u8) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Add(@ptrCast(self), key.ptr, value.ptr));
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
pub fn getHeaderValue(self: *Message, key: [:0]const u8) Error!?[:0]const u8 {
|
||||
var value: ?[*]u8 = null;
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Get(@ptrCast(self), key.ptr, &value));
|
||||
|
||||
return status.toError() orelse if (value) |val| std.mem.sliceTo(u8, val, 0) else null;
|
||||
}
|
||||
|
||||
pub fn getAllHeaderValues(self: *Message, key: [:0]const u8) Error![]?[*]const u8 {
|
||||
var values: [*]?[*]const u8 = undefined;
|
||||
var count: c_int = 0;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Values(@ptrCast(self), key.ptr, &values, &count));
|
||||
|
||||
// the user must use std.mem.spanTo on each item they want to read to get a
|
||||
// slice, since we can't do that automatically without having to allocate.
|
||||
return status.toError() orelse values[0..@intCast(count)];
|
||||
}
|
||||
|
||||
pub fn getAllHeaderKeys(self: *Message) Error![][*]const u8 {
|
||||
var keys: [*][*]const u8 = undefined;
|
||||
var count: c_int = 0;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Keys(@ptrCast(self), &keys, &count));
|
||||
|
||||
// TODO: manually assert no keys are NULL?
|
||||
|
||||
// the user must use std.mem.spanTo on each item they want to read to get a
|
||||
// slice, since we can't do that automatically without having to allocate.
|
||||
return status.toError() orelse keys[0..@intCast(count)];
|
||||
}
|
||||
|
||||
pub fn deleteHeader(self: *Message, key: [:0]const u8) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Delete(@ptrCast(self), key.ptr));
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
pub fn isNoResponders(self: *Message) bool {
|
||||
return nats_c.natsMsg_IsNoResponders(@ptrCast(self));
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: not implementing jetstream API right now
|
||||
// NATS_EXTERN natsStatus natsMsg_Ack(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_AckSync(natsMsg *msg, jsOptions *opts, jsErrCode *errCode);
|
||||
// NATS_EXTERN natsStatus natsMsg_Nak(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_NakWithDelay(natsMsg *msg, int64_t delay, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_InProgress(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_Term(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN uint64_t natsMsg_GetSequence(natsMsg *msg);
|
||||
// NATS_EXTERN int64_t natsMsg_GetTime(natsMsg *msg);
|
||||
|
||||
// TODO: not implementing streaming API right now
|
||||
// NATS_EXTERN uint64_t stanMsg_GetSequence(const stanMsg *msg);
|
||||
// NATS_EXTERN int64_t stanMsg_GetTimestamp(const stanMsg *msg);
|
||||
// NATS_EXTERN bool stanMsg_IsRedelivered(const stanMsg *msg);
|
||||
// NATS_EXTERN const char* stanMsg_GetData(const stanMsg *msg);
|
||||
// NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg);
|
||||
// NATS_EXTERN void stanMsg_Destroy(stanMsg *msg);
|
||||
|
29
src/nats.zig
29
src/nats.zig
@ -212,26 +212,6 @@ pub const Statistics = opaque {
|
||||
// NATS_EXTERN void natsInbox_Destroy(natsInbox *inbox);
|
||||
// NATS_EXTERN void natsMsgList_Destroy(natsMsgList *list);
|
||||
|
||||
// NATS_EXTERN natsStatus natsMsg_Create(natsMsg **newMsg, const char *subj, const char *reply, const char *data, int dataLen);
|
||||
// NATS_EXTERN const char* natsMsg_GetSubject(const natsMsg *msg);
|
||||
// NATS_EXTERN const char* natsMsg_GetReply(const natsMsg *msg);
|
||||
// NATS_EXTERN const char* natsMsg_GetData(const natsMsg *msg);
|
||||
// NATS_EXTERN int natsMsg_GetDataLength(const natsMsg *msg);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Set(natsMsg *msg, const char *key, const char *value);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Add(natsMsg *msg, const char *key, const char *value);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Get(natsMsg *msg, const char *key, const char **value);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Values(natsMsg *msg, const char *key, const char* **values, int *count);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Keys(natsMsg *msg, const char* **keys, int *count);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Delete(natsMsg *msg, const char *key);
|
||||
// NATS_EXTERN bool natsMsg_IsNoResponders(natsMsg *msg);
|
||||
// NATS_EXTERN void natsMsg_Destroy(natsMsg *msg);
|
||||
// NATS_EXTERN uint64_t stanMsg_GetSequence(const stanMsg *msg);
|
||||
// NATS_EXTERN int64_t stanMsg_GetTimestamp(const stanMsg *msg);
|
||||
// NATS_EXTERN bool stanMsg_IsRedelivered(const stanMsg *msg);
|
||||
// NATS_EXTERN const char* stanMsg_GetData(const stanMsg *msg);
|
||||
// NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg);
|
||||
// NATS_EXTERN void stanMsg_Destroy(stanMsg *msg);
|
||||
|
||||
// NATS_EXTERN natsStatus natsSubscription_NoDeliveryDelay(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsSubscription_Unsubscribe(natsSubscription *sub);
|
||||
@ -254,12 +234,3 @@ pub const Statistics = opaque {
|
||||
// NATS_EXTERN natsStatus natsSubscription_DrainCompletionStatus(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_SetOnCompleteCB(natsSubscription *sub, natsOnCompleteCB cb, void *closure);
|
||||
// NATS_EXTERN void natsSubscription_Destroy(natsSubscription *sub);
|
||||
|
||||
// NATS_EXTERN natsStatus natsMsg_Ack(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_AckSync(natsMsg *msg, jsOptions *opts, jsErrCode *errCode);
|
||||
// NATS_EXTERN natsStatus natsMsg_Nak(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_NakWithDelay(natsMsg *msg, int64_t delay, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_InProgress(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_Term(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN uint64_t natsMsg_GetSequence(natsMsg *msg);
|
||||
// NATS_EXTERN int64_t natsMsg_GetTime(natsMsg *msg);
|
||||
|
Loading…
x
Reference in New Issue
Block a user