Jelajahi Sumber

chore(gw): improve the gateway api swagger codes

JianBo He 4 tahun lalu
induk
melakukan
fd12a7ac9c

+ 189 - 245
apps/emqx_gateway/src/emqx_gateway_api.erl

@@ -18,12 +18,6 @@
 
 
 -behaviour(minirest_api).
 -behaviour(minirest_api).
 
 
--compile(nowarn_unused_function).
-
--import(emqx_mgmt_util,
-        [ schema/1
-        ]).
-
 -import(emqx_gateway_http,
 -import(emqx_gateway_http,
         [ return_http_error/2
         [ return_http_error/2
         ]).
         ]).
@@ -37,18 +31,160 @@
         , gateway_insta_stats/2
         , gateway_insta_stats/2
         ]).
         ]).
 
 
--define(EXAMPLE_GATEWAY_LIST,
-        [ #{ name => <<"lwm2m">>
-           , status => <<"running">>
-           , started_at => <<"2021-08-19T11:45:56.006373+08:00">>
-           , max_connection => 1024000
-           , current_connection => 1000
-           , listeners => [
-                #{name => <<"lw-udp-1">>, status => <<"activing">>},
-                #{name => <<"lw-udp-2">>, status => <<"inactived">>}
-             ]
-           }
-        ]).
+%%--------------------------------------------------------------------
+%% minirest behaviour callbacks
+%%--------------------------------------------------------------------
+
+api_spec() ->
+    {metadata(apis()), []}.
+
+apis() ->
+    [ {"/gateway", gateway}
+    , {"/gateway/:name", gateway_insta}
+    , {"/gateway/:name/stats", gateway_insta_stats}
+    ].
+%%--------------------------------------------------------------------
+%% http handlers
+
+gateway(get, Request) ->
+    Params = maps:get(query_string, Request, #{}),
+    Status = case maps:get(<<"status">>, Params, undefined) of
+                 undefined -> all;
+                 S0 -> binary_to_existing_atom(S0, utf8)
+             end,
+    {200, emqx_gateway_http:gateways(Status)}.
+
+gateway_insta(delete, #{bindings := #{name := Name0}}) ->
+    Name = binary_to_existing_atom(Name0),
+    case emqx_gateway:unload(Name) of
+        ok ->
+            {204};
+        {error, not_found} ->
+            return_http_error(404, <<"Gateway not found">>)
+    end;
+gateway_insta(get, #{bindings := #{name := Name0}}) ->
+    Name = binary_to_existing_atom(Name0),
+    case emqx_gateway:lookup(Name) of
+        #{config := _Config} ->
+            %% FIXME: Got the parsed config, but we should return rawconfig to
+            %% frontend
+            RawConf = emqx_config:fill_defaults(
+                        emqx_config:get_root_raw([<<"gateway">>])
+                       ),
+            {200, emqx_map_lib:deep_get([<<"gateway">>, Name0], RawConf)};
+        undefined ->
+            return_http_error(404, <<"Gateway not found">>)
+    end;
+gateway_insta(put, #{body := RawConfsIn,
+                     bindings := #{name := Name}
+                    }) ->
+    %% FIXME: Cluster Consistence ??
+    case emqx_gateway:update_rawconf(Name, RawConfsIn) of
+        ok ->
+            {200};
+        {error, not_found} ->
+            return_http_error(404, <<"Gateway not found">>);
+        {error, Reason} ->
+            return_http_error(500, Reason)
+    end.
+
+gateway_insta_stats(get, _Req) ->
+    return_http_error(401, <<"Implement it later (maybe 5.1)">>).
+
+
+%%--------------------------------------------------------------------
+%% Swagger defines
+%%--------------------------------------------------------------------
+
+metadata(APIs) ->
+    metadata(APIs, []).
+metadata([], APIAcc) ->
+    lists:reverse(APIAcc);
+metadata([{Path, Fun}|More], APIAcc) ->
+    Methods = [get, post, put, delete, patch],
+    Mds = lists:foldl(fun(M, Acc) ->
+              try
+                  Acc#{M => swagger(Path, M)}
+              catch
+                  error : function_clause ->
+                      Acc
+              end
+          end, #{}, Methods),
+    metadata(More, [{Path, Mds, Fun} | APIAcc]).
+
+swagger("/gateway", get) ->
+    #{ description => <<"Get gateway list">>
+     , parameters => params_gateway_status_in_qs()
+     , responses =>
+        #{ <<"200">> => schema_gateway_overview_list() }
+     };
+swagger("/gateway/:name", get) ->
+    #{ description => <<"Get the gateway configurations">>
+     , parameters => params_gateway_name_in_path()
+     , responses =>
+        #{ <<"404">> => schema_not_found()
+         , <<"200">> => schema_gateway_conf()
+         }
+      };
+swagger("/gateway/:name", delete) ->
+    #{ description => <<"Delete/Unload the gateway">>
+     , parameters => params_gateway_name_in_path()
+     , responses =>
+        #{ <<"404">> => schema_not_found()
+         , <<"204">> => schema_no_content()
+         }
+      };
+swagger("/gateway/:name", put) ->
+    #{ description => <<"Update the gateway configurations/status">>
+     , parameters => params_gateway_name_in_path()
+     , requestBody => schema_gateway_conf()
+     , responses =>
+        #{ <<"404">> => schema_not_found()
+         , <<"200">> => schema_no_content()
+         }
+     };
+swagger("/gateway/:name/stats", get) ->
+    #{ description => <<"Get gateway Statistic">>
+     , parameters => params_gateway_name_in_path()
+     , responses =>
+        #{ <<"404">> => schema_not_found()
+         , <<"200">> => schema_gateway_stats()
+         }
+     }.
+
+%%--------------------------------------------------------------------
+%% params defines
+
+params_gateway_name_in_path() ->
+    [#{ name => name
+      , in => path
+      , schema => #{type => string}
+      , required => true
+      }].
+
+params_gateway_status_in_qs() ->
+    [#{ name => status
+      , in => query
+      , schema => #{type => string}
+      , required => false
+      }].
+
+%%--------------------------------------------------------------------
+%% schemas
+
+schema_not_found() ->
+    emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
+
+schema_no_content() ->
+    #{description => <<"No Content">>}.
+
+schema_gateway_overview_list() ->
+    emqx_mgmt_util:array_schema(
+      #{ type => object
+       , properties => properties_gateway_overview()
+       },
+      <<"Gateway Overview list">>
+     ).
 
 
 %% XXX: This is whole confs for all type gateways. It is used to fill the
 %% XXX: This is whole confs for all type gateways. It is used to fill the
 %% default configurations and generate the swagger-schema
 %% default configurations and generate the swagger-schema
@@ -154,234 +290,42 @@
 
 
 %% --- END
 %% --- END
 
 
--define(EXAMPLE_GATEWAY_STATS, #{
-            max_connection => 10240000,
-            current_connection => 1000,
-            messages_in => 100.24,
-            messages_out => 32.5
-        }).
-
-%%--------------------------------------------------------------------
-%% minirest behaviour callbacks
-%%--------------------------------------------------------------------
-
-api_spec() ->
-    {apis(), schemas()}.
-
-apis() ->
-    [ {"/gateway", metadata(gateway), gateway}
-    , {"/gateway/:name", metadata(gateway_insta), gateway_insta}
-    , {"/gateway/:name/stats", metadata(gateway_insta_stats), gateway_insta_stats}
-    ].
-
-metadata(gateway) ->
-    #{get => #{
-        description => <<"Get gateway list">>,
-        parameters => [
-            #{name => status,
-              in => query,
-              schema => #{type => string},
-              required => false
-             }
-        ],
-        responses => #{
-            <<"200">> => #{
-                description => <<"OK">>,
-                content => #{
-                    'application/json' => #{
-                        schema => minirest:ref(<<"gateway_overrview">>),
-                        examples => #{
-                            simple => #{
-                                summary => <<"Gateway List Example">>,
-                                value => emqx_json:encode(?EXAMPLE_GATEWAY_LIST)
-                            }
-                        }
-                    }
-                }
-            }
-        }
-     }};
-
-metadata(gateway_insta) ->
-    UriNameParamDef = #{name => name,
-                        in => path,
-                        schema => #{type => string},
-                        required => true
-                       },
-    NameNotFoundRespDef =
-        #{description => <<"Not Found">>,
-          content => #{
-            'application/json' => #{
-                schema => minirest:ref(<<"error">>),
-                examples => #{
-                    simple => #{
-                        summary => <<"Not Found">>,
-                        value => #{
-                            code => <<"NOT_FOUND">>,
-                            message => <<"The gateway not found">>
-                        }
-                    }
-                }
-            }
-         }},
-    #{delete => #{
-        description => <<"Delete/Unload the gateway">>,
-        parameters => [UriNameParamDef],
-        responses => #{
-            <<"404">> => NameNotFoundRespDef,
-            <<"204">> => #{description => <<"No Content">>}
-        }
-      },
-      get => #{
-        description => <<"Get the gateway configurations">>,
-        parameters => [UriNameParamDef],
-        responses => #{
-            <<"404">> => NameNotFoundRespDef,
-            <<"200">> => schema(schema_for_gateway_conf())
-        }
-      },
-      put => #{
-        description => <<"Update the gateway configurations/status">>,
-        parameters => [UriNameParamDef],
-        requestBody => schema(schema_for_gateway_conf()),
-        responses => #{
-            <<"404">> => NameNotFoundRespDef,
-            <<"200">> => #{description => <<"Changed">>}
-        }
-      }
-     };
+schema_gateway_conf() ->
+    emqx_mgmt_util:schema(
+      #{oneOf =>
+        [ emqx_mgmt_api_configs:gen_schema(?STOMP_GATEWAY_CONFS)
+        , emqx_mgmt_api_configs:gen_schema(?MQTTSN_GATEWAY_CONFS)
+        , emqx_mgmt_api_configs:gen_schema(?COAP_GATEWAY_CONFS)
+        , emqx_mgmt_api_configs:gen_schema(?LWM2M_GATEWAY_CONFS)
+        , emqx_mgmt_api_configs:gen_schema(?EXPROTO_GATEWAY_CONFS)
+        ]}).
 
 
-metadata(gateway_insta_stats) ->
-    #{get => #{
-        description => <<"Get gateway Statistic">>,
-        responses => #{
-            <<"200">> => #{
-                description => <<"OK">>,
-                content => #{
-                    'application/json' => #{
-                        schema => minirest:ref(<<"gateway_stats">>),
-                        examples => #{
-                            simple => #{
-                                summary => <<"Gateway Statistic">>,
-                                value => emqx_json:encode(?EXAMPLE_GATEWAY_STATS)
-                            }
-                        }
-                    }
-                }
-            }
-        }
-     }}.
-
-schemas() ->
-    [ #{<<"gateway_overrview">> => schema_for_gateway_overrview()}
-    , #{<<"gateway_stats">> => schema_for_gateway_stats()}
-    ].
-
-schema_for_gateway_overrview() ->
-    #{type => array,
-      items => #{
-        type => object,
-        properties => #{
-            name => #{
-                type => string,
-                example => <<"lwm2m">>
-            },
-            status => #{
-                type => string,
-                enum => [<<"running">>, <<"stopped">>, <<"unloaded">>],
-                example => <<"running">>
-            },
-            started_at => #{
-                type => string,
-                example => <<"2021-08-19T11:45:56.006373+08:00">>
-            },
-            max_connection => #{
-                type => integer,
-                example => 1024000
-            },
-            current_connection => #{
-                type => integer,
-                example => 1000
-            },
-            listeners => #{
-                type => array,
-                items => #{
-                    type => object,
-                    properties => #{
-                        name => #{
-                            type => string,
-                            example => <<"lw-udp">>
-                        },
-                        status => #{
-                            type => string,
-                            enum => [<<"activing">>, <<"inactived">>]
-                        }
-                    }
-                }
-            }
-        }
-      }
-     }.
-
-schema_for_gateway_conf() ->
-   #{oneOf =>
-     [ emqx_mgmt_api_configs:gen_schema(?STOMP_GATEWAY_CONFS)
-     , emqx_mgmt_api_configs:gen_schema(?MQTTSN_GATEWAY_CONFS)
-     , emqx_mgmt_api_configs:gen_schema(?COAP_GATEWAY_CONFS)
-     , emqx_mgmt_api_configs:gen_schema(?LWM2M_GATEWAY_CONFS)
-     , emqx_mgmt_api_configs:gen_schema(?EXPROTO_GATEWAY_CONFS)
-     ]}.
-
-schema_for_gateway_stats() ->
-    #{type => object,
-      properties => #{
-        a_key => #{type => string}
-     }}.
+schema_gateway_stats() ->
+    emqx_mgmt_util:schema(
+      #{ type => object
+       , properties =>
+        #{ a_key => #{type => string}
+       }}).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% http handlers
-
-gateway(get, Request) ->
-    Params = maps:get(query_string, Request, #{}),
-    Status = case maps:get(<<"status">>, Params, undefined) of
-                 undefined -> all;
-                 S0 -> binary_to_existing_atom(S0, utf8)
-             end,
-    {200, emqx_gateway_http:gateways(Status)}.
-
-gateway_insta(delete, #{bindings := #{name := Name0}}) ->
-    Name = binary_to_existing_atom(Name0),
-    case emqx_gateway:unload(Name) of
-        ok ->
-            {200};
-        {error, not_found} ->
-            return_http_error(404, <<"Gateway not found">>)
-    end;
-gateway_insta(get, #{bindings := #{name := Name0}}) ->
-    Name = binary_to_existing_atom(Name0),
-    case emqx_gateway:lookup(Name) of
-        #{config := _Config} ->
-            %% FIXME: Got the parsed config, but we should return rawconfig to
-            %% frontend
-            RawConf = emqx_config:fill_defaults(
-                        emqx_config:get_root_raw([<<"gateway">>])
-                       ),
-            {200, emqx_map_lib:deep_get([<<"gateway">>, Name0], RawConf)};
-        undefined ->
-            return_http_error(404, <<"Gateway not found">>)
-    end;
-gateway_insta(put, #{body := RawConfsIn,
-                     bindings := #{name := Name}
-                    }) ->
-    %% FIXME: Cluster Consistence ??
-    case emqx_gateway:update_rawconf(Name, RawConfsIn) of
-        ok ->
-            {200};
-        {error, not_found} ->
-            return_http_error(404, <<"Gateway not found">>);
-        {error, Reason} ->
-            return_http_error(500, Reason)
-    end.
+%% properties
 
 
-gateway_insta_stats(get, _Req) ->
-    return_http_error(401, <<"Implement it later (maybe 5.1)">>).
+properties_gateway_overview() ->
+    ListenerProps =
+        [ {name, string,
+           <<"Listener Name">>}
+        , {status, string,
+           <<"Listener Status">>, [<<"activing">>, <<"inactived">>]}
+        ],
+    emqx_mgmt_util:properties(
+      [ {name, string,
+         <<"Gateway Name">>}
+      , {status, string,
+         <<"Gateway Status">>,
+         [<<"running">>, <<"stopped">>, <<"unloaded">>]}
+      , {started_at, string,
+         <<>>}
+      , {max_connection, integer, <<>>}
+      , {current_connection, integer, <<>>}
+      , {listeners, {array, object}, ListenerProps}
+      ]).

+ 2 - 2
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -481,7 +481,7 @@ queries(Ls) ->
     end, Ls).
     end, Ls).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% Schemas
+%% schemas
 
 
 schema_not_found() ->
 schema_not_found() ->
     emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
     emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
@@ -518,7 +518,7 @@ schema_subscription() ->
      ).
      ).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% Object properties def
+%% properties defines
 
 
 properties_client() ->
 properties_client() ->
     emqx_mgmt_util:properties(
     emqx_mgmt_util:properties(

+ 3 - 3
apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl

@@ -548,9 +548,9 @@ check_subscribed_status({SubId, {ParsedTopic, _SubOpts}},
                           }) ->
                           }) ->
     MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
     MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
     case lists:keyfind(SubId, 1, Subs) of
     case lists:keyfind(SubId, 1, Subs) of
-        {SubId, MountedTopic, _Ack} ->
+        {SubId, MountedTopic, _Ack, _SubOpts} ->
             ok;
             ok;
-        {SubId, _OtherTopic, _Ack} ->
+        {SubId, _OtherTopic, _Ack, _SubOpts} ->
             {error, "Conflict subscribe id"};
             {error, "Conflict subscribe id"};
         false ->
         false ->
             ok
             ok
@@ -795,7 +795,7 @@ handle_deliver(Delivers,
     Frames0 = lists:foldl(fun({_, _, Message}, Acc) ->
     Frames0 = lists:foldl(fun({_, _, Message}, Acc) ->
                 Topic0 = emqx_message:topic(Message),
                 Topic0 = emqx_message:topic(Message),
                 case lists:keyfind(Topic0, 2, Subs) of
                 case lists:keyfind(Topic0, 2, Subs) of
-                    {Id, Topic, Ack} ->
+                    {Id, Topic, Ack, _SubOpts} ->
                         %% XXX: refactor later
                         %% XXX: refactor later
                         metrics_inc('messages.delivered', Channel),
                         metrics_inc('messages.delivered', Channel),
                         NMessage = run_hooks_without_metrics(
                         NMessage = run_hooks_without_metrics(

+ 5 - 0
apps/emqx_management/src/emqx_mgmt_util.erl

@@ -224,6 +224,11 @@ properties([{Key, Type} | Props], Acc) ->
 properties([{Key, object, Props1} | Props], Acc) ->
 properties([{Key, object, Props1} | Props], Acc) ->
     properties(Props, maps:put(Key, #{type => object,
     properties(Props, maps:put(Key, #{type => object,
                                       properties => properties(Props1)}, Acc));
                                       properties => properties(Props1)}, Acc));
+properties([{Key, {array, object}, Props1} | Props], Acc) ->
+    properties(Props, maps:put(Key, #{type => array,
+                                      items => #{type => object,
+                                                 properties => properties(Props1)
+                                                }}, Acc));
 properties([{Key, {array, Type}, Desc} | Props], Acc) ->
 properties([{Key, {array, Type}, Desc} | Props], Acc) ->
     properties(Props, maps:put(Key, #{type => array,
     properties(Props, maps:put(Key, #{type => array,
                                       items => #{type => Type},
                                       items => #{type => Type},