Просмотр исходного кода

Merge pull request #9641 from thalesmg/fix-gcp-pubsub-jwt-refresh-v50

fix(gcp_pubsub): fix potential jwt accumulation and lack of refresh (v5.0)
Thales Macedo Garitezi 3 лет назад
Родитель
Сommit
a612eacf3a

+ 1 - 1
apps/emqx_connector/src/emqx_connector.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "An OTP application"},
-    {vsn, "0.1.10"},
+    {vsn, "0.1.11"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

+ 14 - 1
apps/emqx_connector/src/emqx_connector_jwt.erl

@@ -18,11 +18,13 @@
 
 -include_lib("emqx_connector/include/emqx_connector_tables.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% API
 -export([
     lookup_jwt/1,
-    lookup_jwt/2
+    lookup_jwt/2,
+    delete_jwt/2
 ]).
 
 -type jwt() :: binary().
@@ -44,3 +46,14 @@ lookup_jwt(TId, ResourceId) ->
         error:badarg ->
             {error, not_found}
     end.
+
+-spec delete_jwt(ets:table(), resource_id()) -> ok.
+delete_jwt(TId, ResourceId) ->
+    try
+        ets:delete(TId, {ResourceId, jwt}),
+        ?tp(connector_jwt_deleted, #{}),
+        ok
+    catch
+        error:badarg ->
+            ok
+    end.

+ 1 - 1
apps/emqx_connector/src/emqx_connector_jwt_sup.erl

@@ -78,7 +78,7 @@ jwt_worker_child_spec(Id, Config) ->
         restart => transient,
         type => worker,
         significant => false,
-        shutdown => brutal_kill,
+        shutdown => 5_000,
         modules => [emqx_connector_jwt_worker]
     }.
 

+ 27 - 8
apps/emqx_connector/src/emqx_connector_jwt_worker.erl

@@ -21,7 +21,8 @@
 %% API
 -export([
     start_link/1,
-    ensure_jwt/1
+    ensure_jwt/1,
+    force_refresh/1
 ]).
 
 %% gen_server API
@@ -32,7 +33,8 @@
     handle_cast/2,
     handle_info/2,
     format_status/1,
-    format_status/2
+    format_status/2,
+    terminate/2
 ]).
 
 -include_lib("emqx_resource/include/emqx_resource.hrl").
@@ -52,7 +54,7 @@
 }.
 -type jwt() :: binary().
 -type state() :: #{
-    refresh_timer := undefined | timer:tref(),
+    refresh_timer := undefined | timer:tref() | reference(),
     resource_id := resource_id(),
     expiration := timer:time(),
     table := ets:table(),
@@ -94,6 +96,11 @@ ensure_jwt(Worker) ->
     gen_server:cast(Worker, {ensure_jwt, Ref}),
     Ref.
 
+-spec force_refresh(pid()) -> ok.
+force_refresh(Worker) ->
+    _ = erlang:send(Worker, {timeout, force_refresh, ?refresh_jwt}),
+    ok.
+
 %%-----------------------------------------------------------------------------------------
 %% gen_server API
 %%-----------------------------------------------------------------------------------------
@@ -102,6 +109,7 @@ ensure_jwt(Worker) ->
     {ok, state(), {continue, {make_key, binary()}}}
     | {stop, {error, term()}}.
 init(#{private_key := PrivateKeyPEM} = Config) ->
+    process_flag(trap_exit, true),
     State0 = maps:without([private_key], Config),
     State = State0#{
         jwk => undefined,
@@ -148,7 +156,7 @@ handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) ->
 handle_cast(_Req, State) ->
     {noreply, State}.
 
-handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) ->
+handle_info({timeout, _TRef, ?refresh_jwt}, State0) ->
     State = generate_and_store_jwt(State0),
     {noreply, State};
 handle_info(_Msg, State) ->
@@ -161,6 +169,11 @@ format_status(_Opt, [_PDict, State0]) ->
     State = censor_secrets(State0),
     [{data, [{"State", State}]}].
 
+terminate(_Reason, State) ->
+    #{resource_id := ResourceId, table := TId} = State,
+    emqx_connector_jwt:delete_jwt(TId, ResourceId),
+    ok.
+
 %%-----------------------------------------------------------------------------------------
 %% Helper fns
 %%-----------------------------------------------------------------------------------------
@@ -211,15 +224,14 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
 -spec ensure_timer(state()) -> state().
 ensure_timer(
     State = #{
-        refresh_timer := undefined,
+        refresh_timer := OldTimer,
         expiration := ExpirationMS0
     }
 ) ->
+    cancel_timer(OldTimer),
     ExpirationMS = max(5_000, ExpirationMS0 - 5_000),
     TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt),
-    State#{refresh_timer => TRef};
-ensure_timer(State) ->
-    State.
+    State#{refresh_timer => TRef}.
 
 -spec censor_secrets(state()) -> map().
 censor_secrets(State = #{jwt := JWT, jwk := JWK}) ->
@@ -232,3 +244,10 @@ censor_secret(undefined) ->
     undefined;
 censor_secret(_Secret) ->
     "******".
+
+-spec cancel_timer(undefined | timer:tref() | reference()) -> ok.
+cancel_timer(undefined) ->
+    ok;
+cancel_timer(TRef) ->
+    _ = erlang:cancel_timer(TRef),
+    ok.

+ 10 - 0
apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl

@@ -67,3 +67,13 @@ t_lookup_jwt_missing(_Config) ->
     ResourceId = <<"resource id">>,
     ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(ResourceId)),
     ok.
+
+t_delete_jwt(_Config) ->
+    TId = ?JWT_TABLE,
+    JWT = <<"some jwt">>,
+    ResourceId = <<"resource id">>,
+    true = insert_jwt(TId, ResourceId, JWT),
+    {ok, _} = emqx_connector_jwt:lookup_jwt(ResourceId),
+    ?assertEqual(ok, emqx_connector_jwt:delete_jwt(TId, ResourceId)),
+    ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(TId, ResourceId)),
+    ok.

+ 28 - 6
apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl

@@ -186,14 +186,30 @@ t_refresh(_Config) ->
             {ok, SecondJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId),
             ?assertNot(is_expired(SecondJWT)),
             ?assert(is_expired(FirstJWT)),
-            {FirstJWT, SecondJWT}
+            %% check yet another refresh to ensure the timer was properly
+            %% reset.
+            ?block_until(
+                #{
+                    ?snk_kind := connector_jwt_worker_refresh,
+                    jwt := JWT1
+                } when
+                    JWT1 =/= SecondJWT andalso
+                        JWT1 =/= FirstJWT,
+                15_000
+            ),
+            {ok, ThirdJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId),
+            ?assertNot(is_expired(ThirdJWT)),
+            ?assert(is_expired(SecondJWT)),
+            {FirstJWT, SecondJWT, ThirdJWT}
         end,
-        fun({FirstJWT, SecondJWT}, Trace) ->
+        fun({FirstJWT, SecondJWT, ThirdJWT}, Trace) ->
             ?assertMatch(
-                [_, _ | _],
+                [_, _, _ | _],
                 ?of_kind(connector_jwt_worker_token_stored, Trace)
             ),
             ?assertNotEqual(FirstJWT, SecondJWT),
+            ?assertNotEqual(SecondJWT, ThirdJWT),
+            ?assertNotEqual(FirstJWT, ThirdJWT),
             ok
         end
     ),
@@ -289,7 +305,7 @@ t_lookup_badarg(_Config) ->
 
 t_start_supervised_worker(_Config) ->
     {ok, _} = emqx_connector_jwt_sup:start_link(),
-    Config = #{resource_id := ResourceId} = generate_config(),
+    Config = #{resource_id := ResourceId, table := TId} = generate_config(),
     {ok, Pid} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),
     Ref = emqx_connector_jwt_worker:ensure_jwt(Pid),
     receive
@@ -300,6 +316,7 @@ t_start_supervised_worker(_Config) ->
     end,
     MRef = monitor(process, Pid),
     ?assert(is_process_alive(Pid)),
+    ?assertMatch({ok, _}, emqx_connector_jwt:lookup_jwt(TId, ResourceId)),
     ok = emqx_connector_jwt_sup:ensure_worker_deleted(ResourceId),
     receive
         {'DOWN', MRef, process, Pid, _} ->
@@ -307,6 +324,11 @@ t_start_supervised_worker(_Config) ->
     after 1_000 ->
         ct:fail("timeout")
     end,
+    %% ensure it cleans up its own tokens to avoid leakage when
+    %% probing/testing rule resources.
+    ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(TId, ResourceId)),
+    %% ensure the specs are removed from the supervision tree.
+    ?assertEqual([], supervisor:which_children(emqx_connector_jwt_sup)),
     ok.
 
 t_start_supervised_worker_already_started(_Config) ->
@@ -322,9 +344,9 @@ t_start_supervised_worker_already_present(_Config) ->
     Config = #{resource_id := ResourceId} = generate_config(),
     {ok, Pid0} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),
     Ref = monitor(process, Pid0),
-    exit(Pid0, {shutdown, normal}),
+    exit(Pid0, kill),
     receive
-        {'DOWN', Ref, process, Pid0, {shutdown, normal}} -> ok
+        {'DOWN', Ref, process, Pid0, killed} -> ok
     after 1_000 -> error(worker_didnt_stop)
     end,
     {ok, Pid1} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),

+ 8 - 0
changes/v5.0.14-en.md

@@ -0,0 +1,8 @@
+# v5.0.14
+
+## Enhancements
+
+
+## Bug Fixes
+
+- Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9641](https://github.com/emqx/emqx/pull/9641)

+ 8 - 0
changes/v5.0.14-zh.md

@@ -0,0 +1,8 @@
+# v5.0.14
+
+## 增强
+
+
+## 修复
+
+- 修复了测试GCP PubSub可能泄露内存的问题,以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640)

+ 1 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl

@@ -1336,6 +1336,7 @@ t_stop(Config) ->
         fun(Res, Trace) ->
             ?assertMatch({ok, {ok, _}}, Res),
             ?assertMatch([_], ?of_kind(gcp_pubsub_stop, Trace)),
+            ?assertMatch([_ | _], ?of_kind(connector_jwt_deleted, Trace)),
             ok
         end
     ),

+ 1 - 1
lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src

@@ -1,6 +1,6 @@
 {application, emqx_ee_connector, [
     {description, "EMQX Enterprise connectors"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 0
lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl

@@ -154,6 +154,7 @@ on_stop(
         connector => InstanceId
     }),
     emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
+    emqx_connector_jwt:delete_jwt(?JWT_TABLE, InstanceId),
     ehttpc_sup:stop_pool(PoolName).
 
 -spec on_query(