all: allow passing more types of pointers as callback userdata
By performing more pointer casting gymnastics, additional types of pointer can be supported. For example, now const qualified pointers can be passed through thanks to the use of constCast. Also, having explicit ptrCast allows nested pointers (e.g. a pointer to a slice, such as `*[]const u8`) to be passed as userdata as well (the compiler refuses to coerce a pointer to a pointer to `?*anyopaque` for some reason, I guess because maybe it's ambiguous somehow?) Hopefully this extra casting does not appreciably reduce the compiler's ability to catch real bugs (for example, on a 64-bit machine, ptrCast can convert a *u64 into a **u64 because there is no alignment change). Also, the `volatile` pointer specifier is still not supported. `allowzero` pointers probably also have a problem. Those are both extreme edge cases, however. This was intended to work before but did not due to an oversight. Specifically, because the userdata pointers are stored as ?*anyopaque, which does *not* have the const qualifier, they must have their const qualifiers also removed. This is safe because the thunk guarantees that the consumer code never sees the non-const version of the pointer, and the nats library itself does nothing except pass the pointer through to the user callback. The tests have been updated to ensure this case works. The examples still use a mutable userdata pointer to show that that case also works. More tests could be added for the sake of increased rigor, but I don't think it adds much.
This commit is contained in:
parent
4f26bf8fca
commit
5939836bec
@ -348,7 +348,7 @@ pub const Connection = opaque {
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
));
|
||||
return status.toError() orelse sub;
|
||||
}
|
||||
@ -369,7 +369,7 @@ pub const Connection = opaque {
|
||||
subject.ptr,
|
||||
timeout,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
@ -403,7 +403,7 @@ pub const Connection = opaque {
|
||||
subject.ptr,
|
||||
queue_group.ptr,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
@ -427,7 +427,7 @@ pub const Connection = opaque {
|
||||
queue_group.ptr,
|
||||
timeout,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
@ -504,7 +504,7 @@ pub const ConnectionOptions = opaque {
|
||||
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
|
||||
@ptrCast(self),
|
||||
makeTokenCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
@ -644,7 +644,7 @@ pub const ConnectionOptions = opaque {
|
||||
nats_c.natsOptions_SetCustomReconnectDelay(
|
||||
@ptrCast(self),
|
||||
makeReconnectDelayCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
),
|
||||
).raise();
|
||||
}
|
||||
@ -671,7 +671,7 @@ pub const ConnectionOptions = opaque {
|
||||
nats_c.natsOptions_SetErrorHandler(
|
||||
@ptrCast(self),
|
||||
makeErrorHandlerCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
),
|
||||
).raise();
|
||||
}
|
||||
@ -685,7 +685,7 @@ pub const ConnectionOptions = opaque {
|
||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||
@ptrCast(self),
|
||||
makeConnectionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
@ -698,7 +698,7 @@ pub const ConnectionOptions = opaque {
|
||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||
@ptrCast(self),
|
||||
makeConnectionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
@ -711,7 +711,7 @@ pub const ConnectionOptions = opaque {
|
||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||
@ptrCast(self),
|
||||
makeConnectionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
@ -724,7 +724,7 @@ pub const ConnectionOptions = opaque {
|
||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||
@ptrCast(self),
|
||||
makeConnectionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
@ -737,7 +737,7 @@ pub const ConnectionOptions = opaque {
|
||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||
@ptrCast(self),
|
||||
makeConnectionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
@ -753,7 +753,7 @@ pub const ConnectionOptions = opaque {
|
||||
) Error!void {
|
||||
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
|
||||
@ptrCast(self),
|
||||
@ptrCast(loop),
|
||||
@constCast(@ptrCast(loop)),
|
||||
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
|
||||
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
|
||||
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
|
||||
@ -822,7 +822,7 @@ pub const ConnectionOptions = opaque {
|
||||
@ptrCast(self),
|
||||
retry,
|
||||
makeConnectionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
@constCast(@ptrCast(userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
@ -838,13 +838,17 @@ pub const ConnectionOptions = opaque {
|
||||
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
|
||||
@ptrCast(self),
|
||||
makeJwtHandlerCallbackThunk(T, jwt_callback),
|
||||
jwt_userdata,
|
||||
@constCast(@ptrCast(jwt_userdata)),
|
||||
makeSignatureHandlerCallbackThunk(U, sig_callback),
|
||||
sig_userdata,
|
||||
@constCast(@ptrCast(sig_userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
|
||||
pub fn setUserCredentialsFromFiles(
|
||||
self: *ConnectionOptions,
|
||||
user_or_chained_file: [:0]const u8,
|
||||
seed_file: [:0]const u8,
|
||||
) Error!void {
|
||||
return Status.fromInt(
|
||||
nats_c.natsOptions_SetUserCredentialsFromFiles(
|
||||
@ptrCast(self),
|
||||
@ -874,7 +878,7 @@ pub const ConnectionOptions = opaque {
|
||||
@ptrCast(self),
|
||||
pub_key.ptr,
|
||||
makeSignatureHandlerCallbackThunk(T, sig_callback),
|
||||
sig_userdata,
|
||||
@constCast(@ptrCast(sig_userdata)),
|
||||
)).raise();
|
||||
}
|
||||
|
||||
|
@ -179,7 +179,7 @@ pub const Subscription = opaque {
|
||||
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
|
||||
@ptrCast(self),
|
||||
thunk.makeSimpleCallbackThunk(callback),
|
||||
userdata,
|
||||
@constCast(userdata),
|
||||
)).raise();
|
||||
}
|
||||
};
|
||||
|
@ -99,7 +99,7 @@ test "nats.Connection" {
|
||||
connection.drainTimeout(1000) catch {};
|
||||
}
|
||||
|
||||
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
|
||||
fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, attempts: c_int) i64 {
|
||||
_ = userdata;
|
||||
_ = connection;
|
||||
_ = attempts;
|
||||
@ -108,7 +108,7 @@ fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts:
|
||||
}
|
||||
|
||||
fn errorHandler(
|
||||
userdata: *u32,
|
||||
userdata: *const u32,
|
||||
connection: *nats.Connection,
|
||||
subscription: *nats.Subscription,
|
||||
status: nats.Status,
|
||||
@ -119,18 +119,18 @@ fn errorHandler(
|
||||
_ = status;
|
||||
}
|
||||
|
||||
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
|
||||
fn connectionHandler(userdata: *const u32, connection: *nats.Connection) void {
|
||||
_ = userdata;
|
||||
_ = connection;
|
||||
}
|
||||
|
||||
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
|
||||
fn jwtHandler(userdata: *const u32) nats.JwtResponseOrError {
|
||||
_ = userdata;
|
||||
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
|
||||
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
|
||||
}
|
||||
|
||||
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
|
||||
fn signatureHandler(userdata: *const u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
|
||||
_ = userdata;
|
||||
_ = nonce;
|
||||
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
|
||||
@ -144,7 +144,7 @@ test "nats.ConnectionOptions" {
|
||||
const options = try nats.ConnectionOptions.create();
|
||||
defer options.destroy();
|
||||
|
||||
var userdata: u32 = 0;
|
||||
const userdata: u32 = 0;
|
||||
|
||||
try options.setUrl(nats.default_server_url);
|
||||
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
|
||||
@ -164,14 +164,14 @@ test "nats.ConnectionOptions" {
|
||||
try options.setMaxReconnect(10);
|
||||
try options.setReconnectWait(500);
|
||||
try options.setReconnectJitter(100, 200);
|
||||
try options.setCustomReconnectDelay(*u32, reconnectDelayHandler, &userdata);
|
||||
try options.setCustomReconnectDelay(*const u32, reconnectDelayHandler, &userdata);
|
||||
try options.setReconnectBufSize(1024);
|
||||
try options.setMaxPendingMessages(50);
|
||||
try options.setErrorHandler(*u32, errorHandler, &userdata);
|
||||
try options.setClosedCallback(*u32, connectionHandler, &userdata);
|
||||
try options.setDisconnectedCallback(*u32, connectionHandler, &userdata);
|
||||
try options.setDiscoveredServersCallback(*u32, connectionHandler, &userdata);
|
||||
try options.setLameDuckModeCallback(*u32, connectionHandler, &userdata);
|
||||
try options.setErrorHandler(*const u32, errorHandler, &userdata);
|
||||
try options.setClosedCallback(*const u32, connectionHandler, &userdata);
|
||||
try options.setDisconnectedCallback(*const u32, connectionHandler, &userdata);
|
||||
try options.setDiscoveredServersCallback(*const u32, connectionHandler, &userdata);
|
||||
try options.setLameDuckModeCallback(*const u32, connectionHandler, &userdata);
|
||||
try options.ignoreDiscoveredServers(true);
|
||||
try options.useGlobalMessageDelivery(false);
|
||||
try options.ipResolutionOrder(.ipv4_first);
|
||||
@ -179,8 +179,8 @@ test "nats.ConnectionOptions" {
|
||||
try options.useOldRequestStyle(false);
|
||||
try options.setFailRequestsOnDisconnect(true);
|
||||
try options.setNoEcho(true);
|
||||
try options.setRetryOnFailedConnect(*u32, connectionHandler, true, &userdata);
|
||||
try options.setUserCredentialsCallbacks(*u32, *u32, jwtHandler, signatureHandler, &userdata, &userdata);
|
||||
try options.setRetryOnFailedConnect(*const u32, connectionHandler, true, &userdata);
|
||||
try options.setUserCredentialsCallbacks(*const u32, *const u32, jwtHandler, signatureHandler, &userdata, &userdata);
|
||||
try options.setWriteDeadline(5);
|
||||
try options.disableNoResponders(true);
|
||||
try options.setCustomInboxPrefix("_FOOBOX");
|
||||
|
@ -80,18 +80,17 @@ test "nats.Subscription" {
|
||||
}
|
||||
|
||||
fn onMessage(
|
||||
userdata: *u32,
|
||||
userdata: *const u32,
|
||||
connection: *nats.Connection,
|
||||
subscription: *nats.Subscription,
|
||||
message: *nats.Message,
|
||||
) void {
|
||||
_ = subscription;
|
||||
_ = userdata;
|
||||
|
||||
if (message.getReply()) |reply| {
|
||||
connection.publish(reply, "greetings") catch @panic("OH NO");
|
||||
} else @panic("HOW");
|
||||
|
||||
userdata.* += 1;
|
||||
}
|
||||
|
||||
test "nats.Subscription (async)" {
|
||||
@ -112,8 +111,8 @@ test "nats.Subscription (async)" {
|
||||
defer message.destroy();
|
||||
|
||||
{
|
||||
var count: u32 = 0;
|
||||
const subscription = try connection.subscribe(*u32, message_subject, onMessage, &count);
|
||||
const count: u32 = 0;
|
||||
const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count);
|
||||
defer subscription.destroy();
|
||||
|
||||
const response = try connection.requestMessage(message, 1000);
|
||||
@ -124,9 +123,9 @@ test "nats.Subscription (async)" {
|
||||
}
|
||||
|
||||
{
|
||||
var count: u32 = 0;
|
||||
const count: u32 = 0;
|
||||
const subscription = try connection.subscribeTimeout(
|
||||
*u32,
|
||||
*const u32,
|
||||
message_subject,
|
||||
1000,
|
||||
onMessage,
|
||||
@ -142,9 +141,9 @@ test "nats.Subscription (async)" {
|
||||
}
|
||||
|
||||
{
|
||||
var count: u32 = 0;
|
||||
const count: u32 = 0;
|
||||
const subscription = try connection.queueSubscribe(
|
||||
*u32,
|
||||
*const u32,
|
||||
message_subject,
|
||||
"queuegroup",
|
||||
onMessage,
|
||||
@ -162,7 +161,7 @@ test "nats.Subscription (async)" {
|
||||
{
|
||||
var count: u32 = 0;
|
||||
const subscription = try connection.queueSubscribeTimeout(
|
||||
*u32,
|
||||
*const u32,
|
||||
message_subject,
|
||||
"queuegroup",
|
||||
1000,
|
||||
|
Loading…
x
Reference in New Issue
Block a user