Просмотр исходного кода

fix(emqx_bridge_mqtt): fix retry_inflight

The Inflight list should not be used to update State.inflight
Zaiming Shi 4 года назад
Родитель
Коммит
d66f67d411

+ 13 - 9
apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl

@@ -337,7 +337,7 @@ connecting(#{reconnect_delay_ms := ReconnectDelayMs} = State) ->
     end.
 
 connected(state_timeout, connected, #{inflight := Inflight} = State) ->
-    case retry_inflight(State, Inflight) of
+    case retry_inflight(State#{inflight := []}, Inflight) of
         {ok, NewState} ->
             {keep_state, NewState, {next_event, internal, maybe_send}};
         {error, NewState} ->
@@ -348,10 +348,10 @@ connected(internal, maybe_send, State) ->
     {keep_state, NewState};
 
 connected(info, {disconnected, Conn, Reason},
-         #{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) ->
+          #{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) ->
+    ?tp(info, disconnected, #{name => Name, reason => Reason}),
     case Conn =:= maps:get(client_pid, Connection, undefined)  of
         true ->
-            ?LOG(info, "Bridge ~p diconnected~nreason=~p", [Name, Reason]),
             {next_state, idle, State#{connection => undefined}, {state_timeout, ReconnectDelayMs, reconnect}};
         false ->
             keep_state_and_data
@@ -434,12 +434,14 @@ do_connect(#{forwards := Forwards,
              subscriptions := Subs,
              connect_module := ConnectModule,
              connect_cfg := ConnectCfg,
+             inflight := Inflight,
              name := Name} = State) ->
     ok = subscribe_local_topics(Forwards, Name),
     case emqx_bridge_connect:start(ConnectModule, ConnectCfg#{subscriptions => Subs}) of
         {ok, Conn} ->
-            ?LOG(info, "Bridge ~p is connecting......", [Name]),
-            {ok, eval_bridge_handler(State#{connection => Conn}, connected)};
+            Res = eval_bridge_handler(State#{connection => Conn}, connected),
+            ?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
+            {ok, Res};
         {error, Reason} ->
             {error, Reason, State}
     end.
@@ -475,10 +477,12 @@ collect(Acc) ->
 
 %% Retry all inflight (previously sent but not acked) batches.
 retry_inflight(State, []) -> {ok, State};
-retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Inflight]) ->
-    case do_send(State#{inflight := Inflight}, QAckRef, Batch) of
-        {ok, State1} -> retry_inflight(State1, Inflight);
-        {error, State1} -> {error, State1}
+retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf) ->
+    case do_send(State, QAckRef, Batch) of
+        {ok, State1} ->
+            retry_inflight(State1, Rest);
+        {error, #{inflight := NewInf} = State1} ->
+            {error, State1#{inflight := NewInf ++ OldInf}}
     end.
 
 pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) ->

+ 1 - 1
apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl

@@ -34,7 +34,7 @@ stop(_) -> ok.
 
 %% @doc Callback for `emqx_bridge_connect' behaviour
 -spec send(_, batch()) -> {ok, ack_ref()} | {error, any()}.
-send(#{stub_pid := Pid}, Batch) ->
+send(#{client_pid := Pid}, Batch) ->
     Ref = make_ref(),
     Pid ! {stub_message, self(), Ref, Batch},
     {ok, Ref}.

+ 50 - 3
apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl

@@ -191,7 +191,7 @@ t_stub_normal(Config) when is_list(Config) ->
             connect_module => emqx_bridge_stub_conn,
             forward_mountpoint => <<"forwarded">>,
             start_type => auto,
-            stub_pid => self()
+            client_pid => self()
            },
     {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
     ClientId = <<"ClientId">>,
@@ -218,7 +218,7 @@ t_stub_overflow(Config) when is_list(Config) ->
             connect_module => emqx_bridge_stub_conn,
             forward_mountpoint => <<"forwarded">>,
             start_type => auto,
-            stub_pid => self(),
+            client_pid => self(),
             max_inflight => MaxInflight
            },
     {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
@@ -250,7 +250,7 @@ t_stub_random_order(Config) when is_list(Config) ->
             connect_module => emqx_bridge_stub_conn,
             forward_mountpoint => <<"forwarded">>,
             start_type => auto,
-            stub_pid => self(),
+            client_pid => self(),
             max_inflight => MaxInflight
            },
     {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
@@ -273,6 +273,53 @@ t_stub_random_order(Config) when is_list(Config) ->
         ok = emqx_bridge_worker:stop(Worker)
     end.
 
+t_stub_retry_inflight(Config) when is_list(Config) ->
+    Topic = <<"to_stub_retry_inflight/a">>,
+    MaxInflight = 10,
+    Cfg = #{forwards => [Topic],
+            connect_module => emqx_bridge_stub_conn,
+            forward_mountpoint => <<"forwarded">>,
+            reconnect_delay_ms => 10,
+            start_type => auto,
+            client_pid => self(),
+            max_inflight => MaxInflight
+           },
+    {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
+    ClientId = <<"ClientId2">>,
+    try
+        case ?block_until(#{?snk_kind := connected, inflight := 0}, 2000, 1000) of
+            {ok, #{inflight := 0}} -> ok;
+            Other -> ct:fail("~p", [Other])
+        end,
+        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
+        {ok, _} = emqtt:connect(ConnPid),
+        lists:foreach(
+          fun(I) ->
+                  Data = integer_to_binary(I),
+                  _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
+          end, lists:seq(1, MaxInflight)),
+        %% receive acks but do not ack
+        Acks1 = stub_receive(MaxInflight),
+        ?assertEqual(MaxInflight, length(Acks1)),
+        %% simulate a disconnect
+        Worker ! {disconnected, self(), test},
+        ?SNK_WAIT(disconnected),
+        case ?block_until(#{?snk_kind := connected, inflight := MaxInflight}, 2000, 20) of
+            {ok, _} -> ok;
+            Error -> ct:fail("~p", [Error])
+        end,
+        %% expect worker to retry inflight, so to receive acks again
+        Acks2 = stub_receive(MaxInflight),
+        ?assertEqual(MaxInflight, length(Acks2)),
+        lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
+                      lists:reverse(Acks2)),
+        ?SNK_WAIT(inflight_drained),
+        ?SNK_WAIT(replayq_drained),
+        emqtt:disconnect(ConnPid)
+    after
+        ok = emqx_bridge_worker:stop(Worker)
+    end.
+
 stub_receive(N) ->
     stub_receive(N, []).