diff --git a/src/connection.zig b/src/connection.zig index 5f55903..d232546 100644 --- a/src/connection.zig +++ b/src/connection.zig @@ -6,8 +6,8 @@ pub const nats_c = @cImport({ const sub_ = @import("./subscription.zig"); const Subscription = sub_.Subscription; -const SubscriptionThunkCallback = sub_.SubscriptionThunkCallback; -const subscriptionMessageThunk = sub_.subscriptionMessageThunk; +const SubscriptionCallbackSignature = sub_.SubscriptionCallbackSignature; +const makeSubscriptionCallbackThunk = sub_.makeSubscriptionCallbackThunk; const msg_ = @import("./message.zig"); const Message = msg_.Message; @@ -15,13 +15,38 @@ const Message = msg_.Message; const err_ = @import("./error.zig"); const Error = err_.Error; const Status = err_.Status; +const ErrorInfo = err_.ErrorInfo; + +const sta_ = @import("./statistics.zig"); +const Statistics = sta_.Statistics; +const StatsCounts = sta_.StatsCounts; const thunk = @import("./thunk.zig"); pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL; +pub const ConnectionStatus = enum(c_int) { + disconnected = nats_c.DISCONNECTED, + connecting = nats_c.CONNECTING, + connected = nats_c.CONNECTED, + closed = nats_c.CLOSED, + reconnecting = nats_c.RECONNECTING, + draining_subs = nats_c.DRAINING_SUBS, + draining_pubs = nats_c.DRAINING_PUBS, + _, + + pub fn fromInt(int: c_uint) ConnectionStatus { + return @enumFromInt(int); + } +}; + +pub const AddressPort = struct { + address: [:0]u8, + port: u16, +}; + pub const Connection = opaque { - pub fn connect(options: *ConnectionOptions) Error!*Connection { + pub fn create(options: *ConnectionOptions) Error!*Connection { var self: *Connection = undefined; const status = Status.fromInt(nats_c.natsConnection_Connect(@ptrCast(&self), @ptrCast(options))); return status.toError() orelse self; @@ -44,98 +69,402 @@ pub const Connection = opaque { return nats_c.natsConnection_Destroy(@ptrCast(self)); } + pub fn processReadEvent(self: *Connection) void { + nats_c.natsConnection_ProcessReadEvent(@ptrCast(self)); + } + + pub fn processWriteEvent(self: *Connection) void { + nats_c.natsConnection_ProcessWriteEvent(@ptrCast(self)); + } + + pub fn isClosed(self: *Connection) bool { + return nats_c.natsConnection_IsClosed(@ptrCast(self)); + } + + pub fn isReconnecting(self: *Connection) bool { + return nats_c.natsConnection_IsReconnecting(@ptrCast(self)); + } + + pub fn getStatus(self: *Connection) ConnectionStatus { + return ConnectionStatus.fromInt(nats_c.natsConnection_Status(@ptrCast(self))); + } + + pub fn bytesBuffered(self: *Connection) c_int { + return nats_c.natsConnection_Buffered(@ptrCast(self)); + } + + pub fn flush(self: *Connection) Error!void { + return Status.fromInt(nats_c.natsConnection_Flush(@ptrCast(self))).raise(); + } + + pub fn flushTimeout(self: *Connection, timeout: i64) Error!void { + return Status.fromInt(nats_c.natsConnection_FlushTimeout(@ptrCast(self), timeout)).raise(); + } + + pub fn getMaxPayload(self: *Connection) c_int { + return nats_c.natsConnection_GetMaxPayload(@ptrCast(self)); + } + + pub fn getStats(self: *Connection) Error!StatsCounts { + var stats = try Statistics.create(); + defer stats.destroy(); + + const status = Status.fromInt( + nats_c.natsConnection_GetStats(@ptrCast(self), @ptrCast(stats)), + ); + + return status.toError() orelse stats.getCounts(); + } + + pub fn getConnectedUrl(self: *Connection, buffer: []u8) Error![:0]u8 { + const status = Status.fromInt( + nats_c.natsConnection_GetConnectedUrl(@ptrCast(self), buffer.ptr, buffer.len), + ); + + // cast this to a c pointer so that sliceTo properly returns the sentinel + // terminated type, which is guaranteed by the backing library. + return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0); + } + + pub fn getConnectedServerId(self: *Connection, buffer: []u8) Error![:0]u8 { + const status = Status.fromInt( + nats_c.natsConnection_GetConnectedServerId(@ptrCast(self), buffer.ptr, buffer.len), + ); + + // cast this to a c pointer so that sliceTo properly returns the sentinel + // terminated type, which is guaranteed by the backing library. + return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0); + } + + pub fn getServers(self: *Connection) Error![][*:0]u8 { + var servers: [*][*:0]u8 = undefined; + var count: c_int = 0; + + const status = Status.fromInt( + nats_c.natsConnection_GetServers(@ptrCast(self), &servers, &count), + ); + + return status.toError() orelse servers[0..@intCast(count)]; + } + + pub fn getDiscoveredServers(self: *Connection) Error![][*:0]u8 { + var servers: [*][*:0]u8 = undefined; + var count: c_int = 0; + + const status = Status.fromInt( + nats_c.natsConnection_GetDiscoveredServers(@ptrCast(self), &servers, &count), + ); + + return status.toError() orelse servers[0..@intCast(count)]; + } + + pub fn getLastError(self: *Connection) ErrorInfo { + var desc: [*:0]const u8 = 0; + const status = nats_c.natsConnection_GetLastError(@ptrCast(self), &desc); + + return .{ + .code = Status.fromInt(status).toError(), + .desc = std.mem.sliceTo(desc, 0), + }; + } + + pub fn getClientId(self: *Connection) Error!u64 { + var id: u64 = 0; + + const status = Status.fromInt( + nats_c.natsConnection_GetClientID(@ptrCast(self), &id), + ); + + return status.toError() orelse id; + } + + pub fn drain(self: *Connection) Error!void { + return Status.fromInt(nats_c.natsConnection_Drain(@ptrCast(self))).raise(); + } + + pub fn drainTimeout(self: *Connection, timeout: i64) Error!void { + return Status.fromInt(nats_c.natsConnection_DrainTimeout(@ptrCast(self), timeout)).raise(); + } + + pub fn sign(self: *Connection, message: []const u8) Error![64]u8 { + var sig = [_]u8{0} ** 64; + + const status = Status.fromInt( + nats_c.natsConnection_Sign(@ptrCast(self), message.ptr, message.len, &sig), + ); + + return status.toError() orelse sig; + } + + pub fn getClientIp(self: *Connection) Error![:0]u8 { + var ip: [*c]u8 = null; + + const status = Status.fromInt( + nats_c.natsConnection_GetClientIP(@ptrCast(self), &ip), + ); + + return status.toError() orelse std.mem.sliceTo(ip, 0); + } + + pub fn getLocalIpAndPort(self: *Connection) Error!AddressPort { + var address: [*:0]u8 = undefined; + var port: c_int = 0; + + const status = Status.fromInt( + nats_c.natsConnection_GetLocalIPAndPort(@ptrCast(self), &address, &port), + ); + + return status.toError() orelse .{ + .address = std.mem.sliceTo(address, 0), + .port = @intCast(port), + }; + } + + pub fn getRtt(self: *Connection) Error!i64 { + var rtt: i64 = 0; + + const status = Status.fromInt( + nats_c.natsConnection_GetRTT(@ptrCast(self), &rtt), + ); + + return status.toError() orelse rtt; + } + + pub fn hasHeaderSupport(self: *Connection) bool { + const status = Status.fromInt( + nats_c.natsConnection_HasHeaderSupport(@ptrCast(self)), + ); + + return status == .okay; + } + + pub fn publish(self: *Connection, subject: [:0]const u8, message: []const u8) Error!void { + return Status.fromInt( + nats_c.natsConnection_Publish(@ptrCast(self), subject, message.ptr, @intCast(message.len)), + ).raise(); + } + pub fn publishString( self: *Connection, subject: [:0]const u8, message: [:0]const u8, ) Error!void { - const status = Status.fromInt(nats_c.natsConnection_PublishString( + return Status.fromInt( + nats_c.natsConnection_PublishString(@ptrCast(self), subject.ptr, message.ptr), + ).raise(); + } + + pub fn publishMessage(self: *Connection, message: *Message) Error!void { + return Status.fromInt( + nats_c.natsConnection_PublishMsg(@ptrCast(self), @ptrCast(message)), + ).raise(); + } + + pub fn publishRequest( + self: *Connection, + subject: [:0]const u8, + reply: [:0]const u8, + message: []const u8, + ) Error!void { + return Status.fromInt( + nats_c.natsConnection_PublishRequest( + @ptrCast(self), + subject.ptr, + reply.ptr, + message.ptr, + @intCast(message.len), + ), + ).raise(); + } + + pub fn publishRequestString( + self: *Connection, + subject: [:0]const u8, + reply: [:0]const u8, + message: [:0]const u8, + ) Error!void { + return Status.fromInt( + nats_c.natsConnection_PublishRequestString( + @ptrCast(self), + subject.ptr, + reply.ptr, + message.ptr, + ), + ).raise(); + } + + pub fn request( + self: *Connection, + subject: [:0]const u8, + req: []const u8, + timeout: i64, + ) Error!*Message { + var response: *Message = undefined; + + const status = Status.fromInt(nats_c.natsConnection_Request( + @ptrCast(&response), @ptrCast(self), - subject, - message, + subject.ptr, + req.ptr, + @intCast(req.len), + timeout, )); - return status.raise(); + + return status.toError() orelse response; } pub fn requestString( self: *Connection, subject: [:0]const u8, - request: [:0]const u8, + req: [:0]const u8, timeout: i64, ) Error!*Message { - var msg: *Message = undefined; + var response: *Message = undefined; + const status = Status.fromInt(nats_c.natsConnection_RequestString( - @ptrCast(&msg), + @ptrCast(&response), @ptrCast(self), - subject, - request, + subject.ptr, + req.ptr, timeout, )); - return status.toError() orelse msg; + + return status.toError() orelse response; + } + + pub fn requestMessage( + self: *Connection, + req: *Message, + timeout: i64, + ) Error!*Message { + var response: *Message = undefined; + + const status = Status.fromInt(nats_c.natsConnection_RequestMsg( + @ptrCast(&response), + @ptrCast(self), + @ptrCast(req), + timeout, + )); + + return status.toError() orelse response; } pub fn subscribe( self: *Connection, comptime T: type, subject: [:0]const u8, - callback: SubscriptionThunkCallback(T), + callback: SubscriptionCallbackSignature(T), userdata: *T, ) Error!*Subscription { var sub: *Subscription = undefined; const status = Status.fromInt(nats_c.natsConnection_Subscribe( @ptrCast(&sub), @ptrCast(self), - subject, - subscriptionMessageThunk(T, callback), + subject.ptr, + makeSubscriptionCallbackThunk(T, callback), userdata, )); return status.toError() orelse sub; } + + pub fn subscribeTimeout( + self: *Connection, + comptime T: type, + subject: [:0]const u8, + timeout: i64, + callback: SubscriptionCallbackSignature(T), + userdata: *T, + ) Error!*Subscription { + var sub: *Subscription = undefined; + + const status = Status.fromInt(nats_c.natsConnection_SubscribeTimeout( + @ptrCast(&sub), + @ptrCast(self), + subject.ptr, + timeout, + makeSubscriptionCallbackThunk(T, callback), + userdata, + )); + + return status.toError() orelse sub; + } + + pub fn subscribeSync(self: *Connection, subject: [:0]const u8) Error!*Subscription { + var sub: *Subscription = undefined; + + const status = Status.fromInt(nats_c.natsConnection_SubscribeSync( + @ptrCast(&sub), + @ptrCast(self), + subject.ptr, + )); + + return status.toError() orelse sub; + } + + pub fn queueSubscribe( + self: *Connection, + comptime T: type, + subject: [:0]const u8, + queue_group: [:0]const u8, + callback: SubscriptionCallbackSignature(T), + userdata: *T, + ) Error!*Subscription { + var sub: *Subscription = undefined; + + const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe( + @ptrCast(&sub), + @ptrCast(self), + subject.ptr, + queue_group.ptr, + makeSubscriptionCallbackThunk(T, callback), + userdata, + )); + + return status.toError() orelse sub; + } + + pub fn queueSubscribeTimeout( + self: *Connection, + comptime T: type, + subject: [:0]const u8, + queue_group: [:0]const u8, + timeout: i64, + callback: SubscriptionCallbackSignature(T), + userdata: *T, + ) Error!*Subscription { + var sub: *Subscription = undefined; + + const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe( + @ptrCast(&sub), + @ptrCast(self), + subject.ptr, + queue_group.ptr, + timeout, + makeSubscriptionCallbackThunk(T, callback), + userdata, + )); + + return status.toError() orelse sub; + } + + pub fn queueSubscribeSync( + self: *Connection, + subject: [:0]const u8, + queue_group: [:0]const u8, + ) Error!*Subscription { + var sub: *Subscription = undefined; + + const status = Status.fromInt(nats_c.natsConnection_SubscribeSync( + @ptrCast(&sub), + @ptrCast(self), + subject.ptr, + queue_group.ptr, + )); + + return status.toError() orelse sub; + } }; -// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc); -// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc); - -// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc); -// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc); - -// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc); -// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc); - -// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout); - -// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats); -// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize); -// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize); -// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count); -// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count); -// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError); -// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid); -// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]); -// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip); -// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt); -// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc); - -// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen); -// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str); -// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg); -// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen); -// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str); -// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject); -// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup); - -// const ConnectionOptions = opaque { pub fn create() Error!*ConnectionOptions { var self: *ConnectionOptions = undefined; diff --git a/src/nats.zig b/src/nats.zig index d815bc6..d9809a3 100644 --- a/src/nats.zig +++ b/src/nats.zig @@ -22,12 +22,19 @@ const err_ = @import("./error.zig"); const con_ = @import("./connection.zig"); const sub_ = @import("./subscription.zig"); const msg_ = @import("./message.zig"); +const sta_ = @import("./statistics.zig"); pub const default_server_url = con_.default_server_url; pub const Connection = con_.Connection; +pub const ConnectionOptions = con_.ConnectionOptions; + pub const Subscription = sub_.Subscription; + pub const Message = msg_.Message; +pub const Statistics = sta_.Statistics; +pub const StatsCounts = sta_.StatsCounts; + const Status = err_.Status; pub const Error = err_.Error; @@ -129,39 +136,6 @@ pub fn deinitWait(timeout: i64) Error!void { return status.raise(); } -pub const StatsCounts = struct { - messages_in: u64 = 0, - bytes_in: u64 = 0, - messages_out: u64 = 0, - bytes_out: u64 = 0, - reconnects: u64 = 0, -}; - -pub const Statistics = opaque { - pub fn create() Error!*Statistics { - var stats: *Statistics = undefined; - const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats))); - return status.toError() orelse stats; - } - - pub fn deinit(self: *Statistics) void { - nats_c.natsStatistics_Destroy(@ptrCast(self)); - } - - pub fn getCounts(self: *Statistics) Error!StatsCounts { - var counts: StatsCounts = .{}; - const status = Status.fromInt(nats_c.natsStatistics_GetCounts)( - self, - &counts.messages_in, - &counts.bytes_in, - &counts.messages_out, - &counts.bytes_out, - &counts.reconnects, - ); - return status.toError() orelse counts; - } -}; - // This appears to be a jetstream API, but these two endpoints are trivial, so, whoops. // I have no clue what this does, since there's basically no pub const Inbox = opaque { diff --git a/src/statistics.zig b/src/statistics.zig new file mode 100644 index 0000000..d40b923 --- /dev/null +++ b/src/statistics.zig @@ -0,0 +1,42 @@ +const std = @import("std"); + +const nats_c = @cImport({ + @cInclude("nats/nats.h"); +}); + +const err_ = @import("./error.zig"); +const Status = err_.Status; +const Error = err_.Error; + +pub const StatsCounts = struct { + messages_in: u64 = 0, + bytes_in: u64 = 0, + messages_out: u64 = 0, + bytes_out: u64 = 0, + reconnects: u64 = 0, +}; + +pub const Statistics = opaque { + pub fn create() Error!*Statistics { + var stats: *Statistics = undefined; + const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats))); + return status.toError() orelse stats; + } + + pub fn destroy(self: *Statistics) void { + nats_c.natsStatistics_Destroy(@ptrCast(self)); + } + + pub fn getCounts(self: *Statistics) Error!StatsCounts { + var counts: StatsCounts = .{}; + const status = Status.fromInt(nats_c.natsStatistics_GetCounts)( + self, + &counts.messages_in, + &counts.bytes_in, + &counts.messages_out, + &counts.bytes_out, + &counts.reconnects, + ); + return status.toError() orelse counts; + } +}; diff --git a/src/subscription.zig b/src/subscription.zig index 2bacab8..92c8aae 100644 --- a/src/subscription.zig +++ b/src/subscription.zig @@ -169,21 +169,21 @@ pub const Subscription = opaque { } }; -const BareSubscriptionCallback = fn ( +const SubscriptionCallback = fn ( ?*nats_c.natsConnection, ?*nats_c.natsSubscription, ?*nats_c.natsMsg, ?*anyopaque, ) callconv(.C) void; -pub fn SubscriptionThunkCallback(comptime T: type) type { +pub fn SubscriptionCallbackSignature(comptime T: type) type { return fn (*T, *Connection, *Subscription, *Message) void; } -pub fn subscriptionMessageThunk( +pub fn makeSubscriptionCallbackThunk( comptime T: type, - comptime callback: *const SubscriptionThunkCallback(T), -) *const BareSubscriptionCallback { + comptime callback: *const SubscriptionCallbackSignature(T), +) *const SubscriptionCallback { return struct { fn thunk( conn: ?*nats_c.natsConnection,