Compare commits

...

2 Commits

Author SHA1 Message Date
9a4c80861c
connection: finish wrapping connection methods
I believe the standard API surface is now completely wrapped, ignoring
whatever cool copy-paste errors or other mistakes I introduced which
are currently undetectable due to lazy analysis and lack of test
coverage. Cool.
2023-08-19 19:00:15 -07:00
39edf12d34
connection.ConnectionOptions: finish hooking up callbacks
This was kind of tedious, but there were some interesting choices to be
made with regards to how the callback signatures should be adapted,
especially in the few cases that heavily use out-params in C. Ignoring
the lack of test coverage, this puts us pretty close to having the
entire basic API wrapped.
2023-08-19 17:15:29 -07:00
5 changed files with 762 additions and 168 deletions

View File

@ -6,8 +6,8 @@ pub const nats_c = @cImport({
const sub_ = @import("./subscription.zig");
const Subscription = sub_.Subscription;
const SubscriptionThunkCallback = sub_.SubscriptionThunkCallback;
const subscriptionMessageThunk = sub_.subscriptionMessageThunk;
const SubscriptionCallbackSignature = sub_.SubscriptionCallbackSignature;
const makeSubscriptionCallbackThunk = sub_.makeSubscriptionCallbackThunk;
const msg_ = @import("./message.zig");
const Message = msg_.Message;
@ -15,13 +15,38 @@ const Message = msg_.Message;
const err_ = @import("./error.zig");
const Error = err_.Error;
const Status = err_.Status;
const ErrorInfo = err_.ErrorInfo;
const sta_ = @import("./statistics.zig");
const Statistics = sta_.Statistics;
const StatsCounts = sta_.StatsCounts;
const thunk = @import("./thunk.zig");
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
pub const ConnectionStatus = enum(c_int) {
disconnected = nats_c.DISCONNECTED,
connecting = nats_c.CONNECTING,
connected = nats_c.CONNECTED,
closed = nats_c.CLOSED,
reconnecting = nats_c.RECONNECTING,
draining_subs = nats_c.DRAINING_SUBS,
draining_pubs = nats_c.DRAINING_PUBS,
_,
pub fn fromInt(int: c_uint) ConnectionStatus {
return @enumFromInt(int);
}
};
pub const AddressPort = struct {
address: [:0]u8,
port: u16,
};
pub const Connection = opaque {
pub fn connect(options: *ConnectionOptions) Error!*Connection {
pub fn create(options: *ConnectionOptions) Error!*Connection {
var self: *Connection = undefined;
const status = Status.fromInt(nats_c.natsConnection_Connect(@ptrCast(&self), @ptrCast(options)));
return status.toError() orelse self;
@ -44,98 +69,402 @@ pub const Connection = opaque {
return nats_c.natsConnection_Destroy(@ptrCast(self));
}
pub fn processReadEvent(self: *Connection) void {
nats_c.natsConnection_ProcessReadEvent(@ptrCast(self));
}
pub fn processWriteEvent(self: *Connection) void {
nats_c.natsConnection_ProcessWriteEvent(@ptrCast(self));
}
pub fn isClosed(self: *Connection) bool {
return nats_c.natsConnection_IsClosed(@ptrCast(self));
}
pub fn isReconnecting(self: *Connection) bool {
return nats_c.natsConnection_IsReconnecting(@ptrCast(self));
}
pub fn getStatus(self: *Connection) ConnectionStatus {
return ConnectionStatus.fromInt(nats_c.natsConnection_Status(@ptrCast(self)));
}
pub fn bytesBuffered(self: *Connection) c_int {
return nats_c.natsConnection_Buffered(@ptrCast(self));
}
pub fn flush(self: *Connection) Error!void {
return Status.fromInt(nats_c.natsConnection_Flush(@ptrCast(self))).raise();
}
pub fn flushTimeout(self: *Connection, timeout: i64) Error!void {
return Status.fromInt(nats_c.natsConnection_FlushTimeout(@ptrCast(self), timeout)).raise();
}
pub fn getMaxPayload(self: *Connection) c_int {
return nats_c.natsConnection_GetMaxPayload(@ptrCast(self));
}
pub fn getStats(self: *Connection) Error!StatsCounts {
var stats = try Statistics.create();
defer stats.destroy();
const status = Status.fromInt(
nats_c.natsConnection_GetStats(@ptrCast(self), @ptrCast(stats)),
);
return status.toError() orelse stats.getCounts();
}
pub fn getConnectedUrl(self: *Connection, buffer: []u8) Error![:0]u8 {
const status = Status.fromInt(
nats_c.natsConnection_GetConnectedUrl(@ptrCast(self), buffer.ptr, buffer.len),
);
// cast this to a c pointer so that sliceTo properly returns the sentinel
// terminated type, which is guaranteed by the backing library.
return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0);
}
pub fn getConnectedServerId(self: *Connection, buffer: []u8) Error![:0]u8 {
const status = Status.fromInt(
nats_c.natsConnection_GetConnectedServerId(@ptrCast(self), buffer.ptr, buffer.len),
);
// cast this to a c pointer so that sliceTo properly returns the sentinel
// terminated type, which is guaranteed by the backing library.
return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0);
}
pub fn getServers(self: *Connection) Error![][*:0]u8 {
var servers: [*][*:0]u8 = undefined;
var count: c_int = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetServers(@ptrCast(self), &servers, &count),
);
return status.toError() orelse servers[0..@intCast(count)];
}
pub fn getDiscoveredServers(self: *Connection) Error![][*:0]u8 {
var servers: [*][*:0]u8 = undefined;
var count: c_int = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetDiscoveredServers(@ptrCast(self), &servers, &count),
);
return status.toError() orelse servers[0..@intCast(count)];
}
pub fn getLastError(self: *Connection) ErrorInfo {
var desc: [*:0]const u8 = 0;
const status = nats_c.natsConnection_GetLastError(@ptrCast(self), &desc);
return .{
.code = Status.fromInt(status).toError(),
.desc = std.mem.sliceTo(desc, 0),
};
}
pub fn getClientId(self: *Connection) Error!u64 {
var id: u64 = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetClientID(@ptrCast(self), &id),
);
return status.toError() orelse id;
}
pub fn drain(self: *Connection) Error!void {
return Status.fromInt(nats_c.natsConnection_Drain(@ptrCast(self))).raise();
}
pub fn drainTimeout(self: *Connection, timeout: i64) Error!void {
return Status.fromInt(nats_c.natsConnection_DrainTimeout(@ptrCast(self), timeout)).raise();
}
pub fn sign(self: *Connection, message: []const u8) Error![64]u8 {
var sig = [_]u8{0} ** 64;
const status = Status.fromInt(
nats_c.natsConnection_Sign(@ptrCast(self), message.ptr, message.len, &sig),
);
return status.toError() orelse sig;
}
pub fn getClientIp(self: *Connection) Error![:0]u8 {
var ip: [*c]u8 = null;
const status = Status.fromInt(
nats_c.natsConnection_GetClientIP(@ptrCast(self), &ip),
);
return status.toError() orelse std.mem.sliceTo(ip, 0);
}
pub fn getLocalIpAndPort(self: *Connection) Error!AddressPort {
var address: [*:0]u8 = undefined;
var port: c_int = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetLocalIPAndPort(@ptrCast(self), &address, &port),
);
return status.toError() orelse .{
.address = std.mem.sliceTo(address, 0),
.port = @intCast(port),
};
}
pub fn getRtt(self: *Connection) Error!i64 {
var rtt: i64 = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetRTT(@ptrCast(self), &rtt),
);
return status.toError() orelse rtt;
}
pub fn hasHeaderSupport(self: *Connection) bool {
const status = Status.fromInt(
nats_c.natsConnection_HasHeaderSupport(@ptrCast(self)),
);
return status == .okay;
}
pub fn publish(self: *Connection, subject: [:0]const u8, message: []const u8) Error!void {
return Status.fromInt(
nats_c.natsConnection_Publish(@ptrCast(self), subject, message.ptr, @intCast(message.len)),
).raise();
}
pub fn publishString(
self: *Connection,
subject: [:0]const u8,
message: [:0]const u8,
) Error!void {
const status = Status.fromInt(nats_c.natsConnection_PublishString(
return Status.fromInt(
nats_c.natsConnection_PublishString(@ptrCast(self), subject.ptr, message.ptr),
).raise();
}
pub fn publishMessage(self: *Connection, message: *Message) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishMsg(@ptrCast(self), @ptrCast(message)),
).raise();
}
pub fn publishRequest(
self: *Connection,
subject: [:0]const u8,
reply: [:0]const u8,
message: []const u8,
) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishRequest(
@ptrCast(self),
subject.ptr,
reply.ptr,
message.ptr,
@intCast(message.len),
),
).raise();
}
pub fn publishRequestString(
self: *Connection,
subject: [:0]const u8,
reply: [:0]const u8,
message: [:0]const u8,
) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishRequestString(
@ptrCast(self),
subject.ptr,
reply.ptr,
message.ptr,
),
).raise();
}
pub fn request(
self: *Connection,
subject: [:0]const u8,
req: []const u8,
timeout: i64,
) Error!*Message {
var response: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_Request(
@ptrCast(&response),
@ptrCast(self),
subject,
message,
subject.ptr,
req.ptr,
@intCast(req.len),
timeout,
));
return status.raise();
return status.toError() orelse response;
}
pub fn requestString(
self: *Connection,
subject: [:0]const u8,
request: [:0]const u8,
req: [:0]const u8,
timeout: i64,
) Error!*Message {
var msg: *Message = undefined;
var response: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_RequestString(
@ptrCast(&msg),
@ptrCast(&response),
@ptrCast(self),
subject,
request,
subject.ptr,
req.ptr,
timeout,
));
return status.toError() orelse msg;
return status.toError() orelse response;
}
pub fn requestMessage(
self: *Connection,
req: *Message,
timeout: i64,
) Error!*Message {
var response: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_RequestMsg(
@ptrCast(&response),
@ptrCast(self),
@ptrCast(req),
timeout,
));
return status.toError() orelse response;
}
pub fn subscribe(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
callback: SubscriptionThunkCallback(T),
callback: SubscriptionCallbackSignature(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_Subscribe(
@ptrCast(&sub),
@ptrCast(self),
subject,
subscriptionMessageThunk(T, callback),
subject.ptr,
makeSubscriptionCallbackThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
pub fn subscribeTimeout(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
timeout: i64,
callback: SubscriptionCallbackSignature(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_SubscribeTimeout(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
pub fn subscribeSync(self: *Connection, subject: [:0]const u8) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_SubscribeSync(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
));
return status.toError() orelse sub;
}
pub fn queueSubscribe(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
queue_group: [:0]const u8,
callback: SubscriptionCallbackSignature(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
queue_group.ptr,
makeSubscriptionCallbackThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
pub fn queueSubscribeTimeout(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
queue_group: [:0]const u8,
timeout: i64,
callback: SubscriptionCallbackSignature(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
queue_group.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
pub fn queueSubscribeSync(
self: *Connection,
subject: [:0]const u8,
queue_group: [:0]const u8,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_SubscribeSync(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
queue_group.ptr,
));
return status.toError() orelse sub;
}
};
// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc);
// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc);
// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc);
// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc);
// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc);
// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats);
// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError);
// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid);
// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]);
// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip);
// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt);
// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str);
// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg);
// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str);
// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup);
//
const ConnectionOptions = opaque {
pub fn create() Error!*ConnectionOptions {
var self: *ConnectionOptions = undefined;
@ -346,16 +675,20 @@ const ConnectionOptions = opaque {
).raise();
}
// needs a callback thunk
// typedef void (*natsErrHandler)(
// natsConnection *nc, natsSubscription *subscription, natsStatus err,
// void *closure);
// NATS_EXTERN natsStatus natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, void *closure);
// pub fn setErrorHandler(self: *ConnectionOptions, max: c_int) Error!void {
// return Status.fromInt(
// nats_c.natsOptions_SetErrorHandler(@ptrCast(self), max),
// ).raise();
// }
pub fn setErrorHandler(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(
nats_c.natsOptions_SetErrorHandler(
@ptrCast(self),
makeErrorHandlerCallbackThunk(T, callback),
userdata,
),
).raise();
}
pub fn setClosedCallback(
self: *ConnectionOptions,
@ -422,41 +755,25 @@ const ConnectionOptions = opaque {
)).raise();
}
// needs a callback thunk
// NATS_EXTERN natsStatus natsOptions_SetEventLoop(natsOptions *opts, void *loop, natsEvLoop_Attach attachCb, natsEvLoop_ReadAddRemove readCb, natsEvLoop_WriteAddRemove writeCb, natsEvLoop_Detach detachCb);
// typedef natsStatus (*natsEvLoop_Attach)(
// void **userData,
// void *loop,
// natsConnection *nc,
// natsSock socket);
// typedef natsStatus (*natsEvLoop_ReadAddRemove)(
// void *loop,
// bool add);
// typedef natsStatus (*natsEvLoop_WriteAddRemove)(
// void *loop,
// bool add);
// typedef natsStatus (*natsEvLoop_Detach)(
// void *loop);
// pub fn setEventLoop(
// self: *ConnectionOptions,
// comptime T: type,
// comptime L: type,
// comptime attachCallback: *const AttachEventLoopCallbackSignature(T, L),
// comptime readCallback: *const AttachEventLoopCallbackSignature(T),
// comptime writeCallback: *const AttachEventLoopCallbackSignature(T),
// comptime detachCallback: *const thunk.SimpleCallbackSignature(T),
// loop: *L,
// ) Error!void {
// return Status.fromInt(nats_c.natsOptions_SetEventLoop(
// @ptrCast(self),
// @ptrCast(loop),
// makeAttachEventLoopCallbackThunk(T, L, attachCallback),
// makeEventLoopAddRemoveCallbackThunk(T, readCallback),
// makeEventLoopAddRemoveCallbackThunk(T, writeCallback),
// thunk.makeSimpleCallbackThunk(callback),
// )).raise();
// }
pub fn setEventLoop(
self: *ConnectionOptions,
comptime T: type,
comptime L: type,
comptime attach_callback: *const AttachEventLoopCallbackSignature(T, L),
comptime read_callback: *const AttachEventLoopCallbackSignature(T),
comptime write_callback: *const AttachEventLoopCallbackSignature(T),
comptime detach_callback: *const thunk.SimpleCallbackSignature(T),
loop: *L,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetEventLoop(
@ptrCast(self),
@ptrCast(loop),
makeAttachEventLoopCallbackThunk(T, L, attach_callback),
makeEventLoopAddRemoveCallbackThunk(T, read_callback),
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
makeEventLoopDetachCallbackThunk(T, detach_callback),
)).raise();
}
pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void {
return Status.fromInt(
@ -508,23 +825,36 @@ const ConnectionOptions = opaque {
).raise();
}
// thunkem
// NATS_EXTERN natsStatus natsOptions_SetRetryOnFailedConnect(natsOptions *opts, bool retry, natsConnectionHandler connectedCb, void* closure);
// typedef void (*natsConnectionHandler)(
// natsConnection *nc, void *closure);
pub fn setRetryOnFailedConnect(
self: *ConnectionOptions,
comptime T: type,
comptime callback: *const ConnectionCallbackSignature(T),
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
// 2 thunk 2 furious
// NATS_EXTERN natsStatus natsOptions_SetUserCredentialsCallbacks(natsOptions *opts, natsUserJWTHandler ujwtCB, void *ujwtClosure, natsSignatureHandler sigCB, void *sigClosure);
// typedef natsStatus (*natsUserJWTHandler)(
// char **userJWT,
// char **customErrTxt,
// void *closure);
// typedef natsStatus (*natsSignatureHandler)(
// char **customErrTxt,
// unsigned char **signature,
// int *signatureLength,
// const char *nonce,
// void *closure);
pub fn setUserCredentialsCallbacks(
self: *ConnectionOptions,
comptime T: type,
comptime U: type,
comptime jwt_callback: *const JwtHandlerCallbackSignature(T),
comptime sig_callback: *const SignatureHandlerCallbackSignature(U),
jwt_userdata: *T,
sig_userdata: *U,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self),
makeJwtHandlerCallbackThunk(T, jwt_callback),
jwt_userdata,
makeSignatureHandlerCallbackThunk(U, sig_callback),
sig_userdata,
)).raise();
}
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
return Status.fromInt(
@ -545,14 +875,20 @@ const ConnectionOptions = opaque {
).raise();
}
// thunk
// NATS_EXTERN natsStatus natsOptions_SetNKey(natsOptions *opts, const char *pubKey, natsSignatureHandler sigCB, void *sigClosure);
// typedef natsStatus (*natsSignatureHandler)(
// char **customErrTxt,
// unsigned char **signature,
// int *signatureLength,
// const char *nonce,
// void *closure);
pub fn setNKey(
self: *ConnectionOptions,
comptime T: type,
comptime sig_callback: *const SignatureHandlerCallbackSignature(T),
pub_key: [:0]const u8,
sig_userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks(
@ptrCast(self),
pub_key.ptr,
makeSignatureHandlerCallbackThunk(T, sig_callback),
sig_userdata,
)).raise();
}
pub fn setNKeyFromSeed(self: *ConnectionOptions, pub_key: [:0]const u8, seed_file: [:0]const u8) Error!void {
return Status.fromInt(
@ -623,10 +959,204 @@ pub fn makeReconnectDelayCallbackThunk(
conn: ?*nats_c.natsConnection,
attempts: c_int,
userdata: ?*anyopaque,
) callconv(.C) void {
) callconv(.C) i64 {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, attempts, data);
return callback(connection, attempts, data);
}
}.thunk;
}
const ErrorHandlerCallback = fn (
?*nats_c.natsConnection,
?*nats_c.natsSubscription,
nats_c.natsStatus,
?*anyopaque,
) void;
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
return fn (*Connection, *Subscription, Status, *T) void;
}
pub fn makeErrorHandlerCallbackThunk(
comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T),
) *const ErrorHandlerCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,
sub: ?*nats_c.natsSubscription,
status: nats_c.natsStatus,
userdata: ?*anyopaque,
) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, subscription, Status.fromInt(status), data);
}
}.thunk;
}
// natsSock is an fd on non-windows and SOCKET on windows
const AttachEventLoopCallback = fn (
*?*anyopaque,
?*anyopaque,
?*nats_c.natsConnection,
nats_c.natsSock,
) nats_c.natsStatus;
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
return fn (*L, *Connection, c_int) anyerror!*T;
}
pub fn makeAttachEventLoopCallbackThunk(
comptime T: type,
comptime L: type,
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
userdata: *?*anyopaque,
loop: ?*anyopaque,
conn: ?*nats_c.natsConnection,
sock: ?*nats_c.natsSock,
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const ev_loop: *L = if (loop) |l| @ptrCast(l) else unreachable;
userdata.* = callback(ev_loop, connection, sock) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus;
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
return fn (*Connection, c_int, *T) anyerror!void;
}
pub fn makeEventLoopAddRemoveCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,
attempts: c_int,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, attempts, data) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus;
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
return fn (*T) anyerror!void;
}
pub fn makeEventLoopDetachCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopDetachCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(data) catch |err| return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus;
const JwtResponseOrError = union(enum) {
jwt: [:0]u8,
error_message: [:0]u8,
};
pub fn JwtHandlerCallbackSignature(comptime T: type) type {
return fn (*T) JwtResponseOrError;
}
pub fn makeJwtHandlerCallbackThunk(
comptime T: type,
comptime callback: *const JwtHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
jwt_out: *?[*:0]u8,
err_out: *?[*:0]u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
switch (callback(data)) {
.jwt => |jwt| {
jwt_out.* = jwt.ptr;
return nats_c.NATS_OK;
},
.error_message => |msg| {
err_out.* = msg.ptr;
return nats_c.NATS_ERR;
},
}
}
}.thunk;
}
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus;
const SignatureResponseOrError = union(enum) {
signature: []u8,
error_message: [:0]u8,
};
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
return fn ([:0]const u8, *T) SignatureResponseOrError;
}
pub fn makeSignatureHandlerCallbackThunk(
comptime T: type,
comptime callback: *const SignatureHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
err_out: *?[*:0]u8,
sig_out: *?[*]u8,
sig_len_out: *c_int,
nonce: [*:0]const u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
switch (callback(std.mem.sliceTo(nonce, 0), data)) {
.signature => |sig| {
sig_out.* = sig.ptr;
sig_len_out.* = sig.len;
return nats_c.NATS_OK;
},
.error_message => |msg| {
err_out.* = msg.ptr;
return nats_c.NATS_ERR;
},
}
}
}.thunk;
}

View File

@ -74,6 +74,10 @@ pub const Status = enum(c_int) {
return @enumFromInt(int);
}
pub fn toInt(self: Status) c_uint {
return @intFromEnum(self);
}
pub fn description(self: Status) [:0]const u8 {
return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0);
}
@ -163,6 +167,50 @@ pub const Status = enum(c_int) {
_ => Error.UnknownError,
};
}
pub fn fromError(err: ?anyerror) Status {
return if (err) |e|
switch (e) {
Error.ProtocolError => .protocol_error,
Error.IoError => .io_error,
Error.LineTooLong => .line_too_long,
Error.ConnectionClosed => .connection_closed,
Error.NoServer => .no_server,
Error.StaleConnection => .stale_connection,
Error.SecureConnectionWanted => .secure_connection_wanted,
Error.SecureConnectionRequired => .secure_connection_required,
Error.ConnectionDisconnected => .connection_disconnected,
Error.ConnectionAuthFailed => .connection_auth_failed,
Error.NotPermitted => .not_permitted,
Error.NotFound => .not_found,
Error.AddressMissing => .address_missing,
Error.InvalidSubject => .invalid_subject,
Error.InvalidArg => .invalid_arg,
Error.InvalidSubscription => .invalid_subscription,
Error.InvalidTimeout => .invalid_timeout,
Error.IllegalState => .illegal_state,
Error.SlowConsumer => .slow_consumer,
Error.MaxPayload => .max_payload,
Error.MaxDeliveredMsgs => .max_delivered_msgs,
Error.InsufficientBuffer => .insufficient_buffer,
Error.NoMemory => .no_memory,
Error.SysError => .sys_error,
Error.Timeout => .timeout,
Error.FailedToInitialize => .failed_to_initialize,
Error.NotInitialized => .not_initialized,
Error.SslError => .ssl_error,
Error.NoServerSupport => .no_server_support,
Error.NotYetConnected => .not_yet_connected,
Error.Draining => .draining,
Error.InvalidQueueName => .invalid_queue_name,
Error.NoResponders => .no_responders,
Error.Mismatch => .mismatch,
Error.MissedHeartbeat => .missed_heartbeat,
else => .generic_error,
}
else
.okay;
}
};
pub const Error = error{

View File

@ -22,12 +22,19 @@ const err_ = @import("./error.zig");
const con_ = @import("./connection.zig");
const sub_ = @import("./subscription.zig");
const msg_ = @import("./message.zig");
const sta_ = @import("./statistics.zig");
pub const default_server_url = con_.default_server_url;
pub const Connection = con_.Connection;
pub const ConnectionOptions = con_.ConnectionOptions;
pub const Subscription = sub_.Subscription;
pub const Message = msg_.Message;
pub const Statistics = sta_.Statistics;
pub const StatsCounts = sta_.StatsCounts;
const Status = err_.Status;
pub const Error = err_.Error;
@ -129,39 +136,6 @@ pub fn deinitWait(timeout: i64) Error!void {
return status.raise();
}
pub const StatsCounts = struct {
messages_in: u64 = 0,
bytes_in: u64 = 0,
messages_out: u64 = 0,
bytes_out: u64 = 0,
reconnects: u64 = 0,
};
pub const Statistics = opaque {
pub fn create() Error!*Statistics {
var stats: *Statistics = undefined;
const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats)));
return status.toError() orelse stats;
}
pub fn deinit(self: *Statistics) void {
nats_c.natsStatistics_Destroy(@ptrCast(self));
}
pub fn getCounts(self: *Statistics) Error!StatsCounts {
var counts: StatsCounts = .{};
const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
self,
&counts.messages_in,
&counts.bytes_in,
&counts.messages_out,
&counts.bytes_out,
&counts.reconnects,
);
return status.toError() orelse counts;
}
};
// This appears to be a jetstream API, but these two endpoints are trivial, so, whoops.
// I have no clue what this does, since there's basically no
pub const Inbox = opaque {

42
src/statistics.zig Normal file
View File

@ -0,0 +1,42 @@
const std = @import("std");
const nats_c = @cImport({
@cInclude("nats/nats.h");
});
const err_ = @import("./error.zig");
const Status = err_.Status;
const Error = err_.Error;
pub const StatsCounts = struct {
messages_in: u64 = 0,
bytes_in: u64 = 0,
messages_out: u64 = 0,
bytes_out: u64 = 0,
reconnects: u64 = 0,
};
pub const Statistics = opaque {
pub fn create() Error!*Statistics {
var stats: *Statistics = undefined;
const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats)));
return status.toError() orelse stats;
}
pub fn destroy(self: *Statistics) void {
nats_c.natsStatistics_Destroy(@ptrCast(self));
}
pub fn getCounts(self: *Statistics) Error!StatsCounts {
var counts: StatsCounts = .{};
const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
self,
&counts.messages_in,
&counts.bytes_in,
&counts.messages_out,
&counts.bytes_out,
&counts.reconnects,
);
return status.toError() orelse counts;
}
};

View File

@ -169,21 +169,21 @@ pub const Subscription = opaque {
}
};
const BareSubscriptionCallback = fn (
const SubscriptionCallback = fn (
?*nats_c.natsConnection,
?*nats_c.natsSubscription,
?*nats_c.natsMsg,
?*anyopaque,
) callconv(.C) void;
pub fn SubscriptionThunkCallback(comptime T: type) type {
pub fn SubscriptionCallbackSignature(comptime T: type) type {
return fn (*T, *Connection, *Subscription, *Message) void;
}
pub fn subscriptionMessageThunk(
pub fn makeSubscriptionCallbackThunk(
comptime T: type,
comptime callback: *const SubscriptionThunkCallback(T),
) *const BareSubscriptionCallback {
comptime callback: *const SubscriptionCallbackSignature(T),
) *const SubscriptionCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,