subrepo: subdir: "deps/nats.c" merged: "66cec7f" upstream: origin: "https://github.com/nats-io/nats.c.git" branch: "v3.6.1" commit: "66cec7f" git-subrepo: version: "0.4.6" commit: "b8b46501e"
202 lines
6.1 KiB
C
202 lines
6.1 KiB
C
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
#include "examples.h"
|
|
|
|
// Help text describing the command-line options specific to this example.
// It is passed to parseArgs() from main(). Adjacent string literals are
// concatenated by the compiler, so no line-continuation backslashes are
// needed.
static const char *usage =
    "-stream stream name (default is 'foo')\n"
    "-txt text to send (default is 'hello')\n"
    "-count number of messages to send\n"
    "-sync publish synchronously (default is async)\n";
|
|
|
|
static void
|
|
_jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure)
|
|
{
|
|
int *errors = (int*) closure;
|
|
|
|
printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText);
|
|
printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg));
|
|
|
|
*errors = (*errors + 1);
|
|
|
|
// If we wanted to resend the original message, we would do something like that:
|
|
//
|
|
// js_PublishMsgAsync(js, &(pae->Msg), NULL);
|
|
//
|
|
// Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes
|
|
// ownership, and the library will not destroy the message when this callback returns.
|
|
|
|
// No need to destroy anything, everything is handled by the library.
|
|
}
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
natsConnection *conn = NULL;
|
|
natsStatistics *stats = NULL;
|
|
natsOptions *opts = NULL;
|
|
jsCtx *js = NULL;
|
|
jsOptions jsOpts;
|
|
jsErrCode jerr = 0;
|
|
natsStatus s;
|
|
int dataLen=0;
|
|
volatile int errors = 0;
|
|
bool delStream = false;
|
|
|
|
opts = parseArgs(argc, argv, usage);
|
|
dataLen = (int) strlen(payload);
|
|
|
|
s = natsConnection_Connect(&conn, opts);
|
|
|
|
if (s == NATS_OK)
|
|
s = jsOptions_Init(&jsOpts);
|
|
|
|
if (s == NATS_OK)
|
|
{
|
|
if (async)
|
|
{
|
|
jsOpts.PublishAsync.ErrHandler = _jsPubErr;
|
|
jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors;
|
|
}
|
|
s = natsConnection_JetStream(&js, conn, &jsOpts);
|
|
}
|
|
|
|
if (s == NATS_OK)
|
|
{
|
|
jsStreamInfo *si = NULL;
|
|
|
|
// First check if the stream already exists.
|
|
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
|
|
if (s == NATS_NOT_FOUND)
|
|
{
|
|
jsStreamConfig cfg;
|
|
|
|
// Since we are the one creating this stream, we can delete at the end.
|
|
delStream = true;
|
|
|
|
// Initialize the configuration structure.
|
|
jsStreamConfig_Init(&cfg);
|
|
cfg.Name = stream;
|
|
// Set the subject
|
|
cfg.Subjects = (const char*[1]){subj};
|
|
cfg.SubjectsLen = 1;
|
|
// Make it a memory stream.
|
|
cfg.Storage = js_MemoryStorage;
|
|
// Add the stream,
|
|
s = js_AddStream(&si, js, &cfg, NULL, &jerr);
|
|
}
|
|
if (s == NATS_OK)
|
|
{
|
|
printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
|
|
si->Config->Name, si->State.Msgs, si->State.Bytes);
|
|
|
|
// Need to destroy the returned stream object.
|
|
jsStreamInfo_Destroy(si);
|
|
}
|
|
}
|
|
|
|
if (s == NATS_OK)
|
|
s = natsStatistics_Create(&stats);
|
|
|
|
if (s == NATS_OK)
|
|
{
|
|
printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream);
|
|
start = nats_Now();
|
|
}
|
|
|
|
for (count = 0; (s == NATS_OK) && (count < total); count++)
|
|
{
|
|
if (async)
|
|
s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL);
|
|
else
|
|
{
|
|
jsPubAck *pa = NULL;
|
|
|
|
s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr);
|
|
if (s == NATS_OK)
|
|
{
|
|
if (pa->Duplicate)
|
|
printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence);
|
|
|
|
jsPubAck_Destroy(pa);
|
|
}
|
|
}
|
|
}
|
|
|
|
if ((s == NATS_OK) && async)
|
|
{
|
|
jsPubOptions jsPubOpts;
|
|
|
|
jsPubOptions_Init(&jsPubOpts);
|
|
// Let's set it to 30 seconds, if getting "Timeout" errors,
|
|
// this may need to be increased based on the number of messages
|
|
// being sent.
|
|
jsPubOpts.MaxWait = 30000;
|
|
s = js_PublishAsyncComplete(js, &jsPubOpts);
|
|
if (s == NATS_TIMEOUT)
|
|
{
|
|
// Let's get the list of pending messages. We could resend,
|
|
// etc, but for now, just destroy them.
|
|
natsMsgList list;
|
|
|
|
js_PublishAsyncGetPendingList(&list, js);
|
|
natsMsgList_Destroy(&list);
|
|
}
|
|
}
|
|
|
|
if (s == NATS_OK)
|
|
{
|
|
jsStreamInfo *si = NULL;
|
|
|
|
elapsed = nats_Now() - start;
|
|
printStats(STATS_OUT, conn, NULL, stats);
|
|
printPerf("Sent");
|
|
|
|
if (errors != 0)
|
|
printf("There were %d asynchronous errors\n", errors);
|
|
|
|
// Let's report some stats after the run
|
|
s = js_GetStreamInfo(&si, js, stream, NULL, &jerr);
|
|
if (s == NATS_OK)
|
|
{
|
|
printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n",
|
|
si->Config->Name, si->State.Msgs, si->State.Bytes);
|
|
|
|
jsStreamInfo_Destroy(si);
|
|
}
|
|
}
|
|
if (delStream && (js != NULL))
|
|
{
|
|
printf("\nDeleting stream %s: ", stream);
|
|
s = js_DeleteStream(js, stream, NULL, &jerr);
|
|
if (s == NATS_OK)
|
|
printf("OK!");
|
|
printf("\n");
|
|
}
|
|
if (s != NATS_OK)
|
|
{
|
|
printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr);
|
|
nats_PrintLastErrorStack(stderr);
|
|
}
|
|
|
|
// Destroy all our objects to avoid report of memory leak
|
|
jsCtx_Destroy(js);
|
|
natsStatistics_Destroy(stats);
|
|
natsConnection_Destroy(conn);
|
|
natsOptions_Destroy(opts);
|
|
|
|
// To silence reports of memory still in used with valgrind
|
|
nats_Close();
|
|
|
|
return 0;
|
|
}
|