connection: finish wrapping connection methods
I believe the standard API surface is now completely wrapped, ignoring whatever cool copy-paste errors or other mistakes I introduced which are currently undetectable due to lazy analysis and lack of test coverage. Cool.
This commit is contained in:
parent
39edf12d34
commit
9a4c80861c
@ -6,8 +6,8 @@ pub const nats_c = @cImport({
|
||||
|
||||
const sub_ = @import("./subscription.zig");
|
||||
const Subscription = sub_.Subscription;
|
||||
const SubscriptionThunkCallback = sub_.SubscriptionThunkCallback;
|
||||
const subscriptionMessageThunk = sub_.subscriptionMessageThunk;
|
||||
const SubscriptionCallbackSignature = sub_.SubscriptionCallbackSignature;
|
||||
const makeSubscriptionCallbackThunk = sub_.makeSubscriptionCallbackThunk;
|
||||
|
||||
const msg_ = @import("./message.zig");
|
||||
const Message = msg_.Message;
|
||||
@ -15,13 +15,38 @@ const Message = msg_.Message;
|
||||
const err_ = @import("./error.zig");
|
||||
const Error = err_.Error;
|
||||
const Status = err_.Status;
|
||||
const ErrorInfo = err_.ErrorInfo;
|
||||
|
||||
const sta_ = @import("./statistics.zig");
|
||||
const Statistics = sta_.Statistics;
|
||||
const StatsCounts = sta_.StatsCounts;
|
||||
|
||||
const thunk = @import("./thunk.zig");
|
||||
|
||||
pub const default_server_url: [:0]const u8 = nats_c.NATS_DEFAULT_URL;
|
||||
|
||||
pub const ConnectionStatus = enum(c_int) {
|
||||
disconnected = nats_c.DISCONNECTED,
|
||||
connecting = nats_c.CONNECTING,
|
||||
connected = nats_c.CONNECTED,
|
||||
closed = nats_c.CLOSED,
|
||||
reconnecting = nats_c.RECONNECTING,
|
||||
draining_subs = nats_c.DRAINING_SUBS,
|
||||
draining_pubs = nats_c.DRAINING_PUBS,
|
||||
_,
|
||||
|
||||
pub fn fromInt(int: c_uint) ConnectionStatus {
|
||||
return @enumFromInt(int);
|
||||
}
|
||||
};
|
||||
|
||||
pub const AddressPort = struct {
|
||||
address: [:0]u8,
|
||||
port: u16,
|
||||
};
|
||||
|
||||
pub const Connection = opaque {
|
||||
pub fn connect(options: *ConnectionOptions) Error!*Connection {
|
||||
pub fn create(options: *ConnectionOptions) Error!*Connection {
|
||||
var self: *Connection = undefined;
|
||||
const status = Status.fromInt(nats_c.natsConnection_Connect(@ptrCast(&self), @ptrCast(options)));
|
||||
return status.toError() orelse self;
|
||||
@ -44,98 +69,402 @@ pub const Connection = opaque {
|
||||
return nats_c.natsConnection_Destroy(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn processReadEvent(self: *Connection) void {
|
||||
nats_c.natsConnection_ProcessReadEvent(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn processWriteEvent(self: *Connection) void {
|
||||
nats_c.natsConnection_ProcessWriteEvent(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn isClosed(self: *Connection) bool {
|
||||
return nats_c.natsConnection_IsClosed(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn isReconnecting(self: *Connection) bool {
|
||||
return nats_c.natsConnection_IsReconnecting(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn getStatus(self: *Connection) ConnectionStatus {
|
||||
return ConnectionStatus.fromInt(nats_c.natsConnection_Status(@ptrCast(self)));
|
||||
}
|
||||
|
||||
pub fn bytesBuffered(self: *Connection) c_int {
|
||||
return nats_c.natsConnection_Buffered(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn flush(self: *Connection) Error!void {
|
||||
return Status.fromInt(nats_c.natsConnection_Flush(@ptrCast(self))).raise();
|
||||
}
|
||||
|
||||
pub fn flushTimeout(self: *Connection, timeout: i64) Error!void {
|
||||
return Status.fromInt(nats_c.natsConnection_FlushTimeout(@ptrCast(self), timeout)).raise();
|
||||
}
|
||||
|
||||
pub fn getMaxPayload(self: *Connection) c_int {
|
||||
return nats_c.natsConnection_GetMaxPayload(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn getStats(self: *Connection) Error!StatsCounts {
|
||||
var stats = try Statistics.create();
|
||||
defer stats.destroy();
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetStats(@ptrCast(self), @ptrCast(stats)),
|
||||
);
|
||||
|
||||
return status.toError() orelse stats.getCounts();
|
||||
}
|
||||
|
||||
pub fn getConnectedUrl(self: *Connection, buffer: []u8) Error![:0]u8 {
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetConnectedUrl(@ptrCast(self), buffer.ptr, buffer.len),
|
||||
);
|
||||
|
||||
// cast this to a c pointer so that sliceTo properly returns the sentinel
|
||||
// terminated type, which is guaranteed by the backing library.
|
||||
return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0);
|
||||
}
|
||||
|
||||
pub fn getConnectedServerId(self: *Connection, buffer: []u8) Error![:0]u8 {
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetConnectedServerId(@ptrCast(self), buffer.ptr, buffer.len),
|
||||
);
|
||||
|
||||
// cast this to a c pointer so that sliceTo properly returns the sentinel
|
||||
// terminated type, which is guaranteed by the backing library.
|
||||
return status.toError() orelse std.mem.sliceTo(@as([*c]u8, buffer.ptr), 0);
|
||||
}
|
||||
|
||||
pub fn getServers(self: *Connection) Error![][*:0]u8 {
|
||||
var servers: [*][*:0]u8 = undefined;
|
||||
var count: c_int = 0;
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetServers(@ptrCast(self), &servers, &count),
|
||||
);
|
||||
|
||||
return status.toError() orelse servers[0..@intCast(count)];
|
||||
}
|
||||
|
||||
pub fn getDiscoveredServers(self: *Connection) Error![][*:0]u8 {
|
||||
var servers: [*][*:0]u8 = undefined;
|
||||
var count: c_int = 0;
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetDiscoveredServers(@ptrCast(self), &servers, &count),
|
||||
);
|
||||
|
||||
return status.toError() orelse servers[0..@intCast(count)];
|
||||
}
|
||||
|
||||
pub fn getLastError(self: *Connection) ErrorInfo {
|
||||
var desc: [*:0]const u8 = 0;
|
||||
const status = nats_c.natsConnection_GetLastError(@ptrCast(self), &desc);
|
||||
|
||||
return .{
|
||||
.code = Status.fromInt(status).toError(),
|
||||
.desc = std.mem.sliceTo(desc, 0),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn getClientId(self: *Connection) Error!u64 {
|
||||
var id: u64 = 0;
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetClientID(@ptrCast(self), &id),
|
||||
);
|
||||
|
||||
return status.toError() orelse id;
|
||||
}
|
||||
|
||||
pub fn drain(self: *Connection) Error!void {
|
||||
return Status.fromInt(nats_c.natsConnection_Drain(@ptrCast(self))).raise();
|
||||
}
|
||||
|
||||
pub fn drainTimeout(self: *Connection, timeout: i64) Error!void {
|
||||
return Status.fromInt(nats_c.natsConnection_DrainTimeout(@ptrCast(self), timeout)).raise();
|
||||
}
|
||||
|
||||
pub fn sign(self: *Connection, message: []const u8) Error![64]u8 {
|
||||
var sig = [_]u8{0} ** 64;
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_Sign(@ptrCast(self), message.ptr, message.len, &sig),
|
||||
);
|
||||
|
||||
return status.toError() orelse sig;
|
||||
}
|
||||
|
||||
pub fn getClientIp(self: *Connection) Error![:0]u8 {
|
||||
var ip: [*c]u8 = null;
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetClientIP(@ptrCast(self), &ip),
|
||||
);
|
||||
|
||||
return status.toError() orelse std.mem.sliceTo(ip, 0);
|
||||
}
|
||||
|
||||
pub fn getLocalIpAndPort(self: *Connection) Error!AddressPort {
|
||||
var address: [*:0]u8 = undefined;
|
||||
var port: c_int = 0;
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetLocalIPAndPort(@ptrCast(self), &address, &port),
|
||||
);
|
||||
|
||||
return status.toError() orelse .{
|
||||
.address = std.mem.sliceTo(address, 0),
|
||||
.port = @intCast(port),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn getRtt(self: *Connection) Error!i64 {
|
||||
var rtt: i64 = 0;
|
||||
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_GetRTT(@ptrCast(self), &rtt),
|
||||
);
|
||||
|
||||
return status.toError() orelse rtt;
|
||||
}
|
||||
|
||||
pub fn hasHeaderSupport(self: *Connection) bool {
|
||||
const status = Status.fromInt(
|
||||
nats_c.natsConnection_HasHeaderSupport(@ptrCast(self)),
|
||||
);
|
||||
|
||||
return status == .okay;
|
||||
}
|
||||
|
||||
pub fn publish(self: *Connection, subject: [:0]const u8, message: []const u8) Error!void {
|
||||
return Status.fromInt(
|
||||
nats_c.natsConnection_Publish(@ptrCast(self), subject, message.ptr, @intCast(message.len)),
|
||||
).raise();
|
||||
}
|
||||
|
||||
pub fn publishString(
|
||||
self: *Connection,
|
||||
subject: [:0]const u8,
|
||||
message: [:0]const u8,
|
||||
) Error!void {
|
||||
const status = Status.fromInt(nats_c.natsConnection_PublishString(
|
||||
return Status.fromInt(
|
||||
nats_c.natsConnection_PublishString(@ptrCast(self), subject.ptr, message.ptr),
|
||||
).raise();
|
||||
}
|
||||
|
||||
pub fn publishMessage(self: *Connection, message: *Message) Error!void {
|
||||
return Status.fromInt(
|
||||
nats_c.natsConnection_PublishMsg(@ptrCast(self), @ptrCast(message)),
|
||||
).raise();
|
||||
}
|
||||
|
||||
pub fn publishRequest(
|
||||
self: *Connection,
|
||||
subject: [:0]const u8,
|
||||
reply: [:0]const u8,
|
||||
message: []const u8,
|
||||
) Error!void {
|
||||
return Status.fromInt(
|
||||
nats_c.natsConnection_PublishRequest(
|
||||
@ptrCast(self),
|
||||
subject,
|
||||
message,
|
||||
subject.ptr,
|
||||
reply.ptr,
|
||||
message.ptr,
|
||||
@intCast(message.len),
|
||||
),
|
||||
).raise();
|
||||
}
|
||||
|
||||
pub fn publishRequestString(
|
||||
self: *Connection,
|
||||
subject: [:0]const u8,
|
||||
reply: [:0]const u8,
|
||||
message: [:0]const u8,
|
||||
) Error!void {
|
||||
return Status.fromInt(
|
||||
nats_c.natsConnection_PublishRequestString(
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
reply.ptr,
|
||||
message.ptr,
|
||||
),
|
||||
).raise();
|
||||
}
|
||||
|
||||
pub fn request(
|
||||
self: *Connection,
|
||||
subject: [:0]const u8,
|
||||
req: []const u8,
|
||||
timeout: i64,
|
||||
) Error!*Message {
|
||||
var response: *Message = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_Request(
|
||||
@ptrCast(&response),
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
req.ptr,
|
||||
@intCast(req.len),
|
||||
timeout,
|
||||
));
|
||||
return status.raise();
|
||||
|
||||
return status.toError() orelse response;
|
||||
}
|
||||
|
||||
pub fn requestString(
|
||||
self: *Connection,
|
||||
subject: [:0]const u8,
|
||||
request: [:0]const u8,
|
||||
req: [:0]const u8,
|
||||
timeout: i64,
|
||||
) Error!*Message {
|
||||
var msg: *Message = undefined;
|
||||
var response: *Message = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_RequestString(
|
||||
@ptrCast(&msg),
|
||||
@ptrCast(&response),
|
||||
@ptrCast(self),
|
||||
subject,
|
||||
request,
|
||||
subject.ptr,
|
||||
req.ptr,
|
||||
timeout,
|
||||
));
|
||||
return status.toError() orelse msg;
|
||||
|
||||
return status.toError() orelse response;
|
||||
}
|
||||
|
||||
pub fn requestMessage(
|
||||
self: *Connection,
|
||||
req: *Message,
|
||||
timeout: i64,
|
||||
) Error!*Message {
|
||||
var response: *Message = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_RequestMsg(
|
||||
@ptrCast(&response),
|
||||
@ptrCast(self),
|
||||
@ptrCast(req),
|
||||
timeout,
|
||||
));
|
||||
|
||||
return status.toError() orelse response;
|
||||
}
|
||||
|
||||
pub fn subscribe(
|
||||
self: *Connection,
|
||||
comptime T: type,
|
||||
subject: [:0]const u8,
|
||||
callback: SubscriptionThunkCallback(T),
|
||||
callback: SubscriptionCallbackSignature(T),
|
||||
userdata: *T,
|
||||
) Error!*Subscription {
|
||||
var sub: *Subscription = undefined;
|
||||
const status = Status.fromInt(nats_c.natsConnection_Subscribe(
|
||||
@ptrCast(&sub),
|
||||
@ptrCast(self),
|
||||
subject,
|
||||
subscriptionMessageThunk(T, callback),
|
||||
subject.ptr,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
));
|
||||
return status.toError() orelse sub;
|
||||
}
|
||||
|
||||
pub fn subscribeTimeout(
|
||||
self: *Connection,
|
||||
comptime T: type,
|
||||
subject: [:0]const u8,
|
||||
timeout: i64,
|
||||
callback: SubscriptionCallbackSignature(T),
|
||||
userdata: *T,
|
||||
) Error!*Subscription {
|
||||
var sub: *Subscription = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_SubscribeTimeout(
|
||||
@ptrCast(&sub),
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
timeout,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
}
|
||||
|
||||
pub fn subscribeSync(self: *Connection, subject: [:0]const u8) Error!*Subscription {
|
||||
var sub: *Subscription = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_SubscribeSync(
|
||||
@ptrCast(&sub),
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
}
|
||||
|
||||
pub fn queueSubscribe(
|
||||
self: *Connection,
|
||||
comptime T: type,
|
||||
subject: [:0]const u8,
|
||||
queue_group: [:0]const u8,
|
||||
callback: SubscriptionCallbackSignature(T),
|
||||
userdata: *T,
|
||||
) Error!*Subscription {
|
||||
var sub: *Subscription = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe(
|
||||
@ptrCast(&sub),
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
queue_group.ptr,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
}
|
||||
|
||||
pub fn queueSubscribeTimeout(
|
||||
self: *Connection,
|
||||
comptime T: type,
|
||||
subject: [:0]const u8,
|
||||
queue_group: [:0]const u8,
|
||||
timeout: i64,
|
||||
callback: SubscriptionCallbackSignature(T),
|
||||
userdata: *T,
|
||||
) Error!*Subscription {
|
||||
var sub: *Subscription = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_QueueSubscribe(
|
||||
@ptrCast(&sub),
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
queue_group.ptr,
|
||||
timeout,
|
||||
makeSubscriptionCallbackThunk(T, callback),
|
||||
userdata,
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
}
|
||||
|
||||
pub fn queueSubscribeSync(
|
||||
self: *Connection,
|
||||
subject: [:0]const u8,
|
||||
queue_group: [:0]const u8,
|
||||
) Error!*Subscription {
|
||||
var sub: *Subscription = undefined;
|
||||
|
||||
const status = Status.fromInt(nats_c.natsConnection_SubscribeSync(
|
||||
@ptrCast(&sub),
|
||||
@ptrCast(self),
|
||||
subject.ptr,
|
||||
queue_group.ptr,
|
||||
));
|
||||
|
||||
return status.toError() orelse sub;
|
||||
}
|
||||
};
|
||||
|
||||
// NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc);
|
||||
// NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc);
|
||||
|
||||
// NATS_EXTERN bool natsConnection_IsClosed(natsConnection *nc);
|
||||
// NATS_EXTERN bool natsConnection_IsReconnecting(natsConnection *nc);
|
||||
|
||||
// NATS_EXTERN natsConnStatus natsConnection_Status(natsConnection *nc);
|
||||
// NATS_EXTERN int natsConnection_Buffered(natsConnection *nc);
|
||||
|
||||
// NATS_EXTERN natsStatus natsConnection_Flush(natsConnection *nc);
|
||||
// NATS_EXTERN natsStatus natsConnection_FlushTimeout(natsConnection *nc, int64_t timeout);
|
||||
|
||||
// NATS_EXTERN int64_t natsConnection_GetMaxPayload(natsConnection *nc);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetStats(natsConnection *nc, natsStatistics *stats);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetConnectedUrl(natsConnection *nc, char *buffer, size_t bufferSize);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetConnectedServerId(natsConnection *nc, char *buffer, size_t bufferSize);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetServers(natsConnection *nc, char ***servers, int *count);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetDiscoveredServers(natsConnection *nc, char ***servers, int *count);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetLastError(natsConnection *nc, const char **lastError);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetClientID(natsConnection *nc, uint64_t *cid);
|
||||
// NATS_EXTERN natsStatus natsConnection_Drain(natsConnection *nc);
|
||||
// NATS_EXTERN natsStatus natsConnection_DrainTimeout(natsConnection *nc, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsConnection_Sign(natsConnection *nc, const unsigned char *message, int messageLen, unsigned char sig[64]);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetClientIP(natsConnection *nc, char **ip);
|
||||
// NATS_EXTERN natsStatus natsConnection_GetRTT(natsConnection *nc, int64_t *rtt);
|
||||
// NATS_EXTERN natsStatus natsConnection_HasHeaderSupport(natsConnection *nc);
|
||||
|
||||
// NATS_EXTERN natsStatus natsConnection_Publish(natsConnection *nc, const char *subj, const void *data, int dataLen);
|
||||
// NATS_EXTERN natsStatus natsConnection_PublishString(natsConnection *nc, const char *subj, const char *str);
|
||||
// NATS_EXTERN natsStatus natsConnection_PublishMsg(natsConnection *nc, natsMsg *msg);
|
||||
// NATS_EXTERN natsStatus natsConnection_PublishRequest(natsConnection *nc, const char *subj, const char *reply, const void *data, int dataLen);
|
||||
// NATS_EXTERN natsStatus natsConnection_PublishRequestString(natsConnection *nc, const char *subj, const char *reply, const char *str);
|
||||
// NATS_EXTERN natsStatus natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, const void *data, int dataLen, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsConnection_RequestString(natsMsg **replyMsg, natsConnection *nc, const char *subj, const char *str, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,natsMsg *requestMsg, int64_t timeout);
|
||||
// NATS_EXTERN natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure);
|
||||
// NATS_EXTERN natsStatus natsConnection_SubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, int64_t timeout, natsMsgHandler cb, void *cbClosure);
|
||||
// NATS_EXTERN natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject);
|
||||
// NATS_EXTERN natsStatus natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, natsMsgHandler cb, void *cbClosure);
|
||||
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeTimeout(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup, int64_t timeout, natsMsgHandler cb, void *cbClosure);
|
||||
// NATS_EXTERN natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup);
|
||||
|
||||
//
|
||||
const ConnectionOptions = opaque {
|
||||
pub fn create() Error!*ConnectionOptions {
|
||||
var self: *ConnectionOptions = undefined;
|
||||
|
40
src/nats.zig
40
src/nats.zig
@ -22,12 +22,19 @@ const err_ = @import("./error.zig");
|
||||
const con_ = @import("./connection.zig");
|
||||
const sub_ = @import("./subscription.zig");
|
||||
const msg_ = @import("./message.zig");
|
||||
const sta_ = @import("./statistics.zig");
|
||||
|
||||
pub const default_server_url = con_.default_server_url;
|
||||
pub const Connection = con_.Connection;
|
||||
pub const ConnectionOptions = con_.ConnectionOptions;
|
||||
|
||||
pub const Subscription = sub_.Subscription;
|
||||
|
||||
pub const Message = msg_.Message;
|
||||
|
||||
pub const Statistics = sta_.Statistics;
|
||||
pub const StatsCounts = sta_.StatsCounts;
|
||||
|
||||
const Status = err_.Status;
|
||||
pub const Error = err_.Error;
|
||||
|
||||
@ -129,39 +136,6 @@ pub fn deinitWait(timeout: i64) Error!void {
|
||||
return status.raise();
|
||||
}
|
||||
|
||||
pub const StatsCounts = struct {
|
||||
messages_in: u64 = 0,
|
||||
bytes_in: u64 = 0,
|
||||
messages_out: u64 = 0,
|
||||
bytes_out: u64 = 0,
|
||||
reconnects: u64 = 0,
|
||||
};
|
||||
|
||||
pub const Statistics = opaque {
|
||||
pub fn create() Error!*Statistics {
|
||||
var stats: *Statistics = undefined;
|
||||
const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats)));
|
||||
return status.toError() orelse stats;
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Statistics) void {
|
||||
nats_c.natsStatistics_Destroy(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn getCounts(self: *Statistics) Error!StatsCounts {
|
||||
var counts: StatsCounts = .{};
|
||||
const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
|
||||
self,
|
||||
&counts.messages_in,
|
||||
&counts.bytes_in,
|
||||
&counts.messages_out,
|
||||
&counts.bytes_out,
|
||||
&counts.reconnects,
|
||||
);
|
||||
return status.toError() orelse counts;
|
||||
}
|
||||
};
|
||||
|
||||
// This appears to be a jetstream API, but these two endpoints are trivial, so, whoops.
|
||||
// I have no clue what this does, since there's basically no
|
||||
pub const Inbox = opaque {
|
||||
|
42
src/statistics.zig
Normal file
42
src/statistics.zig
Normal file
@ -0,0 +1,42 @@
|
||||
const std = @import("std");
|
||||
|
||||
const nats_c = @cImport({
|
||||
@cInclude("nats/nats.h");
|
||||
});
|
||||
|
||||
const err_ = @import("./error.zig");
|
||||
const Status = err_.Status;
|
||||
const Error = err_.Error;
|
||||
|
||||
pub const StatsCounts = struct {
|
||||
messages_in: u64 = 0,
|
||||
bytes_in: u64 = 0,
|
||||
messages_out: u64 = 0,
|
||||
bytes_out: u64 = 0,
|
||||
reconnects: u64 = 0,
|
||||
};
|
||||
|
||||
pub const Statistics = opaque {
|
||||
pub fn create() Error!*Statistics {
|
||||
var stats: *Statistics = undefined;
|
||||
const status = Status.fromInt(nats_c.natsStatistics_Create(@ptrCast(&stats)));
|
||||
return status.toError() orelse stats;
|
||||
}
|
||||
|
||||
pub fn destroy(self: *Statistics) void {
|
||||
nats_c.natsStatistics_Destroy(@ptrCast(self));
|
||||
}
|
||||
|
||||
pub fn getCounts(self: *Statistics) Error!StatsCounts {
|
||||
var counts: StatsCounts = .{};
|
||||
const status = Status.fromInt(nats_c.natsStatistics_GetCounts)(
|
||||
self,
|
||||
&counts.messages_in,
|
||||
&counts.bytes_in,
|
||||
&counts.messages_out,
|
||||
&counts.bytes_out,
|
||||
&counts.reconnects,
|
||||
);
|
||||
return status.toError() orelse counts;
|
||||
}
|
||||
};
|
@ -169,21 +169,21 @@ pub const Subscription = opaque {
|
||||
}
|
||||
};
|
||||
|
||||
const BareSubscriptionCallback = fn (
|
||||
const SubscriptionCallback = fn (
|
||||
?*nats_c.natsConnection,
|
||||
?*nats_c.natsSubscription,
|
||||
?*nats_c.natsMsg,
|
||||
?*anyopaque,
|
||||
) callconv(.C) void;
|
||||
|
||||
pub fn SubscriptionThunkCallback(comptime T: type) type {
|
||||
pub fn SubscriptionCallbackSignature(comptime T: type) type {
|
||||
return fn (*T, *Connection, *Subscription, *Message) void;
|
||||
}
|
||||
|
||||
pub fn subscriptionMessageThunk(
|
||||
pub fn makeSubscriptionCallbackThunk(
|
||||
comptime T: type,
|
||||
comptime callback: *const SubscriptionThunkCallback(T),
|
||||
) *const BareSubscriptionCallback {
|
||||
comptime callback: *const SubscriptionCallbackSignature(T),
|
||||
) *const SubscriptionCallback {
|
||||
return struct {
|
||||
fn thunk(
|
||||
conn: ?*nats_c.natsConnection,
|
||||
|
Loading…
x
Reference in New Issue
Block a user