Browse Source

chore(emqx_channel): use macros for reply construction

Ilya Averyanov 2 years ago
parent
commit
39a48179ea

+ 5 - 0
apps/emqx/include/emqx_channel.hrl

@@ -41,4 +41,9 @@
     will_msg
     will_msg
 ]).
 ]).
 
 
+-define(REPLY_OUTGOING(Packets), {outgoing, Packets}).
+-define(REPLY_CONNACK(Packet), {connack, Packet}).
+-define(REPLY_EVENT(StateOrEvent), {event, StateOrEvent}).
+-define(REPLY_CLOSE(Reason), {close, Reason}).
+
 -define(EXPIRE_INTERVAL_INFINITE, 4294967295000).
 -define(EXPIRE_INTERVAL_INFINITE, 4294967295000).

+ 7 - 6
apps/emqx/src/emqx_channel.erl

@@ -122,6 +122,7 @@
 -type reply() ::
 -type reply() ::
     {outgoing, emqx_types:packet()}
     {outgoing, emqx_types:packet()}
     | {outgoing, [emqx_types:packet()]}
     | {outgoing, [emqx_types:packet()]}
+    | {connack, emqx_types:packet()}
     | {event, conn_state() | updated}
     | {event, conn_state() | updated}
     | {close, Reason :: atom()}.
     | {close, Reason :: atom()}.
 
 
@@ -1023,7 +1024,7 @@ handle_out(publish, [], Channel) ->
     {ok, Channel};
     {ok, Channel};
 handle_out(publish, Publishes, Channel) ->
 handle_out(publish, Publishes, Channel) ->
     {Packets, NChannel} = do_deliver(Publishes, Channel),
     {Packets, NChannel} = do_deliver(Publishes, Channel),
-    {ok, {outgoing, Packets}, NChannel};
+    {ok, ?REPLY_OUTGOING(Packets), NChannel};
 handle_out(puback, {PacketId, ReasonCode}, Channel) ->
 handle_out(puback, {PacketId, ReasonCode}, Channel) ->
     {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
     {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
@@ -1048,7 +1049,7 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel) ->
     handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel);
     handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel);
 handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) ->
 handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) ->
     Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
     Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
-    {ok, [{outgoing, Packet}, {close, ReasonName}], Channel};
+    {ok, [?REPLY_OUTGOING(Packet), {close, ReasonName}], Channel};
 handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
 handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
     {ok, {close, ReasonName}, Channel};
     {ok, {close, ReasonName}, Channel};
 handle_out(auth, {ReasonCode, Properties}, Channel) ->
 handle_out(auth, {ReasonCode, Properties}, Channel) ->
@@ -1062,7 +1063,7 @@ handle_out(Type, Data, Channel) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 return_connack(AckPacket, Channel) ->
 return_connack(AckPacket, Channel) ->
-    Replies = [{event, connected}, {connack, AckPacket}],
+    Replies = [?REPLY_EVENT(connected), ?REPLY_CONNACK(AckPacket)],
     case maybe_resume_session(Channel) of
     case maybe_resume_session(Channel) of
         ignore ->
         ignore ->
             {ok, Replies, Channel};
             {ok, Replies, Channel};
@@ -1073,7 +1074,7 @@ return_connack(AckPacket, Channel) ->
                 session = NSession
                 session = NSession
             },
             },
             {Packets, NChannel2} = do_deliver(Publishes, NChannel1),
             {Packets, NChannel2} = do_deliver(Publishes, NChannel1),
-            Outgoing = [{outgoing, Packets} || length(Packets) > 0],
+            Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0],
             {ok, Replies ++ Outgoing, NChannel2}
             {ok, Replies ++ Outgoing, NChannel2}
     end.
     end.
 
 
@@ -1121,7 +1122,7 @@ do_deliver(Publishes, Channel) when is_list(Publishes) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 return_sub_unsub_ack(Packet, Channel) ->
 return_sub_unsub_ack(Packet, Channel) ->
-    {ok, [{outgoing, Packet}, {event, updated}], Channel}.
+    {ok, [?REPLY_OUTGOING(Packet), ?REPLY_EVENT(updated)], Channel}.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Handle call
 %% Handle call
@@ -1235,7 +1236,7 @@ handle_info(
 ->
 ->
     Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
     Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
     case maybe_shutdown(Reason, Channel1) of
     case maybe_shutdown(Reason, Channel1) of
-        {ok, Channel2} -> {ok, {event, disconnected}, Channel2};
+        {ok, Channel2} -> {ok, ?REPLY_EVENT(disconnected), Channel2};
         Shutdown -> Shutdown
         Shutdown -> Shutdown
     end;
     end;
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->

+ 4 - 2
apps/emqx_ft/src/emqx_ft.erl

@@ -18,7 +18,9 @@
 
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_channel.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
+
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 
 -export([
 -export([
@@ -164,7 +166,7 @@ on_channel_unregistered(ChannelPid) ->
 on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) ->
 on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) ->
     _ = erlang:demonitor(MRef, [flush]),
     _ = erlang:demonitor(MRef, [flush]),
     _ = emqx_ft_async_reply:take_by_mref(MRef),
     _ = emqx_ft_async_reply:take_by_mref(MRef),
-    {ok, [{outgoing, ?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)} | Acc]};
+    {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]};
 on_client_timeout(_TRef, _Event, Acc) ->
 on_client_timeout(_TRef, _Event, Acc) ->
     {ok, Acc}.
     {ok, Acc}.
 
 
@@ -172,7 +174,7 @@ on_process_down(MRef, _Pid, Reason, Acc) ->
     case emqx_ft_async_reply:take_by_mref(MRef) of
     case emqx_ft_async_reply:take_by_mref(MRef) of
         {ok, PacketId, TRef} ->
         {ok, PacketId, TRef} ->
             _ = emqx_utils:cancel_timer(TRef),
             _ = emqx_utils:cancel_timer(TRef),
-            {ok, [{outgoing, ?PUBACK_PACKET(PacketId, reason_to_rc(Reason))} | Acc]};
+            {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]};
         not_found ->
         not_found ->
             {ok, Acc}
             {ok, Acc}
     end.
     end.