From 39edf12d349f0744263ef4ae594aa0223f0e45fa Mon Sep 17 00:00:00 2001 From: torque Date: Sat, 19 Aug 2023 17:15:29 -0700 Subject: [PATCH] connection.ConnectionOptions: finish hooking up callbacks This was kind of tedious, but there were some interesting choices to be made with regards to how the callback signatures should be adapted, especially in the few cases that heavily use out-params in C. Ignoring the lack of test coverage, this puts us pretty close to having the entire basic API wrapped. --- src/connection.zig | 343 +++++++++++++++++++++++++++++++++++---------- src/error.zig | 48 +++++++ 2 files changed, 320 insertions(+), 71 deletions(-) diff --git a/src/connection.zig b/src/connection.zig index 671aa62..5f55903 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -346,16 +346,20 @@ const ConnectionOptions = opaque { ).raise(); } - // needs a callback thunk - // typedef void (*natsErrHandler)( - // natsConnection *nc, natsSubscription *subscription, natsStatus err, - // void *closure); - // NATS_EXTERN natsStatus natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, void *closure); - // pub fn setErrorHandler(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetErrorHandler(@ptrCast(self), max), - // ).raise(); - // } + pub fn setErrorHandler( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ErrorHandlerCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt( + nats_c.natsOptions_SetErrorHandler( + @ptrCast(self), + makeErrorHandlerCallbackThunk(T, callback), + userdata, + ), + ).raise(); + } pub fn setClosedCallback( self: *ConnectionOptions, @@ -422,41 +426,25 @@ const ConnectionOptions = opaque { )).raise(); } - // needs a callback thunk - // NATS_EXTERN natsStatus natsOptions_SetEventLoop(natsOptions *opts, void *loop, natsEvLoop_Attach attachCb, natsEvLoop_ReadAddRemove readCb, natsEvLoop_WriteAddRemove writeCb, natsEvLoop_Detach detachCb); - // typedef natsStatus (*natsEvLoop_Attach)( - // void **userData, - // void *loop, - // natsConnection *nc, - // natsSock socket); - // typedef natsStatus (*natsEvLoop_ReadAddRemove)( - // void *loop, - // bool add); - // typedef natsStatus (*natsEvLoop_WriteAddRemove)( - // void *loop, - // bool add); - // typedef natsStatus (*natsEvLoop_Detach)( - // void *loop); - - // pub fn setEventLoop( - // self: *ConnectionOptions, - // comptime T: type, - // comptime L: type, - // comptime attachCallback: *const AttachEventLoopCallbackSignature(T, L), - // comptime readCallback: *const AttachEventLoopCallbackSignature(T), - // comptime writeCallback: *const AttachEventLoopCallbackSignature(T), - // comptime detachCallback: *const thunk.SimpleCallbackSignature(T), - // loop: *L, - // ) Error!void { - // return Status.fromInt(nats_c.natsOptions_SetEventLoop( - // @ptrCast(self), - // @ptrCast(loop), - // makeAttachEventLoopCallbackThunk(T, L, attachCallback), - // makeEventLoopAddRemoveCallbackThunk(T, readCallback), - // makeEventLoopAddRemoveCallbackThunk(T, writeCallback), - // thunk.makeSimpleCallbackThunk(callback), - // )).raise(); - // } + pub fn setEventLoop( + self: *ConnectionOptions, + comptime T: type, + comptime L: type, + comptime attach_callback: *const AttachEventLoopCallbackSignature(T, L), + comptime read_callback: *const AttachEventLoopCallbackSignature(T), + comptime write_callback: *const AttachEventLoopCallbackSignature(T), + comptime detach_callback: *const thunk.SimpleCallbackSignature(T), + loop: *L, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetEventLoop( + @ptrCast(self), + @ptrCast(loop), + makeAttachEventLoopCallbackThunk(T, L, attach_callback), + makeEventLoopAddRemoveCallbackThunk(T, read_callback), + makeEventLoopAddRemoveCallbackThunk(T, write_callback), + makeEventLoopDetachCallbackThunk(T, detach_callback), + )).raise(); + } pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void { return Status.fromInt( @@ -508,23 +496,36 @@ const ConnectionOptions = opaque { ).raise(); } - // thunkem - // NATS_EXTERN natsStatus natsOptions_SetRetryOnFailedConnect(natsOptions *opts, bool retry, natsConnectionHandler connectedCb, void* closure); - // typedef void (*natsConnectionHandler)( - // natsConnection *nc, void *closure); + pub fn setRetryOnFailedConnect( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect( + @ptrCast(self), + makeConnectionCallbackThunk(T, callback), + userdata, + )).raise(); + } - // 2 thunk 2 furious - // NATS_EXTERN natsStatus natsOptions_SetUserCredentialsCallbacks(natsOptions *opts, natsUserJWTHandler ujwtCB, void *ujwtClosure, natsSignatureHandler sigCB, void *sigClosure); - // typedef natsStatus (*natsUserJWTHandler)( - // char **userJWT, - // char **customErrTxt, - // void *closure); - // typedef natsStatus (*natsSignatureHandler)( - // char **customErrTxt, - // unsigned char **signature, - // int *signatureLength, - // const char *nonce, - // void *closure); + pub fn setUserCredentialsCallbacks( + self: *ConnectionOptions, + comptime T: type, + comptime U: type, + comptime jwt_callback: *const JwtHandlerCallbackSignature(T), + comptime sig_callback: *const SignatureHandlerCallbackSignature(U), + jwt_userdata: *T, + sig_userdata: *U, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( + @ptrCast(self), + makeJwtHandlerCallbackThunk(T, jwt_callback), + jwt_userdata, + makeSignatureHandlerCallbackThunk(U, sig_callback), + sig_userdata, + )).raise(); + } pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void { return Status.fromInt( @@ -545,14 +546,20 @@ const ConnectionOptions = opaque { ).raise(); } - // thunk - // NATS_EXTERN natsStatus natsOptions_SetNKey(natsOptions *opts, const char *pubKey, natsSignatureHandler sigCB, void *sigClosure); - // typedef natsStatus (*natsSignatureHandler)( - // char **customErrTxt, - // unsigned char **signature, - // int *signatureLength, - // const char *nonce, - // void *closure); + pub fn setNKey( + self: *ConnectionOptions, + comptime T: type, + comptime sig_callback: *const SignatureHandlerCallbackSignature(T), + pub_key: [:0]const u8, + sig_userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( + @ptrCast(self), + pub_key.ptr, + makeSignatureHandlerCallbackThunk(T, sig_callback), + sig_userdata, + )).raise(); + } pub fn setNKeyFromSeed(self: *ConnectionOptions, pub_key: [:0]const u8, seed_file: [:0]const u8) Error!void { return Status.fromInt( @@ -623,10 +630,204 @@ pub fn makeReconnectDelayCallbackThunk( conn: ?*nats_c.natsConnection, attempts: c_int, userdata: ?*anyopaque, - ) callconv(.C) void { + ) callconv(.C) i64 { const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; - callback(connection, attempts, data); + return callback(connection, attempts, data); + } + }.thunk; +} + +const ErrorHandlerCallback = fn ( + ?*nats_c.natsConnection, + ?*nats_c.natsSubscription, + nats_c.natsStatus, + ?*anyopaque, +) void; + +pub fn ErrorHandlerCallbackSignature(comptime T: type) type { + return fn (*Connection, *Subscription, Status, *T) void; +} + +pub fn makeErrorHandlerCallbackThunk( + comptime T: type, + comptime callback: *const ErrorHandlerCallbackSignature(T), +) *const ErrorHandlerCallback { + return struct { + fn thunk( + conn: ?*nats_c.natsConnection, + sub: ?*nats_c.natsSubscription, + status: nats_c.natsStatus, + userdata: ?*anyopaque, + ) callconv(.C) void { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable; + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + + callback(connection, subscription, Status.fromInt(status), data); + } + }.thunk; +} + +// natsSock is an fd on non-windows and SOCKET on windows +const AttachEventLoopCallback = fn ( + *?*anyopaque, + ?*anyopaque, + ?*nats_c.natsConnection, + nats_c.natsSock, +) nats_c.natsStatus; + +pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type { + return fn (*L, *Connection, c_int) anyerror!*T; +} + +pub fn makeAttachEventLoopCallbackThunk( + comptime T: type, + comptime L: type, + comptime callback: *const AttachEventLoopCallbackSignature(T, L), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + userdata: *?*anyopaque, + loop: ?*anyopaque, + conn: ?*nats_c.natsConnection, + sock: ?*nats_c.natsSock, + ) callconv(.C) nats_c.natsStatus { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const ev_loop: *L = if (loop) |l| @ptrCast(l) else unreachable; + + userdata.* = callback(ev_loop, connection, sock) catch |err| + return Status.fromError(err).toInt(); + + return nats_c.NATS_OK; + } + }.thunk; +} + +const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus; + +pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type { + return fn (*Connection, c_int, *T) anyerror!void; +} + +pub fn makeEventLoopAddRemoveCallbackThunk( + comptime T: type, + comptime callback: *const EventLoopAddRemoveCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + conn: ?*nats_c.natsConnection, + attempts: c_int, + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(connection, attempts, data) catch |err| + return Status.fromError(err).toInt(); + + return nats_c.NATS_OK; + } + }.thunk; +} + +const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus; + +pub fn EventLoopDetachCallbackSignature(comptime T: type) type { + return fn (*T) anyerror!void; +} + +pub fn makeEventLoopDetachCallbackThunk( + comptime T: type, + comptime callback: *const EventLoopDetachCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(data) catch |err| return Status.fromError(err).toInt(); + return nats_c.NATS_OK; + } + }.thunk; +} + +// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST +// BE ALLOCATED WITH THE C ALLOCATOR +const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus; + +const JwtResponseOrError = union(enum) { + jwt: [:0]u8, + error_message: [:0]u8, +}; + +pub fn JwtHandlerCallbackSignature(comptime T: type) type { + return fn (*T) JwtResponseOrError; +} + +pub fn makeJwtHandlerCallbackThunk( + comptime T: type, + comptime callback: *const JwtHandlerCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + jwt_out: *?[*:0]u8, + err_out: *?[*:0]u8, + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + + switch (callback(data)) { + .jwt => |jwt| { + jwt_out.* = jwt.ptr; + return nats_c.NATS_OK; + }, + .error_message => |msg| { + err_out.* = msg.ptr; + return nats_c.NATS_ERR; + }, + } + } + }.thunk; +} + +// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST +// BE ALLOCATED WITH THE C ALLOCATOR +const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus; + +const SignatureResponseOrError = union(enum) { + signature: []u8, + error_message: [:0]u8, +}; + +pub fn SignatureHandlerCallbackSignature(comptime T: type) type { + return fn ([:0]const u8, *T) SignatureResponseOrError; +} + +pub fn makeSignatureHandlerCallbackThunk( + comptime T: type, + comptime callback: *const SignatureHandlerCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + err_out: *?[*:0]u8, + sig_out: *?[*]u8, + sig_len_out: *c_int, + nonce: [*:0]const u8, + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + + switch (callback(std.mem.sliceTo(nonce, 0), data)) { + .signature => |sig| { + sig_out.* = sig.ptr; + sig_len_out.* = sig.len; + return nats_c.NATS_OK; + }, + .error_message => |msg| { + err_out.* = msg.ptr; + return nats_c.NATS_ERR; + }, + } } }.thunk; } diff --git a/src/error.zig b/src/error.zig index fa6fc35..1290900 100644 --- a/src/error.zig +++ b/src/error.zig @@ -74,6 +74,10 @@ pub const Status = enum(c_int) { return @enumFromInt(int); } + pub fn toInt(self: Status) c_uint { + return @intFromEnum(self); + } + pub fn description(self: Status) [:0]const u8 { return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0); } @@ -163,6 +167,50 @@ pub const Status = enum(c_int) { _ => Error.UnknownError, }; } + + pub fn fromError(err: ?anyerror) Status { + return if (err) |e| + switch (e) { + Error.ProtocolError => .protocol_error, + Error.IoError => .io_error, + Error.LineTooLong => .line_too_long, + Error.ConnectionClosed => .connection_closed, + Error.NoServer => .no_server, + Error.StaleConnection => .stale_connection, + Error.SecureConnectionWanted => .secure_connection_wanted, + Error.SecureConnectionRequired => .secure_connection_required, + Error.ConnectionDisconnected => .connection_disconnected, + Error.ConnectionAuthFailed => .connection_auth_failed, + Error.NotPermitted => .not_permitted, + Error.NotFound => .not_found, + Error.AddressMissing => .address_missing, + Error.InvalidSubject => .invalid_subject, + Error.InvalidArg => .invalid_arg, + Error.InvalidSubscription => .invalid_subscription, + Error.InvalidTimeout => .invalid_timeout, + Error.IllegalState => .illegal_state, + Error.SlowConsumer => .slow_consumer, + Error.MaxPayload => .max_payload, + Error.MaxDeliveredMsgs => .max_delivered_msgs, + Error.InsufficientBuffer => .insufficient_buffer, + Error.NoMemory => .no_memory, + Error.SysError => .sys_error, + Error.Timeout => .timeout, + Error.FailedToInitialize => .failed_to_initialize, + Error.NotInitialized => .not_initialized, + Error.SslError => .ssl_error, + Error.NoServerSupport => .no_server_support, + Error.NotYetConnected => .not_yet_connected, + Error.Draining => .draining, + Error.InvalidQueueName => .invalid_queue_name, + Error.NoResponders => .no_responders, + Error.Mismatch => .mismatch, + Error.MissedHeartbeat => .missed_heartbeat, + else => .generic_error, + } + else + .okay; + } }; pub const Error = error{