Compare commits

..

No commits in common. "9a4c80861c4e7032ea1f2f3b105b3566fec595e6" and "3ff894cc0d556746ed8e68bca14d7805a015ec92" have entirely different histories.

5 changed files with 167 additions and 761 deletions

View File

@ -6,8 +6,8 @@ pub const nats_c = @cImport({
const sub_ = @import("./subscription.zig"); const sub_ = @import("./subscription.zig");
const Subscription = sub_.Subscription; const Subscription = sub_.Subscription;
const SubscriptionCallbackSignature = sub_.SubscriptionCallbackSignature; const SubscriptionThunkCallback = sub_.SubscriptionThunkCallback;
const makeSubscriptionCallbackThunk = sub_.makeSubscriptionCallbackThunk; const subscriptionMessageThunk = sub_.subscriptionMessageThunk;
const msg_ = @import("./message.zig"); const msg_ = @import("./message.zig");
const Message = msg_.Message; const Message = msg_.Message;
@ -15,38 +15,13 @@ const Message = msg_.Message;
const err_ = @import("./error.zig"); const err_ = @import("./error.zig");
const Error = err_.Error; const Error = err_.Error;
const Status = err_.Status; const Status = err_.Status;
const ErrorInfo = err_.ErrorInfo;
const sta_ = @import("./statistics.zig");
const Statistics = sta_.Statistics;
const StatsCounts = sta_.StatsCounts;
const thunk = @import("./thunk.zig"); const thunk = @import("./thunk.zig");
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL; pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
pub const ConnectionStatus = enum(c_int) {
disconnected = nats_c.DISCONNECTED,
connecting = nats_c.CONNECTING,
connected = nats_c.CONNECTED,
closed = nats_c.CLOSED,
reconnecting = nats_c.RECONNECTING,
draining_subs = nats_c.DRAINING_SUBS,
draining_pubs = nats_c.DRAINING_PUBS,
_,
pub fn fromInt(int: c_uint) ConnectionStatus {
return @enumFromInt(int);
}
};
pub const AddressPort = struct {
address: [:0]u8,
port: u16,
};
pub const Connection = opaque { pub const Connection = opaque {
pub fn create(options: *ConnectionOptions) Error!*Connection { pub fn connect(options: *ConnectionOptions) Error!*Connection {
var self: *Connection = undefined; var self: *Connection = undefined;
const status = Status.fromInt(nats_c.natsConnection_Connect(@ptrCast(&self), @ptrCast(options))); const status = Status.fromInt(nats_c.natsConnection_Connect(@ptrCast(&self), @ptrCast(options)));
return status.toError() orelse self; return status.toError() orelse self;
@ -69,402 +44,98 @@ pub const Connection = opaque {
return nats_c.natsConnection_Destroy(@ptrCast(self)); return nats_c.natsConnection_Destroy(@ptrCast(self));
} }
pub fn processReadEvent(self: *Connection) void {
nats_c.natsConnection_ProcessReadEvent(@ptrCast(self));
}
pub fn processWriteEvent(self: *Connection) void {
nats_c.natsConnection_ProcessWriteEvent(@ptrCast(self));
}
pub fn isClosed(self: *Connection) bool {
return nats_c.natsConnection_IsClosed(@ptrCast(self));
}
pub fn isReconnecting(self: *Connection) bool {
return nats_c.natsConnection_IsReconnecting(@ptrCast(self));
}
pub fn getStatus(self: *Connection) ConnectionStatus {
return ConnectionStatus.fromInt(nats_c.natsConnection_Status(@ptrCast(self)));
}
pub fn bytesBuffered(self: *Connection) c_int {
return nats_c.natsConnection_Buffered(@ptrCast(self));
}
pub fn flush(self: *Connection) Error!void {
return Status.fromInt(nats_c.natsConnection_Flush(@ptrCast(self))).raise();
}
pub fn flushTimeout(self: *Connection, timeout: i64) Error!void {
return Status.fromInt(nats_c.natsConnection_FlushTimeout(@ptrCast(self), timeout)).raise();
}
pub fn getMaxPayload(self: *Connection) c_int {
return nats_c.natsConnection_GetMaxPayload(@ptrCast(self));
}
pub fn getStats(self: *Connection) Error!StatsCounts {
var stats = try Statistics.create();
defer stats.destroy();
const status = Status.fromInt(
nats_c.natsConnection_GetStats(@ptrCast(self), @ptrCast(stats)),
);
return status.toError() orelse stats.getCounts();
}
pub fn getConnectedUrl(self: *Connection, buffer: []u8) Error![:0]u8 {
const status = Status.fromInt(
nats_c.natsConnection_GetConnectedUrl(@ptrCast(self), buffer.ptr, buffer.len),
);
// cast this to a c pointer so that sliceTo properly returns the sentinel
// terminated type, which is guaranteed by the backing library.
return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0);
}
pub fn getConnectedServerId(self: *Connection, buffer: []u8) Error![:0]u8 {
const status = Status.fromInt(
nats_c.natsConnection_GetConnectedServerId(@ptrCast(self), buffer.ptr, buffer.len),
);
// cast this to a c pointer so that sliceTo properly returns the sentinel
// terminated type, which is guaranteed by the backing library.
return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0);
}
pub fn getServers(self: *Connection) Error![][*:0]u8 {
var servers: [*][*:0]u8 = undefined;
var count: c_int = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetServers(@ptrCast(self), &servers, &count),
);
return status.toError() orelse servers[0..@intCast(count)];
}
pub fn getDiscoveredServers(self: *Connection) Error![][*:0]u8 {
var servers: [*][*:0]u8 = undefined;
var count: c_int = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetDiscoveredServers(@ptrCast(self), &servers, &count),
);
return status.toError() orelse servers[0..@intCast(count)];
}
pub fn getLastError(self: *Connection) ErrorInfo {
var desc: [*:0]const u8 = 0;
const status = nats_c.natsConnection_GetLastError(@ptrCast(self), &desc);
return .{
.code = Status.fromInt(status).toError(),
.desc = std.mem.sliceTo(desc, 0),
};
}
pub fn getClientId(self: *Connection) Error!u64 {
var id: u64 = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetClientID(@ptrCast(self), &id),
);
return status.toError() orelse id;
}
pub fn drain(self: *Connection) Error!void {
return Status.fromInt(nats_c.natsConnection_Drain(@ptrCast(self))).raise();
}
pub fn drainTimeout(self: *Connection, timeout: i64) Error!void {
return Status.fromInt(nats_c.natsConnection_DrainTimeout(@ptrCast(self), timeout)).raise();
}
pub fn sign(self: *Connection, message: []const u8) Error![64]u8 {
var sig = [_]u8{0} ** 64;
const status = Status.fromInt(
nats_c.natsConnection_Sign(@ptrCast(self), message.ptr, message.len, &sig),
);
return status.toError() orelse sig;
}
pub fn getClientIp(self: *Connection) Error![:0]u8 {
var ip: [*c]u8 = null;
const status = Status.fromInt(
nats_c.natsConnection_GetClientIP(@ptrCast(self), &ip),
);
return status.toError() orelse std.mem.sliceTo(ip, 0);
}
pub fn getLocalIpAndPort(self: *Connection) Error!AddressPort {
var address: [*:0]u8 = undefined;
var port: c_int = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetLocalIPAndPort(@ptrCast(self), &address, &port),
);
return status.toError() orelse .{
.address = std.mem.sliceTo(address, 0),
.port = @intCast(port),
};
}
pub fn getRtt(self: *Connection) Error!i64 {
var rtt: i64 = 0;
const status = Status.fromInt(
nats_c.natsConnection_GetRTT(@ptrCast(self), &rtt),
);
return status.toError() orelse rtt;
}
pub fn hasHeaderSupport(self: *Connection) bool {
const status = Status.fromInt(
nats_c.natsConnection_HasHeaderSupport(@ptrCast(self)),
);
return status == .okay;
}
pub fn publish(self: *Connection, subject: [:0]const u8, message: []const u8) Error!void {
return Status.fromInt(
nats_c.natsConnection_Publish(@ptrCast(self), subject, message.ptr, @intCast(message.len)),
).raise();
}
pub fn publishString( pub fn publishString(
self: *Connection, self: *Connection,
subject: [:0]const u8, subject: [:0]const u8,
message: [:0]const u8, message: [:0]const u8,
) Error!void { ) Error!void {
return Status.fromInt( const status = Status.fromInt(nats_c.natsConnection_PublishString(
nats_c.natsConnection_PublishString(@ptrCast(self), subject.ptr, message.ptr),
).raise();
}
pub fn publishMessage(self: *Connection, message: *Message) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishMsg(@ptrCast(self), @ptrCast(message)),
).raise();
}
pub fn publishRequest(
self: *Connection,
subject: [:0]const u8,
reply: [:0]const u8,
message: []const u8,
) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishRequest(
@ptrCast(self),
subject.ptr,
reply.ptr,
message.ptr,
@intCast(message.len),
),
).raise();
}
pub fn publishRequestString(
self: *Connection,
subject: [:0]const u8,
reply: [:0]const u8,
message: [:0]const u8,
) Error!void {
return Status.fromInt(
nats_c.natsConnection_PublishRequestString(
@ptrCast(self),
subject.ptr,
reply.ptr,
message.ptr,
),
).raise();
}
pub fn request(
self: *Connection,
subject: [:0]const u8,
req: []const u8,
timeout: i64,
) Error!*Message {
var response: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_Request(
@ptrCast(&response),
@ptrCast(self), @ptrCast(self),
subject.ptr, subject,
req.ptr, message,
@intCast(req.len),
timeout,
)); ));
return status.raise();
return status.toError() orelse response;
} }
pub fn requestString( pub fn requestString(
self: *Connection, self: *Connection,
subject: [:0]const u8, subject: [:0]const u8,
req: [:0]const u8, request: [:0]const u8,
timeout: i64, timeout: i64,
) Error!*Message { ) Error!*Message {
var response: *Message = undefined; var msg: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_RequestString( const status = Status.fromInt(nats_c.natsConnection_RequestString(
@ptrCast(&response), @ptrCast(&msg),
@ptrCast(self), @ptrCast(self),
subject.ptr, subject,
req.ptr, request,
timeout, timeout,
)); ));
return status.toError() orelse msg;
return status.toError() orelse response;
}
pub fn requestMessage(
self: *Connection,
req: *Message,
timeout: i64,
) Error!*Message {
var response: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_RequestMsg(
@ptrCast(&response),
@ptrCast(self),
@ptrCast(req),
timeout,
));
return status.toError() orelse response;
} }
pub fn subscribe( pub fn subscribe(
self: *Connection, self: *Connection,
comptime T: type, comptime T: type,
subject: [:0]const u8, subject: [:0]const u8,
callback: SubscriptionCallbackSignature(T), callback: SubscriptionThunkCallback(T),
userdata: *T, userdata: *T,
) Error!*Subscription { ) Error!*Subscription {
var sub: *Subscription = undefined; var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_Subscribe( const status = Status.fromInt(nats_c.natsConnection_Subscribe(
@ptrCast(&sub), @ptrCast(&sub),
@ptrCast(self), @ptrCast(self),
subject.ptr, subject,
makeSubscriptionCallbackThunk(T, callback), subscriptionMessageThunk(T, callback),
userdata, userdata,
)); ));
return status.toError() orelse sub; return status.toError() orelse sub;
} }
pub fn subscribeTimeout(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
timeout: i64,
callback: SubscriptionCallbackSignature(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_SubscribeTimeout(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
pub fn subscribeSync(self: *Connection, subject: [:0]const u8) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_SubscribeSync(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
));
return status.toError() orelse sub;
}
pub fn queueSubscribe(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
queue_group: [:0]const u8,
callback: SubscriptionCallbackSignature(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
queue_group.ptr,
makeSubscriptionCallbackThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
pub fn queueSubscribeTimeout(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
queue_group: [:0]const u8,
timeout: i64,
callback: SubscriptionCallbackSignature(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
queue_group.ptr,
timeout,
makeSubscriptionCallbackThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
pub fn queueSubscribeSync(
self: *Connection,
subject: [:0]const u8,
queue_group: [:0]const u8,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_SubscribeSync(
@ptrCast(&sub),
@ptrCast(self),
subject.ptr,
queue_group.ptr,
));
return status.toError() orelse sub;
}
}; };
// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc);
// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc);
// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc);
// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc);
// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc);
// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats);
// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError);
// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid);
// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]);
// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip);
// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt);
// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str);
// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg);
// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str);
// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup);
//
const ConnectionOptions = opaque { const ConnectionOptions = opaque {
pub fn create() Error!*ConnectionOptions { pub fn create() Error!*ConnectionOptions {
var self: *ConnectionOptions = undefined; var self: *ConnectionOptions = undefined;
@ -675,20 +346,16 @@ const ConnectionOptions = opaque {
).raise(); ).raise();
} }
pub fn setErrorHandler( // needs a callback thunk
self: *ConnectionOptions, // typedef void (*natsErrHandler)(
comptime T: type, // natsConnection *nc, natsSubscription *subscription, natsStatus err,
comptime callback: *const ErrorHandlerCallbackSignature(T), // void *closure);
userdata: *T, // NATS_EXTERN natsStatus natsOptions_SetErrorHandler(natsOptions *opts, natsErrHandler errHandler, void *closure);
) Error!void { // pub fn setErrorHandler(self: *ConnectionOptions, max: c_int) Error!void {
return Status.fromInt( // return Status.fromInt(
nats_c.natsOptions_SetErrorHandler( // nats_c.natsOptions_SetErrorHandler(@ptrCast(self), max),
@ptrCast(self), // ).raise();
makeErrorHandlerCallbackThunk(T, callback), // }
userdata,
),
).raise();
}
pub fn setClosedCallback( pub fn setClosedCallback(
self: *ConnectionOptions, self: *ConnectionOptions,
@ -755,25 +422,41 @@ const ConnectionOptions = opaque {
)).raise(); )).raise();
} }
pub fn setEventLoop( // needs a callback thunk
self: *ConnectionOptions, // NATS_EXTERN natsStatus natsOptions_SetEventLoop(natsOptions *opts, void *loop, natsEvLoop_Attach attachCb, natsEvLoop_ReadAddRemove readCb, natsEvLoop_WriteAddRemove writeCb, natsEvLoop_Detach detachCb);
comptime T: type, // typedef natsStatus (*natsEvLoop_Attach)(
comptime L: type, // void **userData,
comptime attach_callback: *const AttachEventLoopCallbackSignature(T, L), // void *loop,
comptime read_callback: *const AttachEventLoopCallbackSignature(T), // natsConnection *nc,
comptime write_callback: *const AttachEventLoopCallbackSignature(T), // natsSock socket);
comptime detach_callback: *const thunk.SimpleCallbackSignature(T), // typedef natsStatus (*natsEvLoop_ReadAddRemove)(
loop: *L, // void *loop,
) Error!void { // bool add);
return Status.fromInt(nats_c.natsOptions_SetEventLoop( // typedef natsStatus (*natsEvLoop_WriteAddRemove)(
@ptrCast(self), // void *loop,
@ptrCast(loop), // bool add);
makeAttachEventLoopCallbackThunk(T, L, attach_callback), // typedef natsStatus (*natsEvLoop_Detach)(
makeEventLoopAddRemoveCallbackThunk(T, read_callback), // void *loop);
makeEventLoopAddRemoveCallbackThunk(T, write_callback),
makeEventLoopDetachCallbackThunk(T, detach_callback), // pub fn setEventLoop(
)).raise(); // self: *ConnectionOptions,
} // comptime T: type,
// comptime L: type,
// comptime attachCallback: *const AttachEventLoopCallbackSignature(T, L),
// comptime readCallback: *const AttachEventLoopCallbackSignature(T),
// comptime writeCallback: *const AttachEventLoopCallbackSignature(T),
// comptime detachCallback: *const thunk.SimpleCallbackSignature(T),
// loop: *L,
// ) Error!void {
// return Status.fromInt(nats_c.natsOptions_SetEventLoop(
// @ptrCast(self),
// @ptrCast(loop),
// makeAttachEventLoopCallbackThunk(T, L, attachCallback),
// makeEventLoopAddRemoveCallbackThunk(T, readCallback),
// makeEventLoopAddRemoveCallbackThunk(T, writeCallback),
// thunk.makeSimpleCallbackThunk(callback),
// )).raise();
// }
pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void { pub fn ignoreDiscoveredServers(self: *ConnectionOptions, ignore: bool) Error!void {
return Status.fromInt( return Status.fromInt(
@ -825,36 +508,23 @@ const ConnectionOptions = opaque {
).raise(); ).raise();
} }
pub fn setRetryOnFailedConnect( // thunkem
self: *ConnectionOptions, // NATS_EXTERN natsStatus natsOptions_SetRetryOnFailedConnect(natsOptions *opts, bool retry, natsConnectionHandler connectedCb, void* closure);
comptime T: type, // typedef void (*natsConnectionHandler)(
comptime callback: *const ConnectionCallbackSignature(T), // natsConnection *nc, void *closure);
userdata: *T,
) Error!void {
return Status.fromInt(nats_c.natsOptions_SetRetryOnFailedConnect(
@ptrCast(self),
makeConnectionCallbackThunk(T, callback),
userdata,
)).raise();
}
pub fn setUserCredentialsCallbacks( // 2 thunk 2 furious
self: *ConnectionOptions, // NATS_EXTERN natsStatus natsOptions_SetUserCredentialsCallbacks(natsOptions *opts, natsUserJWTHandler ujwtCB, void *ujwtClosure, natsSignatureHandler sigCB, void *sigClosure);
comptime T: type, // typedef natsStatus (*natsUserJWTHandler)(
comptime U: type, // char **userJWT,
comptime jwt_callback: *const JwtHandlerCallbackSignature(T), // char **customErrTxt,
comptime sig_callback: *const SignatureHandlerCallbackSignature(U), // void *closure);
jwt_userdata: *T, // typedef natsStatus (*natsSignatureHandler)(
sig_userdata: *U, // char **customErrTxt,
) Error!void { // unsigned char **signature,
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( // int *signatureLength,
@ptrCast(self), // const char *nonce,
makeJwtHandlerCallbackThunk(T, jwt_callback), // void *closure);
jwt_userdata,
makeSignatureHandlerCallbackThunk(U, sig_callback),
sig_userdata,
)).raise();
}
pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void { pub fn setUserCredentialsFromFiles(self: *ConnectionOptions, user_or_chained_file: [:0]const u8, seed_file: [:0]const u8) Error!void {
return Status.fromInt( return Status.fromInt(
@ -875,20 +545,14 @@ const ConnectionOptions = opaque {
).raise(); ).raise();
} }
pub fn setNKey( // thunk
self: *ConnectionOptions, // NATS_EXTERN natsStatus natsOptions_SetNKey(natsOptions *opts, const char *pubKey, natsSignatureHandler sigCB, void *sigClosure);
comptime T: type, // typedef natsStatus (*natsSignatureHandler)(
comptime sig_callback: *const SignatureHandlerCallbackSignature(T), // char **customErrTxt,
pub_key: [:0]const u8, // unsigned char **signature,
sig_userdata: *T, // int *signatureLength,
) Error!void { // const char *nonce,
return Status.fromInt(nats_c.natsOptions_SetUserCredentialsCallbacks( // void *closure);
@ptrCast(self),
pub_key.ptr,
makeSignatureHandlerCallbackThunk(T, sig_callback),
sig_userdata,
)).raise();
}
pub fn setNKeyFromSeed(self: *ConnectionOptions, pub_key: [:0]const u8, seed_file: [:0]const u8) Error!void { pub fn setNKeyFromSeed(self: *ConnectionOptions, pub_key: [:0]const u8, seed_file: [:0]const u8) Error!void {
return Status.fromInt( return Status.fromInt(
@ -959,204 +623,10 @@ pub fn makeReconnectDelayCallbackThunk(
conn: ?*nats_c.natsConnection, conn: ?*nats_c.natsConnection,
attempts: c_int, attempts: c_int,
userdata: ?*anyopaque, userdata: ?*anyopaque,
) callconv(.C) i64 {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
return callback(connection, attempts, data);
}
}.thunk;
}
const ErrorHandlerCallback = fn (
?*nats_c.natsConnection,
?*nats_c.natsSubscription,
nats_c.natsStatus,
?*anyopaque,
) void;
pub fn ErrorHandlerCallbackSignature(comptime T: type) type {
return fn (*Connection, *Subscription, Status, *T) void;
}
pub fn makeErrorHandlerCallbackThunk(
comptime T: type,
comptime callback: *const ErrorHandlerCallbackSignature(T),
) *const ErrorHandlerCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,
sub: ?*nats_c.natsSubscription,
status: nats_c.natsStatus,
userdata: ?*anyopaque,
) callconv(.C) void { ) callconv(.C) void {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, attempts, data);
callback(connection, subscription, Status.fromInt(status), data);
}
}.thunk;
}
// natsSock is an fd on non-windows and SOCKET on windows
const AttachEventLoopCallback = fn (
*?*anyopaque,
?*anyopaque,
?*nats_c.natsConnection,
nats_c.natsSock,
) nats_c.natsStatus;
pub fn AttachEventLoopCallbackSignature(comptime T: type, comptime L: type) type {
return fn (*L, *Connection, c_int) anyerror!*T;
}
pub fn makeAttachEventLoopCallbackThunk(
comptime T: type,
comptime L: type,
comptime callback: *const AttachEventLoopCallbackSignature(T, L),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
userdata: *?*anyopaque,
loop: ?*anyopaque,
conn: ?*nats_c.natsConnection,
sock: ?*nats_c.natsSock,
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const ev_loop: *L = if (loop) |l| @ptrCast(l) else unreachable;
userdata.* = callback(ev_loop, connection, sock) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
const EventLoopAddRemoveCallback = fn (?*nats_c.natsConnection, c_int, ?*anyopaque) nats_c.natsStatus;
pub fn EventLoopAddRemoveCallbackSignature(comptime T: type) type {
return fn (*Connection, c_int, *T) anyerror!void;
}
pub fn makeEventLoopAddRemoveCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopAddRemoveCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
conn: ?*nats_c.natsConnection,
attempts: c_int,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(connection, attempts, data) catch |err|
return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
const EventLoopDetachCallback = fn (?*anyopaque) nats_c.natsStatus;
pub fn EventLoopDetachCallbackSignature(comptime T: type) type {
return fn (*T) anyerror!void;
}
pub fn makeEventLoopDetachCallbackThunk(
comptime T: type,
comptime callback: *const EventLoopDetachCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(data) catch |err| return Status.fromError(err).toInt();
return nats_c.NATS_OK;
}
}.thunk;
}
// THE NATS LIBRARY WILL TRY TO FREE THE TOKEN AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const JwtHandlerCallback = fn (*?[*:0]u8, *?[*:0]u8, ?*anyopaque) nats_c.natsStatus;
const JwtResponseOrError = union(enum) {
jwt: [:0]u8,
error_message: [:0]u8,
};
pub fn JwtHandlerCallbackSignature(comptime T: type) type {
return fn (*T) JwtResponseOrError;
}
pub fn makeJwtHandlerCallbackThunk(
comptime T: type,
comptime callback: *const JwtHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
jwt_out: *?[*:0]u8,
err_out: *?[*:0]u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
switch (callback(data)) {
.jwt => |jwt| {
jwt_out.* = jwt.ptr;
return nats_c.NATS_OK;
},
.error_message => |msg| {
err_out.* = msg.ptr;
return nats_c.NATS_ERR;
},
}
}
}.thunk;
}
// THE NATS LIBRARY WILL TRY TO FREE THE SIGNATURE AND ALSO THE ERROR MESSAGE, SO THEY MUST
// BE ALLOCATED WITH THE C ALLOCATOR
const SignatureHandlerCallback = fn (*?[*:0]u8, *?[*]u8, *c_int, [*:0]const u8, ?*anyopaque) nats_c.natsStatus;
const SignatureResponseOrError = union(enum) {
signature: []u8,
error_message: [:0]u8,
};
pub fn SignatureHandlerCallbackSignature(comptime T: type) type {
return fn ([:0]const u8, *T) SignatureResponseOrError;
}
pub fn makeSignatureHandlerCallbackThunk(
comptime T: type,
comptime callback: *const SignatureHandlerCallbackSignature(T),
) *const ReconnectDelayCallback {
return struct {
fn thunk(
err_out: *?[*:0]u8,
sig_out: *?[*]u8,
sig_len_out: *c_int,
nonce: [*:0]const u8,
userdata: ?*anyopaque,
) callconv(.C) nats_c.natsStatus {
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
switch (callback(std.mem.sliceTo(nonce, 0), data)) {
.signature => |sig| {
sig_out.* = sig.ptr;
sig_len_out.* = sig.len;
return nats_c.NATS_OK;
},
.error_message => |msg| {
err_out.* = msg.ptr;
return nats_c.NATS_ERR;
},
}
} }
}.thunk; }.thunk;
} }

View File

@ -74,10 +74,6 @@ pub const Status = enum(c_int) {
return @enumFromInt(int); return @enumFromInt(int);
} }
pub fn toInt(self: Status) c_uint {
return @intFromEnum(self);
}
pub fn description(self: Status) [:0]const u8 { pub fn description(self: Status) [:0]const u8 {
return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0); return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0);
} }
@ -167,50 +163,6 @@ pub const Status = enum(c_int) {
_ => Error.UnknownError, _ => Error.UnknownError,
}; };
} }
pub fn fromError(err: ?anyerror) Status {
return if (err) |e|
switch (e) {
Error.ProtocolError => .protocol_error,
Error.IoError => .io_error,
Error.LineTooLong => .line_too_long,
Error.ConnectionClosed => .connection_closed,
Error.NoServer => .no_server,
Error.StaleConnection => .stale_connection,
Error.SecureConnectionWanted => .secure_connection_wanted,
Error.SecureConnectionRequired => .secure_connection_required,
Error.ConnectionDisconnected => .connection_disconnected,
Error.ConnectionAuthFailed => .connection_auth_failed,
Error.NotPermitted => .not_permitted,
Error.NotFound => .not_found,
Error.AddressMissing => .address_missing,
Error.InvalidSubject => .invalid_subject,
Error.InvalidArg => .invalid_arg,
Error.InvalidSubscription => .invalid_subscription,
Error.InvalidTimeout => .invalid_timeout,
Error.IllegalState => .illegal_state,
Error.SlowConsumer => .slow_consumer,
Error.MaxPayload => .max_payload,
Error.MaxDeliveredMsgs => .max_delivered_msgs,
Error.InsufficientBuffer => .insufficient_buffer,
Error.NoMemory => .no_memory,
Error.SysError => .sys_error,
Error.Timeout => .timeout,
Error.FailedToInitialize => .failed_to_initialize,
Error.NotInitialized => .not_initialized,
Error.SslError => .ssl_error,
Error.NoServerSupport => .no_server_support,
Error.NotYetConnected => .not_yet_connected,
Error.Draining => .draining,
Error.InvalidQueueName => .invalid_queue_name,
Error.NoResponders => .no_responders,
Error.Mismatch => .mismatch,
Error.MissedHeartbeat => .missed_heartbeat,
else => .generic_error,
}
else
.okay;
}
}; };
pub const Error = error{ pub const Error = error{

View File

@ -22,19 +22,12 @@ const err_ = @import("./error.zig");
const con_ = @import("./connection.zig"); const con_ = @import("./connection.zig");
const sub_ = @import("./subscription.zig"); const sub_ = @import("./subscription.zig");
const msg_ = @import("./message.zig"); const msg_ = @import("./message.zig");
const sta_ = @import("./statistics.zig");
pub const default_server_url = con_.default_server_url; pub const default_server_url = con_.default_server_url;
pub const Connection = con_.Connection; pub const Connection = con_.Connection;
pub const ConnectionOptions = con_.ConnectionOptions;
pub const Subscription = sub_.Subscription; pub const Subscription = sub_.Subscription;
pub const Message = msg_.Message; pub const Message = msg_.Message;
pub const Statistics = sta_.Statistics;
pub const StatsCounts = sta_.StatsCounts;
const Status = err_.Status; const Status = err_.Status;
pub const Error = err_.Error; pub const Error = err_.Error;
@ -136,6 +129,39 @@ pub fn deinitWait(timeout: i64) Error!void {
return status.raise(); return status.raise();
} }
pub const StatsCounts = struct {
messages_in: u64 = 0,
bytes_in: u64 = 0,
messages_out: u64 = 0,
bytes_out: u64 = 0,
reconnects: u64 = 0,
};
pub const Statistics = opaque {
pub fn create() Error!*Statistics {
var stats: *Statistics = undefined;
const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats)));
return status.toError() orelse stats;
}
pub fn deinit(self: *Statistics) void {
nats_c.natsStatistics_Destroy(@ptrCast(self));
}
pub fn getCounts(self: *Statistics) Error!StatsCounts {
var counts: StatsCounts = .{};
const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
self,
&counts.messages_in,
&counts.bytes_in,
&counts.messages_out,
&counts.bytes_out,
&counts.reconnects,
);
return status.toError() orelse counts;
}
};
// This appears to be a jetstream API, but these two endpoints are trivial, so, whoops. // This appears to be a jetstream API, but these two endpoints are trivial, so, whoops.
// I have no clue what this does, since there's basically no // I have no clue what this does, since there's basically no
pub const Inbox = opaque { pub const Inbox = opaque {

View File

@ -1,42 +0,0 @@
const std = @import("std");
const nats_c = @cImport({
@cInclude("nats/nats.h");
});
const err_ = @import("./error.zig");
const Status = err_.Status;
const Error = err_.Error;
pub const StatsCounts = struct {
messages_in: u64 = 0,
bytes_in: u64 = 0,
messages_out: u64 = 0,
bytes_out: u64 = 0,
reconnects: u64 = 0,
};
pub const Statistics = opaque {
pub fn create() Error!*Statistics {
var stats: *Statistics = undefined;
const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats)));
return status.toError() orelse stats;
}
pub fn destroy(self: *Statistics) void {
nats_c.natsStatistics_Destroy(@ptrCast(self));
}
pub fn getCounts(self: *Statistics) Error!StatsCounts {
var counts: StatsCounts = .{};
const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
self,
&counts.messages_in,
&counts.bytes_in,
&counts.messages_out,
&counts.bytes_out,
&counts.reconnects,
);
return status.toError() orelse counts;
}
};

View File

@ -169,21 +169,21 @@ pub const Subscription = opaque {
} }
}; };
const SubscriptionCallback = fn ( const BareSubscriptionCallback = fn (
?*nats_c.natsConnection, ?*nats_c.natsConnection,
?*nats_c.natsSubscription, ?*nats_c.natsSubscription,
?*nats_c.natsMsg, ?*nats_c.natsMsg,
?*anyopaque, ?*anyopaque,
) callconv(.C) void; ) callconv(.C) void;
pub fn SubscriptionCallbackSignature(comptime T: type) type { pub fn SubscriptionThunkCallback(comptime T: type) type {
return fn (*T, *Connection, *Subscription, *Message) void; return fn (*T, *Connection, *Subscription, *Message) void;
} }
pub fn makeSubscriptionCallbackThunk( pub fn subscriptionMessageThunk(
comptime T: type, comptime T: type,
comptime callback: *const SubscriptionCallbackSignature(T), comptime callback: *const SubscriptionThunkCallback(T),
) *const SubscriptionCallback { ) *const BareSubscriptionCallback {
return struct { return struct {
fn thunk( fn thunk(
conn: ?*nats_c.natsConnection, conn: ?*nats_c.natsConnection,