rsyslog/backport-Add-max-sessions-for-imptcp.c-similar-to-imtcp.c.patch
2021-01-15 15:35:15 +08:00

312 lines
9.9 KiB
Diff

From 6c50426dbfd309c03916918541095e9500ea98c6 Mon Sep 17 00:00:00 2001
From: Alfred Perlstein <alfred@fb.com>
Date: Sun, 9 Aug 2020 16:45:56 -0700
Subject: [PATCH 22/73] Add max sessions for imptcp.c similar to imtcp.c
The max is per-instance, not global across all instances.
There is also a bugfix where if epoll failed I think we could leave a
session linked in the list of sessions, this code unlinks it.
---
plugins/imptcp/imptcp.c | 72 ++++++++++++++++++++++++++++---------
tests/Makefile.am | 2 ++
tests/imptcp_maxsessions.sh | 46 ++++++++++++++++++++++++
3 files changed, 104 insertions(+), 16 deletions(-)
create mode 100755 tests/imptcp_maxsessions.sh
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 4d261a29f..e89971dbe 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -128,6 +128,7 @@ typedef struct configSettings_s {
uchar *lstnIP; /* which IP we should listen on? */
uchar *pszBindRuleset;
int wrkrMax; /* max number of workers (actually "helper workers") */
+ int iTCPSessMax; /* max open connections per instance */
} configSettings_t;
static configSettings_t cs;
@@ -164,6 +165,7 @@ struct instanceConf_s {
unsigned int ratelimitBurst;
uchar *startRegex;
regex_t start_preg; /* compiled version of startRegex */
+ int iTCPSessMax; /* max open connections */
struct instanceConf_s *next;
};
@@ -173,6 +175,7 @@ struct modConfData_s {
instanceConf_t *root, *tail;
int wrkrMax;
int bProcessOnPoller;
+ int iTCPSessMax;
sbool configSetViaV2Method;
};
@@ -182,6 +185,7 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo
/* module-global parameters */
static struct cnfparamdescr modpdescr[] = {
{ "threads", eCmdHdlrPositiveInt, 0 },
+ { "maxsessions", eCmdHdlrInt, 0 },
{ "processOnPoller", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk modpblk =
@@ -211,6 +215,7 @@ static struct cnfparamdescr inppdescr[] = {
{ "defaulttz", eCmdHdlrString, 0 },
{ "supportoctetcountedframing", eCmdHdlrBinary, 0 },
{ "framingfix.cisco.asa", eCmdHdlrBinary, 0 },
+ { "maxsessions", eCmdHdlrInt, 0 },
{ "notifyonconnectionclose", eCmdHdlrBinary, 0 },
{ "notifyonconnectionopen", eCmdHdlrBinary, 0 },
{ "compression.mode", eCmdHdlrGetWord, 0 },
@@ -269,6 +274,8 @@ struct ptcpsrv_s {
ruleset_t *pRuleset;
ptcplstn_t *pLstn; /* root of our listeners */
ptcpsess_t *pSess; /* root of our sessions */
+ int iTCPSessCnt;
+ int iTCPSessMax;
pthread_mutex_t mutSessLst;
sbool bKeepAlive; /* support keep-alive packets */
sbool bEmitMsgOnClose;
@@ -401,6 +408,24 @@ destructSess(ptcpsess_t *pSess)
free(pSess);
}
+/* remove session from server */
+static void
+unlinkSess(ptcpsess_t *pSess) {
+ ptcpsrv_t *pSrv = pSess->pLstn->pSrv;
+ pthread_mutex_lock(&pSrv->mutSessLst);
+ pSrv->iTCPSessCnt--;
+ /* finally unlink session from structures */
+ if(pSess->next != NULL)
+ pSess->next->prev = pSess->prev;
+ if(pSess->prev == NULL) {
+ /* need to update root! */
+ pSrv->pSess = pSess->next;
+ } else {
+ pSess->prev->next = pSess->next;
+ }
+ pthread_mutex_unlock(&pSrv->mutSessLst);
+}
+
static void
destructSrv(ptcpsrv_t *pSrv)
{
@@ -717,7 +742,7 @@ getPeerNames(prop_t **peerName, prop_t **peerIP, struct sockaddr *pAddr, sbool b
uchar szHname[NI_MAXHOST+1] = "";
struct addrinfo hints, *res;
sbool bMaliciousHName = 0;
-
+
DEFiRet;
*peerName = NULL;
@@ -1470,6 +1495,7 @@ addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP)
int pmsg_size_factor;
CHKmalloc(pSess = malloc(sizeof(ptcpsess_t)));
+ pSess->next = NULL;
if(pLstn->pSrv->inst->startRegex == NULL) {
pmsg_size_factor = 1;
pSess->pMsg_save = NULL;
@@ -1494,7 +1520,17 @@ addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP)
/* add to start of server's listener list */
pSess->prev = NULL;
+
pthread_mutex_lock(&pSrv->mutSessLst);
+ int iTCPSessMax = pSrv->inst->iTCPSessMax;
+ if (iTCPSessMax > 0 && pSrv->iTCPSessCnt >= iTCPSessMax) {
+ pthread_mutex_unlock(&pSrv->mutSessLst);
+ LogError(0, RS_RET_MAX_SESS_REACHED,
+ "too many tcp sessions - dropping incoming request");
+ ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
+ }
+
+ pSrv->iTCPSessCnt++;
pSess->next = pSrv->pSess;
if(pSrv->pSess != NULL)
pSrv->pSess->prev = pSess;
@@ -1506,6 +1542,9 @@ addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP)
finalize_it:
if(iRet != RS_RET_OK) {
if(pSess != NULL) {
+ if (pSess->next != NULL) {
+ unlinkSess(pSess);
+ }
free(pSess->pMsg_save);
free(pSess->pMsg);
free(pSess);
@@ -1566,24 +1605,14 @@ static rsRetVal
closeSess(ptcpsess_t *pSess)
{
DEFiRet;
-
+
if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS)
doZipFinish(pSess);
const int sock = pSess->sock;
close(sock);
- pthread_mutex_lock(&pSess->pLstn->pSrv->mutSessLst);
- /* finally unlink session from structures */
- if(pSess->next != NULL)
- pSess->next->prev = pSess->prev;
- if(pSess->prev == NULL) {
- /* need to update root! */
- pSess->pLstn->pSrv->pSess = pSess->next;
- } else {
- pSess->prev->next = pSess->next;
- }
- pthread_mutex_unlock(&pSess->pLstn->pSrv->mutSessLst);
+ unlinkSess(pSess);
if(pSess->pLstn->pSrv->bEmitMsgOnClose) {
LogMsg(0, RS_RET_NO_ERRCODE, LOG_INFO, "imptcp: session on socket %d closed "
@@ -1641,6 +1670,7 @@ createInstance(instanceConf_t **pinst)
inst->multiLine = 0;
inst->socketBacklog = 5;
inst->pszLstnPortFileName = NULL;
+ inst->iTCPSessMax = -1;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1698,6 +1728,7 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *const pNe
inst->bEmitMsgOnOpen = cs.bEmitMsgOnOpen;
inst->iAddtlFrameDelim = cs.iAddtlFrameDelim;
inst->maxFrameSize = cs.maxFrameSize;
+ inst->iTCPSessMax = cs.iTCPSessMax;
finalize_it:
free(pNewVal);
@@ -1756,6 +1787,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->flowControl = inst->flowControl;
pSrv->pRuleset = inst->pBindRuleset;
pSrv->pszInputName = ustrdup((inst->pszInputName == NULL) ? UCHAR_CONSTANT("imptcp") : inst->pszInputName);
+ pSrv->iTCPSessMax = inst->iTCPSessMax;
CHKiRet(prop.Construct(&pSrv->pInputName));
CHKiRet(prop.SetString(pSrv->pInputName, pSrv->pszInputName, ustrlen(pSrv->pszInputName)));
CHKiRet(prop.ConstructFinalize(pSrv->pInputName));
@@ -2022,13 +2054,13 @@ enqueueIoWork(epolld_t *epd, int dispatchInlineIfQueueFull) {
int dispatchInline;
int inlineDispatchThreshold;
DEFiRet;
-
+
CHKmalloc(n = malloc(sizeof(io_req_t)));
n->epd = epd;
-
+
inlineDispatchThreshold = DFLT_inlineDispatchThreshold * runModConf->wrkrMax;
dispatchInline = 0;
-
+
pthread_mutex_lock(&io_q.mut);
if (dispatchInlineIfQueueFull && io_q.sz > inlineDispatchThreshold) {
dispatchInline = 1;
@@ -2200,6 +2232,8 @@ CODESTARTnewInpInst
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
}
free(cstr);
+ } else if(!strcmp(inppblk.descr[i].name, "maxsessions")) {
+ inst->iTCPSessMax = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "keepalive")) {
inst->bKeepAlive = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) {
@@ -2248,6 +2282,10 @@ CODESTARTnewInpInst
ABORT_FINALIZE(RS_RET_ERR);
}
}
+
+ if (inst->iTCPSessMax == -1) {
+ inst->iTCPSessMax = loadModConf->iTCPSessMax;
+ }
finalize_it:
CODE_STD_FINALIZERnewInpInst
cnfparamvalsDestruct(pvals, &inppblk);
@@ -2289,6 +2327,8 @@ CODESTARTsetModCnf
continue;
if(!strcmp(modpblk.descr[i].name, "threads")) {
loadModConf->wrkrMax = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "maxsessions")) {
+ loadModConf->iTCPSessMax = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "processOnPoller")) {
loadModConf->bProcessOnPoller = (int) pvals[i].val.d.n;
} else {
diff --git a/tests/Makefile.am b/tests/Makefile.am
index f982dad5d..0df67672c 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1000,6 +1000,7 @@ TESTS += \
imptcp_no_octet_counted.sh \
imptcp_multi_line.sh \
imptcp_spframingfix.sh \
+ imptcp_maxsessions.sh \
imptcp_nonProcessingPoller.sh \
imptcp_veryLargeOctateCountedMessages.sh \
imptcp-basic-hup.sh \
@@ -2472,6 +2473,7 @@ EXTRA_DIST= \
testsuites/xlate_more_with_duplicates_and_nomatch.lkp_tbl \
testsuites/xlate_sparse_array_more_with_duplicates_and_nomatch.lkp_tbl \
json_var_cmpr.sh \
+ imptcp_maxsessions.sh \
imptcp_nonProcessingPoller.sh \
imptcp_veryLargeOctateCountedMessages.sh \
known_issues.supp \
diff --git a/tests/imptcp_maxsessions.sh b/tests/imptcp_maxsessions.sh
new file mode 100755
index 000000000..3162eb619
--- /dev/null
+++ b/tests/imptcp_maxsessions.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+# Test imtcp with many dropping connections
+# added 2010-08-10 by Rgerhards
+#
+# This file is part of the rsyslog project, released under GPLv3
+. ${srcdir:=.}/diag.sh init
+skip_platform "FreeBSD" "This test currently does not work on FreeBSD"
+export NUMMESSAGES=500
+
+MAXSESSIONS=10
+CONNECTIONS=20
+EXPECTED_DROPS=$((CONNECTIONS - MAXSESSIONS))
+
+EXPECTED_STR='too many tcp sessions - dropping incoming request'
+wait_too_many_sessions()
+{
+ test "$(grep "$EXPECTED_STR" "$RSYSLOG_OUT_LOG" | wc -l)" = "$EXPECTED_DROPS"
+}
+
+export QUEUE_EMPTY_CHECK_FUNC=wait_too_many_sessions
+generate_conf
+add_conf '
+$MaxMessageSize 10k
+
+module(load="../plugins/imptcp/.libs/imptcp" maxsessions="'$MAXSESSIONS'")
+input(type="imptcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port")
+action(type="omfile" file=`echo $RSYSLOG_OUT_LOG`)
+
+$template outfmt,"%msg:F,58:2%,%msg:F,58:3%,%msg:F,58:4%\n"
+$OMFileFlushInterval 2
+$OMFileIOBufferSize 256k
+'
+startup
+
+echo "INFO: RSYSLOG_OUT_LOG: $RSYSLOG_OUT_LOG"
+
+echo "About to run tcpflood"
+tcpflood -c$CONNECTIONS -m$NUMMESSAGES -r -d100 -P129
+echo "done run tcpflood"
+shutdown_when_empty
+wait_shutdown
+
+content_count_check "$EXPECTED_STR" $EXPECTED_DROPS
+echo "Got expected drops: $EXPECTED_DROPS, looks good!"
+
+exit_test
--
2.23.0