Compare commits
5 Commits
79a45fd2e3
...
3ced6db69d
Author | SHA1 | Date | |
---|---|---|---|
3ced6db69d | |||
970d274593 | |||
55690ced02 | |||
94a428139d | |||
b55dfe0732 |
@ -33,6 +33,8 @@ pub fn build(b: *std.Build) void {
|
||||
.optimize = optimize,
|
||||
});
|
||||
|
||||
main_tests.linkLibrary(nats_c);
|
||||
|
||||
const run_main_tests = b.addRunArtifact(main_tests);
|
||||
const test_step = b.step("test", "Run tests");
|
||||
test_step.dependOn(&run_main_tests.step);
|
||||
|
@ -6,8 +6,8 @@ pub const nats_c = @cImport({
|
||||
|
||||
const sub_ = @import("./subscription.zig");
|
||||
const Subscription = sub_.Subscription;
|
||||
const ThunkCallback = sub_.ThunkCallback;
|
||||
const messageThunk = sub_.messageThunk;
|
||||
const SubscriptionThunkCallback = sub_.SubscriptionThunkCallback;
|
||||
const subscriptionMessageThunk = sub_.subscriptionMessageThunk;
|
||||
|
||||
const msg_ = @import("./message.zig");
|
||||
const Message = msg_.Message;
|
||||
@ -78,7 +78,7 @@ pub const Connection = opaque {
|
||||
self: *Connection,
|
||||
comptime T: type,
|
||||
subject: [:0]const u8,
|
||||
callback: ThunkCallback(T),
|
||||
callback: SubscriptionThunkCallback(T),
|
||||
userdata: *T,
|
||||
) Error!*Subscription {
|
||||
var sub: *Subscription = undefined;
|
||||
@ -86,7 +86,7 @@ pub const Connection = opaque {
|
||||
@ptrCast(&sub),
|
||||
@ptrCast(self),
|
||||
subject,
|
||||
messageThunk(T, callback),
|
||||
subscriptionMessageThunk(T, callback),
|
||||
userdata,
|
||||
));
|
||||
return status.toError() orelse sub;
|
||||
|
145
src/message.zig
145
src/message.zig
@ -4,7 +4,24 @@ pub const nats_c = @cImport({
|
||||
@cInclude("nats/nats.h");
|
||||
});
|
||||
|
||||
const err_ = @import("./error.zig");
|
||||
const Error = err_.Error;
|
||||
const Status = err_.Status;
|
||||
|
||||
pub const Message = opaque {
|
||||
pub fn create(subject: [:0]const u8, reply: ?[:0]const u8, data: ?[]const u8) Error!*Message {
|
||||
var self: *Message = undefined;
|
||||
const status = Status.fromInt(nats_c.natsMsg_Create(
|
||||
@ptrCast(&self),
|
||||
subject.ptr,
|
||||
if (reply) |r| r.ptr else null,
|
||||
if (data) |d| d.ptr else null,
|
||||
if (data) |d| @intCast(d.len) else 0,
|
||||
));
|
||||
|
||||
return status.toError() orelse self;
|
||||
}
|
||||
|
||||
pub fn destroy(self: *Message) void {
|
||||
nats_c.natsMsg_Destroy(@ptrCast(self));
|
||||
}
|
||||
@ -27,4 +44,132 @@ pub const Message = opaque {
|
||||
pub fn getDataLength(self: *Message) usize {
|
||||
return @intCast(nats_c.natsMsg_GetDataLength(@ptrCast(self)));
|
||||
}
|
||||
|
||||
pub fn setHeaderValue(self: *Message, key: [:0]const u8, value: ?[:0]const u8) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Set(@ptrCast(self), key.ptr, value.ptr));
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
pub fn addHeaderValue(self: *Message, key: [:0]const u8, value: ?[:0]const u8) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Add(@ptrCast(self), key.ptr, value.ptr));
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
pub fn getHeaderValue(self: *Message, key: [:0]const u8) Error!?[:0]const u8 {
|
||||
var value: ?[*]u8 = null;
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Get(@ptrCast(self), key.ptr, &value));
|
||||
|
||||
return status.toError() orelse if (value) |val| std.mem.sliceTo(u8, val, 0) else null;
|
||||
}
|
||||
|
||||
pub fn getAllHeaderValues(self: *Message, key: [:0]const u8) Error![]?[*]const u8 {
|
||||
var values: [*]?[*]const u8 = undefined;
|
||||
var count: c_int = 0;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Values(@ptrCast(self), key.ptr, &values, &count));
|
||||
|
||||
// the user must use std.mem.spanTo on each item they want to read to get a
|
||||
// slice, since we can't do that automatically without having to allocate.
|
||||
return status.toError() orelse values[0..@intCast(count)];
|
||||
}
|
||||
|
||||
pub fn getAllHeaderKeys(self: *Message) Error![][*]const u8 {
|
||||
var keys: [*][*]const u8 = undefined;
|
||||
var count: c_int = 0;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Keys(@ptrCast(self), &keys, &count));
|
||||
|
||||
// TODO: manually assert no keys are NULL?
|
||||
|
||||
// the user must use std.mem.spanTo on each item they want to read to get a
|
||||
// slice, since we can't do that automatically without having to allocate.
|
||||
return status.toError() orelse keys[0..@intCast(count)];
|
||||
}
|
||||
|
||||
pub fn deleteHeader(self: *Message, key: [:0]const u8) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsMsgHeader_Delete(@ptrCast(self), key.ptr));
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
pub fn isNoResponders(self: *Message) bool {
|
||||
return nats_c.natsMsg_IsNoResponders(@ptrCast(self));
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: not implementing jetstream API right now
|
||||
// NATS_EXTERN natsStatus natsMsg_Ack(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_AckSync(natsMsg *msg, jsOptions *opts, jsErrCode *errCode);
|
||||
// NATS_EXTERN natsStatus natsMsg_Nak(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_NakWithDelay(natsMsg *msg, int64_t delay, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_InProgress(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_Term(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN uint64_t natsMsg_GetSequence(natsMsg *msg);
|
||||
// NATS_EXTERN int64_t natsMsg_GetTime(natsMsg *msg);
|
||||
|
||||
// TODO: not implementing streaming API right now
|
||||
// NATS_EXTERN uint64_t stanMsg_GetSequence(const stanMsg *msg);
|
||||
// NATS_EXTERN int64_t stanMsg_GetTimestamp(const stanMsg *msg);
|
||||
// NATS_EXTERN bool stanMsg_IsRedelivered(const stanMsg *msg);
|
||||
// NATS_EXTERN const char* stanMsg_GetData(const stanMsg *msg);
|
||||
// NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg);
|
||||
// NATS_EXTERN void stanMsg_Destroy(stanMsg *msg);
|
||||
|
||||
test "message: create message" {
|
||||
const subject = "hello";
|
||||
const reply = "reply";
|
||||
const data = "world";
|
||||
|
||||
const nats = @import("./nats.zig");
|
||||
|
||||
// have to initialize the library so the reference counter can correctly destroy
|
||||
// objects, otherwise we segfault on trying to free the memory.
|
||||
try nats.init(-1);
|
||||
defer nats.deinit();
|
||||
|
||||
const message = try Message.create(subject, reply, data);
|
||||
defer message.destroy();
|
||||
|
||||
const message2 = try Message.create(subject, null, data);
|
||||
defer message2.destroy();
|
||||
|
||||
const message3 = try Message.create(subject, data, null);
|
||||
defer message3.destroy();
|
||||
|
||||
const message4 = try Message.create(subject, null, null);
|
||||
defer message4.destroy();
|
||||
}
|
||||
|
||||
test "message: get subject" {
|
||||
const nats = @import("./nats.zig");
|
||||
|
||||
try nats.init(-1);
|
||||
defer nats.deinit();
|
||||
|
||||
const subject = "hello";
|
||||
const message = try Message.create(subject, null, null);
|
||||
defer message.destroy();
|
||||
|
||||
const received = message.getSubject();
|
||||
try std.testing.expectEqualStrings(subject, received);
|
||||
}
|
||||
|
||||
test "message: get reply" {
|
||||
const nats = @import("./nats.zig");
|
||||
|
||||
try nats.init(-1);
|
||||
defer nats.deinit();
|
||||
|
||||
const subject = "hello";
|
||||
const reply = "reply";
|
||||
const message = try Message.create(subject, reply, null);
|
||||
defer message.destroy();
|
||||
|
||||
const received = message.getReply() orelse return error.TestUnexpectedResult;
|
||||
try std.testing.expectEqualStrings(reply, received);
|
||||
|
||||
const message2 = try Message.create(subject, null, null);
|
||||
defer message2.destroy();
|
||||
|
||||
const received2 = message2.getReply();
|
||||
try std.testing.expect(received2 == null);
|
||||
}
|
||||
|
58
src/nats.zig
58
src/nats.zig
@ -113,7 +113,7 @@ pub fn deinit() void {
|
||||
}
|
||||
|
||||
pub fn deinitWait(timeout: i64) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsCloseAndWait(timeout));
|
||||
const status = Status.fromInt(nats_c.nats_CloseAndWait(timeout));
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
@ -150,6 +150,10 @@ pub const Statistics = opaque {
|
||||
}
|
||||
};
|
||||
|
||||
test {
|
||||
std.testing.refAllDecls(@This());
|
||||
}
|
||||
|
||||
// NATS_EXTERN natsStatus nats_Sign(const char *encodedSeed, const char *input, unsigned char **signature, int *signatureLength);
|
||||
|
||||
// NATS_EXTERN natsStatus natsOptions_Create(natsOptions **newOpts);
|
||||
@ -211,55 +215,3 @@ pub const Statistics = opaque {
|
||||
// NATS_EXTERN natsStatus natsInbox_Create(natsInbox **newInbox);
|
||||
// NATS_EXTERN void natsInbox_Destroy(natsInbox *inbox);
|
||||
// NATS_EXTERN void natsMsgList_Destroy(natsMsgList *list);
|
||||
|
||||
// NATS_EXTERN natsStatus natsMsg_Create(natsMsg **newMsg, const char *subj, const char *reply, const char *data, int dataLen);
|
||||
// NATS_EXTERN const char* natsMsg_GetSubject(const natsMsg *msg);
|
||||
// NATS_EXTERN const char* natsMsg_GetReply(const natsMsg *msg);
|
||||
// NATS_EXTERN const char* natsMsg_GetData(const natsMsg *msg);
|
||||
// NATS_EXTERN int natsMsg_GetDataLength(const natsMsg *msg);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Set(natsMsg *msg, const char *key, const char *value);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Add(natsMsg *msg, const char *key, const char *value);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Get(natsMsg *msg, const char *key, const char **value);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Values(natsMsg *msg, const char *key, const char* **values, int *count);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Keys(natsMsg *msg, const char* **keys, int *count);
|
||||
// NATS_EXTERN natsStatus natsMsgHeader_Delete(natsMsg *msg, const char *key);
|
||||
// NATS_EXTERN bool natsMsg_IsNoResponders(natsMsg *msg);
|
||||
// NATS_EXTERN void natsMsg_Destroy(natsMsg *msg);
|
||||
// NATS_EXTERN uint64_t stanMsg_GetSequence(const stanMsg *msg);
|
||||
// NATS_EXTERN int64_t stanMsg_GetTimestamp(const stanMsg *msg);
|
||||
// NATS_EXTERN bool stanMsg_IsRedelivered(const stanMsg *msg);
|
||||
// NATS_EXTERN const char* stanMsg_GetData(const stanMsg *msg);
|
||||
// NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg);
|
||||
// NATS_EXTERN void stanMsg_Destroy(stanMsg *msg);
|
||||
|
||||
// NATS_EXTERN natsStatus natsSubscription_NoDeliveryDelay(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsSubscription_Unsubscribe(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_AutoUnsubscribe(natsSubscription *sub, int max);
|
||||
// NATS_EXTERN natsStatus natsSubscription_QueuedMsgs(natsSubscription *sub, uint64_t *queuedMsgs);
|
||||
// NATS_EXTERN int64_t natsSubscription_GetID(natsSubscription* sub);
|
||||
// NATS_EXTERN const char* natsSubscription_GetSubject(natsSubscription* sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_SetPendingLimits(natsSubscription *sub, int msgLimit, int bytesLimit);
|
||||
// NATS_EXTERN natsStatus natsSubscription_GetPendingLimits(natsSubscription *sub, int *msgLimit, int *bytesLimit);
|
||||
// NATS_EXTERN natsStatus natsSubscription_GetPending(natsSubscription *sub, int *msgs, int *bytes);
|
||||
// NATS_EXTERN natsStatus natsSubscription_GetDelivered(natsSubscription *sub, int64_t *msgs);
|
||||
// NATS_EXTERN natsStatus natsSubscription_GetDropped(natsSubscription *sub, int64_t *msgs);
|
||||
// NATS_EXTERN natsStatus natsSubscription_GetMaxPending(natsSubscription *sub, int *msgs, int *bytes);
|
||||
// NATS_EXTERN natsStatus natsSubscription_ClearMaxPending(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_GetStats(natsSubscription *sub, int *pendingMsgs, int *pendingBytes, int *maxPendingMsgs, int *maxPendingBytes, int64_t *deliveredMsgs, int64_t *droppedMsgs);
|
||||
// NATS_EXTERN bool natsSubscription_IsValid(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_Drain(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_DrainTimeout(natsSubscription *sub, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsSubscription_WaitForDrainCompletion(natsSubscription *sub, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsSubscription_DrainCompletionStatus(natsSubscription *sub);
|
||||
// NATS_EXTERN natsStatus natsSubscription_SetOnCompleteCB(natsSubscription *sub, natsOnCompleteCB cb, void *closure);
|
||||
// NATS_EXTERN void natsSubscription_Destroy(natsSubscription *sub);
|
||||
|
||||
// NATS_EXTERN natsStatus natsMsg_Ack(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_AckSync(natsMsg *msg, jsOptions *opts, jsErrCode *errCode);
|
||||
// NATS_EXTERN natsStatus natsMsg_Nak(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_NakWithDelay(natsMsg *msg, int64_t delay, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_InProgress(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN natsStatus natsMsg_Term(natsMsg *msg, jsOptions *opts);
|
||||
// NATS_EXTERN uint64_t natsMsg_GetSequence(natsMsg *msg);
|
||||
// NATS_EXTERN int64_t natsMsg_GetTime(natsMsg *msg);
|
||||
|
@ -10,14 +10,178 @@ const err_ = @import("./error.zig");
|
||||
const Error = err_.Error;
|
||||
const Status = err_.Status;
|
||||
|
||||
const SubCallback = fn (?*nats_c.natsConnection, ?*nats_c.natsSubscription, ?*nats_c.natsMsg, ?*anyopaque) callconv(.C) void;
|
||||
pub fn ThunkCallback(comptime T: type) type {
|
||||
pub const MessageCount = struct {
|
||||
messages: c_int = 0,
|
||||
bytes: c_int = 0,
|
||||
};
|
||||
|
||||
pub const SubscriptionStats = struct {
|
||||
pending: MessageCount = .{},
|
||||
max_pending: MessageCount = .{},
|
||||
delivered_messages: i64 = 0,
|
||||
dropped_messages: i64 = 0,
|
||||
};
|
||||
|
||||
pub const Subscription = opaque {
|
||||
pub fn isValid(self: *Subscription) bool {
|
||||
return nats_c.natsSubscription_IsValid(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn destroy(self: *Subscription) void {
|
||||
nats_c.natsSubscription_Destroy(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn unsubscribe(self: *Subscription) Error!void {
|
||||
return Status.fromInt(nats_c.natsSubscription_Unsubscribe(@ptrCast(self))).raise();
|
||||
}
|
||||
|
||||
pub fn autoUnsubscribe(self: *Subscription, max: c_int) Error!void {
|
||||
return Status.fromInt(nats_c.natsSubscription_AutoUnsubscribe(@ptrCast(self), max)).raise();
|
||||
}
|
||||
|
||||
pub fn nextMessage(self: *Subscription, timeout: i64) Error!*Message {
|
||||
var message: *Message = undefined;
|
||||
const status = Status.fromInt(nats_c.natsSubscription_NextMsg(
|
||||
@ptrCast(&message),
|
||||
@ptrCast(self),
|
||||
timeout,
|
||||
));
|
||||
|
||||
return status.toError() orelse message;
|
||||
}
|
||||
|
||||
pub fn queuedMessageCount(self: *Subscription) Error!u64 {
|
||||
var count: u64 = 0;
|
||||
const status = Status.fromInt(nats_c.natsSubscription_QueuedMsgs(@ptrCast(self), &count));
|
||||
return status.toError() orelse count;
|
||||
}
|
||||
|
||||
pub fn getId(self: *Subscription) i64 {
|
||||
// TODO: invalid/closed subscriptions return 0. Should we convert that into an
|
||||
// error? could return error.InvalidSubscription
|
||||
return nats_c.natsSubscription_GetID(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn getSubject(self: *Subscription) ?[:0]const u8 {
|
||||
// invalid/closed subscriptions return null. should we convert that into an
|
||||
// error? could return error.InvalidSubscription
|
||||
const result = nats_c.natsSubscription_GetSubject(@ptrCast(self)) orelse return null;
|
||||
return std.mem.spanTo(u8, result, 0);
|
||||
}
|
||||
|
||||
pub fn setPendingLimits(self: *Subscription, limit: MessageCount) Error!void {
|
||||
return Status.fromInt(
|
||||
nats_c.natsSubscription_SetPendingLimits(@ptrCast(self), limit.messages, limit.bytes),
|
||||
).raise();
|
||||
}
|
||||
|
||||
pub fn getPendingLimits(self: *Subscription) Error!MessageCount {
|
||||
var result: MessageCount = .{};
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsSubscription_GetPendingLimits(@ptrCast(self), &result.messages, &result.bytes),
|
||||
);
|
||||
|
||||
return status.toError() orelse result;
|
||||
}
|
||||
|
||||
pub fn getPending(self: *Subscription) Error!MessageCount {
|
||||
var result: MessageCount = .{};
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsSubscription_GetPending(@ptrCast(self), &result.messages, &result.bytes),
|
||||
);
|
||||
|
||||
return status.toError() orelse result;
|
||||
}
|
||||
|
||||
pub fn getMaxPending(self: *Subscription) Error!MessageCount {
|
||||
var result: MessageCount = .{};
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsSubscription_GetMaxPending(@ptrCast(self), &result.messages, &result.bytes),
|
||||
);
|
||||
|
||||
return status.toError() orelse result;
|
||||
}
|
||||
|
||||
pub fn clearMaxPending(self: *Subscription) Error!void {
|
||||
return Status.fromInt(nats_c.natsSubscription_ClearMaxPending(@ptrCast(self))).raise();
|
||||
}
|
||||
|
||||
pub fn getDelivered(self: *Subscription) Error!i64 {
|
||||
var result: i64 = 0;
|
||||
const status = Status.fromInt(nats_c.natsSubscription_GetDelivered(@ptrCast(self), &result));
|
||||
|
||||
return status.toError() orelse result;
|
||||
}
|
||||
|
||||
pub fn getDropped(self: *Subscription) Error!i64 {
|
||||
var result: i64 = 0;
|
||||
const status = Status.fromInt(nats_c.natsSubscription_GetDropped(@ptrCast(self), &result));
|
||||
|
||||
return status.toError() orelse result;
|
||||
}
|
||||
|
||||
pub fn getStats(self: *Subscription) Error!SubscriptionStats {
|
||||
var result: SubscriptionStats = .{};
|
||||
const status = Status.fromInt(nats_c.natsSubscription_GetStats(
|
||||
@ptrCast(self),
|
||||
&result.pending.messages,
|
||||
&result.pending.bytes,
|
||||
&result.max_pending.messages,
|
||||
&result.max_pending.bytes,
|
||||
&result.delivered_messages,
|
||||
&result.dropped_messages,
|
||||
));
|
||||
|
||||
return status.toError() orelse result;
|
||||
}
|
||||
|
||||
pub fn drain(self: *Subscription) Error!void {
|
||||
return Status.fromInt(nats_c.natsSubscription_Drain(@ptrCast(self))).raise();
|
||||
}
|
||||
|
||||
pub fn drainTimeout(self: *Subscription, timeout: i64) Error!void {
|
||||
return Status.fromInt(nats_c.natsSubscription_DrainTimeout(@ptrCast(self), timeout)).raise();
|
||||
}
|
||||
|
||||
pub fn waitForDrainCompletion(self: *Subscription, timeout: i64) Error!void {
|
||||
return Status.fromInt(nats_c.natsSubscription_WaitForDrainCompletion(@ptrCast(self), timeout)).raise();
|
||||
}
|
||||
|
||||
pub fn drainCompletionStatus(self: *Subscription) ?Error {
|
||||
return Status.fromInt(nats_c.natsSubscription_DrainCompletionStatus(@ptrCast(self))).toError();
|
||||
}
|
||||
|
||||
pub fn setCompletionCallback(
|
||||
self: *Subscription,
|
||||
comptime T: type,
|
||||
comptime callback: *const CompletionThunkCallback(T),
|
||||
userdata: *T,
|
||||
) Error!void {
|
||||
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
|
||||
@ptrCast(self),
|
||||
completionCallbackThunk(callback),
|
||||
userdata,
|
||||
)).raise();
|
||||
}
|
||||
};
|
||||
|
||||
const BareSubscriptionCallback = fn (
|
||||
?*nats_c.natsConnection,
|
||||
?*nats_c.natsSubscription,
|
||||
?*nats_c.natsMsg,
|
||||
?*anyopaque,
|
||||
) callconv(.C) void;
|
||||
|
||||
pub fn SubscriptionThunkCallback(comptime T: type) type {
|
||||
return fn (*T, *Connection, *Subscription, *Message) void;
|
||||
}
|
||||
|
||||
pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T)) *const SubCallback {
|
||||
pub fn subscriptionMessageThunk(
|
||||
comptime T: type,
|
||||
comptime callback: *const SubscriptionThunkCallback(T),
|
||||
) *const BareSubscriptionCallback {
|
||||
return struct {
|
||||
pub fn thunk(
|
||||
fn thunk(
|
||||
conn: ?*nats_c.natsConnection,
|
||||
sub: ?*nats_c.natsSubscription,
|
||||
msg: ?*nats_c.natsMsg,
|
||||
@ -36,49 +200,19 @@ pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T)
|
||||
}.thunk;
|
||||
}
|
||||
|
||||
pub const Subscription = opaque {
|
||||
pub fn destroy(self: *Subscription) void {
|
||||
nats_c.natsSubscription_Destroy(@ptrCast(self));
|
||||
}
|
||||
};
|
||||
|
||||
// pub fn Subscription(T: type) type {
|
||||
// return struct {
|
||||
// const Self = @This();
|
||||
|
||||
// _internal: *nats_c.natsSubscription,
|
||||
// userdata: T,
|
||||
// function: MessageCallback(T),
|
||||
|
||||
// pub fn create(
|
||||
// allocator: std.mem.Allocator,
|
||||
// userdata: T,
|
||||
// callback: MessageCallback(T),
|
||||
// ) AllocError!*Self {
|
||||
// const self: *Self = try std.mem.Allocator.create(Self);
|
||||
// self.userdata = userdata;
|
||||
// self.function = callback;
|
||||
// return self;
|
||||
// }
|
||||
|
||||
// pub fn wrap(
|
||||
// self: *Self,
|
||||
// internal: *nats_c.natsSubscription,
|
||||
// ) void {
|
||||
// // self._internal =
|
||||
// }
|
||||
|
||||
// fn thunk(
|
||||
// conn: ?*nats_c.natsConnection,
|
||||
// sub: ?*nats_c.natsSubscription,
|
||||
// msg: ?*nats_c.natsMsg,
|
||||
// userdata: ?*const anyopaque,
|
||||
// ) callconv(.C) void {
|
||||
// self.function(self.userdata, connection, subscription, message);
|
||||
// }
|
||||
// };
|
||||
// }
|
||||
|
||||
pub fn MessageCallback(comptime T: type) type {
|
||||
return *const fn (userdata: T, connection: *Connection, subscription: *Subscription, message: *Message) void;
|
||||
const BareCompletionCallback = fn (?*anyopaque) callconv(.C) void;
|
||||
pub fn CompletionThunkCallback(comptime T: type) type {
|
||||
return fn (*T) void;
|
||||
}
|
||||
|
||||
pub fn completionCallbackThunk(
|
||||
comptime T: type,
|
||||
comptime callback: *const CompletionThunkCallback(T),
|
||||
) *const BareSubscriptionCallback {
|
||||
return struct {
|
||||
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
|
||||
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
|
||||
callback(data);
|
||||
}
|
||||
}.thunk;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user