|
@@ -85,8 +85,8 @@
|
|
|
idle_timer :: maybe(reference()),
|
|
idle_timer :: maybe(reference()),
|
|
|
%% Zone name
|
|
%% Zone name
|
|
|
zone :: atom(),
|
|
zone :: atom(),
|
|
|
- %% Listener Name
|
|
|
|
|
- listener :: atom()
|
|
|
|
|
|
|
+ %% Listener Type and Name
|
|
|
|
|
+ listener :: {Type::atom(), Name::atom()}
|
|
|
}).
|
|
}).
|
|
|
|
|
|
|
|
-type(state() :: #state{}).
|
|
-type(state() :: #state{}).
|
|
@@ -173,12 +173,12 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) ->
|
|
|
%% WebSocket callbacks
|
|
%% WebSocket callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
-init(Req, #{zone := Zone, listener := Listener} = Opts) ->
|
|
|
|
|
|
|
+init(Req, #{listener := {Type, Listener}} = Opts) ->
|
|
|
%% WS Transport Idle Timeout
|
|
%% WS Transport Idle Timeout
|
|
|
- WsOpts = #{compress => get_ws_opts(Zone, Listener, compress),
|
|
|
|
|
- deflate_opts => get_ws_opts(Zone, Listener, deflate_opts),
|
|
|
|
|
- max_frame_size => get_ws_opts(Zone, Listener, max_frame_size),
|
|
|
|
|
- idle_timeout => get_ws_opts(Zone, Listener, idle_timeout)
|
|
|
|
|
|
|
+ WsOpts = #{compress => get_ws_opts(Type, Listener, compress),
|
|
|
|
|
+ deflate_opts => get_ws_opts(Type, Listener, deflate_opts),
|
|
|
|
|
+ max_frame_size => get_ws_opts(Type, Listener, max_frame_size),
|
|
|
|
|
+ idle_timeout => get_ws_opts(Type, Listener, idle_timeout)
|
|
|
},
|
|
},
|
|
|
case check_origin_header(Req, Opts) of
|
|
case check_origin_header(Req, Opts) of
|
|
|
{error, Message} ->
|
|
{error, Message} ->
|
|
@@ -187,17 +187,17 @@ init(Req, #{zone := Zone, listener := Listener} = Opts) ->
|
|
|
ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts)
|
|
ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-parse_sec_websocket_protocol(Req, #{zone := Zone, listener := Listener} = Opts, WsOpts) ->
|
|
|
|
|
|
|
+parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) ->
|
|
|
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
|
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
|
|
undefined ->
|
|
undefined ->
|
|
|
- case get_ws_opts(Zone, Listener, fail_if_no_subprotocol) of
|
|
|
|
|
|
|
+ case get_ws_opts(Type, Listener, fail_if_no_subprotocol) of
|
|
|
true ->
|
|
true ->
|
|
|
{ok, cowboy_req:reply(400, Req), WsOpts};
|
|
{ok, cowboy_req:reply(400, Req), WsOpts};
|
|
|
false ->
|
|
false ->
|
|
|
{cowboy_websocket, Req, [Req, Opts], WsOpts}
|
|
{cowboy_websocket, Req, [Req, Opts], WsOpts}
|
|
|
end;
|
|
end;
|
|
|
Subprotocols ->
|
|
Subprotocols ->
|
|
|
- SupportedSubprotocols = get_ws_opts(Zone, Listener, supported_subprotocols),
|
|
|
|
|
|
|
+ SupportedSubprotocols = get_ws_opts(Type, Listener, supported_subprotocols),
|
|
|
NSupportedSubprotocols = [list_to_binary(Subprotocol)
|
|
NSupportedSubprotocols = [list_to_binary(Subprotocol)
|
|
|
|| Subprotocol <- SupportedSubprotocols],
|
|
|| Subprotocol <- SupportedSubprotocols],
|
|
|
case pick_subprotocol(Subprotocols, NSupportedSubprotocols) of
|
|
case pick_subprotocol(Subprotocols, NSupportedSubprotocols) of
|
|
@@ -221,29 +221,29 @@ pick_subprotocol([Subprotocol | Rest], SupportedSubprotocols) ->
|
|
|
pick_subprotocol(Rest, SupportedSubprotocols)
|
|
pick_subprotocol(Rest, SupportedSubprotocols)
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-parse_header_fun_origin(Req, #{zone := Zone, listener := Listener}) ->
|
|
|
|
|
|
|
+parse_header_fun_origin(Req, #{listener := {Type, Listener}}) ->
|
|
|
case cowboy_req:header(<<"origin">>, Req) of
|
|
case cowboy_req:header(<<"origin">>, Req) of
|
|
|
undefined ->
|
|
undefined ->
|
|
|
- case get_ws_opts(Zone, Listener, allow_origin_absence) of
|
|
|
|
|
|
|
+ case get_ws_opts(Type, Listener, allow_origin_absence) of
|
|
|
true -> ok;
|
|
true -> ok;
|
|
|
false -> {error, origin_header_cannot_be_absent}
|
|
false -> {error, origin_header_cannot_be_absent}
|
|
|
end;
|
|
end;
|
|
|
Value ->
|
|
Value ->
|
|
|
- case lists:member(Value, get_ws_opts(Zone, Listener, check_origins)) of
|
|
|
|
|
|
|
+ case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of
|
|
|
true -> ok;
|
|
true -> ok;
|
|
|
false -> {origin_not_allowed, Value}
|
|
false -> {origin_not_allowed, Value}
|
|
|
end
|
|
end
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-check_origin_header(Req, #{zone := Zone, listener := Listener} = Opts) ->
|
|
|
|
|
- case get_ws_opts(Zone, Listener, check_origin_enable) of
|
|
|
|
|
|
|
+check_origin_header(Req, #{listener := {Type, Listener}} = Opts) ->
|
|
|
|
|
+ case get_ws_opts(Type, Listener, check_origin_enable) of
|
|
|
true -> parse_header_fun_origin(Req, Opts);
|
|
true -> parse_header_fun_origin(Req, Opts);
|
|
|
false -> ok
|
|
false -> ok
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) ->
|
|
|
|
|
|
|
+websocket_init([Req, #{zone := Zone, listener := {Type, Listener}} = Opts]) ->
|
|
|
{Peername, Peercert} =
|
|
{Peername, Peercert} =
|
|
|
- case emqx_config:get_listener_conf(Zone, Listener, [proxy_protocol]) andalso
|
|
|
|
|
|
|
+ case emqx_config:get_listener_conf(Type, Listener, [proxy_protocol]) andalso
|
|
|
maps:get(proxy_header, Req) of
|
|
maps:get(proxy_header, Req) of
|
|
|
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
|
|
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
|
|
|
SourceName = {SrcAddr, SrcPort},
|
|
SourceName = {SrcAddr, SrcPort},
|
|
@@ -278,7 +278,7 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) ->
|
|
|
conn_mod => ?MODULE
|
|
conn_mod => ?MODULE
|
|
|
},
|
|
},
|
|
|
Limiter = emqx_limiter:init(Zone, undefined, undefined, []),
|
|
Limiter = emqx_limiter:init(Zone, undefined, undefined, []),
|
|
|
- MQTTPiggyback = get_ws_opts(Zone, Listener, mqtt_piggyback),
|
|
|
|
|
|
|
+ MQTTPiggyback = get_ws_opts(Type, Listener, mqtt_piggyback),
|
|
|
FrameOpts = #{
|
|
FrameOpts = #{
|
|
|
strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
|
|
strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
|
|
|
max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
|
|
max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
|
|
@@ -317,7 +317,7 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) ->
|
|
|
idle_timeout = IdleTimeout,
|
|
idle_timeout = IdleTimeout,
|
|
|
idle_timer = IdleTimer,
|
|
idle_timer = IdleTimer,
|
|
|
zone = Zone,
|
|
zone = Zone,
|
|
|
- listener = Listener
|
|
|
|
|
|
|
+ listener = {Type, Listener}
|
|
|
}, hibernate}.
|
|
}, hibernate}.
|
|
|
|
|
|
|
|
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
|
websocket_handle({binary, Data}, State) when is_list(Data) ->
|
|
@@ -370,8 +370,8 @@ websocket_info({check_gc, Stats}, State) ->
|
|
|
return(check_oom(run_gc(Stats, State)));
|
|
return(check_oom(run_gc(Stats, State)));
|
|
|
|
|
|
|
|
websocket_info(Deliver = {deliver, _Topic, _Msg},
|
|
websocket_info(Deliver = {deliver, _Topic, _Msg},
|
|
|
- State = #state{zone = Zone, listener = Listener}) ->
|
|
|
|
|
- ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]),
|
|
|
|
|
|
|
+ State = #state{listener = {Type, Listener}}) ->
|
|
|
|
|
+ ActiveN = get_active_n(Type, Listener),
|
|
|
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
|
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
|
|
|
with_channel(handle_deliver, [Delivers], State);
|
|
with_channel(handle_deliver, [Delivers], State);
|
|
|
|
|
|
|
@@ -558,12 +558,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
|
|
|
%% Handle incoming packet
|
|
%% Handle incoming packet
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
-handle_incoming(Packet, State = #state{zone = Zone, listener = Listener})
|
|
|
|
|
|
|
+handle_incoming(Packet, State = #state{listener = {Type, Listener}})
|
|
|
when is_record(Packet, mqtt_packet) ->
|
|
when is_record(Packet, mqtt_packet) ->
|
|
|
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
|
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
|
|
|
ok = inc_incoming_stats(Packet),
|
|
ok = inc_incoming_stats(Packet),
|
|
|
NState = case emqx_pd:get_counter(incoming_pubs) >
|
|
NState = case emqx_pd:get_counter(incoming_pubs) >
|
|
|
- emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
|
|
|
|
|
|
|
+ get_active_n(Type, Listener) of
|
|
|
true -> postpone({cast, rate_limit}, State);
|
|
true -> postpone({cast, rate_limit}, State);
|
|
|
false -> State
|
|
false -> State
|
|
|
end,
|
|
end,
|
|
@@ -595,12 +595,12 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
|
|
handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
|
|
|
- zone = Zone, listener = Listener}) ->
|
|
|
|
|
|
|
+ listener = {Type, Listener}}) ->
|
|
|
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
|
|
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
|
|
|
Oct = iolist_size(IoData),
|
|
Oct = iolist_size(IoData),
|
|
|
ok = inc_sent_stats(length(Packets), Oct),
|
|
ok = inc_sent_stats(length(Packets), Oct),
|
|
|
NState = case emqx_pd:get_counter(outgoing_pubs) >
|
|
NState = case emqx_pd:get_counter(outgoing_pubs) >
|
|
|
- emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
|
|
|
|
|
|
|
+ get_active_n(Type, Listener) of
|
|
|
true ->
|
|
true ->
|
|
|
Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
|
|
Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
|
|
|
oct => emqx_pd:reset_counter(outgoing_bytes)
|
|
oct => emqx_pd:reset_counter(outgoing_bytes)
|
|
@@ -749,10 +749,10 @@ classify([Event|More], Packets, Cmds, Events) ->
|
|
|
|
|
|
|
|
trigger(Event) -> erlang:send(self(), Event).
|
|
trigger(Event) -> erlang:send(self(), Event).
|
|
|
|
|
|
|
|
-get_peer(Req, #{zone := Zone, listener := Listener}) ->
|
|
|
|
|
|
|
+get_peer(Req, #{listener := {Type, Listener}}) ->
|
|
|
{PeerAddr, PeerPort} = cowboy_req:peer(Req),
|
|
{PeerAddr, PeerPort} = cowboy_req:peer(Req),
|
|
|
AddrHeader = cowboy_req:header(
|
|
AddrHeader = cowboy_req:header(
|
|
|
- get_ws_opts(Zone, Listener, proxy_address_header), Req, <<>>),
|
|
|
|
|
|
|
+ get_ws_opts(Type, Listener, proxy_address_header), Req, <<>>),
|
|
|
ClientAddr = case string:tokens(binary_to_list(AddrHeader), ", ") of
|
|
ClientAddr = case string:tokens(binary_to_list(AddrHeader), ", ") of
|
|
|
[] ->
|
|
[] ->
|
|
|
undefined;
|
|
undefined;
|
|
@@ -766,7 +766,7 @@ get_peer(Req, #{zone := Zone, listener := Listener}) ->
|
|
|
PeerAddr
|
|
PeerAddr
|
|
|
end,
|
|
end,
|
|
|
PortHeader = cowboy_req:header(
|
|
PortHeader = cowboy_req:header(
|
|
|
- get_ws_opts(Zone, Listener, proxy_port_header), Req, <<>>),
|
|
|
|
|
|
|
+ get_ws_opts(Type, Listener, proxy_port_header), Req, <<>>),
|
|
|
ClientPort = case string:tokens(binary_to_list(PortHeader), ", ") of
|
|
ClientPort = case string:tokens(binary_to_list(PortHeader), ", ") of
|
|
|
[] ->
|
|
[] ->
|
|
|
undefined;
|
|
undefined;
|
|
@@ -787,5 +787,8 @@ set_field(Name, Value, State) ->
|
|
|
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
|
|
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
|
|
|
setelement(Pos+1, State, Value).
|
|
setelement(Pos+1, State, Value).
|
|
|
|
|
|
|
|
-get_ws_opts(Zone, Listener, Key) ->
|
|
|
|
|
- emqx_config:get_listener_conf(Zone, Listener, [websocket, Key]).
|
|
|
|
|
|
|
+get_ws_opts(Type, Listener, Key) ->
|
|
|
|
|
+ emqx_config:get_listener_conf(Type, Listener, [websocket, Key]).
|
|
|
|
|
+
|
|
|
|
|
+get_active_n(Type, Listener) ->
|
|
|
|
|
+ emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]).
|