diff --git a/src/connection.zig b/src/connection.zig index 81eddae..3d40dd8 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -6,8 +6,8 @@ pub const nats_c = @cImport({ const sub_ = @import("./subscription.zig"); const Subscription = sub_.Subscription; -const ThunkCallback = sub_.ThunkCallback; -const messageThunk = sub_.messageThunk; +const SubscriptionThunkCallback = sub_.SubscriptionThunkCallback; +const subscriptionMessageThunk = sub_.subscriptionMessageThunk; const msg_ = @import("./message.zig"); const Message = msg_.Message; @@ -78,7 +78,7 @@ pub const Connection = opaque { self: *Connection, comptime T: type, subject: [:0]const u8, - callback: ThunkCallback(T), + callback: SubscriptionThunkCallback(T), userdata: *T, ) Error!*Subscription { var sub: *Subscription = undefined; @@ -86,7 +86,7 @@ pub const Connection = opaque { @ptrCast(&sub), @ptrCast(self), subject, - messageThunk(T, callback), + subscriptionMessageThunk(T, callback), userdata, )); return status.toError() orelse sub; diff --git a/src/nats.zig b/src/nats.zig index 84ebad1..bd0145b 100644 --- a/src/nats.zig +++ b/src/nats.zig @@ -211,26 +211,3 @@ pub const Statistics = opaque { // NATS_EXTERN natsStatus natsInbox_Create(natsInbox **newInbox); // NATS_EXTERN void natsInbox_Destroy(natsInbox *inbox); // NATS_EXTERN void natsMsgList_Destroy(natsMsgList *list); - -// NATS_EXTERN natsStatus natsSubscription_NoDeliveryDelay(natsSubscription *sub); -// NATS_EXTERN natsStatus natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout); -// NATS_EXTERN natsStatus natsSubscription_Unsubscribe(natsSubscription *sub); -// NATS_EXTERN natsStatus natsSubscription_AutoUnsubscribe(natsSubscription *sub, int max); -// NATS_EXTERN natsStatus natsSubscription_QueuedMsgs(natsSubscription *sub, uint64_t *queuedMsgs); -// NATS_EXTERN int64_t natsSubscription_GetID(natsSubscription* sub); -// NATS_EXTERN const char* natsSubscription_GetSubject(natsSubscription* sub); -// NATS_EXTERN natsStatus natsSubscription_SetPendingLimits(natsSubscription *sub, int msgLimit, int bytesLimit); -// NATS_EXTERN natsStatus natsSubscription_GetPendingLimits(natsSubscription *sub, int *msgLimit, int *bytesLimit); -// NATS_EXTERN natsStatus natsSubscription_GetPending(natsSubscription *sub, int *msgs, int *bytes); -// NATS_EXTERN natsStatus natsSubscription_GetDelivered(natsSubscription *sub, int64_t *msgs); -// NATS_EXTERN natsStatus natsSubscription_GetDropped(natsSubscription *sub, int64_t *msgs); -// NATS_EXTERN natsStatus natsSubscription_GetMaxPending(natsSubscription *sub, int *msgs, int *bytes); -// NATS_EXTERN natsStatus natsSubscription_ClearMaxPending(natsSubscription *sub); -// NATS_EXTERN natsStatus natsSubscription_GetStats(natsSubscription *sub, int *pendingMsgs, int *pendingBytes, int *maxPendingMsgs, int *maxPendingBytes, int64_t *deliveredMsgs, int64_t *droppedMsgs); -// NATS_EXTERN bool natsSubscription_IsValid(natsSubscription *sub); -// NATS_EXTERN natsStatus natsSubscription_Drain(natsSubscription *sub); -// NATS_EXTERN natsStatus natsSubscription_DrainTimeout(natsSubscription *sub, int64_t timeout); -// NATS_EXTERN natsStatus natsSubscription_WaitForDrainCompletion(natsSubscription *sub, int64_t timeout); -// NATS_EXTERN natsStatus natsSubscription_DrainCompletionStatus(natsSubscription *sub); -// NATS_EXTERN natsStatus natsSubscription_SetOnCompleteCB(natsSubscription *sub, natsOnCompleteCB cb, void *closure); -// NATS_EXTERN void natsSubscription_Destroy(natsSubscription *sub); diff --git a/src/subscription.zig b/src/subscription.zig index ef07946..ce48133 100644 --- a/src/subscription.zig +++ b/src/subscription.zig @@ -10,14 +10,178 @@ const err_ = @import("./error.zig"); const Error = err_.Error; const Status = err_.Status; -const SubCallback = fn (?*nats_c.natsConnection, ?*nats_c.natsSubscription, ?*nats_c.natsMsg, ?*anyopaque) callconv(.C) void; -pub fn ThunkCallback(comptime T: type) type { +pub const MessageCount = struct { + messages: c_int = 0, + bytes: c_int = 0, +}; + +pub const SubscriptionStats = struct { + pending: MessageCount = .{}, + max_pending: MessageCount = .{}, + delivered_messages: i64 = 0, + dropped_messages: i64 = 0, +}; + +pub const Subscription = opaque { + pub fn isValid(self: *Subscription) bool { + return nats_c.natsSubscription_IsValid(@ptrCast(self)); + } + + pub fn destroy(self: *Subscription) void { + nats_c.natsSubscription_Destroy(@ptrCast(self)); + } + + pub fn unsubscribe(self: *Subscription) Error!void { + return Status.fromInt(nats_c.natsSubscription_Unsubscribe(@ptrCast(self))).raise(); + } + + pub fn autoUnsubscribe(self: *Subscription, max: c_int) Error!void { + return Status.fromInt(nats_c.natsSubscription_AutoUnsubscribe(@ptrCast(self), max)).raise(); + } + + pub fn nextMessage(self: *Subscription, timeout: i64) Error!*Message { + var message: *Message = undefined; + const status = Status.fromInt(nats_c.natsSubscription_NextMsg( + @ptrCast(&message), + @ptrCast(self), + timeout, + )); + + return status.toError() orelse message; + } + + pub fn queuedMessageCount(self: *Subscription) Error!u64 { + var count: u64 = 0; + const status = Status.fromInt(nats_c.natsSubscription_QueuedMsgs(@ptrCast(self), &count)); + return status.toError() orelse count; + } + + pub fn getId(self: *Subscription) i64 { + // TODO: invalid/closed subscriptions return 0. Should we convert that into an + // error? could return error.InvalidSubscription + return nats_c.natsSubscription_GetID(@ptrCast(self)); + } + + pub fn getSubject(self: *Subscription) ?[:0]const u8 { + // invalid/closed subscriptions return null. should we convert that into an + // error? could return error.InvalidSubscription + const result = nats_c.natsSubscription_GetSubject(@ptrCast(self)) orelse return null; + return std.mem.spanTo(u8, result, 0); + } + + pub fn setPendingLimits(self: *Subscription, limit: MessageCount) Error!void { + return Status.fromInt( + nats_c.natsSubscription_SetPendingLimits(@ptrCast(self), limit.messages, limit.bytes), + ).raise(); + } + + pub fn getPendingLimits(self: *Subscription) Error!MessageCount { + var result: MessageCount = .{}; + const status = Status.fromInt( + nats_c.natsSubscription_GetPendingLimits(@ptrCast(self), &result.messages, &result.bytes), + ); + + return status.toError() orelse result; + } + + pub fn getPending(self: *Subscription) Error!MessageCount { + var result: MessageCount = .{}; + const status = Status.fromInt( + nats_c.natsSubscription_GetPending(@ptrCast(self), &result.messages, &result.bytes), + ); + + return status.toError() orelse result; + } + + pub fn getMaxPending(self: *Subscription) Error!MessageCount { + var result: MessageCount = .{}; + const status = Status.fromInt( + nats_c.natsSubscription_GetMaxPending(@ptrCast(self), &result.messages, &result.bytes), + ); + + return status.toError() orelse result; + } + + pub fn clearMaxPending(self: *Subscription) Error!void { + return Status.fromInt(nats_c.natsSubscription_ClearMaxPending(@ptrCast(self))).raise(); + } + + pub fn getDelivered(self: *Subscription) Error!i64 { + var result: i64 = 0; + const status = Status.fromInt(nats_c.natsSubscription_GetDelivered(@ptrCast(self), &result)); + + return status.toError() orelse result; + } + + pub fn getDropped(self: *Subscription) Error!i64 { + var result: i64 = 0; + const status = Status.fromInt(nats_c.natsSubscription_GetDropped(@ptrCast(self), &result)); + + return status.toError() orelse result; + } + + pub fn getStats(self: *Subscription) Error!SubscriptionStats { + var result: SubscriptionStats = .{}; + const status = Status.fromInt(nats_c.natsSubscription_GetStats( + @ptrCast(self), + &result.pending.messages, + &result.pending.bytes, + &result.max_pending.messages, + &result.max_pending.bytes, + &result.delivered_messages, + &result.dropped_messages, + )); + + return status.toError() orelse result; + } + + pub fn drain(self: *Subscription) Error!void { + return Status.fromInt(nats_c.natsSubscription_Drain(@ptrCast(self))).raise(); + } + + pub fn drainTimeout(self: *Subscription, timeout: i64) Error!void { + return Status.fromInt(nats_c.natsSubscription_DrainTimeout(@ptrCast(self), timeout)).raise(); + } + + pub fn waitForDrainCompletion(self: *Subscription, timeout: i64) Error!void { + return Status.fromInt(nats_c.natsSubscription_WaitForDrainCompletion(@ptrCast(self), timeout)).raise(); + } + + pub fn drainCompletionStatus(self: *Subscription) ?Error { + return Status.fromInt(nats_c.natsSubscription_DrainCompletionStatus(@ptrCast(self))).toError(); + } + + pub fn setCompletionCallback( + self: *Subscription, + comptime T: type, + comptime callback: *const CompletionThunkCallback(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB( + @ptrCast(self), + completionCallbackThunk(callback), + userdata, + )).raise(); + } +}; + +const BareSubscriptionCallback = fn ( + ?*nats_c.natsConnection, + ?*nats_c.natsSubscription, + ?*nats_c.natsMsg, + ?*anyopaque, +) callconv(.C) void; + +pub fn SubscriptionThunkCallback(comptime T: type) type { return fn (*T, *Connection, *Subscription, *Message) void; } -pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T)) *const SubCallback { +pub fn subscriptionMessageThunk( + comptime T: type, + comptime callback: *const SubscriptionThunkCallback(T), +) *const BareSubscriptionCallback { return struct { - pub fn thunk( + fn thunk( conn: ?*nats_c.natsConnection, sub: ?*nats_c.natsSubscription, msg: ?*nats_c.natsMsg, @@ -36,8 +200,19 @@ pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T) }.thunk; } -pub const Subscription = opaque { - pub fn destroy(self: *Subscription) void { - nats_c.natsSubscription_Destroy(@ptrCast(self)); - } -}; +const BareCompletionCallback = fn (?*anyopaque) callconv(.C) void; +pub fn CompletionThunkCallback(comptime T: type) type { + return fn (*T) void; +} + +pub fn completionCallbackThunk( + comptime T: type, + comptime callback: *const CompletionThunkCallback(T), +) *const BareSubscriptionCallback { + return struct { + fn thunk(userdata: ?*anyopaque) callconv(.C) void { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(data); + } + }.thunk; +}