Sfoglia il codice sorgente

Merge pull request #6701 from k32/bpapi-persistent-session

refactor(persistent_session): Decorate API calls
k32 4 anni fa
parent
commit
24251039d5

+ 2 - 2
apps/emqx/src/emqx_persistent_session.erl

@@ -309,11 +309,11 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) ->
     {Session2, Pendings4 ++ WriterPendings}.
     {Session2, Pendings4 ++ WriterPendings}.
 
 
 resume_begin(Nodes, SessionID) ->
 resume_begin(Nodes, SessionID) ->
-    Res = erpc:multicall(Nodes, emqx_session_router, resume_begin, [self(), SessionID]),
+    Res = emqx_persistent_session_proto_v1:resume_begin(Nodes, self(), SessionID),
     [{Node, Marker} || {{ok, {ok, Marker}}, Node} <- lists:zip(Res, Nodes)].
     [{Node, Marker} || {{ok, {ok, Marker}}, Node} <- lists:zip(Res, Nodes)].
 
 
 resume_end(Nodes, SessionID) ->
 resume_end(Nodes, SessionID) ->
-    Res = erpc:multicall(Nodes, emqx_session_router, resume_end, [self(), SessionID]),
+    Res = emqx_persistent_session_proto_v1:resume_end(Nodes, self(), SessionID),
     ?tp(ps_erpc_multical_result, #{ res => Res, sid => SessionID }),
     ?tp(ps_erpc_multical_result, #{ res => Res, sid => SessionID }),
     %% TODO: Should handle the errors
     %% TODO: Should handle the errors
     [ {deliver, STopic, M}
     [ {deliver, STopic, M}

+ 10 - 0
apps/emqx/src/emqx_rpc.erl

@@ -31,6 +31,8 @@
              , call_result/0
              , call_result/0
              , cast_result/0
              , cast_result/0
              , multicall_result/0
              , multicall_result/0
+             , erpc/1
+             , erpc_multicast/1
              ]).
              ]).
 
 
 -compile({inline,
 -compile({inline,
@@ -48,6 +50,14 @@
 
 
 -type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}.
 -type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}.
 
 
+-type erpc(Ret) :: {ok, Ret}
+                 | {throw, _Err}
+                 | {exit, {exception | signal, _Reason}}
+                 | {error, {exception, _Reason, _Stack :: list()}}
+                 | {error, {erpc, _Reason}}.
+
+-type erpc_multicast(Ret) :: [erpc(Ret)].
+
 -spec call(node(), module(), atom(), list()) -> call_result().
 -spec call(node(), module(), atom(), list()) -> call_result().
 call(Node, Mod, Fun, Args) ->
 call(Node, Mod, Fun, Args) ->
     filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)).
     filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)).

+ 40 - 0
apps/emqx/src/proto/emqx_persistent_session_proto_v1.erl

@@ -0,0 +1,40 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+        , resume_begin/3
+        , resume_end/3
+        ]).
+
+-include("bpapi.hrl").
+-include("emqx.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec resume_begin([node()], pid(), binary()) ->
+          emqx_rpc:erpc_multicall([{node(), emqx_guid:guid()}]).
+resume_begin(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) ->
+    erpc:multicall(Nodes, emqx_session_router, resume_begin, [Pid, SessionID]).
+
+-spec resume_end([node()], pid(), binary()) ->
+          emqx_rpc:erpc_multicall({'ok', [emqx_types:message()]} | {'error', term()}).
+resume_end(Nodes, Pid, SessionID) when is_pid(Pid), is_binary(SessionID) ->
+    erpc:multicall(Nodes, emqx_session_router, resume_end, [Pid, SessionID]).