Просмотр исходного кода

chore(gw): add From param for _channel:handle_call/3

JianBo He 4 лет назад
Родитель
Сommit
c42c1e698a

+ 6 - 2
apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl

@@ -44,6 +44,8 @@
 
 -type conn_state() :: idle | connecting | connected | disconnected | atom().
 
+-type gen_server_from() :: {pid(), Tag :: term()}.
+
 -type reply() :: {outgoing, emqx_gateway_frame:packet()}
                | {outgoing, [emqx_gateway_frame:packet()]}
                | {event, conn_state() | updated}
@@ -71,11 +73,13 @@
      | {shutdown, Reason :: any(), channel()}.
 
 %% @doc Handle the custom gen_server:call/2 for its connection process
--callback handle_call(Req :: any(), channel())
+-callback handle_call(Req :: any(), From :: gen_server_from(), channel())
     -> {reply, Reply :: any(), channel()}
      %% Reply to caller and trigger an event(s)
      | {reply, Reply :: any(),
-        EventOrEvents:: tuple() | list(tuple()), channel()}
+        EventOrEvents :: tuple() | list(tuple()), channel()}
+     | {noreply, channel()}
+     | {noreply, EventOrEvents :: tuple() | list(tuple()), channel()}
      | {shutdown, Reason :: any(), Reply :: any(), channel()}
      %% Shutdown the process, reply to caller and write a packet to client
      | {shutdown, Reason :: any(), Reply :: any(),

+ 10 - 4
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -394,6 +394,10 @@ append_msg(Q, Msg) ->
 
 handle_msg({'$gen_call', From, Req}, State) ->
     case handle_call(From, Req, State) of
+        {noreply, NState} ->
+            {ok, NState};
+        {noreply, Msgs, NState} ->
+            {ok, next_msgs(Msgs), NState};
         {reply, Reply, NState} ->
             gen_server:reply(From, Reply),
             {ok, NState};
@@ -545,10 +549,14 @@ handle_call(_From, info, State) ->
 handle_call(_From, stats, State) ->
     {reply, stats(State), State};
 
-handle_call(_From, Req, State = #state{
+handle_call(From, Req, State = #state{
                                    chann_mod = ChannMod,
                                    channel = Channel}) ->
-    case ChannMod:handle_call(Req, Channel) of
+    case ChannMod:handle_call(Req, From, Channel) of
+        {noreply, NChannel} ->
+            {noreply, State#state{channel = NChannel}};
+        {noreply, Msgs, NChannel} ->
+            {noreply, Msgs, State#state{channel = NChannel}};
         {reply, Reply, NChannel} ->
             {reply, Reply, State#state{channel = NChannel}};
         {reply, Reply, Msgs, NChannel} ->
@@ -559,8 +567,6 @@ handle_call(_From, Req, State = #state{
             NState = State#state{channel = NChannel},
             ok = handle_outgoing(Packet, NState),
             shutdown(Reason, Reply, NState)
-
-
     end.
 
 %%--------------------------------------------------------------------

+ 2 - 2
apps/emqx_gateway/src/coap/emqx_coap_channel.erl

@@ -34,7 +34,7 @@
         , terminate/2
         ]).
 
--export([ handle_call/2
+-export([ handle_call/3
         , handle_cast/2
         , handle_info/2
         ]).
@@ -165,7 +165,7 @@ handle_timeout(_, _, Channel) ->
 %%--------------------------------------------------------------------
 %% Handle call
 %%--------------------------------------------------------------------
-handle_call(Req, Channel) ->
+handle_call(Req, _From, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     {reply, ignored, Channel}.
 

+ 16 - 15
apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl

@@ -31,7 +31,7 @@
         , handle_in/2
         , handle_deliver/2
         , handle_timeout/3
-        , handle_call/2
+        , handle_call/3
         , handle_cast/2
         , handle_info/2
         , terminate/2
@@ -243,23 +243,24 @@ handle_timeout(_TRef, Msg, Channel) ->
     ?WARN("Unexpected timeout: ~p", [Msg]),
     {ok, Channel}.
 
--spec handle_call(any(), channel())
+-spec handle_call(Req :: any(), From :: any(), channel())
     -> {reply, Reply :: term(), channel()}
      | {reply, Reply :: term(), replies(), channel()}
      | {shutdown, Reason :: term(), Reply :: term(), channel()}.
 
-handle_call({send, Data}, Channel) ->
+handle_call({send, Data}, _From, Channel) ->
     {reply, ok, [{outgoing, Data}], Channel};
 
-handle_call(close, Channel = #channel{conn_state = connected}) ->
+handle_call(close, _From, Channel = #channel{conn_state = connected}) ->
     {reply, ok, [{event, disconnected}, {close, normal}], Channel};
-handle_call(close, Channel) ->
+handle_call(close, _From, Channel) ->
     {reply, ok, [{close, normal}], Channel};
 
-handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
+handle_call({auth, ClientInfo, _Password}, _From,
+            Channel = #channel{conn_state = connected}) ->
     ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
     {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
-handle_call({auth, ClientInfo0, Password},
+handle_call({auth, ClientInfo0, Password}, _From,
             Channel = #channel{
                          ctx = Ctx,
                          conninfo = ConnInfo,
@@ -300,7 +301,7 @@ handle_call({auth, ClientInfo0, Password},
             {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel}
     end;
 
-handle_call({start_timer, keepalive, Interval},
+handle_call({start_timer, keepalive, Interval}, _From,
             Channel = #channel{
                          conninfo = ConnInfo,
                          clientinfo = ClientInfo
@@ -310,7 +311,7 @@ handle_call({start_timer, keepalive, Interval},
     NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
     {reply, ok, ensure_keepalive(NChannel)};
 
-handle_call({subscribe_from_client, TopicFilter, Qos},
+handle_call({subscribe_from_client, TopicFilter, Qos}, _From,
             Channel = #channel{
                          ctx = Ctx,
                          conn_state = connected,
@@ -323,20 +324,20 @@ handle_call({subscribe_from_client, TopicFilter, Qos},
             {reply, ok, NChannel}
     end;
 
-handle_call({subscribe, Topic, SubOpts}, Channel) ->
+handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
     {ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
     {reply, ok, NChannel};
 
-handle_call({unsubscribe_from_client, TopicFilter},
+handle_call({unsubscribe_from_client, TopicFilter}, _From,
             Channel = #channel{conn_state = connected}) ->
     {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
     {reply, ok, NChannel};
 
-handle_call({unsubscribe, Topic}, Channel) ->
+handle_call({unsubscribe, Topic}, _From, Channel) ->
     {ok, NChannel} = do_unsubscribe([Topic], Channel),
     {reply, ok, NChannel};
 
-handle_call({publish, Topic, Qos, Payload},
+handle_call({publish, Topic, Qos, Payload}, _From,
             Channel = #channel{
                          ctx = Ctx,
                          conn_state = connected,
@@ -353,10 +354,10 @@ handle_call({publish, Topic, Qos, Payload},
             {reply, ok, Channel}
     end;
 
-handle_call(kick, Channel) ->
+handle_call(kick, _From, Channel) ->
     {shutdown, kicked, ok, Channel};
 
-handle_call(Req, Channel) ->
+handle_call(Req, _From, Channel) ->
     ?LOG(warning, "Unexpected call: ~p", [Req]),
     {reply, {error, unexpected_call}, Channel}.
 

+ 2 - 2
apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl

@@ -35,7 +35,7 @@
         , terminate/2
         ]).
 
--export([ handle_call/2
+-export([ handle_call/3
         , handle_cast/2
         , handle_info/2
         ]).
@@ -152,7 +152,7 @@ handle_timeout(_, _, Channel) ->
 %%--------------------------------------------------------------------
 %% Handle call
 %%--------------------------------------------------------------------
-handle_call(Req, Channel) ->
+handle_call(Req, _From, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     {reply, ignored, Channel}.
 

+ 17 - 16
apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl

@@ -38,7 +38,7 @@
         , set_conn_state/2
         ]).
 
--export([ handle_call/2
+-export([ handle_call/3
         , handle_cast/2
         , handle_info/2
         ]).
@@ -1113,12 +1113,13 @@ message_to_packet(MsgId, Message,
 %% Handle call
 %%--------------------------------------------------------------------
 
--spec handle_call(Req :: term(), channel())
-      -> {reply, Reply :: term(), channel()}
-       | {shutdown, Reason :: term(), Reply :: term(), channel()}
-       | {shutdown, Reason :: term(), Reply :: term(),
-          emqx_types:packet(), channel()}.
-handle_call({subscribe, Topic, SubOpts}, Channel) ->
+-spec handle_call(Req :: term(), From :: term(), channel())
+    -> {reply, Reply :: term(), channel()}
+     | {reply, Reply :: term(), replies(), channel()}
+     | {shutdown, Reason :: term(), Reply :: term(), channel()}
+     | {shutdown, Reason :: term(), Reply :: term(),
+        emqx_types:packet(), channel()}.
+handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
     %% XXX: Only support short_topic_name
     SubProps = maps:get(sub_props, SubOpts, #{}),
     case maps:get(subtype, SubProps, short_topic_name) of
@@ -1141,26 +1142,26 @@ handle_call({subscribe, Topic, SubOpts}, Channel) ->
             reply({error, only_support_short_name_topic}, Channel)
     end;
 
-handle_call({unsubscribe, Topic}, Channel) ->
+handle_call({unsubscribe, Topic}, _From, Channel) ->
     TopicFilters = [emqx_topic:parse(Topic)],
     {ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel),
     reply(ok, NChannel);
 
-handle_call(subscriptions, Channel = #channel{session = Session}) ->
+handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
     reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel);
 
-handle_call(kick, Channel) ->
+handle_call(kick, _From, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),
     shutdown_and_reply(kicked, ok, NChannel);
 
-handle_call(discard, Channel) ->
+handle_call(discard, _From, Channel) ->
     shutdown_and_reply(discarded, ok, Channel);
 
 %% XXX: No Session Takeover
-%handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
+%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) ->
 %    reply(Session, Channel#channel{takeover = true});
 %
-%handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
+%handle_call({takeover, 'end'}, _From, Channel = #channel{session  = Session,
 %                                                  pendings = Pendings}) ->
 %    ok = emqx_session:takeover(Session),
 %    %% TODO: Should not drain deliver here (side effect)
@@ -1168,16 +1169,16 @@ handle_call(discard, Channel) ->
 %    AllPendings = lists:append(Delivers, Pendings),
 %    shutdown_and_reply(takeovered, AllPendings, Channel);
 
-%handle_call(list_authz_cache, Channel) ->
+%handle_call(list_authz_cache, _From, Channel) ->
 %    {reply, emqx_authz_cache:list_authz_cache(), Channel};
 
 %% XXX: No Quota Now
-% handle_call({quota, Policy}, Channel) ->
+% handle_call({quota, Policy}, _From, Channel) ->
 %     Zone = info(zone, Channel),
 %     Quota = emqx_limiter:init(Zone, Policy),
 %     reply(ok, Channel#channel{quota = Quota});
 
-handle_call(Req, Channel) ->
+handle_call(Req, _From, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     reply(ignored, Channel).
 

+ 19 - 19
apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl

@@ -39,7 +39,7 @@
         , set_conn_state/2
         ]).
 
--export([ handle_call/2
+-export([ handle_call/3
         , handle_cast/2
         , handle_info/2
         ]).
@@ -586,10 +586,10 @@ do_subscribe([{ParsedTopic, SubOpts0}|More],
 %%--------------------------------------------------------------------
 
 -spec(handle_out(atom(), term(), channel())
-      -> {ok, channel()}
-       | {ok, replies(), channel()}
-       | {shutdown, Reason :: term(), channel()}
-       | {shutdown, Reason :: term(), replies(), channel()}).
+    -> {ok, channel()}
+     | {ok, replies(), channel()}
+     | {shutdown, Reason :: term(), channel()}
+     | {shutdown, Reason :: term(), replies(), channel()}).
 
 handle_out(connerr, {Headers, ReceiptId, ErrMsg}, Channel) ->
     Frame = error_frame(Headers, ReceiptId, ErrMsg),
@@ -620,11 +620,12 @@ handle_out(receipt, ReceiptId, Channel) ->
 %% Handle call
 %%--------------------------------------------------------------------
 
--spec(handle_call(Req :: term(), channel())
-      -> {reply, Reply :: term(), channel()}
-       | {shutdown, Reason :: term(), Reply :: term(), channel()}
-       | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}).
-handle_call({subscribe, Topic, SubOpts},
+-spec(handle_call(Req :: term(), From :: term(), channel())
+    -> {reply, Reply :: term(), channel()}
+     | {reply, Reply :: term(), replies(), channel()}
+     | {shutdown, Reason :: term(), Reply :: term(), channel()}
+     | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}).
+handle_call({subscribe, Topic, SubOpts}, _From,
             Channel = #channel{
                          subscriptions = Subs
                         }) ->
@@ -653,7 +654,7 @@ handle_call({subscribe, Topic, SubOpts},
             end
     end;
 
-handle_call({unsubscribe, Topic},
+handle_call({unsubscribe, Topic}, _From,
             Channel = #channel{
                          ctx = Ctx,
                          clientinfo = ClientInfo = #{mountpoint := Mountpoint},
@@ -670,27 +671,27 @@ handle_call({unsubscribe, Topic},
          );
 
 %% Reply :: [{emqx_types:topic(), emqx_types:subopts()}]
-handle_call(subscriptions, Channel = #channel{subscriptions = Subs}) ->
+handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
     Reply = lists:map(
               fun({_SubId, Topic, _Ack, SubOpts}) ->
                 {Topic, SubOpts}
               end, Subs),
     reply(Reply, Channel);
 
-handle_call(kick, Channel) ->
+handle_call(kick, _From, Channel) ->
     NChannel = ensure_disconnected(kicked, Channel),
     Frame = error_frame(undefined, <<"Kicked out">>),
     shutdown_and_reply(kicked, ok, Frame, NChannel);
 
-handle_call(discard, Channel) ->
+handle_call(discard, _From, Channel) ->
     Frame = error_frame(undefined, <<"Discarded">>),
     shutdown_and_reply(discarded, ok, Frame, Channel);
 
 %% XXX: No Session Takeover
-%handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
+%handle_call({takeover, 'begin'}, _From, Channel = #channel{session = Session}) ->
 %    reply(Session, Channel#channel{takeover = true});
 %
-%handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
+%handle_call({takeover, 'end'}, _From, Channel = #channel{session  = Session,
 %                                                  pendings = Pendings}) ->
 %    ok = emqx_session:takeover(Session),
 %    %% TODO: Should not drain deliver here (side effect)
@@ -698,7 +699,7 @@ handle_call(discard, Channel) ->
 %    AllPendings = lists:append(Delivers, Pendings),
 %    shutdown_and_reply(takeovered, AllPendings, Channel);
 
-handle_call(list_authz_cache, Channel) ->
+handle_call(list_authz_cache, _From, Channel) ->
     %% This won't work
     {reply, emqx_authz_cache:list_authz_cache(), Channel};
 
@@ -708,11 +709,10 @@ handle_call(list_authz_cache, Channel) ->
 %     Quota = emqx_limiter:init(Zone, Policy),
 %     reply(ok, Channel#channel{quota = Quota});
 
-handle_call(Req, Channel) ->
+handle_call(Req, _From, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     reply(ignored, Channel).
 
-
 %%--------------------------------------------------------------------
 %% Handle cast
 %%--------------------------------------------------------------------