Compare commits

..

No commits in common. "f742ee1a85efdccdf451e6b9b2ee7f0cc43e06c5" and "b17a3fba6c2adaebda3f4e2862ef5f61be4740aa" have entirely different histories.

6 changed files with 54 additions and 70 deletions

View File

@ -1,5 +1,5 @@
.{
.name = "nats.zig",
.name = "nats-client",
.version = "0.0.1",
.paths = .{
"src",

View File

@ -348,7 +348,7 @@ pub const Connection = opaque {
@ptrCast(self),
subject.ptr,
makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
));
return status.toError() orelse sub;
}
@ -369,7 +369,7 @@ pub const Connection = opaque {
subject.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
));
return status.toError() orelse sub;
@ -403,7 +403,7 @@ pub const Connection = opaque {
subject.ptr,
queue_group.ptr,
makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
));
return status.toError() orelse sub;
@ -427,7 +427,7 @@ pub const Connection = opaque {
queue_group.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
));
return status.toError() orelse sub;
@ -504,7 +504,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
@ptrCast(self),
makeTokenCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
)).raise();
}
@ -644,7 +644,7 @@ pub const ConnectionOptions = opaque {
nats_c.natsOptions_SetCustomReconnectDelay(
@ptrCast(self),
makeReconnectDelayCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
),
).raise();
}
@ -671,7 +671,7 @@ pub const ConnectionOptions = opaque {
nats_c.natsOptions_SetErrorHandler(
@ptrCast(self),
makeErrorHandlerCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
),
).raise();
}
@ -685,7 +685,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
)).raise();
}
@ -698,7 +698,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
)).raise();
}
@ -711,7 +711,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
)).raise();
}
@ -724,7 +724,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
)).raise();
}
@ -737,7 +737,7 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
)).raise();
}
@ -753,7 +753,7 @@ pub const ConnectionOptions = opaque {
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
@ptrCast(self),
@constCast(@ptrCast(loop)),
@ptrCast(loop),
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
@ -822,7 +822,7 @@ pub const ConnectionOptions = opaque {
@ptrCast(self),
retry,
makeConnectionCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
userdata,
)).raise();
}
@ -838,17 +838,13 @@ pub const ConnectionOptions = opaque {
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self),
makeJwtHandlerCallbackThunk(T, jwt_callback),
@constCast(@ptrCast(jwt_userdata)),
jwt_userdata,
makeSignatureHandlerCallbackThunk(U, sig_callback),
@constCast(@ptrCast(sig_userdata)),
sig_userdata,
)).raise();
}
pub fn setUserCredentialsFromFiles(
self: *ConnectionOptions,
user_or_chained_file: [:0]const u8,
seed_file: [:0]const u8,
) Error!void {
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetUserCredentialsFromFiles(
@ptrCast(self),
@ -878,7 +874,7 @@ pub const ConnectionOptions = opaque {
@ptrCast(self),
pub_key.ptr,
makeSignatureHandlerCallbackThunk(T, sig_callback),
@constCast(@ptrCast(sig_userdata)),
sig_userdata,
)).raise();
}

View File

@ -174,12 +174,12 @@ pub const Subscription = opaque {
self: *Subscription,
comptime T: type,
comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
userdata: T,
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
@ptrCast(self),
thunk.makeSimpleCallbackThunk(T, callback),
@constCast(@ptrCast(userdata)),
thunk.makeSimpleCallbackThunk(callback),
userdata,
)).raise();
}
};

View File

@ -23,7 +23,7 @@ pub fn checkUserDataType(comptime T: type) void {
.Optional => |info| switch (@typeInfo(info.child)) {
.Optional => @compileError(
"nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata. \"" ++
" or c pointer as userdata, not slices. \"" ++
@typeName(T) ++ "\" has more than one optional specifier.",
),
else => checkUserDataType(info.child),
@ -38,7 +38,7 @@ pub fn checkUserDataType(comptime T: type) void {
},
else => @compileError(
"nats callbacks can only accept an (optional) single, many," ++
" or c pointer as userdata. \"" ++
" or c pointer as userdata, not slices. \"" ++
@typeName(T) ++ "\" is not a pointer type.",
),
}
@ -57,7 +57,7 @@ pub fn makeSimpleCallbackThunk(
comptime checkUserDataType(T);
return struct {
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
callback(data);
}
}.thunk;

View File

@ -99,7 +99,7 @@ test "nats.Connection" {
connection.drainTimeout(1000) catch {};
}
fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, attempts: c_int) i64 {
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
_ = userdata;
_ = connection;
_ = attempts;
@ -108,7 +108,7 @@ fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, att
}
fn errorHandler(
userdata: *const u32,
userdata: *u32,
connection: *nats.Connection,
subscription: *nats.Subscription,
status: nats.Status,
@ -119,18 +119,18 @@ fn errorHandler(
_ = status;
}
fn connectionHandler(userdata: *const u32, connection: *nats.Connection) void {
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
_ = userdata;
_ = connection;
}
fn jwtHandler(userdata: *const u32) nats.JwtResponseOrError {
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
_ = userdata;
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
}
fn signatureHandler(userdata: *const u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
_ = userdata;
_ = nonce;
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
@ -144,7 +144,7 @@ test "nats.ConnectionOptions" {
const options = try nats.ConnectionOptions.create();
defer options.destroy();
const userdata: u32 = 0;
var userdata: u32 = 0;
try options.setUrl(nats.default_server_url);
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
@ -164,14 +164,14 @@ test "nats.ConnectionOptions" {
try options.setMaxReconnect(10);
try options.setReconnectWait(500);
try options.setReconnectJitter(100, 200);
try options.setCustomReconnectDelay(*const u32, reconnectDelayHandler, &userdata);
try options.setCustomReconnectDelay(*u32, reconnectDelayHandler, &userdata);
try options.setReconnectBufSize(1024);
try options.setMaxPendingMessages(50);
try options.setErrorHandler(*const u32, errorHandler, &userdata);
try options.setClosedCallback(*const u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(*const u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(*const u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(*const u32, connectionHandler, &userdata);
try options.setErrorHandler(*u32, errorHandler, &userdata);
try options.setClosedCallback(*u32, connectionHandler, &userdata);
try options.setDisconnectedCallback(*u32, connectionHandler, &userdata);
try options.setDiscoveredServersCallback(*u32, connectionHandler, &userdata);
try options.setLameDuckModeCallback(*u32, connectionHandler, &userdata);
try options.ignoreDiscoveredServers(true);
try options.useGlobalMessageDelivery(false);
try options.ipResolutionOrder(.ipv4_first);
@ -179,8 +179,8 @@ test "nats.ConnectionOptions" {
try options.useOldRequestStyle(false);
try options.setFailRequestsOnDisconnect(true);
try options.setNoEcho(true);
try options.setRetryOnFailedConnect(*const u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(*const u32, *const u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setRetryOnFailedConnect(*u32, connectionHandler, true, &userdata);
try options.setUserCredentialsCallbacks(*u32, *u32, jwtHandler, signatureHandler, &userdata, &userdata);
try options.setWriteDeadline(5);
try options.disableNoResponders(true);
try options.setCustomInboxPrefix("_FOOBOX");

View File

@ -80,21 +80,18 @@ test "nats.Subscription" {
}
fn onMessage(
userdata: *const u32,
userdata: *u32,
connection: *nats.Connection,
subscription: *nats.Subscription,
message: *nats.Message,
) void {
_ = subscription;
_ = userdata;
if (message.getReply()) |reply| {
connection.publish(reply, "greetings") catch @panic("OH NO");
} else @panic("HOW");
}
fn onClose(userdata: *[]const u8) void {
userdata.* = "closed";
userdata.* += 1;
}
test "nats.Subscription (async)" {
@ -115,30 +112,21 @@ test "nats.Subscription (async)" {
defer message.destroy();
{
var closed: []const u8 = "test";
{
const count: u32 = 0;
const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count);
defer subscription.destroy();
var count: u32 = 0;
const subscription = try connection.subscribe(*u32, message_subject, onMessage, &count);
defer subscription.destroy();
try subscription.setCompletionCallback(*[]const u8, onClose, &closed);
const response = try connection.requestMessage(message, 1000);
try std.testing.expectEqualStrings(
"greetings",
response.getData() orelse return error.TestUnexpectedResult,
);
}
// we have to sleep to allow the close callback to run. I am worried this may
// still end up being flaky, however.
nats.sleep(1);
try std.testing.expectEqualStrings("closed", closed);
const response = try connection.requestMessage(message, 1000);
try std.testing.expectEqualStrings(
"greetings",
response.getData() orelse return error.TestUnexpectedResult,
);
}
{
const count: u32 = 0;
var count: u32 = 0;
const subscription = try connection.subscribeTimeout(
*const u32,
*u32,
message_subject,
1000,
onMessage,
@ -154,9 +142,9 @@ test "nats.Subscription (async)" {
}
{
const count: u32 = 0;
var count: u32 = 0;
const subscription = try connection.queueSubscribe(
*const u32,
*u32,
message_subject,
"queuegroup",
onMessage,
@ -174,7 +162,7 @@ test "nats.Subscription (async)" {
{
var count: u32 = 0;
const subscription = try connection.queueSubscribeTimeout(
*const u32,
*u32,
message_subject,
"queuegroup",
1000,