GitHub issue: rabbitmq/rabbitmq-server#16347
The issue is a review of what remains to be done for per-protocol connection limits. The stream plugin already has stream.max_connections and the web_mqtt plugin already has web_mqtt.max_connections. This document covers the implementation of mqtt.max_connections (PR #16367, under revision) and stomp.max_connections (PR #16368, open).
Status of PR #16367: The initial implementation used ranch:info(RanchRef) (same as the stream plugin). Reviewer @ansd identified that this gives a per-listener count, not a node-wide count. MQTT supports port-to-vhost mapping (multiple listeners on different ports), dual-stack (separate IPv4/IPv6 listeners), and mixed TCP/TLS deployments — each of which is a separate Ranch ref with its own supervisor. With four listeners and max_connections = 1000, the actual node limit would be 4000. The fix is to use the MQTT PG scope ETS table, described below.
AMQP 0-9-1 has two independent connection limit mechanisms. Understanding both is important context for the MQTT approach.
The first mechanism is the Ranch transport-level limit, set via ranch_connection_max in rabbitmq.conf. It is applied in tcp_listener_sup.erl:38 as the max_connections field in the Ranch listener options (divided by num_conns_sups).
When the limit is reached, ranch_conns_sup does not send the resumption message to the acceptor process after handing off a connection (ranch_conns_sup.erl:264-268). The acceptor blocks inside start_protocol/3 and stops calling Transport:accept/2. The kernel continues completing TCP three-way handshakes and placing sockets in the OS accept queue (sized by the backlog option, default 128). Once that queue is full, the kernel on Linux silently drops new SYN packets. Clients experience a TCP connection timeout with no RST. The TCP listen queue is directly affected.
When a connection closes, ranch_conns_sup wakes a sleeping acceptor and accepting resumes.
The second mechanism is the application-level limit, set via connection_max in rabbitmq.conf. It is checked in rabbit_reader.erl:is_over_node_connection_limit/1 (line 1416), called from handle_method0 when connection.open is received (line 1318). The implementation calls ranch:info(RanchRef) and compares active_connections against the limit. If over the limit, rabbit_misc:protocol_error(not_allowed, ...) is raised, sending a connection.close frame to the client before closing. The TCP listen queue is not affected; Ranch keeps accepting normally.
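Both knobs, plus the backlog that sizes the kernel-side accept queue, live in rabbitmq.conf. A sketch with illustrative values:

```ini
# Ranch transport-level gate: acceptors pause once this many
# connections are active on the node
ranch_connection_max = 10000

# Application-level limit: checked when connection.open arrives;
# the client receives connection.close (not_allowed) when exceeded
connection_max = 5000

# OS accept-queue depth used while acceptors are paused
# (128 is the default mentioned above)
tcp_listen_options.backlog = 128
```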
mqtt.max_connections corresponds to the connection_max pattern - an application-level check in the connection handler, not a Ranch transport-level gate. The stream plugin uses ranch:info(RanchRef) for this, but that approach is incorrect for MQTT because MQTT supports multi-listener configurations (port-to-vhost mapping, dual-stack, TCP+TLS) where each listener has a separate Ranch ref. Instead, MQTT's existing PG scope is used for a true node-wide count (see enforcement approach below).
- `stream.max_connections` — enforced in `rabbit_stream_reader` via `ranch:info/1` after Ranch accepts the connection, during the OPEN frame
- `web_mqtt.max_connections` — enforced at the Ranch/Cowboy transport layer at listener startup time
- `rabbit_mqtt_processor` already enforces `check_vhost_connection_limit/1` and `check_user_connection_limit/1` during CONNECT packet processing
MQTT uses rabbit_mqtt_sup -> rabbit_networking:tcp_listener_spec/10 -> tcp_listener_sup. The key point is that tcp_listener_sup passes max_connections to Ranch from rabbit.ranch_connection_max (the global node limit), not from a per-plugin setting.
Web MQTT bypasses tcp_listener_sup entirely and starts its Cowboy listeners directly, passing max_connections from application:get_env(rabbitmq_web_mqtt, max_connections, infinity). That is why web_mqtt can enforce a plugin-specific limit at the Ranch level without touching tcp_listener_sup.
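A minimal sketch of that pattern, with assumed names (web_mqtt_demo, port 15675, and build of the options map are illustrative; the real plugin constructs much richer transport and protocol options):

```erlang
%% Sketch: a plugin-specific connection cap passed straight to Ranch
%% via Cowboy, bypassing the shared tcp_listener_sup.
start_listener() ->
    MaxConns = application:get_env(rabbitmq_web_mqtt, max_connections, infinity),
    TransportOpts = #{socket_opts     => [{port, 15675}],
                      %% Ranch enforces this at the transport layer:
                      max_connections => MaxConns},
    {ok, _Pid} = cowboy:start_clear(web_mqtt_demo, TransportOpts, #{}).
```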
Since MQTT uses tcp_listener_sup (shared infrastructure), adding a per-plugin max_connections at the Ranch transport layer would require modifying tcp_listener_sup, which affects every protocol that uses it. That approach is out of scope. Instead, we check the limit in the connection handler after Ranch has accepted the connection.
When Ranch accepts a connection, it starts a rabbit_mqtt_reader process. That process calls rabbit_networking:handshake/2 and then waits for the CONNECT packet. When the CONNECT packet arrives, rabbit_mqtt_processor:init/5 is called, which calls process_connect/5. The first check in the maybe chain inside process_connect/5 is check_node_connection_limit().
check_node_connection_limit/0 uses ets:info(persistent_term:get(?PG_SCOPE), size) to get the number of active MQTT connections node-wide, then compares against application:get_env(rabbitmq_mqtt, max_connections, infinity).
The MQTT plugin creates a node-local PG scope (rabbit:pg_local_scope(?PG_SCOPE)) in rabbit_mqtt_sup. Each MQTT connection registers in this scope by calling pg:join(PgScope, {VHost, ClientId}, self()) inside register_client_id/4 (line 721 of rabbit_mqtt_processor.erl). Client IDs are unique per vhost (RabbitMQ disconnects the older connection when a duplicate connects), so each {VHost, ClientId} group has exactly one member. The OTP pg module stores one ETS row per group ({Group, AllPids, LocalPids}), so ets:info(PgScope, size) equals the number of active MQTT connections on the node.
This count is:
- Node-wide: the PG scope is local to the node; it is not split by listener, IP family, or transport
- O(1): a single ETS metadata lookup
- Inclusive of all transports: plain TCP, TLS, and Web MQTT connections all call `register_client_id` and join the same PG scope
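The counting identity behind these properties can be checked in isolation. A sketch using a standalone pg scope (demo_scope is illustrative, not the plugin's scope name):

```erlang
%% pg creates a named ETS table per scope and stores one row per group
%% ({Group, AllPids, LocalPids}), so the table size equals the group count.
demo() ->
    Scope = demo_scope,
    {ok, _} = pg:start_link(Scope),
    ok = pg:join(Scope, {<<"/">>, <<"client-1">>}, self()),
    ok = pg:join(Scope, {<<"/">>, <<"client-2">>}, self()),
    %% two groups with one member each -> two rows
    2 = ets:info(Scope, size),
    ok.
```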
register_client_id is called at line 214 of process_connect, which is after the check_node_connection_limit() call at line 197. The current connection is therefore not yet in the PG scope when the check runs. The correct comparison is ActiveConns >= Limit: if the count is already at the limit, the new connection must be rejected (it would become the (Limit+1)-th).
This differs from the Ranch approach, where ranch:handshake/1 has already completed and the connection is already counted in active_connections, making > Limit correct for that case.
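The two predicates side by side, as hypothetical helpers (not plugin code):

```erlang
%% PgCount is sampled before the current connection joins the scope;
%% RanchCount already includes the current connection.
should_reject_pg(PgCount, Limit)       -> PgCount >= Limit.
should_reject_ranch(RanchCount, Limit) -> RanchCount > Limit.

%% With Limit = 0 and one incoming connection:
%%   should_reject_pg(0, 0)    -> true   (0 >= 0)
%%   should_reject_ranch(1, 0) -> true   (1 > 0)
%% Both schemes reject the first attempt, which is why the
%% max_connections = 0 test passes under either counting mechanism.
```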
Web MQTT connections call rabbit_mqtt_processor:init/4 (which delegates to init/5 with RanchRef = undefined). They also call register_client_id on successful CONNECT, joining the same PG scope. With the PG scope approach, Web MQTT connections are counted in the node-wide total and are subject to mqtt.max_connections. This is the correct and intended behaviour — mqtt.max_connections is a node-wide limit for all MQTT connections regardless of transport.
rabbit_mqtt_processor:init/4 is in the -export list and is called by two places:
- deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl — plain MQTT
- deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl:385 — Web MQTT
Since check_node_connection_limit/0 takes no argument, RanchRef is not needed anywhere in the connect flow. init/5 is therefore unnecessary. All changes made to rabbit_mqtt_reader.erl in the enforcement commit (adding ranch_ref to #state{}, storing it in init/1, binding it in process_received_bytes/2, and passing it to init/5) can be fully reverted. Both callers use init/4 as before.
deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema — committed in "Add mqtt.max_connections cuttlefish schema mapping":
{mapping, "mqtt.max_connections", "rabbitmq_mqtt.max_connections",
[{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}.
{translation, "rabbitmq_mqtt.max_connections",
fun(Conf) ->
case cuttlefish:conf_get("mqtt.max_connections", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) -> Val;
_ -> cuttlefish:invalid("should be a non-negative integer")
end
end}.

deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets — committed in the same commit.
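The snippet entry itself is not reproduced in this document; a plausible shape, mirroring the other entries in the file (the value 10 is illustrative):

```erlang
{max_connections,
 "mqtt.max_connections = 10",
 [{rabbitmq_mqtt,[{max_connections, 10}]}],
 [rabbitmq_mqtt]}
```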
deps/rabbitmq_mqtt/test/auth_SUITE.erl — committed in "Add node_connection_limit test to auth_SUITE". Test behaviour is unchanged by the counting mechanism switch: with max_connections = 0, both approaches reject the first connection attempt (Ranch: 1 > 0; PG scope: 0 >= 0). No further changes needed.
Remove init/5 from the -export list (init/4 and init/5 are both currently exported; only init/4 remains).
Restore init/4 to its original full body — revert the wrapper introduced in the enforcement commit. init/4 calls process_connect/5 directly (not via init/5):
init(#mqtt_packet{fixed = #mqtt_packet_fixed{type = ?CONNECT},
variable = ConnectPacket},
Socket, ConnName, SendFun) ->
case rabbit_net:socket_ends(Socket, inbound) of
{ok, SocketEnds} ->
process_connect(ConnectPacket, Socket, ConnName, SendFun, SocketEnds);
{error, Reason} ->
{error, {socket_ends, Reason}}
end.

Remove init/5 entirely — the clause added in the enforcement commit is deleted.
process_connect/6 → process_connect/5 — remove RanchRef from the function head; it is no longer used.
Call site — change ok ?= check_node_connection_limit(RanchRef) to ok ?= check_node_connection_limit().
Replace check_node_connection_limit/1 with check_node_connection_limit/0:
Remove both clauses of the old check_node_connection_limit/1 and replace with:
check_node_connection_limit() ->
case application:get_env(rabbitmq_mqtt, max_connections, infinity) of
infinity ->
ok;
Limit when is_integer(Limit), Limit >= 0 ->
PgScope = persistent_term:get(?PG_SCOPE),
case ets:info(PgScope, size) of
ActiveConns when is_integer(ActiveConns), ActiveConns >= Limit ->
?LOG_ERROR("MQTT connection failed: node connection limit ~tp is reached",
[Limit]),
{error, ?RC_QUOTA_EXCEEDED};
_ ->
ok
end;
_ ->
ok
end.

?PG_SCOPE is already used in rabbit_mqtt_processor.erl (in register_client_id/4 and remove_duplicate_client_id_connections/3), so the macro is already in scope. ets:info/2 returns undefined if the table does not exist; the is_integer(ActiveConns) guard catches this and falls through to _ -> ok (fail open), though in practice the PG scope is always running while connections are being processed.
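The fail-open path rests on a documented ets:info/2 behaviour, easy to confirm in a shell:

```erlang
%% ets:info/2 returns undefined for a table that does not exist, so the
%% is_integer(ActiveConns) guard fails and the check falls through to ok.
undefined = ets:info(no_such_table, size).
```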
All changes made to rabbit_mqtt_reader.erl in the enforcement commit must be reverted:
- Remove `ranch_ref :: ranch:ref()` from `#state{}`
- Remove `ranch_ref = Ref` from `init/1`
- Remove `ranch_ref = RanchRef` from the `process_received_bytes/2` pattern match
- Revert the `init/5` call back to `init/4` (removing the `RanchRef` argument)
- deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl — keeps calling `init/4`, no changes needed
- deps/rabbit/src/tcp_listener_sup.erl — not modified; per-plugin limits do not go through shared infrastructure
- deps/rabbit/src/rabbit_networking.erl — not modified
- The PG scope ETS table has one row per `{VHost, ClientId}` group. Since MQTT enforces unique client IDs per vhost (via `remove_duplicate_client_id_connections/3`), each group has exactly one member. `ets:info(PgScope, size)` therefore equals the number of active MQTT connections on the node.
- The check runs at line 197, before `register_client_id` at line 214, so the current connection is not yet in the PG scope. The comparison is `>= Limit`.
- With `max_connections = 0`: PG size is 0 on the first attempt; `0 >= 0` is true; the connection is rejected. This matches the test behaviour.
- `process_connect/1` (arity 1, handles session/subscription setup after auth) is a separate function and is not affected.
- The `non_negative_integer` validator is defined in the core rabbit.schema and is available to all plugin schemas.
STOMP initializes the processor with rabbit_stomp_processor:initial_state/2, called from two places:
- deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl:83 — plain STOMP (has `Ref`)
- deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl:215 — Web STOMP (no `Ref`)
Parallel to the MQTT approach: keep initial_state/2 as a wrapper that calls initial_state/3 with undefined, so Web STOMP needs no change. The plain STOMP reader calls initial_state/3 with Ref.
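A sketch of the wrapper arrangement (simplified: build_state/3 is a placeholder for the existing initial_state/2 body, which additionally stores RanchRef in #cfg{}):

```erlang
%% Web STOMP keeps calling initial_state/2 and gets RanchRef = undefined,
%% which makes check_node_connection_limit/1 a no-op for it.
initial_state(Configuration, ProcInitArgs) ->
    initial_state(Configuration, ProcInitArgs, undefined).

initial_state(Configuration, ProcInitArgs, RanchRef) ->
    %% ...existing initial_state/2 body, storing
    %% #cfg{ranch_ref = RanchRef, ...} in the returned state
    build_state(Configuration, ProcInitArgs, RanchRef).
```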
In rabbit_stomp_reader.erl:101, rabbit_networking:register_non_amqp_connection(self()) is called during init/1, before any CONNECT frame arrives. This is unlike MQTT where registration happens after auth. This has no effect on the limit check because we use ranch:info(RanchRef) not the non-AMQP connection registry.
process_connect/3 in rabbit_stomp_processor.erl uses a maybe chain (lines 329-407). Error cases are handled in an else block using the error/3 helper, which returns {error, Message, Detail, State}. process_request/2 catches this and calls send_error/3 to transmit the ERROR frame.
Existing precedent: check_vhost_connection_limit/1 returns {error, quota_exceeded}, which the else block maps to a "Bad CONNECT" ERROR frame. We use a distinct atom node_connection_limit_exceeded to produce a distinct error message.
ranch is listed in deps/rabbitmq_stomp/Makefile:33 (DEPS = ranch rabbit_common rabbit).
STOMP does not have port-to-vhost mapping or the equivalent multi-listener configurations that make ranch:info(RanchRef) incorrect for MQTT. The Ranch-based approach is therefore acceptable for STOMP. If STOMP gains similar multi-listener configurations in the future, a PG-scope-style fix would be needed, but that is not a concern today.
deps/rabbitmq_stomp/priv/schema/rabbitmq_stomp.schema
Add after the stomp.num_acceptors.tcp mapping (after line 158):
{mapping, "stomp.max_connections", "rabbitmq_stomp.max_connections",
[{datatype, [{atom, infinity}, integer]}, {validators, ["non_negative_integer"]}]}.
{translation, "rabbitmq_stomp.max_connections",
fun(Conf) ->
case cuttlefish:conf_get("stomp.max_connections", Conf, undefined) of
undefined -> cuttlefish:unset();
infinity -> infinity;
Val when is_integer(Val) -> Val;
_ -> cuttlefish:invalid("should be a non-negative integer")
end
end}.

deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets
The file ends with the {max_frame_size_unauthenticated, ...} entry (no trailing comma) followed by the closing `].`. Add a comma to that entry and append:
{max_connections,
"stomp.max_connections = 10",
[{rabbitmq_stomp,[{max_connections, 10}]}],
[rabbitmq_stomp]}

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl
- Add `ranch_ref :: ranch:ref() | undefined` to the `#cfg{}` record.
- Add `initial_state/3` to the `-export` list.
- Change `initial_state/2` to a one-line wrapper calling `initial_state/3` with `undefined`.
- Add `initial_state/3` — same body as the current `initial_state/2` but stores `RanchRef` in `#cfg{ranch_ref = RanchRef}`.
- In `process_connect/3`, bind `ranch_ref = RanchRef` in the function head pattern match alongside the existing `conn_info` and `ssl_login_name` bindings, then add as the first `maybe` check: `ok ?= check_node_connection_limit(RanchRef)`.
- Add an `else` clause: `{error, node_connection_limit_exceeded} -> error("Bad CONNECT", "Connection refused: node connection limit reached", State)`.
- Add `check_node_connection_limit/1` near `check_vhost_connection_limit/1`:
check_node_connection_limit(undefined) ->
ok;
check_node_connection_limit(RanchRef) ->
case application:get_env(rabbitmq_stomp, max_connections, infinity) of
infinity ->
ok;
Limit when is_integer(Limit), Limit >= 0 ->
#{active_connections := ActiveConns} = ranch:info(RanchRef),
case ActiveConns > Limit of
false ->
ok;
true ->
?LOG_ERROR("STOMP connection failed: node connection limit ~tp is reached",
[Limit]),
{error, node_connection_limit_exceeded}
end;
_ ->
ok
end.

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl
Change lines 83-84 from rabbit_stomp_processor:initial_state(Configuration, ProcInitArgs) to rabbit_stomp_processor:initial_state(Configuration, ProcInitArgs, Ref). Ref is already in scope from line 64.
deps/rabbitmq_stomp/test/connections_SUITE.erl
Add node_connection_limit to all/0. Add test function:
node_connection_limit(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
[rabbitmq_stomp, max_connections, 0]),
StompPort = get_stomp_port(Config),
{ok, Sock} = gen_tcp:connect(localhost, StompPort, [{active, false}, binary]),
ConnectFrame = <<"CONNECT\nlogin:guest\npasscode:guest\naccept-version:1.2\n\n\0">>,
ok = gen_tcp:send(Sock, ConnectFrame),
{ok, Data} = gen_tcp:recv(Sock, 0, 5000),
{ok, Frame, _} = rabbit_stomp_frame:parse(Data, rabbit_stomp_frame:initial_state()),
'ERROR' = Frame#stomp_frame.command,
gen_tcp:close(Sock),
rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
[rabbitmq_stomp, max_connections, infinity]).

- deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl — keeps calling `initial_state/2`, no changes needed
- deps/rabbit/src/tcp_listener_sup.erl — not modified
- deps/rabbitmq_stomp/test/config_schema_SUITE.erl — the snippets file drives this test automatically
- `ranch:info(RanchRef)` counts the current connection in `active_connections` because `ranch:handshake/1` (called inside `rabbit_networking:handshake/2`) has completed by the time `process_connect/3` runs
- `check_node_connection_limit(undefined)` returns `ok` immediately, so Web STOMP is unaffected
- The `non_negative_integer` validator from rabbit.schema is globally available