connection.ConnectionOptions: finish hooking up callbacks

This was kind of tedious, but there were some interesting choices to be
made with regards to how the callback signatures should be adapted,
especially in the few cases that heavily use out-params in C. Ignoring
the lack of test coverage, this puts us pretty close to having the
entire basic API wrapped.
This commit is contained in:
torque 2023-08-19 17:15:29 -07:00
parent 3ff894cc0d
commit 39edf12d34
Signed by: torque
SSH Key Fingerprint: SHA256:nCrXefBNo6EbjNSQhv0nXmEg/VuNq3sMF5b8zETw3Tk
2 changed files with 320 additions and 71 deletions

View File

@ -346,16 +346,20 @@ const ConnectionOptions = opaque {
).raise(); ).raise();
} }
// needs a callback thunk pub fn setErrorHandler(
// typedef void (*natsErrHandler)( self: *ConnectionOptions,
// natsConnection *nc, natsSubscription *subscription, natsStatus err, comptime T: type,
// void *closure); comptime callback: *const ErrorHandlerCallbackSignature(T),
// NATS_EXTERN natsStatus natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, void *closure); userdata: *T,
// pub fn setErrorHandler(self: *ConnectionOptions, max: c_int) Error!void { ) Error!void {
// return Status.fromInt( return Status.fromInt(
// nats_c.natsOptions_SetErrorHandler(@ptrCast(self), max), nats_c.natsOptions_SetErrorHandler(
// ).raise(); @ptrCast(self),
// } makeErrorHandlerCallbackThunk(T, callback),
userdata,
),
).raise();
}
pub fn setClosedCallback( pub fn setClosedCallback(
self: *ConnectionOptions, self: *ConnectionOptions,
@ -422,41 +426,25 @@ const ConnectionOptions = opaque {
)).raise(); )).raise();
} }
// needs a callback thunk pub fn setEventLoop(
// NATS_EXTERN natsStatus natsOptions_SetEventLoop(natsOptions *opts, void *loop, natsEvLoop_Attach attachCb, natsEvLoop_ReadAddRemove readCb, natsEvLoop_WriteAddRemove writeCb, natsEvLoop_Detach detachCb); self: *ConnectionOptions,
// typedef natsStatus (*natsEvLoop_Attach)( comptime T: type,
// void **userData, comptime L: type,
// void *loop, comptime attach_callback: *const AttachEventLoopCallbackSignature(T, L),
// natsConnection *nc, comptime read_callback: *const AttachEventLoopCallbackSignature(T),
// natsSock socket); comptime write_callback: *const AttachEventLoopCallbackSignature(T),
// typedef natsStatus (*natsEvLoop_ReadAddRemove)( comptime detach_callback: *const thunk.SimpleCallbackSignature(T),
// void *loop, loop: *L,
// bool add); ) Error!void {
// typedef natsStatus (*natsEvLoop_WriteAddRemove)( return Status.fromInt(nats_c.natsOptions_SetEventLoop(
// void *loop, @ptrCast(self),
// bool add); @ptrCast(loop),
// typedef natsStatus (*natsEvLoop_Detach)( makeAttachEventLoopCallbackThunk(T, L, attach_callback),
// void *loop); makeEventLoopAddRemoveCallbackThunk(T, read_callback),
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
// pub fn setEventLoop( makeEventLoopDetachCallbackThunk(T, detach_callback),
// self: *ConnectionOptions, )).raise();
// comptime T: type, }
// comptime L: type,
// comptime attachCallback: *const AttachEventLoopCallbackSignature(T, L),
// comptime readCallback: *const AttachEventLoopCallbackSignature(T),
// comptime writeCallback: *const AttachEventLoopCallbackSignature(T),
// comptime detachCallback: *const thunk.SimpleCallbackSignature(T),
// loop: *L,
// ) Error!void {
// return Status.fromInt(nats_c.natsOptions_SetEventLoop(
// @ptrCast(self),
// @ptrCast(loop),
// makeAttachEventLoopCallbackThunk(T, L, attachCallback),
// makeEventLoopAddRemoveCallbackThunk(T, readCallback),
// makeEventLoopAddRemoveCallbackThunk(T, writeCallback),
// thunk.makeSimpleCallbackThunk(callback),
// )).raise();
// }
pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void { pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void {
return Status.fromInt( return Status.fromInt(
@ -508,23 +496,36 @@ const ConnectionOptions = opaque {
).raise(); ).raise();
} }
// thunkem pub fn setRetryOnFailedConnect(
// NATS_EXTERN natsStatus natsOptions_SetRetryOnFailedConnect(natsOptions *opts, bool retry, natsConnectionHandler connectedCb, void* closure); self: *ConnectionOptions,
// typedef void (*natsConnectionHandler)( comptime T: type,
// natsConnection *nc, void *closure); comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
// 2 thunk 2 furious pub fn setUserCredentialsCallbacks(
// NATS_EXTERN natsStatus natsOptions_SetUserCredentialsCallbacks(natsOptions *opts, natsUserJWTHandler ujwtCB, void *ujwtClosure, natsSignatureHandler sigCB, void *sigClosure); self: *ConnectionOptions,
// typedef natsStatus (*natsUserJWTHandler)( comptime T: type,
// char **userJWT, comptime U: type,
// char **customErrTxt, comptime jwt_callback: *const JwtHandlerCallbackSignature(T),
// void *closure); comptime sig_callback: *const SignatureHandlerCallbackSignature(U),
// typedef natsStatus (*natsSignatureHandler)( jwt_userdata: *T,
// char **customErrTxt, sig_userdata: *U,
// unsigned char **signature, ) Error!void {
// int *signatureLength, return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
// const char *nonce, @ptrCast(self),
// void *closure); makeJwtHandlerCallbackThunk(T, jwt_callback),
jwt_userdata,
makeSignatureHandlerCallbackThunk(U, sig_callback),
sig_userdata,
)).raise();
}
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void { pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
return Status.fromInt( return Status.fromInt(
@ -545,14 +546,20 @@ const ConnectionOptions = opaque {
).raise(); ).raise();
} }
// thunk pub fn setNKey(
// NATS_EXTERN natsStatus natsOptions_SetNKey(natsOptions *opts, const char *pubKey, natsSignatureHandler sigCB, void *sigClosure); self: *ConnectionOptions,
// typedef natsStatus (*natsSignatureHandler)( comptime T: type,
// char **customErrTxt, comptime sig_callback: *const SignatureHandlerCallbackSignature(T),
// unsigned char **signature, pub_key: [:0]const u8,
// int *signatureLength, sig_userdata: *T,
// const char *nonce, ) Error!void {
// void *closure); return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self),
pub_key.ptr,
makeSignatureHandlerCallbackThunk(T, sig_callback),
sig_userdata,
)).raise();
}
pub fn setNKeyFromSeed(self: *ConnectionOptions, pub_key: [:0]const u8, seed_file: [:0]const u8) Error!void { pub fn setNKeyFromSeed(self: *ConnectionOptions, pub_key: [:0]const u8, seed_file: [:0]const u8) Error!void {
return Status.fromInt( return Status.fromInt(
@ -623,10 +630,204 @@ pub fn makeReconnectDelayCallbackThunk(
conn: ?*nats_c.natsConnection, conn: ?*nats_c.natsConnection,
attempts: c_int, attempts: c_int,
userdata: ?*anyopaque, userdata: ?*anyopaque,
) callconv(.C) void { ) callconv(.C) i64 {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, attempts, data); return callback(connection, attempts, data);
}
}.thunk;
}
const ErrorHandlerCallback = fn (
?*nats_c.natsConnection,
?*nats_c.natsSubscription,
nats_c.natsStatus,
?*anyopaque,
) void;
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
return fn (*Connection, *Subscription, Status, *T) void;
}
pub fn makeErrorHandlerCallbackThunk(
comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T),
) *const ErrorHandlerCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,
sub: ?*nats_c.natsSubscription,
status: nats_c.natsStatus,
userdata: ?*anyopaque,
) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, subscription, Status.fromInt(status), data);
}
}.thunk;
}
// natsSock is an fd on non-windows and SOCKET on windows
const AttachEventLoopCallback = fn (
*?*anyopaque,
?*anyopaque,
?*nats_c.natsConnection,
nats_c.natsSock,
) nats_c.natsStatus;
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
return fn (*L, *Connection, c_int) anyerror!*T;
}
pub fn makeAttachEventLoopCallbackThunk(
comptime T: type,
comptime L: type,
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
userdata: *?*anyopaque,
loop: ?*anyopaque,
conn: ?*nats_c.natsConnection,
sock: ?*nats_c.natsSock,
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const ev_loop: *L = if (loop) |l| @ptrCast(l) else unreachable;
userdata.* = callback(ev_loop, connection, sock) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus;
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
return fn (*Connection, c_int, *T) anyerror!void;
}
pub fn makeEventLoopAddRemoveCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,
attempts: c_int,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, attempts, data) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus;
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
return fn (*T) anyerror!void;
}
pub fn makeEventLoopDetachCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopDetachCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(data) catch |err| return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus;
const JwtResponseOrError = union(enum) {
jwt: [:0]u8,
error_message: [:0]u8,
};
pub fn JwtHandlerCallbackSignature(comptime T: type) type {
return fn (*T) JwtResponseOrError;
}
pub fn makeJwtHandlerCallbackThunk(
comptime T: type,
comptime callback: *const JwtHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
jwt_out: *?[*:0]u8,
err_out: *?[*:0]u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
switch (callback(data)) {
.jwt => |jwt| {
jwt_out.* = jwt.ptr;
return nats_c.NATS_OK;
},
.error_message => |msg| {
err_out.* = msg.ptr;
return nats_c.NATS_ERR;
},
}
}
}.thunk;
}
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus;
const SignatureResponseOrError = union(enum) {
signature: []u8,
error_message: [:0]u8,
};
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
return fn ([:0]const u8, *T) SignatureResponseOrError;
}
pub fn makeSignatureHandlerCallbackThunk(
comptime T: type,
comptime callback: *const SignatureHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
err_out: *?[*:0]u8,
sig_out: *?[*]u8,
sig_len_out: *c_int,
nonce: [*:0]const u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
switch (callback(std.mem.sliceTo(nonce, 0), data)) {
.signature => |sig| {
sig_out.* = sig.ptr;
sig_len_out.* = sig.len;
return nats_c.NATS_OK;
},
.error_message => |msg| {
err_out.* = msg.ptr;
return nats_c.NATS_ERR;
},
}
} }
}.thunk; }.thunk;
} }

View File

@ -74,6 +74,10 @@ pub const Status = enum(c_int) {
return @enumFromInt(int); return @enumFromInt(int);
} }
pub fn toInt(self: Status) c_uint {
return @intFromEnum(self);
}
pub fn description(self: Status) [:0]const u8 { pub fn description(self: Status) [:0]const u8 {
return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0); return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0);
} }
@ -163,6 +167,50 @@ pub const Status = enum(c_int) {
_ => Error.UnknownError, _ => Error.UnknownError,
}; };
} }
pub fn fromError(err: ?anyerror) Status {
return if (err) |e|
switch (e) {
Error.ProtocolError => .protocol_error,
Error.IoError => .io_error,
Error.LineTooLong => .line_too_long,
Error.ConnectionClosed => .connection_closed,
Error.NoServer => .no_server,
Error.StaleConnection => .stale_connection,
Error.SecureConnectionWanted => .secure_connection_wanted,
Error.SecureConnectionRequired => .secure_connection_required,
Error.ConnectionDisconnected => .connection_disconnected,
Error.ConnectionAuthFailed => .connection_auth_failed,
Error.NotPermitted => .not_permitted,
Error.NotFound => .not_found,
Error.AddressMissing => .address_missing,
Error.InvalidSubject => .invalid_subject,
Error.InvalidArg => .invalid_arg,
Error.InvalidSubscription => .invalid_subscription,
Error.InvalidTimeout => .invalid_timeout,
Error.IllegalState => .illegal_state,
Error.SlowConsumer => .slow_consumer,
Error.MaxPayload => .max_payload,
Error.MaxDeliveredMsgs => .max_delivered_msgs,
Error.InsufficientBuffer => .insufficient_buffer,
Error.NoMemory => .no_memory,
Error.SysError => .sys_error,
Error.Timeout => .timeout,
Error.FailedToInitialize => .failed_to_initialize,
Error.NotInitialized => .not_initialized,
Error.SslError => .ssl_error,
Error.NoServerSupport => .no_server_support,
Error.NotYetConnected => .not_yet_connected,
Error.Draining => .draining,
Error.InvalidQueueName => .invalid_queue_name,
Error.NoResponders => .no_responders,
Error.Mismatch => .mismatch,
Error.MissedHeartbeat => .missed_heartbeat,
else => .generic_error,
}
else
.okay;
}
}; };
pub const Error = error{ pub const Error = error{