|
@@ -739,8 +739,6 @@ options(Options, PoolName, ResId) ->
|
|
|
Topic ->
|
|
Topic ->
|
|
|
[{subscriptions, [{Topic, Get(<<"qos">>)}]} | Subscriptions]
|
|
[{subscriptions, [{Topic, Get(<<"qos">>)}]} | Subscriptions]
|
|
|
end,
|
|
end,
|
|
|
- %% TODO check why only ciphers are configurable but not versions
|
|
|
|
|
- TlsVersions = emqx_tls_lib:default_versions(),
|
|
|
|
|
[{address, binary_to_list(Address)},
|
|
[{address, binary_to_list(Address)},
|
|
|
{bridge_mode, GetD(<<"bridge_mode">>, true)},
|
|
{bridge_mode, GetD(<<"bridge_mode">>, true)},
|
|
|
{clean_start, true},
|
|
{clean_start, true},
|
|
@@ -751,15 +749,17 @@ options(Options, PoolName, ResId) ->
|
|
|
{username, str(Get(<<"username">>))},
|
|
{username, str(Get(<<"username">>))},
|
|
|
{password, str(Get(<<"password">>))},
|
|
{password, str(Get(<<"password">>))},
|
|
|
{proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
|
|
{proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
|
|
|
- {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)},
|
|
|
|
|
- {ssl, cuttlefish_flag:parse(str(Get(<<"ssl">>)))},
|
|
|
|
|
- {ssl_opts, [ {versions, TlsVersions}
|
|
|
|
|
- , {ciphers, emqx_tls_lib:integral_ciphers(TlsVersions, Get(<<"ciphers">>))}
|
|
|
|
|
- | get_ssl_opts(Options, ResId)
|
|
|
|
|
- ]}
|
|
|
|
|
|
|
+ {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)}
|
|
|
|
|
+ | maybe_ssl(Options, cuttlefish_flag:parse(str(Get(<<"ssl">>))), ResId)
|
|
|
] ++ Subscriptions1
|
|
] ++ Subscriptions1
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
|
|
+maybe_ssl(_Options, false, _ResId) ->
|
|
|
|
|
+ [{ssl, false}];
|
|
|
|
|
+maybe_ssl(Options, true, ResId) ->
|
|
|
|
|
+ Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]),
|
|
|
|
|
+ [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Options, Dir)}].
|
|
|
|
|
+
|
|
|
mqtt_ver(ProtoVer) ->
|
|
mqtt_ver(ProtoVer) ->
|
|
|
case ProtoVer of
|
|
case ProtoVer of
|
|
|
<<"mqttv3">> -> v3;
|
|
<<"mqttv3">> -> v3;
|
|
@@ -772,43 +772,3 @@ format_subscriptions(SubOpts) ->
|
|
|
lists:map(fun(Sub) ->
|
|
lists:map(fun(Sub) ->
|
|
|
{maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)}
|
|
{maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)}
|
|
|
end, SubOpts).
|
|
end, SubOpts).
|
|
|
-
|
|
|
|
|
-get_ssl_opts(Opts, ResId) ->
|
|
|
|
|
- KeyFile = maps:get(<<"keyfile">>, Opts, undefined),
|
|
|
|
|
- CertFile = maps:get(<<"certfile">>, Opts, undefined),
|
|
|
|
|
- CAFile = case maps:get(<<"cacertfile">>, Opts, undefined) of
|
|
|
|
|
- undefined -> maps:get(<<"cafile">>, Opts, undefined);
|
|
|
|
|
- CAFile0 -> CAFile0
|
|
|
|
|
- end,
|
|
|
|
|
- Filter = fun(Opts1) ->
|
|
|
|
|
- [{K, V} || {K, V} <- Opts1,
|
|
|
|
|
- V =/= undefined,
|
|
|
|
|
- V =/= <<>>,
|
|
|
|
|
- V =/= "" ]
|
|
|
|
|
- end,
|
|
|
|
|
- Key = save_upload_file(KeyFile, ResId),
|
|
|
|
|
- Cert = save_upload_file(CertFile, ResId),
|
|
|
|
|
- CA = save_upload_file(CAFile, ResId),
|
|
|
|
|
- Verify = case maps:get(<<"verify">>, Opts, false) of
|
|
|
|
|
- false -> verify_none;
|
|
|
|
|
- true -> verify_peer
|
|
|
|
|
- end,
|
|
|
|
|
- case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of
|
|
|
|
|
- [] -> [{verify, Verify}];
|
|
|
|
|
- SslOpts ->
|
|
|
|
|
- [{verify, Verify} | SslOpts]
|
|
|
|
|
- end.
|
|
|
|
|
-
|
|
|
|
|
-save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> "";
|
|
|
|
|
-save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath);
|
|
|
|
|
-save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) ->
|
|
|
|
|
- FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]),
|
|
|
|
|
- ok = filelib:ensure_dir(FullFilename),
|
|
|
|
|
- case file:write_file(FullFilename, File) of
|
|
|
|
|
- ok ->
|
|
|
|
|
- binary_to_list(FullFilename);
|
|
|
|
|
- {error, Reason} ->
|
|
|
|
|
- logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]),
|
|
|
|
|
- error({ResId, store_file_fail})
|
|
|
|
|
- end;
|
|
|
|
|
-save_upload_file(_, _) -> "".
|
|
|