emqx_external_trace.erl 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  %% Behaviour definition and dispatch front-end for an external trace
  %% provider. At most one provider module is stored at a time; the trace API
  %% functions exported below forward to it, or fall back to a default that
  %% does no tracing when no provider is registered.
  -module(emqx_external_trace).

  %% Called to wrap the processing of a publish packet. The provider receives
  %% the packet, the channel info map and the processing fun, and must return
  %% a value of the fun's result type.
  -callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
      Packet :: emqx_types:packet(),
      ChannelInfo :: channel_info(),
      Res :: term().

  %% Called with a list of delivers and the channel info map; returns a list
  %% of delivers.
  %% NOTE(review): presumably the returned delivers carry trace context added
  %% by the provider -- confirm against a provider implementation.
  -callback start_trace_send(list(emqx_types:deliver()), channel_info()) ->
      list(emqx_types:deliver()).

  %% Called with a single packet or a list of packets; returns `ok'.
  -callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.

  %% Channel information handed to the provider: a map with atom keys and
  %% values of any type.
  -type channel_info() :: #{atom() => _}.

  %% Provider management first, then the trace API mirroring the callbacks.
  -export([
      provider/0,
      register_provider/1,
      unregister_provider/1,
      trace_process_publish/3,
      start_trace_send/2,
      end_trace_send/1
  ]).

  -export_type([channel_info/0]).
  %% Key under which the registered provider module is kept in persistent_term.
  -define(PROVIDER, {?MODULE, trace_provider}).

  %% Dispatch helper used by the trace API functions.
  %%   IfRegistered    - a call fragment such as `f(Args)'; it is invoked on the
  %%                     stored provider module, i.e. expands to `Provider:f(Args)'.
  %%   IfNotRegistered - expression evaluated when the lookup yields `undefined',
  %%                     i.e. no provider is stored.
  %% NOTE: the expansion binds the variable `Provider' inside the function that
  %% uses the macro, so callers must not use that variable name themselves.
  -define(with_provider(IfRegistered, IfNotRegistered),
      case persistent_term:get(?PROVIDER, undefined) of
          undefined ->
              IfNotRegistered;
          Provider ->
              Provider:IfRegistered
      end
  ).
  43. %%--------------------------------------------------------------------
  44. %% provider API
  45. %%--------------------------------------------------------------------
  %% Registers `Module' as the trace provider.
  %%
  %% The module is accepted only if it passes is_valid_provider/1; otherwise
  %% `{error, invalid_provider}' is returned and nothing is stored. A
  %% successful registration replaces whatever provider was stored before.
  %% Non-atom arguments are rejected by the guard (function_clause).
  -spec register_provider(module()) -> ok | {error, term()}.
  register_provider(Module) when is_atom(Module) ->
      store_provider_if(is_valid_provider(Module), Module).

  %% Stores the provider only when validation succeeded.
  store_provider_if(true, Module) ->
      persistent_term:put(?PROVIDER, Module);
  store_provider_if(false, _Module) ->
      {error, invalid_provider}.
  %% Unregisters `Module' if, and only if, it is the provider currently stored.
  %%
  %% Returns `ok' after erasing the stored provider, or
  %% `{error, not_registered}' when a different value (or nothing) is stored.
  -spec unregister_provider(module()) -> ok | {error, term()}.
  unregister_provider(Module) ->
      Registered = persistent_term:get(?PROVIDER, undefined),
      erase_if_registered(Module, Registered).

  %% The repeated variable in the first head matches only when the requested
  %% module is exactly the stored one.
  erase_if_registered(Module, Module) ->
      persistent_term:erase(?PROVIDER),
      ok;
  erase_if_registered(_Module, _Registered) ->
      {error, not_registered}.
  %% Returns the currently stored provider module, or `undefined' when no
  %% provider is stored.
  -spec provider() -> module() | undefined.
  provider() ->
      Key = ?PROVIDER,
      persistent_term:get(Key, undefined).
  66. %%--------------------------------------------------------------------
  67. %% trace API
  68. %%--------------------------------------------------------------------
  %% Runs publish processing through the registered provider.
  %%
  %% With a provider stored, its trace_process_publish/3 callback is called
  %% with the same three arguments and its result is returned. Without one,
  %% `ProcessFun' is applied directly to `Packet'.
  -spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
      Packet :: emqx_types:packet(),
      ChannelInfo :: channel_info(),
      Res :: term().
  trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
      ?with_provider(
          trace_process_publish(Packet, ChannelInfo, ProcessFun),
          ProcessFun(Packet)
      ).
  %% Passes the delivers to the registered provider's start_trace_send/2
  %% callback and returns its result. Without a provider, `Delivers' is
  %% returned unchanged.
  -spec start_trace_send(list(emqx_types:deliver()), channel_info()) ->
      list(emqx_types:deliver()).
  start_trace_send(Delivers, ChannelInfo) ->
      ?with_provider(
          start_trace_send(Delivers, ChannelInfo),
          Delivers
      ).
  %% Passes the packet(s) to the registered provider's end_trace_send/1
  %% callback and returns its result. Without a provider this is a no-op
  %% returning `ok'.
  -spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
  end_trace_send(Packets) ->
      ?with_provider(
          end_trace_send(Packets),
          ok
      ).
  82. %%--------------------------------------------------------------------
  83. %% Internal functions
  84. %%--------------------------------------------------------------------
  %% Returns `true' when `Module' exports every callback declared by this
  %% behaviour, `false' otherwise (including when the module cannot be loaded).
  is_valid_provider(Module) ->
      %% erlang:function_exported/3 only inspects code that is already loaded.
      %% In interactive mode a provider that has not been called yet would
      %% therefore be reported as exporting nothing, so load it first.
      case code:ensure_loaded(Module) of
          {module, Module} ->
              lists:all(
                  fun({F, A}) -> erlang:function_exported(Module, F, A) end,
                  ?MODULE:behaviour_info(callbacks)
              );
          {error, _Reason} ->
              false
      end.