| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx).
- -include("emqx.hrl").
- -include("logger.hrl").
- -include("types.hrl").
- -logger_header("[EMQ X]").
- %% Start/Stop the application
- -export([ start/0
- , restart/1
- , is_running/1
- , stop/0
- ]).
- -export([ get_env/1
- , get_env/2
- ]).
- %% PubSub API
- -export([ subscribe/1
- , subscribe/2
- , subscribe/3
- , publish/1
- , unsubscribe/1
- ]).
- %% PubSub management API
- -export([ topics/0
- , subscriptions/1
- , subscribers/1
- , subscribed/2
- ]).
- %% Hooks API
- -export([ hook/2
- , hook/3
- , hook/4
- , unhook/2
- , run_hook/2
- , run_fold_hook/3
- ]).
- %% Shutdown and reboot
- -export([ shutdown/0
- , shutdown/1
- , reboot/0
- ]).
- -define(APP, ?MODULE).
- -define(COPYRIGHT, "Copyright (c) 2020 EMQ Technologies Co., Ltd").
- -define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0").
- %%--------------------------------------------------------------------
- %% Bootstrap, is_running...
- %%--------------------------------------------------------------------
- %% @doc Start emqx application
- -spec(start() -> {ok, list(atom())} | {error, term()}).
- start() ->
- %% Check OS
- %% Check VM
- %% Check Mnesia
- application:ensure_all_started(?APP).
- -spec(restart(string()) -> ok).
- restart(ConfFile) ->
- reload_config(ConfFile),
- shutdown(),
- ok = application:stop(mnesia),
- _ = application:start(mnesia),
- reboot().
- %% @doc Stop emqx application.
- -spec(stop() -> ok | {error, term()}).
- stop() ->
- application:stop(?APP).
- %% @doc Is emqx running?
- -spec(is_running(node()) -> boolean()).
- is_running(Node) ->
- case rpc:call(Node, erlang, whereis, [?APP]) of
- {badrpc, _} -> false;
- undefined -> false;
- Pid when is_pid(Pid) -> true
- end.
- %% @doc Get environment
- -spec(get_env(Key :: atom()) -> maybe(term())).
- get_env(Key) ->
- get_env(Key, undefined).
- -spec(get_env(Key :: atom(), Default :: term()) -> term()).
- get_env(Key, Default) ->
- application:get_env(?APP, Key, Default).
- %%--------------------------------------------------------------------
- %% PubSub API
- %%--------------------------------------------------------------------
- -spec(subscribe(emqx_topic:topic() | string()) -> ok).
- subscribe(Topic) ->
- emqx_broker:subscribe(iolist_to_binary(Topic)).
- -spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | emqx_types:subopts()) -> ok).
- subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)->
- emqx_broker:subscribe(iolist_to_binary(Topic), SubId);
- subscribe(Topic, SubOpts) when is_map(SubOpts) ->
- emqx_broker:subscribe(iolist_to_binary(Topic), SubOpts).
- -spec(subscribe(emqx_topic:topic() | string(),
- emqx_types:subid() | pid(), emqx_types:subopts()) -> ok).
- subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
- emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
- -spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
- publish(Msg) ->
- emqx_broker:publish(Msg).
- -spec(unsubscribe(emqx_topic:topic() | string()) -> ok).
- unsubscribe(Topic) ->
- emqx_broker:unsubscribe(iolist_to_binary(Topic)).
- %%--------------------------------------------------------------------
- %% PubSub management API
- %%--------------------------------------------------------------------
- -spec(topics() -> list(emqx_topic:topic())).
- topics() -> emqx_router:topics().
- -spec(subscribers(emqx_topic:topic() | string()) -> [pid()]).
- subscribers(Topic) ->
- emqx_broker:subscribers(iolist_to_binary(Topic)).
- -spec(subscriptions(pid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]).
- subscriptions(SubPid) when is_pid(SubPid) ->
- emqx_broker:subscriptions(SubPid).
- -spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic() | string()) -> boolean()).
- subscribed(SubPid, Topic) when is_pid(SubPid) ->
- emqx_broker:subscribed(SubPid, iolist_to_binary(Topic));
- subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) ->
- emqx_broker:subscribed(SubId, iolist_to_binary(Topic)).
- %%--------------------------------------------------------------------
- %% Hooks API
- %%--------------------------------------------------------------------
- -spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}).
- hook(HookPoint, Action) ->
- emqx_hooks:add(HookPoint, Action).
- -spec(hook(emqx_hooks:hookpoint(),
- emqx_hooks:action(),
- emqx_hooks:filter() | integer() | list())
- -> ok | {error, already_exists}).
- hook(HookPoint, Action, Priority) when is_integer(Priority) ->
- emqx_hooks:add(HookPoint, Action, Priority);
- hook(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) ->
- emqx_hooks:add(HookPoint, Action, Filter);
- hook(HookPoint, Action, InitArgs) when is_list(InitArgs) ->
- emqx_hooks:add(HookPoint, Action, InitArgs).
- -spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer())
- -> ok | {error, already_exists}).
- hook(HookPoint, Action, Filter, Priority) ->
- emqx_hooks:add(HookPoint, Action, Filter, Priority).
- -spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action() | {module(), atom()}) -> ok).
- unhook(HookPoint, Action) ->
- emqx_hooks:del(HookPoint, Action).
- -spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
- run_hook(HookPoint, Args) ->
- emqx_hooks:run(HookPoint, Args).
- -spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
- run_fold_hook(HookPoint, Args, Acc) ->
- emqx_hooks:run_fold(HookPoint, Args, Acc).
- %%--------------------------------------------------------------------
- %% Shutdown and reboot
- %%--------------------------------------------------------------------
- shutdown() ->
- shutdown(normal).
- shutdown(Reason) ->
- ?LOG(critical, "emqx shutdown for ~s", [Reason]),
- _ = emqx_alarm_handler:unload(),
- _ = emqx_plugins:unload(),
- lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).
- reboot() ->
- lists:foreach(fun application:start/1, [gproc, esockd, ranch, cowboy, ekka, emqx]).
- %%--------------------------------------------------------------------
- %% Internal functions
- %%--------------------------------------------------------------------
- reload_config(ConfFile) ->
- {ok, [Conf]} = file:consult(ConfFile),
- lists:foreach(fun({App, Vals}) ->
- [application:set_env(App, Par, Val) || {Par, Val} <- Vals]
- end, Conf).
|