Просмотр исходного кода

Change the start_link API for emqx_client

Prior to this change, emqx_client:start_link does 2 works in one call:
- init an erlang process for emqx_client
- send MQTT CONNECT to remote broker

But this solution have some drawbacks:

- the return value of `start_link` compiles the return values of the 2
 works: {ok, Pid, MqttResult}. It is inconsistent with the return value
 of `gen_statem:start_link`, may causes confusions.

- the return mode of the 2 works are different:
  `start_link` should always return {ok, Pid} or {error, Reason}, but
 connecting to mqtt may throw out exceptions as it handles the
 socket. But the caller couldn't have thought of the exception, he would
 pattern match on the result of `emqx_client:start_link`, but it crashed!

- If the init work succeed but the connection failed, the caller couldn't
get a Pid from the return value, but indeed it was created inside the
emqx_client. This hides the fact that the Pid was created, and when the
Pid dies, the caller would receive an message from a Pid it doesn' know about.

This change divived these 2 work into 2 APIs:
- `start_link/1` is to build and verify the options, and returns {ok,Pid}
 (on success) or {error, Reason} (on failure).
- `connect/1` is to send MQTT CONNECT, and returns {ok, MQTTResult::properties()} or
 {error, MQTTReason}. MQTT reason codes will contains in the `MQTTReason`.
terry-xiaoyu 7 лет назад
Родитель
Сommit
997958aed1
5 измененных файлов с 122 добавлено и 75 удалено
  1. 14 6
      etc/emqx.conf
  2. 17 14
      priv/emqx.schema
  3. 28 16
      src/emqx_bridge.erl
  4. 6 13
      src/emqx_client.erl
  5. 57 26
      test/emqx_client_SUITE.erl

+ 14 - 6
etc/emqx.conf

@@ -1612,7 +1612,11 @@ bridge.aws.client_id = bridge_aws
 ## The Clean start flag of a remote bridge.
 ##
 ## Value: boolean
-bridge.aws.clean_start = false
+## Default: true
+##
+## NOTE: Some IoT platforms require clean_start
+##       must be set to 'true'
+## bridge.aws.clean_start = true
 
 ## The username for a remote bridge.
 ##
@@ -1682,12 +1686,12 @@ bridge.aws.ssl = off
 ## Value: File
 ## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
 
-## SSL Certfile of the bridge.
+## Client SSL Certfile of the bridge.
 ##
 ## Value: File
 ## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem
 
-## SSL Keyfile of the bridge.
+## Client SSL Keyfile of the bridge.
 ##
 ## Value: File
 ## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
@@ -1745,7 +1749,11 @@ bridge.aws.ssl = off
 ## The Clean start flag of a remote bridge.
 ##
 ## Value: boolean
-## bridge.azure.clean_start = false
+## Default: true
+##
+## NOTE: Some IoT platforms require clean_start
+##       must be set to 'true'
+## bridge.azure.clean_start = true
 
 ## The username for a remote bridge.
 ##
@@ -1811,12 +1819,12 @@ bridge.aws.ssl = off
 ## Value: File
 ## bridge.azure.cacertfile = cacert.pem
 
-## SSL Certfile of the bridge.
+## Client SSL Certfile of the bridge.
 ##
 ## Value: File
 ## bridge.azure.certfile = cert.pem
 
-## SSL Keyfile of the bridge.
+## Client SSL Keyfile of the bridge.
 ##
 ## Value: File
 ## bridge.azure.keyfile = key.pem

+ 17 - 14
priv/emqx.schema

@@ -1549,7 +1549,8 @@ end}.
 ]}.
 
 {mapping, "bridge.$name.forwards", "emqx.bridges", [
-  {datatype, string}
+  {datatype, string},
+  {default, ""}
 ]}.
 
 {mapping, "bridge.$name.ssl", "emqx.bridges", [
@@ -1624,22 +1625,24 @@ end}.
                     {ciphers, Split(Ciphers)};
                (Opt, Val) ->
                     {Opt, Val}
-               end,
-
-    Merge = fun(Opt, Val, Opts) ->
-                case IsSsl(Opt) of
-                    true ->
-                        SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
-                        lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
-                    false ->
-                        [{Opt, Val}|Opts]
-                end
+            end,
+
+    Merge = fun(forwards, Val, Opts) ->
+                  [{forwards, string:tokens(Val, ",")}|Opts];
+               (Opt, Val, Opts) ->
+                  case IsSsl(Opt) of
+                      true ->
+                          SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
+                          lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
+                      false ->
+                          [{Opt, Val}|Opts]
+                  end
             end,
 
     Subscriptions = fun(Name) ->
-                            Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
-                            lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
-                                      [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])])
+                        Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
+                        lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
+                                  [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])])
                     end,
 
     maps:to_list(

+ 28 - 16
src/emqx_bridge.erl

@@ -188,20 +188,23 @@ handle_cast(Msg, State) ->
 %% start message bridge
 %%----------------------------------------------------------------
 handle_info(start, State = #state{options = Options,
-                                  client_pid = undefined,
-                                  reconnect_interval = ReconnectInterval}) ->
+                                  client_pid = undefined}) ->
     case emqx_client:start_link([{owner, self()}|options(Options)]) of
-        {ok, ClientPid, _} ->
-            Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []),
-                                                         emqx_topic:validate({filter, i2b(Topic)})],
-            Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","),
-                                               emqx_topic:validate({filter, i2b(Topic)})],
-            [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs],
-            [emqx_broker:subscribe(Topic) || Topic <- Forwards],
-            {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}};
+        {ok, ClientPid} ->
+            case emqx_client:connect(ClientPid) of
+                {ok, _} ->
+                    emqx_logger:info("[Bridge] connected to remote sucessfully"),
+                    Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
+                    Forwards = subscribe_local_topics(get_value(forwards, Options, [])),
+                    {noreply, State#state{client_pid = ClientPid,
+                                          subscriptions = Subs,
+                                          forwards = Forwards}};
+                {error, Reason} ->
+                    emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
+                    {noreply, State#state{client_pid = ClientPid}}
+            end;
         {error, Reason} ->
-            logger:error("[Bridge] start failed! error: ~p", [Reason]),
-            erlang:send_after(ReconnectInterval, self(), start),
+            emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]),
             {noreply, State}
     end;
 
@@ -219,7 +222,7 @@ handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{r
         {ok, PkgId} ->
             {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
         {error, Reason} ->
-            emqx_logger:error("Publish fail:~p", [Reason]),
+            emqx_logger:error("[Bridge] Publish fail:~p", [Reason]),
             {noreply, State}
     end;
 
@@ -241,11 +244,12 @@ handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueu
     {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}};
 
 handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) ->
+    emqx_logger:warning("[Bridge] stop ~p", [normal]),
     {noreply, State#state{client_pid = undefined}};
 
 handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid,
                                                   reconnect_interval = ReconnectInterval}) ->
-    lager:warning("emqx bridge stop reason:~p", [Reason]),
+    emqx_logger:error("[Bridge] stop ~p", [Reason]),
     erlang:send_after(ReconnectInterval, self(), start),
     {noreply, State#state{client_pid = undefined}};
 
@@ -259,6 +263,14 @@ terminate(_Reason, #state{}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+subscribe_remote_topics(ClientPid, Subscriptions) ->
+    [begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end
+        || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})].
+
+subscribe_local_topics(Topics) ->
+    [begin emqx_broker:subscribe(bin(Topic)), bin(Topic) end
+        || Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})].
+
 proto_ver(mqttv3) -> v3;
 proto_ver(mqttv4) -> v4;
 proto_ver(mqttv5) -> v5.
@@ -296,7 +308,7 @@ options([_Option | Options], Acc) ->
 name(Id) ->
     list_to_atom(lists:concat([?MODULE, "_", Id])).
 
-i2b(L) -> iolist_to_binary(L).
+bin(L) -> iolist_to_binary(L).
 
 mountpoint(undefined, Topic) ->
     Topic;
@@ -306,7 +318,7 @@ mountpoint(Prefix, Topic) ->
 format_mountpoint(undefined) ->
     undefined;
 format_mountpoint(Prefix) ->
-    binary:replace(i2b(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
+    binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
 
 store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg ->
     [Data | Queue];

+ 6 - 13
src/emqx_client.erl

@@ -21,6 +21,7 @@
 -export([start_link/0, start_link/1]).
 -export([request/5, request/6, request_async/7, receive_response/3]).
 -export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]).
+-export([connect/1]).
 -export([subscribe/2, subscribe/3, subscribe/4]).
 -export([publish/2, publish/3, publish/4, publish/5]).
 -export([unsubscribe/2, unsubscribe/3]).
@@ -200,18 +201,11 @@ start_link(Options) when is_map(Options) ->
 start_link(Options) when is_list(Options) ->
     ok  = emqx_mqtt_props:validate(
             proplists:get_value(properties, Options, #{})),
-    case start_client(with_owner(Options)) of
-        {ok, Client} ->
-            connect(Client);
-        Error -> Error
-    end.
-
-start_client(Options) ->
     case proplists:get_value(name, Options) of
         undefined ->
-            gen_statem:start_link(?MODULE, [Options], []);
+            gen_statem:start_link(?MODULE, [with_owner(Options)], []);
         Name when is_atom(Name) ->
-            gen_statem:start_link({local, Name}, ?MODULE, [Options], [])
+            gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], [])
     end.
 
 with_owner(Options) ->
@@ -220,8 +214,7 @@ with_owner(Options) ->
         undefined -> [{owner, self()} | Options]
     end.
 
-%% @private
--spec(connect(client()) -> {ok, client(), properties()} | {error, term()}).
+-spec(connect(client()) -> {ok, properties()} | {error, term()}).
 connect(Client) ->
     gen_statem:call(Client, connect, infinity).
 
@@ -692,7 +685,7 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
                             undefined -> AllProps;
                             _ -> maps:merge(AllProps, Properties)
                         end,
-            Reply = {ok, self(), Properties},
+            Reply = {ok, Properties},
             State2 = State1#state{client_id = assign_id(ClientId, AllProps1),
                                   properties = AllProps1,
                                   session_present = SessPresent},
@@ -1004,7 +997,7 @@ handle_event(info, {Error, _Sock, Reason}, _StateName, State)
 
 handle_event(info, {Closed, _Sock}, _StateName, State)
     when Closed =:= tcp_closed; Closed =:= ssl_closed ->
-    {stop, Closed, State};
+    {stop, {shutdown, Closed}, State};
 
 handle_event(info, {'EXIT', Owner, Reason}, _, #state{owner = Owner}) ->
     {stop, Reason};

+ 57 - 26
test/emqx_client_SUITE.erl

@@ -59,20 +59,26 @@ end_per_suite(_Config) ->
     emqx_ct_broker_helpers:run_teardown_steps().
 
 request_response_exception(QoS) ->
-    {ok, Client, _} = emqx_client:start_link([{proto_ver, v5},
-                                                 {properties, #{ 'Request-Response-Information' => 0 }}]),
+    {ok, Client} = emqx_client:start_link([{proto_ver, v5},
+                                           {properties, #{ 'Request-Response-Information' => 0 }}]),
+    {ok, _} = emqx_client:connect(Client),
     ?assertError(no_response_information,
                  emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)),
     ok = emqx_client:disconnect(Client).
 
 request_response_per_qos(QoS) ->
-    {ok, Requester, _} = emqx_client:start_link([{proto_ver, v5},
-                                                 {client_id, <<"requester">>},
-                                                 {properties, #{ 'Request-Response-Information' => 1}}]),
-    {ok, Responser, _} = emqx_client:start_link([{proto_ver, v5},
-                                                 {client_id, <<"responser">>},
-                                                 {properties, #{ 'Request-Response-Information' => 1}},
-                                                 {request_handler, fun(_Req) -> <<"ResponseTest">> end}]),
+    {ok, Requester} = emqx_client:start_link([{proto_ver, v5},
+                                              {client_id, <<"requester">>},
+                                              {properties, #{ 'Request-Response-Information' => 1}}]),
+    {ok, _} = emqx_client:connect(Requester),
+    {ok, Responser} = emqx_client:start_link([{proto_ver, v5},
+                                              {client_id, <<"responser">>},
+                                              {properties, #{
+                                                'Request-Response-Information' => 1}},
+                                              {request_handler,
+                                                fun(_Req) -> <<"ResponseTest">> end}
+                                             ]),
+    {ok, _} = emqx_client:connect(Responser),
     ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>),
     {ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
     ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) ->
@@ -108,9 +114,15 @@ share_sub_request_topic_per_qos(QoS) ->
                              {client_id, atom_to_binary(ClientId, utf8)},
                              {properties, Properties}
                             ] end,
-    {ok, Requester, _} = emqx_client:start_link(Opts(requester)),
-    {ok, Responser1, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]),
-    {ok, Responser2, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]),
+    {ok, Requester} = emqx_client:start_link(Opts(requester)),
+    {ok, _} = emqx_client:connect(Requester),
+
+    {ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]),
+    {ok, _} = emqx_client:connect(Responser1),
+
+    {ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]),
+    {ok, _} = emqx_client:connect(Responser2),
+
     ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group),
     ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group),
     %% Send a request, wait for response, validate response then return responser ID
@@ -148,7 +160,9 @@ receive_messages(Count, Msgs) ->
 basic_test(_Config) ->
     Topic = nth(1, ?TOPICS),
     ct:print("Basic test starting"),
-    {ok, C, _} = emqx_client:start_link(),
+    {ok, C} = emqx_client:start_link(),
+    {ok, _} = emqx_client:connect(C),
+
     {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
     {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
     {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
@@ -157,11 +171,15 @@ basic_test(_Config) ->
     ok = emqx_client:disconnect(C).
 
 will_message_test(_Config) ->
-    {ok, C1, _} = emqx_client:start_link([{clean_start, true},
+    {ok, C1} = emqx_client:start_link([{clean_start, true},
                                           {will_topic, nth(3, ?TOPICS)},
                                           {will_payload, <<"client disconnected">>},
                                           {keepalive, 2}]),
-    {ok, C2, _} = emqx_client:start_link(),
+    {ok, _} = emqx_client:connect(C1),
+
+    {ok, C2} = emqx_client:start_link(),
+    {ok, _} = emqx_client:connect(C2),
+
     {ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2),
     timer:sleep(10),
     ok = emqx_client:stop(C1),
@@ -171,26 +189,33 @@ will_message_test(_Config) ->
     ct:print("Will message test succeeded").
 
 offline_message_queueing_test(_) ->
-    {ok, C1, _} = emqx_client:start_link([{clean_start, false},
-                                          {client_id, <<"c1">>}]),
+    {ok, C1} = emqx_client:start_link([{clean_start, false},
+                                       {client_id, <<"c1">>}]),
+    {ok, _} = emqx_client:connect(C1),
+
     {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
     ok = emqx_client:disconnect(C1),
-    {ok, C2, _} = emqx_client:start_link([{clean_start, true},
-                                          {client_id, <<"c2">>}]),
+    {ok, C2} = emqx_client:start_link([{clean_start, true},
+                                       {client_id, <<"c2">>}]),
+    {ok, _} = emqx_client:connect(C2),
 
     ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
     {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
     {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
     timer:sleep(10),
     emqx_client:disconnect(C2),
-    {ok, C3, _} = emqx_client:start_link([{clean_start, false},
+    {ok, C3} = emqx_client:start_link([{clean_start, false},
                                           {client_id, <<"c1">>}]),
+    {ok, _} = emqx_client:connect(C3),
+
     timer:sleep(10),
     emqx_client:disconnect(C3),
     ?assertEqual(3, length(receive_messages(3))).
 
 overlapping_subscriptions_test(_) ->
-    {ok, C, _} = emqx_client:start_link([]),
+    {ok, C} = emqx_client:start_link([]),
+    {ok, _} = emqx_client:connect(C),
+
     {ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
                                                 {nth(1, ?WILD_TOPICS), 1}]),
     timer:sleep(10),
@@ -228,8 +253,10 @@ overlapping_subscriptions_test(_) ->
 
 redelivery_on_reconnect_test(_) ->
     ct:print("Redelivery on reconnect test starting"),
-    {ok, C1, _} = emqx_client:start_link([{clean_start, false},
-                                          {client_id, <<"c">>}]),
+    {ok, C1} = emqx_client:start_link([{clean_start, false},
+                                       {client_id, <<"c">>}]),
+    {ok, _} = emqx_client:connect(C1),
+
     {ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
     timer:sleep(10),
     ok = emqx_client:pause(C1),
@@ -240,8 +267,10 @@ redelivery_on_reconnect_test(_) ->
     timer:sleep(10),
     ok = emqx_client:disconnect(C1),
     ?assertEqual(0, length(receive_messages(2))),
-    {ok, C2, _} = emqx_client:start_link([{clean_start, false},
+    {ok, C2} = emqx_client:start_link([{clean_start, false},
                                           {client_id, <<"c">>}]),
+    {ok, _} = emqx_client:connect(C2),
+
     timer:sleep(10),
     ok = emqx_client:disconnect(C2),
     ?assertEqual(2, length(receive_messages(2))).
@@ -255,8 +284,10 @@ redelivery_on_reconnect_test(_) ->
 
 dollar_topics_test(_) ->
     ct:print("$ topics test starting"),
-    {ok, C, _} = emqx_client:start_link([{clean_start, true},
-                                         {keepalive, 0}]),
+    {ok, C} = emqx_client:start_link([{clean_start, true},
+                                      {keepalive, 0}]),
+    {ok, _} = emqx_client:connect(C),
+
     {ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1),
     {ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
                                   <<"test">>, [{qos, 1}, {retain, false}]),