subscription: wrap all basic API calls

Hopefully I am not missing anything here. Will need to comb through the
public API to make sure no calls are missing.
This commit is contained in:
torque 2023-08-15 22:18:26 -07:00
parent 94a428139d
commit 55690ced02
Signed by: torque
SSH Key Fingerprint: SHA256:nCrXefBNo6EbjNSQhv0nXmEg/VuNq3sMF5b8zETw3Tk
3 changed files with 188 additions and 36 deletions

View File

@ -6,8 +6,8 @@ pub const nats_c = @cImport({
const sub_ = @import("./subscription.zig");
const Subscription = sub_.Subscription;
const ThunkCallback = sub_.ThunkCallback;
const messageThunk = sub_.messageThunk;
const SubscriptionThunkCallback = sub_.SubscriptionThunkCallback;
const subscriptionMessageThunk = sub_.subscriptionMessageThunk;
const msg_ = @import("./message.zig");
const Message = msg_.Message;
@ -78,7 +78,7 @@ pub const Connection = opaque {
self: *Connection,
comptime T: type,
subject: [:0]const u8,
callback: ThunkCallback(T),
callback: SubscriptionThunkCallback(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
@ -86,7 +86,7 @@ pub const Connection = opaque {
@ptrCast(&sub),
@ptrCast(self),
subject,
messageThunk(T, callback),
subscriptionMessageThunk(T, callback),
userdata,
));
return status.toError() orelse sub;

View File

@ -211,26 +211,3 @@ pub const Statistics = opaque {
// NATS_EXTERN natsStatus natsInbox_Create(natsInbox **newInbox);
// NATS_EXTERN void natsInbox_Destroy(natsInbox *inbox);
// NATS_EXTERN void natsMsgList_Destroy(natsMsgList *list);
// NATS_EXTERN natsStatus natsSubscription_NoDeliveryDelay(natsSubscription *sub);
// NATS_EXTERN natsStatus natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout);
// NATS_EXTERN natsStatus natsSubscription_Unsubscribe(natsSubscription *sub);
// NATS_EXTERN natsStatus natsSubscription_AutoUnsubscribe(natsSubscription *sub, int max);
// NATS_EXTERN natsStatus natsSubscription_QueuedMsgs(natsSubscription *sub, uint64_t *queuedMsgs);
// NATS_EXTERN int64_t natsSubscription_GetID(natsSubscription* sub);
// NATS_EXTERN const char* natsSubscription_GetSubject(natsSubscription* sub);
// NATS_EXTERN natsStatus natsSubscription_SetPendingLimits(natsSubscription *sub, int msgLimit, int bytesLimit);
// NATS_EXTERN natsStatus natsSubscription_GetPendingLimits(natsSubscription *sub, int *msgLimit, int *bytesLimit);
// NATS_EXTERN natsStatus natsSubscription_GetPending(natsSubscription *sub, int *msgs, int *bytes);
// NATS_EXTERN natsStatus natsSubscription_GetDelivered(natsSubscription *sub, int64_t *msgs);
// NATS_EXTERN natsStatus natsSubscription_GetDropped(natsSubscription *sub, int64_t *msgs);
// NATS_EXTERN natsStatus natsSubscription_GetMaxPending(natsSubscription *sub, int *msgs, int *bytes);
// NATS_EXTERN natsStatus natsSubscription_ClearMaxPending(natsSubscription *sub);
// NATS_EXTERN natsStatus natsSubscription_GetStats(natsSubscription *sub, int *pendingMsgs, int *pendingBytes, int *maxPendingMsgs, int *maxPendingBytes, int64_t *deliveredMsgs, int64_t *droppedMsgs);
// NATS_EXTERN bool natsSubscription_IsValid(natsSubscription *sub);
// NATS_EXTERN natsStatus natsSubscription_Drain(natsSubscription *sub);
// NATS_EXTERN natsStatus natsSubscription_DrainTimeout(natsSubscription *sub, int64_t timeout);
// NATS_EXTERN natsStatus natsSubscription_WaitForDrainCompletion(natsSubscription *sub, int64_t timeout);
// NATS_EXTERN natsStatus natsSubscription_DrainCompletionStatus(natsSubscription *sub);
// NATS_EXTERN natsStatus natsSubscription_SetOnCompleteCB(natsSubscription *sub, natsOnCompleteCB cb, void *closure);
// NATS_EXTERN void natsSubscription_Destroy(natsSubscription *sub);

View File

@ -10,14 +10,178 @@ const err_ = @import("./error.zig");
const Error = err_.Error;
const Status = err_.Status;
const SubCallback = fn (?*nats_c.natsConnection, ?*nats_c.natsSubscription, ?*nats_c.natsMsg, ?*anyopaque) callconv(.C) void;
pub fn ThunkCallback(comptime T: type) type {
pub const MessageCount = struct {
messages: c_int = 0,
bytes: c_int = 0,
};
pub const SubscriptionStats = struct {
pending: MessageCount = .{},
max_pending: MessageCount = .{},
delivered_messages: i64 = 0,
dropped_messages: i64 = 0,
};
pub const Subscription = opaque {
pub fn isValid(self: *Subscription) bool {
return nats_c.natsSubscription_IsValid(@ptrCast(self));
}
pub fn destroy(self: *Subscription) void {
nats_c.natsSubscription_Destroy(@ptrCast(self));
}
pub fn unsubscribe(self: *Subscription) Error!void {
return Status.fromInt(nats_c.natsSubscription_Unsubscribe(@ptrCast(self))).raise();
}
pub fn autoUnsubscribe(self: *Subscription, max: c_int) Error!void {
return Status.fromInt(nats_c.natsSubscription_AutoUnsubscribe(@ptrCast(self), max)).raise();
}
pub fn nextMessage(self: *Subscription, timeout: i64) Error!*Message {
var message: *Message = undefined;
const status = Status.fromInt(nats_c.natsSubscription_NextMsg(
@ptrCast(&message),
@ptrCast(self),
timeout,
));
return status.toError() orelse message;
}
pub fn queuedMessageCount(self: *Subscription) Error!u64 {
var count: u64 = 0;
const status = Status.fromInt(nats_c.natsSubscription_QueuedMsgs(@ptrCast(self), &count));
return status.toError() orelse count;
}
pub fn getId(self: *Subscription) i64 {
// TODO: invalid/closed subscriptions return 0. Should we convert that into an
// error? could return error.InvalidSubscription
return nats_c.natsSubscription_GetID(@ptrCast(self));
}
pub fn getSubject(self: *Subscription) ?[:0]const u8 {
// invalid/closed subscriptions return null. should we convert that into an
// error? could return error.InvalidSubscription
const result = nats_c.natsSubscription_GetSubject(@ptrCast(self)) orelse return null;
return std.mem.spanTo(u8, result, 0);
}
pub fn setPendingLimits(self: *Subscription, limit: MessageCount) Error!void {
return Status.fromInt(
nats_c.natsSubscription_SetPendingLimits(@ptrCast(self), limit.messages, limit.bytes),
).raise();
}
pub fn getPendingLimits(self: *Subscription) Error!MessageCount {
var result: MessageCount = .{};
const status = Status.fromInt(
nats_c.natsSubscription_GetPendingLimits(@ptrCast(self), &result.messages, &result.bytes),
);
return status.toError() orelse result;
}
pub fn getPending(self: *Subscription) Error!MessageCount {
var result: MessageCount = .{};
const status = Status.fromInt(
nats_c.natsSubscription_GetPending(@ptrCast(self), &result.messages, &result.bytes),
);
return status.toError() orelse result;
}
pub fn getMaxPending(self: *Subscription) Error!MessageCount {
var result: MessageCount = .{};
const status = Status.fromInt(
nats_c.natsSubscription_GetMaxPending(@ptrCast(self), &result.messages, &result.bytes),
);
return status.toError() orelse result;
}
pub fn clearMaxPending(self: *Subscription) Error!void {
return Status.fromInt(nats_c.natsSubscription_ClearMaxPending(@ptrCast(self))).raise();
}
pub fn getDelivered(self: *Subscription) Error!i64 {
var result: i64 = 0;
const status = Status.fromInt(nats_c.natsSubscription_GetDelivered(@ptrCast(self), &result));
return status.toError() orelse result;
}
pub fn getDropped(self: *Subscription) Error!i64 {
var result: i64 = 0;
const status = Status.fromInt(nats_c.natsSubscription_GetDropped(@ptrCast(self), &result));
return status.toError() orelse result;
}
pub fn getStats(self: *Subscription) Error!SubscriptionStats {
var result: SubscriptionStats = .{};
const status = Status.fromInt(nats_c.natsSubscription_GetStats(
@ptrCast(self),
&result.pending.messages,
&result.pending.bytes,
&result.max_pending.messages,
&result.max_pending.bytes,
&result.delivered_messages,
&result.dropped_messages,
));
return status.toError() orelse result;
}
pub fn drain(self: *Subscription) Error!void {
return Status.fromInt(nats_c.natsSubscription_Drain(@ptrCast(self))).raise();
}
pub fn drainTimeout(self: *Subscription, timeout: i64) Error!void {
return Status.fromInt(nats_c.natsSubscription_DrainTimeout(@ptrCast(self), timeout)).raise();
}
pub fn waitForDrainCompletion(self: *Subscription, timeout: i64) Error!void {
return Status.fromInt(nats_c.natsSubscription_WaitForDrainCompletion(@ptrCast(self), timeout)).raise();
}
pub fn drainCompletionStatus(self: *Subscription) ?Error {
return Status.fromInt(nats_c.natsSubscription_DrainCompletionStatus(@ptrCast(self))).toError();
}
pub fn setCompletionCallback(
self: *Subscription,
comptime T: type,
comptime callback: *const CompletionThunkCallback(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
@ptrCast(self),
completionCallbackThunk(callback),
userdata,
)).raise();
}
};
const BareSubscriptionCallback = fn (
?*nats_c.natsConnection,
?*nats_c.natsSubscription,
?*nats_c.natsMsg,
?*anyopaque,
) callconv(.C) void;
pub fn SubscriptionThunkCallback(comptime T: type) type {
return fn (*T, *Connection, *Subscription, *Message) void;
}
pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T)) *const SubCallback {
pub fn subscriptionMessageThunk(
comptime T: type,
comptime callback: *const SubscriptionThunkCallback(T),
) *const BareSubscriptionCallback {
return struct {
pub fn thunk(
fn thunk(
conn: ?*nats_c.natsConnection,
sub: ?*nats_c.natsSubscription,
msg: ?*nats_c.natsMsg,
@ -36,8 +200,19 @@ pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T)
}.thunk;
}
pub const Subscription = opaque {
pub fn destroy(self: *Subscription) void {
nats_c.natsSubscription_Destroy(@ptrCast(self));
}
};
const BareCompletionCallback = fn (?*anyopaque) callconv(.C) void;
pub fn CompletionThunkCallback(comptime T: type) type {
return fn (*T) void;
}
pub fn completionCallbackThunk(
comptime T: type,
comptime callback: *const CompletionThunkCallback(T),
) *const BareSubscriptionCallback {
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(data);
}
}.thunk;
}