Преглед изворни кода

feat(authn): add connection expire based on authn data

Ilya Averyanov пре 1 година
родитељ
комит
80d724c504
33 измењених фајлова са 635 додато и 79 уклоњено
  1. 29 8
      apps/emqx/src/emqx_channel.erl
  2. 1 0
      apps/emqx/test/emqx_channel_SUITE.erl
  3. 54 0
      apps/emqx/test/emqx_connection_expire_SUITE.erl
  4. 1 0
      apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl
  5. 35 16
      apps/emqx_auth_jwt/src/emqx_authn_jwt.erl
  6. 6 0
      apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl
  7. 9 5
      apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl
  8. 93 0
      apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl
  9. 3 2
      apps/emqx_auth_jwt/test/emqx_authz_jwt_SUITE.erl
  10. 1 1
      apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl
  11. 19 3
      apps/emqx_gateway/src/emqx_gateway_ctx.erl
  12. 1 1
      apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl
  13. 11 1
      apps/emqx_gateway_coap/src/emqx_coap_channel.erl
  14. 1 1
      apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src
  15. 36 0
      apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl
  16. 13 2
      apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl
  17. 1 1
      apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src
  18. 43 6
      apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl
  19. 1 1
      apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src
  20. 19 3
      apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl
  21. 31 0
      apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl
  22. 1 1
      apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src
  23. 12 2
      apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl
  24. 41 0
      apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl
  25. 13 2
      apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl
  26. 39 0
      apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl
  27. 20 16
      apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl
  28. 3 1
      apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl
  29. 29 2
      apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl
  30. 16 4
      apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl
  31. 37 0
      apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl
  32. 10 0
      changes/ce/feat-12947.en.md
  33. 6 0
      rel/i18n/emqx_authn_jwt_schema.hocon

+ 29 - 8
apps/emqx/src/emqx_channel.erl

@@ -1075,7 +1075,7 @@ handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) -
     Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
     {ok, [?REPLY_OUTGOING(Packet), ?REPLY_CLOSE(ReasonName)], Channel};
 handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
-    {ok, {close, ReasonName}, Channel};
+    {ok, ?REPLY_CLOSE(ReasonName), Channel};
 handle_out(auth, {ReasonCode, Properties}, Channel) ->
     {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
 handle_out(Type, Data, Channel) ->
@@ -1406,6 +1406,16 @@ handle_timeout(
         {_, Quota2} ->
             {ok, clean_timer(TimerName, Channel#channel{quota = Quota2})}
     end;
+handle_timeout(
+    _TRef,
+    connection_expire,
+    #channel{conn_state = ConnState} = Channel0
+) ->
+    Channel1 = clean_timer(connection_expire, Channel0),
+    case ConnState of
+        disconnected -> {ok, Channel1};
+        _ -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel1)
+    end;
 handle_timeout(TRef, Msg, Channel) ->
     case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of
         [] ->
@@ -1810,18 +1820,23 @@ log_auth_failure(Reason) ->
 %% Merge authentication result into ClientInfo
 %% Authentication result may include:
 %% 1. `is_superuser': The superuser flag from various backends
-%% 2. `acl': ACL rules from JWT, HTTP auth backend
-%% 3. `client_attrs': Extra client attributes from JWT, HTTP auth backend
-%% 4. Maybe more non-standard fields used by hook callbacks
+%% 2. `expire_at`: Authentication validity deadline, the client will be disconnected after this time
+%% 3. `acl': ACL rules from JWT, HTTP auth backend
+%% 4. `client_attrs': Extra client attributes from JWT, HTTP auth backend
+%% 5. Maybe more non-standard fields used by hook callbacks
 merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) ->
     IsSuperuser = maps:get(is_superuser, AuthResult0, false),
-    AuthResult = maps:without([client_attrs], AuthResult0),
+    ExpireAt = maps:get(expire_at, AuthResult0, undefined),
+    AuthResult = maps:without([client_attrs, expire_at], AuthResult0),
     Attrs0 = maps:get(client_attrs, ClientInfo, #{}),
     Attrs1 = maps:get(client_attrs, AuthResult0, #{}),
     Attrs = maps:merge(Attrs0, Attrs1),
     NewClientInfo = maps:merge(
         ClientInfo#{client_attrs => Attrs},
-        AuthResult#{is_superuser => IsSuperuser}
+        AuthResult#{
+            is_superuser => IsSuperuser,
+            auth_expire_at => ExpireAt
+        }
     ),
     fix_mountpoint(NewClientInfo).
 
@@ -2228,10 +2243,16 @@ ensure_connected(
 ) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
+    schedule_connection_expire(Channel#channel{
         conninfo = trim_conninfo(NConnInfo),
         conn_state = connected
-    }.
+    }).
+
+schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := undefined}}) ->
+    Channel;
+schedule_connection_expire(Channel = #channel{clientinfo = #{auth_expire_at := ExpireAt}}) ->
+    Interval = max(0, ExpireAt - erlang:system_time(millisecond)),
+    ensure_timer(connection_expire, Interval, Channel).
 
 trim_conninfo(ConnInfo) ->
     maps:without(

+ 1 - 0
apps/emqx/test/emqx_channel_SUITE.erl

@@ -1061,6 +1061,7 @@ clientinfo(InitProps) ->
             clientid => <<"clientid">>,
             username => <<"username">>,
             is_superuser => false,
+            auth_expire_at => undefined,
             is_bridge => false,
             mountpoint => undefined
         },

+ 54 - 0
apps/emqx/test/emqx_connection_expire_SUITE.erl

@@ -0,0 +1,54 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connection_expire_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+%%--------------------------------------------------------------------
+%% CT callbacks
+%%--------------------------------------------------------------------
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    emqx_cth_suite:stop(proplists:get_value(apps, Config)).
+
+t_disonnect_by_auth_info(_) ->
+    _ = process_flag(trap_exit, true),
+
+    _ = meck:new(emqx_access_control, [passthrough, no_history]),
+    _ = meck:expect(emqx_access_control, authenticate, fun(_) ->
+        {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
+    end),
+
+    {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+
+    receive
+        {disconnected, ?RC_NOT_AUTHORIZED, #{}} -> ok
+    after 5000 ->
+        ct:fail("Client should be disconnected by timeout")
+    end.

+ 1 - 0
apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl

@@ -142,6 +142,7 @@ end).
 -type state() :: #{atom() => term()}.
 -type extra() :: #{
     is_superuser := boolean(),
+    expire_at => pos_integer(),
     atom() => term()
 }.
 -type user_info() :: #{

+ 35 - 16
apps/emqx_auth_jwt/src/emqx_authn_jwt.erl

@@ -76,6 +76,7 @@ authenticate(
     Credential,
     #{
         verify_claims := VerifyClaims0,
+        disconnect_after_expire := DisconnectAfterExpire,
         jwk := JWK,
         acl_claim_name := AclClaimName,
         from := From
@@ -84,11 +85,12 @@ authenticate(
     JWT = maps:get(From, Credential),
     JWKs = [JWK],
     VerifyClaims = replace_placeholder(VerifyClaims0, Credential),
-    verify(JWT, JWKs, VerifyClaims, AclClaimName);
+    verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire);
 authenticate(
     Credential,
     #{
         verify_claims := VerifyClaims0,
+        disconnect_after_expire := DisconnectAfterExpire,
         jwk_resource := ResourceId,
         acl_claim_name := AclClaimName,
         from := From
@@ -104,7 +106,7 @@ authenticate(
         {ok, JWKs} ->
             JWT = maps:get(From, Credential),
             VerifyClaims = replace_placeholder(VerifyClaims0, Credential),
-            verify(JWT, JWKs, VerifyClaims, AclClaimName)
+            verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire)
     end.
 
 destroy(#{jwk_resource := ResourceId}) ->
@@ -123,6 +125,7 @@ create2(#{
     secret := Secret0,
     secret_base64_encoded := Base64Encoded,
     verify_claims := VerifyClaims,
+    disconnect_after_expire := DisconnectAfterExpire,
     acl_claim_name := AclClaimName,
     from := From
 }) ->
@@ -134,6 +137,7 @@ create2(#{
             {ok, #{
                 jwk => JWK,
                 verify_claims => VerifyClaims,
+                disconnect_after_expire => DisconnectAfterExpire,
                 acl_claim_name => AclClaimName,
                 from => From
             }}
@@ -143,6 +147,7 @@ create2(#{
     algorithm := 'public-key',
     public_key := PublicKey,
     verify_claims := VerifyClaims,
+    disconnect_after_expire := DisconnectAfterExpire,
     acl_claim_name := AclClaimName,
     from := From
 }) ->
@@ -150,6 +155,7 @@ create2(#{
     {ok, #{
         jwk => JWK,
         verify_claims => VerifyClaims,
+        disconnect_after_expire => DisconnectAfterExpire,
         acl_claim_name => AclClaimName,
         from => From
     }};
@@ -157,6 +163,7 @@ create2(
     #{
         use_jwks := true,
         verify_claims := VerifyClaims,
+        disconnect_after_expire := DisconnectAfterExpire,
         acl_claim_name := AclClaimName,
         from := From
     } = Config
@@ -171,6 +178,7 @@ create2(
     {ok, #{
         jwk_resource => ResourceId,
         verify_claims => VerifyClaims,
+        disconnect_after_expire => DisconnectAfterExpire,
         acl_claim_name => AclClaimName,
         from => From
     }}.
@@ -214,23 +222,12 @@ replace_placeholder([{Name, {placeholder, PL}} | More], Variables, Acc) ->
 replace_placeholder([{Name, Value} | More], Variables, Acc) ->
     replace_placeholder(More, Variables, [{Name, Value} | Acc]).
 
-verify(undefined, _, _, _) ->
+verify(undefined, _, _, _, _) ->
     ignore;
-verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
+verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire) ->
     case do_verify(JWT, JWKs, VerifyClaims) of
         {ok, Extra} ->
-            IsSuperuser = emqx_authn_utils:is_superuser(Extra),
-            Attrs = emqx_authn_utils:client_attrs(Extra),
-            try
-                ACL = acl(Extra, AclClaimName),
-                Result = maps:merge(IsSuperuser, maps:merge(ACL, Attrs)),
-                {ok, Result}
-            catch
-                throw:{bad_acl_rule, Reason} ->
-                    %% it's a invalid token, so ok to log
-                    ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}),
-                    {error, bad_username_or_password}
-            end;
+            extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire);
         {error, {missing_claim, Claim}} ->
             %% it's a invalid token, so it's ok to log
             ?TRACE_AUTHN_PROVIDER("missing_jwt_claim", #{jwt => JWT, claim => Claim}),
@@ -245,6 +242,25 @@ verify(JWT, JWKs, VerifyClaims, AclClaimName) ->
             {error, bad_username_or_password}
     end.
 
+extra_to_auth_data(Extra, JWT, AclClaimName, DisconnectAfterExpire) ->
+    IsSuperuser = emqx_authn_utils:is_superuser(Extra),
+    Attrs = emqx_authn_utils:client_attrs(Extra),
+    ExpireAt = expire_at(DisconnectAfterExpire, Extra),
+    try
+        ACL = acl(Extra, AclClaimName),
+        Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
+        {ok, Result}
+    catch
+        throw:{bad_acl_rule, Reason} ->
+            %% it's a invalid token, so ok to log
+            ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{jwt => JWT}),
+            {error, bad_username_or_password}
+    end.
+
+expire_at(false, _Extra) -> #{};
+expire_at(true, #{<<"exp">> := ExpireTime}) -> #{expire_at => ExpireTime};
+expire_at(true, #{}) -> #{}.
+
 acl(Claims, AclClaimName) ->
     case Claims of
         #{AclClaimName := Rules} ->
@@ -397,3 +413,6 @@ parse_rule(Rule) ->
         {error, Reason} ->
             throw({bad_acl_rule, Reason})
     end.
+
+merge_maps([]) -> #{};
+merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).

+ 6 - 0
apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl

@@ -122,6 +122,7 @@ common_fields() ->
             desc => ?DESC(acl_claim_name)
         }},
         {verify_claims, fun verify_claims/1},
+        {disconnect_after_expire, fun disconnect_after_expire/1},
         {from, fun from/1}
     ] ++ emqx_authn_schema:common_fields().
 
@@ -172,6 +173,11 @@ verify_claims(required) ->
 verify_claims(_) ->
     undefined.
 
+disconnect_after_expire(type) -> boolean();
+disconnect_after_expire(desc) -> ?DESC(?FUNCTION_NAME);
+disconnect_after_expire(default) -> true;
+disconnect_after_expire(_) -> undefined.
+
 do_check_verify_claims([]) ->
     true;
 do_check_verify_claims([{Name, Expected} | More]) ->

+ 9 - 5
apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl

@@ -55,7 +55,8 @@ t_hmac_based(_) ->
         algorithm => 'hmac-based',
         secret => Secret,
         secret_base64_encoded => false,
-        verify_claims => [{<<"username">>, <<"${username}">>}]
+        verify_claims => [{<<"username">>, <<"${username}">>}],
+        disconnect_after_expire => false
     },
     {ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),
 
@@ -179,7 +180,8 @@ t_public_key(_) ->
         use_jwks => false,
         algorithm => 'public-key',
         public_key => PublicKey,
-        verify_claims => []
+        verify_claims => [],
+        disconnect_after_expire => false
     },
     {ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),
 
@@ -207,7 +209,8 @@ t_jwt_in_username(_) ->
         algorithm => 'hmac-based',
         secret => Secret,
         secret_base64_encoded => false,
-        verify_claims => []
+        verify_claims => [],
+        disconnect_after_expire => false
     },
     {ok, State} = emqx_authn_jwt:create(?AUTHN_ID, Config),
 
@@ -238,7 +241,7 @@ t_jwks_renewal(_Config) ->
         algorithm => 'public-key',
         ssl => #{enable => false},
         verify_claims => [],
-
+        disconnect_after_expire => false,
         use_jwks => true,
         endpoint => "https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT + 1) ++ ?JWKS_PATH,
         refresh_interval => 1000,
@@ -335,7 +338,8 @@ t_verify_claims(_) ->
         algorithm => 'hmac-based',
         secret => Secret,
         secret_base64_encoded => false,
-        verify_claims => [{<<"foo">>, <<"bar">>}]
+        verify_claims => [{<<"foo">>, <<"bar">>}],
+        disconnect_after_expire => false
     },
     {ok, State0} = emqx_authn_jwt:create(?AUTHN_ID, Config0),
 

+ 93 - 0
apps/emqx_auth_jwt/test/emqx_authn_jwt_expire_SUITE.erl

@@ -0,0 +1,93 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authn_jwt_expire_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx_auth/include/emqx_authn.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(PATH, [authentication]).
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_testcase(_, Config) ->
+    _ = emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL),
+    Config.
+
+end_per_testcase(_, _Config) ->
+    _ = emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL),
+    ok.
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_jwt], #{
+        work_dir => ?config(priv_dir, Config)
+    }),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    emqx_authn_test_lib:delete_authenticators(?PATH, ?GLOBAL),
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.
+
+%%--------------------------------------------------------------------
+%% CT cases
+%%--------------------------------------------------------------------
+
+t_jwt_expire(_Config) ->
+    _ = process_flag(trap_exit, true),
+
+    {ok, _} = emqx:update_config(
+        ?PATH,
+        {create_authenticator, ?GLOBAL, auth_config()}
+    ),
+
+    {ok, [#{provider := emqx_authn_jwt}]} = emqx_authn_chains:list_authenticators(?GLOBAL),
+
+    Payload = #{
+        <<"username">> => <<"myuser">>,
+        <<"exp">> => erlang:system_time(second) + 2
+    },
+    JWS = emqx_authn_jwt_SUITE:generate_jws('hmac-based', Payload, <<"secret">>),
+
+    {ok, C} = emqtt:start_link([{username, <<"myuser">>}, {password, JWS}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+
+    receive
+        {disconnected, ?RC_NOT_AUTHORIZED, #{}} -> ok
+    after 5000 ->
+        ct:fail("Client should be disconnected by timeout")
+    end.
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+auth_config() ->
+    #{
+        <<"use_jwks">> => false,
+        <<"algorithm">> => <<"hmac-based">>,
+        <<"acl_claim_name">> => <<"acl">>,
+        <<"secret">> => <<"secret">>,
+        <<"mechanism">> => <<"jwt">>,
+        <<"verify_claims">> => #{<<"username">> => <<"${username}">>}
+        %% Should be enabled by default
+        %% <<"disconnect_after_expire">> => true
+    }.

+ 3 - 2
apps/emqx_auth_jwt/test/emqx_authz_jwt_SUITE.erl

@@ -455,11 +455,12 @@ t_invalid_rule(_Config) ->
 authn_config() ->
     #{
         <<"mechanism">> => <<"jwt">>,
-        <<"use_jwks">> => <<"false">>,
+        <<"use_jwks">> => false,
         <<"algorithm">> => <<"hmac-based">>,
         <<"secret">> => ?SECRET,
-        <<"secret_base64_encoded">> => <<"false">>,
+        <<"secret_base64_encoded">> => false,
         <<"acl_claim_name">> => <<"acl">>,
+        <<"disconnect_after_expire">> => false,
         <<"verify_claims">> => #{
             <<"username">> => ?PH_USERNAME
         }

+ 1 - 1
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -518,7 +518,7 @@ handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
 handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
     handle_info({sock_error, Reason}, State);
 handle_msg({close, Reason}, State) ->
-    ?SLOG(debug, #{msg => "force_socket_close", reason => Reason}),
+    ?tp(debug, force_socket_close, #{reason => Reason}),
     handle_info({sock_closed, Reason}, close_socket(State));
 handle_msg(
     {event, connected},

+ 19 - 3
apps/emqx_gateway/src/emqx_gateway_ctx.erl

@@ -39,6 +39,7 @@
 %% Authentication circle
 -export([
     authenticate/2,
+    connection_expire_interval/2,
     open_session/5,
     open_session/6,
     insert_channel_info/4,
@@ -78,6 +79,13 @@ authenticate(_Ctx, ClientInfo0) ->
             {error, Reason}
     end.
 
+-spec connection_expire_interval(context(), emqx_types:clientinfo()) ->
+    undefined | non_neg_integer().
+connection_expire_interval(_Ctx, #{auth_expire_at := undefined}) ->
+    undefined;
+connection_expire_interval(_Ctx, #{auth_expire_at := ExpireAt}) ->
+    max(0, ExpireAt - erlang:system_time(millisecond)).
+
 %% @doc Register the session to the cluster.
 %%
 %%  This function should be called after the client has authenticated
@@ -157,6 +165,9 @@ set_chan_stats(_Ctx = #{gwname := GwName}, ClientId, Stats) ->
 connection_closed(_Ctx = #{gwname := GwName}, ClientId) ->
     emqx_gateway_cm:connection_closed(GwName, ClientId).
 
+%%--------------------------------------------------------------------
+%% Message circle
+
 -spec authorize(
     context(),
     emqx_types:clientinfo(),
@@ -167,6 +178,9 @@ connection_closed(_Ctx = #{gwname := GwName}, ClientId) ->
 authorize(_Ctx, ClientInfo, Action, Topic) ->
     emqx_access_control:authorize(ClientInfo, Action, Topic).
 
+%%--------------------------------------------------------------------
+%% Metrics & Stats
+
 metrics_inc(_Ctx = #{gwname := GwName}, Name) ->
     emqx_gateway_metrics:inc(GwName, Name).
 
@@ -183,6 +197,8 @@ eval_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
     MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
     ClientInfo#{mountpoint := MountPoint1}.
 
-merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) ->
-    IsSuperuser = maps:get(is_superuser, AuthResult, false),
-    maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}).
+merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_map(AuthResult0) ->
+    IsSuperuser = maps:get(is_superuser, AuthResult0, false),
+    ExpireAt = maps:get(expire_at, AuthResult0, undefined),
+    AuthResult1 = maps:without([expire_at], AuthResult0),
+    maps:merge(ClientInfo#{auth_expire_at => ExpireAt}, AuthResult1#{is_superuser => IsSuperuser}).

+ 1 - 1
apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl

@@ -82,4 +82,4 @@ t_authenticate(_) ->
     ?assertMatch({ok, #{is_superuser := true}}, emqx_gateway_ctx:authenticate(Ctx, Info4)),
     ok.
 
-default_result(Info) -> Info#{zone => default, is_superuser => false}.
+default_result(Info) -> Info#{zone => default, is_superuser => false, auth_expire_at => undefined}.

+ 11 - 1
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -214,6 +214,8 @@ handle_timeout(_, {transport, Msg}, Channel) ->
     call_session(timeout, Msg, Channel);
 handle_timeout(_, disconnect, Channel) ->
     {shutdown, normal, Channel};
+handle_timeout(_, connection_expire, Channel) ->
+    {shutdown, expired, Channel};
 handle_timeout(_, _, Channel) ->
     {ok, Channel}.
 
@@ -595,6 +597,14 @@ process_connect(
             iter(Iter, reply({error, bad_request}, Msg, Result), Channel)
     end.
 
+schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
+    case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
+        undefined ->
+            Channel;
+        Interval ->
+            ensure_timer(connection_expire_timer, Interval, connection_expire, Channel)
+    end.
+
 run_hooks(Ctx, Name, Args) ->
     emqx_gateway_ctx:metrics_inc(Ctx, Name),
     emqx_hooks:run(Name, Args).
@@ -619,7 +629,7 @@ ensure_connected(
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, #{}]),
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{conninfo = NConnInfo, conn_state = connected}.
+    schedule_connection_expire(Channel#channel{conninfo = NConnInfo, conn_state = connected}).
 
 %%--------------------------------------------------------------------
 %% Ensure disconnected

+ 1 - 1
apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_coap, [
     {description, "CoAP Gateway"},
-    {vsn, "0.1.7"},
+    {vsn, "0.1.8"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 36 - 0
apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

@@ -29,6 +29,7 @@
 
 -include_lib("er_coap_client/include/coap.hrl").
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
@@ -83,6 +84,17 @@ init_per_testcase(t_connection_with_authn_failed, Config) ->
         fun(_) -> {error, bad_username_or_password} end
     ),
     Config;
+init_per_testcase(t_connection_with_expire, Config) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) ->
+            {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 100}}
+        end
+    ),
+    snabbkaffe:start_trace(),
+    Config;
 init_per_testcase(t_heartbeat, Config) ->
     NewHeartbeat = 800,
     OldConf = emqx:get_raw_config([gateway, coap]),
@@ -103,6 +115,10 @@ end_per_testcase(t_heartbeat, Config) ->
     OldConf = ?config(old_conf, Config),
     {ok, _} = emqx_gateway_conf:update_gateway(coap, OldConf),
     ok;
+end_per_testcase(t_connection_with_expire, Config) ->
+    snabbkaffe:stop(),
+    meck:unload(emqx_access_control),
+    Config;
 end_per_testcase(_, Config) ->
     ok = meck:unload(emqx_access_control),
     Config.
@@ -270,6 +286,26 @@ t_connection_with_authn_failed(_) ->
     ),
     ok.
 
+t_connection_with_expire(_) ->
+    ChId = {{127, 0, 0, 1}, 5683},
+    {ok, Sock} = er_coap_udp_socket:start_link(),
+    {ok, Channel} = er_coap_udp_socket:get_channel(Sock, ChId),
+
+    URI = ?MQTT_PREFIX ++ "/connection?clientid=client1",
+
+    ?assertWaitEvent(
+        begin
+            Req = make_req(post),
+            {ok, created, _Data} = do_request(Channel, URI, Req)
+        end,
+        #{
+            ?snk_kind := conn_process_terminated,
+            clientid := <<"client1">>,
+            reason := {shutdown, expired}
+        },
+        5000
+    ).
+
 t_publish(_) ->
     %% can publish to a normal topic
     Topics = [

+ 13 - 2
apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl

@@ -302,6 +302,9 @@ handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -
     {shutdown, Reason, Channel};
 handle_timeout(_TRef, force_close_idle, Channel) ->
     {shutdown, idle_timeout, Channel};
+handle_timeout(_TRef, connection_expire, Channel) ->
+    NChannel = remove_timer_ref(connection_expire, Channel),
+    {ok, [{event, disconnected}, {close, expired}], NChannel};
 handle_timeout(_TRef, Msg, Channel) ->
     ?SLOG(warning, #{
         msg => "unexpected_timeout_signal",
@@ -666,10 +669,18 @@ ensure_connected(
 ) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
+    schedule_connection_expire(Channel#channel{
         conninfo = NConnInfo,
         conn_state = connected
-    }.
+    }).
+
+schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
+    case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
+        undefined ->
+            Channel;
+        Interval ->
+            ensure_timer(connection_expire, Interval, Channel)
+    end.
 
 ensure_disconnected(
     Reason,

+ 1 - 1
apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_exproto, [
     {description, "ExProto Gateway"},
-    {vsn, "0.1.9"},
+    {vsn, "0.1.10"},
     {registered, []},
     {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
     {env, []},

+ 43 - 6
apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -81,6 +82,7 @@ groups() ->
         t_raw_publish,
         t_auth_deny,
         t_acl_deny,
+        t_auth_expire,
         t_hook_connected_disconnected,
         t_hook_session_subscribed_unsubscribed,
         t_hook_message_delivered
@@ -157,14 +159,17 @@ end_per_group(_, Cfg) ->
 init_per_testcase(TestCase, Cfg) when
     TestCase == t_enter_passive_mode
 ->
+    snabbkaffe:start_trace(),
     case proplists:get_value(listener_type, Cfg) of
         udp -> {skip, ignore};
         _ -> Cfg
     end;
 init_per_testcase(_TestCase, Cfg) ->
+    snabbkaffe:start_trace(),
     Cfg.
 
 end_per_testcase(_TestCase, _Cfg) ->
+    snabbkaffe:stop(),
     ok.
 
 listener_confs(Type) ->
@@ -290,6 +295,42 @@ t_auth_deny(Cfg) ->
         end,
     meck:unload([emqx_gateway_ctx]).
 
+t_auth_expire(Cfg) ->
+    SockType = proplists:get_value(listener_type, Cfg),
+    Sock = open(SockType),
+
+    Client = #{
+        proto_name => <<"demo">>,
+        proto_ver => <<"v0.1">>,
+        clientid => <<"test_client_1">>
+    },
+    Password = <<"123456">>,
+
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) ->
+            {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
+        end
+    ),
+
+    ConnBin = frame_connect(Client, Password),
+    ConnAckBin = frame_connack(0),
+
+    ?assertWaitEvent(
+        begin
+            send(Sock, ConnBin),
+            {ok, ConnAckBin} = recv(Sock, 5000)
+        end,
+        #{
+            ?snk_kind := conn_process_terminated,
+            clientid := <<"test_client_1">>,
+            reason := {shutdown, expired}
+        },
+        5000
+    ).
+
 t_acl_deny(Cfg) ->
     SockType = proplists:get_value(listener_type, Cfg),
     Sock = open(SockType),
@@ -332,7 +373,6 @@ t_acl_deny(Cfg) ->
     close(Sock).
 
 t_keepalive_timeout(Cfg) ->
-    ok = snabbkaffe:start_trace(),
     SockType = proplists:get_value(listener_type, Cfg),
     Sock = open(SockType),
 
@@ -383,8 +423,7 @@ t_keepalive_timeout(Cfg) ->
             ?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))),
             %% socket port should be closed
             ?assertEqual({error, closed}, recv(Sock, 5000))
-    end,
-    snabbkaffe:stop().
+    end.
 
 t_hook_connected_disconnected(Cfg) ->
     SockType = proplists:get_value(listener_type, Cfg),
@@ -513,7 +552,6 @@ t_hook_message_delivered(Cfg) ->
     emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}).
 
 t_idle_timeout(Cfg) ->
-    ok = snabbkaffe:start_trace(),
     SockType = proplists:get_value(listener_type, Cfg),
     Sock = open(SockType),
 
@@ -551,8 +589,7 @@ t_idle_timeout(Cfg) ->
                 {ok, #{reason := {shutdown, idle_timeout}}},
                 ?block_until(#{?snk_kind := conn_process_terminated}, 10000)
             )
-    end,
-    snabbkaffe:stop().
+    end.
 
 %%--------------------------------------------------------------------
 %% Utils

+ 1 - 1
apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_gbt32960, [
     {description, "GBT32960 Gateway"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway]},
     {env, []},

+ 19 - 3
apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl

@@ -72,7 +72,8 @@
 
 -define(TIMER_TABLE, #{
     alive_timer => keepalive,
-    retry_timer => retry_delivery
+    retry_timer => retry_delivery,
+    connection_expire_timer => connection_expire
 }).
 
 -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@@ -468,6 +469,13 @@ handle_timeout(
             {Outgoings2, NChannel} = dispatch_frame(Channel#channel{inflight = NInflight}),
             {ok, [{outgoing, Outgoings ++ Outgoings2}], reset_timer(retry_timer, NChannel)}
     end;
+handle_timeout(
+    _TRef,
+    connection_expire,
+    Channel
+) ->
+    NChannel = clean_timer(connection_expire_timer, Channel),
+    {ok, [{event, disconnected}, {close, expired}], NChannel};
 handle_timeout(_TRef, Msg, Channel) ->
     log(error, #{msg => "unexpected_timeout", content => Msg}, Channel),
     {ok, Channel}.
@@ -591,10 +599,18 @@ ensure_connected(
 ) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
+    schedule_connection_expire(Channel#channel{
         conninfo = NConnInfo,
         conn_state = connected
-    }.
+    }).
+
+schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
+    case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
+        undefined ->
+            Channel;
+        Interval ->
+            ensure_timer(connection_expire_timer, Interval, Channel)
+    end.
 
 process_connect(
     Frame,

+ 31 - 0
apps/emqx_gateway_gbt32960/test/emqx_gbt32960_SUITE.erl

@@ -11,6 +11,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 -define(BYTE, 8 / big - integer).
 -define(WORD, 16 / big - integer).
@@ -52,6 +53,14 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
+init_per_testcase(_, Config) ->
+    snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_, _Config) ->
+    snabbkaffe:stop(),
+    ok.
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% helper functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 encode(Cmd, Vin, Data) ->
@@ -171,6 +180,28 @@ t_case01_login_channel_info(_Config) ->
 
     ok = gen_tcp:close(Socket).
 
+t_case01_auth_expire(_Config) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) ->
+            {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
+        end
+    ),
+
+    ?assertWaitEvent(
+        begin
+            {ok, _Socket} = login_first()
+        end,
+        #{
+            ?snk_kind := conn_process_terminated,
+            clientid := <<"1G1BL52P7TR115520">>,
+            reason := {shutdown, expired}
+        },
+        5000
+    ).
+
 t_case02_reportinfo_0x01(_Config) ->
     % send VEHICLE LOGIN
     {ok, Socket} = login_first(),

+ 1 - 1
apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_gateway_lwm2m, [
     {description, "LwM2M Gateway"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap, xmerl]},
     {env, []},

+ 12 - 2
apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl

@@ -202,6 +202,8 @@ handle_timeout(_, {transport, _} = Msg, Channel) ->
     call_session(timeout, Msg, Channel);
 handle_timeout(_, disconnect, Channel) ->
     {shutdown, normal, Channel};
+handle_timeout(_, connection_expire, Channel) ->
+    {shutdown, expired, Channel};
 handle_timeout(_, _, Channel) ->
     {ok, Channel}.
 
@@ -353,10 +355,18 @@ ensure_connected(
 
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
+    schedule_connection_expire(Channel#channel{
         conninfo = NConnInfo,
         conn_state = connected
-    }.
+    }).
+
+schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
+    case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
+        undefined ->
+            Channel;
+        Interval ->
+            make_timer(connection_expire, Interval, connection_expire, Channel)
+    end.
 
 %%--------------------------------------------------------------------
 %% Ensure disconnected

+ 41 - 0
apps/emqx_gateway_lwm2m/test/emqx_lwm2m_SUITE.erl

@@ -36,6 +36,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 -record(coap_content, {content_format, payload = <<>>}).
 
@@ -66,6 +67,7 @@ groups() ->
     [
         {test_grp_0_register, [RepeatOpt], [
             case01_register,
+            case01_auth_expire,
             case01_register_additional_opts,
             %% TODO now we can't handle partial decode packet
             %% case01_register_incorrect_opts,
@@ -145,6 +147,7 @@ end_per_suite(Config) ->
     Config.
 
 init_per_testcase(TestCase, Config) ->
+    snabbkaffe:start_trace(),
     GatewayConfig =
         case TestCase of
             case09_auto_observe ->
@@ -171,6 +174,7 @@ end_per_testcase(_AllTestCase, Config) ->
     timer:sleep(300),
     gen_udp:close(?config(sock, Config)),
     emqtt:disconnect(?config(emqx_c, Config)),
+    snabbkaffe:stop(),
     ok = application:stop(emqx_gateway).
 
 default_config() ->
@@ -280,6 +284,43 @@ case01_register(Config) ->
     timer:sleep(50),
     false = lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics()).
 
+case01_auth_expire(Config) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) ->
+            {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
+        end
+    ),
+
+    %%----------------------------------------
+    %% REGISTER command
+    %%----------------------------------------
+    UdpSock = ?config(sock, Config),
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId = 12,
+
+    ?assertWaitEvent(
+        test_send_coap_request(
+            UdpSock,
+            post,
+            sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=345&lwm2m=1", [?PORT, Epn]),
+            #coap_content{
+                content_format = <<"text/plain">>,
+                payload = <<"</1>, </2>, </3>, </4>, </5>">>
+            },
+            [],
+            MsgId
+        ),
+        #{
+            ?snk_kind := conn_process_terminated,
+            clientid := <<"urn:oma:lwm2m:oma:3">>,
+            reason := {shutdown, expired}
+        },
+        5000
+    ).
+
 case01_register_additional_opts(Config) ->
     %%----------------------------------------
     %% REGISTER command

+ 13 - 2
apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl

@@ -364,10 +364,18 @@ ensure_connected(
 ) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
+    schedule_connection_expire(Channel#channel{
         conninfo = NConnInfo,
         conn_state = connected
-    }.
+    }).
+
+schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
+    case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
+        undefined ->
+            Channel;
+        Interval ->
+            ensure_timer(connection_expire, Interval, Channel)
+    end.
 
 process_connect(
     Channel = #channel{
@@ -2122,6 +2130,9 @@ handle_timeout(_TRef, expire_session, Channel) ->
     shutdown(expired, Channel);
 handle_timeout(_TRef, expire_asleep, Channel) ->
     shutdown(asleep_timeout, Channel);
+handle_timeout(_TRef, connection_expire, Channel) ->
+    NChannel = clean_timer(connection_expire, Channel),
+    handle_out(disconnect, expired, NChannel);
 handle_timeout(_TRef, Msg, Channel) ->
     %% NOTE
     %% We do not expect `emqx_mqttsn_session` to set up any custom timers (i.e with

+ 39 - 0
apps/emqx_gateway_mqttsn/test/emqx_sn_protocol_SUITE.erl

@@ -33,6 +33,7 @@
 -include_lib("common_test/include/ct.hrl").
 
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -141,6 +142,14 @@ end_per_suite(Config) ->
     emqx_common_test_http:delete_default_app(),
     emqx_cth_suite:stop(?config(suite_apps, Config)).
 
+init_per_testcase(_TestCase, Config) ->
+    snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    snabbkaffe:stop(),
+    ok.
+
 restart_mqttsn_with_subs_resume_on() ->
     Conf = emqx:get_raw_config([gateway, mqttsn]),
     emqx_gateway_conf:update_gateway(
@@ -206,6 +215,36 @@ t_connect(_) ->
     ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
     gen_udp:close(Socket).
 
+t_auth_expire(_) ->
+    SockName = {'mqttsn:udp:default', 1884},
+    ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
+
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) ->
+            {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
+        end
+    ),
+
+    ?assertWaitEvent(
+        begin
+            {ok, Socket} = gen_udp:open(0, [binary]),
+            send_connect_msg(Socket, <<"client_id_test1">>),
+            ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+
+            ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+            gen_udp:close(Socket)
+        end,
+        #{
+            ?snk_kind := conn_process_terminated,
+            clientid := <<"client_id_test1">>,
+            reason := {shutdown, expired}
+        },
+        5000
+    ).
+
 t_first_disconnect(_) ->
     SockName = {'mqttsn:udp:default', 1884},
     ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),

+ 20 - 16
apps/emqx_gateway_ocpp/src/emqx_ocpp_channel.erl

@@ -89,7 +89,8 @@
 -type replies() :: reply() | [reply()].
 
 -define(TIMER_TABLE, #{
-    alive_timer => keepalive
+    alive_timer => keepalive,
+    connection_expire_timer => connection_expire
 }).
 
 -define(INFO_KEYS, [
@@ -315,20 +316,13 @@ enrich_client(
         expiry_interval => 0,
         receive_maximum => 1
     },
-    NClientInfo = fix_mountpoint(
+    NClientInfo =
         ClientInfo#{
             clientid => ClientId,
             username => Username
-        }
-    ),
+        },
     {ok, Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}}.
 
-fix_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
-    ClientInfo;
-fix_mountpoint(ClientInfo = #{mountpoint := Mountpoint}) ->
-    Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
-    ClientInfo#{mountpoint := Mountpoint1}.
-
 set_log_meta(#channel{
     clientinfo = #{clientid := ClientId},
     conninfo = #{peername := Peername}
@@ -350,15 +344,14 @@ check_banned(_UserInfo, #channel{clientinfo = ClientInfo}) ->
 
 auth_connect(
     #{password := Password},
-    #channel{clientinfo = ClientInfo} = Channel
+    #channel{ctx = Ctx, clientinfo = ClientInfo} = Channel
 ) ->
     #{
         clientid := ClientId,
         username := Username
     } = ClientInfo,
-    case emqx_access_control:authenticate(ClientInfo#{password => Password}) of
-        {ok, AuthResult} ->
-            NClientInfo = maps:merge(ClientInfo, AuthResult),
+    case emqx_gateway_ctx:authenticate(Ctx, ClientInfo#{password => Password}) of
+        {ok, NClientInfo} ->
             {ok, Channel#channel{clientinfo = NClientInfo}};
         {error, Reason} ->
             ?SLOG(warning, #{
@@ -659,6 +652,9 @@ handle_timeout(
         {error, timeout} ->
             handle_out(disconnect, keepalive_timeout, Channel)
     end;
+handle_timeout(_TRef, connection_expire, Channel) ->
+    %% No take over implemented, so just shutdown
+    shutdown(expired, Channel);
 handle_timeout(_TRef, Msg, Channel) ->
     ?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}),
     {ok, Channel}.
@@ -796,10 +792,18 @@ ensure_connected(
 ) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
+    schedule_connection_expire(Channel#channel{
         conninfo = NConnInfo,
         conn_state = connected
-    }.
+    }).
+
+schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
+    case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
+        undefined ->
+            Channel;
+        Interval ->
+            ensure_timer(connection_expire_timer, Interval, Channel)
+    end.
 
 ensure_disconnected(
     Reason,

+ 3 - 1
apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl

@@ -20,6 +20,7 @@
 -include("emqx_ocpp.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -logger_header("[OCPP/WS]").
 
@@ -513,7 +514,8 @@ websocket_close(Reason, State) ->
     handle_info({sock_closed, Reason}, State).
 
 terminate(Reason, _Req, #state{channel = Channel}) ->
-    ?SLOG(debug, #{msg => "terminated", reason => Reason}),
+    ClientId = emqx_ocpp_channel:info(clientid, Channel),
+    ?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}),
     emqx_ocpp_channel:terminate(Reason, Channel);
 terminate(_Reason, _Req, _UnExpectedState) ->
     ok.

+ 29 - 2
apps/emqx_gateway_ocpp/test/emqx_ocpp_SUITE.erl

@@ -19,6 +19,7 @@
 -include("emqx_ocpp.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -32,8 +33,6 @@
     ]
 ).
 
--define(HEARTBEAT, <<$\n>>).
-
 %% erlfmt-ignore
 -define(CONF_DEFAULT, <<"
     gateway.ocpp {
@@ -82,6 +81,14 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
+init_per_testcase(_TestCase, Config) ->
+    snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    snabbkaffe:stop(),
+    ok.
+
 default_config() ->
     ?CONF_DEFAULT.
 
@@ -188,6 +195,26 @@ t_adjust_keepalive_timer(_Config) ->
     ?assertEqual(undefined, emqx_gateway_cm:get_chan_info(ocpp, <<"client1">>)),
     ok.
 
+t_auth_expire(_Config) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) ->
+            {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
+        end
+    ),
+
+    ?assertWaitEvent(
+        {ok, _ClientPid} = connect("127.0.0.1", 33033, <<"client1">>),
+        #{
+            ?snk_kind := conn_process_terminated,
+            clientid := <<"client1">>,
+            reason := {shutdown, expired}
+        },
+        5000
+    ).
+
 t_listeners_status(_Config) ->
     {200, [Listener]} = request(get, "/gateways/ocpp/listeners"),
     ?assertMatch(

+ 16 - 4
apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl

@@ -93,7 +93,8 @@
 -define(TIMER_TABLE, #{
     incoming_timer => keepalive,
     outgoing_timer => keepalive_send,
-    clean_trans_timer => clean_trans
+    clean_trans_timer => clean_trans,
+    connection_expire_timer => connection_expire
 }).
 
 -define(TRANS_TIMEOUT, 60000).
@@ -356,10 +357,18 @@ ensure_connected(
 ) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
-    Channel#channel{
+    schedule_connection_expire(Channel#channel{
         conninfo = NConnInfo,
         conn_state = connected
-    }.
+    }).
+
+schedule_connection_expire(Channel = #channel{ctx = Ctx, clientinfo = ClientInfo}) ->
+    case emqx_gateway_ctx:connection_expire_interval(Ctx, ClientInfo) of
+        undefined ->
+            Channel;
+        Interval ->
+            ensure_timer(connection_expire_timer, Interval, Channel)
+    end.
 
 process_connect(
     Channel = #channel{
@@ -1137,7 +1146,10 @@ handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) ->
         end,
         Trans
     ),
-    {ok, ensure_clean_trans_timer(Channel#channel{transaction = NTrans})}.
+    {ok, ensure_clean_trans_timer(Channel#channel{transaction = NTrans})};
+handle_timeout(_TRef, connection_expire, Channel) ->
+    %% No session take over implemented, just shut down
+    shutdown(expired, Channel).
 
 %%--------------------------------------------------------------------
 %% Terminate

+ 37 - 0
apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl

@@ -18,6 +18,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include("emqx_stomp.hrl").
 
 -compile(export_all).
@@ -78,6 +79,14 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(?config(suite_apps, Config)),
     ok.
 
+init_per_testcase(_TestCase, Config) ->
+    snabbkaffe:start_trace(),
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    snabbkaffe:stop(),
+    ok.
+
 default_config() ->
     ?CONF_DEFAULT.
 
@@ -141,6 +150,34 @@ t_connect(_) ->
     end,
     with_connection(ProtocolError).
 
+t_auth_expire(_) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history]),
+    ok = meck:expect(
+        emqx_access_control,
+        authenticate,
+        fun(_) ->
+            {ok, #{is_superuser => false, expire_at => erlang:system_time(millisecond) + 500}}
+        end
+    ),
+
+    ConnectWithExpire = fun(Sock) ->
+        ?assertWaitEvent(
+            begin
+                ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>, <<"1000,2000">>),
+                {ok, Frame} = recv_a_frame(Sock),
+                ?assertMatch(<<"CONNECTED">>, Frame#stomp_frame.command)
+            end,
+            #{
+                ?snk_kind := conn_process_terminated,
+                clientid := _,
+                reason := {shutdown, expired}
+            },
+            5000
+        )
+    end,
+    with_connection(ConnectWithExpire),
+    meck:unload(emqx_access_control).
+
 t_heartbeat(_) ->
     %% Test heart beat
     with_connection(fun(Sock) ->

+ 10 - 0
changes/ce/feat-12947.en.md

@@ -0,0 +1,10 @@
+## Breaking changes
+
+For JWT authentication, support new `disconnect_after_expire` option. When enabled, the client will be disconnected after the JWT token expires.
+
+This option is enabled by default, so the default behavior is changed.
+Previously, the clients with actual JWTs could connect to the broker and stay connected
+even after the JWT token expired.
+Now, the client will be disconnected after the JWT token expires.
+
+To preserve the previous behavior, set `disconnect_after_expire` to `false`.

+ 6 - 0
rel/i18n/emqx_authn_jwt_schema.hocon

@@ -139,4 +139,10 @@ Authentication will verify that the value of claims in the JWT (taken from the P
 verify_claims.label:
 """Verify Claims"""
 
+disconnect_after_expire.desc:
+"""Disconnect the client after the token expires."""
+
+disconnect_after_expire.label:
+"""Disconnect After Expire"""
+
 }