Explorar o código

fix(cluster link): activate auto reconnect for forwarding resource

Thales Macedo Garitezi hai 1 ano
pai
achega
9bcd207fef

+ 3 - 0
apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl

@@ -93,6 +93,8 @@
 
 
 -define(PUB_TIMEOUT, 10_000).
 -define(PUB_TIMEOUT, 10_000).
 
 
+-define(AUTO_RECONNECT_INTERVAL_S, 2).
+
 -type cluster_name() :: binary().
 -type cluster_name() :: binary().
 
 
 -spec resource_id(cluster_name()) -> resource_id().
 -spec resource_id(cluster_name()) -> resource_id().
@@ -173,6 +175,7 @@ on_start(ResourceId, #{pool_size := PoolSize} = ClusterConf) ->
         {name, PoolName},
         {name, PoolName},
         {pool_size, PoolSize},
         {pool_size, PoolSize},
         {pool_type, hash},
         {pool_type, hash},
+        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL_S},
         {client_opts, emqtt_client_opts(?MSG_CLIENTID_SUFFIX, ClusterConf)}
         {client_opts, emqtt_client_opts(?MSG_CLIENTID_SUFFIX, ClusterConf)}
     ],
     ],
     ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),
     ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName),

+ 2 - 3
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -214,9 +214,8 @@ t_target_extrouting_gc(Config) ->
     Pubs1 = [M || {publish, M} <- ?drainMailbox(1_000)],
     Pubs1 = [M || {publish, M} <- ?drainMailbox(1_000)],
     %% We switch off `TargetNode2' first.  Since `TargetNode1' is the sole endpoint
     %% We switch off `TargetNode2' first.  Since `TargetNode1' is the sole endpoint
     %% configured in Target Cluster, the link will keep working (i.e., CL MQTT ecpool
     %% configured in Target Cluster, the link will keep working (i.e., CL MQTT ecpool
-    %% workers will stay connected).  If we turned `TargetNode1' first, then all ecpool
-    %% workers would die and stay dead (since currently we don't set `auto_reconnect' for
-    %% the pool).
+    %% workers will stay connected).  If we turned `TargetNode1' first, then the link
+    %% would stay down and stop replicating messages.
     {ok, _} = ?wait_async_action(
     {ok, _} = ?wait_async_action(
         emqx_cth_cluster:stop_node(TargetNode2),
         emqx_cth_cluster:stop_node(TargetNode2),
         #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>}
         #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>}