Просмотр исходного кода

Improve the channel modules and fix the extension mods

Feng Lee 6 лет назад
Родитель
Сommit
f8e28e39ed
5 измененных файлов с 135 добавлено и 48 удалено
  1. 18 0
      src/emqx_channel.erl
  2. 33 21
      src/emqx_mod_presence.erl
  3. 12 8
      src/emqx_mod_subscription.erl
  4. 55 19
      src/emqx_protocol.erl
  5. 17 0
      src/emqx_ws_channel.erl

+ 18 - 0
src/emqx_channel.erl

@@ -290,6 +290,13 @@ connected(info, Deliver = {deliver, _Topic, _Msg},
             shutdown(Reason, State#state{proto_state = NProtoState})
     end;
 
+%% TODO: Improve later.
+connected(info, {subscribe, TopicFilters}, State) ->
+    handle_request({subscribe, TopicFilters}, State);
+
+connected(info, {unsubscribe, TopicFilters}, State) ->
+    handle_request({unsubscribe, TopicFilters}, State);
+
 %% Keepalive timer
 connected(info, {keepalive, check}, State = #state{keepalive = KeepAlive}) ->
     case emqx_keepalive:check(KeepAlive) of
@@ -451,6 +458,17 @@ terminate(Reason, _StateName, #state{transport   = Transport,
     ok = emqx_keepalive:cancel(KeepAlive),
     emqx_protocol:terminate(Reason, ProtoState).
 
+%%--------------------------------------------------------------------
+%% Handle internal request
+
+handle_request(Req, State = #state{proto_state = ProtoState}) ->
+    case emqx_protocol:handle_req(Req, ProtoState) of
+        {ok, _Result, NProtoState} -> %% TODO:: how to handle the result?
+            keep_state(State#state{proto_state = NProtoState});
+        {error, Reason, NProtoState} ->
+            shutdown(Reason, State#state{proto_state = NProtoState})
+    end.
+
 %%--------------------------------------------------------------------
 %% Process incoming data
 

+ 33 - 21
src/emqx_mod_presence.erl

@@ -33,39 +33,50 @@
         , unload/1
         ]).
 
--define(ATTR_KEYS, [clean_start, proto_ver, proto_name, keepalive]).
-
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
 
 load(Env) ->
-    emqx_hooks:add('client.connected',    fun ?MODULE:on_client_connected/4, [Env]),
-    emqx_hooks:add('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
+    emqx_hooks:add('client.connected',    {?MODULE, on_client_connected, [Env]}),
+    emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
 
 on_client_connected(#{client_id := ClientId,
                       username  := Username,
-                      peername  := {IpAddr, _}}, ConnAck, ConnAttrs, Env) ->
-    Attrs = #{},%maps:filter(fun(K, _) ->
-                %                lists:member(K, ?ATTR_KEYS)
-                %        end, ConnAttrs),
-    case emqx_json:safe_encode(Attrs#{clientid => ClientId,
-                                      username => Username,
-                                      ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
-                                      connack => ConnAck,
-                                      ts => erlang:system_time(millisecond)
-                                     }) of
+                      peername  := {IpAddr, _}
+                     }, ConnAck,
+                    #{session    := #{clean_start     := CleanStart,
+                                      expiry_interval := Interval
+                                     },
+                      proto_name := ProtoName,
+                      proto_ver  := ProtoVer,
+                      keepalive  := Keepalive
+                     }, Env) ->
+
+    case emqx_json:safe_encode(#{clientid => ClientId,
+                                 username => Username,
+                                 ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
+                                 proto_name => ProtoName,
+                                 proto_ver => ProtoVer,
+                                 keepalive => Keepalive,
+                                 clean_start => CleanStart,
+                                 expiry_interval => Interval,
+                                 connack => ConnAck,
+                                 ts => erlang:system_time(millisecond)
+                                }) of
         {ok, Payload} ->
             emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
         {error, Reason} ->
             ?LOG(error, "Encoding connected event error: ~p", [Reason])
     end.
 
-on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) ->
-    case emqx_json:safe_encode([{clientid, ClientId},
-                                {username, Username},
-                                {reason, reason(Reason)},
-                                {ts, erlang:system_time(millisecond)}]) of
+on_client_disconnected(#{client_id := ClientId,
+                         username := Username}, Reason, Env) ->
+    case emqx_json:safe_encode(#{clientid => ClientId,
+                                 username => Username,
+                                 reason => reason(Reason),
+                                 ts => erlang:system_time(millisecond)
+                                }) of
         {ok, Payload} ->
             emqx_broker:publish(message(qos(Env), topic(disconnected, ClientId), Payload));
         {error, Reason} ->
@@ -73,8 +84,8 @@ on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, E
     end.
 
 unload(_Env) ->
-    emqx_hooks:del('client.connected',    fun ?MODULE:on_client_connected/4),
-    emqx_hooks:del('client.disconnected', fun ?MODULE:on_client_disconnected/3).
+    emqx_hooks:del('client.connected',    {?MODULE, on_client_connected}),
+    emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
 
 message(QoS, Topic, Payload) ->
     emqx_message:set_flag(
@@ -91,3 +102,4 @@ qos(Env) -> proplists:get_value(qos, Env, 0).
 reason(Reason) when is_atom(Reason) -> Reason;
 reason({Error, _}) when is_atom(Error) -> Error;
 reason(_) -> internal_error.
+

+ 12 - 8
src/emqx_mod_subscription.erl

@@ -22,28 +22,32 @@
 -include_lib("emqx_mqtt.hrl").
 
 %% APIs
--export([on_session_created/3]).
+-export([on_client_connected/4]).
 
 %% emqx_gen_mod callbacks
 -export([ load/1
         , unload/1
         ]).
-%%------------------------------------------------------------------------------
+
+%%--------------------------------------------------------------------
 %% Load/Unload Hook
-%%------------------------------------------------------------------------------
+%%--------------------------------------------------------------------
 
 load(Topics) ->
-    emqx_hooks:add('session.created', fun ?MODULE:on_session_created/3, [Topics]).
+    emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
 
-on_session_created(#{client_id := ClientId}, SessAttrs, Topics) ->
-    Username = proplists:get_value(username, SessAttrs),
+on_client_connected(#{client_id := ClientId,
+                      username  := Username,
+                      conn_mod  := ConnMod
+                     }, ?RC_SUCCESS, _ConnAttrs, Topics) ->
     Replace = fun(Topic) ->
                       rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
               end,
-    emqx_session:subscribe(self(), [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics]).
+    TopicFilters = [{Replace(Topic), #{qos => QoS}} || {Topic, QoS} <- Topics],
+    self() ! {subscribe, TopicFilters}.
 
 unload(_) ->
-    emqx_hooks:del('session.created', fun ?MODULE:on_session_created/3).
+    emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
 
 %%------------------------------------------------------------------------------
 %% Internal functions

+ 55 - 19
src/emqx_protocol.erl

@@ -32,6 +32,7 @@
 
 -export([ init/2
         , handle_in/2
+        , handle_req/2
         , handle_deliver/2
         , handle_out/2
         , handle_timeout/3
@@ -264,6 +265,31 @@ handle_in(Packet, PState) ->
     io:format("In: ~p~n", [Packet]),
     {ok, PState}.
 
+%%--------------------------------------------------------------------
+%% Handle internal request
+%%--------------------------------------------------------------------
+
+-spec(handle_req(Req:: term(), proto_state())
+      -> {ok, Result :: term(), proto_state()} |
+         {error, Reason :: term(), proto_state()}).
+handle_req({subscribe, TopicFilters}, PState = #protocol{client = Client}) ->
+    TopicFilters1 = emqx_hooks:run_fold('client.subscribe',
+                                        [Client, #{'Internal' => true}],
+                                        parse(subscribe, TopicFilters)),
+    {ReasonCodes, NPState} = process_subscribe(TopicFilters1, PState),
+    {ok, ReasonCodes, NPState};
+
+handle_req({unsubscribe, TopicFilters}, PState = #protocol{client = Client}) ->
+    TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe',
+                                        [Client, #{'Internal' => true}],
+                                        parse(unsubscribe, TopicFilters)),
+    {ReasonCodes, NPState} = process_unsubscribe(TopicFilters1, PState),
+    {ok, ReasonCodes, NPState};
+
+handle_req(Req, PState) ->
+    ?LOG(error, "Unexpected request: ~p~n", [Req]),
+    {ok, ignored, PState}.
+
 %%--------------------------------------------------------------------
 %% Handle delivers
 %%--------------------------------------------------------------------
@@ -306,14 +332,14 @@ handle_out({connack, ?RC_SUCCESS, SP},
     %% subscribe requests or responses that are not intended for them.
     AckProps1 = if AckProps == undefined -> #{}; true -> AckProps end,
     AckProps2 = AckProps1#{'Retain-Available' => flag(Retain),
-                          'Maximum-Packet-Size' => MaxPktSize,
-                          'Topic-Alias-Maximum' => MaxAlias,
-                          'Wildcard-Subscription-Available' => flag(Wildcard),
-                          'Subscription-Identifier-Available' => 1,
-                          %'Response-Information' =>
-                          'Shared-Subscription-Available' => flag(Shared),
-                          'Maximum-QoS' => MaxQoS
-                         },
+                           'Maximum-Packet-Size' => MaxPktSize,
+                           'Topic-Alias-Maximum' => MaxAlias,
+                           'Wildcard-Subscription-Available' => flag(Wildcard),
+                           'Subscription-Identifier-Available' => 1,
+                           %'Response-Information' =>
+                           'Shared-Subscription-Available' => flag(Shared),
+                           'Maximum-QoS' => MaxQoS
+                          },
     AckProps3 = case emqx_zone:get_env(Zone, server_keepalive) of
                     undefined -> AckProps2;
                     Keepalive -> AckProps2#{'Server-Keep-Alive' => Keepalive}
@@ -334,7 +360,7 @@ handle_out({connack, ReasonCode}, PState = #protocol{client = Client,
     Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
     {error, Reason, ?CONNACK_PACKET(ReasonCode1), PState};
 
-handle_out({publish, Publishes}, PState = #protocol{client = Client}) ->
+handle_out({publish, Publishes}, PState) ->
     Packets = [element(2, handle_out(Publish, PState)) || Publish <- Publishes],
     {ok, Packets, PState};
 
@@ -808,6 +834,26 @@ do_unsubscribe(TopicFilter, _SubOpts, PState = #protocol{client = Client,
 is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
     (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true).
 
+%%--------------------------------------------------------------------
+%% Parse topic filters
+%%--------------------------------------------------------------------
+
+parse(subscribe, TopicFilters) ->
+    [emqx_topic:parse(TopicFilter, SubOpts) || {TopicFilter, SubOpts} <- TopicFilters];
+
+parse(unsubscribe, TopicFilters) ->
+    lists:map(fun emqx_topic:parse/1, TopicFilters).
+
+%%--------------------------------------------------------------------
+%% Mount/Unmount
+%%--------------------------------------------------------------------
+
+mount(#{mountpoint := MountPoint}, TopicOrMsg) ->
+    emqx_mountpoint:mount(MountPoint, TopicOrMsg).
+
+unmount(#{mountpoint := MountPoint}, TopicOrMsg) ->
+    emqx_mountpoint:unmount(MountPoint, TopicOrMsg).
+
 %%--------------------------------------------------------------------
 %% Pipeline
 %%--------------------------------------------------------------------
@@ -828,16 +874,6 @@ pipeline([Fun|More], Packet, PState) ->
             {error, ReasonCode, NPState}
     end.
 
-%%--------------------------------------------------------------------
-%% Mount/Unmount
-%%--------------------------------------------------------------------
-
-mount(#{mountpoint := MountPoint}, TopicOrMsg) ->
-    emqx_mountpoint:mount(MountPoint, TopicOrMsg).
-
-unmount(#{mountpoint := MountPoint}, TopicOrMsg) ->
-    emqx_mountpoint:unmount(MountPoint, TopicOrMsg).
-
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------

+ 17 - 0
src/emqx_ws_channel.erl

@@ -322,6 +322,12 @@ websocket_info({timeout, Timer, Msg},
             stop(Reason, State#state{proto_state = NProtoState})
     end;
 
+websocket_info({subscribe, TopicFilters}, State) ->
+    handle_request({subscribe, TopicFilters}, State);
+
+websocket_info({unsubscribe, TopicFilters}, State) ->
+    handle_request({unsubscribe, TopicFilters}, State);
+
 websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
     ?LOG(warning, "Discarded by ~s:~p", [ClientId, ByPid]),
     stop(discard, State);
@@ -381,6 +387,17 @@ ensure_keepalive(Interval, #state{proto_state = ProtoState}) ->
                                 keepalive_backoff, 0.75),
     emqx_keepalive:start(stat_fun(), round(Interval * Backoff), {keepalive, check}).
 
+%%--------------------------------------------------------------------
+%% Handle internal request
+
+handle_request(Req, State = #state{proto_state = ProtoState}) ->
+    case emqx_protocol:handle_req(Req, ProtoState) of
+        {ok, _Result, NProtoState} -> %% TODO:: how to handle the result?
+            {ok, State#state{proto_state = NProtoState}};
+        {error, Reason, NProtoState} ->
+            stop(Reason, State#state{proto_state = NProtoState})
+    end.
+
 %%--------------------------------------------------------------------
 %% Process incoming data