torque 79a45fd2e3
git subrepo clone (merge) --branch=v3.6.1 https://github.com/nats-io/nats.c.git deps/nats.c
subrepo:
  subdir:   "deps/nats.c"
  merged:   "66cec7f"
upstream:
  origin:   "https://github.com/nats-io/nats.c.git"
  branch:   "v3.6.1"
  commit:   "66cec7f"
git-subrepo:
  version:  "0.4.6"
  commit:   "b8b46501e"
2023-08-15 00:21:33 -07:00

236 lines
7.0 KiB
C

// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "../examples.h"
static const char *usage = ""\
"-c cluster name (default \"test-cluster\")\n" \
"-id client ID (default \"client\"\n" \
"-count number of messages to receive\n" \
"-last deliver starting with last published message (default)\n" \
"-all deliver all available messages\n" \
"-seq deliver starting at given sequence number\n" \
"-durable durable subscription name\n" \
"-qgroup queue group name\n" \
"-unsubscribe unsubscribe the durable on exit\n";
static volatile bool done = false;
static void
onMsg(stanConnection *sc, stanSubscription *sub, const char *channel, stanMsg *msg, void *closure)
{
if (print)
printf("Received on [%s]: sequence:%" PRIu64 " data:%.*s timestamp:%" PRId64 " redelivered: %s\n",
channel,
stanMsg_GetSequence(msg),
stanMsg_GetDataLength(msg),
stanMsg_GetData(msg),
stanMsg_GetTimestamp(msg),
stanMsg_IsRedelivered(msg) ? "yes" : "no");
if (start == 0)
start = nats_Now();
// We should be using a mutex to protect those variables since
// they are used from the subscription's delivery and the main
// threads. For demo purposes, this is fine.
if (count == total-1)
{
natsStatus s;
elapsed = nats_Now() - start;
if (unsubscribe)
s = stanSubscription_Unsubscribe(sub);
else
s = stanSubscription_Close(sub);
if (s != NATS_OK)
{
printf("Error: %d - %s\n", s, natsStatus_GetText(s));
nats_PrintLastErrorStack(stderr);
}
}
/*
// If manual acknowledgment was selected, we would acknowledge
// the message this way:
stanSubscription_AckMsg(sub, msg);
*/
stanMsg_Destroy(msg);
// Increment only here so that when the main thread detects
// that the total has been sent, it does not start cleaning
// objects before we have closed the subscription and destroyed
// the last message. This is to reduce risk of valgrind reporting
// memory still in-use.
count++;
}
#if WIN32
static BOOL
sigHandler(DWORD fdwCtrlType)
{
if (fdwCtrlType==CTRL_C_EVENT)
{
done = true;
return TRUE;
}
return FALSE;
}
#else
static void
sigHandler(int ignored) {
done = true;
}
#endif
static void
connectionLostCB(stanConnection *sc, const char *errTxt, void *closure)
{
bool *connLost = (bool*) closure;
printf("Connection lost: %s\n", errTxt);
*connLost = true;
}
int main(int argc, char **argv)
{
natsStatus s;
stanConnOptions *connOpts = NULL;
stanSubOptions *subOpts = NULL;
stanConnection *sc = NULL;
stanSubscription *sub = NULL;
bool connLost = false;
opts = parseArgs(argc, argv, usage);
printf("Receiving %" PRId64 " messages from channel '%s'\n", total, subj);
// Now create STAN Connection Options and set the NATS Options.
s = stanConnOptions_Create(&connOpts);
if (s == NATS_OK)
s = stanConnOptions_SetNATSOptions(connOpts, opts);
// Add a callback to be notified if the STAN connection is lost for good.
if (s == NATS_OK)
s = stanConnOptions_SetConnectionLostHandler(connOpts, connectionLostCB, (void*)&connLost);
// Create the Connection using the STAN Connection Options
if (s == NATS_OK)
s = stanConnection_Connect(&sc, cluster, clientID, connOpts);
// Once connection is created, we can destroy opts and connOpts
natsOptions_Destroy(opts);
stanConnOptions_Destroy(connOpts);
if (s == NATS_OK)
s = stanSubOptions_Create(&subOpts);
// If durable
if ((s == NATS_OK) && (durable != NULL))
s = stanSubOptions_SetDurableName(subOpts, durable);
// Set position
if (s == NATS_OK)
{
if (deliverAll)
s = stanSubOptions_DeliverAllAvailable(subOpts);
else if (deliverLast)
s = stanSubOptions_StartWithLastReceived(subOpts);
else if (deliverSeq > 0)
s = stanSubOptions_StartAtSequence(subOpts, deliverSeq);
}
/*
// To manually acknowledge the messages, you would need to set this option
if (s == NATS_OK)
s = stanSubOptions_SetManualAckMode(subOpts, true);
// To change the number of MaxInflight messages, set this option.
// For instance, to receive a single message between each ACK, set
// the value to 1.
if (s == NATS_OK)
s = stanSubOptions_SetMaxInflight(subOpts, 1);
// To change the duration after which the server resends unacknowledged
// messages, use this option. For instance, cause re-delivery after 5 seconds:
if (s == NATS_OK)
s = stanSubOptions_SetAckWait(subOpts, 5000);
*/
// Create subscription
if (s == NATS_OK)
{
if (qgroup != NULL)
s = stanConnection_QueueSubscribe(&sub, sc, subj, qgroup, onMsg, NULL, subOpts);
else
s = stanConnection_Subscribe(&sub, sc, subj, onMsg, NULL, subOpts);
}
// Once subscription is created, we can destroy the subOpts object
stanSubOptions_Destroy(subOpts);
if (s == NATS_OK)
{
#if WIN32
SetConsoleCtrlHandler((PHANDLER_ROUTINE) sigHandler, TRUE);
#else
signal(SIGINT, sigHandler);
#endif
while (!done && !connLost && (count < total))
nats_Sleep(15);
if (!connLost)
printPerf("Received");
}
// If test was interrupted before receiving all expected messages,
// close or unsubscribe. Otherwise, this is done in the message
// callback.
if (!connLost && ((sub != NULL) && (count < total)))
{
if (unsubscribe)
s = stanSubscription_Unsubscribe(sub);
else
s = stanSubscription_Close(sub);
}
// If the connection was created, try to close it
if (!connLost && (sc != NULL))
{
natsStatus closeSts = stanConnection_Close(sc);
if ((s == NATS_OK) && (closeSts != NATS_OK))
s = closeSts;
}
if (s != NATS_OK)
{
printf("Error: %d - %s\n", s, natsStatus_GetText(s));
nats_PrintLastErrorStack(stderr);
}
// Destroy our objects
stanSubscription_Destroy(sub);
stanConnection_Destroy(sc);
// To silence reports of memory still in-use with valgrind.
nats_Sleep(50);
nats_Close();
return 0;
}