Compare commits

...

4 Commits

Author SHA1 Message Date
f742ee1a85
build.zig.zon: match the project name 2024-04-06 15:35:19 -07:00
7db55e9ac9
subscription: fix setCompletionCallback
This was not being tested, and it was really broken. Now it is being
tested and it is no longer broken.
2024-04-06 15:34:38 -07:00
73fccb4662
all: allow passing more types of pointers as callback userdata
By performing more pointer casting gymnastics, additional types of
pointer can be supported. For example, now const qualified pointers
can be passed through thanks to the use of constCast. Also, having
explicit ptrCast allows nested pointers (e.g. a pointer to a slice,
such as `*[]const u8`) to be passed as userdata as well (the compiler
refuses to coerce a pointer to a pointer to `?*anyopaque` for some
reason, I guess because maybe it's ambiguous somehow?) Hopefully this
extra casting does not appreciably reduce the compiler's ability to
catch real bugs (for example, on a 64-bit machine, ptrCast can convert
a *u64 into a **u64 because there is no alignment change).

Also, the `volatile` pointer specifier is still not supported.
`allowzero` pointers probably also have a problem. Those are both
extreme edge cases, however.

This was intended to work before but did not due to an oversight.
Specifically, because the userdata pointers are stored as ?*anyopaque,
which does *not* have the const qualifier, they must have their const
qualifiers also removed. This is safe because the thunk guarantees
that the consumer code never sees the non-const version of the
pointer, and the nats library itself does nothing except pass the
pointer through to the user callback.

The tests have been updated to ensure this case works. The examples
still use a mutable userdata pointer to show that that case also
works. More tests could be added for the sake of increased rigor, but
I don't think it adds much.
2024-04-06 15:34:38 -07:00
42d0b24710
thunk: fix slightly confusing compile errors
The error messages not on the slice detection code path should not be
talking about slices.
2024-04-06 15:01:03 -07:00
6 changed files with 70 additions and 54 deletions

View File

@ -1,5 +1,5 @@
.{
.name = "nats-client",
.name = "nats.zig",
.version = "0.0.1",
.paths = .{
"src",

View File

@ -348,7 +348,7 @@ pub const Connection = opaque {
@ptrCast(self),
subject.ptr,
makeSubscriptionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
));
return status.toError() orelse sub;
}
@ -369,7 +369,7 @@ pub const Connection = opaque {
subject.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
));
return status.toError() orelse sub;
@ -403,7 +403,7 @@ pub const Connection = opaque {
subject.ptr,
queue_group.ptr,
makeSubscriptionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
));
return status.toError() orelse sub;
@ -427,7 +427,7 @@ pub const Connection = opaque {
queue_group.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
));
return status.toError() orelse sub;
@ -504,7 +504,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self),
makeTokenCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
)).raise();
}
@ -644,7 +644,7 @@ pub const ConnectionOptions = opaque {
nats_c.natsOptions_SetCustomReconnectDelay(
@ptrCast(self),
makeReconnectDelayCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
),
).raise();
}
@ -671,7 +671,7 @@ pub const ConnectionOptions = opaque {
nats_c.natsOptions_SetErrorHandler(
@ptrCast(self),
makeErrorHandlerCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
),
).raise();
}
@ -685,7 +685,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
)).raise();
}
@ -698,7 +698,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
)).raise();
}
@ -711,7 +711,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
)).raise();
}
@ -724,7 +724,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
)).raise();
}
@ -737,7 +737,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
)).raise();
}
@ -753,7 +753,7 @@ pub const ConnectionOptions = opaque {
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
@ptrCast(self),
@ptrCast(loop),
@constCast(@ptrCast(loop)),
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
@ -822,7 +822,7 @@ pub const ConnectionOptions = opaque {
@ptrCast(self),
retry,
makeConnectionCallbackThunk(T, callback),
userdata,
@constCast(@ptrCast(userdata)),
)).raise();
}
@ -838,13 +838,17 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self),
makeJwtHandlerCallbackThunk(T, jwt_callback),
jwt_userdata,
@constCast(@ptrCast(jwt_userdata)),
makeSignatureHandlerCallbackThunk(U, sig_callback),
sig_userdata,
@constCast(@ptrCast(sig_userdata)),
)).raise();
}
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
pub fn setUserCredentialsFromFiles(
self: *ConnectionOptions,
user_or_chained_file: [:0]const u8,
seed_file: [:0]const u8,
) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetUserCredentialsFromFiles(
@ptrCast(self),
@ -874,7 +878,7 @@ pub const ConnectionOptions = opaque {
@ptrCast(self),
pub_key.ptr,
makeSignatureHandlerCallbackThunk(T, sig_callback),
sig_userdata,
@constCast(@ptrCast(sig_userdata)),
)).raise();
}

View File

@ -174,12 +174,12 @@ pub const Subscription = opaque {
self: *Subscription,
comptime T: type,
comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
userdata: *T,
userdata: T,
) Error!void {
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
@ptrCast(self),
thunk.makeSimpleCallbackThunk(callback),
userdata,
thunk.makeSimpleCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
)).raise();
}
};

View File

@ -23,7 +23,7 @@ pub fn checkUserDataType(comptime T: type) void {
.Optional => |info| switch (@typeInfo(info.child)) {
.Optional => @compileError(
"nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata, not slices. \"" ++
" or c pointer as userdata. \"" ++
@typeName(T) ++ "\" has more than one optional specifier.",
),
else => checkUserDataType(info.child),
@ -38,7 +38,7 @@ pub fn checkUserDataType(comptime T: type) void {
},
else => @compileError(
"nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata, not slices. \"" ++
" or c pointer as userdata. \"" ++
@typeName(T) ++ "\" is not a pointer type.",
),
}
@ -57,7 +57,7 @@ pub fn makeSimpleCallbackThunk(
comptime checkUserDataType(T);
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data);
}
}.thunk;

View File

@ -99,7 +99,7 @@ test "nats.Connection" {
connection.drainTimeout(1000) catch {};
}
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, attempts: c_int) i64 {
_ = userdata;
_ = connection;
_ = attempts;
@ -108,7 +108,7 @@ fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts:
}
fn errorHandler(
userdata: *u32,
userdata: *const u32,
connection: *nats.Connection,
subscription: *nats.Subscription,
status: nats.Status,
@ -119,18 +119,18 @@ fn errorHandler(
_ = status;
}
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
fn connectionHandler(userdata: *const u32, connection: *nats.Connection) void {
_ = userdata;
_ = connection;
}
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
fn jwtHandler(userdata: *const u32) nats.JwtResponseOrError {
_ = userdata;
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
}
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
fn signatureHandler(userdata: *const u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
_ = userdata;
_ = nonce;
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
@ -144,7 +144,7 @@ test "nats.ConnectionOptions" {
const options = try nats.ConnectionOptions.create();
defer options.destroy();
var userdata: u32 = 0;
const userdata: u32 = 0;
try options.setUrl(nats.default_server_url);
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
@ -164,14 +164,14 @@ test "nats.ConnectionOptions" {
try options.setMaxReconnect(10);
try options.setReconnectWait(500);
try options.setReconnectJitter(100, 200);
try options.setCustomReconnectDelay(*u32, reconnectDelayHandler, &userdata);
try options.setCustomReconnectDelay(*const u32, reconnectDelayHandler, &userdata);
try options.setReconnectBufSize(1024);
try options.setMaxPendingMessages(50);
try options.setErrorHandler(*u32, errorHandler, &userdata);
try options.setClosedCallback(*u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(*u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(*u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(*u32, connectionHandler, &userdata);
try options.setErrorHandler(*const u32, errorHandler, &userdata);
try options.setClosedCallback(*const u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(*const u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(*const u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(*const u32, connectionHandler, &userdata);
try options.ignoreDiscoveredServers(true);
try options.useGlobalMessageDelivery(false);
try options.ipResolutionOrder(.ipv4_first);
@ -179,8 +179,8 @@ test "nats.ConnectionOptions" {
try options.useOldRequestStyle(false);
try options.setFailRequestsOnDisconnect(true);
try options.setNoEcho(true);
try options.setRetryOnFailedConnect(*u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(*u32, *u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setRetryOnFailedConnect(*const u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(*const u32, *const u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setWriteDeadline(5);
try options.disableNoResponders(true);
try options.setCustomInboxPrefix("_FOOBOX");

View File

@ -80,18 +80,21 @@ test "nats.Subscription" {
}
fn onMessage(
userdata: *u32,
userdata: *const u32,
connection: *nats.Connection,
subscription: *nats.Subscription,
message: *nats.Message,
) void {
_ = subscription;
_ = userdata;
if (message.getReply()) |reply| {
connection.publish(reply, "greetings") catch @panic("OH NO");
} else @panic("HOW");
}
userdata.* += 1;
fn onClose(userdata: *[]const u8) void {
userdata.* = "closed";
}
test "nats.Subscription (async)" {
@ -112,21 +115,30 @@ test "nats.Subscription (async)" {
defer message.destroy();
{
var count: u32 = 0;
const subscription = try connection.subscribe(*u32, message_subject, onMessage, &count);
defer subscription.destroy();
var closed: []const u8 = "test";
{
const count: u32 = 0;
const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count);
defer subscription.destroy();
const response = try connection.requestMessage(message, 1000);
try std.testing.expectEqualStrings(
"greetings",
response.getData() orelse return error.TestUnexpectedResult,
);
try subscription.setCompletionCallback(*[]const u8, onClose, &closed);
const response = try connection.requestMessage(message, 1000);
try std.testing.expectEqualStrings(
"greetings",
response.getData() orelse return error.TestUnexpectedResult,
);
}
// we have to sleep to allow the close callback to run. I am worried this may
// still end up being flaky, however.
nats.sleep(1);
try std.testing.expectEqualStrings("closed", closed);
}
{
var count: u32 = 0;
const count: u32 = 0;
const subscription = try connection.subscribeTimeout(
*u32,
*const u32,
message_subject,
1000,
onMessage,
@ -142,9 +154,9 @@ test "nats.Subscription (async)" {
}
{
var count: u32 = 0;
const count: u32 = 0;
const subscription = try connection.queueSubscribe(
*u32,
*const u32,
message_subject,
"queuegroup",
onMessage,
@ -162,7 +174,7 @@ test "nats.Subscription (async)" {
{
var count: u32 = 0;
const subscription = try connection.queueSubscribeTimeout(
*u32,
*const u32,
message_subject,
"queuegroup",
1000,