Просмотр исходного кода

fix(cluster link): trigger agent disconnection on exceptions

See https://github.com/emqx/emqx/pull/13928

Previously, even if an exception occurred (for example, `assert_current_incarnation`
failing), the agent would stay connected instead of crashing and recovering.
Thales Macedo Garitezi 1 год назад
Родитель
Commit
7425563914

+ 63 - 17
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -28,6 +28,9 @@
     on_message_publish/1
 ]).
 
+%% Internal exports
+-export([do_handle_route_op_msg/1]).
+
 -include("emqx_cluster_link.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
@@ -109,6 +112,44 @@ forward(Routes, Delivery) ->
 %%--------------------------------------------------------------------
 
 on_message_publish(
+    #message{topic = <<?ROUTE_TOPIC_PREFIX, _/binary>>} = Msg
+) ->
+    case handle_route_op_msg(Msg) of
+        ok ->
+            {stop, []};
+        error ->
+            %% Disconnect so that upstream agent starts anew
+            Headers0 = Msg#message.headers,
+            Headers = Headers0#{
+                allow_publish => false,
+                should_disconnect => true
+            },
+            StopMsg = emqx_message:set_headers(Headers, Msg),
+            {stop, StopMsg}
+    end;
+on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
+    case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of
+        #message{} = ForwardedMsg ->
+            {stop, maybe_filter_incoming_msg(ForwardedMsg, ClusterName)};
+        _Err ->
+            %% Just ignore it. It must be already logged by the decoder
+            {stop, []}
+    end;
+on_message_publish(_Msg) ->
+    ok.
+
+put_hook() ->
+    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_SYS_MSGS).
+
+delete_hook() ->
+    emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}).
+
+%%--------------------------------------------------------------------
+%% Internal exports
+%%--------------------------------------------------------------------
+
+%% Exported only for mocking in tests
+do_handle_route_op_msg(
     #message{topic = <<?ROUTE_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload} = Msg
 ) ->
     case emqx_cluster_link_mqtt:decode_route_op(Payload) of
@@ -126,24 +167,8 @@ on_message_publish(
                 payload => ParsedPayload
             })
     end,
-    {stop, []};
-on_message_publish(#message{topic = <<?MSG_TOPIC_PREFIX, ClusterName/binary>>, payload = Payload}) ->
-    case emqx_cluster_link_mqtt:decode_forwarded_msg(Payload) of
-        #message{} = ForwardedMsg ->
-            {stop, maybe_filter_incomming_msg(ForwardedMsg, ClusterName)};
-        _Err ->
-            %% Just ignore it. It must be already logged by the decoder
-            {stop, []}
-    end;
-on_message_publish(_Msg) ->
     ok.
 
-put_hook() ->
-    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_SYS_MSGS).
-
-delete_hook() ->
-    emqx_hooks:del('message.publish', {?MODULE, on_message_publish, []}).
-
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -151,6 +176,27 @@ delete_hook() ->
 -define(PD_EXTROUTER_ACTOR, '$clink_extrouter_actor').
 -define(PD_EXTROUTER_ACTOR_STATE, '$clink_extrouter_actor_state').
 
+handle_route_op_msg(
+    #message{topic = <<?ROUTE_TOPIC_PREFIX, ClusterName/binary>>} = Msg
+) ->
+    try
+        ?MODULE:do_handle_route_op_msg(Msg)
+    catch
+        K:E:Stacktrace ->
+            MyClusterName = emqx_cluster_link_config:cluster(),
+            ?SLOG(error, #{
+                msg => "cluster_link_routesync_protocol_error",
+                kind => K,
+                reason => E,
+                stacktrace => Stacktrace,
+                %% How this cluster names itself
+                local_name => MyClusterName,
+                %% How the remote cluster names itself
+                received_from => ClusterName
+            }),
+            error
+    end.
+
 maybe_push_route_op(Op, Topic, RouteID) ->
     maybe_push_route_op(Op, Topic, RouteID, push).
 
@@ -264,7 +310,7 @@ update_actor_state(ActorSt) ->
 with_sender_name(#message{extra = Extra} = Msg, ClusterName) when is_map(Extra) ->
     Msg#message{extra = Extra#{link_origin => ClusterName}}.
 
-maybe_filter_incomming_msg(#message{topic = T} = Msg, ClusterName) ->
+maybe_filter_incoming_msg(#message{topic = T} = Msg, ClusterName) ->
     %% Should prevent irrelevant messages from being dispatched in case
     %% the remote routing state lags behind the local config changes.
     #{enable := Enable, topics := Topics} = emqx_cluster_link_config:link(ClusterName),

+ 1 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -494,8 +494,7 @@ handle_connect_error(Reason, St) ->
     ensure_reconnect_timer(St#st{error = Reason, status = disconnected}).
 
 handle_client_down(Reason, St = #st{target = TargetCluster, actor = Actor}) ->
-    ?SLOG(error, #{
-        msg => "cluster_link_connection_failed",
+    ?tp(error, "cluster_link_connection_failed", #{
         reason => Reason,
         target_cluster => St#st.target,
         actor => St#st.actor

+ 64 - 6
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -12,6 +12,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-define(ON(NODE, DO), erpc:call(NODE, fun() -> DO end)).
+
 %%
 
 all() ->
@@ -83,8 +85,8 @@ mk_target_cluster(BaseName, Config) ->
         "\n     topics = [\"#\"]"
         "\n   }"
         "\n ]}",
-    TargetApps1 = [{emqx_conf, combine([conf_log(), TargetConf])}],
-    TargetApps2 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(31883), TargetConf])}],
+    TargetApps1 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(31883), TargetConf])}],
+    TargetApps2 = [{emqx_conf, combine([conf_log(), TargetConf])}],
     emqx_cth_cluster:mk_nodespecs(
         [
             {mk_nodename(BaseName, t1), #{apps => TargetApps1, base_port => 20100}},
@@ -201,22 +203,29 @@ t_target_extrouting_gc(Config) ->
     TargetC2 = start_client_unlink("t_target_extrouting_gc2", TargetNode2),
     IsShared = ?config(is_shared_sub, Config),
 
-    {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/#">>), qos1),
-    {ok, _, _} = emqtt:subscribe(TargetC2, maybe_shared_topic(IsShared, <<"t/+">>), qos1),
+    TopicFilter1 = <<"t/+">>,
+    TopicFilter2 = <<"t/#">>,
+    {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, TopicFilter1), qos1),
+    {ok, _, _} = emqtt:subscribe(TargetC2, maybe_shared_topic(IsShared, TopicFilter2), qos1),
     {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}),
     {ok, _} = emqtt:publish(SourceC1, <<"t/1">>, <<"HELLO1">>, qos1),
     {ok, _} = emqtt:publish(SourceC1, <<"t/2/ext">>, <<"HELLO2">>, qos1),
     {ok, _} = emqtt:publish(SourceC1, <<"t/3/ext">>, <<"HELLO3">>, qos1),
     Pubs1 = [M || {publish, M} <- ?drainMailbox(1_000)],
+    %% We switch off `TargetNode2' first.  Since `TargetNode1' is the sole endpoint
+    %% configured in Target Cluster, the link will keep working (i.e., CL MQTT ecpool
+    %% workers will stay connected).  If we turned off `TargetNode1' first, then all ecpool
+    %% workers would die and stay dead (since currently we don't set `auto_reconnect' for
+    %% the pool).
     {ok, _} = ?wait_async_action(
-        emqx_cth_cluster:stop_node(TargetNode1),
+        emqx_cth_cluster:stop_node(TargetNode2),
         #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>}
     ),
     {ok, _} = emqtt:publish(SourceC1, <<"t/4/ext">>, <<"HELLO4">>, qos1),
     {ok, _} = emqtt:publish(SourceC1, <<"t/5">>, <<"HELLO5">>, qos1),
     Pubs2 = [M || {publish, M} <- ?drainMailbox(1_000)],
     {ok, _} = ?wait_async_action(
-        emqx_cth_cluster:stop_node(TargetNode2),
+        emqx_cth_cluster:stop_node(TargetNode1),
         #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>}
     ),
     ok = emqtt:stop(SourceC1),
@@ -236,6 +245,9 @@ t_target_extrouting_gc(Config) ->
             #{topic := <<"t/1">>, payload := <<"HELLO1">>, client_pid := _C2},
             #{topic := <<"t/2/ext">>, payload := <<"HELLO2">>},
             #{topic := <<"t/3/ext">>, payload := <<"HELLO3">>},
+            %% We expect only `HELLO5' and not `HELLO4' to be here: both were published
+            %% while only `TargetNode1' was alive, and this node held only the `t/+'
+            %% subscription at that time, which matches `t/5' but not `t/4/ext'.
             #{topic := <<"t/5">>, payload := <<"HELLO5">>}
         ],
         lists:sort(emqx_utils_maps:key_comparer(topic), Pubs1 ++ Pubs2)
@@ -253,6 +265,52 @@ t_target_extrouting_gc(Config) ->
         Trace
     ).
 
+%% Checks that, if an exception occurs while handling a route op message, we disconnect
+%% the upstream agent client so it restarts.
+t_disconnect_on_errors('init', Config) ->
+    SourceNodes = emqx_cth_cluster:start(mk_source_cluster(?FUNCTION_NAME, Config)),
+    [TargetNodeSpec | _] = mk_target_cluster(?FUNCTION_NAME, Config),
+    TargetNodes = emqx_cth_cluster:start([TargetNodeSpec]),
+    _Apps = start_cluster_link(SourceNodes ++ TargetNodes, Config),
+    ok = snabbkaffe:start_trace(),
+    [
+        {source_nodes, SourceNodes},
+        {target_nodes, TargetNodes}
+        | Config
+    ];
+t_disconnect_on_errors('end', Config) ->
+    ok = snabbkaffe:stop(),
+    ok = emqx_cth_cluster:stop(?config(source_nodes, Config)),
+    ok = emqx_cth_cluster:stop(?config(target_nodes, Config)).
+t_disconnect_on_errors(Config) ->
+    ct:timetrap({seconds, 20}),
+    [SN1 | _] = nodes_source(Config),
+    [TargetNode] = nodes_target(Config),
+    SC1 = start_client("t_disconnect_on_errors", SN1),
+    %% The mock must be created on the same node where the expectation is installed
+    %% below: `meck:expect' fails with `not_mocked' on a node that has no mock for
+    %% the module.
+    ok = ?ON(TargetNode, meck:new(emqx_cluster_link, [passthrough, no_link, no_history])),
+    %% Make `do_handle_route_op_msg/1' raise on `TargetNode', then subscribe through the
+    %% client connected to `SN1' and wait for the `cluster_link_connection_failed' trace
+    %% point.
+    ?assertMatch(
+        {_, {ok, _}},
+        ?wait_async_action(
+            begin
+                ok = ?ON(
+                    TargetNode,
+                    meck:expect(
+                        emqx_cluster_link,
+                        do_handle_route_op_msg,
+                        fun(_Msg) ->
+                            meck:exception(error, {unexpected, error})
+                        end
+                    )
+                ),
+                emqtt:subscribe(SC1, <<"t/u/v">>, 1)
+            end,
+            #{?snk_kind := "cluster_link_connection_failed"}
+        )
+    ),
+    _ = ?ON(TargetNode, meck:unload()),
+    ok = emqtt:stop(SC1),
+    ok.
+
 %%
 
 maybe_shared_topic(true = _IsShared, Topic) ->

+ 1 - 0
changes/ee/fix-13929.en.md

@@ -0,0 +1 @@
+Fixed an issue where a cluster link could on some occasions become stuck and stop working until a manual restart on the upstream cluster was performed.