|
@@ -218,11 +218,13 @@ fields(listener_type_status) ->
|
|
|
fields(listener_id_status) ->
|
|
fields(listener_id_status) ->
|
|
|
fields(listener_id) ++
|
|
fields(listener_id) ++
|
|
|
[
|
|
[
|
|
|
|
|
+ {type, ?HOCON(?ENUM(listeners_type()), #{desc => "Listener type", required => true})},
|
|
|
|
|
+ {name, ?HOCON(string(), #{desc => "Listener name", required => true})},
|
|
|
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
|
|
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
|
|
|
{number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})},
|
|
{number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})},
|
|
|
{bind,
|
|
{bind,
|
|
|
?HOCON(
|
|
?HOCON(
|
|
|
- hoconsc:union([emqx_schema:ip_port(), integer()]),
|
|
|
|
|
|
|
+ emqx_schema:ip_port(),
|
|
|
#{desc => "Listener bind addr", required => true}
|
|
#{desc => "Listener bind addr", required => true}
|
|
|
)},
|
|
)},
|
|
|
{acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})},
|
|
{acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})},
|
|
@@ -231,12 +233,24 @@ fields(listener_id_status) ->
|
|
|
];
|
|
];
|
|
|
fields(status) ->
|
|
fields(status) ->
|
|
|
[
|
|
[
|
|
|
|
|
+ {running,
|
|
|
|
|
+ ?HOCON(
|
|
|
|
|
+ hoconsc:union([inconsistent, boolean()]),
|
|
|
|
|
+ #{desc => "Listener running status", required => true}
|
|
|
|
|
+ )},
|
|
|
{max_connections,
|
|
{max_connections,
|
|
|
?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
|
|
?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
|
|
|
{current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
|
|
{current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
|
|
|
];
|
|
];
|
|
|
fields(node_status) ->
|
|
fields(node_status) ->
|
|
|
- fields(node) ++ fields(status);
|
|
|
|
|
|
|
+ [
|
|
|
|
|
+ {"node",
|
|
|
|
|
+ ?HOCON(atom(), #{
|
|
|
|
|
+ desc => "Node name",
|
|
|
|
|
+ example => "emqx@127.0.0.1"
|
|
|
|
|
+ })},
|
|
|
|
|
+ {status, ?HOCON(?R_REF(status))}
|
|
|
|
|
+ ];
|
|
|
fields(Type) ->
|
|
fields(Type) ->
|
|
|
Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
|
|
Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
|
|
|
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
|
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
|
@@ -311,7 +325,7 @@ listener_type_status(get, _Request) ->
|
|
|
Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
|
|
Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
|
|
|
List = lists:map(
|
|
List = lists:map(
|
|
|
fun({Type, L}) ->
|
|
fun({Type, L}) ->
|
|
|
- L1 = maps:without([bind, acceptors], L),
|
|
|
|
|
|
|
+ L1 = maps:without([bind, acceptors, name], L),
|
|
|
L1#{type => Type}
|
|
L1#{type => Type}
|
|
|
end,
|
|
end,
|
|
|
Listeners
|
|
Listeners
|
|
@@ -453,7 +467,7 @@ listener_status_by_id(NodeL) ->
|
|
|
fun({Id, L}) ->
|
|
fun({Id, L}) ->
|
|
|
L1 = maps:remove(ids, L),
|
|
L1 = maps:remove(ids, L),
|
|
|
#{node_status := Nodes} = L1,
|
|
#{node_status := Nodes} = L1,
|
|
|
- L1#{number => maps:size(Nodes), id => Id}
|
|
|
|
|
|
|
+ L1#{number => length(Nodes), id => Id}
|
|
|
end,
|
|
end,
|
|
|
Listeners
|
|
Listeners
|
|
|
).
|
|
).
|
|
@@ -510,67 +524,75 @@ wrap_rpc(Res) ->
|
|
|
format_status(Key, Node, Listener, Acc) ->
|
|
format_status(Key, Node, Listener, Acc) ->
|
|
|
#{
|
|
#{
|
|
|
<<"id">> := Id,
|
|
<<"id">> := Id,
|
|
|
|
|
+ <<"type">> := Type,
|
|
|
|
|
+ <<"enabled">> := Enabled,
|
|
|
<<"running">> := Running,
|
|
<<"running">> := Running,
|
|
|
<<"max_connections">> := MaxConnections,
|
|
<<"max_connections">> := MaxConnections,
|
|
|
<<"current_connections">> := CurrentConnections,
|
|
<<"current_connections">> := CurrentConnections,
|
|
|
<<"acceptors">> := Acceptors,
|
|
<<"acceptors">> := Acceptors,
|
|
|
<<"bind">> := Bind
|
|
<<"bind">> := Bind
|
|
|
} = Listener,
|
|
} = Listener,
|
|
|
|
|
+ {ok, #{name := Name}} = emqx_listeners:parse_listener_id(Id),
|
|
|
GroupKey = maps:get(Key, Listener),
|
|
GroupKey = maps:get(Key, Listener),
|
|
|
case maps:find(GroupKey, Acc) of
|
|
case maps:find(GroupKey, Acc) of
|
|
|
error ->
|
|
error ->
|
|
|
Acc#{
|
|
Acc#{
|
|
|
GroupKey => #{
|
|
GroupKey => #{
|
|
|
- enable => Running,
|
|
|
|
|
|
|
+ name => Name,
|
|
|
|
|
+ type => Type,
|
|
|
|
|
+ enable => Enabled,
|
|
|
ids => [Id],
|
|
ids => [Id],
|
|
|
acceptors => Acceptors,
|
|
acceptors => Acceptors,
|
|
|
- bind => Bind,
|
|
|
|
|
|
|
+ bind => format_raw_bind(Bind),
|
|
|
status => #{
|
|
status => #{
|
|
|
|
|
+ running => Running,
|
|
|
max_connections => MaxConnections,
|
|
max_connections => MaxConnections,
|
|
|
current_connections => CurrentConnections
|
|
current_connections => CurrentConnections
|
|
|
},
|
|
},
|
|
|
- node_status => #{
|
|
|
|
|
- Node => #{
|
|
|
|
|
- max_connections => MaxConnections,
|
|
|
|
|
- current_connections => CurrentConnections
|
|
|
|
|
|
|
+ node_status => [
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => Node,
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => Running,
|
|
|
|
|
+ max_connections => MaxConnections,
|
|
|
|
|
+ current_connections => CurrentConnections
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
|
|
+ ]
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
{ok, GroupValue} ->
|
|
{ok, GroupValue} ->
|
|
|
#{
|
|
#{
|
|
|
ids := Ids,
|
|
ids := Ids,
|
|
|
status := #{
|
|
status := #{
|
|
|
|
|
+ running := Running0,
|
|
|
max_connections := MaxConnections0,
|
|
max_connections := MaxConnections0,
|
|
|
current_connections := CurrentConnections0
|
|
current_connections := CurrentConnections0
|
|
|
},
|
|
},
|
|
|
node_status := NodeStatus0
|
|
node_status := NodeStatus0
|
|
|
} = GroupValue,
|
|
} = GroupValue,
|
|
|
- NodeStatus =
|
|
|
|
|
- case maps:find(Node, NodeStatus0) of
|
|
|
|
|
- error ->
|
|
|
|
|
- NodeStatus0#{
|
|
|
|
|
- Node => #{
|
|
|
|
|
- max_connections => MaxConnections,
|
|
|
|
|
- current_connections => CurrentConnections
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
- {ok, #{
|
|
|
|
|
- max_connections := PrevMax,
|
|
|
|
|
- current_connections := PrevCurr
|
|
|
|
|
- }} ->
|
|
|
|
|
- NodeStatus0#{
|
|
|
|
|
- Node => #{
|
|
|
|
|
- max_connections => max_conn(MaxConnections, PrevMax),
|
|
|
|
|
- current_connections => CurrentConnections + PrevCurr
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ NodeStatus = [
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => Node,
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => Running,
|
|
|
|
|
+ max_connections => MaxConnections,
|
|
|
|
|
+ current_connections => CurrentConnections
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ | NodeStatus0
|
|
|
|
|
+ ],
|
|
|
|
|
+ NRunning =
|
|
|
|
|
+ case Running == Running0 of
|
|
|
|
|
+ true -> Running0;
|
|
|
|
|
+ _ -> inconsistent
|
|
|
end,
|
|
end,
|
|
|
Acc#{
|
|
Acc#{
|
|
|
GroupKey =>
|
|
GroupKey =>
|
|
|
GroupValue#{
|
|
GroupValue#{
|
|
|
ids => lists:usort([Id | Ids]),
|
|
ids => lists:usort([Id | Ids]),
|
|
|
status => #{
|
|
status => #{
|
|
|
|
|
+ running => NRunning,
|
|
|
max_connections => max_conn(MaxConnections0, MaxConnections),
|
|
max_connections => max_conn(MaxConnections0, MaxConnections),
|
|
|
current_connections => CurrentConnections0 + CurrentConnections
|
|
current_connections => CurrentConnections0 + CurrentConnections
|
|
|
},
|
|
},
|
|
@@ -583,6 +605,12 @@ max_conn(_Int1, <<"infinity">>) -> <<"infinity">>;
|
|
|
max_conn(<<"infinity">>, _Int) -> <<"infinity">>;
|
|
max_conn(<<"infinity">>, _Int) -> <<"infinity">>;
|
|
|
max_conn(Int1, Int2) -> Int1 + Int2.
|
|
max_conn(Int1, Int2) -> Int1 + Int2.
|
|
|
|
|
|
|
|
|
|
+%% @doc returning a uniform format (ip_port string) is more
|
|
|
|
|
+%% helpful to users
|
|
|
|
|
+format_raw_bind(Bind) when is_integer(Bind) ->
|
|
|
|
|
+ <<"0.0.0.0:", (integer_to_binary(Bind))/binary>>;
|
|
|
|
|
+format_raw_bind(Bind) when is_binary(Bind) -> Bind.
|
|
|
|
|
+
|
|
|
update(Path, Conf) ->
|
|
update(Path, Conf) ->
|
|
|
wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
|
|
wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
|
|
|
|
|
|
|
@@ -605,17 +633,27 @@ listener_type_status_example() ->
|
|
|
#{
|
|
#{
|
|
|
enable => false,
|
|
enable => false,
|
|
|
ids => ["tcp:demo"],
|
|
ids => ["tcp:demo"],
|
|
|
- node_status => #{
|
|
|
|
|
- 'emqx@127.0.0.1' => #{
|
|
|
|
|
- current_connections => 11,
|
|
|
|
|
- max_connections => 1024000
|
|
|
|
|
- },
|
|
|
|
|
- 'emqx@127.0.0.2' => #{
|
|
|
|
|
- current_connections => 10,
|
|
|
|
|
- max_connections => 1024000
|
|
|
|
|
- }
|
|
|
|
|
- },
|
|
|
|
|
|
|
+ node_status =>
|
|
|
|
|
+ [
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 11,
|
|
|
|
|
+ max_connections => 1024000
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 10,
|
|
|
|
|
+ max_connections => 1024000
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ ],
|
|
|
status => #{
|
|
status => #{
|
|
|
|
|
+ running => true,
|
|
|
current_connections => 21,
|
|
current_connections => 21,
|
|
|
max_connections => 2048000
|
|
max_connections => 2048000
|
|
|
},
|
|
},
|
|
@@ -624,17 +662,28 @@ listener_type_status_example() ->
|
|
|
#{
|
|
#{
|
|
|
enable => false,
|
|
enable => false,
|
|
|
ids => ["ssl:default"],
|
|
ids => ["ssl:default"],
|
|
|
- node_status => #{
|
|
|
|
|
- 'emqx@127.0.0.1' => #{
|
|
|
|
|
- current_connections => 31,
|
|
|
|
|
- max_connections => infinity
|
|
|
|
|
- },
|
|
|
|
|
- 'emqx@127.0.0.2' => #{
|
|
|
|
|
- current_connections => 40,
|
|
|
|
|
- max_connections => infinity
|
|
|
|
|
- }
|
|
|
|
|
- },
|
|
|
|
|
|
|
+ node_status =>
|
|
|
|
|
+ [
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 31,
|
|
|
|
|
+ max_connections => infinity
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 40,
|
|
|
|
|
+ max_connections => infinity
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ ],
|
|
|
|
|
+
|
|
|
status => #{
|
|
status => #{
|
|
|
|
|
+ running => true,
|
|
|
current_connections => 71,
|
|
current_connections => 71,
|
|
|
max_connections => infinity
|
|
max_connections => infinity
|
|
|
},
|
|
},
|
|
@@ -649,18 +698,30 @@ listener_id_status_example() ->
|
|
|
bind => <<"0.0.0.0:1884">>,
|
|
bind => <<"0.0.0.0:1884">>,
|
|
|
enable => true,
|
|
enable => true,
|
|
|
id => <<"tcp:demo">>,
|
|
id => <<"tcp:demo">>,
|
|
|
- node_status => #{
|
|
|
|
|
- 'emqx@127.0.0.1' => #{
|
|
|
|
|
- current_connections => 100,
|
|
|
|
|
- max_connections => 1024000
|
|
|
|
|
- },
|
|
|
|
|
- 'emqx@127.0.0.2' => #{
|
|
|
|
|
- current_connections => 101,
|
|
|
|
|
- max_connections => 1024000
|
|
|
|
|
- }
|
|
|
|
|
- },
|
|
|
|
|
|
|
+ type => <<"tcp">>,
|
|
|
|
|
+ name => <<"demo">>,
|
|
|
|
|
+ node_status =>
|
|
|
|
|
+ [
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 100,
|
|
|
|
|
+ max_connections => 1024000
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 101,
|
|
|
|
|
+ max_connections => 1024000
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ ],
|
|
|
number => 2,
|
|
number => 2,
|
|
|
status => #{
|
|
status => #{
|
|
|
|
|
+ running => true,
|
|
|
current_connections => 201,
|
|
current_connections => 201,
|
|
|
max_connections => 2048000
|
|
max_connections => 2048000
|
|
|
}
|
|
}
|
|
@@ -670,18 +731,30 @@ listener_id_status_example() ->
|
|
|
bind => <<"0.0.0.0:1883">>,
|
|
bind => <<"0.0.0.0:1883">>,
|
|
|
enable => true,
|
|
enable => true,
|
|
|
id => <<"tcp:default">>,
|
|
id => <<"tcp:default">>,
|
|
|
- node_status => #{
|
|
|
|
|
- 'emqx@127.0.0.1' => #{
|
|
|
|
|
- current_connections => 300,
|
|
|
|
|
- max_connections => infinity
|
|
|
|
|
- },
|
|
|
|
|
- 'emqx@127.0.0.2' => #{
|
|
|
|
|
- current_connections => 201,
|
|
|
|
|
- max_connections => infinity
|
|
|
|
|
- }
|
|
|
|
|
- },
|
|
|
|
|
|
|
+ type => <<"tcp">>,
|
|
|
|
|
+ name => <<"default">>,
|
|
|
|
|
+ node_status =>
|
|
|
|
|
+ [
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 200,
|
|
|
|
|
+ max_connections => infinity
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ #{
|
|
|
|
|
+ node => 'emqx@127.0.0.1',
|
|
|
|
|
+ status => #{
|
|
|
|
|
+ running => true,
|
|
|
|
|
+ current_connections => 301,
|
|
|
|
|
+ max_connections => infinity
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ ],
|
|
|
number => 2,
|
|
number => 2,
|
|
|
status => #{
|
|
status => #{
|
|
|
|
|
+ running => true,
|
|
|
current_connections => 501,
|
|
current_connections => 501,
|
|
|
max_connections => infinity
|
|
max_connections => infinity
|
|
|
}
|
|
}
|