Explorar o código

Rename portal to bridge

Gilbert Wong %!s(int64=7) %!d(string=hai) anos
pai
achega
7efd7b3ec0

+ 10 - 10
Makefile

@@ -27,17 +27,17 @@ TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
 
 
 EUNIT_OPTS = verbose
 EUNIT_OPTS = verbose
 
 
-# CT_SUITES = emqx_frame
+CT_SUITES = emqx_bridge
 ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
 ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
 
 
-CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
-			emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
-			emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
-			emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
-			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
-			emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_portal \
-			emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
-			emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message
+# CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
+# 			emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
+# 			emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
+# 			emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
+# 			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
+# 			emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
+# 			emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
+# 			emqx_packet emqx_connection emqx_tracer emqx_sys_mon emqx_message
 
 
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
@@ -110,7 +110,7 @@ rebar-ct: rebar-ct-setup
 	@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',')
 	@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(shell echo $(foreach var,$(CT_SUITES),test/$(var)_SUITE) | tr ' ' ',')
 
 
 ## Run one single CT with rebar3
 ## Run one single CT with rebar3
-## e.g. make ct-one-suite suite=emqx_portal
+## e.g. make ct-one-suite suite=emqx_bridge
 ct-one-suite: rebar-ct-setup
 ct-one-suite: rebar-ct-setup
 	@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE
 	@rebar3 ct -v --readable=false --name $(CT_NODE_NAME) --suite=$(suite)_SUITE
 
 

+ 2 - 2
etc/emqx.conf

@@ -1726,14 +1726,14 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
 ## bridge.aws.max_inflight_batches = 32
 ## bridge.aws.max_inflight_batches = 32
 
 
 ## Max number of messages to collect in a batch for
 ## Max number of messages to collect in a batch for
-## each send call towards emqx_portal_connect
+## each send call towards emqx_bridge_connect
 ##
 ##
 ## Value: Integer
 ## Value: Integer
 ## default: 32
 ## default: 32
 ## bridge.aws.queue.batch_count_limit = 32
 ## bridge.aws.queue.batch_count_limit = 32
 
 
 ## Max number of bytes to collect in a batch for each
 ## Max number of bytes to collect in a batch for each
-## send call towards emqx_portal_connect
+## send call towards emqx_bridge_connect
 ##
 ##
 ## Value: Bytesize
 ## Value: Bytesize
 ## default: 1000M
 ## default: 1000M

+ 2 - 2
priv/emqx.schema

@@ -1675,9 +1675,9 @@ end}.
                           true when Subs =/= [] ->
                           true when Subs =/= [] ->
                               error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs});
                               error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs});
                           true ->
                           true ->
-                              emqx_portal_rpc;
+                              emqx_bridge_rpc;
                           false ->
                           false ->
-                              emqx_portal_mqtt
+                              emqx_bridge_mqtt
                       end
                       end
               end,
               end,
     %% to be backward compatible
     %% to be backward compatible

+ 31 - 31
src/portal/emqx_portal.erl

@@ -12,18 +12,18 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
-%% @doc Portal works in two layers (1) batching layer (2) transport layer
-%% The `portal' batching layer collects local messages in batches and sends over
+%% @doc Bridge works in two layers (1) batching layer (2) transport layer
+%% The `bridge' batching layer collects local messages in batches and sends over
 %% to remote MQTT node/cluster via `connetion' transport layer.
 %% to remote MQTT node/cluster via `connetion' transport layer.
 %% In case `REMOTE' is also an EMQX node, `connection' is recommended to be
 %% In case `REMOTE' is also an EMQX node, `connection' is recommended to be
-%% the `gen_rpc' based implementation `emqx_portal_rpc'. Otherwise `connection'
-%% has to be `emqx_portal_mqtt'.
+%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection'
+%% has to be `emqx_bridge_mqtt'.
 %%
 %%
 %% ```
 %% ```
 %% +------+                        +--------+
 %% +------+                        +--------+
 %% | EMQX |                        | REMOTE |
 %% | EMQX |                        | REMOTE |
 %% |      |                        |        |
 %% |      |                        |        |
-%% |   (portal) <==(connection)==> |        |
+%% |   (bridge) <==(connection)==> |        |
 %% |      |                        |        |
 %% |      |                        |        |
 %% |      |                        |        |
 %% |      |                        |        |
 %% +------+                        +--------+
 %% +------+                        +--------+
@@ -47,8 +47,8 @@
 %% (3): received {disconnected, conn_ref(), Reason} OR
 %% (3): received {disconnected, conn_ref(), Reason} OR
 %%      failed to send to remote node/cluster.
 %%      failed to send to remote node/cluster.
 %%
 %%
-%% NOTE: A portal worker may subscribe to multiple (including wildcard)
-%% local topics, and the underlying `emqx_portal_connect' may subscribe to
+%% NOTE: A bridge worker may subscribe to multiple (including wildcard)
+%% local topics, and the underlying `emqx_bridge_connect' may subscribe to
 %% multiple remote topics, however, worker/connections are not designed
 %% multiple remote topics, however, worker/connections are not designed
 %% to support automatic load-balancing, i.e. in case it can not keep up
 %% to support automatic load-balancing, i.e. in case it can not keep up
 %% with the amount of messages comming in, administrator should split and
 %% with the amount of messages comming in, administrator should split and
@@ -57,7 +57,7 @@
 %% NOTES:
 %% NOTES:
 %% * Local messages are all normalised to QoS-1 when exporting to remote
 %% * Local messages are all normalised to QoS-1 when exporting to remote
 
 
--module(emqx_portal).
+-module(emqx_bridge).
 -behaviour(gen_statem).
 -behaviour(gen_statem).
 
 
 %% APIs
 %% APIs
@@ -84,7 +84,7 @@
 -type id() :: atom() | string() | pid().
 -type id() :: atom() | string() | pid().
 -type qos() :: emqx_mqtt_types:qos().
 -type qos() :: emqx_mqtt_types:qos().
 -type config() :: map().
 -type config() :: map().
--type batch() :: [emqx_portal_msg:exp_msg()].
+-type batch() :: [emqx_bridge_msg:exp_msg()].
 -type ack_ref() :: term().
 -type ack_ref() :: term().
 -type topic() :: emqx_topic:topic().
 -type topic() :: emqx_topic:topic().
 
 
@@ -99,12 +99,12 @@
 -define(DEFAULT_SEG_BYTES, (1 bsl 20)).
 -define(DEFAULT_SEG_BYTES, (1 bsl 20)).
 -define(maybe_send, {next_event, internal, maybe_send}).
 -define(maybe_send, {next_event, internal, maybe_send}).
 
 
-%% @doc Start a portal worker. Supported configs:
-%% start_type: 'manual' (default) or 'auto', when manual, portal will stay
+%% @doc Start a bridge worker. Supported configs:
+%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay
 %%      at 'standing_by' state until a manual call to start it.
 %%      at 'standing_by' state until a manual call to start it.
-%% connect_module: The module which implements emqx_portal_connect behaviour
+%% connect_module: The module which implements emqx_bridge_connect behaviour
 %%      and work as message batch transport layer
 %%      and work as message batch transport layer
-%% reconnect_delay_ms: Delay in milli-seconds for the portal worker to retry
+%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry
 %%      in case of transportation failure.
 %%      in case of transportation failure.
 %% max_inflight_batches: Max number of batches allowed to send-ahead before
 %% max_inflight_batches: Max number of batches allowed to send-ahead before
 %%      receiving confirmation from remote node/cluster
 %%      receiving confirmation from remote node/cluster
@@ -112,20 +112,20 @@
 %%      `undefined', `<<>>' or `""' to disable
 %%      `undefined', `<<>>' or `""' to disable
 %% forwards: Local topics to subscribe.
 %% forwards: Local topics to subscribe.
 %% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each
 %% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each
-%%      send call towards emqx_portal_connect
+%%      send call towards emqx_bridge_connect
 %% queue.batch_count_limit: Max number of messages to collect in a batch for
 %% queue.batch_count_limit: Max number of messages to collect in a batch for
-%%      each send call towards emqx_portal_connect
+%%      each send call towards emqx_bridge_connect
 %% queue.replayq_dir: Directory where replayq should persist messages
 %% queue.replayq_dir: Directory where replayq should persist messages
 %% queue.replayq_seg_bytes: Size in bytes for each replayq segment file
 %% queue.replayq_seg_bytes: Size in bytes for each replayq segment file
 %%
 %%
 %% Find more connection specific configs in the callback modules
 %% Find more connection specific configs in the callback modules
-%% of emqx_portal_connect behaviour.
+%% of emqx_bridge_connect behaviour.
 start_link(Name, Config) when is_list(Config) ->
 start_link(Name, Config) when is_list(Config) ->
     start_link(Name, maps:from_list(Config));
     start_link(Name, maps:from_list(Config));
 start_link(Name, Config) ->
 start_link(Name, Config) ->
     gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).
     gen_statem:start_link({local, name(Name)}, ?MODULE, Config, []).
 
 
-%% @doc Manually start portal worker. State idempotency ensured.
+%% @doc Manually start bridge worker. State idempotency ensured.
 ensure_started(Name) ->
 ensure_started(Name) ->
     gen_statem:call(name(Name), ensure_started).
     gen_statem:call(name(Name), ensure_started).
 
 
@@ -135,7 +135,7 @@ ensure_started(Name, Config) ->
         {error, {already_started,Pid}} -> {ok, Pid}
         {error, {already_started,Pid}} -> {ok, Pid}
     end.
     end.
 
 
-%% @doc Manually stop portal worker. State idempotency ensured.
+%% @doc Manually stop bridge worker. State idempotency ensured.
 ensure_stopped(Id) ->
 ensure_stopped(Id) ->
     ensure_stopped(Id, 1000).
     ensure_stopped(Id, 1000).
 
 
@@ -168,7 +168,7 @@ status(Pid) ->
 %% @doc This function is to be evaluated on message/batch receiver side.
 %% @doc This function is to be evaluated on message/batch receiver side.
 -spec import_batch(batch(), fun(() -> ok)) -> ok.
 -spec import_batch(batch(), fun(() -> ok)) -> ok.
 import_batch(Batch, AckFun) ->
 import_batch(Batch, AckFun) ->
-    lists:foreach(fun emqx_broker:publish/1, emqx_portal_msg:to_broker_msgs(Batch)),
+    lists:foreach(fun emqx_broker:publish/1, emqx_bridge_msg:to_broker_msgs(Batch)),
     AckFun().
     AckFun().
 
 
 %% @doc This function is to be evaluated on message/batch exporter side
 %% @doc This function is to be evaluated on message/batch exporter side
@@ -197,14 +197,14 @@ ensure_forward_absent(Id, Topic) ->
     gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}).
     gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}).
 
 
 %% @doc Ensure subscribed to remote topic.
 %% @doc Ensure subscribed to remote topic.
-%% NOTE: only applicable when connection module is emqx_portal_mqtt
+%% NOTE: only applicable when connection module is emqx_bridge_mqtt
 %%       return `{error, no_remote_subscription_support}' otherwise.
 %%       return `{error, no_remote_subscription_support}' otherwise.
 -spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}.
 -spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}.
 ensure_subscription_present(Id, Topic, QoS) ->
 ensure_subscription_present(Id, Topic, QoS) ->
     gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}).
     gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}).
 
 
 %% @doc Ensure unsubscribed from remote topic.
 %% @doc Ensure unsubscribed from remote topic.
-%% NOTE: only applicable when connection module is emqx_portal_mqtt
+%% NOTE: only applicable when connection module is emqx_bridge_mqtt
 -spec ensure_subscription_absent(id(), topic()) -> ok.
 -spec ensure_subscription_absent(id(), topic()) -> ok.
 ensure_subscription_absent(Id, Topic) ->
 ensure_subscription_absent(Id, Topic) ->
     gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}).
     gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}).
@@ -225,7 +225,7 @@ init(Config) ->
                        seg_bytes => GetQ(replayq_seg_bytes, ?DEFAULT_SEG_BYTES)
                        seg_bytes => GetQ(replayq_seg_bytes, ?DEFAULT_SEG_BYTES)
                       }
                       }
         end,
         end,
-    Queue = replayq:open(QueueConfig#{sizer => fun emqx_portal_msg:estimate_size/1,
+    Queue = replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1,
                                       marshaller => fun msg_marshaller/1}),
                                       marshaller => fun msg_marshaller/1}),
     Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]),
     Topics = lists:sort([iolist_to_binary(T) || T <- Get(forwards, [])]),
     Subs = lists:keysort(1, lists:map(fun({T0, QoS}) ->
     Subs = lists:keysort(1, lists:map(fun({T0, QoS}) ->
@@ -241,7 +241,7 @@ init(Config) ->
                                   mountpoint,
                                   mountpoint,
                                   forwards
                                   forwards
                                  ], Config#{subscriptions => Subs}),
                                  ], Config#{subscriptions => Subs}),
-    ConnectFun = fun(SubsX) -> emqx_portal_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end,
+    ConnectFun = fun(SubsX) -> emqx_bridge_connect:start(ConnectModule, ConnectConfig#{subscriptions := SubsX}) end,
     {ok, standing_by,
     {ok, standing_by,
      #{connect_module => ConnectModule,
      #{connect_module => ConnectModule,
        connect_fun => ConnectFun,
        connect_fun => ConnectFun,
@@ -279,7 +279,7 @@ standing_by(state_timeout, do_connect, State) ->
 standing_by({call, From}, _Call, _State) ->
 standing_by({call, From}, _Call, _State) ->
     {keep_state_and_data, [{reply, From, {error,standing_by}}]};
     {keep_state_and_data, [{reply, From, {error,standing_by}}]};
 standing_by(info, Info, State) ->
 standing_by(info, Info, State) ->
-    ?LOG(info, "Portal ~p discarded info event at state standing_by:\n~p", [name(), Info]),
+    ?LOG(info, "Bridge ~p discarded info event at state standing_by:\n~p", [name(), Info]),
     {keep_state_and_data, State};
     {keep_state_and_data, State};
 standing_by(Type, Content, State) ->
 standing_by(Type, Content, State) ->
     common(standing_by, Type, Content, State).
     common(standing_by, Type, Content, State).
@@ -298,7 +298,7 @@ connecting(enter, _, #{reconnect_delay_ms := Timeout,
     ok = subscribe_local_topics(Forwards),
     ok = subscribe_local_topics(Forwards),
     case ConnectFun(Subs) of
     case ConnectFun(Subs) of
         {ok, ConnRef, Conn} ->
         {ok, ConnRef, Conn} ->
-            ?LOG(info, "Portal ~p connected", [name()]),
+            ?LOG(info, "Bridge ~p connected", [name()]),
             Action = {state_timeout, 0, connected},
             Action = {state_timeout, 0, connected},
             {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
             {keep_state, State#{conn_ref => ConnRef, connection => Conn}, Action};
         error ->
         error ->
@@ -348,7 +348,7 @@ connected(info, {disconnected, ConnRef, Reason},
           #{conn_ref := ConnRefCurrent, connection := Conn} = State) ->
           #{conn_ref := ConnRefCurrent, connection := Conn} = State) ->
     case ConnRefCurrent =:= ConnRef of
     case ConnRefCurrent =:= ConnRef of
         true ->
         true ->
-            ?LOG(info, "Portal ~p diconnected~nreason=~p", [name(), Conn, Reason]),
+            ?LOG(info, "Bridge ~p diconnected~nreason=~p", [name(), Conn, Reason]),
             {next_state, connecting,
             {next_state, connecting,
              State#{conn_ref := undefined, connection := undefined}};
              State#{conn_ref := undefined, connection := undefined}};
         false ->
         false ->
@@ -360,7 +360,7 @@ connected(info, {batch_ack, Ref}, State) ->
             keep_state_and_data;
             keep_state_and_data;
         bad_order ->
         bad_order ->
             %% try re-connect then re-send
             %% try re-connect then re-send
-            ?LOG(error, "Bad order ack received by portal ~p", [name()]),
+            ?LOG(error, "Bad order ack received by bridge ~p", [name()]),
             {next_state, connecting, disconnect(State)};
             {next_state, connecting, disconnect(State)};
         {ok, NewState} ->
         {ok, NewState} ->
             {keep_state, NewState, ?maybe_send}
             {keep_state, NewState, ?maybe_send}
@@ -391,7 +391,7 @@ common(_StateName, info, {dispatch, _, Msg},
     NewQ = replayq:append(Q, collect([Msg])),
     NewQ = replayq:append(Q, collect([Msg])),
     {keep_state, State#{replayq => NewQ}, ?maybe_send};
     {keep_state, State#{replayq => NewQ}, ?maybe_send};
 common(StateName, Type, Content, State) ->
 common(StateName, Type, Content, State) ->
-    ?LOG(info, "Portal ~p discarded ~p type event at state ~p:\n~p",
+    ?LOG(info, "Bridge ~p discarded ~p type event at state ~p:\n~p",
           [name(), Type, StateName, Content]),
           [name(), Type, StateName, Content]),
     {keep_state, State}.
     {keep_state, State}.
 
 
@@ -531,15 +531,15 @@ disconnect(#{connection := Conn,
 disconnect(State) -> State.
 disconnect(State) -> State.
 
 
 %% Called only when replayq needs to dump it to disk.
 %% Called only when replayq needs to dump it to disk.
-msg_marshaller(Bin) when is_binary(Bin) -> emqx_portal_msg:from_binary(Bin);
-msg_marshaller(Msg) -> emqx_portal_msg:to_binary(Msg).
+msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin);
+msg_marshaller(Msg) -> emqx_bridge_msg:to_binary(Msg).
 
 
 %% Return {ok, SendAckRef} or {error, Reason}
 %% Return {ok, SendAckRef} or {error, Reason}
 maybe_send(#{connect_module := Module,
 maybe_send(#{connect_module := Module,
              connection := Connection,
              connection := Connection,
              mountpoint := Mountpoint
              mountpoint := Mountpoint
             }, Batch) ->
             }, Batch) ->
-    Module:send(Connection, [emqx_portal_msg:to_export(Module, Mountpoint, M) || M <- Batch]).
+    Module:send(Connection, [emqx_bridge_msg:to_export(Module, Mountpoint, M) || M <- Batch]).
 
 
 format_mountpoint(undefined) ->
 format_mountpoint(undefined) ->
     undefined;
     undefined;

+ 5 - 5
src/portal/emqx_portal_mqtt.erl

@@ -12,10 +12,10 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
-%% @doc This module implements EMQX Portal transport layer on top of MQTT protocol
+%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
 
 
--module(emqx_portal_mqtt).
--behaviour(emqx_portal_connect).
+-module(emqx_bridge_mqtt).
+-behaviour(emqx_bridge_connect).
 
 
 %% behaviour callbacks
 %% behaviour callbacks
 -export([start/1,
 -export([start/1,
@@ -154,7 +154,7 @@ match_acks(Parent, Acked, Sent) ->
 match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent};
 match_acks_1(_Parent, {empty, Empty}, Sent) -> {Empty, Sent};
 match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) ->
 match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId]) | Sent]) ->
     %% batch finished
     %% batch finished
-    ok = emqx_portal:handle_ack(Parent, Ref),
+    ok = emqx_bridge:handle_ack(Parent, Ref),
     match_acks(Parent, Acked, Sent);
     match_acks(Parent, Acked, Sent);
 match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) ->
 match_acks_1(Parent, {{value, PktId}, Acked}, [?REF_IDS(Ref, [PktId | RestIds]) | Sent]) ->
     %% one message finished, but not the whole batch
     %% one message finished, but not the whole batch
@@ -171,7 +171,7 @@ handle_puback(AckCollector, #{packet_id := PktId, reason_code := RC}) ->
 %% Message published from remote broker. Import to local broker.
 %% Message published from remote broker. Import to local broker.
 import_msg(Msg) ->
 import_msg(Msg) ->
     %% auto-ack should be enabled in emqx_client, hence dummy ack-fun.
     %% auto-ack should be enabled in emqx_client, hence dummy ack-fun.
-    emqx_portal:import_batch([Msg], _AckFun = fun() -> ok end).
+    emqx_bridge:import_batch([Msg], _AckFun = fun() -> ok end).
 
 
 make_hdlr(Parent, AckCollector, Ref) ->
 make_hdlr(Parent, AckCollector, Ref) ->
     #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end,
     #{puback => fun(Ack) -> handle_puback(AckCollector, Ack) end,

+ 4 - 4
src/portal/emqx_portal_msg.erl

@@ -12,7 +12,7 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
--module(emqx_portal_msg).
+-module(emqx_bridge_msg).
 
 
 -export([ to_binary/1
 -export([ to_binary/1
         , from_binary/1
         , from_binary/1
@@ -37,9 +37,9 @@
 %% Shame that we have to know the callback module here
 %% Shame that we have to know the callback module here
 %% would be great if we can get rid of #mqtt_msg{} record
 %% would be great if we can get rid of #mqtt_msg{} record
 %% and use #message{} in all places.
 %% and use #message{} in all places.
--spec to_export(emqx_portal_rpc | emqx_portal_mqtt,
+-spec to_export(emqx_bridge_rpc | emqx_bridge_mqtt,
                 undefined | binary(), msg()) -> exp_msg().
                 undefined | binary(), msg()) -> exp_msg().
-to_export(emqx_portal_mqtt, Mountpoint,
+to_export(emqx_bridge_mqtt, Mountpoint,
           #message{topic = Topic,
           #message{topic = Topic,
                    payload = Payload,
                    payload = Payload,
                    flags = Flags
                    flags = Flags
@@ -79,6 +79,6 @@ to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic,
     %% published from remote node over a MQTT connection
     %% published from remote node over a MQTT connection
     emqx_message:set_headers(Props,
     emqx_message:set_headers(Props,
         emqx_message:set_flags(#{dup => Dup, retain => Retain},
         emqx_message:set_flags(#{dup => Dup, retain => Retain},
-            emqx_message:make(portal, QoS, Topic, Payload))).
+            emqx_message:make(bridge, QoS, Topic, Payload))).
 
 
 topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic).
 topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic).

+ 8 - 8
src/portal/emqx_portal_rpc.erl

@@ -12,10 +12,10 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
-%% @doc This module implements EMQX Portal transport layer based on gen_rpc.
+%% @doc This module implements EMQX Bridge transport layer based on gen_rpc.
 
 
--module(emqx_portal_rpc).
--behaviour(emqx_portal_connect).
+-module(emqx_bridge_rpc).
+-behaviour(emqx_bridge_connect).
 
 
 %% behaviour callbacks
 %% behaviour callbacks
 -export([start/1,
 -export([start/1,
@@ -29,8 +29,8 @@
         , heartbeat/2
         , heartbeat/2
         ]).
         ]).
 
 
--type ack_ref() :: emqx_portal:ack_ref().
--type batch() :: emqx_portal:batch().
+-type ack_ref() :: emqx_bridge:ack_ref().
+-type batch() :: emqx_bridge:batch().
 
 
 -define(HEARTBEAT_INTERVAL, timer:seconds(1)).
 -define(HEARTBEAT_INTERVAL, timer:seconds(1)).
 
 
@@ -58,7 +58,7 @@ stop(Pid, _Remote) when is_pid(Pid) ->
     end,
     end,
     ok.
     ok.
 
 
-%% @doc Callback for `emqx_portal_connect' behaviour
+%% @doc Callback for `emqx_bridge_connect' behaviour
 -spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}.
 -spec send(node(), batch()) -> {ok, ack_ref()} | {error, any()}.
 send(Remote, Batch) ->
 send(Remote, Batch) ->
     Sender = self(),
     Sender = self(),
@@ -73,14 +73,14 @@ handle_send(SenderPid, Batch) ->
     SenderNode = node(SenderPid),
     SenderNode = node(SenderPid),
     Ref = make_ref(),
     Ref = make_ref(),
     AckFun = fun() -> ?RPC:cast(SenderNode, ?MODULE, handle_ack, [SenderPid, Ref]), ok end,
     AckFun = fun() -> ?RPC:cast(SenderNode, ?MODULE, handle_ack, [SenderPid, Ref]), ok end,
-    case emqx_portal:import_batch(Batch, AckFun) of
+    case emqx_bridge:import_batch(Batch, AckFun) of
         ok -> {ok, Ref};
         ok -> {ok, Ref};
         Error -> Error
         Error -> Error
     end.
     end.
 
 
 %% @doc Handle batch ack in sender node.
 %% @doc Handle batch ack in sender node.
 handle_ack(SenderPid, Ref) ->
 handle_ack(SenderPid, Ref) ->
-    ok = emqx_portal:handle_ack(SenderPid, Ref).
+    ok = emqx_bridge:handle_ack(SenderPid, Ref).
 
 
 %% @hidden Heartbeat loop
 %% @hidden Heartbeat loop
 heartbeat(Parent, RemoteNode) ->
 heartbeat(Parent, RemoteNode) ->

+ 14 - 14
src/portal/emqx_portal_sup.erl

@@ -12,17 +12,17 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
--module(emqx_portal_sup).
+-module(emqx_bridge_sup).
 -behavior(supervisor).
 -behavior(supervisor).
 
 
 -include("logger.hrl").
 -include("logger.hrl").
 
 
--export([start_link/0, start_link/1, portals/0]).
--export([create_portal/2, drop_portal/1]).
+-export([start_link/0, start_link/1, bridges/0]).
+-export([create_bridge/2, drop_bridge/1]).
 -export([init/1]).
 -export([init/1]).
 
 
 -define(SUP, ?MODULE).
 -define(SUP, ?MODULE).
--define(WORKER_SUP, emqx_portal_worker_sup).
+-define(WORKER_SUP, emqx_bridge_worker_sup).
 
 
 start_link() -> start_link(?SUP).
 start_link() -> start_link(?SUP).
 
 
@@ -31,28 +31,28 @@ start_link(Name) ->
 
 
 init(?SUP) ->
 init(?SUP) ->
     BridgesConf = emqx_config:get_env(bridges, []),
     BridgesConf = emqx_config:get_env(bridges, []),
-    BridgeSpec = lists:map(fun portal_spec/1, BridgesConf),
+    BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf),
     SupFlag = #{strategy => one_for_one,
     SupFlag = #{strategy => one_for_one,
                 intensity => 100,
                 intensity => 100,
                 period => 10},
                 period => 10},
     {ok, {SupFlag, BridgeSpec}}.
     {ok, {SupFlag, BridgeSpec}}.
 
 
-portal_spec({Name, Config}) ->
+bridge_spec({Name, Config}) ->
     #{id => Name,
     #{id => Name,
-      start => {emqx_portal, start_link, [Name, Config]},
+      start => {emqx_bridge, start_link, [Name, Config]},
       restart => permanent,
       restart => permanent,
       shutdown => 5000,
       shutdown => 5000,
       type => worker,
       type => worker,
-      modules => [emqx_portal]}.
+      modules => [emqx_bridge]}.
 
 
--spec(portals() -> [{node(), map()}]).
-portals() ->
-    [{Name, emqx_portal:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
+-spec(bridges() -> [{node(), map()}]).
+bridges() ->
+    [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)].
 
 
-create_portal(Id, Config) ->
-    supervisor:start_child(?SUP, portal_spec({Id, Config})).
+create_bridge(Id, Config) ->
+    supervisor:start_child(?SUP, bridge_spec({Id, Config})).
 
 
-drop_portal(Id) ->
+drop_bridge(Id) ->
     case supervisor:terminate_child(?SUP, Id) of
     case supervisor:terminate_child(?SUP, Id) of
         ok ->
         ok ->
             supervisor:delete_child(?SUP, Id);
             supervisor:delete_child(?SUP, Id);

+ 3 - 3
src/emqx_portal_connect.erl

@@ -12,7 +12,7 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
--module(emqx_portal_connect).
+-module(emqx_bridge_connect).
 
 
 -export([start/2]).
 -export([start/2]).
 
 
@@ -25,7 +25,7 @@
 -type connection() :: term().
 -type connection() :: term().
 -type conn_ref() :: term().
 -type conn_ref() :: term().
 -type batch() :: emqx_protal:batch().
 -type batch() :: emqx_protal:batch().
--type ack_ref() :: emqx_portal:ack_ref().
+-type ack_ref() :: emqx_bridge:ack_ref().
 -type topic() :: emqx_topic:topic().
 -type topic() :: emqx_topic:topic().
 -type qos() :: emqx_mqtt_types:qos().
 -type qos() :: emqx_mqtt_types:qos().
 
 
@@ -37,7 +37,7 @@
 -callback start(config()) -> {ok, conn_ref(), connection()} | {error, any()}.
 -callback start(config()) -> {ok, conn_ref(), connection()} | {error, any()}.
 
 
 %% send to remote node/cluster
 %% send to remote node/cluster
-%% portal worker (the caller process) should be expecting
+%% bridge worker (the caller process) should be expecting
 %% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster
 %% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster
 -callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}.
 -callback send(connection(), batch()) -> {ok, ack_ref()} | {error, any()}.
 
 

+ 2 - 3
src/emqx_sup.erl

@@ -61,7 +61,7 @@ init([]) ->
     RouterSup = supervisor_spec(emqx_router_sup),
     RouterSup = supervisor_spec(emqx_router_sup),
     %% Broker Sup
     %% Broker Sup
     BrokerSup = supervisor_spec(emqx_broker_sup),
     BrokerSup = supervisor_spec(emqx_broker_sup),
-    PortalSup = supervisor_spec(emqx_portal_sup),
+    BridgeSup = supervisor_spec(emqx_bridge_sup),
     %% AccessControl
     %% AccessControl
     AccessControl = worker_spec(emqx_access_control),
     AccessControl = worker_spec(emqx_access_control),
     %% Session Manager
     %% Session Manager
@@ -74,7 +74,7 @@ init([]) ->
           [KernelSup,
           [KernelSup,
            RouterSup,
            RouterSup,
            BrokerSup,
            BrokerSup,
-           PortalSup,
+           BridgeSup,
            AccessControl,
            AccessControl,
            SMSup,
            SMSup,
            CMSup,
            CMSup,
@@ -88,4 +88,3 @@ worker_spec(M) ->
     {M, {M, start_link, []}, permanent, 30000, worker, [M]}.
     {M, {M, start_link, []}, permanent, 30000, worker, [M]}.
 supervisor_spec(M) ->
 supervisor_spec(M) ->
     {M, {M, start_link, []}, permanent, infinity, supervisor, [M]}.
     {M, {M, start_link, []}, permanent, infinity, supervisor, [M]}.
-

+ 30 - 30
test/emqx_portal_SUITE.erl

@@ -12,7 +12,7 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
--module(emqx_portal_SUITE).
+-module(emqx_bridge_SUITE).
 
 
 -export([all/0, init_per_suite/1, end_per_suite/1]).
 -export([all/0, init_per_suite/1, end_per_suite/1]).
 -export([t_rpc/1,
 -export([t_rpc/1,
@@ -48,39 +48,39 @@ t_mngr(Config) when is_list(Config) ->
     Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
     Subs = [{<<"a">>, 1}, {<<"b">>, 2}],
     Cfg = #{address => node(),
     Cfg = #{address => node(),
             forwards => [<<"mngr">>],
             forwards => [<<"mngr">>],
-            connect_module => emqx_portal_rpc,
+            connect_module => emqx_bridge_rpc,
             mountpoint => <<"forwarded">>,
             mountpoint => <<"forwarded">>,
             subscriptions => Subs,
             subscriptions => Subs,
             start_type => auto
             start_type => auto
            },
            },
     Name = ?FUNCTION_NAME,
     Name = ?FUNCTION_NAME,
-    {ok, Pid} = emqx_portal:start_link(Name, Cfg),
+    {ok, Pid} = emqx_bridge:start_link(Name, Cfg),
     try
     try
-        ?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Name)),
-        ?assertEqual(ok, emqx_portal:ensure_forward_present(Name, "mngr")),
-        ?assertEqual(ok, emqx_portal:ensure_forward_present(Name, "mngr2")),
-        ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_portal:get_forwards(Pid)),
-        ?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr2")),
-        ?assertEqual(ok, emqx_portal:ensure_forward_absent(Name, "mngr3")),
-        ?assertEqual([<<"mngr">>], emqx_portal:get_forwards(Pid)),
+        ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Name)),
+        ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr")),
+        ?assertEqual(ok, emqx_bridge:ensure_forward_present(Name, "mngr2")),
+        ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_bridge:get_forwards(Pid)),
+        ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr2")),
+        ?assertEqual(ok, emqx_bridge:ensure_forward_absent(Name, "mngr3")),
+        ?assertEqual([<<"mngr">>], emqx_bridge:get_forwards(Pid)),
         ?assertEqual({error, no_remote_subscription_support},
         ?assertEqual({error, no_remote_subscription_support},
-                     emqx_portal:ensure_subscription_present(Pid, <<"t">>, 0)),
+                     emqx_bridge:ensure_subscription_present(Pid, <<"t">>, 0)),
         ?assertEqual({error, no_remote_subscription_support},
         ?assertEqual({error, no_remote_subscription_support},
-                     emqx_portal:ensure_subscription_absent(Pid, <<"t">>)),
-        ?assertEqual(Subs, emqx_portal:get_subscriptions(Pid))
+                     emqx_bridge:ensure_subscription_absent(Pid, <<"t">>)),
+        ?assertEqual(Subs, emqx_bridge:get_subscriptions(Pid))
     after
     after
-        ok = emqx_portal:stop(Pid)
+        ok = emqx_bridge:stop(Pid)
     end.
     end.
 
 
 %% A loopback RPC to local node
 %% A loopback RPC to local node
 t_rpc(Config) when is_list(Config) ->
 t_rpc(Config) when is_list(Config) ->
     Cfg = #{address => node(),
     Cfg = #{address => node(),
             forwards => [<<"t_rpc/#">>],
             forwards => [<<"t_rpc/#">>],
-            connect_module => emqx_portal_rpc,
+            connect_module => emqx_bridge_rpc,
             mountpoint => <<"forwarded">>,
             mountpoint => <<"forwarded">>,
             start_type => auto
             start_type => auto
            },
            },
-    {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg),
+    {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg),
     ClientId = <<"ClientId">>,
     ClientId = <<"ClientId">>,
     try
     try
         {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
         {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
@@ -96,13 +96,13 @@ t_rpc(Config) when is_list(Config) ->
               end, 4000),
               end, 4000),
         emqx_mock_client:close_session(ConnPid)
         emqx_mock_client:close_session(ConnPid)
     after
     after
-        ok = emqx_portal:stop(Pid)
+        ok = emqx_bridge:stop(Pid)
     end.
     end.
 
 
 %% Full data loopback flow explained:
 %% Full data loopback flow explained:
 %% test-pid --->  mock-cleint ----> local-broker ---(local-subscription)--->
 %% test-pid --->  mock-cleint ----> local-broker ---(local-subscription)--->
-%% portal(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) -->
-%% portal(import) --(mecked message sending)--> test-pid
+%% bridge(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) -->
+%% bridge(import) --(mecked message sending)--> test-pid
 t_mqtt(Config) when is_list(Config) ->
 t_mqtt(Config) when is_list(Config) ->
     SendToTopic = <<"t_mqtt/one">>,
     SendToTopic = <<"t_mqtt/one">>,
     SendToTopic2 = <<"t_mqtt/two">>,
     SendToTopic2 = <<"t_mqtt/two">>,
@@ -111,7 +111,7 @@ t_mqtt(Config) when is_list(Config) ->
     ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]),
     ForwardedTopic2 = emqx_topic:join(["forwarded", atom_to_list(node()), SendToTopic2]),
     Cfg = #{address => "127.0.0.1:1883",
     Cfg = #{address => "127.0.0.1:1883",
             forwards => [SendToTopic],
             forwards => [SendToTopic],
-            connect_module => emqx_portal_mqtt,
+            connect_module => emqx_bridge_mqtt,
             mountpoint => Mountpoint,
             mountpoint => Mountpoint,
             username => "user",
             username => "user",
             clean_start => true,
             clean_start => true,
@@ -128,26 +128,26 @@ t_mqtt(Config) when is_list(Config) ->
             reconnect_delay_ms => 1000,
             reconnect_delay_ms => 1000,
             ssl => false,
             ssl => false,
             %% Consume back to forwarded message for verification
             %% Consume back to forwarded message for verification
-            %% NOTE: this is a indefenite loopback without mocking emqx_portal:import_batch/2
+            %% NOTE: this is a indefenite loopback without mocking emqx_bridge:import_batch/2
             subscriptions => [{ForwardedTopic, _QoS = 1}],
             subscriptions => [{ForwardedTopic, _QoS = 1}],
             start_type => auto
             start_type => auto
            },
            },
     Tester = self(),
     Tester = self(),
     Ref = make_ref(),
     Ref = make_ref(),
-    meck:new(emqx_portal, [passthrough, no_history]),
-    meck:expect(emqx_portal, import_batch, 2,
+    meck:new(emqx_bridge, [passthrough, no_history]),
+    meck:expect(emqx_bridge, import_batch, 2,
                 fun(Batch, AckFun) ->
                 fun(Batch, AckFun) ->
                         Tester ! {Ref, Batch},
                         Tester ! {Ref, Batch},
                         AckFun()
                         AckFun()
                 end),
                 end),
-    {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg),
+    {ok, Pid} = emqx_bridge:start_link(?FUNCTION_NAME, Cfg),
     ClientId = <<"client-1">>,
     ClientId = <<"client-1">>,
     try
     try
-        ?assertEqual([{ForwardedTopic, 1}], emqx_portal:get_subscriptions(Pid)),
-        ok = emqx_portal:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1),
-        ok = emqx_portal:ensure_forward_present(Pid, SendToTopic2),
+        ?assertEqual([{ForwardedTopic, 1}], emqx_bridge:get_subscriptions(Pid)),
+        ok = emqx_bridge:ensure_subscription_present(Pid, ForwardedTopic2, _QoS = 1),
+        ok = emqx_bridge:ensure_forward_present(Pid, SendToTopic2),
         ?assertEqual([{ForwardedTopic, 1},
         ?assertEqual([{ForwardedTopic, 1},
-                      {ForwardedTopic2, 1}], emqx_portal:get_subscriptions(Pid)),
+                      {ForwardedTopic2, 1}], emqx_bridge:get_subscriptions(Pid)),
         {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
         {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
         {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
         {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
         %% message from a different client, to avoid getting terminated by no-local
         %% message from a different client, to avoid getting terminated by no-local
@@ -166,8 +166,8 @@ t_mqtt(Config) when is_list(Config) ->
         ok = receive_and_match_messages(Ref, Msgs2),
         ok = receive_and_match_messages(Ref, Msgs2),
         emqx_mock_client:close_session(ConnPid)
         emqx_mock_client:close_session(ConnPid)
     after
     after
-        ok = emqx_portal:stop(Pid),
-        meck:unload(emqx_portal)
+        ok = emqx_bridge:stop(Pid),
+        meck:unload(emqx_bridge)
     end.
     end.
 
 
 receive_and_match_messages(Ref, Msgs) ->
 receive_and_match_messages(Ref, Msgs) ->

+ 4 - 4
test/emqx_portal_mqtt_tests.erl

@@ -12,7 +12,7 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
--module(emqx_portal_mqtt_tests).
+-module(emqx_bridge_mqtt_tests).
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include("emqx_mqtt.hrl").
 -include("emqx_mqtt.hrl").
 
 
@@ -39,12 +39,12 @@ send_and_ack_test() ->
     try
     try
         Max = 100,
         Max = 100,
         Batch = lists:seq(1, Max),
         Batch = lists:seq(1, Max),
-        {ok, Ref, Conn} = emqx_portal_mqtt:start(#{address => "127.0.0.1:1883"}),
+        {ok, Ref, Conn} = emqx_bridge_mqtt:start(#{address => "127.0.0.1:1883"}),
         %% return last packet id as batch reference
         %% return last packet id as batch reference
-        {ok, AckRef} = emqx_portal_mqtt:send(Conn, Batch),
+        {ok, AckRef} = emqx_bridge_mqtt:send(Conn, Batch),
         %% expect batch ack
         %% expect batch ack
         receive {batch_ack, AckRef} -> ok end,
         receive {batch_ack, AckRef} -> ok end,
-        ok = emqx_portal_mqtt:stop(Ref, Conn)
+        ok = emqx_bridge_mqtt:stop(Ref, Conn)
     after
     after
         meck:unload(emqx_client)
         meck:unload(emqx_client)
     end.
     end.

+ 7 - 7
test/emqx_portal_rpc_tests.erl

@@ -12,7 +12,7 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
--module(emqx_portal_rpc_tests).
+-module(emqx_bridge_rpc_tests).
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 
 send_and_ack_test() ->
 send_and_ack_test() ->
@@ -26,18 +26,18 @@ send_and_ack_test() ->
                 fun(Node, Module, Fun, Args) ->
                 fun(Node, Module, Fun, Args) ->
                         rpc:cast(Node, Module, Fun, Args)
                         rpc:cast(Node, Module, Fun, Args)
                 end),
                 end),
-    meck:new(emqx_portal, [passthrough, no_history]),
-    meck:expect(emqx_portal, import_batch, 2,
+    meck:new(emqx_bridge, [passthrough, no_history]),
+    meck:expect(emqx_bridge, import_batch, 2,
                 fun(batch, AckFun) -> AckFun() end),
                 fun(batch, AckFun) -> AckFun() end),
     try
     try
-        {ok, Pid, Node} = emqx_portal_rpc:start(#{address => node()}),
-        {ok, Ref} = emqx_portal_rpc:send(Node, batch),
+        {ok, Pid, Node} = emqx_bridge_rpc:start(#{address => node()}),
+        {ok, Ref} = emqx_bridge_rpc:send(Node, batch),
         receive
         receive
             {batch_ack, Ref} ->
             {batch_ack, Ref} ->
                 ok
                 ok
         end,
         end,
-        ok = emqx_portal_rpc:stop(Pid, Node)
+        ok = emqx_bridge_rpc:stop(Pid, Node)
     after
     after
         meck:unload(gen_rpc),
         meck:unload(gen_rpc),
-        meck:unload(emqx_portal)
+        meck:unload(emqx_bridge)
     end.
     end.

+ 28 - 28
test/emqx_portal_tests.erl

@@ -12,15 +12,15 @@
 %% See the License for the specific language governing permissions and
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %% limitations under the License.
 
 
--module(emqx_portal_tests).
--behaviour(emqx_portal_connect).
+-module(emqx_bridge_tests).
+-behaviour(emqx_bridge_connect).
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include("emqx.hrl").
 -include("emqx.hrl").
 -include("emqx_mqtt.hrl").
 -include("emqx_mqtt.hrl").
 
 
--define(PORTAL_NAME, test).
--define(PORTAL_REG_NAME, emqx_portal_test).
+-define(BRIDGE_NAME, test).
+-define(BRIDGE_REG_NAME, emqx_bridge_test).
 -define(WAIT(PATTERN, TIMEOUT),
 -define(WAIT(PATTERN, TIMEOUT),
         receive
         receive
             PATTERN ->
             PATTERN ->
@@ -45,29 +45,29 @@ send(SendFun, Batch) when is_function(SendFun, 1) ->
 
 
 stop(_Ref, _Pid) -> ok.
 stop(_Ref, _Pid) -> ok.
 
 
-%% portal worker should retry connecting remote node indefinitely
+%% bridge worker should retry connecting remote node indefinitely
 reconnect_test() ->
 reconnect_test() ->
     Ref = make_ref(),
     Ref = make_ref(),
     Config = make_config(Ref, self(), {error, test}),
     Config = make_config(Ref, self(), {error, test}),
-    {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config),
+    {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config),
     %% assert name registered
     %% assert name registered
-    ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
+    ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
     ?WAIT({connection_start_attempt, Ref}, 1000),
     ?WAIT({connection_start_attempt, Ref}, 1000),
     %% expect same message again
     %% expect same message again
     ?WAIT({connection_start_attempt, Ref}, 1000),
     ?WAIT({connection_start_attempt, Ref}, 1000),
-    ok = emqx_portal:stop(?PORTAL_REG_NAME),
+    ok = emqx_bridge:stop(?BRIDGE_REG_NAME),
     ok.
     ok.
 
 
 %% connect first, disconnect, then connect again
 %% connect first, disconnect, then connect again
 disturbance_test() ->
 disturbance_test() ->
     Ref = make_ref(),
     Ref = make_ref(),
     Config = make_config(Ref, self(), {ok, Ref, connection}),
     Config = make_config(Ref, self(), {ok, Ref, connection}),
-    {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config),
-    ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
+    {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config),
+    ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
     ?WAIT({connection_start_attempt, Ref}, 1000),
     ?WAIT({connection_start_attempt, Ref}, 1000),
     Pid ! {disconnected, Ref, test},
     Pid ! {disconnected, Ref, test},
     ?WAIT({connection_start_attempt, Ref}, 1000),
     ?WAIT({connection_start_attempt, Ref}, 1000),
-    ok = emqx_portal:stop(?PORTAL_REG_NAME).
+    ok = emqx_bridge:stop(?BRIDGE_REG_NAME).
 
 
 %% buffer should continue taking in messages when disconnected
 %% buffer should continue taking in messages when disconnected
 buffer_when_disconnected_test_() ->
 buffer_when_disconnected_test_() ->
@@ -76,9 +76,9 @@ buffer_when_disconnected_test_() ->
 test_buffer_when_disconnected() ->
 test_buffer_when_disconnected() ->
     Ref = make_ref(),
     Ref = make_ref(),
     Nums = lists:seq(1, 100),
     Nums = lists:seq(1, 100),
-    Sender = spawn_link(fun() -> receive {portal, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end),
+    Sender = spawn_link(fun() -> receive {bridge, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end),
     SenderMref = monitor(process, Sender),
     SenderMref = monitor(process, Sender),
-    Receiver = spawn_link(fun() -> receive {portal, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end),
+    Receiver = spawn_link(fun() -> receive {bridge, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end),
     ReceiverMref = monitor(process, Receiver),
     ReceiverMref = monitor(process, Receiver),
     SendFun = fun(Batch) ->
     SendFun = fun(Batch) ->
                       BatchRef = make_ref(),
                       BatchRef = make_ref(),
@@ -87,44 +87,44 @@ test_buffer_when_disconnected() ->
               end,
               end,
     Config0 = make_config(Ref, false, {ok, Ref, SendFun}),
     Config0 = make_config(Ref, false, {ok, Ref, SendFun}),
     Config = Config0#{reconnect_delay_ms => 100},
     Config = Config0#{reconnect_delay_ms => 100},
-    {ok, Pid} = emqx_portal:start_link(?PORTAL_NAME, Config),
-    Sender ! {portal, Pid},
-    Receiver ! {portal, Pid},
-    ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
+    {ok, Pid} = emqx_bridge:start_link(?BRIDGE_NAME, Config),
+    Sender ! {bridge, Pid},
+    Receiver ! {bridge, Pid},
+    ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
     Pid ! {disconnected, Ref, test},
     Pid ! {disconnected, Ref, test},
     ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000),
     ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000),
     ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
     ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
-    ok = emqx_portal:stop(?PORTAL_REG_NAME).
+    ok = emqx_bridge:stop(?BRIDGE_REG_NAME).
 
 
 manual_start_stop_test() ->
 manual_start_stop_test() ->
     Ref = make_ref(),
     Ref = make_ref(),
     Config0 = make_config(Ref, self(), {ok, Ref, connection}),
     Config0 = make_config(Ref, self(), {ok, Ref, connection}),
     Config = Config0#{start_type := manual},
     Config = Config0#{start_type := manual},
-    {ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config),
+    {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config),
     %% call ensure_started again should yeld the same result
     %% call ensure_started again should yeld the same result
-    {ok, Pid} = emqx_portal:ensure_started(?PORTAL_NAME, Config),
-    ?assertEqual(Pid, whereis(?PORTAL_REG_NAME)),
+    {ok, Pid} = emqx_bridge:ensure_started(?BRIDGE_NAME, Config),
+    ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
     ?assertEqual({error, standing_by},
     ?assertEqual({error, standing_by},
-                 emqx_portal:ensure_forward_present(Pid, "dummy")),
-    emqx_portal:ensure_stopped(unknown),
-    emqx_portal:ensure_stopped(Pid),
-    emqx_portal:ensure_stopped(?PORTAL_REG_NAME).
+                 emqx_bridge:ensure_forward_present(Pid, "dummy")),
+    emqx_bridge:ensure_stopped(unknown),
+    emqx_bridge:ensure_stopped(Pid),
+    emqx_bridge:ensure_stopped(?BRIDGE_REG_NAME).
 
 
-%% Feed messages to portal
+%% Feed messages to bridge
 sender_loop(_Pid, [], _) -> exit(normal);
 sender_loop(_Pid, [], _) -> exit(normal);
 sender_loop(Pid, [Num | Rest], Interval) ->
 sender_loop(Pid, [Num | Rest], Interval) ->
     random_sleep(Interval),
     random_sleep(Interval),
     Pid ! {dispatch, dummy, make_msg(Num)},
     Pid ! {dispatch, dummy, make_msg(Num)},
     sender_loop(Pid, Rest, Interval).
     sender_loop(Pid, Rest, Interval).
 
 
-%% Feed acknowledgments to portal
+%% Feed acknowledgments to bridge
 receiver_loop(_Pid, [], _) -> ok;
 receiver_loop(_Pid, [], _) -> ok;
 receiver_loop(Pid, Nums, Interval) ->
 receiver_loop(Pid, Nums, Interval) ->
     receive
     receive
         {batch, BatchRef, Batch} ->
         {batch, BatchRef, Batch} ->
             Rest = match_nums(Batch, Nums),
             Rest = match_nums(Batch, Nums),
             random_sleep(Interval),
             random_sleep(Interval),
-            emqx_portal:handle_ack(Pid, BatchRef),
+            emqx_bridge:handle_ack(Pid, BatchRef),
             receiver_loop(Pid, Rest, Interval)
             receiver_loop(Pid, Rest, Interval)
     end.
     end.
 
 

+ 3 - 3
test/emqx_ct_broker_helpers.erl

@@ -170,13 +170,13 @@ flush(Msgs) ->
 
 
 bridge_conf() ->
 bridge_conf() ->
     [ {local_rpc,
     [ {local_rpc,
-        [{connect_module, emqx_portal_rpc},
+        [{connect_module, emqx_bridge_rpc},
          {address, node()},
          {address, node()},
-         {forwards, ["portal-1/#", "portal-2/#"]}
+         {forwards, ["bridge-1/#", "bridge-2/#"]}
         ]}
         ]}
     ].
     ].
     % [{aws,
     % [{aws,
-    %   [{connect_module, emqx_portal_mqtt},
+    %   [{connect_module, emqx_bridge_mqtt},
     %   {username,"user"},
     %   {username,"user"},
     %    {address,"127.0.0.1:1883"},
     %    {address,"127.0.0.1:1883"},
     %    {clean_start,true},
     %    {clean_start,true},