JimMoen 4 лет назад
Родитель
Сommit
b3f9220d02

+ 19 - 11
apps/emqx_authz/src/emqx_authz_api_sources.erl

@@ -32,7 +32,8 @@
 -define(EXAMPLE_FILE,
         #{type=> file,
           enable => true,
-          rules => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
+          rules => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n",
+                     "{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
                    }).
 
 -define(EXAMPLE_RETURNED,
@@ -90,7 +91,7 @@ sources_api() ->
         },
         post => #{
             description => "Add new source",
-            requestBody => #{
+            'requestBody' => #{
                 content => #{
                     'application/json' => #{
                         schema => minirest:ref(<<"sources">>),
@@ -114,7 +115,7 @@ sources_api() ->
         },
         put => #{
             description => "Update all sources",
-            requestBody => #{
+            'requestBody' => #{
                 content => #{
                     'application/json' => #{
                         schema => #{
@@ -206,7 +207,7 @@ source_api() ->
                     required => true
                 }
             ],
-            requestBody => #{
+            'requestBody' => #{
                 content => #{
                     'application/json' => #{
                         schema => minirest:ref(<<"sources">>),
@@ -279,7 +280,7 @@ move_source_api() ->
                     required => true
                 }
             ],
-            requestBody => #{
+            'requestBody' => #{
                 content => #{
                     'application/json' => #{
                         schema => #{
@@ -287,7 +288,7 @@ move_source_api() ->
                             required => [position],
                             properties => #{
                                 position => #{
-                                    oneOf => [
+                                    'oneOf' => [
                                         #{type => string,
                                           enum => [<<"top">>, <<"bottom">>]
                                         },
@@ -326,7 +327,8 @@ move_source_api() ->
     {"/authorization/sources/:type/move", Metadata, move_source}.
 
 sources(get, _) ->
-    Sources = lists:foldl(fun (#{<<"type">> := <<"file">>, <<"enable">> := Enable, <<"path">> := Path}, AccIn) ->
+    Sources = lists:foldl(fun (#{<<"type">> := <<"file">>,
+                                 <<"enable">> := Enable, <<"path">> := Path}, AccIn) ->
                                   case file:read_file(Path) of
                                       {ok, Rules} ->
                                           lists:append(AccIn, [#{type => file,
@@ -345,7 +347,8 @@ sources(get, _) ->
     {200, #{sources => Sources}};
 sources(post, #{body := #{<<"type">> := <<"file">>, <<"rules">> := Rules}}) ->
     {ok, Filename} = write_file(acl_conf_file(), Rules),
-    update_config(?CMD_PREPEND, [#{<<"type">> => <<"file">>, <<"enable">> => true, <<"path">> => Filename}]);
+    update_config(?CMD_PREPEND, [#{<<"type">> => <<"file">>,
+                                   <<"enable">> => true, <<"path">> => Filename}]);
 sources(post, #{body := Body}) when is_map(Body) ->
     update_config(?CMD_PREPEND, [maybe_write_certs(Body)]);
 sources(put, #{body := Body}) when is_list(Body) ->
@@ -377,9 +380,13 @@ source(get, #{bindings := #{type := Type}}) ->
         [Source] ->
             {200, read_certs(Source)}
     end;
-source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable}}) ->
+source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>,
+                                                           <<"rules">> := Rules,
+                                                           <<"enable">> := Enable}}) ->
     {ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""), Rules),
-    case emqx_authz:update({?CMD_REPLACE, <<"file">>}, #{<<"type">> => <<"file">>, <<"enable">> => Enable, <<"path">> => Filename}) of
+    case emqx_authz:update({?CMD_REPLACE, <<"file">>}, #{<<"type">> => <<"file">>,
+                                                         <<"enable">> => Enable,
+                                                         <<"path">> => Filename}) of
         {ok, _} -> {204};
         {error, Reason} ->
             {400, #{code => <<"BAD_REQUEST">>,
@@ -405,7 +412,8 @@ get_raw_sources() ->
     RawSources = emqx:get_raw_config([authorization, sources], []),
     Schema = #{roots => emqx_authz_schema:fields("authorization"), fields => #{}},
     Conf = #{<<"sources">> => RawSources},
-    #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf, #{only_fill_defaults => true}),
+    #{<<"sources">> := Sources} = hocon_schema:check_plain(Schema, Conf,
+                                                           #{only_fill_defaults => true}),
     Sources.
 
 get_raw_source(Type) ->

+ 5 - 4
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -45,10 +45,10 @@ req_schema() ->
                 emqx_mgmt_api_configs:gen_schema(Conf)
         end
      || T <- ?TYPES],
-    #{oneOf => Schema}.
+    #{'oneOf' => Schema}.
 
 resp_schema() ->
-    #{oneOf := Schema} = req_schema(),
+    #{'oneOf' := Schema} = req_schema(),
     AddMetadata = fun(Prop) ->
         Prop#{is_connected => #{type => boolean},
               id => #{type => string},
@@ -57,7 +57,7 @@ resp_schema() ->
     end,
     Schema1 = [S#{properties => AddMetadata(Prop)}
                || S = #{properties := Prop} <- Schema],
-    #{oneOf => Schema1}.
+    #{'oneOf' => Schema1}.
 
 api_spec() ->
     {bridge_apis(), []}.
@@ -120,7 +120,8 @@ operation_apis() ->
                 param_path_id(),
                 param_path_operation()],
             responses => #{
-                <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
+                <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>,
+                                                         ['INTERNAL_ERROR']),
                 <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
     {"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}.
 

+ 12 - 12
apps/emqx_dashboard/src/emqx_dashboard_api.erl

@@ -48,16 +48,16 @@ paths() -> ["/login", "/logout", "/users",
 
 schema("/login") ->
     #{
-        operationId => login,
+        'operationId' => login,
         post => #{
             tags => [<<"dashboard">>],
             description => <<"Dashboard Auth">>,
             summary => <<"Dashboard Auth">>,
-            requestBody =>
+            'requestBody' =>
             [
                 {username, mk(binary(),
                     #{desc => <<"The User for which to create the token.">>,
-                        maxLength => 100, example => <<"admin">>})},
+                        'maxLength' => 100, example => <<"admin">>})},
                 {password, mk(binary(),
                     #{desc => "password", example => "public"})}
             ],
@@ -76,14 +76,14 @@ schema("/login") ->
         }};
 schema("/logout") ->
     #{
-        operationId => logout,
+        'operationId' => logout,
         post => #{
             tags => [<<"dashboard">>],
             description => <<"Dashboard User logout">>,
-            requestBody => [
+            'requestBody' => [
                 {username, mk(binary(),
                     #{desc => <<"The User for which to create the token.">>,
-                        maxLength => 100, example => <<"admin">>})}
+                        'maxLength' => 100, example => <<"admin">>})}
             ],
             responses => #{
                 200 => <<"Dashboard logout successfully">>
@@ -92,7 +92,7 @@ schema("/logout") ->
     };
 schema("/users") ->
     #{
-        operationId => users,
+        'operationId' => users,
         get => #{
             tags => [<<"dashboard">>],
             description => <<"Get dashboard users">>,
@@ -104,7 +104,7 @@ schema("/users") ->
         post => #{
             tags => [<<"dashboard">>],
             description => <<"Create dashboard users">>,
-            requestBody => fields(user_password),
+            'requestBody' => fields(user_password),
             responses => #{
                 200 => <<"Create user successfully">>,
                 400 => [{code, mk(string(), #{example => 'CREATE_FAIL'})},
@@ -114,13 +114,13 @@ schema("/users") ->
 
 schema("/users/:username") ->
     #{
-        operationId => user,
+        'operationId' => user,
         put => #{
             tags => [<<"dashboard">>],
             description => <<"Update dashboard users">>,
             parameters => [{username, mk(binary(),
                 #{in => path, example => <<"admin">>})}],
-            requestBody => [{tag, mk(binary(), #{desc => <<"Tag">>})}],
+            'requestBody' => [{tag, mk(binary(), #{desc => <<"Tag">>})}],
             responses => #{
                 200 => <<"Update User successfully">>,
                 400 => [{code, mk(string(), #{example => 'UPDATE_FAIL'})},
@@ -138,13 +138,13 @@ schema("/users/:username") ->
     };
 schema("/users/:username/change_pwd") ->
     #{
-        operationId => change_pwd,
+        'operationId' => change_pwd,
         put => #{
             tags => [<<"dashboard">>],
             description => <<"Update dashboard users password">>,
             parameters => [{username, mk(binary(),
                 #{in => path, required => true, example => <<"admin">>})}],
-            requestBody => [
+            'requestBody' => [
                 {old_pwd, mk(binary(), #{required => true})},
                 {new_pwd, mk(binary(), #{required => true})}
             ],

+ 9 - 5
apps/emqx_management/src/emqx_mgmt_api_alarms.erl

@@ -36,7 +36,7 @@ paths() ->
 
 schema("/alarms") ->
     #{
-        operationId => alarms,
+        'operationId' => alarms,
         get => #{
             description => <<"EMQ X alarms">>,
             parameters => [
@@ -63,17 +63,21 @@ schema("/alarms") ->
 
 fields(alarm) ->
     [
-        {node, hoconsc:mk(binary(), #{desc => <<"Alarm in node">>, example => atom_to_list(node())})},
-        {name, hoconsc:mk(binary(), #{desc => <<"Alarm name">>, example => <<"high_system_memory_usage">>})},
+        {node, hoconsc:mk(binary(),
+                          #{desc => <<"Alarm in node">>, example => atom_to_list(node())})},
+        {name, hoconsc:mk(binary(),
+                          #{desc => <<"Alarm name">>, example => <<"high_system_memory_usage">>})},
         {message, hoconsc:mk(binary(), #{desc => <<"Alarm readable information">>,
             example => <<"System memory usage is higher than 70%">>})},
         {details, hoconsc:mk(map(), #{desc => <<"Alarm details information">>,
             example => #{<<"high_watermark">> => 70}})},
-        {duration, hoconsc:mk(integer(), #{desc => <<"Alarms duration time; UNIX time stamp, millisecond">>,
+        {duration, hoconsc:mk(integer(),
+                              #{desc => <<"Alarms duration time; UNIX time stamp, millisecond">>,
             example => 297056})},
         {activate_at, hoconsc:mk(binary(), #{desc => <<"Alarms activate time, RFC 3339">>,
             example => <<"2021-10-25T11:52:52.548+08:00">>})},
-        {deactivate_at, hoconsc:mk(binary(), #{desc => <<"Nullable, alarms deactivate time, RFC 3339">>,
+        {deactivate_at, hoconsc:mk(binary(),
+                                   #{desc => <<"Nullable, alarms deactivate time, RFC 3339">>,
             example => <<"2021-10-31T10:52:52.548+08:00">>})}
     ];
 

+ 15 - 10
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -59,10 +59,10 @@ req_schema() ->
     Schema = [emqx_mgmt_api_configs:gen_schema(
         emqx:get_raw_config([listeners, T, default], #{}))
      || T <- ?TYPES_ATOM],
-    #{oneOf => Schema}.
+    #{'oneOf' => Schema}.
 
 resp_schema() ->
-    #{oneOf := Schema} = req_schema(),
+    #{'oneOf' := Schema} = req_schema(),
     AddMetadata = fun(Prop) ->
         Prop#{running => #{type => boolean},
               id => #{type => string},
@@ -70,7 +70,7 @@ resp_schema() ->
     end,
     Schema1 = [S#{properties => AddMetadata(Prop)}
                || S = #{properties := Prop} <- Schema],
-    #{oneOf => Schema1}.
+    #{'oneOf' => Schema1}.
 
 api_list_listeners() ->
     Metadata = #{
@@ -78,7 +78,8 @@ api_list_listeners() ->
             description => <<"List listeners from all nodes in the cluster">>,
             responses => #{
                 <<"200">> =>
-                    emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}}},
+                    emqx_mgmt_util:array_schema(resp_schema(),
+                                                <<"List listeners successfully">>)}}},
     {"/listeners", Metadata, list_listeners}.
 
 api_list_update_listeners_by_id() ->
@@ -92,18 +93,21 @@ api_list_update_listeners_by_id() ->
                 <<"200">> =>
                     emqx_mgmt_util:array_schema(resp_schema(), <<"List listeners successfully">>)}},
         put => #{
-            description => <<"Create or update a listener by a given Id to all nodes in the cluster">>,
+            description =>
+                <<"Create or update a listener by a given Id to all nodes in the cluster">>,
             parameters => [param_path_id()],
-            requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
+            'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
             responses => #{
                 <<"400">> =>
-                    emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED, ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
+                    emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED,
+                                                ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
                 <<"404">> =>
                     emqx_mgmt_util:error_schema(?LISTENER_NOT_FOUND, ['BAD_LISTENER_ID']),
                 <<"500">> =>
                     emqx_mgmt_util:error_schema(?OPERATION_FAILED, ['INTERNAL_ERROR']),
                 <<"200">> =>
-                    emqx_mgmt_util:array_schema(resp_schema(), <<"Create or update listener successfully">>)}},
+                    emqx_mgmt_util:array_schema(resp_schema(),
+                                                <<"Create or update listener successfully">>)}},
         delete => #{
             description => <<"Delete a listener by a given Id to all nodes in the cluster">>,
             parameters => [param_path_id()],
@@ -143,10 +147,11 @@ api_get_update_listener_by_id_on_node() ->
         put => #{
             description => <<"Create or update a listener by a given Id on a specific node">>,
             parameters => [param_path_node(), param_path_id()],
-            requestBody => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
+            'requestBody' => emqx_mgmt_util:schema(req_schema(), <<"Listener Config">>),
             responses => #{
                 <<"400">> =>
-                    emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED, ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
+                    emqx_mgmt_util:error_schema(?UPDATE_CONFIG_FAILED,
+                                                ['BAD_LISTENER_ID', 'BAD_CONFIG_SCHEMA']),
                 <<"404">> =>
                     emqx_mgmt_util:error_schema(?NODE_LISTENER_NOT_FOUND,
                         ['BAD_NODE_NAME', 'BAD_LISTENER_ID']),

+ 20 - 12
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -52,7 +52,7 @@ paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:
 
 schema("/mqtt/delayed") ->
     #{
-        operationId => status,
+        'operationId' => status,
         get => #{
             tags => [<<"mqtt">>],
             description => <<"Get delayed status">>,
@@ -64,25 +64,28 @@ schema("/mqtt/delayed") ->
         put => #{
             tags => [<<"mqtt">>],
             description => <<"Enable or disable delayed, set max delayed messages">>,
-            requestBody => ref(emqx_modules_schema, "delayed"),
+            'requestBody' => ref(emqx_modules_schema, "delayed"),
             responses => #{
                 200 => mk(ref(emqx_modules_schema, "delayed"),
                     #{desc => <<"Enable or disable delayed successfully">>}),
-                400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Max limit illegality">>)
+                400 => emqx_dashboard_swagger:error_codes( [?BAD_REQUEST]
+                                                         , <<"Max limit illegality">>)
             }
         }
     };
 
 schema("/mqtt/delayed/messages/:msgid") ->
-    #{operationId => delayed_message,
+    #{'operationId' => delayed_message,
         get => #{
             tags => [<<"mqtt">>],
             description => <<"Get delayed message">>,
             parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
             responses => #{
                 200 => ref("message_without_payload"),
-                400 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_SCHEMA_ERROR], <<"Bad MsgId format">>),
-                404 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_NOT_FOUND], <<"MsgId not found">>)
+                400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
+                                                         , <<"Bad MsgId format">>),
+                404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND]
+                                                         , <<"MsgId not found">>)
             }
         },
         delete => #{
@@ -91,14 +94,16 @@ schema("/mqtt/delayed/messages/:msgid") ->
             parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
             responses => #{
                 204 => <<"Delete delayed message success">>,
-                400 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_SCHEMA_ERROR], <<"Bad MsgId format">>),
-                404 => emqx_dashboard_swagger:error_codes([?MESSAGE_ID_NOT_FOUND], <<"MsgId not found">>)
+                400 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_SCHEMA_ERROR]
+                                                         , <<"Bad MsgId format">>),
+                404 => emqx_dashboard_swagger:error_codes( [?MESSAGE_ID_NOT_FOUND]
+                                                         , <<"MsgId not found">>)
             }
         }
     };
 schema("/mqtt/delayed/messages") ->
     #{
-        operationId => delayed_messages,
+        'operationId' => delayed_messages,
         get => #{
             tags => [<<"mqtt">>],
             description => <<"List delayed messages">>,
@@ -130,7 +135,8 @@ fields("message_without_payload") ->
         {from_username, mk(binary(), #{desc => <<"From Username">>})}
     ];
 fields("message") ->
-    PayloadDesc = io_lib:format("Payload, base64 encode. Payload will be ~p if length large than ~p",
+    PayloadDesc = io_lib:format(
+                    "Payload, base64 encode. Payload will be ~p if length large than ~p",
         [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH]),
     fields("message_without_payload") ++
     [{payload, mk(binary(), #{desc => iolist_to_binary(PayloadDesc)})}].
@@ -233,9 +239,11 @@ update_config_(Node, Config) ->
     rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
 
 generate_http_code_map(id_schema_error, Id) ->
-    #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))};
+    #{code => ?MESSAGE_ID_SCHEMA_ERROR, message =>
+          iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id]))};
 generate_http_code_map(not_found, Id) ->
-    #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
+    #{code => ?MESSAGE_ID_NOT_FOUND, message =>
+          iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
 
 rpc_call(Node, Module, Fun, Args) ->
     case rpc:call(Node, Module, Fun, Args) of

+ 6 - 3
apps/emqx_modules/src/emqx_topic_metrics_api.erl

@@ -96,7 +96,8 @@ topic_metrics_api() ->
             responses => #{
                 <<"200">> => schema(<<"Create topic metrics success">>),
                 <<"409">> => error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]),
-                <<"400">> => error_schema(<<"Topic metrics already exist or bad topic">>, [?BAD_REQUEST, ?BAD_TOPIC])
+                <<"400">> => error_schema( <<"Topic metrics already exist or bad topic">>
+                                         , [?BAD_REQUEST, ?BAD_TOPIC])
             }
         }
     },
@@ -174,8 +175,10 @@ register(Topic) ->
                                         [Topic])),
             {400, #{code => ?BAD_TOPIC, message => Message}};
         {error, {quota_exceeded, bad_topic}} ->
-            Message = list_to_binary(io_lib:format("Max topic metrics count is ~p, and topic cannot have wildcard ~p",
-                                        [emqx_topic_metrics:max_limit(), Topic])),
+            Message = list_to_binary(
+                          io_lib:format(
+                              "Max topic metrics count is ~p, and topic cannot have wildcard ~p",
+                              [emqx_topic_metrics:max_limit(), Topic])),
             {400, #{code => ?BAD_REQUEST, message => Message}};
         {error, already_existed} ->
             Message = list_to_binary(io_lib:format("Topic ~p already registered", [Topic])),

+ 5 - 3
apps/emqx_retainer/src/emqx_retainer_api.erl

@@ -151,7 +151,7 @@ with_topic(delete, #{bindings := Bindings}) ->
 
 -spec lookup(undefined | binary(),
              map(),
-             fun((#message{}) -> map())) ->
+             fun((emqx_types:message()) -> map())) ->
           {200, map()}.
 lookup(Topic, #{query_string := Qs}, Formatter) ->
     Page = maps:get(page, Qs, 1),
@@ -166,11 +166,13 @@ format_message(Messages, Formatter) when is_list(Messages)->
 format_message(Message, Formatter) ->
     Formatter(Message).
 
-format_message(#message{id = ID, qos = Qos, topic = Topic, from = From, timestamp = Timestamp, headers = Headers}) ->
+format_message(#message{ id = ID, qos = Qos, topic = Topic, from = From
+                       , timestamp = Timestamp, headers = Headers}) ->
     #{msgid => emqx_guid:to_hexstr(ID),
       qos => Qos,
       topic => Topic,
-      publish_at => list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, millisecond}])),
+      publish_at => list_to_binary(calendar:system_time_to_rfc3339(
+                                     Timestamp, [{unit, millisecond}])),
       from_clientid => to_bin_string(From),
       from_username => maps:get(username, Headers, <<>>)
      }.

+ 11 - 7
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -62,7 +62,7 @@ api_rules_list_create() ->
                     emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}},
         post => #{
             description => <<"Create a new rule using given Id to all nodes in the cluster">>,
-            requestBody => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>),
+            'requestBody' => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>),
             responses => #{
                 <<"400">> =>
                     emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
@@ -94,12 +94,13 @@ api_rules_crud() ->
         put => #{
             description => <<"Create or update a rule by given Id to all nodes in the cluster">>,
             parameters => [param_path_id()],
-            requestBody => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>),
+            'requestBody' => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>),
             responses => #{
                 <<"400">> =>
                     emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
                 <<"200">> =>
-                    emqx_mgmt_util:schema(resp_schema(), <<"Create or update rule successfully">>)}},
+                    emqx_mgmt_util:schema(resp_schema(),
+                                          <<"Create or update rule successfully">>)}},
         delete => #{
             description => <<"Delete a rule by given Id from all nodes in the cluster">>,
             parameters => [param_path_id()],
@@ -113,7 +114,7 @@ api_rule_test() ->
     Metadata = #{
         post => #{
             description => <<"Test a rule">>,
-            requestBody => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>),
+            'requestBody' => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>),
             responses => #{
                 <<"400">> =>
                     emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']),
@@ -141,7 +142,7 @@ put_req_schema() ->
             description => <<"The outputs of the rule">>,
             type => array,
             items => #{
-                oneOf => [
+                'oneOf' => [
                     #{
                         type => string,
                         example => <<"channel_id_of_my_bridge">>,
@@ -253,7 +254,7 @@ crud_rules(post, #{body := #{<<"id">> := Id} = Params}) ->
         not_found ->
             case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
                 {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
-                    [Rule] = [R || R = #{id := Id0} <- AllRules, Id0 == Id],
+                    [Rule] = get_one_rule(AllRules, Id),
                     {201, format_rule_resp(Rule)};
                 {error, Reason} ->
                     ?SLOG(error, #{msg => "create_rule_failed",
@@ -280,7 +281,7 @@ crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) ->
     ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
     case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of
         {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
-            [Rule] = [R || R = #{id := Id0} <- AllRules, Id0 == Id],
+            [Rule] = get_one_rule(AllRules, Id),
             {200, format_rule_resp(Rule)};
         {error, Reason} ->
             ?SLOG(error, #{msg => "update_rule_failed",
@@ -339,3 +340,6 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
 get_rule_metrics(Id) ->
     [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
      || Node <- mria_mnesia:running_nodes()].
+
+get_one_rule(AllRules, Id) ->
+    [R || R = #{id := Id0} <- AllRules, Id0 == Id].