Просмотр исходного кода

Merge pull request #9870 from keynslug/fix/mqtt-connection-loss-feedback

feat(mqtt-bridge): avoid middleman process
Zaiming (Stone) Shi 3 лет назад
Родитель
Сommit
b3ad9e97d2

+ 0 - 1
apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl

@@ -72,7 +72,6 @@ up(#{<<"connector">> := Connector} = Config) ->
         Cn(proto_ver, <<"v4">>),
         Cn(server, undefined),
         Cn(retry_interval, <<"15s">>),
-        Cn(reconnect_interval, <<"15s">>),
         Cn(ssl, default_ssl()),
         {enable, Enable},
         {resource_opts, default_resource_opts()},

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -640,7 +640,7 @@ t_bridges_probe(Config) ->
     ?assertMatch(
         #{
             <<"code">> := <<"TEST_FAILED">>,
-            <<"message">> := <<"#{reason => econnrefused", _/binary>>
+            <<"message">> := <<"econnrefused">>
         },
         jsx:decode(ConnRefused)
     ),

+ 0 - 2
apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl

@@ -224,7 +224,6 @@ bridges {
         mode = \"cluster_shareload\"
         password = \"\"
         proto_ver = \"v5\"
-        reconnect_interval = \"15s\"
         replayq {offload = false, seg_bytes = \"100MB\"}
         retry_interval = \"12s\"
         server = \"localhost:1883\"
@@ -257,7 +256,6 @@ bridges {
         mode = \"cluster_shareload\"
         password = \"\"
         proto_ver = \"v4\"
-        reconnect_interval = \"15s\"
         replayq {offload = false, seg_bytes = \"100MB\"}
         retry_interval = \"44s\"
         server = \"localhost:1883\"

+ 4 - 4
apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl

@@ -825,15 +825,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
             <<"type">> => ?TYPE_MQTT,
             <<"name">> => ?BRIDGE_NAME_EGRESS,
             <<"egress">> => ?EGRESS_CONF,
-            %% to make it reconnect quickly
-            <<"reconnect_interval">> => <<"1s">>,
             <<"resource_opts">> => #{
                 <<"worker_pool_size">> => 2,
                 <<"query_mode">> => <<"sync">>,
                 %% using a long time so we can test recovery
                 <<"request_timeout">> => <<"15s">>,
                 %% to make it check the healthy quickly
-                <<"health_check_interval">> => <<"0.5s">>
+                <<"health_check_interval">> => <<"0.5s">>,
+                %% to make it reconnect quickly
+                <<"auto_restart_interval">> => <<"1s">>
             }
         }
     ),
@@ -911,7 +911,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
     Decoded1 = jsx:decode(BridgeStr1),
     DecodedMetrics1 = jsx:decode(BridgeMetricsStr1),
     ?assertMatch(
-        Status when (Status == <<"connected">> orelse Status == <<"connecting">>),
+        Status when (Status == <<"connecting">> orelse Status == <<"disconnected">>),
         maps:get(<<"status">>, Decoded1)
     ),
     %% matched >= 3 because of possible retries.

+ 41 - 40
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -105,16 +105,15 @@ init([]) ->
     {ok, {SupFlag, []}}.
 
 bridge_spec(Config) ->
+    {Name, NConfig} = maps:take(name, Config),
     #{
-        id => maps:get(name, Config),
-        start => {emqx_connector_mqtt_worker, start_link, [Config]},
-        restart => permanent,
-        shutdown => 5000,
-        type => worker,
-        modules => [emqx_connector_mqtt_worker]
+        id => Name,
+        start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]},
+        restart => temporary,
+        shutdown => 5000
     }.
 
--spec bridges() -> [{node(), map()}].
+-spec bridges() -> [{_Name, _Status}].
 bridges() ->
     [
         {Name, emqx_connector_mqtt_worker:status(Name)}
@@ -144,8 +143,7 @@ on_message_received(Msg, HookPoint, ResId) ->
 %% ===================================================================
 callback_mode() -> async_if_possible.
 
-on_start(InstId, Conf) ->
-    InstanceId = binary_to_atom(InstId, utf8),
+on_start(InstanceId, Conf) ->
     ?SLOG(info, #{
         msg => "starting_mqtt_connector",
         connector => InstanceId,
@@ -154,8 +152,8 @@ on_start(InstId, Conf) ->
     BasicConf = basic_config(Conf),
     BridgeConf = BasicConf#{
         name => InstanceId,
-        clientid => clientid(InstId, Conf),
-        subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId),
+        clientid => clientid(InstanceId, Conf),
+        subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstanceId),
         forwards => make_forward_confs(maps:get(egress, Conf, undefined))
     },
     case ?MODULE:create_bridge(BridgeConf) of
@@ -189,44 +187,49 @@ on_stop(_InstId, #{name := InstanceId}) ->
 
 on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
     ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
-    emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg).
+    case emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg) of
+        ok ->
+            ok;
+        {error, Reason} ->
+            classify_error(Reason)
+    end.
 
-on_query_async(
-    _InstId,
-    {send_message, Msg},
-    {ReplyFun, Args},
-    #{name := InstanceId}
-) ->
+on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
     ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
-    %% this is a cast, currently.
-    ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}),
-    WorkerPid = get_worker_pid(InstanceId),
-    {ok, WorkerPid}.
+    case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
+        ok ->
+            % TODO this is racy
+            {ok, emqx_connector_mqtt_worker:pid(InstanceId)};
+        {error, Reason} ->
+            classify_error(Reason)
+    end.
 
 on_get_status(_InstId, #{name := InstanceId}) ->
-    case emqx_connector_mqtt_worker:status(InstanceId) of
-        connected -> connected;
-        _ -> connecting
-    end.
+    emqx_connector_mqtt_worker:status(InstanceId).
+
+classify_error(disconnected = Reason) ->
+    {error, {recoverable_error, Reason}};
+classify_error({disconnected, _RC, _} = Reason) ->
+    {error, {recoverable_error, Reason}};
+classify_error({shutdown, _} = Reason) ->
+    {error, {recoverable_error, Reason}};
+classify_error(Reason) ->
+    {error, {unrecoverable_error, Reason}}.
 
 ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
-    case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
-        ok -> {ok, #{name => InstanceId, bridge_conf => BridgeConf}};
-        {error, Reason} -> {error, Reason}
+    case emqx_connector_mqtt_worker:connect(InstanceId) of
+        {ok, Properties} ->
+            {ok, #{name => InstanceId, config => BridgeConf, props => Properties}};
+        {error, Reason} ->
+            {error, Reason}
     end.
 
-%% mqtt workers, when created and called via bridge callbacks, are
-%% registered.
--spec get_worker_pid(atom()) -> pid().
-get_worker_pid(InstanceId) ->
-    whereis(InstanceId).
-
 make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
     undefined;
 make_sub_confs(undefined, _Conf, _) ->
     undefined;
-make_sub_confs(SubRemoteConf, Conf, InstId) ->
-    ResId = emqx_resource_manager:manager_id_to_resource_id(InstId),
+make_sub_confs(SubRemoteConf, Conf, InstanceId) ->
+    ResId = emqx_resource_manager:manager_id_to_resource_id(InstanceId),
     case maps:find(hookpoint, Conf) of
         error ->
             error({no_hookpoint_provided, Conf});
@@ -260,7 +263,6 @@ basic_config(
         %% 30s
         connect_timeout => 30,
         auto_reconnect => true,
-        reconnect_interval => ?AUTO_RECONNECT_INTERVAL,
         proto_ver => ProtoVer,
         %% Opening bridge_mode will form a non-standard mqtt connection message.
         %% A load balancing server (such as haproxy) is often set up before the emqx broker server.
@@ -273,8 +275,7 @@ basic_config(
         retry_interval => RetryIntv,
         max_inflight => MaxInflight,
         ssl => EnableSsl,
-        ssl_opts => maps:to_list(maps:remove(enable, Ssl)),
-        if_record_metrics => true
+        ssl_opts => maps:to_list(maps:remove(enable, Ssl))
     },
     maybe_put_fields([username, password], Conf, BasicConf).
 

+ 0 - 236
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl

@@ -1,236 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
-
--module(emqx_connector_mqtt_mod).
-
--export([
-    start/1,
-    send/2,
-    send_async/3,
-    stop/1,
-    ping/1
-]).
-
--export([
-    ensure_subscribed/3,
-    ensure_unsubscribed/2
-]).
-
-%% callbacks for emqtt
--export([
-    handle_publish/3,
-    handle_disconnected/2
-]).
-
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
-
--define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
-
-%% Messages towards ack collector process
--define(REF_IDS(Ref, Ids), {Ref, Ids}).
-
-%%--------------------------------------------------------------------
-%% emqx_bridge_connect callbacks
-%%--------------------------------------------------------------------
-
-start(Config) ->
-    Parent = self(),
-    ServerStr = iolist_to_binary(maps:get(server, Config)),
-    {Server, Port} = emqx_connector_mqtt_schema:parse_server(ServerStr),
-    Mountpoint = maps:get(receive_mountpoint, Config, undefined),
-    Subscriptions = maps:get(subscriptions, Config, undefined),
-    Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
-    Handlers = make_hdlr(Parent, Vars, #{server => ServerStr}),
-    Config1 = Config#{
-        msg_handler => Handlers,
-        host => Server,
-        port => Port,
-        force_ping => true,
-        proto_ver => maps:get(proto_ver, Config, v4)
-    },
-    case emqtt:start_link(process_config(Config1)) of
-        {ok, Pid} ->
-            case emqtt:connect(Pid) of
-                {ok, _} ->
-                    try
-                        ok = sub_remote_topics(Pid, Subscriptions),
-                        {ok, #{client_pid => Pid, subscriptions => Subscriptions}}
-                    catch
-                        throw:Reason ->
-                            ok = stop(#{client_pid => Pid}),
-                            {error, error_reason(Reason, ServerStr)}
-                    end;
-                {error, Reason} ->
-                    ok = stop(#{client_pid => Pid}),
-                    {error, error_reason(Reason, ServerStr)}
-            end;
-        {error, Reason} ->
-            {error, error_reason(Reason, ServerStr)}
-    end.
-
-error_reason(Reason, ServerStr) ->
-    #{reason => Reason, server => ServerStr}.
-
-stop(#{client_pid := Pid}) ->
-    safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
-    ok.
-
-ping(undefined) ->
-    pang;
-ping(#{client_pid := Pid}) ->
-    emqtt:ping(Pid).
-
-ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when
-    is_pid(Pid)
-->
-    case emqtt:subscribe(Pid, Topic, QoS) of
-        {ok, _, _} -> Conn#{subscriptions => [{Topic, QoS} | Subs]};
-        Error -> {error, Error}
-    end;
-ensure_subscribed(_Conn, _Topic, _QoS) ->
-    %% return ok for now
-    %% next re-connect should should call start with new topic added to config
-    ok.
-
-ensure_unsubscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic) when is_pid(Pid) ->
-    case emqtt:unsubscribe(Pid, Topic) of
-        {ok, _, _} -> Conn#{subscriptions => lists:keydelete(Topic, 1, Subs)};
-        Error -> {error, Error}
-    end;
-ensure_unsubscribed(Conn, _) ->
-    %% return ok for now
-    %% next re-connect should should call start with this topic deleted from config
-    Conn.
-
-safe_stop(Pid, StopF, Timeout) ->
-    MRef = monitor(process, Pid),
-    unlink(Pid),
-    try
-        StopF()
-    catch
-        _:_ ->
-            ok
-    end,
-    receive
-        {'DOWN', MRef, _, _, _} ->
-            ok
-    after Timeout ->
-        exit(Pid, kill)
-    end.
-
-send(#{client_pid := ClientPid}, Msg) ->
-    emqtt:publish(ClientPid, Msg).
-
-send_async(#{client_pid := ClientPid}, Msg, Callback) ->
-    emqtt:publish_async(ClientPid, Msg, infinity, Callback).
-
-handle_publish(Msg, undefined, _Opts) ->
-    ?SLOG(error, #{
-        msg =>
-            "cannot_publish_to_local_broker_as"
-            "_'ingress'_is_not_configured",
-        message => Msg
-    });
-handle_publish(#{properties := Props} = Msg0, Vars, Opts) ->
-    Msg = format_msg_received(Msg0, Opts),
-    ?SLOG(debug, #{
-        msg => "publish_to_local_broker",
-        message => Msg,
-        vars => Vars
-    }),
-    case Vars of
-        #{on_message_received := {Mod, Func, Args}} ->
-            _ = erlang:apply(Mod, Func, [Msg | Args]);
-        _ ->
-            ok
-    end,
-    maybe_publish_to_local_broker(Msg, Vars, Props).
-
-handle_disconnected(Reason, Parent) ->
-    Parent ! {disconnected, self(), Reason}.
-
-make_hdlr(Parent, Vars, Opts) ->
-    #{
-        publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
-        disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
-    }.
-
-sub_remote_topics(_ClientPid, undefined) ->
-    ok;
-sub_remote_topics(ClientPid, #{remote := #{topic := FromTopic, qos := QoS}}) ->
-    case emqtt:subscribe(ClientPid, FromTopic, QoS) of
-        {ok, _, _} -> ok;
-        Error -> throw(Error)
-    end.
-
-process_config(Config) ->
-    maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
-
-maybe_publish_to_local_broker(Msg, Vars, Props) ->
-    case emqx_map_lib:deep_get([local, topic], Vars, undefined) of
-        %% local topic is not set, discard it
-        undefined -> ok;
-        _ -> emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
-    end.
-
-format_msg_received(
-    #{
-        dup := Dup,
-        payload := Payload,
-        properties := Props,
-        qos := QoS,
-        retain := Retain,
-        topic := Topic
-    },
-    #{server := Server}
-) ->
-    #{
-        id => emqx_guid:to_hexstr(emqx_guid:gen()),
-        server => Server,
-        payload => Payload,
-        topic => Topic,
-        qos => QoS,
-        dup => Dup,
-        retain => Retain,
-        pub_props => printable_maps(Props),
-        message_received_at => erlang:system_time(millisecond)
-    }.
-
-printable_maps(undefined) ->
-    #{};
-printable_maps(Headers) ->
-    maps:fold(
-        fun
-            ('User-Property', V0, AccIn) when is_list(V0) ->
-                AccIn#{
-                    'User-Property' => maps:from_list(V0),
-                    'User-Property-Pairs' => [
-                        #{
-                            key => Key,
-                            value => Value
-                        }
-                     || {Key, Value} <- V0
-                    ]
-                };
-            (K, V0, AccIn) ->
-                AccIn#{K => V0}
-        end,
-        #{},
-        Headers
-    ).

+ 0 - 6
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl

@@ -72,12 +72,6 @@ fields("server_configs") ->
             )},
         {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)},
         {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})},
-        {reconnect_interval,
-            mk_duration(
-                "Reconnect interval. Delay for the MQTT bridge to retry establishing the connection "
-                "in case of transportation failure.",
-                #{default => "15s"}
-            )},
         {proto_ver,
             mk(
                 hoconsc:enum([v3, v4, v5]),

+ 280 - 326
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl

@@ -60,172 +60,252 @@
 %% * Local messages are all normalised to QoS-1 when exporting to remote
 
 -module(emqx_connector_mqtt_worker).
--behaviour(gen_statem).
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/logger.hrl").
 
 %% APIs
 -export([
-    start_link/1,
-    stop/1
-]).
-
-%% gen_statem callbacks
--export([
-    terminate/3,
-    code_change/4,
-    init/1,
-    callback_mode/0
-]).
-
-%% state functions
--export([
-    idle/3,
-    connected/3
+    start_link/2,
+    stop/1,
+    pid/1
 ]).
 
 %% management APIs
 -export([
-    ensure_started/1,
-    ensure_stopped/1,
+    connect/1,
     status/1,
     ping/1,
     send_to_remote/2,
     send_to_remote_async/3
 ]).
 
--export([get_forwards/1]).
-
--export([get_subscriptions/1]).
+-export([handle_publish/3]).
+-export([handle_disconnect/1]).
 
 -export_type([
     config/0,
     ack_ref/0
 ]).
 
--type id() :: atom() | string() | pid().
--type qos() :: emqx_types:qos().
+-type name() :: term().
+% -type qos() :: emqx_types:qos().
 -type config() :: map().
 -type ack_ref() :: term().
--type topic() :: emqx_types:topic().
+% -type topic() :: emqx_types:topic().
 
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
-%% same as default in-flight limit for emqtt
--define(DEFAULT_INFLIGHT_SIZE, 32).
--define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
+-define(REF(Name), {via, gproc, ?NAME(Name)}).
+-define(NAME(Name), {n, l, Name}).
 
 %% @doc Start a bridge worker. Supported configs:
-%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay
-%%      at 'idle' state until a manual call to start it.
-%% connect_module: The module which implements emqx_bridge_connect behaviour
-%%      and work as message batch transport layer
-%% reconnect_interval: Delay in milli-seconds for the bridge worker to retry
-%%      in case of transportation failure.
-%% max_inflight: Max number of batches allowed to send-ahead before receiving
-%%       confirmation from remote node/cluster
 %% mountpoint: The topic mount point for messages sent to remote node/cluster
 %%      `undefined', `<<>>' or `""' to disable
 %% forwards: Local topics to subscribe.
 %%
 %% Find more connection specific configs in the callback modules
 %% of emqx_bridge_connect behaviour.
-start_link(Opts) when is_list(Opts) ->
-    start_link(maps:from_list(Opts));
-start_link(Opts) ->
-    case maps:get(name, Opts, undefined) of
-        undefined ->
-            gen_statem:start_link(?MODULE, Opts, []);
-        Name ->
-            Name1 = name(Name),
-            gen_statem:start_link({local, Name1}, ?MODULE, Opts#{name => Name1}, [])
+-spec start_link(name(), map()) ->
+    {ok, pid()} | {error, _Reason}.
+start_link(Name, BridgeOpts) ->
+    ?SLOG(debug, #{
+        msg => "client_starting",
+        name => Name,
+        options => BridgeOpts
+    }),
+    Conf = init_config(BridgeOpts),
+    Options = mk_client_options(Conf, BridgeOpts),
+    case emqtt:start_link(Options) of
+        {ok, Pid} ->
+            true = gproc:reg_other(?NAME(Name), Pid, Conf),
+            {ok, Pid};
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "client_start_failed",
+                config => emqx_misc:redact(BridgeOpts),
+                reason => Reason
+            }),
+            Error
     end.
 
-ensure_started(Name) ->
-    gen_statem:call(name(Name), ensure_started).
+init_config(Opts) ->
+    Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
+    Subscriptions = maps:get(subscriptions, Opts, undefined),
+    Forwards = maps:get(forwards, Opts, undefined),
+    #{
+        mountpoint => format_mountpoint(Mountpoint),
+        subscriptions => pre_process_subscriptions(Subscriptions),
+        forwards => pre_process_forwards(Forwards)
+    }.
+
+mk_client_options(Conf, BridgeOpts) ->
+    Server = iolist_to_binary(maps:get(server, BridgeOpts)),
+    HostPort = emqx_connector_mqtt_schema:parse_server(Server),
+    Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined),
+    Subscriptions = maps:get(subscriptions, Conf),
+    Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
+    Opts = maps:without(
+        [
+            address,
+            auto_reconnect,
+            conn_type,
+            mountpoint,
+            forwards,
+            receive_mountpoint,
+            subscriptions
+        ],
+        BridgeOpts
+    ),
+    Opts#{
+        msg_handler => mk_client_event_handler(Vars, #{server => Server}),
+        hosts => [HostPort],
+        force_ping => true,
+        proto_ver => maps:get(proto_ver, BridgeOpts, v4)
+    }.
+
+mk_client_event_handler(Vars, Opts) when Vars /= undefined ->
+    #{
+        publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
+        disconnected => {fun ?MODULE:handle_disconnect/1, []}
+    };
+mk_client_event_handler(undefined, _Opts) ->
+    undefined.
 
-%% @doc Manually stop bridge worker. State idempotency ensured.
-ensure_stopped(Name) ->
-    gen_statem:call(name(Name), ensure_stopped, 5000).
+connect(Name) ->
+    #{subscriptions := Subscriptions} = get_config(Name),
+    case emqtt:connect(pid(Name)) of
+        {ok, Properties} ->
+            case subscribe_remote_topics(Name, Subscriptions) of
+                ok ->
+                    {ok, Properties};
+                {ok, _, _RCs} ->
+                    {ok, Properties};
+                {error, Reason} = Error ->
+                    ?SLOG(error, #{
+                        msg => "client_subscribe_failed",
+                        subscriptions => Subscriptions,
+                        reason => Reason
+                    }),
+                    Error
+            end;
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "client_connect_failed",
+                reason => Reason
+            }),
+            Error
+    end.
 
-stop(Pid) -> gen_statem:stop(Pid).
+subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) ->
+    emqtt:subscribe(ref(Ref), FromTopic, QoS);
+subscribe_remote_topics(_Ref, undefined) ->
+    ok.
 
-status(Pid) when is_pid(Pid) ->
-    gen_statem:call(Pid, status);
-status(Name) ->
-    gen_statem:call(name(Name), status).
+stop(Ref) ->
+    emqtt:stop(ref(Ref)).
+
+pid(Name) ->
+    gproc:lookup_pid(?NAME(Name)).
+
+status(Ref) ->
+    trycall(
+        fun() ->
+            Info = emqtt:info(ref(Ref)),
+            case proplists:get_value(socket, Info) of
+                Socket when Socket /= undefined ->
+                    connected;
+                undefined ->
+                    connecting
+            end
+        end,
+        #{noproc => disconnected}
+    ).
 
-ping(Pid) when is_pid(Pid) ->
-    gen_statem:call(Pid, ping);
-ping(Name) ->
-    gen_statem:call(name(Name), ping).
+ping(Ref) ->
+    emqtt:ping(ref(Ref)).
 
-send_to_remote(Pid, Msg) when is_pid(Pid) ->
-    gen_statem:call(Pid, {send_to_remote, Msg});
-send_to_remote(Name, Msg) ->
-    gen_statem:call(name(Name), {send_to_remote, Msg}).
+send_to_remote(Name, MsgIn) ->
+    trycall(
+        fun() -> do_send(Name, export_msg(Name, MsgIn)) end,
+        #{
+            badarg => {error, disconnected},
+            noproc => {error, disconnected}
+        }
+    ).
 
-send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) ->
-    gen_statem:cast(Pid, {send_to_remote_async, Msg, Callback});
-send_to_remote_async(Name, Msg, Callback) ->
-    gen_statem:cast(name(Name), {send_to_remote_async, Msg, Callback}).
+do_send(Name, {true, Msg}) ->
+    case emqtt:publish(pid(Name), Msg) of
+        ok ->
+            ok;
+        {ok, #{reason_code := RC}} when
+            RC =:= ?RC_SUCCESS;
+            RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
+        ->
+            ok;
+        {ok, #{reason_code := RC, reason_code_name := Reason}} ->
+            ?SLOG(warning, #{
+                msg => "remote_publish_failed",
+                message => Msg,
+                reason_code => RC,
+                reason_code_name => Reason
+            }),
+            {error, Reason};
+        {error, Reason} ->
+            ?SLOG(info, #{
+                msg => "client_failed",
+                reason => Reason
+            }),
+            {error, Reason}
+    end;
+do_send(_Name, false) ->
+    ok.
 
-%% @doc Return all forwards (local subscriptions).
--spec get_forwards(id()) -> [topic()].
-get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)).
+send_to_remote_async(Name, MsgIn, Callback) ->
+    trycall(
+        fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end,
+        #{badarg => {error, disconnected}}
+    ).
 
-%% @doc Return all subscriptions (subscription over mqtt connection to remote broker).
--spec get_subscriptions(id()) -> [{emqx_types:topic(), qos()}].
-get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions).
+do_send_async(Name, {true, Msg}, Callback) ->
+    emqtt:publish_async(pid(Name), Msg, _Timeout = infinity, Callback);
+do_send_async(_Name, false, _Callback) ->
+    ok.
 
-callback_mode() -> [state_functions].
+ref(Pid) when is_pid(Pid) ->
+    Pid;
+ref(Term) ->
+    ?REF(Term).
 
-%% @doc Config should be a map().
-init(#{name := Name} = ConnectOpts) ->
-    ?SLOG(debug, #{
-        msg => "starting_bridge_worker",
-        name => Name
-    }),
-    erlang:process_flag(trap_exit, true),
-    State = init_state(ConnectOpts),
-    self() ! idle,
-    {ok, idle, State#{
-        connect_opts => pre_process_opts(ConnectOpts)
-    }}.
-
-init_state(Opts) ->
-    ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
-    StartType = maps:get(start_type, Opts, manual),
-    Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
-    MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_INFLIGHT_SIZE),
-    Name = maps:get(name, Opts, undefined),
-    #{
-        start_type => StartType,
-        reconnect_interval => ReconnDelayMs,
-        mountpoint => format_mountpoint(Mountpoint),
-        max_inflight => MaxInflightSize,
-        connection => undefined,
-        name => Name
-    }.
+trycall(Fun, Else) ->
+    try
+        Fun()
+    catch
+        error:badarg ->
+            maps:get(badarg, Else);
+        exit:{noproc, _} ->
+            maps:get(noproc, Else)
+    end.
 
-pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
-    ConnectOpts#{
-        subscriptions => pre_process_in_out(in, InConf),
-        forwards => pre_process_in_out(out, OutConf)
-    }.
+format_mountpoint(undefined) ->
+    undefined;
+format_mountpoint(Prefix) ->
+    binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
 
-pre_process_in_out(_, undefined) ->
+pre_process_subscriptions(undefined) ->
     undefined;
-pre_process_in_out(in, #{local := LC} = Conf) when is_map(Conf) ->
+pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) ->
     Conf#{local => pre_process_in_out_common(LC)};
-pre_process_in_out(in, Conf) when is_map(Conf) ->
+pre_process_subscriptions(Conf) when is_map(Conf) ->
     %% have no 'local' field in the config
+    undefined.
+
+pre_process_forwards(undefined) ->
     undefined;
-pre_process_in_out(out, #{remote := RC} = Conf) when is_map(Conf) ->
+pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) ->
     Conf#{remote => pre_process_in_out_common(RC)};
-pre_process_in_out(out, Conf) when is_map(Conf) ->
+pre_process_forwards(Conf) when is_map(Conf) ->
     %% have no 'remote' field in the config
     undefined.
 
@@ -245,238 +325,112 @@ pre_process_conf(Key, Conf) ->
             Conf#{Key => Val}
     end.
 
-code_change(_Vsn, State, Data, _Extra) ->
-    {ok, State, Data}.
+get_config(Name) ->
+    gproc:lookup_value(?NAME(Name)).
 
-terminate(_Reason, _StateName, State) ->
-    _ = disconnect(State),
-    maybe_destroy_session(State).
-
-maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) ->
-    try
-        %% Destroy session if clean_start is not set.
-        %% Ignore any crashes, just refresh the clean_start = true.
-        _ = do_connect(State#{connect_opts => ConnectOpts#{clean_start => true}}),
-        _ = disconnect(State),
-        ok
-    catch
-        _:_ ->
-            ok
-    end;
-maybe_destroy_session(_State) ->
-    ok.
-
-%% ensure_started will be deprecated in the future
-idle({call, From}, ensure_started, State) ->
-    case do_connect(State) of
-        {ok, State1} ->
-            {next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]};
-        {error, Reason, _State} ->
-            {keep_state_and_data, {reply, From, {error, Reason}}}
-    end;
-idle({call, From}, {send_to_remote, _}, _State) ->
-    {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}};
-%% @doc Standing by for manual start.
-idle(info, idle, #{start_type := manual}) ->
-    keep_state_and_data;
-%% @doc Standing by for auto start.
-idle(info, idle, #{start_type := auto} = State) ->
-    connecting(State);
-idle(state_timeout, reconnect, State) ->
-    connecting(State);
-idle(Type, Content, State) ->
-    common(idle, Type, Content, State).
-
-connecting(#{reconnect_interval := ReconnectDelayMs} = State) ->
-    case do_connect(State) of
-        {ok, State1} ->
-            {next_state, connected, State1, {state_timeout, 0, connected}};
-        _ ->
-            {keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}}
-    end.
-
-connected(state_timeout, connected, State) ->
-    %% nothing to do
-    {keep_state, State};
-connected({call, From}, {send_to_remote, Msg}, State) ->
-    case do_send(State, Msg) of
-        {ok, NState} ->
-            {keep_state, NState, {reply, From, ok}};
-        {error, Reason} ->
-            {keep_state_and_data, {reply, From, {error, Reason}}}
-    end;
-connected(cast, {send_to_remote_async, Msg, Callback}, State) ->
-    _ = do_send_async(State, Msg, Callback),
-    {keep_state, State};
-connected(
-    info,
-    {disconnected, Conn, Reason},
-    #{connection := Connection, name := Name, reconnect_interval := ReconnectDelayMs} = State
-) ->
-    ?tp(info, disconnected, #{name => Name, reason => Reason}),
-    case Conn =:= maps:get(client_pid, Connection, undefined) of
-        true ->
-            {next_state, idle, State#{connection => undefined},
-                {state_timeout, ReconnectDelayMs, reconnect}};
-        false ->
-            keep_state_and_data
-    end;
-connected(Type, Content, State) ->
-    common(connected, Type, Content, State).
-
-%% Common handlers
-common(StateName, {call, From}, status, _State) ->
-    {keep_state_and_data, {reply, From, StateName}};
-common(_StateName, {call, From}, ping, #{connection := Conn} = _State) ->
-    Reply = emqx_connector_mqtt_mod:ping(Conn),
-    {keep_state_and_data, {reply, From, Reply}};
-common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) ->
-    {keep_state_and_data, {reply, From, ok}};
-common(_StateName, {call, From}, ensure_stopped, #{connection := Conn} = State) ->
-    Reply = emqx_connector_mqtt_mod:stop(Conn),
-    {next_state, idle, State#{connection => undefined}, {reply, From, Reply}};
-common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) ->
-    {keep_state_and_data, {reply, From, Forwards}};
-common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
-    {keep_state_and_data, {reply, From, maps:get(subscriptions, Connection, #{})}};
-common(_StateName, {call, From}, Req, _State) ->
-    {keep_state_and_data, {reply, From, {error, {unsupported_request, Req}}}};
-common(_StateName, info, {'EXIT', _, _}, State) ->
-    {keep_state, State};
-common(StateName, Type, Content, #{name := Name} = State) ->
-    ?SLOG(error, #{
-        msg => "bridge_discarded_event",
-        name => Name,
-        type => Type,
-        state_name => StateName,
-        content => Content
-    }),
-    {keep_state, State}.
-
-do_connect(
-    #{
-        connect_opts := ConnectOpts,
-        name := Name
-    } = State
-) ->
-    case emqx_connector_mqtt_mod:start(ConnectOpts) of
-        {ok, Conn} ->
-            ?tp(info, connected, #{name => Name}),
-            {ok, State#{connection => Conn}};
-        {error, Reason} ->
-            ConnectOpts1 = obfuscate(ConnectOpts),
+export_msg(Name, Msg) ->
+    case get_config(Name) of
+        #{forwards := Forwards = #{}, mountpoint := Mountpoint} ->
+            {true, export_msg(Mountpoint, Forwards, Msg)};
+        #{forwards := undefined} ->
             ?SLOG(error, #{
-                msg => "failed_to_connect",
-                config => ConnectOpts1,
-                reason => Reason
+                msg => "forwarding_unavailable",
+                message => Msg,
+                reason => "egress is not configured"
             }),
-            {error, Reason, State}
+            false
     end.
 
-do_send(#{connect_opts := #{forwards := undefined}}, Msg) ->
-    ?SLOG(error, #{
-        msg =>
-            "cannot_forward_messages_to_remote_broker"
-            "_as_'egress'_is_not_configured",
-        messages => Msg
-    });
-do_send(
-    #{
-        connection := Connection,
-        mountpoint := Mountpoint,
-        connect_opts := #{forwards := Forwards}
-    } = State,
-    Msg
-) ->
+export_msg(Mountpoint, Forwards, Msg) ->
     Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
-    ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars),
+    emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars).
+
+%%
+
+handle_publish(#{properties := Props} = MsgIn, Vars, Opts) ->
+    Msg = import_msg(MsgIn, Opts),
     ?SLOG(debug, #{
-        msg => "publish_to_remote_broker",
+        msg => "publish_local",
         message => Msg,
         vars => Vars
     }),
-    case emqx_connector_mqtt_mod:send(Connection, ExportMsg) of
-        ok ->
-            {ok, State};
-        {ok, #{reason_code := RC}} when
-            RC =:= ?RC_SUCCESS;
-            RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
-        ->
-            {ok, State};
-        {ok, #{reason_code := RC, reason_code_name := RCN}} ->
-            ?SLOG(warning, #{
-                msg => "publish_to_remote_node_falied",
-                message => Msg,
-                reason_code => RC,
-                reason_code_name => RCN
-            }),
-            {error, RCN};
-        {error, Reason} ->
-            ?SLOG(info, #{
-                msg => "mqtt_bridge_produce_failed",
-                reason => Reason
-            }),
-            {error, Reason}
+    case Vars of
+        #{on_message_received := {Mod, Func, Args}} ->
+            _ = erlang:apply(Mod, Func, [Msg | Args]);
+        _ ->
+            ok
+    end,
+    maybe_publish_local(Msg, Vars, Props).
+
+handle_disconnect(_Reason) ->
+    ok.
+
+maybe_publish_local(Msg, Vars, Props) ->
+    case emqx_map_lib:deep_get([local, topic], Vars, undefined) of
+        %% local topic is not set, discard it
+        undefined ->
+            ok;
+        _ ->
+            emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
     end.
 
-do_send_async(#{connect_opts := #{forwards := undefined}}, Msg, _Callback) ->
-    %% TODO: eval callback with undefined error
-    ?SLOG(error, #{
-        msg =>
-            "cannot_forward_messages_to_remote_broker"
-            "_as_'egress'_is_not_configured",
-        messages => Msg
-    });
-do_send_async(
+import_msg(
     #{
-        connection := Connection,
-        mountpoint := Mountpoint,
-        connect_opts := #{forwards := Forwards}
+        dup := Dup,
+        payload := Payload,
+        properties := Props,
+        qos := QoS,
+        retain := Retain,
+        topic := Topic
     },
-    Msg,
-    Callback
+    #{server := Server}
 ) ->
-    Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
-    ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars),
-    ?SLOG(debug, #{
-        msg => "publish_to_remote_broker",
-        message => Msg,
-        vars => Vars
-    }),
-    emqx_connector_mqtt_mod:send_async(Connection, ExportMsg, Callback).
-
-disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
-    emqx_connector_mqtt_mod:stop(Conn),
-    State#{connection => undefined};
-disconnect(State) ->
-    State.
-
-format_mountpoint(undefined) ->
-    undefined;
-format_mountpoint(Prefix) ->
-    binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
-
-name(Id) -> list_to_atom(str(Id)).
+    #{
+        id => emqx_guid:to_hexstr(emqx_guid:gen()),
+        server => Server,
+        payload => Payload,
+        topic => Topic,
+        qos => QoS,
+        dup => Dup,
+        retain => Retain,
+        pub_props => printable_maps(Props),
+        message_received_at => erlang:system_time(millisecond)
+    }.
 
-obfuscate(Map) ->
+printable_maps(undefined) ->
+    #{};
+printable_maps(Headers) ->
     maps:fold(
-        fun(K, V, Acc) ->
-            case is_sensitive(K) of
-                true -> [{K, '***'} | Acc];
-                false -> [{K, V} | Acc]
-            end
+        fun
+            ('User-Property', V0, AccIn) when is_list(V0) ->
+                AccIn#{
+                    'User-Property' => maps:from_list(V0),
+                    'User-Property-Pairs' => [
+                        #{
+                            key => Key,
+                            value => Value
+                        }
+                     || {Key, Value} <- V0
+                    ]
+                };
+            (K, V0, AccIn) ->
+                AccIn#{K => V0}
         end,
-        [],
-        Map
+        #{},
+        Headers
     ).
 
-is_sensitive(password) -> true;
-is_sensitive(ssl_opts) -> true;
-is_sensitive(_) -> false.
-
-str(A) when is_atom(A) ->
-    atom_to_list(A);
-str(B) when is_binary(B) ->
-    binary_to_list(B);
-str(S) when is_list(S) ->
-    S.
+%% TODO
+% maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) ->
+%     try
+%         %% Destroy session if clean_start is not set.
+%         %% Ignore any crashes, just refresh the clean_start = true.
+%         _ = do_connect(State#{connect_opts => ConnectOpts#{clean_start => true}}),
+%         _ = disconnect(State),
+%         ok
+%     catch
+%         _:_ ->
+%             ok
+%     end;
+% maybe_destroy_session(_State) ->
+%     ok.

+ 0 - 60
apps/emqx_connector/test/emqx_connector_mqtt_tests.erl

@@ -1,60 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_connector_mqtt_tests).
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
-
-send_and_ack_test() ->
-    %% delegate from gen_rpc to rpc for unit test
-    meck:new(emqtt, [passthrough, no_history]),
-    meck:expect(
-        emqtt,
-        start_link,
-        1,
-        fun(_) ->
-            {ok, spawn_link(fun() -> ok end)}
-        end
-    ),
-    meck:expect(emqtt, connect, 1, {ok, dummy}),
-    meck:expect(
-        emqtt,
-        stop,
-        1,
-        fun(Pid) -> Pid ! stop end
-    ),
-    meck:expect(
-        emqtt,
-        publish,
-        2,
-        fun(Client, Msg) ->
-            Client ! {publish, Msg},
-            %% as packet id
-            {ok, Msg}
-        end
-    ),
-    try
-        Max = 1,
-        Batch = lists:seq(1, Max),
-        {ok, Conn} = emqx_connector_mqtt_mod:start(#{server => "127.0.0.1:1883"}),
-        %% return last packet id as batch reference
-        {ok, _AckRef} = emqx_connector_mqtt_mod:send(Conn, Batch),
-
-        ok = emqx_connector_mqtt_mod:stop(Conn)
-    after
-        meck:unload(emqtt)
-    end.

+ 0 - 101
apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl

@@ -1,101 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_connector_mqtt_worker_tests).
-
--include_lib("eunit/include/eunit.hrl").
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
-
--define(BRIDGE_NAME, test).
--define(BRIDGE_REG_NAME, emqx_connector_mqtt_worker_test).
--define(WAIT(PATTERN, TIMEOUT),
-    receive
-        PATTERN ->
-            ok
-    after TIMEOUT ->
-        error(timeout)
-    end
-).
-
--export([start/1, send/2, stop/1]).
-
-start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) ->
-    case is_pid(Pid) of
-        true -> Pid ! {connection_start_attempt, Ref};
-        false -> ok
-    end,
-    Result.
-
-send(SendFun, Batch) when is_function(SendFun, 2) ->
-    SendFun(Batch).
-
-stop(_Pid) -> ok.
-
-%% connect first, disconnect, then connect again
-disturbance_test() ->
-    meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]),
-    meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end),
-    meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end),
-    meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end),
-    try
-        emqx_metrics:start_link(),
-        Ref = make_ref(),
-        TestPid = self(),
-        Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}),
-        {ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => bridge_disturbance}),
-        ?assertEqual(Pid, whereis(bridge_disturbance)),
-        ?WAIT({connection_start_attempt, Ref}, 1000),
-        Pid ! {disconnected, TestPid, test},
-        ?WAIT({connection_start_attempt, Ref}, 1000),
-        emqx_metrics:stop(),
-        ok = emqx_connector_mqtt_worker:stop(Pid)
-    after
-        meck:unload(emqx_connector_mqtt_mod)
-    end.
-
-manual_start_stop_test() ->
-    meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]),
-    meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end),
-    meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end),
-    meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end),
-    try
-        emqx_metrics:start_link(),
-        Ref = make_ref(),
-        TestPid = self(),
-        BridgeName = manual_start_stop,
-        Config0 = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}),
-        Config = Config0#{start_type := manual},
-        {ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => BridgeName}),
-        %% call ensure_started again should yield the same result
-        ok = emqx_connector_mqtt_worker:ensure_started(BridgeName),
-        emqx_connector_mqtt_worker:ensure_stopped(BridgeName),
-        emqx_metrics:stop(),
-        ok = emqx_connector_mqtt_worker:stop(Pid)
-    after
-        meck:unload(emqx_connector_mqtt_mod)
-    end.
-
-make_config(Ref, TestPid, Result) ->
-    #{
-        start_type => auto,
-        subscriptions => undefined,
-        forwards => undefined,
-        reconnect_interval => 50,
-        test_pid => TestPid,
-        test_ref => Ref,
-        connect_result => Result
-    }.

+ 5 - 4
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -1144,6 +1144,10 @@ append_queue(Id, Index, Q, Queries) ->
 -define(INITIAL_TIME_REF, initial_time).
 -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
 
+%% NOTE
+%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝
+-define(INFLIGHT_META_ROWS, 4).
+
 inflight_new(InfltWinSZ, Id, Index) ->
     TableId = ets:new(
         emqx_resource_buffer_worker_inflight_tab,
@@ -1204,12 +1208,9 @@ is_inflight_full(InflightTID) ->
     Size >= MaxSize.
 
 inflight_num_batches(InflightTID) ->
-    %% Note: we subtract 2 because there're 2 metadata rows that hold
-    %% the maximum size value and the number of messages.
-    MetadataRowCount = 2,
     case ets:info(InflightTID, size) of
         undefined -> 0;
-        Size -> max(0, Size - MetadataRowCount)
+        Size -> max(0, Size - ?INFLIGHT_META_ROWS)
     end.
 
 inflight_num_msgs(InflightTID) ->

+ 18 - 14
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -411,7 +411,8 @@ t_query_counter_async_inflight(_) ->
     ?check_trace(
         {_, {ok, _}} =
             ?wait_async_action(
-                inc_counter_in_parallel(WindowSize, ReqOpts),
+                %% one more so that inflight would be already full upon last query
+                inc_counter_in_parallel(WindowSize + 1, ReqOpts),
                 #{?snk_kind := buffer_worker_flush_but_inflight_full},
                 1_000
             ),
@@ -445,9 +446,9 @@ t_query_counter_async_inflight(_) ->
     %% all responses should be received after the resource is resumed.
     {ok, SRef0} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
-        %% +1 because the tmp_query above will be retried and succeed
+        %% +2 because the tmp_query above will be retried and succeed
         %% this time.
-        WindowSize + 1,
+        WindowSize + 2,
         _Timeout0 = 10_000
     ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
@@ -475,7 +476,7 @@ t_query_counter_async_inflight(_) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
             ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
-            ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+            ?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
             tap_metrics(?LINE),
             ok
         end
@@ -487,7 +488,8 @@ t_query_counter_async_inflight(_) ->
     ?check_trace(
         {_, {ok, _}} =
             ?wait_async_action(
-                inc_counter_in_parallel(WindowSize, ReqOpts),
+                %% one more so that inflight would be already full upon last query
+                inc_counter_in_parallel(WindowSize + 1, ReqOpts),
                 #{?snk_kind := buffer_worker_flush_but_inflight_full},
                 1_000
             ),
@@ -500,10 +502,10 @@ t_query_counter_async_inflight(_) ->
     %% this will block the resource_worker
     ok = emqx_resource:query(?ID, {inc_counter, 4}),
 
-    Sent = WindowSize + Num + WindowSize,
+    Sent = WindowSize + 1 + Num + WindowSize + 1,
     {ok, SRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
-        WindowSize,
+        WindowSize + 1,
         _Timeout0 = 10_000
     ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
@@ -593,7 +595,8 @@ t_query_counter_async_inflight_batch(_) ->
     ?check_trace(
         {_, {ok, _}} =
             ?wait_async_action(
-                inc_counter_in_parallel(NumMsgs, ReqOpts),
+                %% a batch more so that inflight would be already full upon last query
+                inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
                 #{?snk_kind := buffer_worker_flush_but_inflight_full},
                 5_000
             ),
@@ -652,9 +655,9 @@ t_query_counter_async_inflight_batch(_) ->
     %% all responses should be received after the resource is resumed.
     {ok, SRef0} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
-        %% +1 because the tmp_query above will be retried and succeed
+        %% +2 because the tmp_query above will be retried and succeed
         %% this time.
-        WindowSize + 1,
+        WindowSize + 2,
         10_000
     ),
     ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
@@ -664,7 +667,7 @@ t_query_counter_async_inflight_batch(_) ->
     %% take it again from the table; this time, it should have
     %% succeeded.
     ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
-    ?assertEqual(NumMsgs, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
+    ?assertEqual(NumMsgs + BatchSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
     tap_metrics(?LINE),
 
     %% send async query, this time everything should be ok.
@@ -691,7 +694,7 @@ t_query_counter_async_inflight_batch(_) ->
         end
     ),
     ?assertEqual(
-        NumMsgs + NumMsgs1,
+        NumMsgs + BatchSize + NumMsgs1,
         ets:info(Tab0, size),
         #{tab => ets:tab2list(Tab0)}
     ),
@@ -703,7 +706,8 @@ t_query_counter_async_inflight_batch(_) ->
     ?check_trace(
         {_, {ok, _}} =
             ?wait_async_action(
-                inc_counter_in_parallel(NumMsgs, ReqOpts),
+                %% a batch more so that inflight would be already full upon last query
+                inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts),
                 #{?snk_kind := buffer_worker_flush_but_inflight_full},
                 5_000
             ),
@@ -719,7 +723,7 @@ t_query_counter_async_inflight_batch(_) ->
     %% this will block the resource_worker
     ok = emqx_resource:query(?ID, {inc_counter, 1}),
 
-    Sent = NumMsgs + NumMsgs1 + NumMsgs,
+    Sent = NumMsgs + BatchSize + NumMsgs1 + NumMsgs,
     {ok, SRef1} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
         WindowSize,