torque 79a45fd2e3
git subrepo clone (merge) --branch=v3.6.1 https://github.com/nats-io/nats.c.git deps/nats.c
subrepo:
  subdir:   "deps/nats.c"
  merged:   "66cec7f"
upstream:
  origin:   "https://github.com/nats-io/nats.c.git"
  branch:   "v3.6.1"
  commit:   "66cec7f"
git-subrepo:
  version:  "0.4.6"
  commit:   "b8b46501e"
2023-08-15 00:21:33 -07:00

144 lines
4.2 KiB
C

// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "../examples.h"
static const char *usage = ""\
"-txt text to send (default is 'hello')\n" \
"-count number of messages to send\n" \
"-sync publish synchronously (default is async)\n";
static volatile int ackCount = 0;
static volatile int errCount = 0;
static void
_pubAckHandler(const char *guid, const char *error, void *closure)
{
// This callback can be invoked by different threads for the
// same connection, so access should be protected. For this
// example, we don't.
ackCount++;
if (error != NULL)
{
printf("pub ack for guid:%s error=%s\n", guid, error);
errCount++;
}
}
static void
connectionLostCB(stanConnection *sc, const char *errTxt, void *closure)
{
bool *connLost = (bool*) closure;
printf("Connection lost: %s\n", errTxt);
*connLost = true;
}
int main(int argc, char **argv)
{
natsStatus s;
stanConnOptions *connOpts = NULL;
stanConnection *sc = NULL;
int len;
bool connLost = false;
opts = parseArgs(argc, argv, usage);
len = (int) strlen(payload);
printf("Sending %" PRId64 " messages to channel '%s'\n", total, subj);
// Now create STAN Connection Options and set the NATS Options.
s = stanConnOptions_Create(&connOpts);
if (s == NATS_OK)
s = stanConnOptions_SetNATSOptions(connOpts, opts);
// Set smaller ping intervals
if (s == NATS_OK)
s = stanConnOptions_SetPings(connOpts, 1, 5);
// Add a callback to be notified if the STAN connection is lost for good.
if (s == NATS_OK)
s = stanConnOptions_SetConnectionLostHandler(connOpts, connectionLostCB, (void*)&connLost);
/*
// To reduce MaxPubAcksInflight to 1000 and factor of 1.0
if (s == NATS_OK)
s = stanConnOptions_SetMaxPubAcksInflight(connOpts, 1000, 1.0);
*/
// Create the Connection using the STAN Connection Options
if (s == NATS_OK)
s = stanConnection_Connect(&sc, cluster, clientID, connOpts);
// Once the connection is created, we can destroy the options
natsOptions_Destroy(opts);
stanConnOptions_Destroy(connOpts);
if (s == NATS_OK)
start = nats_Now();
for (count = 0; (s == NATS_OK) && (count < total); count++)
{
if (async)
s = stanConnection_PublishAsync(sc, subj, (const void*) payload, len, _pubAckHandler, NULL);
else
s = stanConnection_Publish(sc, subj, (const void*) payload, len);
}
if (!connLost && (s == NATS_OK))
{
if (async)
{
while (ackCount != total)
nats_Sleep(15);
}
printPerf("Sent");
printf("Publish ack received: %d - with error: %d\n", ackCount, errCount);
}
// If the connection was created, try to close it
if (!connLost && (sc != NULL))
{
natsStatus closeSts = stanConnection_Close(sc);
if ((s == NATS_OK) && (closeSts != NATS_OK))
s = closeSts;
}
if (s != NATS_OK)
{
// If we finished before the end, let's wait a tiny
// bit to see if the failure is due to connection lost.
if (ackCount != total)
nats_Sleep(100);
// If connection was lost, the real reason is reported
// the the connectionLostCB callback.
if (!connLost)
{
printf("Error: %d - %s\n", s, natsStatus_GetText(s));
nats_PrintLastErrorStack(stderr);
}
}
// Destroy the connection
stanConnection_Destroy(sc);
// To silence reports of memory still in-use with valgrind.
nats_Sleep(50);
nats_Close();
return 0;
}