diff --git a/src/connection.zig b/src/connection.zig new file mode 100644 index 0000000..81eddae --- /dev/null +++ b/src/connection.zig @@ -0,0 +1,135 @@ +const std = @import("std"); + +pub const nats_c = @cImport({ + @cInclude("nats/nats.h"); +}); + +const sub_ = @import("./subscription.zig"); +const Subscription = sub_.Subscription; +const ThunkCallback = sub_.ThunkCallback; +const messageThunk = sub_.messageThunk; + +const msg_ = @import("./message.zig"); +const Message = msg_.Message; + +const err_ = @import("./error.zig"); +const Error = err_.Error; +const Status = err_.Status; + +pub const Options = opaque {}; + +pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL; + +pub const Connection = opaque { + pub fn connect(options: *Options) Error!*Connection { + var self: *Connection = undefined; + const status = Status.fromInt(nats_c.natsConnection_Connect(@ptrCast(&self), @ptrCast(options))); + return status.toError() orelse self; + } + + pub fn connectTo(urls: [:0]const u8) Error!*Connection { + var self: *Connection = undefined; + const status = Status.fromInt( + nats_c.natsConnection_ConnectTo(@ptrCast(&self), urls.ptr), + ); + + return status.toError() orelse self; + } + + pub fn close(self: *Connection) void { + return nats_c.natsConnection_Close(@ptrCast(self)); + } + + pub fn destroy(self: *Connection) void { + return nats_c.natsConnection_Destroy(@ptrCast(self)); + } + + pub fn publishString( + self: *Connection, + subject: [:0]const u8, + message: [:0]const u8, + ) Error!void { + const status = Status.fromInt(nats_c.natsConnection_PublishString( + @ptrCast(self), + subject, + message, + )); + return status.raise(); + } + + pub fn requestString( + self: *Connection, + subject: [:0]const u8, + request: [:0]const u8, + timeout: i64, + ) Error!*Message { + var msg: *Message = undefined; + const status = Status.fromInt(nats_c.natsConnection_RequestString( + @ptrCast(&msg), + @ptrCast(self), + subject, + request, + timeout, + )); + return status.toError() orelse msg; + } + + pub fn subscribe( + self: *Connection, + comptime T: type, + subject: [:0]const u8, + callback: ThunkCallback(T), + userdata: *T, + ) Error!*Subscription { + var sub: *Subscription = undefined; + const status = Status.fromInt(nats_c.natsConnection_Subscribe( + @ptrCast(&sub), + @ptrCast(self), + subject, + messageThunk(T, callback), + userdata, + )); + return status.toError() orelse sub; + } +}; + +// NATS_EXTERN natsStatus natsConnection_Connect(natsConnection **nc, natsOptions *options); +// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc); +// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc); +// NATS_EXTERN natsStatus natsConnection_ConnectTo(natsConnection **nc, const char *urls); +// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc); +// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc); +// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc); +// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc); +// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc); +// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout); +// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc); +// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats); +// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize); +// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize); +// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count); +// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count); +// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError); +// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid); +// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc); +// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout); +// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]); +// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip); +// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt); +// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc); +// NATS_EXTERN void natsConnection_Close(natsConnection *nc); +// NATS_EXTERN void natsConnection_Destroy(natsConnection *nc); +// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen); +// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str); +// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg); +// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen); +// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str); +// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout); +// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout); +// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout); +// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure); +// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure); +// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject); +// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure); +// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure); +// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup); diff --git a/src/error.zig b/src/error.zig new file mode 100644 index 0000000..fa6fc35 --- /dev/null +++ b/src/error.zig @@ -0,0 +1,263 @@ +const std = @import("std"); + +pub const nats_c = @cImport({ + @cInclude("nats/nats.h"); +}); + +// pub const AllocError = Error || std.mem.Allocator.Error; + +pub const ErrorInfo = struct { + code: ?Error, + desc: [:0]const u8, +}; + +pub fn getLastError() ErrorInfo { + const status: c_uint = 0; + const desc = nats_c.nats_GetLastError(&status); + + return .{ + .code = Status.fromInt(status).toError(), + .desc = std.mem.sliceTo(desc, 0), + }; +} + +pub fn getLastErrorStack(buffer: []u8) Error!void { + const status = Status.fromInt(nats_c.getLastErrorStack(buffer.ptr, buffer.len)); + return status.raise(); +} + +// NATS_EXTERN void nats_PrintLastErrorStack(FILE *file); + +pub const Status = enum(c_int) { + okay = nats_c.NATS_OK, + generic_error = nats_c.NATS_ERR, + protocol_error = nats_c.NATS_PROTOCOL_ERROR, + io_error = nats_c.NATS_IO_ERROR, + line_too_long = nats_c.NATS_LINE_TOO_LONG, + connection_closed = nats_c.NATS_CONNECTION_CLOSED, + no_server = nats_c.NATS_NO_SERVER, + stale_connection = nats_c.NATS_STALE_CONNECTION, + secure_connection_wanted = nats_c.NATS_SECURE_CONNECTION_WANTED, + secure_connection_required = nats_c.NATS_SECURE_CONNECTION_REQUIRED, + connection_disconnected = nats_c.NATS_CONNECTION_DISCONNECTED, + connection_auth_failed = nats_c.NATS_CONNECTION_AUTH_FAILED, + not_permitted = nats_c.NATS_NOT_PERMITTED, + not_found = nats_c.NATS_NOT_FOUND, + address_missing = nats_c.NATS_ADDRESS_MISSING, + invalid_subject = nats_c.NATS_INVALID_SUBJECT, + invalid_arg = nats_c.NATS_INVALID_ARG, + invalid_subscription = nats_c.NATS_INVALID_SUBSCRIPTION, + invalid_timeout = nats_c.NATS_INVALID_TIMEOUT, + illegal_state = nats_c.NATS_ILLEGAL_STATE, + slow_consumer = nats_c.NATS_SLOW_CONSUMER, + max_payload = nats_c.NATS_MAX_PAYLOAD, + max_delivered_msgs = nats_c.NATS_MAX_DELIVERED_MSGS, + insufficient_buffer = nats_c.NATS_INSUFFICIENT_BUFFER, + no_memory = nats_c.NATS_NO_MEMORY, + sys_error = nats_c.NATS_SYS_ERROR, + timeout = nats_c.NATS_TIMEOUT, + failed_to_initialize = nats_c.NATS_FAILED_TO_INITIALIZE, + not_initialized = nats_c.NATS_NOT_INITIALIZED, + ssl_error = nats_c.NATS_SSL_ERROR, + no_server_support = nats_c.NATS_NO_SERVER_SUPPORT, + not_yet_connected = nats_c.NATS_NOT_YET_CONNECTED, + draining = nats_c.NATS_DRAINING, + invalid_queue_name = nats_c.NATS_INVALID_QUEUE_NAME, + no_responders = nats_c.NATS_NO_RESPONDERS, + mismatch = nats_c.NATS_MISMATCH, + missed_heartbeat = nats_c.NATS_MISSED_HEARTBEAT, + _, + + // this is a weird quirk of translate-c. All the enum members are translated as independent + // constants of type c_int, but the enum type itself is translated as c_uint. + pub fn fromInt(int: c_uint) Status { + return @enumFromInt(int); + } + + pub fn description(self: Status) [:0]const u8 { + return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0); + } + + pub fn raise(self: Status) Error!void { + return switch (self) { + .okay => void{}, + .generic_error => Error.GenericError, + .protocol_error => Error.ProtocolError, + .io_error => Error.IoError, + .line_too_long => Error.LineTooLong, + .connection_closed => Error.ConnectionClosed, + .no_server => Error.NoServer, + .stale_connection => Error.StaleConnection, + .secure_connection_wanted => Error.SecureConnectionWanted, + .secure_connection_required => Error.SecureConnectionRequired, + .connection_disconnected => Error.ConnectionDisconnected, + .connection_auth_failed => Error.ConnectionAuthFailed, + .not_permitted => Error.NotPermitted, + .not_found => Error.NotFound, + .address_missing => Error.AddressMissing, + .invalid_subject => Error.InvalidSubject, + .invalid_arg => Error.InvalidArg, + .invalid_subscription => Error.InvalidSubscription, + .invalid_timeout => Error.InvalidTimeout, + .illegal_state => Error.IllegalState, + .slow_consumer => Error.SlowConsumer, + .max_payload => Error.MaxPayload, + .max_delivered_msgs => Error.MaxDeliveredMsgs, + .insufficient_buffer => Error.InsufficientBuffer, + .no_memory => Error.NoMemory, + .sys_error => Error.SysError, + .timeout => Error.Timeout, + .failed_to_initialize => Error.FailedToInitialize, + .not_initialized => Error.NotInitialized, + .ssl_error => Error.SslError, + .no_server_support => Error.NoServerSupport, + .not_yet_connected => Error.NotYetConnected, + .draining => Error.Draining, + .invalid_queue_name => Error.InvalidQueueName, + .no_responders => Error.NoResponders, + .mismatch => Error.Mismatch, + .missed_heartbeat => Error.MissedHeartbeat, + _ => Error.UnknownError, + }; + } + + pub fn toError(self: Status) ?Error { + return switch (self) { + .okay => null, + .generic_error => Error.GenericError, + .protocol_error => Error.ProtocolError, + .io_error => Error.IoError, + .line_too_long => Error.LineTooLong, + .connection_closed => Error.ConnectionClosed, + .no_server => Error.NoServer, + .stale_connection => Error.StaleConnection, + .secure_connection_wanted => Error.SecureConnectionWanted, + .secure_connection_required => Error.SecureConnectionRequired, + .connection_disconnected => Error.ConnectionDisconnected, + .connection_auth_failed => Error.ConnectionAuthFailed, + .not_permitted => Error.NotPermitted, + .not_found => Error.NotFound, + .address_missing => Error.AddressMissing, + .invalid_subject => Error.InvalidSubject, + .invalid_arg => Error.InvalidArg, + .invalid_subscription => Error.InvalidSubscription, + .invalid_timeout => Error.InvalidTimeout, + .illegal_state => Error.IllegalState, + .slow_consumer => Error.SlowConsumer, + .max_payload => Error.MaxPayload, + .max_delivered_msgs => Error.MaxDeliveredMsgs, + .insufficient_buffer => Error.InsufficientBuffer, + .no_memory => Error.NoMemory, + .sys_error => Error.SysError, + .timeout => Error.Timeout, + .failed_to_initialize => Error.FailedToInitialize, + .not_initialized => Error.NotInitialized, + .ssl_error => Error.SslError, + .no_server_support => Error.NoServerSupport, + .not_yet_connected => Error.NotYetConnected, + .draining => Error.Draining, + .invalid_queue_name => Error.InvalidQueueName, + .no_responders => Error.NoResponders, + .mismatch => Error.Mismatch, + .missed_heartbeat => Error.MissedHeartbeat, + _ => Error.UnknownError, + }; + } +}; + +pub const Error = error{ + /// Generic error + GenericError, + /// Error when parsing a protocol message, or not getting the expected message. + ProtocolError, + /// IO Error (network communication). + IoError, + /// The protocol message read from the socket does not fit in the read buffer. + LineTooLong, + + /// Operation on this connection failed because the connection is closed. + ConnectionClosed, + /// Unable to connect, the server could not be reached or is not running. + NoServer, + /// The server closed our connection because it did not receive PINGs at the expected interval. + StaleConnection, + /// The client is configured to use TLS, but the server is not. + SecureConnectionWanted, + /// The server expects a TLS connection. + SecureConnectionRequired, + /// The connection was disconnected. Depending on the configuration, the connection may reconnect. + ConnectionDisconnected, + + /// The connection failed due to authentication error. + ConnectionAuthFailed, + /// The action is not permitted. + NotPermitted, + /// An action could not complete because something was not found. So far, this is an internal error. + NotFound, + + /// Incorrect URL. For instance no host specified in the URL. + AddressMissing, + + /// Invalid subject, for instance NULL or empty string. + InvalidSubject, + /// An invalid argument is passed to a function. For instance passing NULL to an API that does not accept this value. + InvalidArg, + /// The call to a subscription function fails because the subscription has previously been closed. + InvalidSubscription, + /// Timeout must be positive numbers. + InvalidTimeout, + + /// An unexpected state, for instance calling natsSubscription_NextMsg on an asynchronous subscriber. + IllegalState, + + /// The maximum number of messages waiting to be delivered has been reached. Messages are dropped. + SlowConsumer, + + /// Attempt to send a payload larger than the maximum allowed by the NATS Server. + MaxPayload, + /// Attempt to receive more messages than allowed, for instance because of #natsSubscription_AutoUnsubscribe(). + MaxDeliveredMsgs, + + /// A buffer is not large enough to accommodate the data. + InsufficientBuffer, + + /// An operation could not complete because of insufficient memory. + NoMemory, + + /// Some system function returned an error. + SysError, + + /// An operation timed-out. For instance #natsSubscription_NextMsg(). + Timeout, + + /// The library failed to initialize. + FailedToInitialize, + /// The library is not yet initialized. + NotInitialized, + + /// An SSL error occurred when trying to establish a connection. + SslError, + + /// The server does not support this action. + NoServerSupport, + + /// A connection could not be immediately established and #natsOptions_SetRetryOnFailedConnect() specified a connected callback. The connect is retried asynchronously. + NotYetConnected, + + /// A connection and/or subscription entered the draining mode. Some operations will fail when in that mode. + Draining, + + /// An invalid queue name was passed when creating a queue subscription. + InvalidQueueName, + + /// No responders were running when the server received the request. + NoResponders, + + /// For JetStream subscriptions, it means that a consumer sequence mismatch was discovered. + Mismatch, + /// For JetStream subscriptions, it means that the library detected that server heartbeats have been missed. + MissedHeartbeat, + + /// The C API has returned an error that the zig layer does not know about. + UnknownError, +}; diff --git a/src/message.zig b/src/message.zig new file mode 100644 index 0000000..a897516 --- /dev/null +++ b/src/message.zig @@ -0,0 +1,30 @@ +const std = @import("std"); + +pub const nats_c = @cImport({ + @cInclude("nats/nats.h"); +}); + +pub const Message = opaque { + pub fn destroy(self: *Message) void { + nats_c.natsMsg_Destroy(@ptrCast(self)); + } + + pub fn getSubject(self: *Message) [:0]const u8 { + const subject = nats_c.natsMsg_GetSubject(@ptrCast(self)) orelse unreachable; + return std.mem.sliceTo(subject, 0); + } + + pub fn getReply(self: *Message) ?[:0]const u8 { + const reply = nats_c.natsMsg_GetReply(@ptrCast(self)) orelse return null; + return std.mem.sliceTo(reply, 0); + } + + pub fn getData(self: *Message) ?[:0]const u8 { + const data = nats_c.natsMsg_GetData(@ptrCast(self)) orelse return null; + return data[0..self.getDataLength() :0]; + } + + pub fn getDataLength(self: *Message) usize { + return @intCast(nats_c.natsMsg_GetDataLength(@ptrCast(self))); + } +}; diff --git a/src/nats.zig b/src/nats.zig index c3a4b7a..bf1be17 100644 --- a/src/nats.zig +++ b/src/nats.zig @@ -18,85 +18,140 @@ pub const nats_c = @cImport({ @cInclude("nats/nats.h"); }); -fn onMessage( - conn: ?*nats_c.natsConnection, - sub: ?*nats_c.natsSubscription, - message: ?*nats_c.natsMsg, - userdata: ?*anyopaque, -) callconv(.C) void { - _ = sub; - defer nats_c.natsMsg_Destroy(message); +const err_ = @import("./error.zig"); +const con_ = @import("./connection.zig"); +const sub_ = @import("./subscription.zig"); +const msg_ = @import("./message.zig"); - const msgData = nats_c.natsMsg_GetData(message)[0..@intCast(nats_c.natsMsg_GetDataLength(message))]; - std.debug.print("Received message: {s} - {s}\n", .{ nats_c.natsMsg_GetSubject(message), msgData }); +pub const default_server_url = con_.default_server_url; +pub const Connection = con_.Connection; +pub const Subscription = sub_.Subscription; +pub const Message = msg_.Message; - if (@as(?[*]const u8, nats_c.natsMsg_GetReply(message))) |reply| { - _ = nats_c.natsConnection_PublishString(conn, reply, "salutations"); +const Status = err_.Status; +pub const Error = err_.Error; + +fn onMessage(userdata: *bool, connection: *Connection, subscription: *Subscription, message: *Message) void { + _ = subscription; + + std.debug.print("Subject \"{s}\" received message: \"{s}\"\n", .{ + message.getSubject(), + message.getData() orelse "[null]", + }); + + if (message.getReply()) |reply| { + connection.publishString(reply, "salutations") catch @panic("HELP"); } - if (@as(?*bool, @ptrCast(userdata))) |signal| { - signal.* = true; - } + userdata.* = true; } -pub fn main() void { - var conn: ?*nats_c.natsConnection = null; - defer nats_c.natsConnection_Destroy(conn); +pub fn main() !void { + const connection = try Connection.connectTo(default_server_url); + defer connection.destroy(); - if (nats_c.natsConnection_ConnectTo(&conn, nats_c.NATS_DEFAULT_URL) != nats_c.NATS_OK) { - std.debug.print("oh no {s}\n", .{nats_c.NATS_DEFAULT_URL}); - return; - } - - var sub: ?*nats_c.natsSubscription = null; - defer nats_c.natsSubscription_Destroy(sub); var done = false; - if (nats_c.natsConnection_Subscribe(&sub, conn, "channel", onMessage, &done) != nats_c.NATS_OK) { - std.debug.print("whops\n", .{}); - return; - } + const subscription = try connection.subscribe(bool, "channel", onMessage, &done); + defer subscription.destroy(); while (!done) { - var reply: ?*nats_c.natsMsg = null; - defer nats_c.natsMsg_Destroy(reply); + const reply = try connection.requestString("channel", "greetings", 1000); + defer reply.destroy(); - if (nats_c.natsConnection_RequestString(&reply, conn, "channel", "whatsup", 1000) != nats_c.NATS_OK) { - std.debug.print("geez\n", .{}); - return; - } else if (reply) |message| { - const msgData = nats_c.natsMsg_GetData(message)[0..@intCast(nats_c.natsMsg_GetDataLength(message))]; - std.debug.print("Got reply: {s}\n", .{msgData}); - } + std.debug.print("Reply \"{s}\" got message: {s}\n", .{ + reply.getSubject(), + reply.getData() orelse "[null]", + }); } } -// NATS_EXTERN natsStatus nats_Open(int64_t lockSpinCount); -// NATS_EXTERN const char* nats_GetVersion(void); -// NATS_EXTERN uint32_t nats_GetVersionNumber(void); +pub fn getVersion() [:0]const u8 { + const verString = nats_c.nats_GetVersion(); + return std.mem.sliceTo(verString, 0); +} -// #define nats_CheckCompatibility() nats_CheckCompatibilityImpl(NATS_VERSION_REQUIRED_NUMBER, NATS_VERSION_NUMBER, NATS_VERSION_STRING) -// NATS_EXTERN bool nats_CheckCompatibilityImpl(uint32_t reqVerNumber, uint32_t verNumber, const char *verString); +pub fn getVersionNumber() u32 { + return nats_c.nats_GetVersionNumber(); +} -// NATS_EXTERN int64_t nats_Now(void); -// NATS_EXTERN int64_t nats_NowInNanoSeconds(void); -// NATS_EXTERN void nats_Sleep(int64_t sleepTime); -// NATS_EXTERN const char* nats_GetLastError(natsStatus *status); -// NATS_EXTERN natsStatus nats_GetLastErrorStack(char *buffer, size_t bufLen); -// NATS_EXTERN void nats_PrintLastErrorStack(FILE *file); -// NATS_EXTERN natsStatus nats_SetMessageDeliveryPoolSize(int max); -// NATS_EXTERN void nats_ReleaseThreadMemory(void); +pub fn checkCompatibility() bool { + return nats_c.nats_CheckCompatibilityImpl( + nats_c.NATS_VERSION_REQUIRED_NUMBER, + nats_c.NATS_VERSION_NUMBER, + nats_c.NATS_VERSION_STRING, + ); +} + +pub fn now() i64 { + return nats_c.nats_Now(); +} + +pub fn nowInNanoSeconds() i64 { + return nats_c.nats_NowInNanoSeconds(); +} + +pub fn sleep(sleep_time: i64) void { + return nats_c.nats_Sleep(sleep_time); +} + +pub fn setMessageDeliveryPoolSize(max: c_int) Error!void { + const status = Status.fromInt(nats_c.nats_SetMessageDeliveryPoolSize(max)); + return status.raise(); +} + +pub fn releaseThreadMemory() void { + return nats_c.nats_ReleaseThreadMemory(); +} + +pub fn init(lock_spin_count: i64) Error!void { + const status = Status.fromInt(nats_c.nats_Open(lock_spin_count)); + return status.raise(); +} + +pub fn deinit() void { + return nats_c.nats_Close(); +} + +pub fn deinitWait(timeout: i64) Error!void { + const status = Status.fromInt(nats_c.natsCloseAndWait(timeout)); + return status.raise(); +} + +pub const StatsCounts = struct { + messages_in: u64 = 0, + bytes_in: u64 = 0, + messages_out: u64 = 0, + bytes_out: u64 = 0, + reconnects: u64 = 0, +}; + +pub const Statistics = opaque { + pub fn create() Error!*Statistics { + var stats: *Statistics = undefined; + const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats))); + return status.toError() orelse stats; + } + + pub fn deinit(self: *Statistics) void { + nats_c.natsStatistics_Destroy(@ptrCast(self)); + } + + pub fn getCounts(self: *Statistics) Error!StatsCounts { + var counts: StatsCounts = .{}; + const status = Status.fromInt(nats_c.natsStatistics_GetCounts)( + self, + &counts.messages_in, + &counts.bytes_in, + &counts.messages_out, + &counts.bytes_out, + &counts.reconnects, + ); + return status.toError() orelse counts; + } +}; // NATS_EXTERN natsStatus nats_Sign(const char *encodedSeed, const char *input, unsigned char **signature, int *signatureLength); -// NATS_EXTERN void nats_Close(void); -// NATS_EXTERN natsStatus nats_CloseAndWait(int64_t timeout); - -// NATS_EXTERN const char* natsStatus_GetText(natsStatus s); - -// NATS_EXTERN natsStatus natsStatistics_Create(natsStatistics **newStats); -// NATS_EXTERN natsStatus natsStatistics_GetCounts(const natsStatistics *stats, uint64_t *inMsgs, uint64_t *inBytes, uint64_t *outMsgs, uint64_t *outBytes, uint64_t *reconnects); -// NATS_EXTERN void natsStatistics_Destroy(natsStatistics *stats); - // NATS_EXTERN natsStatus natsOptions_Create(natsOptions **newOpts); // NATS_EXTERN natsStatus natsOptions_SetURL(natsOptions *opts, const char *url); // NATS_EXTERN natsStatus natsOptions_SetServers(natsOptions *opts, const char** servers, int serversCount); @@ -177,47 +232,6 @@ pub fn main() void { // NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg); // NATS_EXTERN void stanMsg_Destroy(stanMsg *msg); -// NATS_EXTERN natsStatus natsConnection_Connect(natsConnection **nc, natsOptions *options); -// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc); -// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_ConnectTo(natsConnection **nc, const char *urls); -// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc); -// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc); -// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc); -// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout); -// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats); -// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize); -// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize); -// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count); -// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count); -// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError); -// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid); -// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]); -// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip); -// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt); -// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc); -// NATS_EXTERN void natsConnection_Close(natsConnection *nc); -// NATS_EXTERN void natsConnection_Destroy(natsConnection *nc); -// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen); -// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str); -// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg); -// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen); -// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str); -// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout); -// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject); -// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure); -// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup); - // NATS_EXTERN natsStatus natsSubscription_NoDeliveryDelay(natsSubscription *sub); // NATS_EXTERN natsStatus natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout); // NATS_EXTERN natsStatus natsSubscription_Unsubscribe(natsSubscription *sub); diff --git a/src/subscription.zig b/src/subscription.zig new file mode 100644 index 0000000..a0580b9 --- /dev/null +++ b/src/subscription.zig @@ -0,0 +1,84 @@ +const std = @import("std"); + +pub const nats_c = @cImport({ + @cInclude("nats/nats.h"); +}); + +const Connection = @import("./connection.zig").Connection; +const Message = @import("./message.zig").Message; +const err_ = @import("./error.zig"); +const Error = err_.Error; +const Status = err_.Status; + +const SubCallback = fn (?*nats_c.natsConnection, ?*nats_c.natsSubscription, ?*nats_c.natsMsg, ?*anyopaque) callconv(.C) void; +pub fn ThunkCallback(comptime T: type) type { + return fn (*T, *Connection, *Subscription, *Message) void; +} + +pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T)) *const SubCallback { + return struct { + pub fn thunk( + conn: ?*nats_c.natsConnection, + sub: ?*nats_c.natsSubscription, + msg: ?*nats_c.natsMsg, + userdata: ?*anyopaque, + ) callconv(.C) void { + const message: *Message = if (msg) |m| @ptrCast(m) else unreachable; + defer message.destroy(); + + const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable; + const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable; + + const data: *T = if (userdata) |u| @ptrCast(u) else unreachable; + + callback(data, connection, subscription, message); + } + }.thunk; +} + +pub const Subscription = opaque { + pub fn destroy(self: *Subscription) void { + nats_c.natsSubscription_Destroy(@ptrCast(self)); + } +}; + +// pub fn Subscription(T: type) type { +// return struct { +// const Self = @This(); + +// _internal: *nats_c.natsSubscription, +// userdata: T, +// function: MessageCallback(T), + +// pub fn create( +// allocator: std.mem.Allocator, +// userdata: T, +// callback: MessageCallback(T), +// ) AllocError!*Self { +// const self: *Self = try std.mem.Allocator.create(Self); +// self.userdata = userdata; +// self.function = callback; +// return self; +// } + +// pub fn wrap( +// self: *Self, +// internal: *nats_c.natsSubscription, +// ) void { +// // self._internal = +// } + +// fn thunk( +// conn: ?*nats_c.natsConnection, +// sub: ?*nats_c.natsSubscription, +// msg: ?*nats_c.natsMsg, +// userdata: ?*const anyopaque, +// ) callconv(.C) void { +// self.function(self.userdata, connection, subscription, message); +// } +// }; +// } + +pub fn MessageCallback(comptime T: type) type { + return *const fn (userdata: T, connection: *Connection, subscription: *Subscription, message: *Message) void; +}