Przeglądaj źródła

fix(emqx_event_message): fix config update not work in cluster

lafirest 4 lat temu
rodzic
commit
24cba8efa1

+ 48 - 55
apps/emqx_modules/src/emqx_event_message.erl

@@ -24,6 +24,8 @@
         , update/1
         , enable/0
         , disable/0
+        , post_config_update/5
+        , init_conf_handler/0
         ]).
 
 -export([ on_client_connected/2
@@ -39,66 +41,31 @@
 -export([reason/1]).
 -endif.
 
+init_conf_handler() ->
+    emqx_conf:add_handler([event_message], ?MODULE).
+
 list() ->
     emqx_conf:get([event_message], #{}).
 
 update(Params) ->
-    disable(),
     case emqx_conf:update([event_message],
-                        Params,
-                        #{rawconf_with_defaults => true, override_to => cluster}) of
+                          Params,
+                          #{rawconf_with_defaults => true, override_to => cluster}) of
         {ok, #{raw_config := NewEventMessage}} ->
-            enable(),
             {ok, NewEventMessage};
         {error, Reason} ->
             {error, Reason}
     end.
 
+post_config_update(_KeyPath, _Config, NewConf, _OldConf, _AppEnvs) ->
+    disable(),
+    enable(maps:to_list(NewConf)).
+
 enable() ->
-    lists:foreach(fun({_Topic, false}) -> ok;
-                     ({Topic, true}) ->
-        case Topic of
-            client_connected ->
-                emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []});
-            client_disconnected ->
-                emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []});
-            client_subscribed ->
-                emqx_hooks:put('session.subscribed', {?MODULE, on_client_subscribed, []});
-            client_unsubscribed ->
-                emqx_hooks:put('session.unsubscribed', {?MODULE, on_client_unsubscribed, []});
-            message_delivered ->
-                emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []});
-            message_acked ->
-                emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []});
-            message_dropped ->
-                emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []});
-            _ ->
-                ok
-        end
-    end, maps:to_list(list())).
+    enable(maps:to_list(list())).
 
 disable() ->
-    lists:foreach(fun({_Topic, false}) -> ok;
-                     ({Topic, true}) ->
-        case Topic of
-            client_connected ->
-                emqx_hooks:del('client.connected', {?MODULE, on_client_connected});
-            client_disconnected ->
-                emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected});
-            client_subscribed ->
-                emqx_hooks:del('session.subscribed', {?MODULE, on_client_subscribed});
-            client_unsubscribed ->
-                emqx_hooks:del('session.unsubscribed', {?MODULE, on_client_unsubscribed});
-            message_delivered ->
-                emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered});
-            message_acked ->
-                emqx_hooks:del('message.acked', {?MODULE, on_message_acked});
-            message_dropped ->
-                emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped});
-            _ ->
-                ok
-        end
-    end, maps:to_list(list())).
+    foreach_with(fun check_enable/2, fun emqx_hooks:del/2, maps:to_list(list())).
 
 %%--------------------------------------------------------------------
 %% Callbacks
@@ -107,10 +74,10 @@ disable() ->
 on_client_connected(ClientInfo, ConnInfo) ->
     Payload0 = common_infos(ClientInfo, ConnInfo),
     Payload = Payload0#{
-        keepalive       => maps:get(keepalive, ConnInfo, 0),
-        clean_start     => maps:get(clean_start, ConnInfo, true),
-        expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
-    },
+                        keepalive       => maps:get(keepalive, ConnInfo, 0),
+                        clean_start     => maps:get(clean_start, ConnInfo, true),
+                        expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
+                       },
     publish_event_msg(<<"$event/client_connected">>, Payload).
 
 on_client_disconnected(ClientInfo,
@@ -118,14 +85,14 @@ on_client_disconnected(ClientInfo,
 
     Payload0 = common_infos(ClientInfo, ConnInfo),
     Payload = Payload0#{
-                  reason => reason(Reason),
-                  disconnected_at => DisconnectedAt
-                 },
+                        reason => reason(Reason),
+                        disconnected_at => DisconnectedAt
+                       },
     publish_event_msg(<<"$event/client_disconnected">>, Payload).
 
 on_client_subscribed(_ClientInfo = #{clientid := ClientId,
                                      username := Username},
-                      Topic, SubOpts) ->
+                     Topic, SubOpts) ->
     Payload = #{clientid => ClientId,
                 username => Username,
                 topic => Topic,
@@ -136,7 +103,7 @@ on_client_subscribed(_ClientInfo = #{clientid := ClientId,
 
 on_client_unsubscribed(_ClientInfo = #{clientid := ClientId,
                                        username := Username},
-                      Topic, _SubOpts) ->
+                       Topic, _SubOpts) ->
     Payload = #{clientid => ClientId,
                 username => Username,
                 topic => Topic,
@@ -281,3 +248,29 @@ ignore_sys_message(#message{flags = Flags}) ->
 publish_event_msg(Topic, Payload) ->
     _ = emqx_broker:safe_publish(make_msg(Topic, emqx_json:encode(Payload))),
     ok.
+
+enable(List) ->
+    foreach_with(fun check_enable/2, fun emqx_hooks:put/2, List).
+
+check_enable(Handler, {client_connected, true}) ->
+    Handler('client.connected', {?MODULE, on_client_connected, []});
+check_enable(Handler, {client_disconnected, true}) ->
+    Handler('client.disconnected', {?MODULE, on_client_disconnected, []});
+check_enable(Handler, {client_subscribed, true}) ->
+    Handler('session.subscribed', {?MODULE, on_client_subscribed, []});
+check_enable(Handler, {client_unsubscribed, true}) ->
+    Handler('session.unsubscribed', {?MODULE, on_client_unsubscribed, []});
+check_enable(Handler, {message_delivered, true}) ->
+    Handler('message.delivered', {?MODULE, on_message_delivered, []});
+check_enable(Handler, {message_acked, true}) ->
+    Handler('message.acked', {?MODULE, on_message_acked, []});
+check_enable(Handler, {message_dropped, true}) ->
+    Handler('message.dropped', {?MODULE, on_message_dropped, []});
+check_enable(_Handler, {_Topic, _Enable}) ->
+    ok.
+
+foreach_with(Fun, With, [H | T]) ->
+    Fun(With, H),
+    foreach_with(Fun, With, T);
+foreach_with(_Fun, _With, []) ->
+    ok.

+ 1 - 0
apps/emqx_modules/src/emqx_modules_sup.erl

@@ -37,6 +37,7 @@ start_link() ->
 %% Supervisor callbacks
 %%--------------------------------------------------------------------
 init([]) ->
+    emqx_event_message:init_conf_handler(),
     {ok, {{one_for_one, 10, 3600},
           [ ?CHILD(emqx_telemetry)
           , ?CHILD(emqx_topic_metrics)