emqx.erl 7.2 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx).
  17. -include("emqx.hrl").
  18. -include("logger.hrl").
  19. -include("types.hrl").
  20. -logger_header("[EMQ X]").
  21. %% Start/Stop the application
  22. -export([ start/0
  23. , restart/1
  24. , is_running/1
  25. , stop/0
  26. ]).
  27. -export([ get_env/1
  28. , get_env/2
  29. ]).
  30. %% PubSub API
  31. -export([ subscribe/1
  32. , subscribe/2
  33. , subscribe/3
  34. , publish/1
  35. , unsubscribe/1
  36. ]).
  37. %% PubSub management API
  38. -export([ topics/0
  39. , subscriptions/1
  40. , subscribers/1
  41. , subscribed/2
  42. ]).
  43. %% Hooks API
  44. -export([ hook/2
  45. , hook/3
  46. , hook/4
  47. , unhook/2
  48. , run_hook/2
  49. , run_fold_hook/3
  50. ]).
  51. %% Shutdown and reboot
  52. -export([ shutdown/0
  53. , shutdown/1
  54. , reboot/0
  55. ]).
  56. -define(APP, ?MODULE).
  57. -define(COPYRIGHT, "Copyright (c) 2020 EMQ Technologies Co., Ltd").
  58. -define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0").
  59. %%--------------------------------------------------------------------
  60. %% Bootstrap, is_running...
  61. %%--------------------------------------------------------------------
  62. %% @doc Start emqx application
  63. -spec(start() -> {ok, list(atom())} | {error, term()}).
  64. start() ->
  65. %% Check OS
  66. %% Check VM
  67. %% Check Mnesia
  68. application:ensure_all_started(?APP).
  69. -spec(restart(string()) -> ok).
  70. restart(ConfFile) ->
  71. reload_config(ConfFile),
  72. shutdown(),
  73. ok = application:stop(mnesia),
  74. _ = application:start(mnesia),
  75. reboot().
  76. %% @doc Stop emqx application.
  77. -spec(stop() -> ok | {error, term()}).
  78. stop() ->
  79. application:stop(?APP).
  80. %% @doc Is emqx running?
  81. -spec(is_running(node()) -> boolean()).
  82. is_running(Node) ->
  83. case rpc:call(Node, erlang, whereis, [?APP]) of
  84. {badrpc, _} -> false;
  85. undefined -> false;
  86. Pid when is_pid(Pid) -> true
  87. end.
  88. %% @doc Get environment
  89. -spec(get_env(Key :: atom()) -> maybe(term())).
  90. get_env(Key) ->
  91. get_env(Key, undefined).
  92. -spec(get_env(Key :: atom(), Default :: term()) -> term()).
  93. get_env(Key, Default) ->
  94. application:get_env(?APP, Key, Default).
  95. %%--------------------------------------------------------------------
  96. %% PubSub API
  97. %%--------------------------------------------------------------------
  98. -spec(subscribe(emqx_topic:topic() | string()) -> ok).
  99. subscribe(Topic) ->
  100. emqx_broker:subscribe(iolist_to_binary(Topic)).
  101. -spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | emqx_types:subopts()) -> ok).
  102. subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)->
  103. emqx_broker:subscribe(iolist_to_binary(Topic), SubId);
  104. subscribe(Topic, SubOpts) when is_map(SubOpts) ->
  105. emqx_broker:subscribe(iolist_to_binary(Topic), SubOpts).
  106. -spec(subscribe(emqx_topic:topic() | string(),
  107. emqx_types:subid() | pid(), emqx_types:subopts()) -> ok).
  108. subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
  109. emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
  110. -spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
  111. publish(Msg) ->
  112. emqx_broker:publish(Msg).
  113. -spec(unsubscribe(emqx_topic:topic() | string()) -> ok).
  114. unsubscribe(Topic) ->
  115. emqx_broker:unsubscribe(iolist_to_binary(Topic)).
  116. %%--------------------------------------------------------------------
  117. %% PubSub management API
  118. %%--------------------------------------------------------------------
  119. -spec(topics() -> list(emqx_topic:topic())).
  120. topics() -> emqx_router:topics().
  121. -spec(subscribers(emqx_topic:topic() | string()) -> [pid()]).
  122. subscribers(Topic) ->
  123. emqx_broker:subscribers(iolist_to_binary(Topic)).
  124. -spec(subscriptions(pid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]).
  125. subscriptions(SubPid) when is_pid(SubPid) ->
  126. emqx_broker:subscriptions(SubPid).
  127. -spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic() | string()) -> boolean()).
  128. subscribed(SubPid, Topic) when is_pid(SubPid) ->
  129. emqx_broker:subscribed(SubPid, iolist_to_binary(Topic));
  130. subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) ->
  131. emqx_broker:subscribed(SubId, iolist_to_binary(Topic)).
  132. %%--------------------------------------------------------------------
  133. %% Hooks API
  134. %%--------------------------------------------------------------------
  135. -spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}).
  136. hook(HookPoint, Action) ->
  137. emqx_hooks:add(HookPoint, Action).
  138. -spec(hook(emqx_hooks:hookpoint(),
  139. emqx_hooks:action(),
  140. emqx_hooks:filter() | integer() | list())
  141. -> ok | {error, already_exists}).
  142. hook(HookPoint, Action, Priority) when is_integer(Priority) ->
  143. emqx_hooks:add(HookPoint, Action, Priority);
  144. hook(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
  145. emqx_hooks:add(HookPoint, Action, Filter);
  146. hook(HookPoint, Action, InitArgs) when is_list(InitArgs) ->
  147. emqx_hooks:add(HookPoint, Action, InitArgs).
  148. -spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer())
  149. -> ok | {error, already_exists}).
  150. hook(HookPoint, Action, Filter, Priority) ->
  151. emqx_hooks:add(HookPoint, Action, Filter, Priority).
  152. -spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action() | {module(), atom()}) -> ok).
  153. unhook(HookPoint, Action) ->
  154. emqx_hooks:del(HookPoint, Action).
  155. -spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
  156. run_hook(HookPoint, Args) ->
  157. emqx_hooks:run(HookPoint, Args).
  158. -spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
  159. run_fold_hook(HookPoint, Args, Acc) ->
  160. emqx_hooks:run_fold(HookPoint, Args, Acc).
  161. %%--------------------------------------------------------------------
  162. %% Shutdown and reboot
  163. %%--------------------------------------------------------------------
  164. shutdown() ->
  165. shutdown(normal).
  166. shutdown(Reason) ->
  167. ?LOG(critical, "emqx shutdown for ~s", [Reason]),
  168. _ = emqx_alarm_handler:unload(),
  169. _ = emqx_plugins:unload(),
  170. lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).
  171. reboot() ->
  172. lists:foreach(fun application:start/1, [gproc, esockd, ranch, cowboy, ekka, emqx]).
  173. %%--------------------------------------------------------------------
  174. %% Internal functions
  175. %%--------------------------------------------------------------------
  176. reload_config(ConfFile) ->
  177. {ok, [Conf]} = file:consult(ConfFile),
  178. lists:foreach(fun({App, Vals}) ->
  179. [application:set_env(App, Par, Val) || {Par, Val} <- Vals]
  180. end, Conf).