diff --git a/src/connection.zig b/src/connection.zig index 671aa62..5f55903 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -346,16 +346,20 @@ const ConnectionOptions = opaque { ).raise(); } - // needs a callback thunk - // typedef void (*natsErrHandler)( - // natsConnection *nc, natsSubscription *subscription, natsStatus err, - // void *closure); - // NATS_EXTERN natsStatus natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, void *closure); - // pub fn setErrorHandler(self: *ConnectionOptions, max: c_int) Error!void { - // return Status.fromInt( - // nats_c.natsOptions_SetErrorHandler(@ptrCast(self), max), - // ).raise(); - // } + pub fn setErrorHandler( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ErrorHandlerCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt( + nats_c.natsOptions_SetErrorHandler( + @ptrCast(self), + makeErrorHandlerCallbackThunk(T, callback), + userdata, + ), + ).raise(); + } pub fn setClosedCallback( self: *ConnectionOptions, @@ -422,41 +426,25 @@ const ConnectionOptions = opaque { )).raise(); } - // needs a callback thunk - // NATS_EXTERN natsStatus natsOptions_SetEventLoop(natsOptions *opts, void *loop, natsEvLoop_Attach attachCb, natsEvLoop_ReadAddRemove readCb, natsEvLoop_WriteAddRemove writeCb, natsEvLoop_Detach detachCb); - // typedef natsStatus (*natsEvLoop_Attach)( - // void **userData, - // void *loop, - // natsConnection *nc, - // natsSock socket); - // typedef natsStatus (*natsEvLoop_ReadAddRemove)( - // void *loop, - // bool add); - // typedef natsStatus (*natsEvLoop_WriteAddRemove)( - // void *loop, - // bool add); - // typedef natsStatus (*natsEvLoop_Detach)( - // void *loop); - - // pub fn setEventLoop( - // self: *ConnectionOptions, - // comptime T: type, - // comptime L: type, - // comptime attachCallback: *const AttachEventLoopCallbackSignature(T, L), - // comptime readCallback: *const AttachEventLoopCallbackSignature(T), - // comptime writeCallback: *const AttachEventLoopCallbackSignature(T), - // comptime detachCallback: *const thunk.SimpleCallbackSignature(T), - // loop: *L, - // ) Error!void { - // return Status.fromInt(nats_c.natsOptions_SetEventLoop( - // @ptrCast(self), - // @ptrCast(loop), - // makeAttachEventLoopCallbackThunk(T, L, attachCallback), - // makeEventLoopAddRemoveCallbackThunk(T, readCallback), - // makeEventLoopAddRemoveCallbackThunk(T, writeCallback), - // thunk.makeSimpleCallbackThunk(callback), - // )).raise(); - // } + pub fn setEventLoop( + self: *ConnectionOptions, + comptime T: type, + comptime L: type, + comptime attach_callback: *const AttachEventLoopCallbackSignature(T, L), + comptime read_callback: *const AttachEventLoopCallbackSignature(T), + comptime write_callback: *const AttachEventLoopCallbackSignature(T), + comptime detach_callback: *const thunk.SimpleCallbackSignature(T), + loop: *L, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetEventLoop( + @ptrCast(self), + @ptrCast(loop), + makeAttachEventLoopCallbackThunk(T, L, attach_callback), + makeEventLoopAddRemoveCallbackThunk(T, read_callback), + makeEventLoopAddRemoveCallbackThunk(T, write_callback), + makeEventLoopDetachCallbackThunk(T, detach_callback), + )).raise(); + } pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void { return Status.fromInt( @@ -508,23 +496,36 @@ const ConnectionOptions = opaque { ).raise(); } - // thunkem - // NATS_EXTERN natsStatus natsOptions_SetRetryOnFailedConnect(natsOptions *opts, bool retry, natsConnectionHandler connectedCb, void* closure); - // typedef void (*natsConnectionHandler)( - // natsConnection *nc, void *closure); + pub fn setRetryOnFailedConnect( + self: *ConnectionOptions, + comptime T: type, + comptime callback: *const ConnectionCallbackSignature(T), + userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect( + @ptrCast(self), + makeConnectionCallbackThunk(T, callback), + userdata, + )).raise(); + } - // 2 thunk 2 furious - // NATS_EXTERN natsStatus natsOptions_SetUserCredentialsCallbacks(natsOptions *opts, natsUserJWTHandler ujwtCB, void *ujwtClosure, natsSignatureHandler sigCB, void *sigClosure); - // typedef natsStatus (*natsUserJWTHandler)( - // char **userJWT, - // char **customErrTxt, - // void *closure); - // typedef natsStatus (*natsSignatureHandler)( - // char **customErrTxt, - // unsigned char **signature, - // int *signatureLength, - // const char *nonce, - // void *closure); + pub fn setUserCredentialsCallbacks( + self: *ConnectionOptions, + comptime T: type, + comptime U: type, + comptime jwt_callback: *const JwtHandlerCallbackSignature(T), + comptime sig_callback: *const SignatureHandlerCallbackSignature(U), + jwt_userdata: *T, + sig_userdata: *U, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( + @ptrCast(self), + makeJwtHandlerCallbackThunk(T, jwt_callback), + jwt_userdata, + makeSignatureHandlerCallbackThunk(U, sig_callback), + sig_userdata, + )).raise(); + } pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void { return Status.fromInt( @@ -545,14 +546,20 @@ const ConnectionOptions = opaque { ).raise(); } - // thunk - // NATS_EXTERN natsStatus natsOptions_SetNKey(natsOptions *opts, const char *pubKey, natsSignatureHandler sigCB, void *sigClosure); - // typedef natsStatus (*natsSignatureHandler)( - // char **customErrTxt, - // unsigned char **signature, - // int *signatureLength, - // const char *nonce, - // void *closure); + pub fn setNKey( + self: *ConnectionOptions, + comptime T: type, + comptime sig_callback: *const SignatureHandlerCallbackSignature(T), + pub_key: [:0]const u8, + sig_userdata: *T, + ) Error!void { + return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( + @ptrCast(self), + pub_key.ptr, + makeSignatureHandlerCallbackThunk(T, sig_callback), + sig_userdata, + )).raise(); + } pub fn setNKeyFromSeed(self: *ConnectionOptions, pub_key: [:0]const u8, seed_file: [:0]const u8) Error!void { return Status.fromInt( @@ -623,10 +630,204 @@ pub fn makeReconnectDelayCallbackThunk( conn: ?*nats_c.natsConnection, attempts: c_int, userdata: ?*anyopaque, - ) callconv(.C) void { + ) callconv(.C) i64 { const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; - callback(connection, attempts, data); + return callback(connection, attempts, data); + } + }.thunk; +} + +const ErrorHandlerCallback = fn ( + ?*nats_c.natsConnection, + ?*nats_c.natsSubscription, + nats_c.natsStatus, + ?*anyopaque, +) void; + +pub fn ErrorHandlerCallbackSignature(comptime T: type) type { + return fn (*Connection, *Subscription, Status, *T) void; +} + +pub fn makeErrorHandlerCallbackThunk( + comptime T: type, + comptime callback: *const ErrorHandlerCallbackSignature(T), +) *const ErrorHandlerCallback { + return struct { + fn thunk( + conn: ?*nats_c.natsConnection, + sub: ?*nats_c.natsSubscription, + status: nats_c.natsStatus, + userdata: ?*anyopaque, + ) callconv(.C) void { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable; + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + + callback(connection, subscription, Status.fromInt(status), data); + } + }.thunk; +} + +// natsSock is an fd on non-windows and SOCKET on windows +const AttachEventLoopCallback = fn ( + *?*anyopaque, + ?*anyopaque, + ?*nats_c.natsConnection, + nats_c.natsSock, +) nats_c.natsStatus; + +pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type { + return fn (*L, *Connection, c_int) anyerror!*T; +} + +pub fn makeAttachEventLoopCallbackThunk( + comptime T: type, + comptime L: type, + comptime callback: *const AttachEventLoopCallbackSignature(T, L), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + userdata: *?*anyopaque, + loop: ?*anyopaque, + conn: ?*nats_c.natsConnection, + sock: ?*nats_c.natsSock, + ) callconv(.C) nats_c.natsStatus { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const ev_loop: *L = if (loop) |l| @ptrCast(l) else unreachable; + + userdata.* = callback(ev_loop, connection, sock) catch |err| + return Status.fromError(err).toInt(); + + return nats_c.NATS_OK; + } + }.thunk; +} + +const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus; + +pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type { + return fn (*Connection, c_int, *T) anyerror!void; +} + +pub fn makeEventLoopAddRemoveCallbackThunk( + comptime T: type, + comptime callback: *const EventLoopAddRemoveCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + conn: ?*nats_c.natsConnection, + attempts: c_int, + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(connection, attempts, data) catch |err| + return Status.fromError(err).toInt(); + + return nats_c.NATS_OK; + } + }.thunk; +} + +const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus; + +pub fn EventLoopDetachCallbackSignature(comptime T: type) type { + return fn (*T) anyerror!void; +} + +pub fn makeEventLoopDetachCallbackThunk( + comptime T: type, + comptime callback: *const EventLoopDetachCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + callback(data) catch |err| return Status.fromError(err).toInt(); + return nats_c.NATS_OK; + } + }.thunk; +} + +// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST +// BE ALLOCATED WITH THE C ALLOCATOR +const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus; + +const JwtResponseOrError = union(enum) { + jwt: [:0]u8, + error_message: [:0]u8, +}; + +pub fn JwtHandlerCallbackSignature(comptime T: type) type { + return fn (*T) JwtResponseOrError; +} + +pub fn makeJwtHandlerCallbackThunk( + comptime T: type, + comptime callback: *const JwtHandlerCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + jwt_out: *?[*:0]u8, + err_out: *?[*:0]u8, + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + + switch (callback(data)) { + .jwt => |jwt| { + jwt_out.* = jwt.ptr; + return nats_c.NATS_OK; + }, + .error_message => |msg| { + err_out.* = msg.ptr; + return nats_c.NATS_ERR; + }, + } + } + }.thunk; +} + +// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST +// BE ALLOCATED WITH THE C ALLOCATOR +const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus; + +const SignatureResponseOrError = union(enum) { + signature: []u8, + error_message: [:0]u8, +}; + +pub fn SignatureHandlerCallbackSignature(comptime T: type) type { + return fn ([:0]const u8, *T) SignatureResponseOrError; +} + +pub fn makeSignatureHandlerCallbackThunk( + comptime T: type, + comptime callback: *const SignatureHandlerCallbackSignature(T), +) *const ReconnectDelayCallback { + return struct { + fn thunk( + err_out: *?[*:0]u8, + sig_out: *?[*]u8, + sig_len_out: *c_int, + nonce: [*:0]const u8, + userdata: ?*anyopaque, + ) callconv(.C) nats_c.natsStatus { + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + + switch (callback(std.mem.sliceTo(nonce, 0), data)) { + .signature => |sig| { + sig_out.* = sig.ptr; + sig_len_out.* = sig.len; + return nats_c.NATS_OK; + }, + .error_message => |msg| { + err_out.* = msg.ptr; + return nats_c.NATS_ERR; + }, + } } }.thunk; } diff --git a/src/error.zig b/src/error.zig index fa6fc35..1290900 100644 --- a/src/error.zig +++ b/src/error.zig @@ -74,6 +74,10 @@ pub const Status = enum(c_int) { return @enumFromInt(int); } + pub fn toInt(self: Status) c_uint { + return @intFromEnum(self); + } + pub fn description(self: Status) [:0]const u8 { return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0); } @@ -163,6 +167,50 @@ pub const Status = enum(c_int) { _ => Error.UnknownError, }; } + + pub fn fromError(err: ?anyerror) Status { + return if (err) |e| + switch (e) { + Error.ProtocolError => .protocol_error, + Error.IoError => .io_error, + Error.LineTooLong => .line_too_long, + Error.ConnectionClosed => .connection_closed, + Error.NoServer => .no_server, + Error.StaleConnection => .stale_connection, + Error.SecureConnectionWanted => .secure_connection_wanted, + Error.SecureConnectionRequired => .secure_connection_required, + Error.ConnectionDisconnected => .connection_disconnected, + Error.ConnectionAuthFailed => .connection_auth_failed, + Error.NotPermitted => .not_permitted, + Error.NotFound => .not_found, + Error.AddressMissing => .address_missing, + Error.InvalidSubject => .invalid_subject, + Error.InvalidArg => .invalid_arg, + Error.InvalidSubscription => .invalid_subscription, + Error.InvalidTimeout => .invalid_timeout, + Error.IllegalState => .illegal_state, + Error.SlowConsumer => .slow_consumer, + Error.MaxPayload => .max_payload, + Error.MaxDeliveredMsgs => .max_delivered_msgs, + Error.InsufficientBuffer => .insufficient_buffer, + Error.NoMemory => .no_memory, + Error.SysError => .sys_error, + Error.Timeout => .timeout, + Error.FailedToInitialize => .failed_to_initialize, + Error.NotInitialized => .not_initialized, + Error.SslError => .ssl_error, + Error.NoServerSupport => .no_server_support, + Error.NotYetConnected => .not_yet_connected, + Error.Draining => .draining, + Error.InvalidQueueName => .invalid_queue_name, + Error.NoResponders => .no_responders, + Error.Mismatch => .mismatch, + Error.MissedHeartbeat => .missed_heartbeat, + else => .generic_error, + } + else + .okay; + } }; pub const Error = error{