|
|
@@ -27,34 +27,16 @@
|
|
|
|
|
|
-export([on_message_publish/1]).
|
|
|
|
|
|
--export([
|
|
|
- resource_type/1,
|
|
|
- bridge_type/1,
|
|
|
- resource_id/1,
|
|
|
- resource_id/2,
|
|
|
- bridge_id/2,
|
|
|
- parse_bridge_id/1
|
|
|
-]).
|
|
|
-
|
|
|
-export([
|
|
|
load/0,
|
|
|
lookup/1,
|
|
|
lookup/2,
|
|
|
lookup/3,
|
|
|
- list/0,
|
|
|
- list_bridges_by_connector/1,
|
|
|
- create/2,
|
|
|
create/3,
|
|
|
- recreate/2,
|
|
|
- recreate/3,
|
|
|
- create_dry_run/2,
|
|
|
- remove/1,
|
|
|
+ disable_enable/3,
|
|
|
remove/2,
|
|
|
- update/2,
|
|
|
- update/3,
|
|
|
- stop/2,
|
|
|
- restart/2,
|
|
|
- reset_metrics/1
|
|
|
+ list/0,
|
|
|
+ list_bridges_by_connector/1
|
|
|
]).
|
|
|
|
|
|
-export([send_message/2]).
|
|
|
@@ -129,8 +111,8 @@ send_to_matched_egress_bridges(Topic, Msg) ->
|
|
|
).
|
|
|
|
|
|
send_message(BridgeId, Message) ->
|
|
|
- {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
|
- ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
|
|
+ {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
|
|
+ ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
|
|
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
|
|
|
not_found ->
|
|
|
{error, {bridge_not_found, BridgeId}};
|
|
|
@@ -143,70 +125,23 @@ send_message(BridgeId, Message) ->
|
|
|
config_key_path() ->
|
|
|
[bridges].
|
|
|
|
|
|
-resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
|
|
|
-resource_type(mqtt) -> emqx_connector_mqtt;
|
|
|
-resource_type(<<"http">>) -> emqx_connector_http;
|
|
|
-resource_type(http) -> emqx_connector_http.
|
|
|
-
|
|
|
-bridge_type(emqx_connector_mqtt) -> mqtt;
|
|
|
-bridge_type(emqx_connector_http) -> http.
|
|
|
-
|
|
|
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
|
|
|
#{added := Added, removed := Removed, changed := Updated} =
|
|
|
diff_confs(NewConf, OldConf),
|
|
|
%% The config update will be failed if any task in `perform_bridge_changes` failed.
|
|
|
Result = perform_bridge_changes([
|
|
|
- {fun remove/3, Removed},
|
|
|
- {fun create/3, Added},
|
|
|
- {fun update/3, Updated}
|
|
|
+ {fun emqx_bridge_resource:remove/3, Removed},
|
|
|
+ {fun emqx_bridge_resource:create/3, Added},
|
|
|
+ {fun emqx_bridge_resource:update/3, Updated}
|
|
|
]),
|
|
|
ok = unload_hook(),
|
|
|
ok = load_hook(NewConf),
|
|
|
Result.
|
|
|
|
|
|
-perform_bridge_changes(Tasks) ->
|
|
|
- perform_bridge_changes(Tasks, ok).
|
|
|
-
|
|
|
-perform_bridge_changes([], Result) ->
|
|
|
- Result;
|
|
|
-perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
|
|
|
- Result = maps:fold(
|
|
|
- fun
|
|
|
- ({_Type, _Name}, _Conf, {error, Reason}) ->
|
|
|
- {error, Reason};
|
|
|
- ({Type, Name}, Conf, _) ->
|
|
|
- case Action(Type, Name, Conf) of
|
|
|
- {error, Reason} -> {error, Reason};
|
|
|
- Return -> Return
|
|
|
- end
|
|
|
- end,
|
|
|
- Result0,
|
|
|
- MapConfs
|
|
|
- ),
|
|
|
- perform_bridge_changes(Tasks, Result).
|
|
|
-
|
|
|
load() ->
|
|
|
Bridges = emqx:get_config([bridges], #{}),
|
|
|
emqx_bridge_monitor:ensure_all_started(Bridges).
|
|
|
|
|
|
-resource_id(BridgeId) when is_binary(BridgeId) ->
|
|
|
- <<"bridge:", BridgeId/binary>>.
|
|
|
-
|
|
|
-resource_id(BridgeType, BridgeName) ->
|
|
|
- BridgeId = bridge_id(BridgeType, BridgeName),
|
|
|
- resource_id(BridgeId).
|
|
|
-
|
|
|
-bridge_id(BridgeType, BridgeName) ->
|
|
|
- Name = bin(BridgeName),
|
|
|
- Type = bin(BridgeType),
|
|
|
- <<Type/binary, ":", Name/binary>>.
|
|
|
-
|
|
|
-parse_bridge_id(BridgeId) ->
|
|
|
- case string:split(bin(BridgeId), ":", all) of
|
|
|
- [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
|
|
|
- _ -> error({invalid_bridge_id, BridgeId})
|
|
|
- end.
|
|
|
-
|
|
|
list() ->
|
|
|
lists:foldl(
|
|
|
fun({Type, NameAndConf}, Bridges) ->
|
|
|
@@ -233,14 +168,14 @@ list_bridges_by_connector(ConnectorId) ->
|
|
|
].
|
|
|
|
|
|
lookup(Id) ->
|
|
|
- {Type, Name} = parse_bridge_id(Id),
|
|
|
+ {Type, Name} = emqx_bridge_resource:parse_bridge_id(Id),
|
|
|
lookup(Type, Name).
|
|
|
|
|
|
lookup(Type, Name) ->
|
|
|
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
|
|
lookup(Type, Name, RawConf).
|
|
|
lookup(Type, Name, RawConf) ->
|
|
|
- case emqx_resource:get_instance(resource_id(Type, Name)) of
|
|
|
+ case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of
|
|
|
{error, not_found} ->
|
|
|
{error, not_found};
|
|
|
{ok, _, Data} ->
|
|
|
@@ -252,125 +187,52 @@ lookup(Type, Name, RawConf) ->
|
|
|
}}
|
|
|
end.
|
|
|
|
|
|
-reset_metrics(ResourceId) ->
|
|
|
- emqx_resource:reset_metrics(ResourceId).
|
|
|
-
|
|
|
-stop(Type, Name) ->
|
|
|
- emqx_resource:stop(resource_id(Type, Name)).
|
|
|
-
|
|
|
-%% we don't provide 'start', as we want an already started bridge to be restarted.
|
|
|
-restart(Type, Name) ->
|
|
|
- emqx_resource:restart(resource_id(Type, Name)).
|
|
|
-
|
|
|
-create(BridgeId, Conf) ->
|
|
|
- {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
|
- create(BridgeType, BridgeName, Conf).
|
|
|
-
|
|
|
-create(Type, Name, Conf) ->
|
|
|
- ?SLOG(info, #{
|
|
|
- msg => "create bridge",
|
|
|
- type => Type,
|
|
|
- name => Name,
|
|
|
- config => Conf
|
|
|
- }),
|
|
|
- {ok, _Data} = emqx_resource:create_local(
|
|
|
- resource_id(Type, Name),
|
|
|
- <<"emqx_bridge">>,
|
|
|
- emqx_bridge:resource_type(Type),
|
|
|
- parse_confs(Type, Name, Conf),
|
|
|
- #{}
|
|
|
- ),
|
|
|
- maybe_disable_bridge(Type, Name, Conf).
|
|
|
-
|
|
|
-update(BridgeId, {OldConf, Conf}) ->
|
|
|
- {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
|
- update(BridgeType, BridgeName, {OldConf, Conf}).
|
|
|
-
|
|
|
-update(Type, Name, {OldConf, Conf}) ->
|
|
|
- %% TODO: sometimes its not necessary to restart the bridge connection.
|
|
|
- %%
|
|
|
- %% - if the connection related configs like `servers` is updated, we should restart/start
|
|
|
- %% or stop bridges according to the change.
|
|
|
- %% - if the connection related configs are not update, only non-connection configs like
|
|
|
- %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
|
|
|
- %% without restarting the bridge.
|
|
|
- %%
|
|
|
- case if_only_to_toggle_enable(OldConf, Conf) of
|
|
|
- false ->
|
|
|
- ?SLOG(info, #{
|
|
|
- msg => "update bridge",
|
|
|
- type => Type,
|
|
|
- name => Name,
|
|
|
- config => Conf
|
|
|
- }),
|
|
|
- case recreate(Type, Name, Conf) of
|
|
|
- {ok, _} ->
|
|
|
- maybe_disable_bridge(Type, Name, Conf);
|
|
|
- {error, not_found} ->
|
|
|
- ?SLOG(warning, #{
|
|
|
- msg => "updating_a_non-exist_bridge_need_create_a_new_one",
|
|
|
- type => Type,
|
|
|
- name => Name,
|
|
|
- config => Conf
|
|
|
- }),
|
|
|
- create(Type, Name, Conf);
|
|
|
- {error, Reason} ->
|
|
|
- {error, {update_bridge_failed, Reason}}
|
|
|
- end;
|
|
|
- true ->
|
|
|
- %% we don't need to recreate the bridge if this config change is only to
|
|
|
- %% toggole the config 'bridge.{type}.{name}.enable'
|
|
|
- case maps:get(enable, Conf, true) of
|
|
|
- true -> restart(Type, Name);
|
|
|
- false -> stop(Type, Name)
|
|
|
- end
|
|
|
- end.
|
|
|
-
|
|
|
-recreate(Type, Name) ->
|
|
|
- recreate(Type, Name, emqx:get_config([bridges, Type, Name])).
|
|
|
+disable_enable(Action, BridgeType, BridgeName) when
|
|
|
+ Action =:= disable; Action =:= enable
|
|
|
+->
|
|
|
+ emqx_conf:update(
|
|
|
+ config_key_path() ++ [BridgeType, BridgeName],
|
|
|
+ {Action, BridgeType, BridgeName},
|
|
|
+ #{override_to => cluster}
|
|
|
+ ).
|
|
|
|
|
|
-recreate(Type, Name, Conf) ->
|
|
|
- emqx_resource:recreate_local(
|
|
|
- resource_id(Type, Name),
|
|
|
- emqx_bridge:resource_type(Type),
|
|
|
- parse_confs(Type, Name, Conf),
|
|
|
- #{}
|
|
|
+create(BridgeType, BridgeName, RawConf) ->
|
|
|
+ emqx_conf:update(
|
|
|
+ emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
|
|
|
+ RawConf,
|
|
|
+ #{override_to => cluster}
|
|
|
).
|
|
|
|
|
|
-create_dry_run(Type, Conf) ->
|
|
|
- Conf0 = fill_dry_run_conf(Conf),
|
|
|
- case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
|
|
|
- {ok, Conf1} ->
|
|
|
- TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
|
|
|
- case emqx_connector_ssl:convert_certs(TmpPath, Conf1) of
|
|
|
- {error, Reason} ->
|
|
|
- {error, Reason};
|
|
|
- {ok, ConfNew} ->
|
|
|
- Res = emqx_resource:create_dry_run_local(
|
|
|
- emqx_bridge:resource_type(Type), ConfNew
|
|
|
- ),
|
|
|
- _ = maybe_clear_certs(TmpPath, ConfNew),
|
|
|
- Res
|
|
|
- end;
|
|
|
- {error, _} = Error ->
|
|
|
- Error
|
|
|
- end.
|
|
|
+remove(BridgeType, BridgeName) ->
|
|
|
+ emqx_conf:remove(
|
|
|
+ emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
|
|
|
+ #{override_to => cluster}
|
|
|
+ ).
|
|
|
|
|
|
-remove(BridgeId) ->
|
|
|
- {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
|
|
- remove(BridgeType, BridgeName, #{}).
|
|
|
+%%========================================================================================
|
|
|
+%% Helper functions
|
|
|
+%%========================================================================================
|
|
|
|
|
|
-remove(Type, Name) ->
|
|
|
- remove(Type, Name, undefined).
|
|
|
+perform_bridge_changes(Tasks) ->
|
|
|
+ perform_bridge_changes(Tasks, ok).
|
|
|
|
|
|
-%% just for perform_bridge_changes/1
|
|
|
-remove(Type, Name, _Conf) ->
|
|
|
- ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
|
|
- case emqx_resource:remove_local(resource_id(Type, Name)) of
|
|
|
- ok -> ok;
|
|
|
- {error, not_found} -> ok;
|
|
|
- {error, Reason} -> {error, Reason}
|
|
|
- end.
|
|
|
+perform_bridge_changes([], Result) ->
|
|
|
+ Result;
|
|
|
+perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
|
|
|
+ Result = maps:fold(
|
|
|
+ fun
|
|
|
+ ({_Type, _Name}, _Conf, {error, Reason}) ->
|
|
|
+ {error, Reason};
|
|
|
+ ({Type, Name}, Conf, _) ->
|
|
|
+ case Action(Type, Name, Conf) of
|
|
|
+ {error, Reason} -> {error, Reason};
|
|
|
+ Return -> Return
|
|
|
+ end
|
|
|
+ end,
|
|
|
+ Result0,
|
|
|
+ MapConfs
|
|
|
+ ),
|
|
|
+ perform_bridge_changes(Tasks, Result).
|
|
|
|
|
|
diff_confs(NewConfs, OldConfs) ->
|
|
|
emqx_map_lib:diff_maps(
|
|
|
@@ -416,104 +278,10 @@ get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
|
|
|
Acc;
|
|
|
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
|
|
|
case emqx_topic:match(Topic, Filter) of
|
|
|
- true -> [bridge_id(BType, BName) | Acc];
|
|
|
+ true -> [emqx_bridge_resource:bridge_id(BType, BName) | Acc];
|
|
|
false -> Acc
|
|
|
end.
|
|
|
|
|
|
-parse_confs(
|
|
|
- http,
|
|
|
- _Name,
|
|
|
- #{
|
|
|
- url := Url,
|
|
|
- method := Method,
|
|
|
- body := Body,
|
|
|
- headers := Headers,
|
|
|
- request_timeout := ReqTimeout
|
|
|
- } = Conf
|
|
|
-) ->
|
|
|
- {BaseUrl, Path} = parse_url(Url),
|
|
|
- {ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
|
|
|
- Conf#{
|
|
|
- base_url => BaseUrl2,
|
|
|
- request =>
|
|
|
- #{
|
|
|
- path => Path,
|
|
|
- method => Method,
|
|
|
- body => Body,
|
|
|
- headers => Headers,
|
|
|
- request_timeout => ReqTimeout
|
|
|
- }
|
|
|
- };
|
|
|
-parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
|
|
|
- is_binary(ConnId)
|
|
|
-->
|
|
|
- case emqx_connector:parse_connector_id(ConnId) of
|
|
|
- {Type, ConnName} ->
|
|
|
- ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
|
|
|
- make_resource_confs(
|
|
|
- Direction,
|
|
|
- ConnectorConfs,
|
|
|
- maps:without([connector, direction], Conf),
|
|
|
- Type,
|
|
|
- Name
|
|
|
- );
|
|
|
- {_ConnType, _ConnName} ->
|
|
|
- error({cannot_use_connector_with_different_type, ConnId})
|
|
|
- end;
|
|
|
-parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when
|
|
|
- is_map(ConnectorConfs)
|
|
|
-->
|
|
|
- make_resource_confs(
|
|
|
- Direction,
|
|
|
- ConnectorConfs,
|
|
|
- maps:without([connector, direction], Conf),
|
|
|
- Type,
|
|
|
- Name
|
|
|
- ).
|
|
|
-
|
|
|
-make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
|
|
|
- BName = bridge_id(Type, Name),
|
|
|
- ConnectorConfs#{
|
|
|
- ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
|
|
|
- };
|
|
|
-make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
|
|
|
- ConnectorConfs#{
|
|
|
- egress => BridgeConf
|
|
|
- }.
|
|
|
-
|
|
|
-parse_url(Url) ->
|
|
|
- case string:split(Url, "//", leading) of
|
|
|
- [Scheme, UrlRem] ->
|
|
|
- case string:split(UrlRem, "/", leading) of
|
|
|
- [HostPort, Path] ->
|
|
|
- {iolist_to_binary([Scheme, "//", HostPort]), Path};
|
|
|
- [HostPort] ->
|
|
|
- {iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
|
|
- end;
|
|
|
- [Url] ->
|
|
|
- error({invalid_url, Url})
|
|
|
- end.
|
|
|
-
|
|
|
-maybe_disable_bridge(Type, Name, Conf) ->
|
|
|
- case maps:get(enable, Conf, true) of
|
|
|
- false -> stop(Type, Name);
|
|
|
- true -> ok
|
|
|
- end.
|
|
|
-
|
|
|
-if_only_to_toggle_enable(OldConf, Conf) ->
|
|
|
- #{added := Added, removed := Removed, changed := Updated} =
|
|
|
- emqx_map_lib:diff_maps(OldConf, Conf),
|
|
|
- case {Added, Removed, Updated} of
|
|
|
- {Added, Removed, #{enable := _} = Updated} when
|
|
|
- map_size(Added) =:= 0,
|
|
|
- map_size(Removed) =:= 0,
|
|
|
- map_size(Updated) =:= 1
|
|
|
- ->
|
|
|
- true;
|
|
|
- {_, _, _} ->
|
|
|
- false
|
|
|
- end.
|
|
|
-
|
|
|
-spec get_basic_usage_info() ->
|
|
|
#{
|
|
|
num_bridges => non_neg_integer(),
|
|
|
@@ -551,42 +319,3 @@ get_basic_usage_info() ->
|
|
|
_:_ ->
|
|
|
InitialAcc
|
|
|
end.
|
|
|
-
|
|
|
-fill_dry_run_conf(Conf) ->
|
|
|
- Conf#{
|
|
|
- <<"egress">> =>
|
|
|
- #{
|
|
|
- <<"remote_topic">> => <<"t">>,
|
|
|
- <<"remote_qos">> => 0,
|
|
|
- <<"retain">> => true,
|
|
|
- <<"payload">> => <<"val">>
|
|
|
- },
|
|
|
- <<"ingress">> =>
|
|
|
- #{<<"remote_topic">> => <<"t">>}
|
|
|
- }.
|
|
|
-
|
|
|
-maybe_clear_certs(TmpPath, #{ssl := SslConf} = Conf) ->
|
|
|
- %% don't remove the cert files if they are in use
|
|
|
- case is_tmp_path_conf(TmpPath, SslConf) of
|
|
|
- true -> emqx_connector_ssl:clear_certs(TmpPath, Conf);
|
|
|
- false -> ok
|
|
|
- end.
|
|
|
-
|
|
|
-is_tmp_path_conf(TmpPath, #{certfile := Certfile}) ->
|
|
|
- is_tmp_path(TmpPath, Certfile);
|
|
|
-is_tmp_path_conf(TmpPath, #{keyfile := Keyfile}) ->
|
|
|
- is_tmp_path(TmpPath, Keyfile);
|
|
|
-is_tmp_path_conf(TmpPath, #{cacertfile := CaCertfile}) ->
|
|
|
- is_tmp_path(TmpPath, CaCertfile);
|
|
|
-is_tmp_path_conf(_TmpPath, _Conf) ->
|
|
|
- false.
|
|
|
-
|
|
|
-is_tmp_path(TmpPath, File) ->
|
|
|
- string:str(str(File), str(TmpPath)) > 0.
|
|
|
-
|
|
|
-str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
|
|
|
-str(Str) when is_list(Str) -> Str.
|
|
|
-
|
|
|
-bin(Bin) when is_binary(Bin) -> Bin;
|
|
|
-bin(Str) when is_list(Str) -> list_to_binary(Str);
|
|
|
-bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|