|
|
@@ -42,6 +42,12 @@
|
|
|
, on_session_terminated/3
|
|
|
]).
|
|
|
|
|
|
+-export([ on_message_publish/1
|
|
|
+ , on_message_dropped/3
|
|
|
+ , on_message_delivered/2
|
|
|
+ , on_message_acked/2
|
|
|
+ ]).
|
|
|
+
|
|
|
%% Utils
|
|
|
-export([ message/1
|
|
|
, stringfy/1
|
|
|
@@ -71,8 +77,15 @@
|
|
|
, {'session.discarded', {?MODULE, on_session_discarded, []}}
|
|
|
, {'session.takeovered', {?MODULE, on_session_takeovered, []}}
|
|
|
, {'session.terminated', {?MODULE, on_session_terminated, []}}
|
|
|
+ %]).
|
|
|
+ , {'message.publish', {?MODULE, on_message_publish, []}}
|
|
|
+ , {'message.delivered', {?MODULE, on_message_delivered, []}}
|
|
|
+ , {'message.acked', {?MODULE, on_message_acked, []}}
|
|
|
+ , {'message.dropped', {?MODULE, on_message_dropped, []}}
|
|
|
]).
|
|
|
|
|
|
+
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Clients
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -185,6 +198,45 @@ on_session_terminated(ClientInfo, Reason, _SessInfo) ->
|
|
|
reason => stringfy(Reason)},
|
|
|
cast('session.terminated', Req).
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Message
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
|
|
|
+ ok;
|
|
|
+on_message_publish(Message) ->
|
|
|
+ Req = #{message => message(Message)},
|
|
|
+ case call_fold('message.publish', Req,
|
|
|
+ fun emqx_exhook_handler:merge_responsed_message/2) of
|
|
|
+ {StopOrOk, #{message := NMessage}} ->
|
|
|
+ {StopOrOk, assign_to_message(NMessage, Message)};
|
|
|
+ _ -> {ok, Message}
|
|
|
+ end.
|
|
|
+
|
|
|
+on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
|
|
|
+ ok;
|
|
|
+on_message_dropped(Message, _By, Reason) ->
|
|
|
+ Req = #{message => message(Message),
|
|
|
+ reason => stringfy(Reason)
|
|
|
+ },
|
|
|
+ cast('message.dropped', Req).
|
|
|
+
|
|
|
+on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
|
|
|
+ ok;
|
|
|
+on_message_delivered(ClientInfo, Message) ->
|
|
|
+ Req = #{clientinfo => clientinfo(ClientInfo),
|
|
|
+ message => message(Message)
|
|
|
+ },
|
|
|
+ cast('message.delivered', Req).
|
|
|
+
|
|
|
+on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
|
|
|
+ ok;
|
|
|
+on_message_acked(ClientInfo, Message) ->
|
|
|
+ Req = #{clientinfo => clientinfo(ClientInfo),
|
|
|
+ message => message(Message)
|
|
|
+ },
|
|
|
+ cast('message.acked', Req).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Types
|
|
|
|