|
|
@@ -17,12 +17,9 @@
|
|
|
-behaviour(gen_statem).
|
|
|
|
|
|
-include("types.hrl").
|
|
|
--include("emqx_mqtt.hrl").
|
|
|
-include("emqx_client.hrl").
|
|
|
|
|
|
-export([start_link/0, start_link/1]).
|
|
|
--export([request/5, request/6, request_async/7, receive_response/3]).
|
|
|
--export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]).
|
|
|
-export([connect/1]).
|
|
|
-export([subscribe/2, subscribe/3, subscribe/4]).
|
|
|
-export([publish/2, publish/3, publish/4, publish/5]).
|
|
|
@@ -41,9 +38,7 @@
|
|
|
-export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]).
|
|
|
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
|
|
|
|
|
|
--export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
|
|
|
- request_input/0, response_payload/0, request_handler/0,
|
|
|
- corr_data/0, mqtt_msg/0]).
|
|
|
+-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, mqtt_msg/0]).
|
|
|
|
|
|
-export_type([host/0, option/0]).
|
|
|
|
|
|
@@ -57,24 +52,12 @@
|
|
|
-define(WILL_MSG(QoS, Retain, Topic, Props, Payload),
|
|
|
#mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}).
|
|
|
|
|
|
--define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)).
|
|
|
-
|
|
|
--define(NO_REQ_HANDLER, undefined).
|
|
|
-
|
|
|
--define(NO_GROUP, <<>>).
|
|
|
-
|
|
|
-define(NO_CLIENT_ID, <<>>).
|
|
|
|
|
|
-type(host() :: inet:ip_address() | inet:hostname()).
|
|
|
|
|
|
--type(corr_data() :: binary()).
|
|
|
-
|
|
|
-%% NOTE: Message handler is different from request handler.
|
|
|
-%% Message handler is a set of callbacks defined to handle MQTT messages as well as
|
|
|
-%% the disconnect event.
|
|
|
-%% Request handler is a callback to handle received MQTT message as in 'request',
|
|
|
-%% and publish another MQTT message back to the defined topic as in 'response'.
|
|
|
-%% `owner' and `msg_handler' has no effect when `request_handler' is set.
|
|
|
+%% Message handler is a set of callbacks defined to handle MQTT messages
|
|
|
+%% as well as the disconnect event.
|
|
|
-define(NO_MSG_HDLR, undefined).
|
|
|
-type(msg_handler() :: #{puback := fun((_) -> any()),
|
|
|
publish := fun((emqx_types:message()) -> any()),
|
|
|
@@ -100,7 +83,6 @@
|
|
|
| {keepalive, non_neg_integer()}
|
|
|
| {max_inflight, pos_integer()}
|
|
|
| {retry_interval, timeout()}
|
|
|
- | {request_handler, request_handler()}
|
|
|
| {will_topic, iodata()}
|
|
|
| {will_payload, iodata()}
|
|
|
| {will_retain, boolean()}
|
|
|
@@ -146,7 +128,6 @@
|
|
|
ack_timer :: reference(),
|
|
|
retry_interval :: pos_integer(),
|
|
|
retry_timer :: reference(),
|
|
|
- request_handler :: request_handler(),
|
|
|
session_present :: boolean(),
|
|
|
last_packet_id :: packet_id(),
|
|
|
parse_state :: emqx_frame:state()}).
|
|
|
@@ -176,35 +157,10 @@
|
|
|
|
|
|
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
|
|
|
|
|
|
--type(request_input() :: binary()).
|
|
|
-
|
|
|
--type(response_payload() :: binary()).
|
|
|
-
|
|
|
--type(request_handler() :: fun((request_input()) -> response_payload())).
|
|
|
-
|
|
|
--type(group() :: binary()).
|
|
|
-
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% API
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-%% @doc Swap in a new request handler on the fly.
|
|
|
--spec(set_request_handler(client(), request_handler()) -> ok).
|
|
|
-set_request_handler(Responser, RequestHandler) ->
|
|
|
- gen_statem:call(Responser, {set_request_handler, RequestHandler}).
|
|
|
-
|
|
|
-%% @doc Subscribe to request topic.
|
|
|
--spec(sub_request_topic(client(), qos(), topic()) -> ok).
|
|
|
-sub_request_topic(Client, QoS, Topic) ->
|
|
|
- sub_request_topic(Client, QoS, Topic, ?NO_GROUP).
|
|
|
-
|
|
|
-%% @doc Share-subscribe to request topic.
|
|
|
--spec(sub_request_topic(client(), qos(), topic(), group()) -> ok).
|
|
|
-sub_request_topic(Client, QoS, Topic, Group) ->
|
|
|
- Properties = get_properties(Client),
|
|
|
- NewTopic = make_req_rsp_topic(Properties, Topic, Group),
|
|
|
- subscribe_req_rsp_topic(Client, QoS, NewTopic).
|
|
|
-
|
|
|
-spec(start_link() -> gen_statem:start_ret()).
|
|
|
start_link() -> start_link([]).
|
|
|
|
|
|
@@ -293,76 +249,6 @@ parse_subopt([{nl, false} | Opts], Result) ->
|
|
|
parse_subopt([{qos, QoS} | Opts], Result) ->
|
|
|
parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}).
|
|
|
|
|
|
--spec(request(client(), topic(), topic(), payload(), qos() | [pubopt()])
|
|
|
- -> ok | {ok, packet_id()} | {error, term()}).
|
|
|
-request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), is_atom(QoS) ->
|
|
|
- request(Client, ResponseTopic, RequestTopic, Payload, [{qos, ?QOS_I(QoS)}]);
|
|
|
-request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), ?IS_QOS(QoS) ->
|
|
|
- request(Client, ResponseTopic, RequestTopic, Payload, [{qos, QoS}]);
|
|
|
-request(Client, ResponseTopic, RequestTopic, Payload, Opts) when is_binary(ResponseTopic), is_list(Opts) ->
|
|
|
- request(Client, ResponseTopic, RequestTopic, Payload, Opts, _Properties = #{}).
|
|
|
-
|
|
|
-%% @doc Send a request to request topic and wait for response.
|
|
|
--spec(request(client(), topic(), topic(), payload(), [pubopt()], properties())
|
|
|
- -> {ok, response_payload()} | {error, term()}).
|
|
|
-request(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties) ->
|
|
|
- CorrData = make_corr_data(),
|
|
|
- case request_async(Client, ResponseTopic, RequestTopic,
|
|
|
- Payload, Opts, Properties, CorrData) of
|
|
|
- ok -> receive_response(Client, CorrData, Opts);
|
|
|
- {error, Reason} -> {error, Reason}
|
|
|
- end.
|
|
|
-
|
|
|
-%% @doc Get client properties.
|
|
|
--spec(get_properties(client()) -> properties()).
|
|
|
-get_properties(Client) -> gen_statem:call(Client, get_properties, infinity).
|
|
|
-
|
|
|
-%% @doc Send a request, but do not wait for response.
|
|
|
-%% The caller should expect a `{publish, Response}' message,
|
|
|
-%% or call `receive_response/3' to receive the message.
|
|
|
--spec(request_async(client(), topic(), topic(), payload(),
|
|
|
- [pubopt()], properties(), corr_data()) -> ok | {error, any()}).
|
|
|
-request_async(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties, CorrData)
|
|
|
- when is_binary(ResponseTopic),
|
|
|
- is_binary(RequestTopic),
|
|
|
- is_map(Properties),
|
|
|
- is_list(Opts) ->
|
|
|
- ok = emqx_mqtt_props:validate(Properties),
|
|
|
- Retain = proplists:get_bool(retain, Opts),
|
|
|
- QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)),
|
|
|
- ClientProperties = get_properties(Client),
|
|
|
- NewResponseTopic = make_req_rsp_topic(ClientProperties, ResponseTopic),
|
|
|
- NewRequestTopic = make_req_rsp_topic(ClientProperties, RequestTopic),
|
|
|
- %% This is perhaps not optimal to subscribe the response topic for
|
|
|
- %% each and every request even though the response topic is always the same
|
|
|
- ok = sub_response_topic(Client, QoS, NewResponseTopic),
|
|
|
- NewProperties = maps:merge(Properties, #{'Response-Topic' => NewResponseTopic,
|
|
|
- 'Correlation-Data' => CorrData}),
|
|
|
- case publish(Client, #mqtt_msg{qos = QoS,
|
|
|
- retain = Retain,
|
|
|
- topic = NewRequestTopic,
|
|
|
- props = NewProperties,
|
|
|
- payload = iolist_to_binary(Payload)}) of
|
|
|
- ok -> ok;
|
|
|
- {ok, _PacketId} -> ok; %% assume auto_ack
|
|
|
- {error, Reason} -> {error, Reason}
|
|
|
- end.
|
|
|
-
|
|
|
-%% @doc Block wait the response for a request sent earlier.
|
|
|
--spec(receive_response(client(), corr_data(), [pubopt()])
|
|
|
- -> {ok, response_payload()} | {error, any()}).
|
|
|
-receive_response(Client, CorrData, Opts) ->
|
|
|
- TimeOut = proplists:get_value(timeout, Opts, ?RESPONSE_TIMEOUT_SECONDS),
|
|
|
- MRef = erlang:monitor(process, Client),
|
|
|
- TRef = erlang:start_timer(TimeOut, self(), response),
|
|
|
- try
|
|
|
- receive_response(Client, CorrData, TRef, MRef)
|
|
|
- after
|
|
|
- erlang:cancel_timer(TRef),
|
|
|
- receive {timeout, TRef, _} -> ok after 0 -> ok end,
|
|
|
- erlang:demonitor(MRef, [flush])
|
|
|
- end.
|
|
|
-
|
|
|
-spec(publish(client(), topic(), payload()) -> ok | {error, term()}).
|
|
|
publish(Client, Topic, Payload) when is_binary(Topic) ->
|
|
|
publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}).
|
|
|
@@ -511,7 +397,6 @@ init([Options]) ->
|
|
|
auto_ack = true,
|
|
|
ack_timeout = ?DEFAULT_ACK_TIMEOUT,
|
|
|
retry_interval = 0,
|
|
|
- request_handler = ?NO_REQ_HANDLER,
|
|
|
connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
|
|
|
last_packet_id = 1}),
|
|
|
{ok, initialized, init_parse_state(State)}.
|
|
|
@@ -616,8 +501,6 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) ->
|
|
|
init(Opts, State#state{auto_ack = AutoAck});
|
|
|
init([{retry_interval, I} | Opts], State) ->
|
|
|
init(Opts, State#state{retry_interval = timer:seconds(I)});
|
|
|
-init([{request_handler, Handler} | Opts], State) ->
|
|
|
- init(Opts, State#state{request_handler = Handler});
|
|
|
init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) ->
|
|
|
init(Opts, State#state{bridge_mode = Mode});
|
|
|
init([_Opt | Opts], State) ->
|
|
|
@@ -756,15 +639,9 @@ connected({call, From}, pause, State) ->
|
|
|
connected({call, From}, resume, State) ->
|
|
|
{keep_state, State#state{paused = false}, [{reply, From, ok}]};
|
|
|
|
|
|
-connected({call, From}, get_properties, #state{properties = Properties}) ->
|
|
|
- {keep_state_and_data, [{reply, From, Properties}]};
|
|
|
-
|
|
|
connected({call, From}, client_id, #state{client_id = ClientId}) ->
|
|
|
{keep_state_and_data, [{reply, From, ClientId}]};
|
|
|
|
|
|
-connected({call, From}, {set_request_handler, RequestHandler}, State) ->
|
|
|
- {keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]};
|
|
|
-
|
|
|
connected({call, From}, SubReq = {subscribe, Properties, Topics},
|
|
|
State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) ->
|
|
|
case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
|
|
|
@@ -846,25 +723,12 @@ connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
|
|
|
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) ->
|
|
|
keep_state_and_data;
|
|
|
|
|
|
-connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload),
|
|
|
- State) when Properties =/= undefined ->
|
|
|
- NewState = response_publish(Properties, State, ?QOS_0, Payload),
|
|
|
- {keep_state, deliver(packet_to_msg(Packet), NewState)};
|
|
|
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
|
|
|
{keep_state, deliver(packet_to_msg(Packet), State)};
|
|
|
|
|
|
-connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, Properties, Payload), State)
|
|
|
- when Properties =/= undefined ->
|
|
|
- NewState = response_publish(Properties, State, ?QOS_1, Payload),
|
|
|
- publish_process(?QOS_1, Packet, NewState);
|
|
|
-
|
|
|
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
|
|
|
publish_process(?QOS_1, Packet, State);
|
|
|
|
|
|
-connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, Payload), State)
|
|
|
- when Properties =/= undefined ->
|
|
|
- NewState = response_publish(Properties, State, ?QOS_2, Payload),
|
|
|
- publish_process(?QOS_2, Packet, NewState);
|
|
|
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
|
|
|
publish_process(?QOS_2, Packet, State);
|
|
|
|
|
|
@@ -1086,57 +950,6 @@ delete_inflight_when_full(Packet, State0) ->
|
|
|
false -> {next_state, connected, State}
|
|
|
end.
|
|
|
|
|
|
-%% Subscribe to response topic.
|
|
|
--spec(sub_response_topic(client(), qos(), topic()) -> ok).
|
|
|
-sub_response_topic(Client, QoS, Topic) when is_binary(Topic) ->
|
|
|
- subscribe_req_rsp_topic(Client, QoS, Topic).
|
|
|
-
|
|
|
-receive_response(Client, CorrData, TRef, MRef) ->
|
|
|
- receive
|
|
|
- {publish, Response} ->
|
|
|
- {ok, Properties} = maps:find(properties, Response),
|
|
|
- case maps:find('Correlation-Data', Properties) of
|
|
|
- {ok, CorrData} ->
|
|
|
- maps:find(payload, Response);
|
|
|
- _ ->
|
|
|
- emqx_logger:debug("Discarded stale response: ~p", [Response]),
|
|
|
- receive_response(Client, CorrData, TRef, MRef)
|
|
|
- end;
|
|
|
- {timeout, TRef, response} ->
|
|
|
- {error, timeout};
|
|
|
- {'DOWN', MRef, process, _, _} ->
|
|
|
- {error, client_down}
|
|
|
- end.
|
|
|
-
|
|
|
-%% Make a unique correlation data for each request.
|
|
|
-%% It has to be unique because stale responses should be discarded.
|
|
|
-make_corr_data() -> term_to_binary(make_ref()).
|
|
|
-
|
|
|
-%% Shared function for request and response topic subscription.
|
|
|
-subscribe_req_rsp_topic(Client, QoS, Topic) ->
|
|
|
- %% It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription
|
|
|
- {ok, _Props, _QoS} = subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
|
|
|
- {nl, not ?IS_SHARE(Topic)},
|
|
|
- {qos, QoS}]}]),
|
|
|
- emqx_logger:debug("Subscribed to topic ~s", [Topic]),
|
|
|
- ok.
|
|
|
-
|
|
|
-%% Make a request or response topic.
|
|
|
-make_req_rsp_topic(Properties, Topic) ->
|
|
|
- make_req_rsp_topic(Properties, Topic, ?NO_GROUP).
|
|
|
-
|
|
|
-%% Same as make_req_rsp_topic/2, but allow shared subscription (for request topics)
|
|
|
-make_req_rsp_topic(Properties, Topic, Group) ->
|
|
|
- case maps:find('Response-Information', Properties) of
|
|
|
- {ok, ResponseInformation} when ResponseInformation =/= <<>> ->
|
|
|
- emqx_topic:join([req_rsp_topic_prefix(Group, ResponseInformation), Topic]);
|
|
|
- _ ->
|
|
|
- erlang:error(no_response_information)
|
|
|
- end.
|
|
|
-
|
|
|
-req_rsp_topic_prefix(?NO_GROUP, Prefix) -> Prefix;
|
|
|
-req_rsp_topic_prefix(Group, Prefix) -> ?SHARE(Group, Prefix).
|
|
|
-
|
|
|
assign_id(?NO_CLIENT_ID, Props) ->
|
|
|
case maps:find('Assigned-Client-Identifier', Props) of
|
|
|
{ok, Value} ->
|
|
|
@@ -1162,49 +975,6 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
|
|
|
Stop -> Stop
|
|
|
end.
|
|
|
|
|
|
-response_publish(#{'Response-Topic' := ResponseTopic} = Properties,
|
|
|
- State = #state{request_handler = RequestHandler}, QoS, Payload)
|
|
|
- when RequestHandler =/= ?NO_REQ_HANDLER ->
|
|
|
- do_publish(ResponseTopic, Properties, State, QoS, Payload);
|
|
|
-response_publish(_Properties, State, _QoS, _Payload) -> State.
|
|
|
-
|
|
|
-do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) ->
|
|
|
- Msg = #mqtt_msg{qos = ?QOS_0,
|
|
|
- retain = false,
|
|
|
- topic = ResponseTopic,
|
|
|
- props = Properties,
|
|
|
- payload = RequestHandler(Payload)
|
|
|
- },
|
|
|
- case send(Msg, State) of
|
|
|
- {ok, NewState} -> NewState;
|
|
|
- _Error -> State
|
|
|
- end;
|
|
|
-do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler,
|
|
|
- inflight = Inflight,
|
|
|
- last_packet_id = PacketId},
|
|
|
- QoS, Payload)
|
|
|
- when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2)->
|
|
|
- case emqx_inflight:is_full(Inflight) of
|
|
|
- true ->
|
|
|
- emqx_logger:error("Inflight is full"),
|
|
|
- State;
|
|
|
- false ->
|
|
|
- Msg = #mqtt_msg{packet_id = PacketId,
|
|
|
- qos = QoS,
|
|
|
- retain = false,
|
|
|
- topic = ResponseTopic,
|
|
|
- props = Properties,
|
|
|
- payload = RequestHandler(Payload)},
|
|
|
- case send(Msg, State) of
|
|
|
- {ok, NewState} ->
|
|
|
- Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight),
|
|
|
- ensure_retry_timer(NewState#state{inflight = Inflight1});
|
|
|
- {error, Reason} ->
|
|
|
- emqx_logger:error("Send failed: ~p", [Reason]),
|
|
|
- State
|
|
|
- end
|
|
|
- end.
|
|
|
-
|
|
|
ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) ->
|
|
|
ensure_keepalive_timer(timer:seconds(Secs), State);
|
|
|
ensure_keepalive_timer(State = #state{keepalive = 0}) ->
|
|
|
@@ -1291,9 +1061,6 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
|
|
|
Error
|
|
|
end.
|
|
|
|
|
|
-deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER ->
|
|
|
- %% message has been terminated by request handler, hence should not continue processing
|
|
|
- State;
|
|
|
deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
|
|
|
topic = Topic, props = Props, payload = Payload},
|
|
|
State) ->
|
|
|
@@ -1303,17 +1070,17 @@ deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
|
|
|
ok = eval_msg_handler(State, publish, Msg),
|
|
|
State.
|
|
|
|
|
|
-eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
|
|
|
+eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
|
|
|
owner = Owner},
|
|
|
disconnected, {ReasonCode, Properties}) ->
|
|
|
%% Special handling for disconnected message when there is no handler callback
|
|
|
Owner ! {disconnected, ReasonCode, Properties},
|
|
|
ok;
|
|
|
-eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER},
|
|
|
+eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR},
|
|
|
disconnected, _OtherReason) ->
|
|
|
%% do nothing to be backward compatible
|
|
|
ok;
|
|
|
-eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER,
|
|
|
+eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR,
|
|
|
owner = Owner}, Kind, Msg) ->
|
|
|
Owner ! {Kind, Msg},
|
|
|
ok;
|