// Copyright 2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "examples.h" static const char *usage = ""\ "-stream stream name (default is 'foo')\n" \ "-txt text to send (default is 'hello')\n" \ "-count number of messages to send\n" \ "-sync publish synchronously (default is async)\n"; static void _jsPubErr(jsCtx *js, jsPubAckErr *pae, void *closure) { int *errors = (int*) closure; printf("Error: %u - Code: %u - Text: %s\n", pae->Err, pae->ErrCode, pae->ErrText); printf("Original message: %.*s\n", natsMsg_GetDataLength(pae->Msg), natsMsg_GetData(pae->Msg)); *errors = (*errors + 1); // If we wanted to resend the original message, we would do something like that: // // js_PublishMsgAsync(js, &(pae->Msg), NULL); // // Note that we use `&(pae->Msg)` so that the library set it to NULL if it takes // ownership, and the library will not destroy the message when this callback returns. // No need to destroy anything, everything is handled by the library. } int main(int argc, char **argv) { natsConnection *conn = NULL; natsStatistics *stats = NULL; natsOptions *opts = NULL; jsCtx *js = NULL; jsOptions jsOpts; jsErrCode jerr = 0; natsStatus s; int dataLen=0; volatile int errors = 0; bool delStream = false; opts = parseArgs(argc, argv, usage); dataLen = (int) strlen(payload); s = natsConnection_Connect(&conn, opts); if (s == NATS_OK) s = jsOptions_Init(&jsOpts); if (s == NATS_OK) { if (async) { jsOpts.PublishAsync.ErrHandler = _jsPubErr; jsOpts.PublishAsync.ErrHandlerClosure = (void*) &errors; } s = natsConnection_JetStream(&js, conn, &jsOpts); } if (s == NATS_OK) { jsStreamInfo *si = NULL; // First check if the stream already exists. s = js_GetStreamInfo(&si, js, stream, NULL, &jerr); if (s == NATS_NOT_FOUND) { jsStreamConfig cfg; // Since we are the one creating this stream, we can delete at the end. delStream = true; // Initialize the configuration structure. jsStreamConfig_Init(&cfg); cfg.Name = stream; // Set the subject cfg.Subjects = (const char*[1]){subj}; cfg.SubjectsLen = 1; // Make it a memory stream. cfg.Storage = js_MemoryStorage; // Add the stream, s = js_AddStream(&si, js, &cfg, NULL, &jerr); } if (s == NATS_OK) { printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n", si->Config->Name, si->State.Msgs, si->State.Bytes); // Need to destroy the returned stream object. jsStreamInfo_Destroy(si); } } if (s == NATS_OK) s = natsStatistics_Create(&stats); if (s == NATS_OK) { printf("\nSending %" PRId64 " messages to subject '%s'\n", total, stream); start = nats_Now(); } for (count = 0; (s == NATS_OK) && (count < total); count++) { if (async) s = js_PublishAsync(js, subj, (const void*) payload, dataLen, NULL); else { jsPubAck *pa = NULL; s = js_Publish(&pa, js, subj, (const void*) payload, dataLen, NULL, &jerr); if (s == NATS_OK) { if (pa->Duplicate) printf("Got a duplicate message! Sequence=%" PRIu64 "\n", pa->Sequence); jsPubAck_Destroy(pa); } } } if ((s == NATS_OK) && async) { jsPubOptions jsPubOpts; jsPubOptions_Init(&jsPubOpts); // Let's set it to 30 seconds, if getting "Timeout" errors, // this may need to be increased based on the number of messages // being sent. jsPubOpts.MaxWait = 30000; s = js_PublishAsyncComplete(js, &jsPubOpts); if (s == NATS_TIMEOUT) { // Let's get the list of pending messages. We could resend, // etc, but for now, just destroy them. natsMsgList list; js_PublishAsyncGetPendingList(&list, js); natsMsgList_Destroy(&list); } } if (s == NATS_OK) { jsStreamInfo *si = NULL; elapsed = nats_Now() - start; printStats(STATS_OUT, conn, NULL, stats); printPerf("Sent"); if (errors != 0) printf("There were %d asynchronous errors\n", errors); // Let's report some stats after the run s = js_GetStreamInfo(&si, js, stream, NULL, &jerr); if (s == NATS_OK) { printf("\nStream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n", si->Config->Name, si->State.Msgs, si->State.Bytes); jsStreamInfo_Destroy(si); } } if (delStream && (js != NULL)) { printf("\nDeleting stream %s: ", stream); s = js_DeleteStream(js, stream, NULL, &jerr); if (s == NATS_OK) printf("OK!"); printf("\n"); } if (s != NATS_OK) { printf("Error: %u - %s - jerr=%u\n", s, natsStatus_GetText(s), jerr); nats_PrintLastErrorStack(stderr); } // Destroy all our objects to avoid report of memory leak jsCtx_Destroy(js); natsStatistics_Destroy(stats); natsConnection_Destroy(conn); natsOptions_Destroy(opts); // To silence reports of memory still in used with valgrind nats_Close(); return 0; }