From 3ff894cc0d556746ed8e68bca14d7805a015ec92 Mon Sep 17 00:00:00 2001 From: torque Date: Fri, 18 Aug 2023 23:01:33 -0700 Subject: [PATCH] keep thunkening There are a lot of callbacks to wrap. This is a sloppy commit that almost certainly needs fixes. --- src/connection.zig | 236 ++++++++++++++++++++++++++++++------------- src/subscription.zig | 25 ++--- src/thunk.zig | 23 +++++ 3 files changed, 194 insertions(+), 90 deletions(-) create mode 100644 src/thunk.zig diff --git a/src/connection.zig b/src/connection.zig index 4b69dfb..671aa62 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -16,6 +16,8 @@ const err_ = @import("./error.zig"); const Error = err_.Error; const Status = err_.Status; +const thunk = @import("./thunk.zig"); + pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL; pub const Connection = opaque { @@ -178,12 +180,18 @@ const ConnectionOptions = opaque { ).raise(); } - // needs a simple thunk. same signature as the subscription completion callback and - // could use the same thunk. Perhaps I should move the thunks into a common module? - - // typedef const char* (*natsTokenHandler)(void *closure); - // NATS_EXTERN natsStatus natsOptions_SetTokenHandler(natsOptions *opts, natsTokenHandler tokenCb, void *closure); - // pub fn setTokenHandler(self: *ConnectionOptions, comptime T: type, callback: Thunked, userdata: T) Error!void + pub fn setTokenHandler( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const thunk.SimpleCallbackSignature(T), + userdata: T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetTokenHandler( + @ptrCast(self), + thunk.makeSimpleCallbackThunk(callback), + userdata, + )).raise(); + } pub fn setNoRandomize(self: *ConnectionOptions, no: bool) Error!void { return Status.fromInt( @@ -311,14 +319,20 @@ const ConnectionOptions = opaque { ).raise(); } - // needs a callback thunk - // typedef int64_t (*natsCustomReconnectDelayHandler)(natsConnection *nc, int attempts, void *closure); - // NATS_EXTERN natsStatus natsOptions_SetCustomReconnectDelay(natsOptions *opts, natsCustomReconnectDelayHandler cb, void *closure); - // pub fn setCustomReconnectDelay(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetMaxReconnect(@ptrCast(self), max), - // ).raise(); - // } + pub fn setCustomReconnectDelay( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ReconnectDelayCallbackSignature(T), + userdata: T, + ) Error!void { + return Status.fromInt( + nats_c.natsOptions_SetCustomReconnectDelay( + @ptrCast(self), + makeReconnectDelayCallbackThunk(T, callback), + userdata, + ), + ).raise(); + } pub fn setReconnectBufSize(self: *ConnectionOptions, size: c_int) Error!void { return Status.fromInt( @@ -343,61 +357,70 @@ const ConnectionOptions = opaque { // ).raise(); // } - // needs a callback thunk - // NATS_EXTERN natsStatus natsOptions_SetClosedCB(natsOptions *opts, natsConnectionHandler closedCb, void *closure); - // typedef void (*natsConnectionHandler)( - // natsConnection *nc, void *closure); - // pub fn setClosedCallback(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetClosedCB(@ptrCast(self), max), - // ).raise(); - // } - - // needs a callback thunk - // NATS_EXTERN natsStatus natsOptions_SetDisconnectedCB(natsOptions *opts, natsConnectionHandler disconnectedCb, void *closure); - // typedef void (*natsConnectionHandler)( - // natsConnection *nc, void *closure); - // pub fn setDisconnectedCallback(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetDisconnectedCB(@ptrCast(self), max), - // ).raise(); - // } - - // needs a callback thunk - // NATS_EXTERN natsStatus natsOptions_SetReconnectedCB(natsOptions *opts, natsConnectionHandler reconnectedCb, void *closure); - // typedef void (*natsConnectionHandler)( - // natsConnection *nc, void *closure); - // pub fn setReconnectedCallback(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetReconnectedCB(@ptrCast(self), max), - // ).raise(); - // } - - // needs a callback thunk - // NATS_EXTERN natsStatus natsOptions_SetDiscoveredServersCB(natsOptions *opts, natsConnectionHandler discoveredServersCb, void *closure); - // typedef void (*natsConnectionHandler)( - // natsConnection *nc, void *closure); - // pub fn setDiscoveredServersCallback(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetDiscoveredServersCB(@ptrCast(self), max), - // ).raise(); - // } - - pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void { - return Status.fromInt( - nats_c.natsOptions_SetIgnoreDiscoveredServers(@ptrCast(self), ignore), - ).raise(); + pub fn setClosedCallback( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetClosedCB( + @ptrCast(self), + makeConnectionCallbackThunk(T, callback), + userdata, + )).raise(); } - // needs a callback thunk - // NATS_EXTERN natsStatus natsOptions_SetLameDuckModeCB(natsOptions *opts, natsConnectionHandler lameDuckCb, void *closure); - // typedef void (*natsConnectionHandler)( - // natsConnection *nc, void *closure); - // pub fn setLameDuckModeCallback(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetLameDuckModeCB(@ptrCast(self), max), - // ).raise(); - // } + pub fn setDisconnectedCallback( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetClosedCB( + @ptrCast(self), + makeConnectionCallbackThunk(T, callback), + userdata, + )).raise(); + } + + pub fn setReconnectedCallback( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetClosedCB( + @ptrCast(self), + makeConnectionCallbackThunk(T, callback), + userdata, + )).raise(); + } + + pub fn setDiscoveredServersCallback( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetClosedCB( + @ptrCast(self), + makeConnectionCallbackThunk(T, callback), + userdata, + )).raise(); + } + + pub fn setLameDuckModeCallback( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetClosedCB( + @ptrCast(self), + makeConnectionCallbackThunk(T, callback), + userdata, + )).raise(); + } // needs a callback thunk // NATS_EXTERN natsStatus natsOptions_SetEventLoop(natsOptions *opts, void *loop, natsEvLoop_Attach attachCb, natsEvLoop_ReadAddRemove readCb, natsEvLoop_WriteAddRemove writeCb, natsEvLoop_Detach detachCb); @@ -406,12 +429,41 @@ const ConnectionOptions = opaque { // void *loop, // natsConnection *nc, // natsSock socket); - // pub fn setEventLoop(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetEventLoop(@ptrCast(self), max), - // ).raise(); + // typedef natsStatus (*natsEvLoop_ReadAddRemove)( + // void *loop, + // bool add); + // typedef natsStatus (*natsEvLoop_WriteAddRemove)( + // void *loop, + // bool add); + // typedef natsStatus (*natsEvLoop_Detach)( + // void *loop); + + // pub fn setEventLoop( + // self: *ConnectionOptions, + // comptime T: type, + // comptime L: type, + // comptime attachCallback: *const AttachEventLoopCallbackSignature(T, L), + // comptime readCallback: *const AttachEventLoopCallbackSignature(T), + // comptime writeCallback: *const AttachEventLoopCallbackSignature(T), + // comptime detachCallback: *const thunk.SimpleCallbackSignature(T), + // loop: *L, + // ) Error!void { + // return Status.fromInt(nats_c.natsOptions_SetEventLoop( + // @ptrCast(self), + // @ptrCast(loop), + // makeAttachEventLoopCallbackThunk(T, L, attachCallback), + // makeEventLoopAddRemoveCallbackThunk(T, readCallback), + // makeEventLoopAddRemoveCallbackThunk(T, writeCallback), + // thunk.makeSimpleCallbackThunk(callback), + // )).raise(); // } + pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void { + return Status.fromInt( + nats_c.natsOptions_SetIgnoreDiscoveredServers(@ptrCast(self), ignore), + ).raise(); + } + pub fn useGlobalMessageDelivery(self: *ConnectionOptions, use: bool) Error!void { return Status.fromInt( nats_c.natsOptions_UseGlobalMessageDelivery(@ptrCast(self), use), @@ -536,3 +588,45 @@ const ConnectionOptions = opaque { ).raise(); } }; + +const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void; + +pub fn ConnectionCallbackSignature(comptime T: type) type { + return fn (*Connection, *T) void; +} + +pub fn makeConnectionCallbackThunk( + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), +) *const ConnectionCallback { + return struct { + fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(connection, data); + } + }.thunk; +} + +const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) i64; + +pub fn ReconnectDelayCallbackSignature(comptime T: type) type { + return fn (*Connection, c_int, *T) i64; +} + +pub fn makeReconnectDelayCallbackThunk( + comptime T: type, + comptime callback: *const ReconnectDelayCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + conn: ?*nats_c.natsConnection, + attempts: c_int, + userdata: ?*anyopaque, + ) callconv(.C) void { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(connection, attempts, data); + } + }.thunk; +} diff --git a/src/subscription.zig b/src/subscription.zig index ce48133..2bacab8 100644 --- a/src/subscription.zig +++ b/src/subscription.zig @@ -5,11 +5,15 @@ pub const nats_c = @cImport({ }); const Connection = @import("./connection.zig").Connection; + const Message = @import("./message.zig").Message; + const err_ = @import("./error.zig"); const Error = err_.Error; const Status = err_.Status; +const thunk = @import("./thunk.zig"); + pub const MessageCount = struct { messages: c_int = 0, bytes: c_int = 0, @@ -154,12 +158,12 @@ pub const Subscription = opaque { pub fn setCompletionCallback( self: *Subscription, comptime T: type, - comptime callback: *const CompletionThunkCallback(T), + comptime callback: *const thunk.SimpleCallbackThunkSignature(T), userdata: *T, ) Error!void { return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB( @ptrCast(self), - completionCallbackThunk(callback), + thunk.makeSimpleCallbackThunk(callback), userdata, )).raise(); } @@ -199,20 +203,3 @@ pub fn subscriptionMessageThunk( } }.thunk; } - -const BareCompletionCallback = fn (?*anyopaque) callconv(.C) void; -pub fn CompletionThunkCallback(comptime T: type) type { - return fn (*T) void; -} - -pub fn completionCallbackThunk( - comptime T: type, - comptime callback: *const CompletionThunkCallback(T), -) *const BareSubscriptionCallback { - return struct { - fn thunk(userdata: ?*anyopaque) callconv(.C) void { - const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; - callback(data); - } - }.thunk; -} diff --git a/src/thunk.zig b/src/thunk.zig new file mode 100644 index 0000000..5d26a26 --- /dev/null +++ b/src/thunk.zig @@ -0,0 +1,23 @@ +const std = @import("std"); + +pub const nats_c = @cImport({ + @cInclude("nats/nats.h"); +}); + +const SimpleCallback = fn (?*anyopaque) callconv(.C) void; + +pub fn SimpleCallbackThunkSignature(comptime T: type) type { + return fn (*T) void; +} + +pub fn makeSimpleCallbackThunk( + comptime T: type, + comptime callback: *const SimpleCallbackThunkSignature(T), +) *const SimpleCallback { + return struct { + fn thunk(userdata: ?*anyopaque) callconv(.C) void { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(data); + } + }.thunk; +}