Просмотр исходного кода

fix(shared_sub): Dispatch shared messages via gen_rpc

ieQu1 3 лет назад
Родитель
Сommit
9dfa82b448

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -26,6 +26,7 @@
 {emqx_resource,1}.
 {emqx_retainer,1}.
 {emqx_rule_engine,1}.
+{emqx_shared_sub,1}.
 {emqx_slow_subs,1}.
 {emqx_statsd,1}.
 {emqx_telemetry,1}.

+ 25 - 12
apps/emqx/src/emqx_shared_sub.erl

@@ -38,7 +38,8 @@
 
 -export([
     dispatch/3,
-    dispatch/4
+    dispatch/4,
+    do_dispatch_with_ack/4
 ]).
 
 -export([
@@ -170,30 +171,31 @@ do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
 %% return either 'ok' (when everything is fine) or 'error'
 do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
     %% For QoS 0 message, send it as regular dispatch
-    SubPid ! {deliver, Topic, Msg},
-    ok;
+    send(SubPid, Topic, {deliver, Topic, Msg});
 do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
     %% Retry implies all subscribers nack:ed, send again without ack
-    SubPid ! {deliver, Topic, Msg},
-    ok;
+    send(SubPid, Topic, {deliver, Topic, Msg});
 do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
     case ack_enabled() of
         true ->
-            dispatch_with_ack(SubPid, Group, Topic, Msg);
+            %% FIXME: replace with `emqx_shared_sub_proto:dispatch_with_ack' in 5.2
+            do_dispatch_with_ack(SubPid, Group, Topic, Msg);
         false ->
-            SubPid ! {deliver, Topic, Msg},
-            ok
+            send(SubPid, Topic, {deliver, Topic, Msg})
     end.
 
-dispatch_with_ack(SubPid, Group, Topic, Msg) ->
+-spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) ->
+    ok | {error, _}.
+do_dispatch_with_ack(SubPid, Group, Topic, Msg) ->
     %% For QoS 1/2 message, expect an ack
     Ref = erlang:monitor(process, SubPid),
     Sender = self(),
-    SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
+    %% FIXME: replace with regular send in 5.2
+    send(SubPid, Topic, {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)}),
     Timeout =
         case Msg#message.qos of
-            ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
-            ?QOS_2 -> infinity
+            ?QOS_2 -> infinity;
+            _ -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS)
         end,
     try
         receive
@@ -412,6 +414,17 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+send(Pid, Topic, Msg) ->
+    Node = node(Pid),
+    _ =
+        case Node =:= node() of
+            true ->
+                Pid ! Msg;
+            false ->
+                emqx_shared_sub_proto_v1:send(Node, Pid, Topic, Msg)
+        end,
+    ok.
+
 maybe_insert_round_robin_count({Group, _Topic} = GroupTopic) ->
     strategy(Group) =:= round_robin_per_group andalso
         ets:insert(?SHARED_SUBS_ROUND_ROBIN_COUNTER, {GroupTopic, 0}),

+ 50 - 0
apps/emqx/src/proto/emqx_shared_sub_proto_v1.erl

@@ -0,0 +1,50 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_shared_sub_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    send/4,
+    dispatch_with_ack/5
+]).
+
+-include("bpapi.hrl").
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+introduced_in() ->
+    "5.0.8".
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec send(node(), pid(), emqx_types:topic(), term()) -> true.
+send(Node, Pid, Topic, Msg) ->
+    emqx_rpc:cast(Topic, Node, erlang, send, [Pid, Msg]).
+
+-spec dispatch_with_ack(
+    pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message(), timeout()
+) ->
+    ok | {error, _}.
+dispatch_with_ack(Pid, Group, Topic, Msg, Timeout) ->
+    emqx_rpc:call(
+        Topic, node(Pid), emqx_shared_sub, do_dispatch_with_ack, [Pid, Group, Topic, Msg], Timeout
+    ).

+ 53 - 0
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -567,6 +567,59 @@ t_local(_) ->
     ?assertNotEqual(UsedSubPid1, UsedSubPid2),
     ok.
 
+t_remote(_) ->
+    %% This testcase verifies dispatching of shared messages to the remote nodes via backplane API.
+    %%
+    %% In this testcase we start two EMQX nodes: local and remote.
+    %% A subscriber connects to the remote node.
+    %% A publisher connects to the local node and sends three messages with different QoS.
+    %% The test verifies that the remote side received all three messages.
+    ok = ensure_config(sticky, true),
+    GroupConfig = #{
+        <<"local_group">> => local,
+        <<"round_robin_group">> => round_robin,
+        <<"sticky_group">> => sticky
+    },
+
+    Node = start_slave('remote_shared_sub_testtesttest', 21999),
+    ok = ensure_group_config(GroupConfig),
+    ok = ensure_group_config(Node, GroupConfig),
+
+    Topic = <<"foo/bar">>,
+    ClientIdLocal = <<"ClientId1">>,
+    ClientIdRemote = <<"ClientId2">>,
+
+    {ok, ConnPidLocal} = emqtt:start_link([{clientid, ClientIdLocal}]),
+    {ok, ConnPidRemote} = emqtt:start_link([{clientid, ClientIdRemote}, {port, 21999}]),
+
+    try
+        {ok, ClientPidLocal} = emqtt:connect(ConnPidLocal),
+        {ok, ClientPidRemote} = emqtt:connect(ConnPidRemote),
+
+        emqtt:subscribe(ConnPidRemote, {<<"$share/remote_group/", Topic/binary>>, 0}),
+
+        ct:sleep(100),
+
+        Message1 = emqx_message:make(ClientPidLocal, 0, Topic, <<"hello1">>),
+        Message2 = emqx_message:make(ClientPidLocal, 1, Topic, <<"hello2">>),
+        Message3 = emqx_message:make(ClientPidLocal, 2, Topic, <<"hello3">>),
+
+        emqx:publish(Message1),
+        {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPidRemote]),
+
+        emqx:publish(Message2),
+        {true, UsedSubPid1} = last_message(<<"hello2">>, [ConnPidRemote]),
+
+        emqx:publish(Message3),
+        {true, UsedSubPid1} = last_message(<<"hello3">>, [ConnPidRemote]),
+
+        ok
+    after
+        emqtt:stop(ConnPidLocal),
+        emqtt:stop(ConnPidRemote),
+        stop_slave(Node)
+    end.
+
 t_local_fallback(_) ->
     ok = ensure_group_config(#{
         <<"local_group">> => local,