emqx_hookpoints.erl 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_hookpoints).
  17. -include("logger.hrl").
  18. -type callback_result() :: stop | any().
  19. -type fold_callback_result(Acc) :: {stop, Acc} | {ok, Acc} | stop | any().
  20. -export_type([
  21. fold_callback_result/1,
  22. callback_result/0
  23. ]).
  24. -export([
  25. default_hookpoints/0,
  26. register_hookpoints/0,
  27. register_hookpoints/1,
  28. verify_hookpoint/1
  29. ]).
  30. %%-----------------------------------------------------------------------------
  31. %% Hookpoints
  32. %%-----------------------------------------------------------------------------
  33. -define(HOOKPOINTS, [
  34. 'client.connect',
  35. 'client.connack',
  36. 'client.connected',
  37. 'client.disconnected',
  38. 'client.authorize',
  39. 'client.check_authz_complete',
  40. 'client.check_authn_complete',
  41. 'client.authenticate',
  42. 'client.subscribe',
  43. 'client.unsubscribe',
  44. 'client.timeout',
  45. 'client.monitored_process_down',
  46. 'session.created',
  47. 'session.subscribed',
  48. 'session.unsubscribed',
  49. 'session.resumed',
  50. 'session.discarded',
  51. 'session.takenover',
  52. 'session.terminated',
  53. 'message.publish',
  54. 'message.puback',
  55. 'message.dropped',
  56. 'schema.validation_failed',
  57. 'message.delivered',
  58. 'message.acked',
  59. 'delivery.dropped',
  60. 'delivery.completed',
  61. 'cm.channel.unregistered',
  62. 'tls_handshake.psk_lookup'
  63. ]).
  64. %% Our template plugin used this hookpoints before its 5.1.0 version,
  65. %% so we keep them here
  66. -define(DEPRECATED_HOOKPOINTS, [
  67. %% This is a deprecated hookpoint renamed to 'client.authorize'
  68. 'client.check_acl',
  69. %% Misspelled hookpoint
  70. 'session.takeovered'
  71. ]).
  72. %%-----------------------------------------------------------------------------
  73. %% Callbacks
  74. %%-----------------------------------------------------------------------------
  75. %% Callback definitions are given for documentation purposes.
  76. %% Each hook callback implementation can also accept any number of custom arguments
  77. %% after the mandatory ones.
  78. %%
  79. %% By default, callbacks are executed in the channel process context.
  80. -callback 'client.connect'(emqx_types:conninfo(), Props) ->
  81. fold_callback_result(Props)
  82. when
  83. Props :: emqx_types:properties().
  84. -callback 'client.connack'(emqx_types:conninfo(), _Reason :: atom(), Props) ->
  85. fold_callback_result(Props)
  86. when
  87. Props :: emqx_types:properties().
  88. -callback 'client.connected'(emqx_types:clientinfo(), emqx_types:conninfo()) -> callback_result().
  89. -callback 'client.disconnected'(emqx_types:clientinfo(), _Reason :: atom(), emqx_types:conninfo()) ->
  90. callback_result().
  91. -callback 'client.authorize'(
  92. emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), allow | deny
  93. ) ->
  94. fold_callback_result(#{result := allow | deny, from => term()}).
  95. -callback 'client.check_authz_complete'(
  96. emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), allow | deny, _From :: term()
  97. ) ->
  98. callback_result().
  99. -callback 'client.authenticate'(emqx_types:clientinfo(), ignore) ->
  100. fold_callback_result(
  101. ignore
  102. | ok
  103. | {ok, map()}
  104. | {ok, map(), binary()}
  105. | {continue, map()}
  106. | {continue, binary(), map()}
  107. | {error, term()}
  108. ).
  109. -callback 'client.subscribe'(emqx_types:clientinfo(), emqx_types:properties(), TopicFilters) ->
  110. fold_callback_result(TopicFilters)
  111. when
  112. TopicFilters :: list({emqx_types:topic(), map()}).
  113. -callback 'client.unsubscribe'(emqx_types:clientinfo(), emqx_types:properties(), TopicFilters) ->
  114. fold_callback_result(TopicFilters)
  115. when
  116. TopicFilters :: list({emqx_types:topic(), map()}).
  117. -callback 'client.timeout'(_TimerReference :: reference(), _Msg :: term(), Replies) ->
  118. fold_callback_result(Replies)
  119. when
  120. Replies :: emqx_channel:replies().
  121. -callback 'client.monitored_process_down'(
  122. _MonitorRef :: reference(), _Pid :: pid(), _Reason :: term(), Replies
  123. ) ->
  124. fold_callback_result(Replies)
  125. when
  126. Replies :: emqx_channel:replies().
  127. -callback 'session.created'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
  128. callback_result().
  129. -callback 'session.subscribed'(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) ->
  130. callback_result().
  131. -callback 'session.unsubscribed'(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) ->
  132. callback_result().
  133. -callback 'session.resumed'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
  134. callback_result().
  135. -callback 'session.discarded'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
  136. callback_result().
  137. -callback 'session.takenover'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
  138. callback_result().
  139. -callback 'session.terminated'(
  140. emqx_types:clientinfo(), _Reason :: atom(), _SessionInfo :: emqx_types:infos()
  141. ) -> callback_result().
  142. -callback 'message.publish'(Msg) ->
  143. fold_callback_result(Msg)
  144. when
  145. Msg :: emqx_types:message().
  146. -callback 'message.puback'(
  147. emqx_types:packet_id(),
  148. emqx_types:message(),
  149. emqx_types:publish_result(),
  150. emqx_types:reason_code()
  151. ) ->
  152. fold_callback_result(undefined | emqx_types:reason_code()).
  153. -callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) ->
  154. callback_result().
  155. -callback 'schema.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
  156. callback_result().
  157. -callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when
  158. Msg :: emqx_types:message().
  159. -callback 'message.acked'(emqx_types:clientinfo(), emqx_types:message()) -> callback_result().
  160. -callback 'delivery.dropped'(emqx_types:clientinfo(), emqx_types:message(), _Reason :: atom()) ->
  161. callback_result().
  162. -callback 'delivery.completed'(emqx_types:message(), #{
  163. session_birth_time := emqx_utils_calendar:epoch_millisecond(), clientid := emqx_types:clientid()
  164. }) ->
  165. callback_result().
  166. %% NOTE
  167. %% Executed out of channel process context
  168. -callback 'cm.channel.unregistered'(_ChanPid :: pid()) -> callback_result().
  169. %% NOTE
  170. %% Executed out of channel process context
  171. -callback 'tls_handshake.psk_lookup'(emqx_tls_psk:psk_identity(), normal) ->
  172. fold_callback_result(
  173. {ok, _SharedSecret :: binary()}
  174. | {error, term()}
  175. | normal
  176. ).
  177. %%-----------------------------------------------------------------------------
  178. %% API
  179. %%-----------------------------------------------------------------------------
  180. %% Binary hookpoint names are dynamic and used for bridges
  181. -type registered_hookpoint() :: atom().
  182. -type registered_hookpoint_status() :: valid | deprecated.
  183. -spec default_hookpoints() -> #{registered_hookpoint() => registered_hookpoint_status()}.
  184. default_hookpoints() ->
  185. maps:merge(
  186. maps:from_keys(?HOOKPOINTS, valid),
  187. maps:from_keys(?DEPRECATED_HOOKPOINTS, deprecated)
  188. ).
  189. -spec register_hookpoints() -> ok.
  190. register_hookpoints() ->
  191. register_hookpoints(default_hookpoints()).
  192. -spec register_hookpoints(
  193. [registered_hookpoint()] | #{registered_hookpoint() => registered_hookpoint_status()}
  194. ) -> ok.
  195. register_hookpoints(HookPoints) when is_list(HookPoints) ->
  196. register_hookpoints(maps:from_keys(HookPoints, valid));
  197. register_hookpoints(HookPoints) when is_map(HookPoints) ->
  198. persistent_term:put(?MODULE, HookPoints).
  199. -spec verify_hookpoint(registered_hookpoint() | binary()) -> ok | no_return().
  200. verify_hookpoint(HookPoint) when is_binary(HookPoint) -> ok;
  201. verify_hookpoint(HookPoint) ->
  202. case maps:find(HookPoint, registered_hookpoints()) of
  203. {ok, valid} -> ok;
  204. {ok, deprecated} -> ?SLOG(warning, #{msg => deprecated_hookpoint, hookpoint => HookPoint});
  205. error -> error({invalid_hookpoint, HookPoint})
  206. end.
  207. %%-----------------------------------------------------------------------------
  208. %% Internal API
  209. %%-----------------------------------------------------------------------------
  210. -spec registered_hookpoints() -> #{registered_hookpoint() => registered_hookpoint_status()}.
  211. registered_hookpoints() ->
  212. persistent_term:get(?MODULE, #{}).