|
|
@@ -56,7 +56,8 @@
|
|
|
qs2ms/2,
|
|
|
run_fuzzy_filter/2,
|
|
|
format_channel_info/1,
|
|
|
- format_channel_info/2
|
|
|
+ format_channel_info/2,
|
|
|
+ format_channel_info/3
|
|
|
]).
|
|
|
|
|
|
%% for batch operation
|
|
|
@@ -66,7 +67,10 @@
|
|
|
|
|
|
-define(CLIENT_QSCHEMA, [
|
|
|
{<<"node">>, atom},
|
|
|
+ %% list
|
|
|
{<<"username">>, binary},
|
|
|
+ %% list
|
|
|
+ {<<"clientid">>, binary},
|
|
|
{<<"ip_address">>, ip},
|
|
|
{<<"conn_state">>, atom},
|
|
|
{<<"clean_start">>, atom},
|
|
|
@@ -125,10 +129,13 @@ schema("/clients") ->
|
|
|
example => <<"emqx@127.0.0.1">>
|
|
|
})},
|
|
|
{username,
|
|
|
- hoconsc:mk(binary(), #{
|
|
|
+ hoconsc:mk(hoconsc:array(binary()), #{
|
|
|
in => query,
|
|
|
required => false,
|
|
|
- desc => <<"User name">>
|
|
|
+ desc => <<
|
|
|
+ "User name, multiple values can be specified by"
|
|
|
+ " repeating the parameter: username=u1&username=u2"
|
|
|
+ >>
|
|
|
})},
|
|
|
{ip_address,
|
|
|
hoconsc:mk(binary(), #{
|
|
|
@@ -202,7 +209,17 @@ schema("/clients") ->
|
|
|
"Search client connection creation time by less"
|
|
|
" than or equal method, rfc3339 or timestamp(millisecond)"
|
|
|
>>
|
|
|
- })}
|
|
|
+ })},
|
|
|
+ {clientid,
|
|
|
+ hoconsc:mk(hoconsc:array(binary()), #{
|
|
|
+ in => query,
|
|
|
+ required => false,
|
|
|
+ desc => <<
|
|
|
+ "Client ID, multiple values can be specified by"
|
|
|
+ " repeating the parameter: clientid=c1&clientid=c2"
|
|
|
+ >>
|
|
|
+ })},
|
|
|
+ ?R_REF(requested_client_fields)
|
|
|
],
|
|
|
responses => #{
|
|
|
200 =>
|
|
|
@@ -656,6 +673,30 @@ fields(message) ->
|
|
|
{from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})},
|
|
|
{from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})},
|
|
|
{payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})}
|
|
|
+ ];
|
|
|
+fields(requested_client_fields) ->
|
|
|
+ %% NOTE: some Client fields actually returned in response are missing in schema:
|
|
|
+ %% enable_authn, is_persistent, listener, peerport
|
|
|
+ ClientFields = [element(1, F) || F <- fields(client)],
|
|
|
+ [
|
|
|
+ {fields,
|
|
|
+ hoconsc:mk(
|
|
|
+ hoconsc:union([all, hoconsc:array(hoconsc:enum(ClientFields))]),
|
|
|
+ #{
|
|
|
+ in => query,
|
|
|
+ required => false,
|
|
|
+ default => all,
|
|
|
+ desc => <<"Comma separated list of client fields to return in the response">>,
|
|
|
+ converter => fun
|
|
|
+ (all, _Opts) ->
|
|
|
+ all;
|
|
|
+ (<<"all">>, _Opts) ->
|
|
|
+ all;
|
|
|
+ (CsvFields, _Opts) when is_binary(CsvFields) ->
|
|
|
+ binary:split(CsvFields, <<",">>, [global, trim_all])
|
|
|
+ end
|
|
|
+ }
|
|
|
+ )}
|
|
|
].
|
|
|
|
|
|
%%%==============================================================================================
|
|
|
@@ -971,7 +1012,10 @@ list_clients_cluster_query(QString, Options) ->
|
|
|
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
|
|
|
),
|
|
|
Res = do_list_clients_cluster_query(Nodes, QueryState, ResultAcc),
|
|
|
- emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
|
|
|
+ Opts = #{fields => maps:get(<<"fields">>, QString, all)},
|
|
|
+ emqx_mgmt_api:format_query_result(
|
|
|
+ fun ?MODULE:format_channel_info/3, Meta, Res, Opts
|
|
|
+ )
|
|
|
catch
|
|
|
throw:{bad_value_type, {Key, ExpectedType, AcutalValue}} ->
|
|
|
{error, invalid_query_string_param, {Key, ExpectedType, AcutalValue}}
|
|
|
@@ -1023,7 +1067,8 @@ list_clients_node_query(Node, QString, Options) ->
|
|
|
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
|
|
|
),
|
|
|
Res = do_list_clients_node_query(Node, QueryState, ResultAcc),
|
|
|
- emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
|
|
|
+ Opts = #{fields => maps:get(<<"fields">>, QString, all)},
|
|
|
+ emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/3, Meta, Res, Opts)
|
|
|
end.
|
|
|
|
|
|
add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
|
|
|
@@ -1190,19 +1235,36 @@ qs2ms(_Tab, {QString, FuzzyQString}) ->
|
|
|
-spec qs2ms(list()) -> ets:match_spec().
|
|
|
qs2ms(Qs) ->
|
|
|
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
|
|
|
- [{{'$1', MtchHead, '_'}, Conds, ['$_']}].
|
|
|
+ [{{{'$1', '_'}, MtchHead, '_'}, Conds, ['$_']}].
|
|
|
|
|
|
qs2ms([], _, {MtchHead, Conds}) ->
|
|
|
{MtchHead, lists:reverse(Conds)};
|
|
|
+qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) when is_list(Value) ->
|
|
|
+ {Holder, NxtN} = holder_and_nxt(Key, N),
|
|
|
+ NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Holder)),
|
|
|
+ qs2ms(Rest, NxtN, {NMtchHead, [orelse_cond(Holder, Value) | Conds]});
|
|
|
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
|
|
|
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
|
|
|
qs2ms(Rest, N, {NMtchHead, Conds});
|
|
|
qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
|
|
|
- Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
|
|
|
+ Holder = holder(N),
|
|
|
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
|
|
|
NConds = put_conds(Qs, Holder, Conds),
|
|
|
qs2ms(Rest, N + 1, {NMtchHead, NConds}).
|
|
|
|
|
|
+%% This is a special case: clientid is a part of the key (ClientId, Pid}, as the table is ordered_set,
|
|
|
+%% using partially bound key optimizes traversal.
|
|
|
+holder_and_nxt(clientid, N) ->
|
|
|
+ {'$1', N};
|
|
|
+holder_and_nxt(_, N) ->
|
|
|
+ {holder(N), N + 1}.
|
|
|
+
|
|
|
+holder(N) -> list_to_atom([$$ | integer_to_list(N)]).
|
|
|
+
|
|
|
+orelse_cond(Holder, ValuesList) ->
|
|
|
+ Conds = [{'=:=', Holder, V} || V <- ValuesList],
|
|
|
+ erlang:list_to_tuple(['orelse' | Conds]).
|
|
|
+
|
|
|
put_conds({_, Op, V}, Holder, Conds) ->
|
|
|
[{Op, Holder, V} | Conds];
|
|
|
put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
|
|
|
@@ -1212,8 +1274,8 @@ put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) ->
|
|
|
| Conds
|
|
|
].
|
|
|
|
|
|
-ms(clientid, X) ->
|
|
|
- #{clientinfo => #{clientid => X}};
|
|
|
+ms(clientid, _X) ->
|
|
|
+ #{};
|
|
|
ms(username, X) ->
|
|
|
#{clientinfo => #{username => X}};
|
|
|
ms(conn_state, X) ->
|
|
|
@@ -1257,7 +1319,11 @@ format_channel_info({ClientId, PSInfo}) ->
|
|
|
%% offline persistent session
|
|
|
format_persistent_session_info(ClientId, PSInfo).
|
|
|
|
|
|
-format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
|
|
|
+format_channel_info(WhichNode, ChanInfo) ->
|
|
|
+ DefaultOpts = #{fields => all},
|
|
|
+ format_channel_info(WhichNode, ChanInfo, DefaultOpts).
|
|
|
+
|
|
|
+format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
|
|
|
Node = maps:get(node, ClientInfo0, WhichNode),
|
|
|
ClientInfo1 = emqx_utils_maps:deep_remove([conninfo, clientid], ClientInfo0),
|
|
|
ClientInfo2 = emqx_utils_maps:deep_remove([conninfo, username], ClientInfo1),
|
|
|
@@ -1276,45 +1342,17 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
|
|
|
ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4),
|
|
|
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5),
|
|
|
|
|
|
- RemoveList =
|
|
|
- [
|
|
|
- auth_result,
|
|
|
- peername,
|
|
|
- sockname,
|
|
|
- peerhost,
|
|
|
- conn_state,
|
|
|
- send_pend,
|
|
|
- conn_props,
|
|
|
- peercert,
|
|
|
- sockstate,
|
|
|
- subscriptions,
|
|
|
- receive_maximum,
|
|
|
- protocol,
|
|
|
- is_superuser,
|
|
|
- sockport,
|
|
|
- anonymous,
|
|
|
- socktype,
|
|
|
- active_n,
|
|
|
- await_rel_timeout,
|
|
|
- conn_mod,
|
|
|
- sockname,
|
|
|
- retry_interval,
|
|
|
- upgrade_qos,
|
|
|
- zone,
|
|
|
- %% session_id, defined in emqx_session.erl
|
|
|
- id,
|
|
|
- acl
|
|
|
- ],
|
|
|
+ #{fields := RequestedFields} = Opts,
|
|
|
TimesKeys = [created_at, connected_at, disconnected_at],
|
|
|
%% format timestamp to rfc3339
|
|
|
result_format_undefined_to_null(
|
|
|
lists:foldl(
|
|
|
fun result_format_time_fun/2,
|
|
|
- maps:without(RemoveList, ClientInfoMap),
|
|
|
+ with_client_info_fields(ClientInfoMap, RequestedFields),
|
|
|
TimesKeys
|
|
|
)
|
|
|
);
|
|
|
-format_channel_info(undefined, {ClientId, PSInfo0 = #{}}) ->
|
|
|
+format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
|
|
|
format_persistent_session_info(ClientId, PSInfo0).
|
|
|
|
|
|
format_persistent_session_info(ClientId, PSInfo0) ->
|
|
|
@@ -1337,6 +1375,41 @@ format_persistent_session_info(ClientId, PSInfo0) ->
|
|
|
),
|
|
|
result_format_undefined_to_null(PSInfo).
|
|
|
|
|
|
+with_client_info_fields(ClientInfoMap, all) ->
|
|
|
+ RemoveList =
|
|
|
+ [
|
|
|
+ auth_result,
|
|
|
+ peername,
|
|
|
+ sockname,
|
|
|
+ peerhost,
|
|
|
+ peerport,
|
|
|
+ conn_state,
|
|
|
+ send_pend,
|
|
|
+ conn_props,
|
|
|
+ peercert,
|
|
|
+ sockstate,
|
|
|
+ subscriptions,
|
|
|
+ receive_maximum,
|
|
|
+ protocol,
|
|
|
+ is_superuser,
|
|
|
+ sockport,
|
|
|
+ anonymous,
|
|
|
+ socktype,
|
|
|
+ active_n,
|
|
|
+ await_rel_timeout,
|
|
|
+ conn_mod,
|
|
|
+ sockname,
|
|
|
+ retry_interval,
|
|
|
+ upgrade_qos,
|
|
|
+ zone,
|
|
|
+ %% session_id, defined in emqx_session.erl
|
|
|
+ id,
|
|
|
+ acl
|
|
|
+ ],
|
|
|
+ maps:without(RemoveList, ClientInfoMap);
|
|
|
+with_client_info_fields(ClientInfoMap, RequestedFields) when is_list(RequestedFields) ->
|
|
|
+ maps:with(RequestedFields, ClientInfoMap).
|
|
|
+
|
|
|
format_msgs_resp(MsgType, Msgs, Meta, QString) ->
|
|
|
#{
|
|
|
<<"payload">> := PayloadFmt,
|