subrepo: subdir: "deps/nats.c" merged: "66cec7f" upstream: origin: "https://github.com/nats-io/nats.c.git" branch: "v3.6.1" commit: "66cec7f" git-subrepo: version: "0.4.6" commit: "b8b46501e"
434 lines
12 KiB
C
434 lines
12 KiB
C
// Copyright 2015-2018 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
#ifndef EXAMPLES_H_
|
|
#define EXAMPLES_H_
|
|
|
|
#include <nats.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
|
|
#ifdef _WIN32
|
|
#define strcasecmp _stricmp
|
|
#define strdup _strdup
|
|
#else
|
|
#include <strings.h>
|
|
#include <signal.h>
|
|
#endif
|
|
|
|
#define STATS_IN 0x1
|
|
#define STATS_OUT 0x2
|
|
#define STATS_COUNT 0x4
|
|
|
|
#define MAX_SERVERS (10)
|
|
|
|
bool async = true;
|
|
const char *subj = "foo";
|
|
const char *payload= "hello";
|
|
const char *name = "worker";
|
|
int64_t total = 1000000;
|
|
|
|
volatile int64_t count = 0;
|
|
volatile int64_t dropped = 0;
|
|
int64_t start = 0;
|
|
volatile int64_t elapsed = 0;
|
|
bool print = false;
|
|
int64_t timeout = 10000; // 10 seconds.
|
|
|
|
natsOptions *opts = NULL;
|
|
|
|
const char *certFile = NULL;
|
|
const char *keyFile = NULL;
|
|
|
|
const char *cluster = "test-cluster";
|
|
const char *clientID = "client";
|
|
const char *qgroup = NULL;
|
|
const char *durable = NULL;
|
|
bool deliverAll = false;
|
|
bool deliverLast = true;
|
|
uint64_t deliverSeq = 0;
|
|
bool unsubscribe = false;
|
|
|
|
const char *stream = NULL;
|
|
bool pull = false;
|
|
bool flowctrl = false;
|
|
|
|
static natsStatus
|
|
printStats(int mode, natsConnection *conn, natsSubscription *sub,
|
|
natsStatistics *stats)
|
|
{
|
|
natsStatus s = NATS_OK;
|
|
uint64_t inMsgs = 0;
|
|
uint64_t inBytes = 0;
|
|
uint64_t outMsgs = 0;
|
|
uint64_t outBytes = 0;
|
|
uint64_t reconnected = 0;
|
|
int pending = 0;
|
|
int64_t delivered = 0;
|
|
int64_t sdropped = 0;
|
|
|
|
s = natsConnection_GetStats(conn, stats);
|
|
if (s == NATS_OK)
|
|
s = natsStatistics_GetCounts(stats, &inMsgs, &inBytes,
|
|
&outMsgs, &outBytes, &reconnected);
|
|
if ((s == NATS_OK) && (sub != NULL))
|
|
{
|
|
s = natsSubscription_GetStats(sub, &pending, NULL, NULL, NULL,
|
|
&delivered, &sdropped);
|
|
|
|
// Since we use AutoUnsubscribe(), when the max has been reached,
|
|
// the subscription is automatically closed, so this call would
|
|
// return "Invalid Subscription". Ignore this error.
|
|
if (s == NATS_INVALID_SUBSCRIPTION)
|
|
{
|
|
s = NATS_OK;
|
|
pending = 0;
|
|
}
|
|
}
|
|
|
|
if (s == NATS_OK)
|
|
{
|
|
if (mode & STATS_IN)
|
|
{
|
|
printf("In Msgs: %9" PRIu64 " - "\
|
|
"In Bytes: %9" PRIu64 " - ", inMsgs, inBytes);
|
|
}
|
|
if (mode & STATS_OUT)
|
|
{
|
|
printf("Out Msgs: %9" PRIu64 " - "\
|
|
"Out Bytes: %9" PRIu64 " - ", outMsgs, outBytes);
|
|
}
|
|
if (mode & STATS_COUNT)
|
|
{
|
|
printf("Delivered: %9" PRId64 " - ", delivered);
|
|
printf("Pending: %5d - ", pending);
|
|
printf("Dropped: %5" PRId64 " - ", sdropped);
|
|
}
|
|
printf("Reconnected: %3" PRIu64 "\n", reconnected);
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
static void
|
|
printPerf(const char *perfTxt)
|
|
{
|
|
if ((start > 0) && (elapsed == 0))
|
|
elapsed = nats_Now() - start;
|
|
|
|
if (elapsed <= 0)
|
|
printf("\nNot enough messages or too fast to report performance!\n");
|
|
else
|
|
printf("\n%s %" PRId64 " messages in "\
|
|
"%" PRId64 " milliseconds (%d msgs/sec)\n",
|
|
perfTxt, count, elapsed, (int)((count * 1000) / elapsed));
|
|
}
|
|
|
|
static void
|
|
printUsageAndExit(const char *progName, const char *usage)
|
|
{
|
|
printf("\nUsage: %s [options]\n\nThe options are:\n\n"\
|
|
"-h prints the usage\n" \
|
|
"-s server url(s) (list of comma separated nats urls)\n" \
|
|
"-tls use secure (SSL/TLS) connection\n" \
|
|
"-tlscacert trusted certificates file\n" \
|
|
"-tlscert client certificate (PEM format only)\n" \
|
|
"-tlskey client private key file (PEM format only)\n" \
|
|
"-tlsciphers ciphers suite\n"
|
|
"-tlshost server certificate's expected hostname\n" \
|
|
"-tlsskip skip server certificate verification\n" \
|
|
"-creds user credentials chained file\n" \
|
|
"-subj subject (default is 'foo')\n" \
|
|
"-print for consumers, print received messages (default is false)\n" \
|
|
"-wd write deadline in milliseconds\n" \
|
|
"%s\n",
|
|
progName, usage);
|
|
|
|
natsOptions_Destroy(opts);
|
|
nats_Close();
|
|
|
|
exit(1);
|
|
}
|
|
|
|
static natsStatus
|
|
parseUrls(const char *urls, natsOptions *gopts)
|
|
{
|
|
char *serverUrls[MAX_SERVERS];
|
|
int num = 0;
|
|
natsStatus s = NATS_OK;
|
|
char *urlsCopy = NULL;
|
|
char *commaPos = NULL;
|
|
char *ptr = NULL;
|
|
|
|
urlsCopy = strdup(urls);
|
|
if (urlsCopy == NULL)
|
|
return NATS_NO_MEMORY;
|
|
|
|
memset(serverUrls, 0, sizeof(serverUrls));
|
|
|
|
ptr = urlsCopy;
|
|
|
|
do
|
|
{
|
|
if (num == MAX_SERVERS)
|
|
{
|
|
s = NATS_INSUFFICIENT_BUFFER;
|
|
break;
|
|
}
|
|
|
|
serverUrls[num++] = ptr;
|
|
commaPos = strchr(ptr, ',');
|
|
if (commaPos != NULL)
|
|
{
|
|
ptr = (char*)(commaPos + 1);
|
|
*(commaPos) = '\0';
|
|
}
|
|
else
|
|
{
|
|
ptr = NULL;
|
|
}
|
|
} while (ptr != NULL);
|
|
|
|
if (s == NATS_OK)
|
|
s = natsOptions_SetServers(gopts, (const char**) serverUrls, num);
|
|
|
|
free(urlsCopy);
|
|
|
|
return s;
|
|
}
|
|
|
|
static natsOptions*
|
|
parseArgs(int argc, char **argv, const char *usage)
|
|
{
|
|
natsStatus s = NATS_OK;
|
|
bool urlsSet = false;
|
|
int i;
|
|
|
|
if (natsOptions_Create(&opts) != NATS_OK)
|
|
s = NATS_NO_MEMORY;
|
|
|
|
for (i=1; (i<argc) && (s == NATS_OK); i++)
|
|
{
|
|
if ((strcasecmp(argv[i], "-h") == 0)
|
|
|| (strcasecmp(argv[i], "-help") == 0))
|
|
{
|
|
printUsageAndExit(argv[0], usage);
|
|
}
|
|
else if (strcasecmp(argv[i], "-s") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
s = parseUrls(argv[++i], opts);
|
|
if (s == NATS_OK)
|
|
urlsSet = true;
|
|
}
|
|
else if (strcasecmp(argv[i], "-tls") == 0)
|
|
{
|
|
s = natsOptions_SetSecure(opts, true);
|
|
}
|
|
else if (strcasecmp(argv[i], "-tlscacert") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
s = natsOptions_LoadCATrustedCertificates(opts, argv[++i]);
|
|
}
|
|
else if (strcasecmp(argv[i], "-tlscert") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
certFile = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-tlskey") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
keyFile = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-tlsciphers") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
s = natsOptions_SetCiphers(opts, argv[++i]);
|
|
}
|
|
else if (strcasecmp(argv[i], "-tlshost") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
s = natsOptions_SetExpectedHostname(opts, argv[++i]);
|
|
}
|
|
else if (strcasecmp(argv[i], "-tlsskip") == 0)
|
|
{
|
|
s = natsOptions_SkipServerVerification(opts, true);
|
|
}
|
|
else if (strcasecmp(argv[i], "-sync") == 0)
|
|
{
|
|
async = false;
|
|
pull = false;
|
|
}
|
|
else if (strcasecmp(argv[i], "-subj") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
subj = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-print") == 0)
|
|
{
|
|
print = true;
|
|
}
|
|
else if ((strcasecmp(argv[i], "-name") == 0) ||
|
|
(strcasecmp(argv[i], "-queue") == 0))
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
name = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-count") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
total = atol(argv[++i]);
|
|
}
|
|
else if (strcasecmp(argv[i], "-txt") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
payload = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-timeout") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
timeout = atol(argv[++i]);
|
|
}
|
|
else if (strcasecmp(argv[i], "-gd") == 0)
|
|
{
|
|
s = natsOptions_UseGlobalMessageDelivery(opts, true);
|
|
}
|
|
else if (strcasecmp(argv[i], "-c") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
cluster = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-id") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
clientID = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-last") == 0)
|
|
{
|
|
deliverLast = true;
|
|
}
|
|
else if (strcasecmp(argv[i], "-all") == 0)
|
|
{
|
|
deliverAll = true;
|
|
}
|
|
else if (strcasecmp(argv[i], "-seq") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
deliverSeq = atol(argv[++i]);
|
|
}
|
|
else if (strcasecmp(argv[i], "-durable") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
durable = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-qgroup") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
qgroup = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-unsubscribe") == 0)
|
|
{
|
|
unsubscribe = true;
|
|
}
|
|
else if (strcasecmp(argv[i], "-creds") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
s = natsOptions_SetUserCredentialsFromFiles(opts, argv[++i], NULL);
|
|
}
|
|
else if (strcasecmp(argv[i], "-wd") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
s = natsOptions_SetWriteDeadline(opts, atol(argv[++i]));
|
|
}
|
|
else if (strcasecmp(argv[i], "-stream") == 0)
|
|
{
|
|
if (i + 1 == argc)
|
|
printUsageAndExit(argv[0], usage);
|
|
|
|
stream = argv[++i];
|
|
}
|
|
else if (strcasecmp(argv[i], "-pull") == 0)
|
|
{
|
|
async = false;
|
|
pull = true;
|
|
}
|
|
else if (strcasecmp(argv[i], "-fc") == 0)
|
|
{
|
|
flowctrl = true;
|
|
}
|
|
else
|
|
{
|
|
printf("Unknown option: '%s'\n", argv[i]);
|
|
printUsageAndExit(argv[0], usage);
|
|
}
|
|
}
|
|
|
|
if ((s == NATS_OK) && ((certFile != NULL) || (keyFile != NULL)))
|
|
s = natsOptions_LoadCertificatesChain(opts, certFile, keyFile);
|
|
|
|
if ((s == NATS_OK) && !urlsSet)
|
|
s = parseUrls(NATS_DEFAULT_URL, opts);
|
|
|
|
if (s != NATS_OK)
|
|
{
|
|
printf("Error parsing arguments: %u - %s\n",
|
|
s, natsStatus_GetText(s));
|
|
|
|
nats_PrintLastErrorStack(stderr);
|
|
|
|
natsOptions_Destroy(opts);
|
|
nats_Close();
|
|
exit(1);
|
|
}
|
|
|
|
return opts;
|
|
}
|
|
|
|
#endif /* EXAMPLES_H_ */
|