|
|
@@ -20,25 +20,12 @@
|
|
|
|
|
|
-export([post_config_update/4]).
|
|
|
|
|
|
--export([reload_hook/0, unload_hook/0]).
|
|
|
-
|
|
|
--export([on_message_publish/1]).
|
|
|
-
|
|
|
--export([ load_bridges/0
|
|
|
- , get_bridge/2
|
|
|
- , get_bridge/3
|
|
|
- , list_bridges/0
|
|
|
- , create_bridge/3
|
|
|
- , remove_bridge/3
|
|
|
- , update_bridge/3
|
|
|
- , start_bridge/2
|
|
|
- , stop_bridge/2
|
|
|
- , restart_bridge/2
|
|
|
- , send_message/2
|
|
|
+-export([ load_hook/0
|
|
|
+ , reload_hook/0
|
|
|
+ , unload_hook/0
|
|
|
]).
|
|
|
|
|
|
--export([ config_key_path/0
|
|
|
- ]).
|
|
|
+-export([on_message_publish/1]).
|
|
|
|
|
|
-export([ resource_type/1
|
|
|
, bridge_type/1
|
|
|
@@ -47,9 +34,30 @@
|
|
|
, parse_bridge_id/1
|
|
|
]).
|
|
|
|
|
|
+-export([ load/0
|
|
|
+ , lookup/2
|
|
|
+ , lookup/3
|
|
|
+ , list/0
|
|
|
+ , create/3
|
|
|
+ , remove/3
|
|
|
+ , update/3
|
|
|
+ , start/2
|
|
|
+ , stop/2
|
|
|
+ , restart/2
|
|
|
+ ]).
|
|
|
+
|
|
|
+-export([ send_message/2
|
|
|
+ ]).
|
|
|
+
|
|
|
+-export([ config_key_path/0
|
|
|
+ ]).
|
|
|
+
|
|
|
reload_hook() ->
|
|
|
unload_hook(),
|
|
|
- Bridges = emqx_conf:get([bridges], #{}),
|
|
|
+ load_hook().
|
|
|
+
|
|
|
+load_hook() ->
|
|
|
+ Bridges = emqx:get_config([bridges], #{}),
|
|
|
lists:foreach(fun({_Type, Bridge}) ->
|
|
|
lists:foreach(fun({_Name, BridgeConf}) ->
|
|
|
load_hook(BridgeConf)
|
|
|
@@ -82,9 +90,7 @@ config_key_path() ->
|
|
|
[bridges].
|
|
|
|
|
|
resource_type(mqtt) -> emqx_connector_mqtt;
|
|
|
-resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
|
|
|
-resource_type(http) -> emqx_connector_http;
|
|
|
-resource_type(<<"http">>) -> emqx_connector_http.
|
|
|
+resource_type(http) -> emqx_connector_http.
|
|
|
|
|
|
bridge_type(emqx_connector_mqtt) -> mqtt;
|
|
|
bridge_type(emqx_connector_http) -> http.
|
|
|
@@ -93,9 +99,9 @@ post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
|
|
|
#{added := Added, removed := Removed, changed := Updated}
|
|
|
= diff_confs(NewConf, OldConf),
|
|
|
perform_bridge_changes([
|
|
|
- {fun remove_bridge/3, Removed},
|
|
|
- {fun create_bridge/3, Added},
|
|
|
- {fun update_bridge/3, Updated}
|
|
|
+ {fun remove/3, Removed},
|
|
|
+ {fun create/3, Added},
|
|
|
+ {fun update/3, Updated}
|
|
|
]),
|
|
|
reload_hook().
|
|
|
|
|
|
@@ -116,8 +122,8 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
|
|
|
end, Result0, MapConfs),
|
|
|
perform_bridge_changes(Tasks, Result).
|
|
|
|
|
|
-load_bridges() ->
|
|
|
- Bridges = emqx_conf:get([bridges], #{}),
|
|
|
+load() ->
|
|
|
+ Bridges = emqx:get_config([bridges], #{}),
|
|
|
emqx_bridge_monitor:ensure_all_started(Bridges).
|
|
|
|
|
|
resource_id(BridgeId) when is_binary(BridgeId) ->
|
|
|
@@ -138,36 +144,36 @@ parse_bridge_id(BridgeId) ->
|
|
|
_ -> error({invalid_bridge_id, BridgeId})
|
|
|
end.
|
|
|
|
|
|
-list_bridges() ->
|
|
|
+list() ->
|
|
|
lists:foldl(fun({Type, NameAndConf}, Bridges) ->
|
|
|
lists:foldl(fun({Name, RawConf}, Acc) ->
|
|
|
- case get_bridge(Type, Name, RawConf) of
|
|
|
+ case lookup(Type, Name, RawConf) of
|
|
|
{error, not_found} -> Acc;
|
|
|
{ok, Res} -> [Res | Acc]
|
|
|
end
|
|
|
end, Bridges, maps:to_list(NameAndConf))
|
|
|
end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))).
|
|
|
|
|
|
-get_bridge(Type, Name) ->
|
|
|
+lookup(Type, Name) ->
|
|
|
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
|
|
- get_bridge(Type, Name, RawConf).
|
|
|
-get_bridge(Type, Name, RawConf) ->
|
|
|
+ lookup(Type, Name, RawConf).
|
|
|
+lookup(Type, Name, RawConf) ->
|
|
|
case emqx_resource:get_instance(resource_id(Type, Name)) of
|
|
|
{error, not_found} -> {error, not_found};
|
|
|
{ok, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data,
|
|
|
raw_config => RawConf}}
|
|
|
end.
|
|
|
|
|
|
-start_bridge(Type, Name) ->
|
|
|
- restart_bridge(Type, Name).
|
|
|
+start(Type, Name) ->
|
|
|
+ restart(Type, Name).
|
|
|
|
|
|
-stop_bridge(Type, Name) ->
|
|
|
+stop(Type, Name) ->
|
|
|
emqx_resource:stop(resource_id(Type, Name)).
|
|
|
|
|
|
-restart_bridge(Type, Name) ->
|
|
|
+restart(Type, Name) ->
|
|
|
emqx_resource:restart(resource_id(Type, Name)).
|
|
|
|
|
|
-create_bridge(Type, Name, Conf) ->
|
|
|
+create(Type, Name, Conf) ->
|
|
|
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
|
|
config => Conf}),
|
|
|
ResId = resource_id(Type, Name),
|
|
|
@@ -181,7 +187,7 @@ create_bridge(Type, Name, Conf) ->
|
|
|
{error, Reason}
|
|
|
end.
|
|
|
|
|
|
-update_bridge(Type, Name, {_OldConf, Conf}) ->
|
|
|
+update(Type, Name, {_OldConf, Conf}) ->
|
|
|
%% TODO: sometimes its not necessary to restart the bridge connection.
|
|
|
%%
|
|
|
%% - if the connection related configs like `servers` is updated, we should restart/start
|
|
|
@@ -195,7 +201,7 @@ update_bridge(Type, Name, {_OldConf, Conf}) ->
|
|
|
emqx_resource:recreate(resource_id(Type, Name),
|
|
|
emqx_bridge:resource_type(Type), Conf, []).
|
|
|
|
|
|
-remove_bridge(Type, Name, _Conf) ->
|
|
|
+remove(Type, Name, _Conf) ->
|
|
|
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
|
|
|
case emqx_resource:remove(resource_id(Type, Name)) of
|
|
|
ok -> ok;
|