Forráskód Böngészése

Add message handler callbacks option to emqx_client

In this commit, msg_handler option is added to emqx_client.
so the caller can provide callbacks to handle puback, publish,
as well as disconnected events instead of always delivered
as message like Owner ! {publish, Msg} to the owner process.

This is to make it ready to implement emqx_portal_connect on
top of emqx_client.
spring2maz 7 éve
szülő
commit
141af0d02c
1 módosított fájl, 59 hozzáadás és 37 törlés
  1. 59 37
      src/emqx_client.erl

+ 59 - 37
src/emqx_client.erl

@@ -59,7 +59,7 @@
 
 -define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)).
 
--define(NO_HANDLER, undefined).
+-define(NO_REQ_HANDLER, undefined).
 
 -define(NO_GROUP, <<>>).
 
@@ -67,10 +67,23 @@
 
 -type(host() :: inet:ip_address() | inet:hostname()).
 
--type corr_data() :: binary().
+-type(corr_data() :: binary()).
+
+%% NOTE: Message handler is different from request handler.
+%% Message handler is a set of callbacks defined to handle MQTT messages as well as
+%% the disconnect event.
+%% Request handler is a callback to handle received MQTT message as in 'request',
+%% and publish another MQTT message back to the defined topic as in 'response'.
+%% `owner' and `msg_handler' has no effect when `request_handler' is set.
+-define(NO_MSG_HDLR, undefined).
+-type(msg_handler() :: #{puback := fun((_) -> any()),
+                         publish := fun((emqx_types:message()) -> any()),
+                         disconnected := fun(({reason_code(), _Properties :: term()}) -> any())
+                        }).
 
 -type(option() :: {name, atom()}
                 | {owner, pid()}
+                | {msg_handler, msg_handler()}
                 | {host, host()}
                 | {hosts, [{host(), inet:port_number()}]}
                 | {port, inet:port_number()}
@@ -102,6 +115,7 @@
 
 -record(state, {name            :: atom(),
                 owner           :: pid(),
+                msg_handler     :: ?NO_MSG_HDLR | msg_handler(),
                 host            :: host(),
                 port            :: inet:port_number(),
                 hosts           :: [{host(), inet:port_number()}],
@@ -497,7 +511,7 @@ init([Options]) ->
                                  auto_ack        = true,
                                  ack_timeout     = ?DEFAULT_ACK_TIMEOUT,
                                  retry_interval  = 0,
-                                 request_handler = ?NO_HANDLER,
+                                 request_handler = ?NO_REQ_HANDLER,
                                  connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
                                  last_packet_id  = 1}),
     {ok, initialized, init_parse_state(State)}.
@@ -516,6 +530,8 @@ init([{name, Name} | Opts], State) ->
 init([{owner, Owner} | Opts], State) when is_pid(Owner) ->
     link(Owner),
     init(Opts, State#state{owner = Owner});
+init([{msg_handler, Hdlr} | Opts], State) ->
+    init(Opts, State#state{msg_handler = Hdlr});
 init([{host, Host} | Opts], State) ->
     init(Opts, State#state{host = Host});
 init([{port, Port} | Opts], State) ->
@@ -857,12 +873,12 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
     publish_process(?QOS_2, Packet, State);
 
 connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties),
-          State = #state{owner = Owner, inflight = Inflight}) ->
+          State = #state{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} ->
-            Owner ! {puback, #{packet_id   => PacketId,
-                               reason_code => ReasonCode,
-                               properties  => Properties}},
+            ok = eval_msg_handler(State, puback, #{packet_id   => PacketId,
+                                                   reason_code => ReasonCode,
+                                                   properties  => Properties}),
             {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
         none ->
             emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
@@ -899,12 +915,12 @@ connected(cast, ?PUBREL_PACKET(PacketId),
      end;
 
 connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
-          State = #state{owner = Owner, inflight = Inflight}) ->
+          State = #state{inflight = Inflight}) ->
     case emqx_inflight:lookup(PacketId, Inflight) of
         {value, {pubrel, _PacketId, _Ts}} ->
-            Owner ! {puback, #{packet_id   => PacketId,
-                               reason_code => ReasonCode,
-                               properties  => Properties}},
+            ok = eval_msg_handler(State, puback, #{packet_id   => PacketId,
+                                                   reason_code => ReasonCode,
+                                                   properties  => Properties}),
             {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
         none ->
             emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
@@ -943,9 +959,8 @@ connected(cast, ?PACKET(?PINGRESP), State) ->
         false -> {keep_state, State}
     end;
 
-connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties),
-          State = #state{owner = Owner}) ->
-    Owner ! {disconnected, ReasonCode, Properties},
+connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) ->
+    ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}),
     {stop, disconnected, State};
 
 connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) ->
@@ -1101,8 +1116,8 @@ assign_id(?NO_CLIENT_ID, Props) ->
 assign_id(Id, _Props) ->
     Id.
 
-publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State = #state{auto_ack = AutoAck}) ->
-    _ = deliver(packet_to_msg(Packet), State),
+publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State0 = #state{auto_ack = AutoAck}) ->
+    State = deliver(packet_to_msg(Packet), State0),
     case AutoAck of
         true  -> send_puback(?PUBACK_PACKET(PacketId), State);
         false -> {keep_state, State}
@@ -1116,18 +1131,11 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
         Stop -> Stop
     end.
 
-response_publish(undefined, State, _QoS, _Payload) ->
-    State;
-response_publish(Properties, State = #state{request_handler = RequestHandler}, QoS, Payload) ->
-    case maps:find('Response-Topic', Properties) of
-        {ok, ResponseTopic} ->
-            case RequestHandler of
-                ?NO_HANDLER -> State;
-                _ -> do_publish(ResponseTopic, Properties, State, QoS, Payload)
-            end;
-        _ ->
-            State
-    end.
+response_publish(#{'Response-Topic' := ResponseTopic} = Properties,
+                 State = #state{request_handler = RequestHandler}, QoS, Payload)
+      when RequestHandler =/= ?NO_REQ_HANDLER ->
+    do_publish(ResponseTopic, Properties, State, QoS, Payload);
+response_publish(_Properties, State, _QoS, _Payload) -> State.
 
 do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) ->
         Msg = #mqtt_msg{qos     = ?QOS_0,
@@ -1251,19 +1259,33 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
             Error
     end.
 
+deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER ->
+    %% message has been terminated by request handler, hence should not continue processing
+    State;
 deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
                   topic = Topic, props = Props, payload = Payload},
-        State = #state{owner = Owner, request_handler = RequestHandler}) ->
-    case RequestHandler of
-        ?NO_HANDLER ->
-            Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId,
-                                topic => Topic, properties => Props, payload => Payload,
-                                client_pid => self()}};
-        _ ->
-            ok
-    end,
+        State) ->
+    Msg = #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId,
+            topic => Topic, properties => Props, payload => Payload,
+            client_pid => self()},
+    ok = eval_msg_handler(State, publish, Msg),
     State.
 
+eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
+                        owner = Owner},
+                 disconnected, {ReasonCode, Properties}) ->
+    %% Special handling for disconnected message when there is no handler callback
+    Owner ! {disconnected, ReasonCode, Properties},
+    ok;
+eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
+                        owner = Owner}, Kind, Msg) ->
+    Owner ! {Kind, Msg},
+    ok;
+eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) ->
+    F = maps:get(Kind, Handler),
+    _ = F(Msg),
+    ok.
+
 packet_to_msg(#mqtt_packet{header   = #mqtt_packet_header{type   = ?PUBLISH,
                                                           dup    = Dup,
                                                           qos    = QoS,