Browse Source

Merge pull request #14243 from HJianBo/fix-some-gateways-license-not-working

fix: ensure all gateways call the client.connect hook
JianBo He 1 year atrás
parent
commit
44ef842696

+ 34 - 0
apps/emqx_gateway/test/emqx_gateway_test_utils.erl

@@ -203,3 +203,37 @@ sn_client_disconnect(Socket) ->
     _ = emqx_sn_protocol_SUITE:send_disconnect_msg(Socket, undefined),
     gen_udp:close(Socket),
     ok.
+
+meck_emqx_hook_calls() ->
+    Self = self(),
+    ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
+    ok = meck:expect(
+        emqx_hooks,
+        run,
+        fun(A1, A2) ->
+            Self ! {hook_call, A1},
+            meck:passthrough([A1, A2])
+        end
+    ),
+
+    ok = meck:expect(
+        emqx_hooks,
+        run_fold,
+        fun(A1, A2, A3) ->
+            Self ! {hook_call, A1},
+            meck:passthrough([A1, A2, A3])
+        end
+    ).
+
+collect_emqx_hooks_calls() ->
+    collect_emqx_hooks_calls([]).
+
+collect_emqx_hooks_calls(Acc) ->
+    receive
+        {hook_call, Args} ->
+            collect_emqx_hooks_calls([Args | Acc])
+    after 1000 ->
+        L = lists:reverse(Acc),
+        meck:unload(emqx_hooks),
+        L
+    end.

+ 7 - 0
apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

@@ -145,6 +145,8 @@ restart_coap_with_connection_mode(Bool) ->
 
 t_connection(_) ->
     Action = fun(Channel) ->
+        emqx_gateway_test_utils:meck_emqx_hook_calls(),
+
         %% connection
         Token = connection(Channel),
 
@@ -154,6 +156,11 @@ t_connection(_) ->
             emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
         ),
 
+        ?assertMatch(
+            ['client.connect' | _],
+            emqx_gateway_test_utils:collect_emqx_hooks_calls()
+        ),
+
         %% heartbeat
         {ok, changed, _} = send_heartbeat(Token),
 

+ 117 - 58
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -347,67 +347,31 @@ handle_call(
         request_clientinfo => ClientInfo
     }),
     {reply, {error, ?RESP_PERMISSION_DENY, <<"Client socket disconnected">>}, Channel};
-handle_call(
-    {auth, ClientInfo0, Password},
-    _From,
-    Channel = #channel{
-        ctx = Ctx,
-        conninfo = ConnInfo,
-        clientinfo = ClientInfo
-    }
-) ->
-    ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
-    ConnInfo1 = enrich_conninfo(ClientInfo0, ConnInfo),
-
-    Channel1 = Channel#channel{
-        conninfo = ConnInfo1,
-        clientinfo = ClientInfo1
-    },
-
-    #{clientid := ClientId, username := Username} = ClientInfo1,
-
+handle_call({auth, ClientInfo0, Password}, _From, Channel) ->
+    ClientInfo1 = ClientInfo0#{password => Password},
     case
-        emqx_gateway_ctx:authenticate(
-            Ctx, ClientInfo1#{password => Password}
+        emqx_utils:pipeline(
+            [
+                fun enrich_conninfo/2,
+                fun run_conn_hooks/2,
+                fun enrich_clientinfo/2,
+                fun set_log_meta/2,
+                fun auth_connect/2
+            ],
+            ClientInfo1,
+            Channel#channel{conn_state = connecting}
         )
     of
-        {ok, NClientInfo} ->
-            SessFun = fun(_, _) -> #{} end,
-            emqx_logger:set_metadata_clientid(ClientId),
-            case
-                emqx_gateway_ctx:open_session(
-                    Ctx,
-                    true,
-                    NClientInfo,
-                    ConnInfo1,
-                    SessFun
-                )
-            of
-                {ok, _Session} ->
-                    ?SLOG(debug, #{
-                        msg => "client_login_succeed",
-                        clientid => ClientId,
-                        username => Username
-                    }),
-                    {reply, ok, [{event, connected}],
-                        ensure_connected(Channel1#channel{clientinfo = NClientInfo})};
-                {error, Reason} ->
-                    ?SLOG(warning, #{
-                        msg => "client_login_failed",
-                        clientid => ClientId,
-                        username => Username,
-                        reason => Reason
-                    }),
-                    {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
-            end;
-        {error, Reason} ->
+        {ok, _, NChannel} ->
+            process_connect(NChannel);
+        {error, Reason, NChannel} ->
             ?SLOG(warning, #{
                 msg => "client_login_failed",
-                clientid => ClientId,
-                username => Username,
+                clientid => maps:get(clientid, ClientInfo0, undefined),
+                username => maps:get(username, ClientInfo0, undefined),
                 reason => Reason
             }),
-            {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
+            {reply, {error, ?RESP_PERMISSION_DENY, Reason}, NChannel}
     end;
 handle_call(
     {start_timer, keepalive, Interval},
@@ -706,6 +670,10 @@ run_hooks(Ctx, Name, Args) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name),
     emqx_hooks:run(Name, Args).
 
+run_hooks(Ctx, Name, Args, Acc) ->
+    emqx_gateway_ctx:metrics_inc(Ctx, Name),
+    emqx_hooks:run_fold(Name, Args, Acc).
+
 metrics_inc(Ctx, Name) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name).
 
@@ -765,11 +733,22 @@ dispatch(FunName, Req, Channel = #channel{gcli = GClient}) ->
 %% Format
 %%--------------------------------------------------------------------
 
-enrich_conninfo(InClientInfo, ConnInfo) ->
+enrich_conninfo(
+    InClientInfo,
+    Channel = #channel{
+        conninfo = ConnInfo
+    }
+) ->
     Ks = [proto_name, proto_ver, clientid, username],
-    maps:merge(ConnInfo, maps:with(Ks, InClientInfo)).
+    NConnInfo = maps:merge(ConnInfo, maps:with(Ks, InClientInfo)),
+    {ok, Channel#channel{conninfo = NConnInfo}}.
 
-enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
+enrich_clientinfo(
+    InClientInfo = #{proto_name := ProtoName},
+    Channel = #channel{
+        clientinfo = ClientInfo
+    }
+) ->
     Ks = [clientid, username],
     case maps:get(mountpoint, InClientInfo, <<>>) of
         <<>> ->
@@ -788,7 +767,87 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
             )
     end,
     NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
-    NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}.
+    NClientInfo1 = NClientInfo#{protocol => proto_name_to_protocol(ProtoName)},
+    {ok, Channel#channel{clientinfo = NClientInfo1}}.
+
+set_log_meta(_InClientInfo, #channel{clientinfo = #{clientid := ClientId}}) ->
+    emqx_logger:set_metadata_clientid(ClientId),
+    ok.
+
+run_conn_hooks(
+    InClientInfo,
+    Channel = #channel{
+        ctx = Ctx,
+        conninfo = ConnInfo
+    }
+) ->
+    ConnProps = #{},
+    case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
+        Error = {error, _Reason} -> Error;
+        _NConnProps -> {ok, InClientInfo, Channel}
+    end.
+
+auth_connect(
+    _InClientInfo = #{password := Password},
+    Channel = #channel{
+        ctx = Ctx,
+        clientinfo = ClientInfo
+    }
+) ->
+    #{
+        clientid := ClientId,
+        username := Username
+    } = ClientInfo,
+    case emqx_gateway_ctx:authenticate(Ctx, ClientInfo#{password => Password}) of
+        {ok, NClientInfo} ->
+            {ok, Channel#channel{clientinfo = NClientInfo}};
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "client_login_failed",
+                clientid => ClientId,
+                username => Username,
+                reason => Reason
+            }),
+            {error, Reason}
+    end.
+
+process_connect(
+    Channel = #channel{
+        ctx = Ctx,
+        conninfo = ConnInfo,
+        clientinfo = ClientInfo
+    }
+) ->
+    #{
+        clientid := ClientId,
+        username := Username
+    } = ClientInfo,
+    SessFun = fun(_, _) -> #{} end,
+    case
+        emqx_gateway_ctx:open_session(
+            Ctx,
+            true,
+            ClientInfo,
+            ConnInfo,
+            SessFun
+        )
+    of
+        {ok, _Session} ->
+            ?SLOG(debug, #{
+                msg => "client_login_succeed",
+                clientid => ClientId,
+                username => Username
+            }),
+            {reply, ok, [{event, connected}], ensure_connected(Channel)};
+        {error, Reason} ->
+            ?SLOG(warning, #{
+                msg => "client_login_failed",
+                clientid => ClientId,
+                username => Username,
+                reason => Reason
+            }),
+            {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
+    end.
 
 default_conninfo(ConnInfo) ->
     ConnInfo#{

+ 1 - 1
apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_exproto, [
     {description, "ExProto Gateway"},
-    {vsn, "0.1.14"},
+    {vsn, "0.1.15"},
     {registered, []},
     {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
     {env, []},

+ 12 - 0
apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl

@@ -440,12 +440,19 @@ t_hook_connected_disconnected(Cfg) ->
     ConnAckBin = frame_connack(0),
 
     Parent = self(),
+    emqx_hooks:add('client.connect', {?MODULE, hook_fun0, [Parent]}, 1000),
     emqx_hooks:add('client.connected', {?MODULE, hook_fun1, [Parent]}, 1000),
     emqx_hooks:add('client.disconnected', {?MODULE, hook_fun2, [Parent]}, 1000),
 
     send(Sock, ConnBin),
     {ok, ConnAckBin} = recv(Sock, 5000),
 
+    receive
+        connect -> ok
+    after 1000 ->
+        error(hook_is_not_running)
+    end,
+
     receive
         connected -> ok
     after 1000 ->
@@ -465,6 +472,7 @@ t_hook_connected_disconnected(Cfg) ->
         begin
             {error, closed} = recv(Sock, 5000)
         end,
+    emqx_hooks:del('client.connect', {?MODULE, hook_fun0}),
     emqx_hooks:del('client.connected', {?MODULE, hook_fun1}),
     emqx_hooks:del('client.disconnected', {?MODULE, hook_fun2}).
 
@@ -594,6 +602,10 @@ t_idle_timeout(Cfg) ->
 %%--------------------------------------------------------------------
 %% Utils
 
+hook_fun0(_, _, Parent) ->
+    Parent ! connect,
+    ok.
+
 hook_fun1(_, _, Parent) ->
     Parent ! connected,
     ok.

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_gbt32960, [
     {description, "GBT32960 Gateway"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 19 - 1
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -219,8 +219,9 @@ handle_in(
     case
         emqx_utils:pipeline(
             [
-                fun enrich_clientinfo/2,
                 fun enrich_conninfo/2,
+                fun run_conn_hooks/2,
+                fun enrich_clientinfo/2,
                 fun set_log_meta/2,
                 %% TODO: How to implement the banned in the gateway instance?
                 %, fun check_banned/2
@@ -567,6 +568,19 @@ enrich_conninfo(
     },
     {ok, Channel#channel{conninfo = NConnInfo}}.
 
+run_conn_hooks(
+    Packet,
+    Channel = #channel{
+        ctx = Ctx,
+        conninfo = ConnInfo
+    }
+) ->
+    ConnProps = #{},
+    case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
+        Error = {error, _Reason} -> Error;
+        _NConnProps -> {ok, Packet, Channel}
+    end.
+
 set_log_meta(_Packet, #channel{clientinfo = #{clientid := ClientId}}) ->
     emqx_logger:set_metadata_clientid(ClientId),
     ok.
@@ -692,6 +706,10 @@ run_hooks(Ctx, Name, Args) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name),
     emqx_hooks:run(Name, Args).
 
+run_hooks(Ctx, Name, Args, Acc) ->
+    emqx_gateway_ctx:metrics_inc(Ctx, Name),
+    emqx_hooks:run_fold(Name, Args, Acc).
+
 reply(Reply, Channel) ->
     {reply, Reply, Channel}.
 

+ 7 - 0
apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl

@@ -161,8 +161,15 @@ login_first() ->
     {ok, Socket}.
 
 t_case01_login(_Config) ->
+    emqx_gateway_test_utils:meck_emqx_hook_calls(),
     % send VEHICLE LOGIN
     {ok, Socket} = login_first(),
+
+    ?assertMatch(
+        ['client.connect' | _],
+        emqx_gateway_test_utils:collect_emqx_hooks_calls()
+    ),
+
     ok = gen_tcp:close(Socket).
 
 t_case01_login_channel_info(_Config) ->

+ 1 - 1
apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_jt808, [
     {description, "JT/T 808 Gateway"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 20 - 2
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -254,8 +254,8 @@ do_handle_in(Frame = ?MSG(?MC_REGISTER), Channel0) ->
     case
         emqx_utils:pipeline(
             [
-                fun enrich_clientinfo/2,
                 fun enrich_conninfo/2,
+                fun enrich_clientinfo/2,
                 fun set_log_meta/2
             ],
             Frame,
@@ -275,8 +275,9 @@ do_handle_in(Frame = ?MSG(?MC_AUTH), Channel0) ->
     case
         emqx_utils:pipeline(
             [
-                fun enrich_clientinfo/2,
                 fun enrich_conninfo/2,
+                fun run_conn_hooks/2,
+                fun enrich_clientinfo/2,
                 fun set_log_meta/2
             ],
             Frame,
@@ -924,6 +925,19 @@ enrich_conninfo(
     },
     {ok, Channel#channel{conninfo = NConnInfo}}.
 
+run_conn_hooks(
+    Input,
+    Channel = #channel{
+        ctx = Ctx,
+        conninfo = ConnInfo
+    }
+) ->
+    ConnProps = #{},
+    case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
+        Error = {error, _Reason} -> Error;
+        _NConnProps -> {ok, Input, Channel}
+    end.
+
 %% Register
 enrich_clientinfo(
     #{
@@ -1007,6 +1021,10 @@ run_hooks(Ctx, Name, Args) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name),
     emqx_hooks:run(Name, Args).
 
+run_hooks(Ctx, Name, Args, Acc) ->
+    emqx_gateway_ctx:metrics_inc(Ctx, Name),
+    emqx_hooks:run_fold(Name, Args, Acc).
+
 discard_downlink_messages([], _Channel) ->
     ok;
 discard_downlink_messages(Messages, Channel) ->

+ 8 - 0
apps/emqx_gateway_jt808/test/emqx_jt808_SUITE.erl

@@ -359,10 +359,18 @@ t_case00_register(_) ->
     ok = gen_tcp:close(Socket).
 
 t_case01_auth(_) ->
+    emqx_gateway_test_utils:meck_emqx_hook_calls(),
+
     {ok, Socket} = gen_tcp:connect({127, 0, 0, 1}, ?PORT, [binary, {active, false}, {nodelay, true}]),
     {ok, AuthCode} = client_regi_procedure(Socket),
+
     ok = client_auth_procedure(Socket, AuthCode),
 
+    ?assertMatch(
+        ['client.connect' | _],
+        emqx_gateway_test_utils:collect_emqx_hooks_calls()
+    ),
+
     ok = gen_tcp:close(Socket).
 
 t_case02_anonymous_register_and_auth(_) ->

+ 10 - 2
apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl

@@ -239,6 +239,8 @@ case01_register(Config) ->
     MsgId = 12,
     SubTopic = list_to_binary("lwm2m/" ++ Epn ++ "/dn/#"),
 
+    emqx_gateway_test_utils:meck_emqx_hook_calls(),
+
     test_send_coap_request(
         UdpSock,
         post,
@@ -251,7 +253,13 @@ case01_register(Config) ->
         MsgId
     ),
 
-    %% checkpoint 1 - response
+    %% checkpoint 1 - called client.connect hook
+    ?assertMatch(
+        ['client.connect' | _],
+        emqx_gateway_test_utils:collect_emqx_hooks_calls()
+    ),
+
+    %% checkpoint 2 - response
     #coap_message{type = Type, method = Method, id = RspId, options = Opts} =
         test_recv_coap_response(UdpSock),
     ack = Type,
@@ -260,7 +268,7 @@ case01_register(Config) ->
     Location = maps:get(location_path, Opts),
     ?assertNotEqual(undefined, Location),
 
-    %% checkpoint 2 - verify subscribed topics
+    %% checkpoint 3 - verify subscribed topics
     timer:sleep(100),
     ?LOGT("all topics: ~p", [test_mqtt_broker:get_subscrbied_topics()]),
     true = lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics()),

+ 9 - 0
apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl

@@ -206,8 +206,17 @@ t_connect(_) ->
     SockName = {'mqttsn:udp:default', 1884},
     ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
 
+    emqx_gateway_test_utils:meck_emqx_hook_calls(),
+
     {ok, Socket} = gen_udp:open(0, [binary]),
     send_connect_msg(Socket, <<"client_id_test1">>),
+
+    %% assert: client.connect hook is called
+    ?assertMatch(
+        ['client.connect' | _],
+        emqx_gateway_test_utils:collect_emqx_hooks_calls()
+    ),
+
     ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
 
     send_disconnect_msg(Socket, undefined),

+ 7 - 0
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -107,6 +107,8 @@ restart_stomp_with_mountpoint(Mountpoint) ->
 t_connect(_) ->
     %% Successful connect
     ConnectSucced = fun(Sock) ->
+        emqx_gateway_test_utils:meck_emqx_hook_calls(),
+
         ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>, <<"1000,2000">>),
         {ok, Frame} = recv_a_frame(Sock),
         ?assertMatch(<<"CONNECTED">>, Frame#stomp_frame.command),
@@ -114,6 +116,11 @@ t_connect(_) ->
             <<"2000,1000">>, proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers)
         ),
 
+        ?assertMatch(
+            ['client.connect' | _],
+            emqx_gateway_test_utils:collect_emqx_hooks_calls()
+        ),
+
         ok = send_disconnect_frame(Sock, <<"12345">>),
         ?assertMatch(
             {ok, #stomp_frame{

+ 1 - 0
changes/ce/fix-14243.en.md

@@ -0,0 +1 @@
+Fixed `client.connect` hook not triggering for some gateways.