Procházet zdrojové kódy

feat(emqx_ws_connection): check http header to know real IP/port

z8674558 před 4 roky
rodič
revize
a48e7df4f5
4 změnil soubory, kde provedl 92 přidání a 1 odebrání
  1. 24 0
      etc/emqx.conf
  2. 22 0
      priv/emqx.schema
  3. 29 1
      src/emqx_ws_connection.erl
  4. 17 0
      test/emqx_ws_connection_SUITE.erl

+ 24 - 0
etc/emqx.conf

@@ -1608,6 +1608,18 @@ listener.ws.external.access.1 = allow all
 ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
 ## listener.ws.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
 
+## Specify which HTTP header for real source IP if the EMQ X cluster is
+## deployed behind NGINX or HAProxy.
+##
+## Default: X-Forwarded-For
+## listener.ws.external.proxy_address_header = X-Forwarded-For
+
+## Specify which HTTP header for real source port if the EMQ X cluster is
+## deployed behind NGINX or HAProxy.
+##
+## Default: X-Forwarded-Port
+## listener.ws.external.proxy_address_header = X-Forwarded-Port
+
 ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
 ## HAProxy or Nginx.
 ##
@@ -1851,6 +1863,18 @@ listener.wss.external.access.1 = allow all
 ## Default: mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
 ## listener.wss.external.supported_subprotocols = mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5
 
+## Specify which HTTP header for real source IP if the EMQ X cluster is
+## deployed behind NGINX or HAProxy.
+##
+## Default: X-Forwarded-For
+## listener.wss.external.proxy_address_header = X-Forwarded-For
+
+## Specify which HTTP header for real source port if the EMQ X cluster is
+## deployed behind NGINX or HAProxy.
+##
+## Default: X-Forwarded-Port
+## listener.wss.external.proxy_port_header = X-Forwarded-Port
+
 ## Enable the Proxy Protocol V1/2 support.
 ##
 ## See: listener.tcp.$name.proxy_protocol

+ 22 - 0
priv/emqx.schema

@@ -1531,6 +1531,16 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [
+  {default, "X-Forwarded-For"},
+  {datatype, string}
+]}.
+
+{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [
+  {default, "X-Forwarded-Port"},
+  {datatype, string}
+]}.
+
 {mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [
   {datatype, flag}
 ]}.
@@ -1715,6 +1725,16 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [
+  {default, "X-Forwarded-For"},
+  {datatype, string}
+]}.
+
+{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [
+  {default, "X-Forwarded-Port"},
+  {datatype, string}
+]}.
+
 {mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [
   {datatype, flag}
 ]}.
@@ -1967,6 +1987,8 @@ end}.
                           {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
                           {rate_limit, RateLimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
                           {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
+                          {proxy_address_header, list_to_binary(string:lowercase(cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, "")))},
+                          {proxy_port_header, list_to_binary(string:lowercase(cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, "")))},
                           {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
                           {fail_if_no_subprotocol, cuttlefish:conf_get(Prefix ++ ".fail_if_no_subprotocol", Conf, undefined)},
                           {supported_subprotocols, string:tokens(cuttlefish:conf_get(Prefix ++ ".supported_subprotocols", Conf, ""), ", ")},

+ 29 - 1
src/emqx_ws_connection.erl

@@ -253,7 +253,7 @@ websocket_init([Req, Opts]) ->
                    #{src_address := SrcAddr, src_port := SrcPort} ->
                        {SrcAddr, SrcPort};
                    _ ->
-                       cowboy_req:peer(Req)
+                       get_peer(Req, Opts)
                end,
     Sockname = cowboy_req:sock(Req),
     Peercert = cowboy_req:cert(Req),
@@ -725,6 +725,34 @@ classify([Event|More], Packets, Cmds, Events) ->
 
 trigger(Event) -> erlang:send(self(), Event).
 
+get_peer(Req, Opts) ->
+    {PeerAddr, PeerPort} = cowboy_req:peer(Req),
+    AddrHeader = cowboy_req:header(proplists:get_value(proxy_address_header, Opts), Req, <<>>),
+    ClientAddr = case string:tokens(binary_to_list(AddrHeader), ", ") of
+                     [] ->
+                         undefined;
+                     AddrList ->
+                         hd(AddrList)
+                 end,
+    Addr = case inet:parse_address(ClientAddr) of
+               {ok, A} ->
+                   A;
+               _ ->
+                   PeerAddr
+           end,
+    PortHeader = cowboy_req:header(proplists:get_value(proxy_port_header, Opts), Req, <<>>),
+    ClientPort = case string:tokens(binary_to_list(PortHeader), ", ") of
+                     [] ->
+                         undefined;
+                     PortList ->
+                         hd(PortList)
+                 end,
+    try
+        {Addr, list_to_integer(ClientPort)}
+    catch
+        _:_  -> {Addr, PeerPort}
+    end.
+
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 17 - 0
test/emqx_ws_connection_SUITE.erl

@@ -50,6 +50,7 @@ init_per_testcase(TestCase, Config) when
     ->
     %% Mock cowboy_req
     ok = meck:new(cowboy_req, [passthrough, no_history, no_link]),
+    ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end),
     ok = meck:expect(cowboy_req, peer, fun(_) -> {{127,0,0,1}, 3456} end),
     ok = meck:expect(cowboy_req, sock, fun(_) -> {{127,0,0,1}, 18083} end),
     ok = meck:expect(cowboy_req, cert, fun(_) -> undefined end),
@@ -123,6 +124,22 @@ t_info(_) ->
       sockstate := running
      } = SockInfo.
 
+t_header(_) ->
+    ok = meck:expect(cowboy_req, header, fun(<<"x-forwarded-for">>, _, _) -> <<"100.100.100.100, 99.99.99.99">>;
+                                            (<<"x-forwarded-port">>, _, _) -> <<"1000">> end),
+    {ok, St, _} = ?ws_conn:websocket_init([req, [{zone, external},
+                                                 {proxy_address_header, <<"x-forwarded-for">>},
+                                                 {proxy_port_header, <<"x-forwarded-port">>}]]),
+    WsPid = spawn(fun() ->
+        receive {call, From, info} ->
+            gen_server:reply(From, ?ws_conn:info(St))
+        end end),
+    #{sockinfo := SockInfo} = ?ws_conn:call(WsPid, info),
+    #{socktype  := ws,
+        peername  := {{100,100,100,100}, 1000},
+        sockstate := running
+    } = SockInfo.
+
 t_info_limiter(_) ->
     St = st(#{limiter => emqx_limiter:init(external, [])}),
     ?assertEqual(undefined, ?ws_conn:info(limiter, St)).