emqx_request_handler.erl 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_request_handler).
  17. -export([start_link/4, stop/1]).
  18. -include_lib("emqx/include/emqx_mqtt.hrl").
  %% QoS of the request subscription, in either of the two representations
  %% declared by emqx_mqtt_types.
  -type qos() :: emqx_mqtt_types:qos_name()
               | emqx_mqtt_types:qos().

  %% Topic on which requests are received.
  -type topic() :: emqx_topic:topic().

  %% User callback that turns one request into a response payload. It is
  %% called with the request's correlation data and the request payload.
  -type handler() ::
      fun((CorrData :: binary(), ReqPayload :: binary()) ->
              RspPayload :: binary()).
  %% @doc Start an emqtt client that answers requests published to
  %% `RequestTopic'.
  %%
  %% The client is started with `Options0' plus a message handler that feeds
  %% every received publish to `RequestHandler'. It is then connected and
  %% subscribed to `RequestTopic' with `QoS'.
  %%
  %% Returns `{ok, Pid}' once the subscription is in place. If the client cannot
  %% be started, the error from `emqtt:start_link/1' is returned unchanged. If
  %% connecting or subscribing fails, the client is stopped and
  %% `{error, {Class, Reason, Stacktrace}}' is returned.
  -spec start_link(topic(), qos(), handler(), emqtt:options()) ->
      {ok, pid()} | {error, any()}.
  start_link(RequestTopic, QoS, RequestHandler, Options0) ->
      Parent = self(),
      MsgHandler = make_msg_handler(RequestHandler, Parent),
      Options = [{msg_handler, MsgHandler} | Options0],
      case emqtt:start_link(Options) of
          {ok, Pid} ->
              %% Connect and subscribe are both asserted inside the protected
              %% section, so a failure of either one takes the same cleanup
              %% path and is reported as an error tuple, as the spec promises.
              try
                  {ok, _} = emqtt:connect(Pid),
                  ok = subscribe(Pid, RequestTopic, QoS),
                  {ok, Pid}
              catch
                  C:E:S ->
                      stop_quietly(Pid),
                      {error, {C, E, S}}
              end;
          {error, _} = Error ->
              Error
      end.

  %% Best-effort shutdown of a client whose setup failed.
  %% NOTE(review): the client may already have terminated by the time we get
  %% here (e.g. after a failed connect), in which case the stop call exits;
  %% that exit is ignored so the original failure is what gets reported.
  stop_quietly(Pid) ->
      try emqtt:stop(Pid) of
          _ -> ok
      catch
          exit:_ -> ok
      end.
  %% @doc Stop a request handler by disconnecting its emqtt client.
  %% The result of `emqtt:disconnect/1' is returned as-is.
  stop(Client) ->
      emqtt:disconnect(Client).
  %% Build the callback map handed to emqtt as `msg_handler'.
  %% Only publishes are acted upon: each one goes to handle_msg/3 together with
  %% the user callback and the parent pid. Pubacks and disconnect
  %% notifications are accepted and ignored.
  make_msg_handler(RequestHandler, Parent) ->
      OnPublish = fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end,
      OnPuback = fun(_Ack) -> ok end,
      OnDisconnected = fun(_Reason) -> ok end,
      #{publish => OnPublish,
        puback => OnPuback,
        disconnected => OnDisconnected}.
  %% Handle one incoming request message.
  %%
  %% A request that carries a non-empty 'Response-Topic' property is answered:
  %% `RequestHandler' is called with the correlation data and the request
  %% payload, and its result is published to the response topic with the
  %% request's QoS and the request's properties minus 'Response-Topic'.
  %%
  %% Any other message cannot be answered; it is reported to `Parent' as
  %% `{discarded, ReqPayload}'.
  %%
  %% Always returns `ok'.
  handle_msg(ReqMsg, RequestHandler, Parent) ->
      #{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg,
      case maps:find('Response-Topic', Props) of
          {ok, RspTopic} when RspTopic =/= <<>> ->
              %% Correlation data is an optional MQTT 5 property. This code runs
              %% inside the emqtt client, so a request without it must not raise
              %% `badkey' here: one such request would crash the client. The
              %% handler gets an empty binary instead, and the response then
              %% carries no correlation data either, since it is absent from
              %% the copied properties.
              CorrData = maps:get('Correlation-Data', Props, <<>>),
              RspProps = maps:without(['Response-Topic'], Props),
              RspPayload = RequestHandler(CorrData, ReqPayload),
              RspMsg = #mqtt_msg{qos = QoS,
                                 topic = RspTopic,
                                 props = RspProps,
                                 payload = RspPayload
                                },
              emqx_logger:debug("~p sending response msg to topic ~s with~n"
                                "corr-data=~p~npayload=~p",
                                [?MODULE, RspTopic, CorrData, RspPayload]),
              ok = send_response(RspMsg);
          _ ->
              Parent ! {discarded, ReqPayload},
              ok
      end.
  %% Publish `Msg' through the calling emqtt client without blocking it.
  %%
  %% This function is evaluated by emqtt itself, so the publish is a loopback
  %% gen_statem call. It is therefore delegated to a short-lived linked process
  %% instead of being made from the client process. Returns `ok' as soon as
  %% that process has been spawned.
  send_response(Msg) ->
      ClientPid = self(),
      Publisher = fun() -> publish_or_exit(ClientPid, Msg) end,
      _ = spawn_link(Publisher),
      ok.

  %% Runs in the temporary process: publish `Msg' via `ClientPid' and exit with
  %% `{failed_to_publish_response, Reason}' if the publish is rejected.
  publish_or_exit(ClientPid, Msg) ->
      case emqtt:publish(ClientPid, Msg) of
          {error, Reason} -> exit({failed_to_publish_response, Reason});
          ok -> ok;
          {ok, _} -> ok
      end.
  %% Subscribe `Client' to `Topic' with the given `QoS'.
  %% Returns `ok' on success; any other reply from `emqtt:subscribe/2' fails
  %% the match and raises `badmatch'.
  subscribe(Client, Topic, QoS) ->
      %% NOTE(review): rh/rap/nl are presumably the MQTT 5 subscription options
      %% (retain handling, retain-as-published, no-local) - confirm against the
      %% emqtt documentation.
      SubOpts = [{rh, 2}, {rap, false}, {nl, true}, {qos, QoS}],
      {ok, _Props, _ReasonCodes} = emqtt:subscribe(Client, [{Topic, SubOpts}]),
      ok.