emqx_shared_sub.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_shared_sub).
  17. -behaviour(gen_server).
  18. -include_lib("emqx_libs/include/emqx.hrl").
  19. -include_lib("emqx_libs/include/emqx_mqtt.hrl").
  20. -include_lib("emqx_libs/include/logger.hrl").
  21. -include_lib("emqx_libs/include/types.hrl").
  22. -logger_header("[Shared Sub]").
  23. %% Mnesia bootstrap
  24. -export([mnesia/1]).
  25. -boot_mnesia({mnesia, [boot]}).
  26. -copy_mnesia({mnesia, [copy]}).
  27. %% APIs
  28. -export([start_link/0]).
  29. -export([ subscribe/3
  30. , unsubscribe/3
  31. ]).
  32. -export([dispatch/3]).
  33. -export([ maybe_ack/1
  34. , maybe_nack_dropped/1
  35. , nack_no_connection/1
  36. , is_ack_required/1
  37. ]).
  38. %% for testing
  39. -export([subscribers/2]).
  40. %% gen_server callbacks
  41. -export([ init/1
  42. , handle_call/3
  43. , handle_cast/2
  44. , handle_info/2
  45. , terminate/2
  46. , code_change/3
  47. ]).
%% Locally registered name of the gen_server (see start_link/0).
-define(SERVER, ?MODULE).
%% Mnesia table holding one #emqx_shared_subscription{} per (group, topic, subscriber).
-define(TAB, emqx_shared_subscription).
%% ETS bag owned by the server; rows are {{Group, Topic}, SubPid}.
-define(SHARED_SUBS, emqx_shared_subscriber).
%% ETS set owned by the server; rows are {Pid} for non-local subscriber pids.
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
%% How long dispatch_with_ack/3 waits for an ack of a QoS 1 message.
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
%% Payload of a positive acknowledgement, sent back as {Ref, ?ack}.
-define(ack, shared_sub_ack).
%% Payload of a negative acknowledgement, sent back as {Ref, ?nack(Reason)}.
-define(nack(Reason), {shared_sub_nack, Reason}).
%% True when Pid is a pid that lives on the current node.
-define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
%% Value of the shared_dispatch_ack header when no acknowledgement is expected.
-define(no_ack, no_ack).
%% Server state: pmon is the emqx_pmon monitor set of subscriber pids.
-record(state, {pmon}).
%% Row stored in ?TAB.
-record(emqx_shared_subscription, {group, topic, subpid}).
  59. %%--------------------------------------------------------------------
  60. %% Mnesia bootstrap
  61. %%--------------------------------------------------------------------
  62. mnesia(boot) ->
  63. ok = ekka_mnesia:create_table(?TAB, [
  64. {type, bag},
  65. {ram_copies, [node()]},
  66. {record_name, emqx_shared_subscription},
  67. {attributes, record_info(fields, emqx_shared_subscription)}]);
  68. mnesia(copy) ->
  69. ok = ekka_mnesia:copy_table(?TAB).
  70. %%--------------------------------------------------------------------
  71. %% API
  72. %%--------------------------------------------------------------------
  73. -spec(start_link() -> startlink_ret()).
  74. start_link() ->
  75. gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
  76. -spec(subscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok).
  77. subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
  78. gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}).
  79. -spec(unsubscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok).
  80. unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
  81. gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
  82. record(Group, Topic, SubPid) ->
  83. #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
  84. -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
  85. -> emqx_types:deliver_result()).
  86. dispatch(Group, Topic, Delivery) ->
  87. dispatch(Group, Topic, Delivery, _FailedSubs = []).
  88. dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
  89. #message{from = ClientId} = Msg,
  90. case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
  91. false ->
  92. {error, no_subscribers};
  93. {Type, SubPid} ->
  94. case do_dispatch(SubPid, Topic, Msg, Type) of
  95. ok -> {ok, 1};
  96. {error, _Reason} ->
  97. %% Failed to dispatch to this sub, try next.
  98. dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
  99. end
  100. end.
  101. -spec(strategy() -> random | round_robin | sticky | hash).
  102. strategy() ->
  103. emqx:get_env(shared_subscription_strategy, round_robin).
  104. -spec(ack_enabled() -> boolean()).
  105. ack_enabled() ->
  106. emqx:get_env(shared_dispatch_ack_enabled, false).
  107. do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
  108. %% Deadlock otherwise
  109. _ = erlang:send(SubPid, {deliver, Topic, Msg}),
  110. ok;
  111. do_dispatch(SubPid, Topic, Msg, Type) ->
  112. dispatch_per_qos(SubPid, Topic, Msg, Type).
  113. %% return either 'ok' (when everything is fine) or 'error'
  114. dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
  115. %% For QoS 0 message, send it as regular dispatch
  116. _ = erlang:send(SubPid, {deliver, Topic, Msg}),
  117. ok;
  118. dispatch_per_qos(SubPid, Topic, Msg, retry) ->
  119. %% Retry implies all subscribers nack:ed, send again without ack
  120. _ = erlang:send(SubPid, {deliver, Topic, Msg}),
  121. ok;
  122. dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
  123. case ack_enabled() of
  124. true ->
  125. dispatch_with_ack(SubPid, Topic, Msg);
  126. false ->
  127. _ = erlang:send(SubPid, {deliver, Topic, Msg}),
  128. ok
  129. end.
  130. dispatch_with_ack(SubPid, Topic, Msg) ->
  131. %% For QoS 1/2 message, expect an ack
  132. Ref = erlang:monitor(process, SubPid),
  133. Sender = self(),
  134. _ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}),
  135. Timeout = case Msg#message.qos of
  136. ?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
  137. ?QOS_2 -> infinity
  138. end,
  139. try
  140. receive
  141. {Ref, ?ack} ->
  142. ok;
  143. {Ref, ?nack(Reason)} ->
  144. %% the receive session may nack this message when its queue is full
  145. {error, Reason};
  146. {'DOWN', Ref, process, SubPid, Reason} ->
  147. {error, Reason}
  148. after
  149. Timeout ->
  150. {error, timeout}
  151. end
  152. after
  153. _ = erlang:demonitor(Ref, [flush])
  154. end.
  155. with_ack_ref(Msg, SenderRef) ->
  156. emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg).
  157. without_ack_ref(Msg) ->
  158. emqx_message:set_headers(#{shared_dispatch_ack => ?no_ack}, Msg).
  159. get_ack_ref(Msg) ->
  160. emqx_message:get_header(shared_dispatch_ack, Msg, ?no_ack).
  161. -spec(is_ack_required(emqx_types:message()) -> boolean()).
  162. is_ack_required(Msg) -> ?no_ack =/= get_ack_ref(Msg).
  163. %% @doc Negative ack dropped message due to inflight window or message queue being full.
  164. -spec(maybe_nack_dropped(emqx_types:message()) -> ok).
  165. maybe_nack_dropped(Msg) ->
  166. case get_ack_ref(Msg) of
  167. ?no_ack -> ok;
  168. {Sender, Ref} -> nack(Sender, Ref, dropped)
  169. end.
  170. %% @doc Negative ack message due to connection down.
  171. %% Assuming this function is always called when ack is required
  172. %% i.e is_ack_required returned true.
  173. -spec(nack_no_connection(emqx_types:message()) -> ok).
  174. nack_no_connection(Msg) ->
  175. {Sender, Ref} = get_ack_ref(Msg),
  176. nack(Sender, Ref, no_connection).
  177. -spec(nack(pid(), reference(), dropped | no_connection) -> ok).
  178. nack(Sender, Ref, Reason) ->
  179. erlang:send(Sender, {Ref, ?nack(Reason)}),
  180. ok.
  181. -spec(maybe_ack(emqx_types:message()) -> emqx_types:message()).
  182. maybe_ack(Msg) ->
  183. case get_ack_ref(Msg) of
  184. ?no_ack ->
  185. Msg;
  186. {Sender, Ref} ->
  187. erlang:send(Sender, {Ref, ?ack}),
  188. without_ack_ref(Msg)
  189. end.
  190. pick(sticky, ClientId, Group, Topic, FailedSubs) ->
  191. Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
  192. case is_active_sub(Sub0, FailedSubs) of
  193. true ->
  194. %% the old subscriber is still alive
  195. %% keep using it for sticky strategy
  196. {fresh, Sub0};
  197. false ->
  198. %% randomly pick one for the first message
  199. {Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]),
  200. %% stick to whatever pick result
  201. erlang:put({shared_sub_sticky, Group, Topic}, Sub),
  202. {Type, Sub}
  203. end;
  204. pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
  205. do_pick(Strategy, ClientId, Group, Topic, FailedSubs).
  206. do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
  207. All = subscribers(Group, Topic),
  208. case All -- FailedSubs of
  209. [] when FailedSubs =:= [] ->
  210. %% Genuinely no subscriber
  211. false;
  212. [] ->
  213. %% All offline? pick one anyway
  214. {retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)};
  215. Subs ->
  216. %% More than one available
  217. {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)}
  218. end.
  219. pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub;
  220. pick_subscriber(Group, Topic, Strategy, ClientId, Subs) ->
  221. Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)),
  222. lists:nth(Nth, Subs).
  223. do_pick_subscriber(_Group, _Topic, random, _ClientId, Count) ->
  224. rand:uniform(Count);
  225. do_pick_subscriber(_Group, _Topic, hash, ClientId, Count) ->
  226. 1 + erlang:phash2(ClientId) rem Count;
  227. do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) ->
  228. Rem = case erlang:get({shared_sub_round_robin, Group, Topic}) of
  229. undefined -> 0;
  230. N -> (N + 1) rem Count
  231. end,
  232. _ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem),
  233. Rem + 1.
  234. subscribers(Group, Topic) ->
  235. ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
  236. %%--------------------------------------------------------------------
  237. %% gen_server callbacks
  238. %%--------------------------------------------------------------------
  239. init([]) ->
  240. {ok, _} = mnesia:subscribe({table, ?TAB, simple}),
  241. {atomic, PMon} = mnesia:transaction(fun init_monitors/0),
  242. ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
  243. ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
  244. {ok, update_stats(#state{pmon = PMon})}.
  245. init_monitors() ->
  246. mnesia:foldl(
  247. fun(#emqx_shared_subscription{subpid = SubPid}, Mon) ->
  248. emqx_pmon:monitor(SubPid, Mon)
  249. end, emqx_pmon:new(), ?TAB).
  250. handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
  251. mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
  252. case ets:member(?SHARED_SUBS, {Group, Topic}) of
  253. true -> ok;
  254. false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
  255. end,
  256. ok = maybe_insert_alive_tab(SubPid),
  257. true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
  258. {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
  259. handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
  260. mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
  261. true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
  262. case ets:member(?SHARED_SUBS, {Group, Topic}) of
  263. true -> ok;
  264. false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
  265. end,
  266. {reply, ok, State};
  267. handle_call(Req, _From, State) ->
  268. ?LOG(error, "Unexpected call: ~p", [Req]),
  269. {reply, ignored, State}.
  270. handle_cast(Msg, State) ->
  271. ?LOG(error, "Unexpected cast: ~p", [Msg]),
  272. {noreply, State}.
  273. handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
  274. #emqx_shared_subscription{subpid = SubPid} = NewRecord,
  275. {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
  276. handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
  277. #emqx_shared_subscription{subpid = SubPid} = OldRecord,
  278. {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
  279. handle_info({mnesia_table_event, _Event}, State) ->
  280. {noreply, State};
  281. handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
  282. ?LOG(info, "Shared subscriber down: ~p", [SubPid]),
  283. cleanup_down(SubPid),
  284. {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
  285. handle_info(Info, State) ->
  286. ?LOG(error, "Unexpected info: ~p", [Info]),
  287. {noreply, State}.
  288. terminate(_Reason, _State) ->
  289. mnesia:unsubscribe({table, ?TAB, simple}).
  290. code_change(_OldVsn, State, _Extra) ->
  291. {ok, State}.
  292. %%--------------------------------------------------------------------
  293. %% Internal functions
  294. %%--------------------------------------------------------------------
  295. %% keep track of alive remote pids
  296. maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
  297. maybe_insert_alive_tab(Pid) when is_pid(Pid) -> ets:insert(?ALIVE_SUBS, {Pid}), ok.
  298. cleanup_down(SubPid) ->
  299. ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
  300. lists:foreach(
  301. fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
  302. ok = mnesia:dirty_delete_object(?TAB, Record),
  303. true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
  304. case ets:member(?SHARED_SUBS, {Group, Topic}) of
  305. true -> ok;
  306. false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
  307. end
  308. end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
  309. update_stats(State) ->
  310. emqx_stats:setstat('subscriptions.shared.count', 'subscriptions.shared.max', ets:info(?TAB, size)),
  311. State.
  312. %% Return 'true' if the subscriber process is alive AND not in the failed list
  313. is_active_sub(Pid, FailedSubs) ->
  314. is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs).
  315. %% erlang:is_process_alive/1 does not work with remote pid.
  316. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
  317. erlang:is_process_alive(Pid);
  318. is_alive_sub(Pid) ->
  319. [] =/= ets:lookup(?ALIVE_SUBS, Pid).