Przeglądaj źródła

Add ignore self publish message

turtled 9 lat temu
rodzic
commit
45ca461fd9
3 zmienionych plików z 25 dodań i 3 usunięć
  1. 3 0
      etc/emq.conf
  2. 8 1
      priv/emq.schema
  3. 14 2
      src/emqttd_session.erl

+ 3 - 0
etc/emq.conf

@@ -173,6 +173,9 @@ mqtt.session.enable_stats = off
 ## s - second
 mqtt.session.expiry_interval = 2h
 
+## Ignore message from self publish
+mqtt.session.ignore_loop_deliver = false
+
 ##--------------------------------------------------------------------
 ## MQTT Message Queue
 ##--------------------------------------------------------------------

+ 8 - 1
priv/emq.schema

@@ -436,6 +436,12 @@ end}.
   {datatype, {duration, ms}}
 ]}.
 
+%% @doc Ignore message from self publish
+{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
 {translation, "emqttd.session", fun(Conf) ->
   [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)},
    {upgrade_qos,       cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)},
@@ -444,7 +450,8 @@ end}.
    {max_awaiting_rel,  cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
    {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
    {enable_stats,      cuttlefish:conf_get("mqtt.session.enable_stats", Conf)},
-   {expiry_interval,   cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}]
+   {expiry_interval,   cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)},
+   {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}]
 end}.
 
 %%--------------------------------------------------------------------

+ 14 - 2
src/emqttd_session.erl

@@ -152,7 +152,9 @@
          %% Force GC Count
          force_gc_count :: undefined | integer(),
 
-         created_at :: erlang:timestamp()
+         created_at :: erlang:timestamp(),
+
+         ignore_loop_deliver = false :: boolean()
         }).
 
 -define(TIMEOUT, 60000).
@@ -284,6 +286,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
     {ok, QEnv} = emqttd:env(mqueue),
     MaxInflight = get_value(max_inflight, Env, 0),
     EnableStats = get_value(enable_stats, Env, false),
+    IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false),
     ForceGcCount = emqttd_gc:conn_max_gc_count(),
     MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
     State = #state{clean_sess        = CleanSess,
@@ -304,7 +307,8 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
                    expiry_interval   = get_value(expiry_interval, Env),
                    enable_stats      = EnableStats,
                    force_gc_count    = ForceGcCount,
-                   created_at        = os:timestamp()},
+                   created_at        = os:timestamp(),
+                   ignore_loop_deliver = IgnoreLoopDeliver},
     emqttd_sm:register_session(ClientId, CleanSess, info(State)),
     emqttd_hooks:run('session.created', [ClientId, Username]),
     {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}.
@@ -525,6 +529,14 @@ handle_cast({destroy, ClientId},
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
 
+%% Dispatch message from self publish
+handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, 
+             State = #state{client_id = ClientId, 
+                            ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) ->
+    case IgnoreLoopDeliver of
+        true  -> {noreply, State, hibernate};
+        false -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate}
+    end;
 %% Dispatch Message
 handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
     {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate};