|
|
@@ -32,7 +32,6 @@
|
|
|
%%% @end
|
|
|
%%%
|
|
|
%%% @author Feng Lee <feng@emqtt.io>
|
|
|
-%%%
|
|
|
%%%-----------------------------------------------------------------------------
|
|
|
-module(emqttd_router).
|
|
|
|
|
|
@@ -40,8 +39,12 @@
|
|
|
|
|
|
-include("emqttd_protocol.hrl").
|
|
|
|
|
|
--export([init/1, lookup/1, route/2, add_routes/2,
|
|
|
- delete_routes/1, delete_routes/2]).
|
|
|
+-export([init/1, route/2, lookup_routes/1,
|
|
|
+ add_routes/2, delete_routes/1, delete_routes/2]).
|
|
|
+
|
|
|
+-ifdef(TEST).
|
|
|
+-compile(export_all).
|
|
|
+-endif.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc Create route tables.
|
|
|
@@ -71,7 +74,7 @@ ensure_tab(Tab, Opts) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok.
|
|
|
add_routes(TopicTable, Pid) when is_pid(Pid) ->
|
|
|
- case lookup(Pid) of
|
|
|
+ case lookup_routes(Pid) of
|
|
|
[] ->
|
|
|
erlang:monitor(process, Pid),
|
|
|
insert_routes(TopicTable, Pid);
|
|
|
@@ -85,8 +88,8 @@ add_routes(TopicTable, Pid) when is_pid(Pid) ->
|
|
|
%% @doc Lookup Routes
|
|
|
%% @end
|
|
|
%%------------------------------------------------------------------------------
|
|
|
--spec lookup(pid()) -> list({binary(), mqtt_qos()}).
|
|
|
-lookup(Pid) when is_pid(Pid) ->
|
|
|
+-spec lookup_routes(pid()) -> list({binary(), mqtt_qos()}).
|
|
|
+lookup_routes(Pid) when is_pid(Pid) ->
|
|
|
[{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)].
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -100,7 +103,7 @@ delete_routes(Topics, Pid) ->
|
|
|
|
|
|
-spec delete_routes(pid()) -> ok.
|
|
|
delete_routes(Pid) when is_pid(Pid) ->
|
|
|
- Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup(Pid)],
|
|
|
+ Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)],
|
|
|
ets:delete(reverse_route, Pid),
|
|
|
lists:foreach(fun delete_route_only/1, Routes).
|
|
|
|
|
|
@@ -112,7 +115,7 @@ delete_routes(Pid) when is_pid(Pid) ->
|
|
|
route(Queue = <<"$Q/", _Q>>, Msg) ->
|
|
|
case ets:lookup(route, Queue) of
|
|
|
[] ->
|
|
|
- setstats(dropped, true);
|
|
|
+ emqttd_metrics:inc('messages/dropped');
|
|
|
Routes ->
|
|
|
Idx = crypto:rand_uniform(1, length(Routes) + 1),
|
|
|
{_, SubPid, SubQos} = lists:nth(Idx, Routes),
|
|
|
@@ -120,12 +123,15 @@ route(Queue = <<"$Q/", _Q>>, Msg) ->
|
|
|
end;
|
|
|
|
|
|
route(Topic, Msg) ->
|
|
|
- Routes = ets:lookup(route, Topic),
|
|
|
- setstats(dropped, Routes =:= []),
|
|
|
- lists:foreach(
|
|
|
- fun({_Topic, SubPid, SubQos}) ->
|
|
|
- SubPid ! {dispatch, tune_qos(SubQos, Msg)}
|
|
|
- end, Routes).
|
|
|
+ case ets:lookup(route, Topic) of
|
|
|
+ [] ->
|
|
|
+ emqttd_metrics:inc('messages/dropped');
|
|
|
+ Routes ->
|
|
|
+ lists:foreach(
|
|
|
+ fun({_Topic, SubPid, SubQos}) ->
|
|
|
+ SubPid ! {dispatch, tune_qos(SubQos, Msg)}
|
|
|
+ end, Routes)
|
|
|
+ end.
|
|
|
|
|
|
tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos ->
|
|
|
Msg#mqtt_message{qos = SubQos};
|
|
|
@@ -185,8 +191,3 @@ delete_route({Topic, Pid}) ->
|
|
|
delete_route_only({Topic, Pid}) ->
|
|
|
ets:match_delete(route, {Topic, Pid, '_'}).
|
|
|
|
|
|
-setstats(dropped, false) ->
|
|
|
- ignore;
|
|
|
-setstats(dropped, true) ->
|
|
|
- emqttd_metrics:inc('messages/dropped').
|
|
|
-
|