Compare commits
7 Commits
c6764fcf60
...
zig-0.11.x
| Author | SHA1 | Date | |
|---|---|---|---|
|
320bef2c63
|
|||
|
28c7890f6c
|
|||
|
5939836bec
|
|||
|
4f26bf8fca
|
|||
|
c4a8ae1a38
|
|||
|
3462b3cdb6
|
|||
|
b28a91b97f
|
@@ -59,7 +59,7 @@ const examples = [_]Example{
|
|||||||
.{ .name = "pub_bytes", .file = "examples/pub_bytes.zig" },
|
.{ .name = "pub_bytes", .file = "examples/pub_bytes.zig" },
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn add_examples(b: *std.build, options: ExampleOptions) void {
|
pub fn add_examples(b: *std.Build, options: ExampleOptions) void {
|
||||||
const example_step = b.step("examples", "build examples");
|
const example_step = b.step("examples", "build examples");
|
||||||
|
|
||||||
inline for (examples) |example| {
|
inline for (examples) |example| {
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
.{
|
.{
|
||||||
.name = "nats-client",
|
.name = "nats.zig",
|
||||||
.version = "0.0.1",
|
.version = "0.0.1",
|
||||||
.dependencies = .{
|
.dependencies = .{
|
||||||
.libressl = .{
|
.libressl = .{
|
||||||
.url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/2b68369a2b883714cea05357aa378b3a3e8ef2f6.tar.gz",
|
.url = "https://github.com/epicyclic-dev/LibreSSL-portable/archive/4bbf9ad43fd5d56c8e15bc2e880aab7c4e49731b.tar.gz",
|
||||||
.hash = "12206b907fcb1dea424d122d29a0549bdc6c83648e0433973388b2efb6813b36a8fa",
|
.hash = "1220282c6f64f531b9d07d5ed1959708822f4f8dc2486a7005be391c8f5cdf2a502a",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,10 +29,10 @@ pub fn main() !void {
|
|||||||
defer connection.destroy();
|
defer connection.destroy();
|
||||||
|
|
||||||
var count: u32 = 0;
|
var count: u32 = 0;
|
||||||
const subscription = try connection.subscribe(u32, "channel", onMessage, &count);
|
const subscription = try connection.subscribe(*u32, "channel", onMessage, &count);
|
||||||
defer subscription.destroy();
|
defer subscription.destroy();
|
||||||
|
|
||||||
while (count < 10) : (nats.sleep(1000)) {
|
while (count < 10) : (nats.sleep(100)) {
|
||||||
const reply = try connection.request("channel", "greetings", 1000);
|
const reply = try connection.request("channel", "greetings", 1000);
|
||||||
defer reply.destroy();
|
defer reply.destroy();
|
||||||
|
|
||||||
|
|||||||
@@ -18,24 +18,21 @@ pub const nats_c = @cImport({
|
|||||||
@cInclude("nats/nats.h");
|
@cInclude("nats/nats.h");
|
||||||
});
|
});
|
||||||
|
|
||||||
const sub_ = @import("./subscription.zig");
|
const Subscription = @import("./subscription.zig").Subscription;
|
||||||
const Subscription = sub_.Subscription;
|
const SubscriptionCallbackSignature = @import("./subscription.zig").SubscriptionCallbackSignature;
|
||||||
const SubscriptionCallbackSignature = sub_.SubscriptionCallbackSignature;
|
const makeSubscriptionCallbackThunk = @import("./subscription.zig").makeSubscriptionCallbackThunk;
|
||||||
const makeSubscriptionCallbackThunk = sub_.makeSubscriptionCallbackThunk;
|
|
||||||
|
|
||||||
const msg_ = @import("./message.zig");
|
const Message = @import("./message.zig").Message;
|
||||||
const Message = msg_.Message;
|
|
||||||
|
|
||||||
const err_ = @import("./error.zig");
|
const Error = @import("./error.zig").Error;
|
||||||
const Error = err_.Error;
|
const Status = @import("./error.zig").Status;
|
||||||
const Status = err_.Status;
|
const ErrorInfo = @import("./error.zig").ErrorInfo;
|
||||||
const ErrorInfo = err_.ErrorInfo;
|
|
||||||
|
|
||||||
const sta_ = @import("./statistics.zig");
|
const Statistics = @import("./statistics.zig").Statistics;
|
||||||
const Statistics = sta_.Statistics;
|
const StatsCounts = @import("./statistics.zig").StatsCounts;
|
||||||
const StatsCounts = sta_.StatsCounts;
|
|
||||||
|
|
||||||
const thunk = @import("./thunk.zig");
|
const thunk = @import("./thunk.zig");
|
||||||
|
const checkUserDataType = @import("./thunk.zig").checkUserDataType;
|
||||||
|
|
||||||
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
|
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
|
||||||
|
|
||||||
@@ -343,7 +340,7 @@ pub const Connection = opaque {
|
|||||||
comptime T: type,
|
comptime T: type,
|
||||||
subject: [:0]const u8,
|
subject: [:0]const u8,
|
||||||
callback: SubscriptionCallbackSignature(T),
|
callback: SubscriptionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!*Subscription {
|
) Error!*Subscription {
|
||||||
var sub: *Subscription = undefined;
|
var sub: *Subscription = undefined;
|
||||||
const status = Status.fromInt(nats_c.natsConnection_Subscribe(
|
const status = Status.fromInt(nats_c.natsConnection_Subscribe(
|
||||||
@@ -351,7 +348,7 @@ pub const Connection = opaque {
|
|||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
subject.ptr,
|
subject.ptr,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
}
|
}
|
||||||
@@ -362,7 +359,7 @@ pub const Connection = opaque {
|
|||||||
subject: [:0]const u8,
|
subject: [:0]const u8,
|
||||||
timeout: i64,
|
timeout: i64,
|
||||||
callback: SubscriptionCallbackSignature(T),
|
callback: SubscriptionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!*Subscription {
|
) Error!*Subscription {
|
||||||
var sub: *Subscription = undefined;
|
var sub: *Subscription = undefined;
|
||||||
|
|
||||||
@@ -372,7 +369,7 @@ pub const Connection = opaque {
|
|||||||
subject.ptr,
|
subject.ptr,
|
||||||
timeout,
|
timeout,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
|
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
@@ -396,7 +393,7 @@ pub const Connection = opaque {
|
|||||||
subject: [:0]const u8,
|
subject: [:0]const u8,
|
||||||
queue_group: [:0]const u8,
|
queue_group: [:0]const u8,
|
||||||
callback: SubscriptionCallbackSignature(T),
|
callback: SubscriptionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!*Subscription {
|
) Error!*Subscription {
|
||||||
var sub: *Subscription = undefined;
|
var sub: *Subscription = undefined;
|
||||||
|
|
||||||
@@ -406,7 +403,7 @@ pub const Connection = opaque {
|
|||||||
subject.ptr,
|
subject.ptr,
|
||||||
queue_group.ptr,
|
queue_group.ptr,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
|
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
@@ -419,7 +416,7 @@ pub const Connection = opaque {
|
|||||||
queue_group: [:0]const u8,
|
queue_group: [:0]const u8,
|
||||||
timeout: i64,
|
timeout: i64,
|
||||||
callback: SubscriptionCallbackSignature(T),
|
callback: SubscriptionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!*Subscription {
|
) Error!*Subscription {
|
||||||
var sub: *Subscription = undefined;
|
var sub: *Subscription = undefined;
|
||||||
|
|
||||||
@@ -430,7 +427,7 @@ pub const Connection = opaque {
|
|||||||
queue_group.ptr,
|
queue_group.ptr,
|
||||||
timeout,
|
timeout,
|
||||||
makeSubscriptionCallbackThunk(T, callback),
|
makeSubscriptionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
));
|
));
|
||||||
|
|
||||||
return status.toError() orelse sub;
|
return status.toError() orelse sub;
|
||||||
@@ -502,12 +499,12 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const TokenCallbackSignature(T),
|
comptime callback: *const TokenCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
|
return Status.fromInt(nats_c.natsOptions_SetTokenHandler(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeTokenCallbackThunk(T, callback),
|
makeTokenCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -641,13 +638,13 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetCustomReconnectDelay(
|
nats_c.natsOptions_SetCustomReconnectDelay(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeReconnectDelayCallbackThunk(T, callback),
|
makeReconnectDelayCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
),
|
),
|
||||||
).raise();
|
).raise();
|
||||||
}
|
}
|
||||||
@@ -668,13 +665,13 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ErrorHandlerCallbackSignature(T),
|
comptime callback: *const ErrorHandlerCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetErrorHandler(
|
nats_c.natsOptions_SetErrorHandler(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeErrorHandlerCallbackThunk(T, callback),
|
makeErrorHandlerCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
),
|
),
|
||||||
).raise();
|
).raise();
|
||||||
}
|
}
|
||||||
@@ -683,12 +680,12 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -696,12 +693,12 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -709,12 +706,12 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -722,12 +719,12 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -735,12 +732,12 @@ pub const ConnectionOptions = opaque {
|
|||||||
self: *ConnectionOptions,
|
self: *ConnectionOptions,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
return Status.fromInt(nats_c.natsOptions_SetClosedCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -752,11 +749,11 @@ pub const ConnectionOptions = opaque {
|
|||||||
comptime read_callback: *const AttachEventLoopCallbackSignature(T),
|
comptime read_callback: *const AttachEventLoopCallbackSignature(T),
|
||||||
comptime write_callback: *const AttachEventLoopCallbackSignature(T),
|
comptime write_callback: *const AttachEventLoopCallbackSignature(T),
|
||||||
comptime detach_callback: *const thunk.SimpleCallbackSignature(T),
|
comptime detach_callback: *const thunk.SimpleCallbackSignature(T),
|
||||||
loop: *L,
|
loop: L,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
|
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
@ptrCast(loop),
|
@constCast(@ptrCast(loop)),
|
||||||
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
|
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
|
||||||
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
|
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
|
||||||
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
|
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
|
||||||
@@ -819,13 +816,13 @@ pub const ConnectionOptions = opaque {
|
|||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
retry: bool,
|
retry: bool,
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
|
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
retry,
|
retry,
|
||||||
makeConnectionCallbackThunk(T, callback),
|
makeConnectionCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -835,19 +832,23 @@ pub const ConnectionOptions = opaque {
|
|||||||
comptime U: type,
|
comptime U: type,
|
||||||
comptime jwt_callback: *const JwtHandlerCallbackSignature(T),
|
comptime jwt_callback: *const JwtHandlerCallbackSignature(T),
|
||||||
comptime sig_callback: *const SignatureHandlerCallbackSignature(U),
|
comptime sig_callback: *const SignatureHandlerCallbackSignature(U),
|
||||||
jwt_userdata: *T,
|
jwt_userdata: T,
|
||||||
sig_userdata: *U,
|
sig_userdata: U,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
|
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
makeJwtHandlerCallbackThunk(T, jwt_callback),
|
makeJwtHandlerCallbackThunk(T, jwt_callback),
|
||||||
jwt_userdata,
|
@constCast(@ptrCast(jwt_userdata)),
|
||||||
makeSignatureHandlerCallbackThunk(U, sig_callback),
|
makeSignatureHandlerCallbackThunk(U, sig_callback),
|
||||||
sig_userdata,
|
@constCast(@ptrCast(sig_userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
|
pub fn setUserCredentialsFromFiles(
|
||||||
|
self: *ConnectionOptions,
|
||||||
|
user_or_chained_file: [:0]const u8,
|
||||||
|
seed_file: [:0]const u8,
|
||||||
|
) Error!void {
|
||||||
return Status.fromInt(
|
return Status.fromInt(
|
||||||
nats_c.natsOptions_SetUserCredentialsFromFiles(
|
nats_c.natsOptions_SetUserCredentialsFromFiles(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
@@ -871,13 +872,13 @@ pub const ConnectionOptions = opaque {
|
|||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime sig_callback: *const SignatureHandlerCallbackSignature(T),
|
comptime sig_callback: *const SignatureHandlerCallbackSignature(T),
|
||||||
pub_key: [:0]const u8,
|
pub_key: [:0]const u8,
|
||||||
sig_userdata: *T,
|
sig_userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
|
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
pub_key.ptr,
|
pub_key.ptr,
|
||||||
makeSignatureHandlerCallbackThunk(T, sig_callback),
|
makeSignatureHandlerCallbackThunk(T, sig_callback),
|
||||||
sig_userdata,
|
@constCast(@ptrCast(sig_userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -919,16 +920,17 @@ pub const ConnectionOptions = opaque {
|
|||||||
const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8;
|
const TokenCallback = fn (?*anyopaque) callconv(.C) [*c]const u8;
|
||||||
|
|
||||||
pub fn TokenCallbackSignature(comptime T: type) type {
|
pub fn TokenCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T) [:0]const u8;
|
return fn (T) [:0]const u8;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeTokenCallbackThunk(
|
fn makeTokenCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const TokenCallbackSignature(T),
|
comptime callback: *const TokenCallbackSignature(T),
|
||||||
) *const TokenCallback {
|
) *const TokenCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 {
|
fn thunk(userdata: ?*anyopaque) callconv(.C) [*c]const u8 {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
return callback(data).ptr;
|
return callback(data).ptr;
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
@@ -937,17 +939,18 @@ fn makeTokenCallbackThunk(
|
|||||||
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
|
const ConnectionCallback = fn (?*nats_c.natsConnection, ?*anyopaque) callconv(.C) void;
|
||||||
|
|
||||||
pub fn ConnectionCallbackSignature(comptime T: type) type {
|
pub fn ConnectionCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T, *Connection) void;
|
return fn (T, *Connection) void;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeConnectionCallbackThunk(
|
fn makeConnectionCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ConnectionCallbackSignature(T),
|
comptime callback: *const ConnectionCallbackSignature(T),
|
||||||
) *const ConnectionCallback {
|
) *const ConnectionCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
|
fn thunk(conn: ?*nats_c.natsConnection, userdata: ?*anyopaque) callconv(.C) void {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
callback(data, connection);
|
callback(data, connection);
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
@@ -956,13 +959,14 @@ fn makeConnectionCallbackThunk(
|
|||||||
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64;
|
const ReconnectDelayCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) i64;
|
||||||
|
|
||||||
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
|
pub fn ReconnectDelayCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T, *Connection, c_int) i64;
|
return fn (T, *Connection, c_int) i64;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeReconnectDelayCallbackThunk(
|
fn makeReconnectDelayCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
comptime callback: *const ReconnectDelayCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const ReconnectDelayCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
conn: ?*nats_c.natsConnection,
|
conn: ?*nats_c.natsConnection,
|
||||||
@@ -970,7 +974,7 @@ fn makeReconnectDelayCallbackThunk(
|
|||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
) callconv(.C) i64 {
|
) callconv(.C) i64 {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
return callback(data, connection, attempts);
|
return callback(data, connection, attempts);
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
@@ -984,13 +988,14 @@ const ErrorHandlerCallback = fn (
|
|||||||
) callconv(.C) void;
|
) callconv(.C) void;
|
||||||
|
|
||||||
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
|
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T, *Connection, *Subscription, Status) void;
|
return fn (T, *Connection, *Subscription, Status) void;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeErrorHandlerCallbackThunk(
|
fn makeErrorHandlerCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const ErrorHandlerCallbackSignature(T),
|
comptime callback: *const ErrorHandlerCallbackSignature(T),
|
||||||
) *const ErrorHandlerCallback {
|
) *const ErrorHandlerCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
conn: ?*nats_c.natsConnection,
|
conn: ?*nats_c.natsConnection,
|
||||||
@@ -1000,7 +1005,7 @@ fn makeErrorHandlerCallbackThunk(
|
|||||||
) callconv(.C) void {
|
) callconv(.C) void {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
|
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
|
|
||||||
callback(data, connection, subscription, Status.fromInt(status));
|
callback(data, connection, subscription, Status.fromInt(status));
|
||||||
}
|
}
|
||||||
@@ -1016,7 +1021,7 @@ const AttachEventLoopCallback = fn (
|
|||||||
) callconv(.C) nats_c.natsStatus;
|
) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
|
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
|
||||||
return fn (*L, *Connection, c_int) anyerror!*T;
|
return fn (L, *Connection, c_int) anyerror!T;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeAttachEventLoopCallbackThunk(
|
fn makeAttachEventLoopCallbackThunk(
|
||||||
@@ -1024,6 +1029,8 @@ fn makeAttachEventLoopCallbackThunk(
|
|||||||
comptime L: type,
|
comptime L: type,
|
||||||
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
|
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
|
||||||
) *const ReconnectDelayCallback {
|
) *const ReconnectDelayCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
|
comptime checkUserDataType(L);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
userdata: *?*anyopaque,
|
userdata: *?*anyopaque,
|
||||||
@@ -1032,7 +1039,7 @@ fn makeAttachEventLoopCallbackThunk(
|
|||||||
sock: ?*nats_c.natsSock,
|
sock: ?*nats_c.natsSock,
|
||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const ev_loop: *L = if (loop) |l| @ptrCast(l) else unreachable;
|
const ev_loop: L = if (loop) |l| @alignCast(@ptrCast(l)) else unreachable;
|
||||||
|
|
||||||
userdata.* = callback(ev_loop, connection, sock) catch |err|
|
userdata.* = callback(ev_loop, connection, sock) catch |err|
|
||||||
return Status.fromError(err).toInt();
|
return Status.fromError(err).toInt();
|
||||||
@@ -1045,13 +1052,14 @@ fn makeAttachEventLoopCallbackThunk(
|
|||||||
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus;
|
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
|
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T, *Connection, c_int) anyerror!void;
|
return fn (T, *Connection, c_int) anyerror!void;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeEventLoopAddRemoveCallbackThunk(
|
fn makeEventLoopAddRemoveCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
|
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const ReconnectDelayCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
conn: ?*nats_c.natsConnection,
|
conn: ?*nats_c.natsConnection,
|
||||||
@@ -1059,7 +1067,7 @@ fn makeEventLoopAddRemoveCallbackThunk(
|
|||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
callback(data, connection, attempts) catch |err|
|
callback(data, connection, attempts) catch |err|
|
||||||
return Status.fromError(err).toInt();
|
return Status.fromError(err).toInt();
|
||||||
|
|
||||||
@@ -1071,18 +1079,19 @@ fn makeEventLoopAddRemoveCallbackThunk(
|
|||||||
const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus;
|
const EventLoopDetachCallback = fn (?*anyopaque) callconv(.C) nats_c.natsStatus;
|
||||||
|
|
||||||
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
|
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T) anyerror!void;
|
return fn (T) anyerror!void;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeEventLoopDetachCallbackThunk(
|
fn makeEventLoopDetachCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const EventLoopDetachCallbackSignature(T),
|
comptime callback: *const EventLoopDetachCallbackSignature(T),
|
||||||
) *const ReconnectDelayCallback {
|
) *const ReconnectDelayCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
callback(data) catch |err| return Status.fromError(err).toInt();
|
callback(data) catch |err| return Status.fromError(err).toInt();
|
||||||
return nats_c.NATS_OK;
|
return nats_c.NATS_OK;
|
||||||
}
|
}
|
||||||
@@ -1099,20 +1108,21 @@ pub const JwtResponseOrError = union(enum) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
pub fn JwtHandlerCallbackSignature(comptime T: type) type {
|
pub fn JwtHandlerCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T) JwtResponseOrError;
|
return fn (T) JwtResponseOrError;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeJwtHandlerCallbackThunk(
|
fn makeJwtHandlerCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const JwtHandlerCallbackSignature(T),
|
comptime callback: *const JwtHandlerCallbackSignature(T),
|
||||||
) *const JwtHandlerCallback {
|
) *const JwtHandlerCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
jwt_out_raw: ?*?[*:0]u8,
|
jwt_out_raw: ?*?[*:0]u8,
|
||||||
err_out_raw: ?*?[*:0]u8,
|
err_out_raw: ?*?[*:0]u8,
|
||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
const err_out = err_out_raw orelse unreachable;
|
const err_out = err_out_raw orelse unreachable;
|
||||||
const jwt_out = jwt_out_raw orelse unreachable;
|
const jwt_out = jwt_out_raw orelse unreachable;
|
||||||
|
|
||||||
@@ -1140,13 +1150,14 @@ pub const SignatureResponseOrError = union(enum) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
|
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T, [:0]const u8) SignatureResponseOrError;
|
return fn (T, [:0]const u8) SignatureResponseOrError;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn makeSignatureHandlerCallbackThunk(
|
fn makeSignatureHandlerCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const SignatureHandlerCallbackSignature(T),
|
comptime callback: *const SignatureHandlerCallbackSignature(T),
|
||||||
) *const SignatureHandlerCallback {
|
) *const SignatureHandlerCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
err_out_raw: ?*?[*:0]u8,
|
err_out_raw: ?*?[*:0]u8,
|
||||||
@@ -1155,7 +1166,7 @@ fn makeSignatureHandlerCallbackThunk(
|
|||||||
nonsense: ?[*:0]const u8,
|
nonsense: ?[*:0]const u8,
|
||||||
userdata: ?*anyopaque,
|
userdata: ?*anyopaque,
|
||||||
) callconv(.C) nats_c.natsStatus {
|
) callconv(.C) nats_c.natsStatus {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
const nonce = nonsense orelse unreachable;
|
const nonce = nonsense orelse unreachable;
|
||||||
const err_out = err_out_raw orelse unreachable;
|
const err_out = err_out_raw orelse unreachable;
|
||||||
const sig_out = sig_out_raw orelse unreachable;
|
const sig_out = sig_out_raw orelse unreachable;
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ const Error = err_.Error;
|
|||||||
const Status = err_.Status;
|
const Status = err_.Status;
|
||||||
|
|
||||||
const thunk = @import("./thunk.zig");
|
const thunk = @import("./thunk.zig");
|
||||||
|
const checkUserDataType = @import("./thunk.zig").checkUserDataType;
|
||||||
|
|
||||||
pub const Subscription = opaque {
|
pub const Subscription = opaque {
|
||||||
pub const MessageCount = struct {
|
pub const MessageCount = struct {
|
||||||
@@ -173,12 +174,12 @@ pub const Subscription = opaque {
|
|||||||
self: *Subscription,
|
self: *Subscription,
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
|
comptime callback: *const thunk.SimpleCallbackThunkSignature(T),
|
||||||
userdata: *T,
|
userdata: T,
|
||||||
) Error!void {
|
) Error!void {
|
||||||
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
|
return Status.fromInt(nats_c.natsSubscription_SetOnCompleteCB(
|
||||||
@ptrCast(self),
|
@ptrCast(self),
|
||||||
thunk.makeSimpleCallbackThunk(callback),
|
thunk.makeSimpleCallbackThunk(T, callback),
|
||||||
userdata,
|
@constCast(@ptrCast(userdata)),
|
||||||
)).raise();
|
)).raise();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -191,13 +192,14 @@ const SubscriptionCallback = fn (
|
|||||||
) callconv(.C) void;
|
) callconv(.C) void;
|
||||||
|
|
||||||
pub fn SubscriptionCallbackSignature(comptime T: type) type {
|
pub fn SubscriptionCallbackSignature(comptime T: type) type {
|
||||||
return fn (*T, *Connection, *Subscription, *Message) void;
|
return fn (T, *Connection, *Subscription, *Message) void;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeSubscriptionCallbackThunk(
|
pub fn makeSubscriptionCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const SubscriptionCallbackSignature(T),
|
comptime callback: *const SubscriptionCallbackSignature(T),
|
||||||
) *const SubscriptionCallback {
|
) *const SubscriptionCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(
|
fn thunk(
|
||||||
conn: ?*nats_c.natsConnection,
|
conn: ?*nats_c.natsConnection,
|
||||||
@@ -211,7 +213,7 @@ pub fn makeSubscriptionCallbackThunk(
|
|||||||
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
|
||||||
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
|
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
|
||||||
|
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
|
|
||||||
callback(data, connection, subscription, message);
|
callback(data, connection, subscription, message);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,19 +18,46 @@ pub const nats_c = @cImport({
|
|||||||
@cInclude("nats/nats.h");
|
@cInclude("nats/nats.h");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pub fn checkUserDataType(comptime T: type) void {
|
||||||
|
switch (@typeInfo(T)) {
|
||||||
|
.Optional => |info| switch (@typeInfo(info.child)) {
|
||||||
|
.Optional => @compileError(
|
||||||
|
"nats callbacks can only accept an (optional) single, many," ++
|
||||||
|
" or c pointer as userdata. \"" ++
|
||||||
|
@typeName(T) ++ "\" has more than one optional specifier.",
|
||||||
|
),
|
||||||
|
else => checkUserDataType(info.child),
|
||||||
|
},
|
||||||
|
.Pointer => |info| switch (info.size) {
|
||||||
|
.Slice => @compileError(
|
||||||
|
"nats callbacks can only accept an (optional) single, many," ++
|
||||||
|
" or c pointer as userdata, not slices. \"" ++
|
||||||
|
@typeName(T) ++ "\" appears to be a slice.",
|
||||||
|
),
|
||||||
|
else => {},
|
||||||
|
},
|
||||||
|
else => @compileError(
|
||||||
|
"nats callbacks can only accept an (optional) single, many," ++
|
||||||
|
" or c pointer as userdata. \"" ++
|
||||||
|
@typeName(T) ++ "\" is not a pointer type.",
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const SimpleCallback = fn (?*anyopaque) callconv(.C) void;
|
const SimpleCallback = fn (?*anyopaque) callconv(.C) void;
|
||||||
|
|
||||||
pub fn SimpleCallbackThunkSignature(comptime T: type) type {
|
pub fn SimpleCallbackThunkSignature(comptime T: type) type {
|
||||||
return fn (*T) void;
|
return fn (T) void;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn makeSimpleCallbackThunk(
|
pub fn makeSimpleCallbackThunk(
|
||||||
comptime T: type,
|
comptime T: type,
|
||||||
comptime callback: *const SimpleCallbackThunkSignature(T),
|
comptime callback: *const SimpleCallbackThunkSignature(T),
|
||||||
) *const SimpleCallback {
|
) *const SimpleCallback {
|
||||||
|
comptime checkUserDataType(T);
|
||||||
return struct {
|
return struct {
|
||||||
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
|
fn thunk(userdata: ?*anyopaque) callconv(.C) void {
|
||||||
const data: *T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
const data: T = if (userdata) |u| @alignCast(@ptrCast(u)) else unreachable;
|
||||||
callback(data);
|
callback(data);
|
||||||
}
|
}
|
||||||
}.thunk;
|
}.thunk;
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ test "nats.Connection" {
|
|||||||
connection.drainTimeout(1000) catch {};
|
connection.drainTimeout(1000) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts: c_int) i64 {
|
fn reconnectDelayHandler(userdata: *const u32, connection: *nats.Connection, attempts: c_int) i64 {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
_ = connection;
|
_ = connection;
|
||||||
_ = attempts;
|
_ = attempts;
|
||||||
@@ -108,7 +108,7 @@ fn reconnectDelayHandler(userdata: *u32, connection: *nats.Connection, attempts:
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn errorHandler(
|
fn errorHandler(
|
||||||
userdata: *u32,
|
userdata: *const u32,
|
||||||
connection: *nats.Connection,
|
connection: *nats.Connection,
|
||||||
subscription: *nats.Subscription,
|
subscription: *nats.Subscription,
|
||||||
status: nats.Status,
|
status: nats.Status,
|
||||||
@@ -119,18 +119,18 @@ fn errorHandler(
|
|||||||
_ = status;
|
_ = status;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn connectionHandler(userdata: *u32, connection: *nats.Connection) void {
|
fn connectionHandler(userdata: *const u32, connection: *nats.Connection) void {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
_ = connection;
|
_ = connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn jwtHandler(userdata: *u32) nats.JwtResponseOrError {
|
fn jwtHandler(userdata: *const u32) nats.JwtResponseOrError {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
|
// return .{ .jwt = std.heap.raw_c_allocator.dupeZ(u8, "abcdef") catch @panic("no!") };
|
||||||
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
|
return .{ .error_message = std.heap.raw_c_allocator.dupeZ(u8, "dang") catch @panic("no!") };
|
||||||
}
|
}
|
||||||
|
|
||||||
fn signatureHandler(userdata: *u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
|
fn signatureHandler(userdata: *const u32, nonce: [:0]const u8) nats.SignatureResponseOrError {
|
||||||
_ = userdata;
|
_ = userdata;
|
||||||
_ = nonce;
|
_ = nonce;
|
||||||
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
|
// return .{ .signature = std.heap.raw_c_allocator.dupe(u8, "01230123") catch @panic("no!") };
|
||||||
@@ -144,7 +144,7 @@ test "nats.ConnectionOptions" {
|
|||||||
const options = try nats.ConnectionOptions.create();
|
const options = try nats.ConnectionOptions.create();
|
||||||
defer options.destroy();
|
defer options.destroy();
|
||||||
|
|
||||||
var userdata: u32 = 0;
|
const userdata: u32 = 0;
|
||||||
|
|
||||||
try options.setUrl(nats.default_server_url);
|
try options.setUrl(nats.default_server_url);
|
||||||
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
|
const servers = [_][*:0]const u8{ "nats://127.0.0.1:4442", "nats://127.0.0.1:4443" };
|
||||||
@@ -164,14 +164,14 @@ test "nats.ConnectionOptions" {
|
|||||||
try options.setMaxReconnect(10);
|
try options.setMaxReconnect(10);
|
||||||
try options.setReconnectWait(500);
|
try options.setReconnectWait(500);
|
||||||
try options.setReconnectJitter(100, 200);
|
try options.setReconnectJitter(100, 200);
|
||||||
try options.setCustomReconnectDelay(u32, reconnectDelayHandler, &userdata);
|
try options.setCustomReconnectDelay(*const u32, reconnectDelayHandler, &userdata);
|
||||||
try options.setReconnectBufSize(1024);
|
try options.setReconnectBufSize(1024);
|
||||||
try options.setMaxPendingMessages(50);
|
try options.setMaxPendingMessages(50);
|
||||||
try options.setErrorHandler(u32, errorHandler, &userdata);
|
try options.setErrorHandler(*const u32, errorHandler, &userdata);
|
||||||
try options.setClosedCallback(u32, connectionHandler, &userdata);
|
try options.setClosedCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.setDisconnectedCallback(u32, connectionHandler, &userdata);
|
try options.setDisconnectedCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.setDiscoveredServersCallback(u32, connectionHandler, &userdata);
|
try options.setDiscoveredServersCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.setLameDuckModeCallback(u32, connectionHandler, &userdata);
|
try options.setLameDuckModeCallback(*const u32, connectionHandler, &userdata);
|
||||||
try options.ignoreDiscoveredServers(true);
|
try options.ignoreDiscoveredServers(true);
|
||||||
try options.useGlobalMessageDelivery(false);
|
try options.useGlobalMessageDelivery(false);
|
||||||
try options.ipResolutionOrder(.ipv4_first);
|
try options.ipResolutionOrder(.ipv4_first);
|
||||||
@@ -179,8 +179,8 @@ test "nats.ConnectionOptions" {
|
|||||||
try options.useOldRequestStyle(false);
|
try options.useOldRequestStyle(false);
|
||||||
try options.setFailRequestsOnDisconnect(true);
|
try options.setFailRequestsOnDisconnect(true);
|
||||||
try options.setNoEcho(true);
|
try options.setNoEcho(true);
|
||||||
try options.setRetryOnFailedConnect(u32, connectionHandler, true, &userdata);
|
try options.setRetryOnFailedConnect(*const u32, connectionHandler, true, &userdata);
|
||||||
try options.setUserCredentialsCallbacks(u32, u32, jwtHandler, signatureHandler, &userdata, &userdata);
|
try options.setUserCredentialsCallbacks(*const u32, *const u32, jwtHandler, signatureHandler, &userdata, &userdata);
|
||||||
try options.setWriteDeadline(5);
|
try options.setWriteDeadline(5);
|
||||||
try options.disableNoResponders(true);
|
try options.disableNoResponders(true);
|
||||||
try options.setCustomInboxPrefix("_FOOBOX");
|
try options.setCustomInboxPrefix("_FOOBOX");
|
||||||
@@ -200,7 +200,7 @@ test "nats.ConnectionOptions (crypto edition)" {
|
|||||||
defer options.destroy();
|
defer options.destroy();
|
||||||
var userdata: u32 = 0;
|
var userdata: u32 = 0;
|
||||||
|
|
||||||
try options.setTokenHandler(u32, tokenHandler, &userdata);
|
try options.setTokenHandler(*u32, tokenHandler, &userdata);
|
||||||
try options.setSecure(false);
|
try options.setSecure(false);
|
||||||
try options.setCertificatesChain(rsa_cert, rsa_key);
|
try options.setCertificatesChain(rsa_cert, rsa_key);
|
||||||
try options.setCiphers("-ALL:HIGH");
|
try options.setCiphers("-ALL:HIGH");
|
||||||
|
|||||||
4
tests/data/genkeys.fish
Normal file
4
tests/data/genkeys.fish
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
openssl req -x509 -newkey rsa:4096 -sha256 -days 36500 -nodes -keyout client-rsa.key -out client-rsa.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"
|
||||||
|
openssl req -x509 -newkey rsa:4096 -sha256 -days 36500 -nodes -keyout server-rsa.key -out server-rsa.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"
|
||||||
|
openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:secp384r1 -days 36500 -nodes -keyout client-ecc.key -out client-ecc.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"
|
||||||
|
openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:secp384r1 -days 36500 -nodes -keyout server-ecc.key -out server-ecc.cert -subj "/C=XX/ST=StateName/L=CityName/O=CompanyName/OU=CompanySectionName/CN=tests.nats.zig"
|
||||||
@@ -80,18 +80,21 @@ test "nats.Subscription" {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn onMessage(
|
fn onMessage(
|
||||||
userdata: *u32,
|
userdata: *const u32,
|
||||||
connection: *nats.Connection,
|
connection: *nats.Connection,
|
||||||
subscription: *nats.Subscription,
|
subscription: *nats.Subscription,
|
||||||
message: *nats.Message,
|
message: *nats.Message,
|
||||||
) void {
|
) void {
|
||||||
_ = subscription;
|
_ = subscription;
|
||||||
|
_ = userdata;
|
||||||
|
|
||||||
if (message.getReply()) |reply| {
|
if (message.getReply()) |reply| {
|
||||||
connection.publish(reply, "greetings") catch @panic("OH NO");
|
connection.publish(reply, "greetings") catch @panic("OH NO");
|
||||||
} else @panic("HOW");
|
} else @panic("HOW");
|
||||||
|
}
|
||||||
|
|
||||||
userdata.* += 1;
|
fn onClose(userdata: *[]const u8) void {
|
||||||
|
userdata.* = "closed";
|
||||||
}
|
}
|
||||||
|
|
||||||
test "nats.Subscription (async)" {
|
test "nats.Subscription (async)" {
|
||||||
@@ -112,21 +115,30 @@ test "nats.Subscription (async)" {
|
|||||||
defer message.destroy();
|
defer message.destroy();
|
||||||
|
|
||||||
{
|
{
|
||||||
var count: u32 = 0;
|
var closed: []const u8 = "test";
|
||||||
const subscription = try connection.subscribe(u32, message_subject, onMessage, &count);
|
{
|
||||||
defer subscription.destroy();
|
const count: u32 = 0;
|
||||||
|
const subscription = try connection.subscribe(*const u32, message_subject, onMessage, &count);
|
||||||
|
defer subscription.destroy();
|
||||||
|
|
||||||
const response = try connection.requestMessage(message, 1000);
|
try subscription.setCompletionCallback(*[]const u8, onClose, &closed);
|
||||||
try std.testing.expectEqualStrings(
|
|
||||||
"greetings",
|
const response = try connection.requestMessage(message, 1000);
|
||||||
response.getData() orelse return error.TestUnexpectedResult,
|
try std.testing.expectEqualStrings(
|
||||||
);
|
"greetings",
|
||||||
|
response.getData() orelse return error.TestUnexpectedResult,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// we have to sleep to allow the close callback to run. I am worried this may
|
||||||
|
// still end up being flaky, however.
|
||||||
|
nats.sleep(1);
|
||||||
|
try std.testing.expectEqualStrings("closed", closed);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
var count: u32 = 0;
|
const count: u32 = 0;
|
||||||
const subscription = try connection.subscribeTimeout(
|
const subscription = try connection.subscribeTimeout(
|
||||||
u32,
|
*const u32,
|
||||||
message_subject,
|
message_subject,
|
||||||
1000,
|
1000,
|
||||||
onMessage,
|
onMessage,
|
||||||
@@ -142,9 +154,9 @@ test "nats.Subscription (async)" {
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
var count: u32 = 0;
|
const count: u32 = 0;
|
||||||
const subscription = try connection.queueSubscribe(
|
const subscription = try connection.queueSubscribe(
|
||||||
u32,
|
*const u32,
|
||||||
message_subject,
|
message_subject,
|
||||||
"queuegroup",
|
"queuegroup",
|
||||||
onMessage,
|
onMessage,
|
||||||
@@ -162,7 +174,7 @@ test "nats.Subscription (async)" {
|
|||||||
{
|
{
|
||||||
var count: u32 = 0;
|
var count: u32 = 0;
|
||||||
const subscription = try connection.queueSubscribeTimeout(
|
const subscription = try connection.queueSubscribeTimeout(
|
||||||
u32,
|
*const u32,
|
||||||
message_subject,
|
message_subject,
|
||||||
"queuegroup",
|
"queuegroup",
|
||||||
1000,
|
1000,
|
||||||
|
|||||||
Reference in New Issue
Block a user