diff --git a/src/subscription.zig b/src/subscription.zig index 3adba8e..a01522c 100644 --- a/src/subscription.zig +++ b/src/subscription.zig @@ -174,12 +174,12 @@ pub const Subscription = opaque { self: *Subscription, comptime T: type, comptime callback: *const thunk.SimpleCallbackThunkSignature(T), - userdata: *T, + userdata: T, ) Error!void { return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB( @ptrCast(self), - thunk.makeSimpleCallbackThunk(callback), - @constCast(userdata), + thunk.makeSimpleCallbackThunk(T, callback), + @constCast(@ptrCast(userdata)), )).raise(); } }; diff --git a/src/thunk.zig b/src/thunk.zig index bfd1f2c..956fbd9 100644 --- a/src/thunk.zig +++ b/src/thunk.zig @@ -57,7 +57,7 @@ pub fn makeSimpleCallbackThunk( comptime checkUserDataType(T); return struct { fn thunk(userdata: ?*anyopaque) callconv(.C) void { - const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; + const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable; callback(data); } }.thunk; diff --git a/tests/subscription.zig b/tests/subscription.zig index cb2b8e7..b2feab8 100644 --- a/tests/subscription.zig +++ b/tests/subscription.zig @@ -93,6 +93,10 @@ fn onMessage( } else @panic("HOW"); } +fn onClose(userdata: *[]const u8) void { + userdata.* = "closed"; +} + test "nats.Subscription (async)" { var server = try util.TestServer.launch(.{}); defer server.stop(); @@ -111,15 +115,24 @@ test "nats.Subscription (async)" { defer message.destroy(); { - const count: u32 = 0; - const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count); - defer subscription.destroy(); + var closed: []const u8 = "test"; + { + const count: u32 = 0; + const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count); + defer subscription.destroy(); - const response = try connection.requestMessage(message, 1000); - try std.testing.expectEqualStrings( - "greetings", - response.getData() orelse return error.TestUnexpectedResult, - ); + try subscription.setCompletionCallback(*[]const u8, onClose, &closed); + + const response = try connection.requestMessage(message, 1000); + try std.testing.expectEqualStrings( + "greetings", + response.getData() orelse return error.TestUnexpectedResult, + ); + } + // we have to sleep to allow the close callback to run. I am worried this may + // still end up being flaky, however. + nats.sleep(1); + try std.testing.expectEqualStrings("closed", closed); } {