|
|
@@ -1,23 +1,30 @@
|
|
|
-%% The contents of this file are subject to the Mozilla Public License
|
|
|
-%% Version 1.1 (the "License"); you may not use this file except in
|
|
|
-%% compliance with the License. You may obtain a copy of the License at
|
|
|
-%% http://www.mozilla.org/MPL/
|
|
|
-%%
|
|
|
-%% Software distributed under the License is distributed on an "AS IS"
|
|
|
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
|
|
|
-%% License for the specific language governing rights and limitations
|
|
|
-%% under the License.
|
|
|
-%%
|
|
|
-%% The Original Code is eMQTT
|
|
|
-%%
|
|
|
-%% The Initial Developer of the Original Code is <ery.lee at gmail dot com>
|
|
|
-%% Copyright (C) 2012 Ery Lee All Rights Reserved.
|
|
|
+%%-----------------------------------------------------------------------------
|
|
|
+%% Copyright (c) 2014, Feng Lee <feng.lee@slimchat.io>
|
|
|
+%%
|
|
|
+%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
|
+%% of this software and associated documentation files (the "Software"), to deal
|
|
|
+%% in the Software without restriction, including without limitation the rights
|
|
|
+%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
|
+%% copies of the Software, and to permit persons to whom the Software is
|
|
|
+%% furnished to do so, subject to the following conditions:
|
|
|
+%%
|
|
|
+%% The above copyright notice and this permission notice shall be included in all
|
|
|
+%% copies or substantial portions of the Software.
|
|
|
+%%
|
|
|
+%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
|
+%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
+%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
+%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
+%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
|
+%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
|
+%% SOFTWARE.
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
|
|
|
-module(emqtt_client).
|
|
|
|
|
|
--behaviour(gen_server2).
|
|
|
+-behaviour(gen_server).
|
|
|
|
|
|
--export([start_link/0, go/2, info/1]).
|
|
|
+-export([start_link/1, info/1]).
|
|
|
|
|
|
-export([init/1,
|
|
|
handle_call/3,
|
|
|
@@ -28,12 +35,12 @@
|
|
|
|
|
|
-include("emqtt.hrl").
|
|
|
|
|
|
+-include("emqtt_log.hrl").
|
|
|
+
|
|
|
-include("emqtt_frame.hrl").
|
|
|
|
|
|
-include("emqtt_internal.hrl").
|
|
|
|
|
|
--include_lib("elog/include/elog.hrl").
|
|
|
-
|
|
|
-define(CLIENT_ID_MAXLEN, 23).
|
|
|
|
|
|
-record(state, {socket,
|
|
|
@@ -55,38 +62,23 @@
|
|
|
-define(FRAME_TYPE(Frame, Type),
|
|
|
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
|
|
|
|
|
|
-start_link() ->
|
|
|
- gen_server2:start_link(?MODULE, [], []).
|
|
|
-
|
|
|
-go(Pid, Sock) ->
|
|
|
- gen_server2:call(Pid, {go, Sock}, infinity).
|
|
|
+start_link(Sock) ->
|
|
|
+ Res = gen_server:start_link(?MODULE, [Sock], []),
|
|
|
+ ?INFO("~p", [Res]).
|
|
|
|
|
|
info(Pid) ->
|
|
|
- gen_server2:call(Pid, info).
|
|
|
-
|
|
|
-init([]) ->
|
|
|
- {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}.
|
|
|
-
|
|
|
-handle_call(duplicate_id, _From, State=#state{conn_name=ConnName, client_id=ClientId}) ->
|
|
|
- ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
|
|
- stop({shutdown, duplicate_id}, State);
|
|
|
-
|
|
|
-handle_call(info, _From, #state{conn_name=ConnName,
|
|
|
- message_id=MsgId, client_id=ClientId} = State) ->
|
|
|
- Info = [{conn_name, ConnName},
|
|
|
- {message_id, MsgId},
|
|
|
- {client_id, ClientId}],
|
|
|
- {reply, Info, State};
|
|
|
+ gen_server:call(Pid, info).
|
|
|
|
|
|
-handle_call({go, Sock}, _From, _State) ->
|
|
|
+init([Sock]) ->
|
|
|
process_flag(trap_exit, true),
|
|
|
+ esockd_client:ack(Sock),
|
|
|
+ %%TODO: Move to esockd...
|
|
|
ok = throw_on_error(
|
|
|
inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end),
|
|
|
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
|
|
|
%FIXME: merge to registry
|
|
|
- emqtt_client_monitor:mon(self()),
|
|
|
- ?ERROR("accepting connection (~s)", [ConnStr]),
|
|
|
- {reply, ok,
|
|
|
+ %%emqtt_client_monitor:mon(self()),
|
|
|
+ {ok,
|
|
|
control_throttle(
|
|
|
#state{ socket = Sock,
|
|
|
conn_name = ConnStr,
|
|
|
@@ -99,6 +91,22 @@ handle_call({go, Sock}, _From, _State) ->
|
|
|
awaiting_ack = gb_trees:empty(),
|
|
|
awaiting_rel = gb_trees:empty()})}.
|
|
|
|
|
|
+ %{ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}.
|
|
|
+
|
|
|
+handle_call(duplicate_id, _From, State=#state{conn_name=ConnName, client_id=ClientId}) ->
|
|
|
+ ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
|
|
|
+ stop({shutdown, duplicate_id}, State);
|
|
|
+
|
|
|
+handle_call(info, _From, #state{conn_name=ConnName,
|
|
|
+ message_id=MsgId, client_id=ClientId} = State) ->
|
|
|
+ Info = [{conn_name, ConnName},
|
|
|
+ {message_id, MsgId},
|
|
|
+ {client_id, ClientId}],
|
|
|
+ {reply, Info, State};
|
|
|
+
|
|
|
+handle_call(_Req, _From, State) ->
|
|
|
+ {reply, ok, State}.
|
|
|
+
|
|
|
handle_cast(Msg, State) ->
|
|
|
{stop, {badmsg, Msg}, State}.
|
|
|
|
|
|
@@ -484,6 +492,6 @@ make_msg(#mqtt_frame{
|
|
|
qos = Qos,
|
|
|
topic = Topic,
|
|
|
dup = Dup,
|
|
|
- message_id = MessageId,
|
|
|
+ msgid = MessageId,
|
|
|
payload = Payload}.
|
|
|
|