Kaynağa Gözat

emqttd:publish/1

Feng 10 yıl önce
ebeveyn
işleme
ef8cff4cac

+ 2 - 2
src/emqttd_alarm.erl

@@ -91,12 +91,12 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id       = AlarmId,
                               {title, iolist_to_binary(Title)},
                               {title, iolist_to_binary(Title)},
                               {summary, iolist_to_binary(Summary)},
                               {summary, iolist_to_binary(Summary)},
                               {ts, emqttd_time:now_to_secs(Timestamp)}]),
                               {ts, emqttd_time:now_to_secs(Timestamp)}]),
-    emqttd_pubsub:publish(alarm_msg(alert, AlarmId, Json)),
+    emqttd:publish(alarm_msg(alert, AlarmId, Json)),
     {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]};
     {ok, [Alarm#mqtt_alarm{timestamp = Timestamp} | Alarms]};
 
 
 handle_event({clear_alarm, AlarmId}, Alarms) ->
 handle_event({clear_alarm, AlarmId}, Alarms) ->
     Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_time:now_to_secs()}]),
     Json = mochijson2:encode([{id, AlarmId}, {ts, emqttd_time:now_to_secs()}]),
-    emqttd_pubsub:publish(alarm_msg(clear, AlarmId, Json)),
+    emqttd:publish(alarm_msg(clear, AlarmId, Json)),
     {ok, lists:keydelete(AlarmId, 2, Alarms), hibernate};
     {ok, lists:keydelete(AlarmId, 2, Alarms), hibernate};
 
 
 handle_event(_, Alarms)->
 handle_event(_, Alarms)->

+ 2 - 2
src/emqttd_metrics.erl

@@ -243,7 +243,7 @@ init([]) ->
     % Init metrics
     % Init metrics
     [create_metric(Metric) ||  Metric <- Metrics],
     [create_metric(Metric) ||  Metric <- Metrics],
     % $SYS Topics for metrics
     % $SYS Topics for metrics
-    [ok = emqttd_pubsub:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics],
+    [ok = emqttd:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics],
     % Tick to publish metrics
     % Tick to publish metrics
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
 
 
@@ -273,7 +273,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 
 publish(Metric, Val) ->
 publish(Metric, Val) ->
     Msg = emqttd_message:make(metrics, metric_topic(Metric), bin(Val)),
     Msg = emqttd_message:make(metrics, metric_topic(Metric), bin(Val)),
-    emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
+    emqttd:publish(emqttd_message:set_flag(sys, Msg)).
 
 
 create_metric({gauge, Name}) ->
 create_metric({gauge, Name}) ->
     ets:insert(?METRIC_TAB, {{Name, 0}, 0});
     ets:insert(?METRIC_TAB, {{Name, 0}, 0});

+ 43 - 50
src/emqttd_session.erl

@@ -154,6 +154,10 @@ info(SessPid) ->
 destroy(SessPid, ClientId) ->
 destroy(SessPid, ClientId) ->
     gen_server2:cast(SessPid, {destroy, ClientId}).
     gen_server2:cast(SessPid, {destroy, ClientId}).
 
 
+%%--------------------------------------------------------------------
+%% PubSub
+%%--------------------------------------------------------------------
+
 %% @doc Subscribe Topics
 %% @doc Subscribe Topics
 -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
 -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
 subscribe(SessPid, TopicTable) ->
 subscribe(SessPid, TopicTable) ->
@@ -171,11 +175,11 @@ subscribe(SessPid, PacketId, TopicTable) ->
 -spec publish(pid(), mqtt_message()) -> ok | {error, any()}.
 -spec publish(pid(), mqtt_message()) -> ok | {error, any()}.
 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
     %% publish qos0 directly
     %% publish qos0 directly
-    emqttd_pubsub:publish(Msg);
+    emqttd:publish(Msg);
 
 
 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
     %% publish qos1 directly, and client will puback automatically
     %% publish qos1 directly, and client will puback automatically
-    emqttd_pubsub:publish(Msg);
+    emqttd:publish(Msg);
 
 
 publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
 publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
     %% publish qos2 by session 
     %% publish qos2 by session 
@@ -281,62 +285,51 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
 handle_call(Req, _From, State) ->
 handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
     ?UNEXPECTED_REQ(Req, State).
 
 
-handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,
+handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
                                                                  subscriptions = Subscriptions}) ->
                                                                  subscriptions = Subscriptions}) ->
 
 
     TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
     TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
-
-    case TopicTable -- dict:to_list(Subscriptions) of
-        [] ->
-            AckFun([Qos || {_, Qos} <- TopicTable]),
-            hibernate(Session);
-        _  ->
-            %% subscribe first and don't care if the subscriptions have been existed
-            {ok, GrantedQos} = emqttd_pubsub:subscribe(ClientId, TopicTable),
-
-            AckFun(GrantedQos),
-
-            emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
-
-            ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session),
-
-            Subscriptions1 =
-            lists:foldl(fun({Topic, Qos}, Dict) ->
-                            case dict:find(Topic, Dict) of
-                                {ok, Qos} ->
-                                    ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session),
-                                    Dict;
-                                {ok, OldQos} ->
-                                    ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session),
-                                    dict:store(Topic, Qos, Dict);
-                                error ->
-                                    %%TODO: the design is ugly, rewrite later...:(
-                                    %% <MQTT V3.1.1>: 3.8.4
-                                    %% Where the Topic Filter is not identical to any existing Subscription’s filter,
-                                    %% a new Subscription is created and all matching retained messages are sent.
-                                    emqttd_retainer:dispatch(Topic, self()),
-
-                                    dict:store(Topic, Qos, Dict)
-                            end
-                        end, Subscriptions, TopicTable),
-            hibernate(Session#session{subscriptions = Subscriptions1})
-    end;
+    ?LOG(info, "Subscribe ~p", [TopicTable], Session),
+    Subscriptions1 = lists:foldl(
+        fun({Topic, Qos}, SubDict) ->
+            case dict:find(Topic, SubDict) of
+                {ok, Qos} ->
+                    ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
+                    SubDict;
+                {ok, OldQos} ->
+                    emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos),
+                    ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
+                    dict:store(Topic, Qos, SubDict);
+                error ->
+                    emqttd:subscribe(ClientId, Topic, Qos),
+                    %%TODO: the design is ugly...
+                    %% <MQTT V3.1.1>: 3.8.4
+                    %% Where the Topic Filter is not identical to any existing Subscription’s filter,
+                    %% a new Subscription is created and all matching retained messages are sent.
+                    emqttd_retainer:dispatch(Topic, self()),
+
+                    dict:store(Topic, Qos, SubDict)
+            end
+        end, Subscriptions, TopicTable),
+    AckFun([Qos || {_, Qos} <- TopicTable]),
+    emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
+    hibernate(Session#session{subscriptions = Subscriptions1});
 
 
 handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
 handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
                                                        subscriptions = Subscriptions}) ->
                                                        subscriptions = Subscriptions}) ->
 
 
     Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
     Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
-
-    %% unsubscribe from topic tree
-    ok = emqttd_pubsub:unsubscribe(Topics),
-
     ?LOG(info, "unsubscribe ~p", [Topics], Session),
     ?LOG(info, "unsubscribe ~p", [Topics], Session),
-
-    Subscriptions1 =
-    lists:foldl(fun(Topic, Dict) ->
-                    dict:erase(Topic, Dict)
-                end, Subscriptions, Topics),
-
+    Subscriptions1 = lists:foldl(
+        fun(Topic, SubDict) ->
+            case dict:find(Topic, SubDict) of
+                {ok, Qos} ->
+                    emqttd:unsubscribe(ClientId, Topic, Qos),
+                    dict:erase(Topic, SubDict);
+                error ->
+                    SubDict
+            end
+        end, Subscriptions, Topics),
     hibernate(Session#session{subscriptions = Subscriptions1});
     hibernate(Session#session{subscriptions = Subscriptions1});
 
 
 handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
 handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
@@ -430,7 +423,7 @@ handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
     case maps:find(PktId, AwaitingRel) of
     case maps:find(PktId, AwaitingRel) of
         {ok, {Msg, TRef}} ->
         {ok, {Msg, TRef}} ->
             cancel_timer(TRef),
             cancel_timer(TRef),
-            emqttd_pubsub:publish(Msg),
+            emqttd:publish(Msg),
             hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
             hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
         error ->
         error ->
             ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session),
             ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session),

+ 9 - 10
src/emqttd_stats.erl

@@ -52,14 +52,14 @@
 
 
 %% $SYS Topics for Subscribers
 %% $SYS Topics for Subscribers
 -define(SYSTOP_PUBSUB, [
 -define(SYSTOP_PUBSUB, [
-    'routes/count',      % ...
-    'routes/reverse',    % ...
-    'topics/count',      % ...
-    'topics/max',        % ...
+    'routes/count',        % ...
+    'routes/max',          % ...
+    'topics/count',        % ...
+    'topics/max',          % ...
+    'subscribers/count',   % ...
+    'subscribers/max',     % ...
     'subscriptions/count', % ...
     'subscriptions/count', % ...
-    'subscriptions/max',   % ...
-    'queues/count',      % ...
-    'queues/max'         % ...
+    'subscriptions/max'    % ...
 ]).
 ]).
 
 
 %% $SYS Topic for retained
 %% $SYS Topic for retained
@@ -122,7 +122,7 @@ init([]) ->
     Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
     Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
     ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]),
     ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]),
     % Create $SYS Topics
     % Create $SYS Topics
-    [ok = emqttd_pubsub:create(topic, stats_topic(Topic)) || Topic <- Topics],
+    [ok = emqttd:create(topic, stats_topic(Topic)) || Topic <- Topics],
     % Tick to publish stats
     % Tick to publish stats
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
 
 
@@ -165,8 +165,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 publish(Stat, Val) ->
 publish(Stat, Val) ->
-    Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)),
-    emqttd_pubsub:publish(Msg).
+    emqttd:publish(emqttd_message:make(stats, stats_topic(Stat), bin(Val))).
 
 
 stats_topic(Stat) ->
 stats_topic(Stat) ->
     emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
     emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).

+ 1 - 1
src/emqttd_sysmon.erl

@@ -157,7 +157,7 @@ procinfo(Pid) ->
 
 
 publish(Sysmon, WarnMsg) ->
 publish(Sysmon, WarnMsg) ->
     Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)),
     Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)),
-    emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
+    emqttd:publish(emqttd_message:set_flag(sys, Msg)).
 
 
 topic(Sysmon) ->
 topic(Sysmon) ->
     emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))).
     emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))).