terry-xiaoyu 7 лет назад
Родитель
Сommit
2dc8ec8b11
5 измененных файлов с 53 добавлено и 21 удалено
  1. 7 3
      etc/emqx.conf
  2. 14 9
      priv/emqx.schema
  3. 6 1
      src/emqx_bridge.erl
  4. 17 7
      src/emqx_client.erl
  5. 9 1
      src/emqx_client_sock.erl

+ 7 - 3
etc/emqx.conf

@@ -1672,21 +1672,25 @@ bridge.aws.mqueue_type = memory
 ## Value: Number
 bridge.aws.max_pending_messages = 10000
 
+## Bribge to remote server via SSL.
+##
+## Value: on | off
+bridge.aws.ssl = off
 
 ## PEM-encoded CA certificates of the bridge.
 ##
 ## Value: File
-## bridge.aws.cacertfile = cacert.pem
+## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
 
 ## SSL Certfile of the bridge.
 ##
 ## Value: File
-## bridge.aws.certfile = cert.pem
+## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem
 
 ## SSL Keyfile of the bridge.
 ##
 ## Value: File
-## bridge.aws.keyfile = key.pem
+## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
 
 ## SSL Ciphers used by the bridge.
 ##

+ 14 - 9
priv/emqx.schema

@@ -1552,6 +1552,11 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "bridge.$name.ssl", "emqx.bridges", [
+  {datatype, flag},
+  {default, off}
+]}.
+
 {mapping, "bridge.$name.cacertfile", "emqx.bridges", [
   {datatype, string}
 ]}.
@@ -1575,11 +1580,12 @@ end}.
 
 {mapping, "bridge.$name.keepalive", "emqx.bridges", [
   {default, "10s"},
-  {datatype, {duration, s}}
+  {datatype, {duration, ms}}
 ]}.
 
 {mapping, "bridge.$name.tls_versions", "emqx.bridges", [
-  {datatype, string}
+  {datatype, string},
+  {default, "tlsv1,tlsv1.1,tlsv1.2"}
 ]}.
 
 {mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [
@@ -1597,12 +1603,11 @@ end}.
 
 {mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [
   {default, "30s"},
-  {datatype, {duration, s}}
+  {datatype, {duration, ms}}
 ]}.
 
 
 {translation, "emqx.bridges", fun(Conf) ->
-
     Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
 
     IsSsl = fun(cacertfile)   -> true;
@@ -1625,11 +1630,12 @@ end}.
                 case IsSsl(Opt) of
                     true ->
                         SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
-                        lists:ukeymerge(1, [{ssl_opts, SslOpts}], Opts);
+                        lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
                     false ->
                         [{Opt, Val}|Opts]
                 end
             end,
+
     Subscriptions = fun(Name) ->
                             Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
                             lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
@@ -1639,11 +1645,10 @@ end}.
     maps:to_list(
       lists:foldl(
         fun({["bridge", Name, Opt], Val}, Acc) ->
+                %% e.g #{aws => [{OptKey, OptVal}]}
+                Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}],
                 maps:update_with(list_to_atom(Name),
-                                 fun(Opts) ->
-                                         Merge(list_to_atom(Opt), Val, Opts)
-                                 end, [{list_to_atom(Opt), Val},
-                                       {subscriptions, Subscriptions(Name)}], Acc);
+                                 fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
            (_, Acc) -> Acc
         end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))))
 

+ 6 - 1
src/emqx_bridge.erl

@@ -199,7 +199,8 @@ handle_info(start, State = #state{options = Options,
             [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs],
             [emqx_broker:subscribe(Topic) || Topic <- Forwards],
             {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}};
-        {error,_} ->
+        {error, Reason} ->
+            logger:error("[Bridge] start failed! error: ~p", [Reason]),
             erlang:send_after(ReconnectInterval, self(), start),
             {noreply, State}
     end;
@@ -285,6 +286,10 @@ options([{clean_start, CleanStart}| Options], Acc) ->
 options([{address, Address}| Options], Acc) ->
     {Host, Port} = address(Address),
     options(Options, [{host, Host}, {port, Port}|Acc]);
+options([{ssl, Ssl}| Options], Acc) ->
+    options(Options, [{ssl, Ssl}|Acc]);
+options([{ssl_opts, SslOpts}| Options], Acc) ->
+    options(Options, [{ssl_opts, SslOpts}|Acc]);
 options([_Option | Options], Acc) ->
     options(Options, Acc).
 

+ 17 - 7
src/emqx_client.erl

@@ -538,14 +538,24 @@ init([{hosts, Hosts} | Opts], State) ->
     init(Opts, State#state{hosts = Hosts1});
 init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
     init(Opts, State#state{sock_opts = emqx_misc:merge_opts(SockOpts, TcpOpts)});
-init([ssl | Opts], State = #state{sock_opts = SockOpts}) ->
-    ok = ssl:start(),
-    SockOpts1 = emqx_misc:merge_opts([{ssl_opts, []}], SockOpts),
-    init(Opts, State#state{sock_opts = SockOpts1});
+init([{ssl, EnableSsl} | Opts], State) ->
+    case lists:keytake(ssl_opts, 1, Opts) of
+        {value, SslOpts, WithOutSslOpts} ->
+            init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State);
+        false ->
+            init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State)
+    end;
 init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
-    ok = ssl:start(),
-    SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]),
-    init(Opts, State#state{sock_opts = SockOpts1});
+    case lists:keytake(ssl, 1, Opts) of
+        {value, {ssl, true}, WithOutEnableSsl} ->
+            ok = ssl:start(),
+            SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]),
+            init(WithOutEnableSsl, State#state{sock_opts = SockOpts1});
+        {value, {ssl, false}, WithOutEnableSsl} ->
+            init(WithOutEnableSsl, State);
+        false ->
+            init(Opts, State)
+    end;
 init([{client_id, ClientId} | Opts], State) ->
     init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
 init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->

+ 9 - 1
src/emqx_client_sock.erl

@@ -49,7 +49,10 @@ connect(Host, Port, SockOpts, Timeout) ->
     end.
 
 ssl_upgrade(Sock, SslOpts, Timeout) ->
-    case ssl:connect(Sock, SslOpts, Timeout) of
+    TlsVersions = proplists:get_value(versions, SslOpts, []),
+    Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)),
+    SslOpts2 = emqx_misc:merge_opts(SslOpts, [{ciphers, Ciphers}]),
+    case ssl:connect(Sock, SslOpts2, Timeout) of
         {ok, SslSock} ->
             ok = ssl:controlling_process(SslSock, self()),
             {ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
@@ -91,3 +94,8 @@ sockname(Sock) when is_port(Sock) ->
 sockname(#ssl_socket{ssl = SslSock}) ->
     ssl:sockname(SslSock).
 
+default_ciphers(TlsVersions) ->
+    lists:foldl(
+        fun(TlsVer, Ciphers) ->
+            Ciphers ++ ssl:cipher_suites(all, TlsVer)
+        end, [], TlsVersions).