emqx_request_handler.erl 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_request_handler).
  17. -export([start_link/4, stop/1]).
  18. -include_lib("emqx/include/emqx_mqtt.hrl").
  19. -type qos() :: emqx_types:qos_name() | emqx_types:qos().
  20. -type topic() :: emqx_types:topic().
  21. -type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()).
  %% @doc Start an emqtt client that answers requests published to `RequestTopic'.
  %%
  %% The calling process is remembered as the parent: requests that carry no
  %% usable Response-Topic are reported to it as `{discarded, Payload}'.
  %% `RequestHandler' is installed (via the `msg_handler' option prepended to
  %% `Options0') and is invoked for every request that can be answered.
  %%
  %% Returns `{ok, Pid}' once the client is started, connected and subscribed.
  %% Any failure after the client process has been started is returned as
  %% `{error, Reason}' and the client is stopped (best effort) so that it is
  %% not left running without an owner that knows about it.
  -spec start_link(topic(), qos(), handler(), emqtt:options()) ->
      {ok, pid()} | {error, any()}.
  start_link(RequestTopic, QoS, RequestHandler, Options0) ->
      Parent = self(),
      MsgHandler = make_msg_handler(RequestHandler, Parent),
      Options = [{msg_handler, MsgHandler} | Options0],
      case emqtt:start_link(Options) of
          {ok, Pid} ->
              connect_and_subscribe(Pid, RequestTopic, QoS);
          {error, _} = Error ->
              Error
      end.

  %% Connect the freshly started client and subscribe it to the request topic.
  %% On every failure path the client is stopped before the error is returned.
  connect_and_subscribe(Pid, RequestTopic, QoS) ->
      case emqtt:connect(Pid) of
          {ok, _ConnAckProps} ->
              try subscribe(Pid, RequestTopic, QoS) of
                  ok ->
                      {ok, Pid};
                  {error, _} = Error ->
                      ok = stop_client(Pid),
                      Error
              catch
                  C:E:S ->
                      ok = stop_client(Pid),
                      {error, {C, E, S}}
              end;
          {error, _} = Error ->
              %% Report the connect failure as a value, as the spec promises,
              %% instead of crashing the caller with a badmatch.
              ok = stop_client(Pid),
              Error
      end.

  %% Best-effort cleanup: the client may already have terminated (for example
  %% as a consequence of the very failure being reported), in which case the
  %% call exits. That must not mask the original error.
  stop_client(Pid) ->
      try emqtt:stop(Pid) of
          _ -> ok
      catch
          exit:_ -> ok
      end.
  %% @doc Shut down a request handler previously returned by start_link/4.
  %% Delegates to emqtt:disconnect/1 and returns whatever that call returns.
  stop(Client) ->
      emqtt:disconnect(Client).
  %% Build the callback map handed to emqtt through the `msg_handler' option.
  %% Only `publish' does real work (it routes the message to handle_msg/3);
  %% the `puback' and `disconnected' callbacks ignore their argument and
  %% return `ok'.
  make_msg_handler(RequestHandler, Parent) ->
      OnPublish = fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end,
      OnPubAck = fun(_Ack) -> ok end,
      OnDisconnected = fun(_Reason) -> ok end,
      #{
          publish => OnPublish,
          puback => OnPubAck,
          disconnected => OnDisconnected
      }.
  %% Handle one message delivered on the request topic.
  %%
  %% When the request carries a non-empty 'Response-Topic' property, the
  %% request handler is called with the correlation data and the request
  %% payload, and its result is published to that topic with the request's
  %% QoS. The response carries the request properties minus 'Response-Topic'.
  %%
  %% When there is no usable response topic, nothing is published and the
  %% parent process is sent `{discarded, ReqPayload}'.
  %%
  %% Always returns `ok'.
  handle_msg(ReqMsg, RequestHandler, Parent) ->
      #{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg,
      case maps:find('Response-Topic', Props) of
          {ok, RspTopic} when RspTopic =/= <<>> ->
              %% Correlation-Data is optional in MQTT 5. A remote publisher
              %% omitting it must not crash the client process with badkey,
              %% so the handler is given an empty binary in that case.
              CorrData = maps:get('Correlation-Data', Props, <<>>),
              RspProps = maps:without(['Response-Topic'], Props),
              RspPayload = RequestHandler(CorrData, ReqPayload),
              RspMsg = #mqtt_msg{
                  qos = QoS,
                  topic = RspTopic,
                  props = RspProps,
                  payload = RspPayload
              },
              logger:debug(
                  "~p sending response msg to topic ~ts with~n"
                  "corr-data=~p~npayload=~p",
                  [?MODULE, RspTopic, CorrData, RspPayload]
              ),
              ok = send_response(RspMsg);
          _ ->
              Parent ! {discarded, ReqPayload},
              ok
      end.
  %% Publish the response message through the current client.
  %%
  %% The message handler is evaluated by emqtt itself, so self() is the
  %% client here and publishing directly would be a loopback gen_statem call.
  %% The publish is therefore delegated to a short-lived linked process,
  %% which exits with `{failed_to_publish_response, Reason}' if it fails.
  %% Returns `ok' immediately, without waiting for the publish to finish.
  send_response(Msg) ->
      Client = self(),
      Publish = fun() ->
          case emqtt:publish(Client, Msg) of
              {error, Reason} -> exit({failed_to_publish_response, Reason});
              {ok, _Result} -> ok;
              ok -> ok
          end
      end,
      _ = spawn_link(Publish),
      ok.
  %% Subscribe the client to the request topic with the given QoS and the
  %% fixed subscription options rh = 2, rap = false and nl = true.
  %% Returns `ok'; any reply other than `{ok, _, _}' from emqtt:subscribe/2
  %% raises a badmatch, which start_link/4 catches.
  subscribe(Client, Topic, QoS) ->
      SubOpts = [
          {rh, 2},
          {rap, false},
          {nl, true},
          {qos, QoS}
      ],
      {ok, _Props, _ReasonCodes} = emqtt:subscribe(Client, [{Topic, SubOpts}]),
      ok.