Compare commits

...

2 Commits

Author SHA1 Message Date
fbb45f1567
readme: add some notes about zig version support 2023-08-14 22:46:44 -07:00
aea64fb625
all: start wrapping API
This is pretty much just the API surface that is used in the demo code
for now, with a couple of minor things I tacked on while getting a
feel for how I was going to go about it. Unfortunately, the diff is
too messy to show the improvement provided by the wrapped API, but I'm
pretty pleased with how it is turning out so far. In general, there
have been no major hiccups.

Most of the time working on this was spent noodling about how to create
the subscription callback thunk to provide type safety and pointer
nullability guarantees for the subscription zig API. I'm satisfied
with the solution in place now: it seems to have good default
ergonomics and very few downsides except that the callback function
must be known at compile time. An earlier iteration of this code
supported runtime function pointers but had a showstopping problem
with memory management. In order to store the runtime callback
pointer, the thunk would need to be allocated, but there was no way of
attaching our allocated memory to the c-library subscription object.
No attachment meant that there were a number of error scenarios where
the allocated thunk couldn't be freed correctly.

Ultimately, if the user needs to use runtime callback pointers they can
write a comptime known callback that calls a runtime known callback.
This way all of the object lifetime and ownership is in the user's
hands, which is really where it needs to belong anyway for this.
2023-08-14 22:40:41 -07:00
6 changed files with 631 additions and 101 deletions

View File

@ -10,6 +10,10 @@ There are three main goals:
Right now, in service of goal 3, the underlying C library is built without certain features (notably, without TLS support and without streaming support) because those features require wrangling some complex transitive dependencies (OpenSSL and Protocol Buffers, respectively). Solving this limitation is somewhere on the roadmap, but it's not high priority.
# Zig Version Support
Since the language is still under active development, any written Zig code is a moving target. The plan is to support Zig `0.11.*` exclusively until the NATS library API has good coverage and is stabilized. At that point, if there are major breaking changes, a maintenance branch will be created, and master will probably move to track Zig master.
# Building
Currently, a demonstration executable can be built in the standard fashion, i.e. by running `zig build`.

135
src/connection.zig Normal file
View File

@ -0,0 +1,135 @@
const std = @import("std");
pub const nats_c = @cImport({
@cInclude("nats/nats.h");
});
const sub_ = @import("./subscription.zig");
const Subscription = sub_.Subscription;
const ThunkCallback = sub_.ThunkCallback;
const messageThunk = sub_.messageThunk;
const msg_ = @import("./message.zig");
const Message = msg_.Message;
const err_ = @import("./error.zig");
const Error = err_.Error;
const Status = err_.Status;
pub const Options = opaque {};
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
pub const Connection = opaque {
pub fn connect(options: *Options) Error!*Connection {
var self: *Connection = undefined;
const status = Status.fromInt(nats_c.natsConnection_Connect(@ptrCast(&self), @ptrCast(options)));
return status.toError() orelse self;
}
pub fn connectTo(urls: [:0]const u8) Error!*Connection {
var self: *Connection = undefined;
const status = Status.fromInt(
nats_c.natsConnection_ConnectTo(@ptrCast(&self), urls.ptr),
);
return status.toError() orelse self;
}
pub fn close(self: *Connection) void {
return nats_c.natsConnection_Close(@ptrCast(self));
}
pub fn destroy(self: *Connection) void {
return nats_c.natsConnection_Destroy(@ptrCast(self));
}
pub fn publishString(
self: *Connection,
subject: [:0]const u8,
message: [:0]const u8,
) Error!void {
const status = Status.fromInt(nats_c.natsConnection_PublishString(
@ptrCast(self),
subject,
message,
));
return status.raise();
}
pub fn requestString(
self: *Connection,
subject: [:0]const u8,
request: [:0]const u8,
timeout: i64,
) Error!*Message {
var msg: *Message = undefined;
const status = Status.fromInt(nats_c.natsConnection_RequestString(
@ptrCast(&msg),
@ptrCast(self),
subject,
request,
timeout,
));
return status.toError() orelse msg;
}
pub fn subscribe(
self: *Connection,
comptime T: type,
subject: [:0]const u8,
callback: ThunkCallback(T),
userdata: *T,
) Error!*Subscription {
var sub: *Subscription = undefined;
const status = Status.fromInt(nats_c.natsConnection_Subscribe(
@ptrCast(&sub),
@ptrCast(self),
subject,
messageThunk(T, callback),
userdata,
));
return status.toError() orelse sub;
}
};
// NATS_EXTERN natsStatus natsConnection_Connect(natsConnection **nc, natsOptions *options);
// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc);
// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_ConnectTo(natsConnection **nc, const char *urls);
// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc);
// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc);
// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc);
// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats);
// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError);
// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid);
// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]);
// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip);
// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt);
// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc);
// NATS_EXTERN void natsConnection_Close(natsConnection *nc);
// NATS_EXTERN void natsConnection_Destroy(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str);
// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg);
// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str);
// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup);

263
src/error.zig Normal file
View File

@ -0,0 +1,263 @@
const std = @import("std");
pub const nats_c = @cImport({
@cInclude("nats/nats.h");
});
// pub const AllocError = Error || std.mem.Allocator.Error;
pub const ErrorInfo = struct {
code: ?Error,
desc: [:0]const u8,
};
pub fn getLastError() ErrorInfo {
const status: c_uint = 0;
const desc = nats_c.nats_GetLastError(&status);
return .{
.code = Status.fromInt(status).toError(),
.desc = std.mem.sliceTo(desc, 0),
};
}
pub fn getLastErrorStack(buffer: []u8) Error!void {
const status = Status.fromInt(nats_c.getLastErrorStack(buffer.ptr, buffer.len));
return status.raise();
}
// NATS_EXTERN void nats_PrintLastErrorStack(FILE *file);
pub const Status = enum(c_int) {
okay = nats_c.NATS_OK,
generic_error = nats_c.NATS_ERR,
protocol_error = nats_c.NATS_PROTOCOL_ERROR,
io_error = nats_c.NATS_IO_ERROR,
line_too_long = nats_c.NATS_LINE_TOO_LONG,
connection_closed = nats_c.NATS_CONNECTION_CLOSED,
no_server = nats_c.NATS_NO_SERVER,
stale_connection = nats_c.NATS_STALE_CONNECTION,
secure_connection_wanted = nats_c.NATS_SECURE_CONNECTION_WANTED,
secure_connection_required = nats_c.NATS_SECURE_CONNECTION_REQUIRED,
connection_disconnected = nats_c.NATS_CONNECTION_DISCONNECTED,
connection_auth_failed = nats_c.NATS_CONNECTION_AUTH_FAILED,
not_permitted = nats_c.NATS_NOT_PERMITTED,
not_found = nats_c.NATS_NOT_FOUND,
address_missing = nats_c.NATS_ADDRESS_MISSING,
invalid_subject = nats_c.NATS_INVALID_SUBJECT,
invalid_arg = nats_c.NATS_INVALID_ARG,
invalid_subscription = nats_c.NATS_INVALID_SUBSCRIPTION,
invalid_timeout = nats_c.NATS_INVALID_TIMEOUT,
illegal_state = nats_c.NATS_ILLEGAL_STATE,
slow_consumer = nats_c.NATS_SLOW_CONSUMER,
max_payload = nats_c.NATS_MAX_PAYLOAD,
max_delivered_msgs = nats_c.NATS_MAX_DELIVERED_MSGS,
insufficient_buffer = nats_c.NATS_INSUFFICIENT_BUFFER,
no_memory = nats_c.NATS_NO_MEMORY,
sys_error = nats_c.NATS_SYS_ERROR,
timeout = nats_c.NATS_TIMEOUT,
failed_to_initialize = nats_c.NATS_FAILED_TO_INITIALIZE,
not_initialized = nats_c.NATS_NOT_INITIALIZED,
ssl_error = nats_c.NATS_SSL_ERROR,
no_server_support = nats_c.NATS_NO_SERVER_SUPPORT,
not_yet_connected = nats_c.NATS_NOT_YET_CONNECTED,
draining = nats_c.NATS_DRAINING,
invalid_queue_name = nats_c.NATS_INVALID_QUEUE_NAME,
no_responders = nats_c.NATS_NO_RESPONDERS,
mismatch = nats_c.NATS_MISMATCH,
missed_heartbeat = nats_c.NATS_MISSED_HEARTBEAT,
_,
// this is a weird quirk of translate-c. All the enum members are translated as independent
// constants of type c_int, but the enum type itself is translated as c_uint.
pub fn fromInt(int: c_uint) Status {
return @enumFromInt(int);
}
pub fn description(self: Status) [:0]const u8 {
return std.mem.sliceTo(nats_c.natsStatus_GetText(self), 0);
}
pub fn raise(self: Status) Error!void {
return switch (self) {
.okay => void{},
.generic_error => Error.GenericError,
.protocol_error => Error.ProtocolError,
.io_error => Error.IoError,
.line_too_long => Error.LineTooLong,
.connection_closed => Error.ConnectionClosed,
.no_server => Error.NoServer,
.stale_connection => Error.StaleConnection,
.secure_connection_wanted => Error.SecureConnectionWanted,
.secure_connection_required => Error.SecureConnectionRequired,
.connection_disconnected => Error.ConnectionDisconnected,
.connection_auth_failed => Error.ConnectionAuthFailed,
.not_permitted => Error.NotPermitted,
.not_found => Error.NotFound,
.address_missing => Error.AddressMissing,
.invalid_subject => Error.InvalidSubject,
.invalid_arg => Error.InvalidArg,
.invalid_subscription => Error.InvalidSubscription,
.invalid_timeout => Error.InvalidTimeout,
.illegal_state => Error.IllegalState,
.slow_consumer => Error.SlowConsumer,
.max_payload => Error.MaxPayload,
.max_delivered_msgs => Error.MaxDeliveredMsgs,
.insufficient_buffer => Error.InsufficientBuffer,
.no_memory => Error.NoMemory,
.sys_error => Error.SysError,
.timeout => Error.Timeout,
.failed_to_initialize => Error.FailedToInitialize,
.not_initialized => Error.NotInitialized,
.ssl_error => Error.SslError,
.no_server_support => Error.NoServerSupport,
.not_yet_connected => Error.NotYetConnected,
.draining => Error.Draining,
.invalid_queue_name => Error.InvalidQueueName,
.no_responders => Error.NoResponders,
.mismatch => Error.Mismatch,
.missed_heartbeat => Error.MissedHeartbeat,
_ => Error.UnknownError,
};
}
pub fn toError(self: Status) ?Error {
return switch (self) {
.okay => null,
.generic_error => Error.GenericError,
.protocol_error => Error.ProtocolError,
.io_error => Error.IoError,
.line_too_long => Error.LineTooLong,
.connection_closed => Error.ConnectionClosed,
.no_server => Error.NoServer,
.stale_connection => Error.StaleConnection,
.secure_connection_wanted => Error.SecureConnectionWanted,
.secure_connection_required => Error.SecureConnectionRequired,
.connection_disconnected => Error.ConnectionDisconnected,
.connection_auth_failed => Error.ConnectionAuthFailed,
.not_permitted => Error.NotPermitted,
.not_found => Error.NotFound,
.address_missing => Error.AddressMissing,
.invalid_subject => Error.InvalidSubject,
.invalid_arg => Error.InvalidArg,
.invalid_subscription => Error.InvalidSubscription,
.invalid_timeout => Error.InvalidTimeout,
.illegal_state => Error.IllegalState,
.slow_consumer => Error.SlowConsumer,
.max_payload => Error.MaxPayload,
.max_delivered_msgs => Error.MaxDeliveredMsgs,
.insufficient_buffer => Error.InsufficientBuffer,
.no_memory => Error.NoMemory,
.sys_error => Error.SysError,
.timeout => Error.Timeout,
.failed_to_initialize => Error.FailedToInitialize,
.not_initialized => Error.NotInitialized,
.ssl_error => Error.SslError,
.no_server_support => Error.NoServerSupport,
.not_yet_connected => Error.NotYetConnected,
.draining => Error.Draining,
.invalid_queue_name => Error.InvalidQueueName,
.no_responders => Error.NoResponders,
.mismatch => Error.Mismatch,
.missed_heartbeat => Error.MissedHeartbeat,
_ => Error.UnknownError,
};
}
};
pub const Error = error{
/// Generic error
GenericError,
/// Error when parsing a protocol message, or not getting the expected message.
ProtocolError,
/// IO Error (network communication).
IoError,
/// The protocol message read from the socket does not fit in the read buffer.
LineTooLong,
/// Operation on this connection failed because the connection is closed.
ConnectionClosed,
/// Unable to connect, the server could not be reached or is not running.
NoServer,
/// The server closed our connection because it did not receive PINGs at the expected interval.
StaleConnection,
/// The client is configured to use TLS, but the server is not.
SecureConnectionWanted,
/// The server expects a TLS connection.
SecureConnectionRequired,
/// The connection was disconnected. Depending on the configuration, the connection may reconnect.
ConnectionDisconnected,
/// The connection failed due to authentication error.
ConnectionAuthFailed,
/// The action is not permitted.
NotPermitted,
/// An action could not complete because something was not found. So far, this is an internal error.
NotFound,
/// Incorrect URL. For instance no host specified in the URL.
AddressMissing,
/// Invalid subject, for instance NULL or empty string.
InvalidSubject,
/// An invalid argument is passed to a function. For instance passing NULL to an API that does not accept this value.
InvalidArg,
/// The call to a subscription function fails because the subscription has previously been closed.
InvalidSubscription,
/// Timeout must be positive numbers.
InvalidTimeout,
/// An unexpected state, for instance calling natsSubscription_NextMsg on an asynchronous subscriber.
IllegalState,
/// The maximum number of messages waiting to be delivered has been reached. Messages are dropped.
SlowConsumer,
/// Attempt to send a payload larger than the maximum allowed by the NATS Server.
MaxPayload,
/// Attempt to receive more messages than allowed, for instance because of #natsSubscription_AutoUnsubscribe().
MaxDeliveredMsgs,
/// A buffer is not large enough to accommodate the data.
InsufficientBuffer,
/// An operation could not complete because of insufficient memory.
NoMemory,
/// Some system function returned an error.
SysError,
/// An operation timed-out. For instance #natsSubscription_NextMsg().
Timeout,
/// The library failed to initialize.
FailedToInitialize,
/// The library is not yet initialized.
NotInitialized,
/// An SSL error occurred when trying to establish a connection.
SslError,
/// The server does not support this action.
NoServerSupport,
/// A connection could not be immediately established and #natsOptions_SetRetryOnFailedConnect() specified a connected callback. The connect is retried asynchronously.
NotYetConnected,
/// A connection and/or subscription entered the draining mode. Some operations will fail when in that mode.
Draining,
/// An invalid queue name was passed when creating a queue subscription.
InvalidQueueName,
/// No responders were running when the server received the request.
NoResponders,
/// For JetStream subscriptions, it means that a consumer sequence mismatch was discovered.
Mismatch,
/// For JetStream subscriptions, it means that the library detected that server heartbeats have been missed.
MissedHeartbeat,
/// The C API has returned an error that the zig layer does not know about.
UnknownError,
};

30
src/message.zig Normal file
View File

@ -0,0 +1,30 @@
const std = @import("std");
pub const nats_c = @cImport({
@cInclude("nats/nats.h");
});
pub const Message = opaque {
pub fn destroy(self: *Message) void {
nats_c.natsMsg_Destroy(@ptrCast(self));
}
pub fn getSubject(self: *Message) [:0]const u8 {
const subject = nats_c.natsMsg_GetSubject(@ptrCast(self)) orelse unreachable;
return std.mem.sliceTo(subject, 0);
}
pub fn getReply(self: *Message) ?[:0]const u8 {
const reply = nats_c.natsMsg_GetReply(@ptrCast(self)) orelse return null;
return std.mem.sliceTo(reply, 0);
}
pub fn getData(self: *Message) ?[:0]const u8 {
const data = nats_c.natsMsg_GetData(@ptrCast(self)) orelse return null;
return data[0..self.getDataLength() :0];
}
pub fn getDataLength(self: *Message) usize {
return @intCast(nats_c.natsMsg_GetDataLength(@ptrCast(self)));
}
};

View File

@ -18,85 +18,140 @@ pub const nats_c = @cImport({
@cInclude("nats/nats.h");
});
fn onMessage(
conn: ?*nats_c.natsConnection,
sub: ?*nats_c.natsSubscription,
message: ?*nats_c.natsMsg,
userdata: ?*anyopaque,
) callconv(.C) void {
_ = sub;
defer nats_c.natsMsg_Destroy(message);
const err_ = @import("./error.zig");
const con_ = @import("./connection.zig");
const sub_ = @import("./subscription.zig");
const msg_ = @import("./message.zig");
const msgData = nats_c.natsMsg_GetData(message)[0..@intCast(nats_c.natsMsg_GetDataLength(message))];
std.debug.print("Received message: {s} - {s}\n", .{ nats_c.natsMsg_GetSubject(message), msgData });
pub const default_server_url = con_.default_server_url;
pub const Connection = con_.Connection;
pub const Subscription = sub_.Subscription;
pub const Message = msg_.Message;
if (@as(?[*]const u8, nats_c.natsMsg_GetReply(message))) |reply| {
_ = nats_c.natsConnection_PublishString(conn, reply, "salutations");
const Status = err_.Status;
pub const Error = err_.Error;
fn onMessage(userdata: *bool, connection: *Connection, subscription: *Subscription, message: *Message) void {
_ = subscription;
std.debug.print("Subject \"{s}\" received message: \"{s}\"\n", .{
message.getSubject(),
message.getData() orelse "[null]",
});
if (message.getReply()) |reply| {
connection.publishString(reply, "salutations") catch @panic("HELP");
}
if (@as(?*bool, @ptrCast(userdata))) |signal| {
signal.* = true;
}
userdata.* = true;
}
pub fn main() void {
var conn: ?*nats_c.natsConnection = null;
defer nats_c.natsConnection_Destroy(conn);
pub fn main() !void {
const connection = try Connection.connectTo(default_server_url);
defer connection.destroy();
if (nats_c.natsConnection_ConnectTo(&conn, nats_c.NATS_DEFAULT_URL) != nats_c.NATS_OK) {
std.debug.print("oh no {s}\n", .{nats_c.NATS_DEFAULT_URL});
return;
}
var sub: ?*nats_c.natsSubscription = null;
defer nats_c.natsSubscription_Destroy(sub);
var done = false;
if (nats_c.natsConnection_Subscribe(&sub, conn, "channel", onMessage, &done) != nats_c.NATS_OK) {
std.debug.print("whops\n", .{});
return;
}
const subscription = try connection.subscribe(bool, "channel", onMessage, &done);
defer subscription.destroy();
while (!done) {
var reply: ?*nats_c.natsMsg = null;
defer nats_c.natsMsg_Destroy(reply);
const reply = try connection.requestString("channel", "greetings", 1000);
defer reply.destroy();
if (nats_c.natsConnection_RequestString(&reply, conn, "channel", "whatsup", 1000) != nats_c.NATS_OK) {
std.debug.print("geez\n", .{});
return;
} else if (reply) |message| {
const msgData = nats_c.natsMsg_GetData(message)[0..@intCast(nats_c.natsMsg_GetDataLength(message))];
std.debug.print("Got reply: {s}\n", .{msgData});
}
std.debug.print("Reply \"{s}\" got message: {s}\n", .{
reply.getSubject(),
reply.getData() orelse "[null]",
});
}
}
// NATS_EXTERN natsStatus nats_Open(int64_t lockSpinCount);
// NATS_EXTERN const char* nats_GetVersion(void);
// NATS_EXTERN uint32_t nats_GetVersionNumber(void);
pub fn getVersion() [:0]const u8 {
const verString = nats_c.nats_GetVersion();
return std.mem.sliceTo(verString, 0);
}
// #define nats_CheckCompatibility() nats_CheckCompatibilityImpl(NATS_VERSION_REQUIRED_NUMBER, NATS_VERSION_NUMBER, NATS_VERSION_STRING)
// NATS_EXTERN bool nats_CheckCompatibilityImpl(uint32_t reqVerNumber, uint32_t verNumber, const char *verString);
pub fn getVersionNumber() u32 {
return nats_c.nats_GetVersionNumber();
}
// NATS_EXTERN int64_t nats_Now(void);
// NATS_EXTERN int64_t nats_NowInNanoSeconds(void);
// NATS_EXTERN void nats_Sleep(int64_t sleepTime);
// NATS_EXTERN const char* nats_GetLastError(natsStatus *status);
// NATS_EXTERN natsStatus nats_GetLastErrorStack(char *buffer, size_t bufLen);
// NATS_EXTERN void nats_PrintLastErrorStack(FILE *file);
// NATS_EXTERN natsStatus nats_SetMessageDeliveryPoolSize(int max);
// NATS_EXTERN void nats_ReleaseThreadMemory(void);
pub fn checkCompatibility() bool {
return nats_c.nats_CheckCompatibilityImpl(
nats_c.NATS_VERSION_REQUIRED_NUMBER,
nats_c.NATS_VERSION_NUMBER,
nats_c.NATS_VERSION_STRING,
);
}
pub fn now() i64 {
return nats_c.nats_Now();
}
pub fn nowInNanoSeconds() i64 {
return nats_c.nats_NowInNanoSeconds();
}
pub fn sleep(sleep_time: i64) void {
return nats_c.nats_Sleep(sleep_time);
}
pub fn setMessageDeliveryPoolSize(max: c_int) Error!void {
const status = Status.fromInt(nats_c.nats_SetMessageDeliveryPoolSize(max));
return status.raise();
}
pub fn releaseThreadMemory() void {
return nats_c.nats_ReleaseThreadMemory();
}
pub fn init(lock_spin_count: i64) Error!void {
const status = Status.fromInt(nats_c.nats_Open(lock_spin_count));
return status.raise();
}
pub fn deinit() void {
return nats_c.nats_Close();
}
pub fn deinitWait(timeout: i64) Error!void {
const status = Status.fromInt(nats_c.natsCloseAndWait(timeout));
return status.raise();
}
pub const StatsCounts = struct {
messages_in: u64 = 0,
bytes_in: u64 = 0,
messages_out: u64 = 0,
bytes_out: u64 = 0,
reconnects: u64 = 0,
};
pub const Statistics = opaque {
pub fn create() Error!*Statistics {
var stats: *Statistics = undefined;
const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats)));
return status.toError() orelse stats;
}
pub fn deinit(self: *Statistics) void {
nats_c.natsStatistics_Destroy(@ptrCast(self));
}
pub fn getCounts(self: *Statistics) Error!StatsCounts {
var counts: StatsCounts = .{};
const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
self,
&counts.messages_in,
&counts.bytes_in,
&counts.messages_out,
&counts.bytes_out,
&counts.reconnects,
);
return status.toError() orelse counts;
}
};
// NATS_EXTERN natsStatus nats_Sign(const char *encodedSeed, const char *input, unsigned char **signature, int *signatureLength);
// NATS_EXTERN void nats_Close(void);
// NATS_EXTERN natsStatus nats_CloseAndWait(int64_t timeout);
// NATS_EXTERN const char* natsStatus_GetText(natsStatus s);
// NATS_EXTERN natsStatus natsStatistics_Create(natsStatistics **newStats);
// NATS_EXTERN natsStatus natsStatistics_GetCounts(const natsStatistics *stats, uint64_t *inMsgs, uint64_t *inBytes, uint64_t *outMsgs, uint64_t *outBytes, uint64_t *reconnects);
// NATS_EXTERN void natsStatistics_Destroy(natsStatistics *stats);
// NATS_EXTERN natsStatus natsOptions_Create(natsOptions **newOpts);
// NATS_EXTERN natsStatus natsOptions_SetURL(natsOptions *opts, const char *url);
// NATS_EXTERN natsStatus natsOptions_SetServers(natsOptions *opts, const char** servers, int serversCount);
@ -177,47 +232,6 @@ pub fn main() void {
// NATS_EXTERN int stanMsg_GetDataLength(const stanMsg *msg);
// NATS_EXTERN void stanMsg_Destroy(stanMsg *msg);
// NATS_EXTERN natsStatus natsConnection_Connect(natsConnection **nc, natsOptions *options);
// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc);
// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_ConnectTo(natsConnection **nc, const char *urls);
// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc);
// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc);
// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc);
// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats);
// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize);
// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count);
// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError);
// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid);
// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]);
// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip);
// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt);
// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc);
// NATS_EXTERN void natsConnection_Close(natsConnection *nc);
// NATS_EXTERN void natsConnection_Destroy(natsConnection *nc);
// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str);
// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg);
// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen);
// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str);
// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout);
// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure);
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup);
// NATS_EXTERN natsStatus natsSubscription_NoDeliveryDelay(natsSubscription *sub);
// NATS_EXTERN natsStatus natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeout);
// NATS_EXTERN natsStatus natsSubscription_Unsubscribe(natsSubscription *sub);

84
src/subscription.zig Normal file
View File

@ -0,0 +1,84 @@
const std = @import("std");
pub const nats_c = @cImport({
@cInclude("nats/nats.h");
});
const Connection = @import("./connection.zig").Connection;
const Message = @import("./message.zig").Message;
const err_ = @import("./error.zig");
const Error = err_.Error;
const Status = err_.Status;
const SubCallback = fn (?*nats_c.natsConnection, ?*nats_c.natsSubscription, ?*nats_c.natsMsg, ?*anyopaque) callconv(.C) void;
pub fn ThunkCallback(comptime T: type) type {
return fn (*T, *Connection, *Subscription, *Message) void;
}
pub fn messageThunk(comptime T: type, comptime callback: *const ThunkCallback(T)) *const SubCallback {
return struct {
pub fn thunk(
conn: ?*nats_c.natsConnection,
sub: ?*nats_c.natsSubscription,
msg: ?*nats_c.natsMsg,
userdata: ?*anyopaque,
) callconv(.C) void {
const message: *Message = if (msg) |m| @ptrCast(m) else unreachable;
defer message.destroy();
const connection: *Connection = if (conn) |c| @ptrCast(c) else unreachable;
const subscription: *Subscription = if (sub) |s| @ptrCast(s) else unreachable;
const data: *T = if (userdata) |u| @ptrCast(u) else unreachable;
callback(data, connection, subscription, message);
}
}.thunk;
}
pub const Subscription = opaque {
pub fn destroy(self: *Subscription) void {
nats_c.natsSubscription_Destroy(@ptrCast(self));
}
};
// pub fn Subscription(T: type) type {
// return struct {
// const Self = @This();
// _internal: *nats_c.natsSubscription,
// userdata: T,
// function: MessageCallback(T),
// pub fn create(
// allocator: std.mem.Allocator,
// userdata: T,
// callback: MessageCallback(T),
// ) AllocError!*Self {
// const self: *Self = try std.mem.Allocator.create(Self);
// self.userdata = userdata;
// self.function = callback;
// return self;
// }
// pub fn wrap(
// self: *Self,
// internal: *nats_c.natsSubscription,
// ) void {
// // self._internal =
// }
// fn thunk(
// conn: ?*nats_c.natsConnection,
// sub: ?*nats_c.natsSubscription,
// msg: ?*nats_c.natsMsg,
// userdata: ?*const anyopaque,
// ) callconv(.C) void {
// self.function(self.userdata, connection, subscription, message);
// }
// };
// }
pub fn MessageCallback(comptime T: type) type {
return *const fn (userdata: T, connection: *Connection, subscription: *Subscription, message: *Message) void;
}