Просмотр исходного кода

Merge pull request #6481 from terry-xiaoyu/fix_rule_apis

fix(connector): some desc for connector APIs
Shawn 4 лет назад
Родитель
Сommit
efb0365cdd

+ 5 - 4
apps/emqx_bridge/etc/emqx_bridge.conf

@@ -9,10 +9,10 @@
 #    direction = ingress
 #    ## topic mappings for this bridge
 #    remote_topic = "aws/#"
-#    subscribe_qos = 1
+#    remote_qos = 1
 #    local_topic = "from_aws/${topic}"
+#    local_qos = "${qos}"
 #    payload = "${payload}"
-#    qos = "${qos}"
 #    retain = "${retain}"
 #}
 #
@@ -23,14 +23,15 @@
 #    ## topic mappings for this bridge
 #    local_topic = "emqx/#"
 #    remote_topic = "from_emqx/${topic}"
+#    remote_qos = "${qos}"
 #    payload = "${payload}"
-#    qos = 1
 #    retain = false
 #}
 #
 ## HTTP bridges to an HTTP server
 #bridges.http.my_http_bridge {
 #    enable = true
+#    direction = egress
 #    ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
 #    url = "http://localhost:9901/messages/${topic}"
 #    request_timeout = "30s"
@@ -47,7 +48,7 @@
 #        cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
 #    }
 #
-#    from_local_topic = "emqx_http/#"
+#    local_topic = "emqx_http/#"
 #    ## the following config entries can use placehodler variables:
 #    ##   url, method, body, headers
 #    method = post

+ 38 - 21
apps/emqx_bridge/src/emqx_bridge.erl

@@ -68,14 +68,9 @@ load_hook(Bridges) ->
         end, maps:to_list(Bridges)).
 
 do_load_hook(#{local_topic := _} = Conf) ->
-    case maps:find(direction, Conf) of
-        error ->
-            %% this bridge has no direction field, it means that it has only egress bridges
-            emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
-        {ok, egress} ->
-            emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
-        {ok, ingress} ->
-            ok
+    case maps:get(direction, Conf, egress) of
+        egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
+        ingress -> ok
     end;
 do_load_hook(_Conf) -> ok.
 
@@ -111,13 +106,15 @@ bridge_type(emqx_connector_http) -> http.
 post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
     #{added := Added, removed := Removed, changed := Updated}
         = diff_confs(NewConf, OldConf),
-    _ = perform_bridge_changes([
+    %% The config update will be failed if any task in `perform_bridge_changes` failed.
+    Result = perform_bridge_changes([
         {fun remove/3, Removed},
         {fun create/3, Added},
         {fun update/3, Updated}
     ]),
     ok = unload_hook(),
-    ok = load_hook(NewConf).
+    ok = load_hook(NewConf),
+    Result.
 
 perform_bridge_changes(Tasks) ->
     perform_bridge_changes(Tasks, ok).
@@ -195,13 +192,13 @@ create(Type, Name, Conf) ->
     ?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
         config => Conf}),
     case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
-            parse_confs(Type, Name, Conf)) of
+            parse_confs(Type, Name, Conf), #{force_create => true}) of
         {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
         {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
         {error, Reason} -> {error, Reason}
     end.
 
-update(Type, Name, {_OldConf, Conf}) ->
+update(Type, Name, {OldConf, Conf}) ->
     %% TODO: sometimes its not necessary to restart the bridge connection.
     %%
     %% - if the connection related configs like `servers` is updated, we should restart/start
@@ -210,11 +207,22 @@ update(Type, Name, {_OldConf, Conf}) ->
     %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
     %% without restarting the bridge.
     %%
-    ?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
-        config => Conf}),
-    case recreate(Type, Name, Conf) of
-        {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
-        {error, _} = Err -> Err
+    case if_only_to_toggole_enable(OldConf, Conf) of
+        false ->
+            ?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
+                config => Conf}),
+            case recreate(Type, Name, Conf) of
+                {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
+                {error, not_found} ->
+                    ?SLOG(warning, #{ msg => "updating a non-exist bridge, create a new one"
+                                    , type => Type, name => Name, config => Conf}),
+                    create(Type, Name, Conf);
+                {error, Reason} -> {update_bridge_failed, Reason}
+            end;
+        true ->
+            %% we don't need to recreate the bridge if this config change is only to
+            %% toggole the config 'bridge.{type}.{name}.enable'
+            ok
     end.
 
 recreate(Type, Name) ->
@@ -263,10 +271,8 @@ get_matched_bridges(Topic) ->
             (_BName, #{direction := ingress}, Acc1) ->
                 Acc1;
             (BName, #{direction := egress} = Egress, Acc1) ->
-                get_matched_bridge_id(Egress, Topic, BType, BName, Acc1);
-            %% HTTP, MySQL bridges only have egress direction
-            (BName, BridgeConf, Acc1) ->
-                get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1)
+                %% HTTP, MySQL bridges only have egress direction
+                get_matched_bridge_id(Egress, Topic, BType, BName, Acc1)
         end, Acc0, Conf)
     end, [], Bridges).
 
@@ -338,6 +344,17 @@ maybe_disable_bridge(Type, Name, Conf) ->
         true -> ok
     end.
 
+if_only_to_toggole_enable(OldConf, Conf) ->
+    #{added := Added, removed := Removed, changed := Updated} =
+        emqx_map_lib:diff_maps(OldConf, Conf),
+    case {Added, Removed, Updated} of
+        {Added, Removed, #{enable := _}= Updated}
+            when map_size(Added) =:= 0,
+                 map_size(Removed) =:= 0,
+                 map_size(Updated) =:= 1 -> true;
+        {_, _, _} -> false
+    end.
+
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

+ 16 - 4
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -134,7 +134,12 @@ method_example(Type, Direction, get) ->
     #{
         id => bin(SType ++ ":" ++ SName),
         type => bin(SType),
-        name => bin(SName)
+        name => bin(SName),
+        metrics => ?METRICS(0, 0, 0, 0, 0, 0),
+        node_metrics => [
+            #{node => node(),
+              metrics => ?METRICS(0, 0, 0, 0, 0, 0)}
+        ]
     };
 method_example(Type, Direction, post) ->
     SType = atom_to_list(Type),
@@ -269,7 +274,8 @@ schema("/bridges/:id/operation/:operation") ->
         }
     }.
 
-'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
+'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) ->
+    Conf = filter_out_request_body(Conf0),
     BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
     case emqx_bridge:lookup(BridgeType, BridgeName) of
         {ok, _} ->
@@ -291,7 +297,8 @@ list_local_bridges(Node) ->
 '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
 
-'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf}) ->
+'/bridges/:id'(put, #{bindings := #{id := Id}, body := Conf0}) ->
+    Conf = filter_out_request_body(Conf0),
     ?TRY_PARSE_ID(Id,
         case emqx_bridge:lookup(BridgeType, BridgeName) of
             {ok, _} ->
@@ -334,7 +341,7 @@ lookup_from_local_node(BridgeType, BridgeName) ->
         invalid -> {404, error_msg('BAD_ARG', <<"invalid operation">>)};
         UpReq ->
             case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
-                    UpReq, #{override_to => cluster}) of
+                    {UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of
                 {ok, _} -> {200};
                 {error, {pre_config_update, _, bridge_not_found}} ->
                     {404, error_msg('NOT_FOUND', <<"bridge not found">>)};
@@ -423,6 +430,11 @@ rpc_multicall(Func, Args) ->
         ErrL -> {error, ErrL}
     end.
 
+filter_out_request_body(Conf) ->
+    ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>,
+        <<"metrics">>, <<"node">>],
+    maps:without(ExtraConfs, Conf).
+
 rpc_call(Node, Fun, Args) ->
     rpc_call(Node, ?MODULE, Fun, Args).
 

+ 12 - 3
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -40,10 +40,15 @@ stop(_State) ->
     ok.
 
 -define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart).
-pre_config_update(_, Oper, undefined) ?IS_OPER(Oper) ->
+pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) ->
     {error, bridge_not_found};
-pre_config_update(_, Oper, OldConfig) ?IS_OPER(Oper) ->
-    {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
+pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) ->
+    case perform_operation(Oper, Type, Name) of
+        ok ->
+            %% we also need to save the 'enable' to the config files
+            {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
+        {error, _} = Err -> Err
+    end;
 pre_config_update(_, Conf, _OldConfig) ->
     {ok, Conf}.
 
@@ -51,3 +56,7 @@ pre_config_update(_, Conf, _OldConfig) ->
 operation_to_enable(start) -> true;
 operation_to_enable(stop) -> false;
 operation_to_enable(restart) -> true.
+
+perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name);
+perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name);
+perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name).

+ 6 - 2
apps/emqx_bridge/src/emqx_bridge_http_schema.erl

@@ -76,14 +76,18 @@ fields("put") ->
 
 fields("get") ->
     [ id_field()
-    ] ++ fields("post").
+    ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post").
 
 basic_config() ->
     [ {enable,
         mk(boolean(),
-           #{ desc =>"Enable or disable this bridge"
+           #{ desc => "Enable or disable this bridge"
             , default => true
             })}
+    , {direction,
+        mk(egress,
+           #{ desc => "The direction of this bridge, MUST be egress"
+            })}
     ]
     ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
 

+ 2 - 2
apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl

@@ -38,10 +38,10 @@ fields("put_egress") ->
 
 fields("get_ingress") ->
     [ id_field()
-    ] ++ fields("post_ingress");
+    ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress");
 fields("get_egress") ->
     [ id_field()
-    ] ++ fields("post_egress").
+    ] ++ emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress").
 
 %%======================================================================================
 id_field() ->

+ 49 - 4
apps/emqx_bridge/src/emqx_bridge_schema.erl

@@ -12,6 +12,7 @@
         ]).
 
 -export([ common_bridge_fields/0
+        , metrics_status_fields/0
         , direction_field/2
         ]).
 
@@ -48,14 +49,25 @@ common_bridge_fields() ->
     , {connector,
         mk(binary(),
            #{ nullable => false
+            , example => <<"mqtt:my_mqtt_connector">>
             , desc =>"""
-The connector name to be used for this bridge.
-Connectors are configured as 'connectors.{type}.{name}',
-for example 'connectors.http.mybridge'.
+The connector Id to be used for this bridge. Connector Ids must be of format: '{type}:{name}'.<br>
+In config files, you can find the corresponding config entry for a connector by such path: 'connectors.{type}.{name}'.<br>
 """
             })}
     ].
 
+metrics_status_fields() ->
+    [ {"metrics", mk(ref(?MODULE, "metrics"), #{desc => "The metrics of the bridge"})}
+    , {"node_metrics", mk(hoconsc:array(ref(?MODULE, "node_metrics")),
+        #{ desc => "The metrics of the bridge for each node"
+         })}
+    , {"status", mk(ref(?MODULE, "status"), #{desc => "The status of the bridge"})}
+    , {"node_status", mk(hoconsc:array(ref(?MODULE, "node_status")),
+        #{ desc => "The status of the bridge for each node"
+         })}
+    ].
+
 direction_field(Dir, Desc) ->
     {direction, mk(Dir,
         #{ nullable => false
@@ -72,7 +84,40 @@ fields(bridges) ->
     ++ [{T, mk(hoconsc:map(name, hoconsc:union([
             ref(schema_mod(T), "ingress"),
             ref(schema_mod(T), "egress")
-        ])), #{})} || T <- ?CONN_TYPES].
+        ])), #{})} || T <- ?CONN_TYPES];
+
+fields("metrics") ->
+    [ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})}
+    , {"success", mk(integer(), #{desc => "Count of query success"})}
+    , {"failed", mk(integer(), #{desc => "Count of query failed"})}
+    , {"rate", mk(float(), #{desc => "The rate of matched, times/second"})}
+    , {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})}
+    , {"rate_last5m", mk(float(),
+        #{desc => "The average rate of matched in last 5 mins, times/second"})}
+    ];
+
+fields("node_metrics") ->
+    [ node_name()
+    , {"metrics", mk(ref(?MODULE, "metrics"), #{})}
+    ];
+
+fields("status") ->
+    [ {"matched", mk(integer(), #{desc => "Count of this bridge is queried"})}
+    , {"success", mk(integer(), #{desc => "Count of query success"})}
+    , {"failed", mk(integer(), #{desc => "Count of query failed"})}
+    , {"rate", mk(float(), #{desc => "The rate of matched, times/second"})}
+    , {"rate_max", mk(float(), #{desc => "The max rate of matched, times/second"})}
+    , {"rate_last5m", mk(float(),
+        #{desc => "The average rate of matched in last 5 mins, times/second"})}
+    ];
+
+fields("node_status") ->
+    [ node_name()
+    , {"status", mk(ref(?MODULE, "status"), #{})}
+    ].
+
+node_name() ->
+    {"node", mk(binary(), #{desc => "The node name", example => "emqx@127.0.0.1"})}.
 
 schema_mod(Type) ->
     list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).

+ 9 - 4
apps/emqx_connector/src/emqx_connector_api.erl

@@ -38,9 +38,9 @@
             _ = ConnName,
             EXPR
     catch
-        error:{invalid_bridge_id, Id0} ->
-            {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
-                ". Bridge Ids must be of format {type}:{name}">>}}
+        error:{invalid_connector_id, Id0} ->
+            {400, #{code => 'INVALID_ID', message => <<"invalid_connector_id: ", Id0/binary,
+                ". Connector Ids must be of format {type}:{name}">>}}
     end).
 
 namespace() -> "connector".
@@ -234,7 +234,8 @@ schema("/connectors/:id") ->
                 {404, error_msg('NOT_FOUND', <<"connector not found">>)}
         end);
 
-'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
+'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
+    Params = filter_out_request_body(Params0),
     ?TRY_PARSE_ID(Id,
         case emqx_connector:lookup(ConnType, ConnName) of
             {ok, _} ->
@@ -277,5 +278,9 @@ format_resp(ConnId, RawConf) ->
         <<"num_of_bridges">> => NumOfBridges
     }.
 
+filter_out_request_body(Conf) ->
+    ExtraConfs = [<<"num_of_bridges">>, <<"type">>, <<"name">>],
+    maps:without(ExtraConfs, Conf).
+
 bin(S) when is_list(S) ->
     list_to_binary(S).

+ 12 - 7
apps/emqx_connector/src/emqx_connector_http.erl

@@ -169,6 +169,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
         pool_name => PoolName,
         host => Host,
         port => Port,
+        connect_timeout => ConnectTimeout,
         base_path => BasePath,
         request => preprocess_request(maps:get(request, Config, undefined))
     },
@@ -216,13 +217,17 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
     end,
     Result.
 
-on_health_check(_InstId, #{host := Host, port := Port} = State) ->
-    case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), 3000) of
-        {ok, Sock} ->
-            gen_tcp:close(Sock),
-            {ok, State};
-        {error, _Reason} ->
-            {error, test_query_failed, State}
+on_health_check(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) ->
+    case do_health_check(Host, Port, Timeout) of
+        ok -> {ok, State};
+        {error, Reason} ->
+            {error, {http_health_check_failed, Reason}, State}
+    end.
+
+do_health_check(Host, Port, Timeout) ->
+    case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
+        {ok, Sock} -> gen_tcp:close(Sock), ok;
+        {error, Reason} -> {error, Reason}
     end.
 
 %%--------------------------------------------------------------------

+ 10 - 5
apps/emqx_connector/src/emqx_connector_mqtt.erl

@@ -129,12 +129,11 @@ on_start(InstId, Conf) ->
     },
     case ?MODULE:create_bridge(BridgeConf) of
         {ok, _Pid} ->
-            case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
-                ok -> {ok, #{name => InstanceId}};
-                {error, Reason} -> {error, Reason}
-            end;
+            ensure_mqtt_worker_started(InstanceId);
         {error, {already_started, _Pid}} ->
-            {ok, #{name => InstanceId}};
+            ok = ?MODULE:drop_bridge(InstanceId),
+            {ok, _} = ?MODULE:create_bridge(BridgeConf),
+            ensure_mqtt_worker_started(InstanceId);
         {error, Reason} ->
             {error, Reason}
     end.
@@ -162,6 +161,12 @@ on_health_check(_InstId, #{name := InstanceId} = State) ->
         _ -> {error, {connector_down, InstanceId}, State}
     end.
 
+ensure_mqtt_worker_started(InstanceId) ->
+    case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
+        ok -> {ok, #{name => InstanceId}};
+        {error, Reason} -> {error, Reason}
+    end.
+
 make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
     undefined;
 make_sub_confs(undefined) ->

+ 3 - 0
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl

@@ -84,6 +84,9 @@ stop(#{client_pid := Pid}) ->
     safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
     ok.
 
+ping(undefined) ->
+    pang;
+
 ping(#{client_pid := Pid}) ->
     emqtt:ping(Pid).
 

+ 46 - 1
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -379,7 +379,7 @@ t_mqtt_conn_update(_) ->
     %% then we try to update 'server' of the connector, to an unavailable IP address
     %% the update should fail because of 'unreachable' or 'connrefused'
     {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]),
-                                 ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)),
+                                 ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)),
     %% we fix the 'server' parameter to a normal one, it should work
     {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
                                  ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
@@ -391,6 +391,51 @@ t_mqtt_conn_update(_) ->
     {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
     {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
 
+t_mqtt_conn_update2(_) ->
+    %% assert we there's no connectors and no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a mqtt connector, using POST
+    %% but this connector is point to a unreachable server "2603"
+    {ok, 201, Connector} = request(post, uri(["connectors"]),
+                            ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)
+                                #{ <<"type">> => ?CONNECTR_TYPE
+                                 , <<"name">> => ?CONNECTR_NAME
+                                 }),
+
+    ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+                  , <<"server">> := <<"127.0.0.1:2603">>
+                  }, jsx:decode(Connector)),
+
+    %% ... and a MQTT bridge, using POST
+    %% we bind this bridge to the connector created just now
+    {ok, 201, Bridge} = request(post, uri(["bridges"]),
+        ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
+            <<"type">> => ?CONNECTR_TYPE,
+            <<"name">> => ?BRIDGE_NAME_EGRESS
+        }),
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
+                  , <<"type">> := <<"mqtt">>
+                  , <<"name">> := ?BRIDGE_NAME_EGRESS
+                  , <<"status">> := <<"disconnected">>
+                  , <<"connector">> := ?CONNECTR_ID
+                  }, jsx:decode(Bridge)),
+    %% we fix the 'server' parameter to a normal one, it should work
+    {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
+                                 ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
+    {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
+    ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
+                  , <<"status">> := <<"connected">>
+                  }, jsx:decode(BridgeStr)),
+    %% delete the bridge
+    {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% delete the connector
+    {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
+
 t_mqtt_conn_testing(_) ->
     %% APIs for testing the connectivity
     %% then we add a mqtt connector, using POST

+ 6 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -29,6 +29,12 @@
     metrics := emqx_plugin_libs_metrics:metrics()
 }.
 -type resource_group() :: binary().
+-type create_opts() :: #{
+        %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
+        %% the 'status' of the resource will be 'stopped' in this case.
+        %% Defaults to 'false'
+        force_create => boolean()
+    }.
 -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
     undefined.
 

+ 29 - 5
apps/emqx_resource/src/emqx_resource.erl

@@ -33,7 +33,9 @@
 
 -export([ check_config/2
         , check_and_create/3
+        , check_and_create/4
         , check_and_create_local/3
+        , check_and_create_local/4
         , check_and_recreate/4
         , check_and_recreate_local/4
         ]).
@@ -42,7 +44,9 @@
 %% provisional solution: rpc:multical to all the nodes for creating/updating/removing
 %% todo: replicate operations
 -export([ create/3 %% store the config and start the instance
+        , create/4
         , create_local/3
+        , create_local/4
         , create_dry_run/2 %% run start/2, health_check/2 and stop/1 sequentially
         , create_dry_run_local/2
         , recreate/4 %% this will do create_dry_run, stop the old instance and start a new one
@@ -141,12 +145,22 @@ apply_query_after_calls(Funcs) ->
 -spec create(instance_id(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create(InstId, ResourceType, Config) ->
-    cluster_call(create_local, [InstId, ResourceType, Config]).
+    create(InstId, ResourceType, Config, #{}).
+
+-spec create(instance_id(), resource_type(), resource_config(), create_opts()) ->
+    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
+create(InstId, ResourceType, Config, Opts) ->
+    cluster_call(create_local, [InstId, ResourceType, Config, Opts]).
 
 -spec create_local(instance_id(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create_local(InstId, ResourceType, Config) ->
-    call_instance(InstId, {create, InstId, ResourceType, Config}).
+    create_local(InstId, ResourceType, Config, #{}).
+
+-spec create_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
+    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
+create_local(InstId, ResourceType, Config, Opts) ->
+    call_instance(InstId, {create, InstId, ResourceType, Config, Opts}).
 
 -spec create_dry_run(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
@@ -188,7 +202,7 @@ query(InstId, Request) ->
 query(InstId, Request, AfterQuery) ->
     case get_instance(InstId) of
         {ok, #{status := stopped}} ->
-            error({InstId, stopped});
+            error({resource_stopped, InstId});
         {ok, #{mod := Mod, state := ResourceState, status := started}} ->
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe
@@ -294,14 +308,24 @@ check_config(ResourceType, RawConfigTerm) ->
 -spec check_and_create(instance_id(), resource_type(), raw_resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, term()}.
 check_and_create(InstId, ResourceType, RawConfig) ->
+    check_and_create(InstId, ResourceType, RawConfig, #{}).
+
+-spec check_and_create(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
+    {ok, resource_data() | 'already_created'} | {error, term()}.
+check_and_create(InstId, ResourceType, RawConfig, Opts) ->
     check_and_do(ResourceType, RawConfig,
-        fun(InstConf) -> create(InstId, ResourceType, InstConf) end).
+        fun(InstConf) -> create(InstId, ResourceType, InstConf, Opts) end).
 
 -spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) ->
     {ok, resource_data()} | {error, term()}.
 check_and_create_local(InstId, ResourceType, RawConfig) ->
+    check_and_create_local(InstId, ResourceType, RawConfig, #{}).
+
+-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config(),
+    create_opts()) -> {ok, resource_data()} | {error, term()}.
+check_and_create_local(InstId, ResourceType, RawConfig, Opts) ->
     check_and_do(ResourceType, RawConfig,
-        fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end).
+        fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
 
 -spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) ->
     {ok, resource_data()} | {error, term()}.

+ 36 - 37
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -26,7 +26,6 @@
 -export([ lookup/1
         , get_metrics/1
         , list_all/0
-        , create_local/3
         ]).
 
 -export([ hash_call/2
@@ -85,15 +84,6 @@ list_all() ->
         error:badarg -> []
     end.
 
-
--spec create_local(instance_id(), resource_type(), resource_config()) ->
-    {ok, resource_data()} | {error, term()}.
-create_local(InstId, ResourceType, InstConf) ->
-    case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of
-        {ok, Data} -> {ok, Data};
-        Error -> Error
-    end.
-
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
@@ -105,8 +95,8 @@ init({Pool, Id}) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
     {ok, #state{worker_pool = Pool, worker_id = Id}}.
 
-handle_call({create, InstId, ResourceType, Config}, _From, State) ->
-    {reply, do_create(InstId, ResourceType, Config), State};
+handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
+    {reply, do_create(InstId, ResourceType, Config, Opts), State};
 
 handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) ->
     {reply, do_create_dry_run(InstId, ResourceType, Config), State};
@@ -146,7 +136,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 %% suppress the race condition check, as these functions are protected in gproc workers
 -dialyzer({nowarn_function, [do_recreate/4,
-                             do_create/3,
+                             do_create/4,
                              do_restart/1,
                              do_stop/1,
                              do_health_check/1]}).
@@ -156,10 +146,11 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
         {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} ->
             Config = emqx_resource:call_config_merge(ResourceType, OldConfig,
                         NewConfig, Params),
-            case do_create_dry_run(InstId, ResourceType, Config) of
+            TestInstId = iolist_to_binary(emqx_misc:gen_id(16)),
+            case do_create_dry_run(TestInstId, ResourceType, Config) of
                 ok ->
                     do_remove(ResourceType, InstId, ResourceState),
-                    do_create(InstId, ResourceType, Config);
+                    do_create(InstId, ResourceType, Config, #{force_create => true});
                 Error ->
                     Error
             end;
@@ -169,21 +160,27 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
             {error, not_found}
     end.
 
-do_create(InstId, ResourceType, Config) ->
+do_create(InstId, ResourceType, Config, Opts) ->
+    ForceCreate = maps:get(force_create, Opts, false),
     case lookup(InstId) of
         {ok, _} -> {ok, already_created};
         _ ->
+            Res0 = #{id => InstId, mod => ResourceType, config => Config,
+                     status => stopped, state => undefined},
             case emqx_resource:call_start(InstId, ResourceType, Config) of
                 {ok, ResourceState} ->
-                    ets:insert(emqx_resource_instance, {InstId,
-                        #{mod => ResourceType, config => Config,
-                          state => ResourceState, status => stopped}}),
-                    _ = do_health_check(InstId),
                     ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
+                    %% this is the first time we do health check, this will update the
+                    %% status and then do ets:insert/2
+                    _ = do_health_check(Res0#{state => ResourceState}),
                     {ok, force_lookup(InstId)};
-                {error, Reason} ->
-                    logger:error("start ~ts resource ~ts failed: ~p",
+                {error, Reason} when ForceCreate == true ->
+                    logger:error("start ~ts resource ~ts failed: ~p, "
+                                 "force_create it as a stopped resource",
                                  [ResourceType, InstId, Reason]),
+                    ets:insert(emqx_resource_instance, {InstId, Res0}),
+                    {ok, Res0};
+                {error, Reason} when ForceCreate == false ->
                     {error, Reason}
             end
     end.
@@ -243,22 +240,24 @@ do_stop(InstId) ->
             Error
     end.
 
-do_health_check(InstId) ->
+do_health_check(InstId) when is_binary(InstId) ->
     case lookup(InstId) of
-        {ok, #{mod := Mod, state := ResourceState0} = Data} ->
-            case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
-                {ok, ResourceState1} ->
-                    ets:insert(emqx_resource_instance,
-                        {InstId, Data#{status => started, state => ResourceState1}}),
-                    ok;
-                {error, Reason, ResourceState1} ->
-                    logger:error("health check for ~p failed: ~p", [InstId, Reason]),
-                    ets:insert(emqx_resource_instance,
-                        {InstId, Data#{status => stopped, state => ResourceState1}}),
-                    {error, Reason}
-            end;
-        Error ->
-            Error
+        {ok, Data} -> do_health_check(Data);
+        Error -> Error
+    end;
+do_health_check(#{state := undefined}) ->
+    {error, resource_not_initialized};
+do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
+    case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
+        {ok, ResourceState1} ->
+            ets:insert(emqx_resource_instance,
+                {InstId, Data#{status => started, state => ResourceState1}}),
+            ok;
+        {error, Reason, ResourceState1} ->
+            logger:error("health check for ~p failed: ~p", [InstId, Reason]),
+            ets:insert(emqx_resource_instance,
+                {InstId, Data#{status => stopped, state => ResourceState1}}),
+            {error, Reason}
     end.
 
 %%------------------------------------------------------------------------------

+ 1 - 4
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -142,10 +142,7 @@ t_stop_start(_) ->
 
     ?assertNot(is_process_alive(Pid0)),
 
-    ?assertException(
-       error,
-       {?ID, stopped},
-       emqx_resource:query(?ID, get_state)),
+    ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)),
 
     ok = emqx_resource:restart(?ID),
 

+ 1 - 1
apps/emqx_rule_engine/include/rule_engine.hrl

@@ -47,7 +47,7 @@
         , name := binary()
         , sql := binary()
         , outputs := [output()]
-        , enabled := boolean()
+        , enable := boolean()
         , description => binary()
         , created_at := integer() %% epoch in millisecond precision
         , from := list(topic())

+ 3 - 1
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -43,7 +43,9 @@ fields("rule_creation") ->
 fields("rule_info") ->
     [ rule_id()
     , {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
-    , {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})}
+    , {"node_metrics", sc(hoconsc:array(ref("node_metrics")),
+        #{ desc => "The metrics of the rule for each node"
+         })}
     , {"from", sc(hoconsc:array(binary()),
         #{desc => "The topics of the rule", example => "t/#"})}
     , {"created_at", sc(binary(),

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -223,7 +223,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
                 id => RuleId,
                 name => maps:get(name, Params, <<"">>),
                 created_at => erlang:system_time(millisecond),
-                enabled => maps:get(enabled, Params, true),
+                enable => maps:get(enable, Params, true),
                 sql => Sql,
                 outputs => parse_outputs(Outputs),
                 description => maps:get(description, Params, ""),

+ 23 - 12
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -164,14 +164,15 @@ param_path_id() ->
     Records = emqx_rule_engine:get_rules_ordered_by_ts(),
     {200, format_rule_resp(Records)};
 
-'/rules'(post, #{body := Params}) ->
-    Id = maps:get(<<"id">>, Params, list_to_binary(emqx_misc:gen_id(8))),
+'/rules'(post, #{body := Params0}) ->
+    Id = maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))),
+    Params = filter_out_request_body(Params0),
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
     case emqx_rule_engine:get_rule(Id) of
         {ok, _Rule} ->
             {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}};
         not_found ->
-            case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
+            case emqx:update_config(ConfPath, Params, #{}) of
                 {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
                     [Rule] = get_one_rule(AllRules, Id),
                     {201, format_rule_resp(Rule)};
@@ -196,9 +197,10 @@ param_path_id() ->
             {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}
     end;
 
-'/rules/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
+'/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
+    Params = filter_out_request_body(Params0),
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
-    case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
+    case emqx:update_config(ConfPath, Params, #{}) of
         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
             [Rule] = get_one_rule(AllRules, Id),
             {200, format_rule_resp(Rule)};
@@ -233,7 +235,7 @@ format_rule_resp(#{ id := Id, name := Name,
                     from := Topics,
                     outputs := Output,
                     sql := SQL,
-                    enabled := Enabled,
+                    enable := Enable,
                     description := Descr}) ->
     NodeMetrics = get_rule_metrics(Id),
     #{id => Id,
@@ -243,7 +245,7 @@ format_rule_resp(#{ id := Id, name := Name,
       sql => SQL,
       metrics => aggregate_metrics(NodeMetrics),
       node_metrics => NodeMetrics,
-      enabled => Enabled,
+      enable => Enable,
       created_at => format_datetime(CreatedAt, millisecond),
       description => Descr
      }.
@@ -266,10 +268,12 @@ get_rule_metrics(Id) ->
                           rate_max := Max,
                           rate_last5m := Last5M
                         }) ->
-        #{ matched => Matched
-         , rate => Current
-         , rate_max => Max
-         , rate_last5m => Last5M
+        #{ metrics => #{
+            matched => Matched,
+            rate => Current,
+            rate_max => Max,
+            rate_last5m => Last5M
+           }
          , node => Node
          }
     end,
@@ -279,7 +283,8 @@ get_rule_metrics(Id) ->
 aggregate_metrics(AllMetrics) ->
     InitMetrics = #{matched => 0, rate => 0, rate_max => 0, rate_last5m => 0},
     lists:foldl(fun
-        (#{matched := Match1, rate := Rate1, rate_max := RateMax1, rate_last5m := Rate5m1},
+        (#{metrics := #{matched := Match1, rate := Rate1,
+                        rate_max := RateMax1, rate_last5m := Rate5m1}},
          #{matched := Match0, rate := Rate0, rate_max := RateMax0, rate_last5m := Rate5m0}) ->
             #{matched => Match1 + Match0, rate => Rate1 + Rate0,
               rate_max => RateMax1 + RateMax0, rate_last5m => Rate5m1 + Rate5m0}
@@ -287,3 +292,9 @@ aggregate_metrics(AllMetrics) ->
 
 get_one_rule(AllRules, Id) ->
     [R || R = #{id := Id0} <- AllRules, Id0 == Id].
+
+filter_out_request_body(Conf) ->
+    ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>,
+        <<"metrics">>, <<"node">>],
+    maps:without(ExtraConfs, Conf).
+

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -48,7 +48,7 @@
 -spec(apply_rules(list(rule()), input()) -> ok).
 apply_rules([], _Input) ->
     ok;
-apply_rules([#{enabled := false}|More], Input) ->
+apply_rules([#{enable := false}|More], Input) ->
     apply_rules(More, Input);
 apply_rules([Rule = #{id := RuleID}|More], Input) ->
     try apply_rule_discard_result(Rule, Input)

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -47,7 +47,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
         sql => Sql,
         from => EventTopics,
         outputs => [#{mod => ?MODULE, func => get_selected_data, args => #{}}],
-        enabled => true,
+        enable => true,
         is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
         fields => emqx_rule_sqlparser:select_fields(Select),
         doeach => emqx_rule_sqlparser:select_doeach(Select),