keep thunkening

There are a lot of callbacks to wrap. This is a sloppy commit that
almost certainly needs fixes.
This commit is contained in:
torque 2023-08-18 23:01:33 -07:00
parent 2e1a5579b9
commit 3ff894cc0d
Signed by: torque
SSH Key Fingerprint: SHA256:nCrXefBNo6EbjNSQhv0nXmEg/VuNq3sMF5b8zETw3Tk
3 changed files with 194 additions and 90 deletions

View File

@ -16,6 +16,8 @@ const err_ = @import("./error.zig");
const Error = err_.Error;
const Status = err_.Status;
const thunk = @import("./thunk.zig");
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
pub const Connection = opaque {
@ -178,12 +180,18 @@ const ConnectionOptions = opaque {
).raise();
}
// needs a simple thunk. same signature as the subscription completion callback and
// could use the same thunk. Perhaps I should move the thunks into a common module?
// typedef const char* (*natsTokenHandler)(void *closure);
// NATS_EXTERN natsStatus natsOptions_SetTokenHandler(natsOptions *opts, natsTokenHandler tokenCb, void *closure);
// pub fn setTokenHandler(self: *ConnectionOptions, comptime T: type, callback: Thunked, userdata: T) Error!void
pub fn setTokenHandler(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const thunk.SimpleCallbackSignature(T),
userdata: T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self),
thunk.makeSimpleCallbackThunk(callback),
userdata,
)).raise();
}
pub fn setNoRandomize(self: *ConnectionOptions, no: bool) Error!void {
return Status.fromInt(
@ -311,14 +319,20 @@ const ConnectionOptions = opaque {
).raise();
}
// needs a callback thunk
// typedef int64_t (*natsCustomReconnectDelayHandler)(natsConnection *nc, int attempts, void *closure);
// NATS_EXTERN natsStatus natsOptions_SetCustomReconnectDelay(natsOptions *opts, natsCustomReconnectDelayHandler cb, void *closure);
// pub fn setCustomReconnectDelay(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetMaxReconnect(@ptrCast(self), max),
// ).raise();
// }
pub fn setCustomReconnectDelay(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
userdata: T,
) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetCustomReconnectDelay(
@ptrCast(self),
makeReconnectDelayCallbackThunk(T, callback),
userdata,
),
).raise();
}
pub fn setReconnectBufSize(self: *ConnectionOptions, size: c_int) Error!void {
return Status.fromInt(
@ -343,61 +357,70 @@ const ConnectionOptions = opaque {
// ).raise();
// }
// needs a callback thunk
// NATS_EXTERN natsStatus natsOptions_SetClosedCB(natsOptions *opts, natsConnectionHandler closedCb, void *closure);
// typedef void (*natsConnectionHandler)(
// natsConnection *nc, void *closure);
// pub fn setClosedCallback(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetClosedCB(@ptrCast(self), max),
// ).raise();
// }
// needs a callback thunk
// NATS_EXTERN natsStatus natsOptions_SetDisconnectedCB(natsOptions *opts, natsConnectionHandler disconnectedCb, void *closure);
// typedef void (*natsConnectionHandler)(
// natsConnection *nc, void *closure);
// pub fn setDisconnectedCallback(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetDisconnectedCB(@ptrCast(self), max),
// ).raise();
// }
// needs a callback thunk
// NATS_EXTERN natsStatus natsOptions_SetReconnectedCB(natsOptions *opts, natsConnectionHandler reconnectedCb, void *closure);
// typedef void (*natsConnectionHandler)(
// natsConnection *nc, void *closure);
// pub fn setReconnectedCallback(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetReconnectedCB(@ptrCast(self), max),
// ).raise();
// }
// needs a callback thunk
// NATS_EXTERN natsStatus natsOptions_SetDiscoveredServersCB(natsOptions *opts, natsConnectionHandler discoveredServersCb, void *closure);
// typedef void (*natsConnectionHandler)(
// natsConnection *nc, void *closure);
// pub fn setDiscoveredServersCallback(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetDiscoveredServersCB(@ptrCast(self), max),
// ).raise();
// }
pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetIgnoreDiscoveredServers(@ptrCast(self), ignore),
).raise();
pub fn setClosedCallback(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
// needs a callback thunk
// NATS_EXTERN natsStatus natsOptions_SetLameDuckModeCB(natsOptions *opts, natsConnectionHandler lameDuckCb, void *closure);
// typedef void (*natsConnectionHandler)(
// natsConnection *nc, void *closure);
// pub fn setLameDuckModeCallback(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetLameDuckModeCB(@ptrCast(self), max),
// ).raise();
// }
pub fn setDisconnectedCallback(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
pub fn setReconnectedCallback(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
pub fn setDiscoveredServersCallback(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
pub fn setLameDuckModeCallback(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
// needs a callback thunk
// NATS_EXTERN natsStatus natsOptions_SetEventLoop(natsOptions *opts, void *loop, natsEvLoop_Attach attachCb, natsEvLoop_ReadAddRemove readCb, natsEvLoop_WriteAddRemove writeCb, natsEvLoop_Detach detachCb);
@ -406,12 +429,41 @@ const ConnectionOptions = opaque {
// void *loop,
// natsConnection *nc,
// natsSock socket);
// pub fn setEventLoop(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetEventLoop(@ptrCast(self), max),
// ).raise();
// typedef natsStatus (*natsEvLoop_ReadAddRemove)(
// void *loop,
// bool add);
// typedef natsStatus (*natsEvLoop_WriteAddRemove)(
// void *loop,
// bool add);
// typedef natsStatus (*natsEvLoop_Detach)(
// void *loop);
// pub fn setEventLoop(
// self: *ConnectionOptions,
// comptime T: type,
// comptime L: type,
// comptime attachCallback: *const AttachEventLoopCallbackSignature(T, L),
// comptime readCallback: *const AttachEventLoopCallbackSignature(T),
// comptime writeCallback: *const AttachEventLoopCallbackSignature(T),
// comptime detachCallback: *const thunk.SimpleCallbackSignature(T),
// loop: *L,
// ) Error!void {
// return Status.fromInt(nats_c.natsOptions_SetEventLoop(
// @ptrCast(self),
// @ptrCast(loop),
// makeAttachEventLoopCallbackThunk(T, L, attachCallback),
// makeEventLoopAddRemoveCallbackThunk(T, readCallback),
// makeEventLoopAddRemoveCallbackThunk(T, writeCallback),
// thunk.makeSimpleCallbackThunk(callback),
// )).raise();
// }
pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetIgnoreDiscoveredServers(@ptrCast(self), ignore),
).raise();
}
pub fn useGlobalMessageDelivery(self: *ConnectionOptions, use: bool) Error!void {
return Status.fromInt(
nats_c.natsOptions_UseGlobalMessageDelivery(@ptrCast(self), use),
@ -536,3 +588,45 @@ const ConnectionOptions = opaque {
).raise();
}
};
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
pub fn ConnectionCallbackSignature(comptime T: type) type {
return fn (*Connection, *T) void;
}
pub fn makeConnectionCallbackThunk(
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
) *const ConnectionCallback {
return struct {
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, data);
}
}.thunk;
}
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) i64;
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
return fn (*Connection, c_int, *T) i64;
}
pub fn makeReconnectDelayCallbackThunk(
comptime T: type,
comptime callback: *const ReconnectDelayCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,
attempts: c_int,
userdata: ?*anyopaque,
) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, attempts, data);
}
}.thunk;
}

View File

@ -5,11 +5,15 @@ pub const nats_c = @cImport({
});
const Connection = @import("./connection.zig").Connection;
const Message = @import("./message.zig").Message;
const err_ = @import("./error.zig");
const Error = err_.Error;
const Status = err_.Status;
const thunk = @import("./thunk.zig");
pub const MessageCount = struct {
messages: c_int = 0,
bytes: c_int = 0,
@ -154,12 +158,12 @@ pub const Subscription = opaque {
pub fn setCompletionCallback(
self: *Subscription,
comptime T: type,
comptime callback: *const CompletionThunkCallback(T),
comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
@ptrCast(self),
completionCallbackThunk(callback),
thunk.makeSimpleCallbackThunk(callback),
userdata,
)).raise();
}
@ -199,20 +203,3 @@ pub fn subscriptionMessageThunk(
}
}.thunk;
}
const BareCompletionCallback = fn (?*anyopaque) callconv(.C) void;
pub fn CompletionThunkCallback(comptime T: type) type {
return fn (*T) void;
}
pub fn completionCallbackThunk(
comptime T: type,
comptime callback: *const CompletionThunkCallback(T),
) *const BareSubscriptionCallback {
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(data);
}
}.thunk;
}

23
src/thunk.zig Normal file
View File

@ -0,0 +1,23 @@
const std = @import("std");
pub const nats_c = @cImport({
@cInclude("nats/nats.h");
});
const SimpleCallback = fn (?*anyopaque) callconv(.C) void;
pub fn SimpleCallbackThunkSignature(comptime T: type) type {
return fn (*T) void;
}
pub fn makeSimpleCallbackThunk(
comptime T: type,
comptime callback: *const SimpleCallbackThunkSignature(T),
) *const SimpleCallback {
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(data);
}
}.thunk;
}