Feng Lee 10 лет назад
Родитель
Сommit
11c4d24aff

+ 0 - 19
plugins/emqttd_auth_mysql/rebar.config

@@ -1,19 +0,0 @@
-%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
-%% ex: ts=4 sw=4 ft=erlang et
-
-{require_min_otp_vsn, "R17"}.
-
-%% fail_on_warning, 
-{erl_opts, [debug_info, {parse_transform, lager_transform}]}.
-
-{erl_opts, [warn_export_all,
-            warn_unused_import,
-            {i, "include"},
-			{src_dirs, ["src"]}]}.
-
-{xref_checks, [undefined_function_calls]}.
-
-{deps, [
-	{'Emysql', "*", {git, "git://github.com/Eonblast/Emysql.git", {branch, "master"}}}
-]}.
-

+ 2 - 1
plugins/emqttd_auth_mysql/src/emqttd_auth_mysql.erl

@@ -45,7 +45,8 @@ check(#mqtt_client{username = undefined}, _Password, _State) ->
 check(_Client, undefined, _State) ->
     {error, "Password undefined"};
 check(#mqtt_client{username = Username}, Password, #state{user_tab = UserTab}) ->
-    case emysql:select(UserTab, {{username, Username}, {password, erlang:md5(Password)}}) of
+    %%TODO: hash password...
+    case emysql:select(UserTab, {{username, Username}, {password, Password}}) of
         {ok, []} -> {error, "Username or Password not match"};
         {ok, _Record} -> ok
     end.

+ 28 - 1
plugins/emqttd_auth_mysql/src/emqttd_auth_mysql_app.erl

@@ -1,3 +1,29 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% mysql authentication app.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
 -module(emqttd_auth_mysql_app).
 
 -behaviour(application).
@@ -11,7 +37,8 @@
 
 start(_StartType, _StartArgs) ->
     {ok, Sup} = emqttd_auth_mysql_sup:start_link(),
-    emqttd_access_control:register_mod(auth, emqttd_auth_mysql, []),
+    Env = application:get_all_env(),
+    emqttd_access_control:register_mod(auth, emqttd_auth_mysql, Env),
     {ok, Sup}.
 
 stop(_State) ->

+ 42 - 0
plugins/emysql/README.md

@@ -0,0 +1,42 @@
+# emysql
+
+Erlang MySQL client
+
+## config
+
+```
+
+```
+
+## Select API
+
+* emyssql:select(tab).
+* emysql:select({tab, [col1,col2]}).
+* emysql:select({tab, [col1, col2], {id,1}}).
+* emysql:select(Query, Load).
+
+## Update API
+
+* emysql:update(tab, [{Field1, Val}, {Field2, Val2}], {id, 1}).
+
+## Insert API
+
+* emysql:insert(tab, [{Field1, Val}, {Field2, Val2}]).
+
+## Delete API
+
+* emysql:delete(tab, {name, Name}]).
+
+## Query API
+
+* emysql:sqlquery("select * from tab;").
+
+## Prepare API
+
+* emysql:prepare(find_with_id, "select * from tab where id = ?;").
+* emysql:execute(find_with_id, [Id]).
+* emysql:unprepare(find_with_id).
+
+## MySQL Client Protocal
+
+* http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol

+ 2 - 0
plugins/emysql/include/emysql.hrl

@@ -0,0 +1,2 @@
+%% MySQL result record:
+-record(mysql_result, {fieldinfo = [], rows = [], affectedrows = 0, insert_id =0, error = ""}).

+ 14 - 0
plugins/emysql/src/emysql.app.src

@@ -0,0 +1,14 @@
+{application, emysql,
+ [{description, "Erlang MySQL Driver"},
+  {vsn, "1.0"},
+  {modules, [
+		emysql,
+		emysql_app,
+		emysql_sup,
+		emysql_auth,
+		emysql_conn,
+		emysql_recv]},
+  {registered, []},
+  {applications, [kernel, stdlib, sasl, crypto]},
+  {env, []},
+  {mod, {emysql_app, []}}]}.

+ 508 - 0
plugins/emysql/src/emysql.erl

@@ -0,0 +1,508 @@
+%%%----------------------------------------------------------------------
+%%% File    : emysql.erl
+%%% Author  : Ery Lee <ery.lee@gmail.com>
+%%% Purpose : Mysql access api.
+%%% Created : 19 May 2009
+%%% License : http://www.opengoss.com
+%%%
+%%% Copyright (C) 2012, www.opengoss.com 
+%%%----------------------------------------------------------------------
+-module(emysql).
+
+-author('ery.lee@gmail.com').
+
+-include("emysql.hrl").
+
+-export([start_link/1]).
+
+-ifdef(use_specs).
+
+-spec(conns/0 :: () -> list()).
+
+-endif.
+
+%command functions
+-export([info/0,
+		pool/1,
+		conns/0]).
+
+%sql functions
+-export([insert/2,
+		insert/3,
+        select/1,
+        select/2,
+		select/3,
+        update/2,
+        update/3,
+        delete/1,
+        delete/2,
+		truncate/1,
+        prepare/2,
+        execute/1,
+        execute/2,
+        unprepare/1,
+        sqlquery/1,
+		sqlquery/2]).
+
+-behavior(gen_server).
+
+-export([init/1,
+        handle_call/3,
+        handle_cast/2,
+        handle_info/2,
+        terminate/2,
+        code_change/3]).
+
+-record(state, {ids}).
+
+%% External exports
+-export([encode/1,
+	    encode/2,
+        escape/1,
+	    escape_like/1]).
+
+start_link(PoolSize) ->
+	gen_server:start_link({local, ?MODULE}, ?MODULE, [PoolSize], []).
+
+info() ->
+	[emysql_conn:info(Pid) || Pid <- 
+		pg2:get_local_members(emysql_conn)].
+
+%pool pool
+pool(Id) ->
+	gen_server:cast(?MODULE, {pool, Id}).
+
+conns() ->
+	gen_server:call(?MODULE, conns).
+
+insert(Tab, Record) when is_atom(Tab) ->
+	sqlquery(encode_insert(Tab, Record)).
+
+insert(_Tab, _Fields, Values) when length(Values) == 0 ->
+    {updated, {0, 0}};
+
+insert(Tab, Fields, Values) when length(Values) > 0 ->
+	sqlquery(encode_insert(Tab, Fields, Values)).
+
+encode_insert(Tab, Record) ->
+	{Fields, Values} = lists:unzip([{atom_to_list(F), encode(V)} 
+		|| {F, V} <- Record]),
+	["insert into ", atom_to_list(Tab), "(",
+		 string:join(Fields, ","), ") values(",
+		 string:join(Values, ","), ");"].
+
+encode_insert(Tab, Fields, Rows) ->
+	Encode = fun(Row) -> string:join([encode(V) || V <- Row], ",") end,
+	Rows1 = [lists:concat(["(", Encode(Row), ")"]) || Row <- Rows],
+	["insert into ", atom_to_list(Tab), "(",
+		string:join([atom_to_list(F) || F <- Fields], ","), 
+		") values", string:join(Rows1, ","), ";"].
+
+select(Tab) when is_atom(Tab) ->
+	sqlquery(encode_select(Tab));
+
+select(Select) when is_tuple(Select) ->
+	sqlquery(encode_select(Select)).
+
+select(Tab, Where) when is_atom(Tab) and is_tuple(Where) ->
+	sqlquery(encode_select({Tab, Where}));
+
+select(Tab, Fields) when is_atom(Tab) and is_list(Fields) ->
+	sqlquery(encode_select({Tab, Fields}));
+
+select(Select, Load) when is_tuple(Select) and is_integer(Load) ->
+	sqlquery(encode_select(Select), Load).
+
+select(Tab, Fields, Where) when is_atom(Tab) 
+	and is_list(Fields) and is_tuple(Where) ->
+	sqlquery(encode_select({Tab, Fields, Where})).
+
+encode_select(Tab) when is_atom(Tab) ->
+	encode_select({Tab, ['*'], undefined});
+
+encode_select({Tab, Fields}) when is_atom(Tab) 
+	and is_list(Fields) ->
+    encode_select({Tab, Fields, undefined});
+
+encode_select({Tab, Where}) when is_atom(Tab) 
+	and is_tuple(Where) ->
+	encode_select({Tab, ['*'], Where});
+
+encode_select({Tab, Fields, undefined}) when is_atom(Tab) 
+	and is_list(Fields) ->
+	["select ", encode_fields(Fields), " from ", atom_to_list(Tab), ";"];
+
+encode_select({Tab, Fields, Where}) when is_atom(Tab) 
+	and is_list(Fields) and is_tuple(Where) ->
+	["select ", encode_fields(Fields), " from ",
+	 atom_to_list(Tab), " where ", encode_where(Where), ";"].
+
+encode_fields(Fields) ->
+    string:join([atom_to_list(F) || F <- Fields], " ,").
+
+update(Tab, Record) when is_atom(Tab) 
+	and is_list(Record) ->
+	case proplists:get_value(id, Record) of 
+    undefined ->
+		Updates = string:join([encode_column(Col) || Col <- Record], ","),
+		Query = ["update ", atom_to_list(Tab), " set ", Updates, ";"],
+		sqlquery(Query);
+    Id ->
+        update(Tab, lists:keydelete(id, 1, Record), {id, Id})
+	end.
+
+update(Tab, Record, Where) ->
+	Update = string:join([encode_column(Col) || Col <- Record], ","),
+    Query = ["update ", atom_to_list(Tab), " set ", Update,
+		" where ", encode_where(Where), ";"],
+	sqlquery(Query).
+
+encode_column({F, V}) when is_atom(F) ->
+	lists:concat([atom_to_list(F), "=", encode(V)]).
+
+delete(Tab) when is_atom(Tab) ->
+	sqlquery(["delete from ", atom_to_list(Tab), ";"]).
+
+delete(Tab, Id) when is_atom(Tab)
+	and is_integer(Id) ->
+    Query = ["delete from ", atom_to_list(Tab), 
+			 " where ", encode_where({id, Id})],
+	sqlquery(Query);
+
+delete(Tab, Where) when is_atom(Tab)
+	and is_tuple(Where) ->
+    Query = ["delete from ", atom_to_list(Tab),
+			 " where ", encode_where(Where)],
+	sqlquery(Query).
+
+truncate(Tab) when is_atom(Tab) ->
+	sqlquery(["truncate table ", atom_to_list(Tab), ";"]).
+
+sqlquery(Query) ->
+	sqlquery(Query, 1).
+
+sqlquery(Query, Load) -> 
+	with_next_conn(fun(Conn) ->
+		case catch mysql_to_odbc(emysql_conn:sqlquery(Conn, iolist_to_binary(Query))) of
+		{selected, NewFields, Records} -> 
+			{ok, to_tuple_records(NewFields, Records)};
+		{error, Reason} -> 
+			{error, Reason};
+		Res ->
+			Res
+		end
+	end, Load).
+
+prepare(Name, Stmt) when is_list(Stmt) ->
+	prepare(Name, list_to_binary(Stmt));
+
+prepare(Name, Stmt) when is_binary(Stmt) ->
+	with_all_conns(fun(Conn) ->
+		emysql_conn:prepare(Conn, Name, Stmt)
+	end).
+
+execute(Name) ->
+	execute(Name, []).
+
+execute(Name, Params) ->
+	with_next_conn(fun(Conn) ->
+		case catch mysql_to_odbc(emysql_conn:execute(Conn, Name, Params)) of
+		{selected, NewFields, Records} -> 
+			{ok, to_tuple_records(NewFields, Records)};
+		{error, Reason} -> 
+			{error, Reason};
+		Res ->
+			Res
+		end
+	end, 1).
+
+unprepare(Name) ->
+	with_all_conns(fun(Conn) ->
+		emysql_conn:unprepare(Conn, Name)
+	end).
+
+with_next_conn(Fun, _Load) ->
+	Fun(pg2:get_closest_pid(emysql_conn)).
+
+with_all_conns(Fun) ->
+	[Fun(Pid) || Pid <- pg2:get_local_members(emysql_conn)].
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%%                         {ok, State, Timeout} |
+%%                         ignore               |
+%%                         {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([PoolSize]) ->
+	Ids = lists:seq(1, PoolSize),
+	[put(Id, 0) || Id <- Ids],
+	[put({count, Id}, 0) || Id <- Ids],
+    {ok, #state{ids = Ids}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%%                                      {reply, Reply, State, Timeout} |
+%%                                      {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, Reply, State} |
+%%                                      {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+
+handle_call(info, _From, State) ->
+	Reply = [{conn, Id, Pid, get(Id), get({total, Id})} 
+		|| {Id, Pid} <- get_all_conns()],
+	{reply, Reply, State};
+
+handle_call({next_conn, Load}, _From, #state{ids = Ids} = State) ->
+	{ConnId, ConnLoad} =
+	lists:foldl(fun(Id, {MinId, MinLoad}) -> 
+		ThisLoad = get(Id),
+		if
+		ThisLoad =< MinLoad -> {Id, ThisLoad};
+		true -> {MinId, MinLoad}
+		end
+	end, {undefined, 16#ffffffff}, Ids),
+	Reply =
+	case ConnId of
+	undefined -> 
+		undefined;
+	_ -> 
+		ConnPid = get_conn_pid(ConnId),
+		put(ConnId, ConnLoad+Load),
+		Count = get({total, ConnId}),
+		put({total, ConnId}, Count+1),
+		{ConnId, ConnPid}
+	end,
+	{reply, Reply, State};
+	
+handle_call(conns, _From, State) ->
+	Conns = get_all_conns(),
+	{reply, Conns, State};
+
+handle_call(Req, From, State) ->
+    gen_server:reply(From, {badcall, Req}),
+    {stop, {badcall, Req}, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%%                                      {noreply, State, Timeout} |
+%%                                      {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({pool, Id}, State) ->
+	put(Id, 0),
+	put({total, Id}, 0),
+	{noreply, State};
+
+handle_cast({done, ConnId, Load}, State) ->
+	put(ConnId, get(ConnId) - Load),
+	{noreply, State};
+
+handle_cast(Msg, State) ->
+    {stop, {badcast, Msg}, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%%                                       {noreply, State, Timeout} |
+%%                                       {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(Info, State) ->
+    {stop, {badinfo, Info}, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+    ok.
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+get_conn_pid(CId) ->
+	[{CId, Pid, _Type, _Modules} | _] =
+	lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
+						  when Id =:= CId -> false;
+						(_)               -> true
+					end,
+					supervisor:which_children(emysql_sup)),
+	Pid.
+
+get_all_conns() ->
+	[{Id, Pid} || {Id, Pid, _Type, _Modules} <- 
+		supervisor:which_children(emysql_sup), is_integer(Id)].
+
+%% Convert MySQL query result to Erlang ODBC result formalism
+mysql_to_odbc({updated, #mysql_result{affectedrows=AffectedRows, insert_id = InsertId} = _MySQLRes}) ->
+    {updated, {AffectedRows, InsertId}};
+
+mysql_to_odbc({data, #mysql_result{fieldinfo = FieldInfo, rows=AllRows} = _MySQLRes}) ->
+    mysql_item_to_odbc(FieldInfo, AllRows);
+
+mysql_to_odbc({error, MySQLRes}) when is_list(MySQLRes) ->
+    {error, MySQLRes};
+
+mysql_to_odbc({error, #mysql_result{error=Reason} = _MySQLRes}) ->
+    {error, Reason};
+
+mysql_to_odbc({error, Reason}) ->
+    {error, Reason}.
+
+%% When tabular data is returned, convert it to the ODBC formalism
+mysql_item_to_odbc(Columns, Recs) ->
+    %% For now, there is a bug and we do not get the correct value from MySQL
+    %% module:
+    {selected,
+     [element(2, Column) || Column <- Columns],
+     [list_to_tuple(Rec) || Rec <- Recs]}.
+
+%%internal functions
+encode_where({'and', L, R}) ->
+	encode_where(L) ++ " and " ++ encode_where(R);
+
+encode_where({'and', List}) when is_list(List) ->
+	string:join([encode_where(E) || E <- List], " and ");
+
+encode_where({'or', L, R}) ->
+	encode_where(L) ++ " or " ++ encode_where(R);
+
+encode_where({'or', List}) when is_list(List) ->
+	string:join([encode_where(E) || E <- List], " or ");
+
+encode_where({like, Field, Value}) ->	
+	atom_to_list(Field) ++ " like " ++ encode(Value);
+
+encode_where({'<', Field, Value}) ->	
+	atom_to_list(Field) ++ " < " ++ encode(Value);
+
+encode_where({'>', Field, Value}) ->	
+	atom_to_list(Field) ++ " > " ++ encode(Value);
+
+encode_where({'in', Field, Values}) ->	
+	InStr = string:join([encode(Value) || Value <- Values], ","),
+	atom_to_list(Field) ++ " in (" ++ InStr ++ ")";
+
+encode_where({Field, Value}) ->
+	atom_to_list(Field) ++ " = " ++ encode(Value).
+
+to_tuple_records(_Fields, []) ->
+	[];
+
+to_tuple_records(Fields, Records) ->
+	[to_tuple_record(Fields, tuple_to_list(Record)) || Record <- Records].
+	
+to_tuple_record(Fields, Record) when length(Fields) == length(Record) ->
+	to_tuple_record(Fields, Record, []).
+
+to_tuple_record([], [], Acc) ->
+	Acc;
+
+to_tuple_record([_F|FT], [undefined|VT], Acc) ->
+	to_tuple_record(FT, VT, Acc);
+
+to_tuple_record([F|FT], [V|VT], Acc) ->
+	to_tuple_record(FT, VT, [{list_to_atom(binary_to_list(F)), V} | Acc]).
+
+%% Escape character that will confuse an SQL engine
+%% Percent and underscore only need to be escaped for pattern matching like
+%% statement
+escape_like(S) when is_list(S) ->
+    [escape_like(C) || C <- S];
+escape_like($%) -> "\\%";
+escape_like($_) -> "\\_";
+escape_like(C)  -> escape(C).
+
+%% Escape character that will confuse an SQL engine
+escape(S) when is_list(S) ->
+	[escape(C) || C <- S];
+%% Characters to escape
+escape($\0) -> "\\0";
+escape($\n) -> "\\n";
+escape($\t) -> "\\t";
+escape($\b) -> "\\b";
+escape($\r) -> "\\r";
+escape($')  -> "\\'";
+escape($")  -> "\\\"";
+escape($\\) -> "\\\\";
+escape(C)   -> C.
+
+encode(Val) ->
+    encode(Val, false).
+encode(Val, false) when Val == undefined; Val == null ->
+    "NULL";
+encode(Val, true) when Val == undefined; Val == null ->
+    <<"NULL">>;
+encode(Val, false) when is_binary(Val) ->
+    binary_to_list(quote(Val));
+encode(Val, true) when is_binary(Val) ->
+    quote(Val);
+encode(Val, true) ->
+    list_to_binary(encode(Val,false));
+encode(Val, false) when is_atom(Val) ->
+    quote(atom_to_list(Val));
+encode(Val, false) when is_list(Val) ->
+    quote(Val);
+encode(Val, false) when is_integer(Val) ->
+    integer_to_list(Val);
+encode(Val, false) when is_float(Val) ->
+    [Res] = io_lib:format("~w", [Val]),
+    Res;
+encode({datetime, Val}, AsBinary) ->
+    encode(Val, AsBinary);
+encode({{Year, Month, Day}, {Hour, Minute, Second}}, false) ->
+    Res = two_digits([Year, Month, Day, Hour, Minute, Second]),
+    lists:flatten(Res);
+encode({TimeType, Val}, AsBinary)
+  when TimeType == 'date';
+       TimeType == 'time' ->
+    encode(Val, AsBinary);
+encode({Time1, Time2, Time3}, false) ->
+    Res = two_digits([Time1, Time2, Time3]),
+    lists:flatten(Res);
+encode(Val, _AsBinary) ->
+    {error, {unrecognized_value, Val}}.
+
+two_digits(Nums) when is_list(Nums) ->
+    [two_digits(Num) || Num <- Nums];
+two_digits(Num) ->
+    [Str] = io_lib:format("~b", [Num]),
+    case length(Str) of
+	1 -> [$0 | Str];
+	_ -> Str
+    end.
+
+%%  Quote a string or binary value so that it can be included safely in a
+%%  MySQL query.
+quote(String) when is_list(String) ->
+    [39 | lists:reverse([39 | quote(String, [])])];	%% 39 is $'
+quote(Bin) when is_binary(Bin) ->
+    list_to_binary(quote(binary_to_list(Bin))).
+
+quote([], Acc) ->
+    Acc;
+quote([0 | Rest], Acc) ->
+    quote(Rest, [$0, $\\ | Acc]);
+quote([10 | Rest], Acc) ->
+    quote(Rest, [$n, $\\ | Acc]);
+quote([13 | Rest], Acc) ->
+    quote(Rest, [$r, $\\ | Acc]);
+quote([$\\ | Rest], Acc) ->
+    quote(Rest, [$\\ , $\\ | Acc]);
+quote([39 | Rest], Acc) ->		%% 39 is $'
+    quote(Rest, [39, $\\ | Acc]);	%% 39 is $'
+quote([34 | Rest], Acc) ->		%% 34 is $"
+    quote(Rest, [34, $\\ | Acc]);	%% 34 is $"
+quote([26 | Rest], Acc) ->
+    quote(Rest, [$Z, $\\ | Acc]);
+quote([C | Rest], Acc) ->
+    quote(Rest, [C | Acc]).
+

+ 27 - 0
plugins/emysql/src/emysql_app.erl

@@ -0,0 +1,27 @@
+%%%----------------------------------------------------------------------
+%%% File    : emysql_app.erl
+%%% Author  : Ery Lee <ery.lee@gmail.com>
+%%% Purpose : mysql driver application
+%%% Created : 21 May 2009
+%%% Updated : 11 Jan 2010 
+%%% License : http://www.opengoss.com
+%%%
+%%% Copyright (C) 2007-2010, www.opengoss.com 
+%%%----------------------------------------------------------------------
+-module(emysql_app).
+
+-author('ery.lee@gmail.com').
+
+-behavior(application).
+
+-export([start/0, start/2, stop/1]).
+
+start() -> 
+	application:start(emysql).
+
+start(normal, _Args) ->
+	emysql_sup:start_link(application:get_all_env()).
+
+stop(_) ->
+	ok.
+

+ 102 - 0
plugins/emysql/src/emysql_auth.erl

@@ -0,0 +1,102 @@
+-module(emysql_auth).
+
+-export([make_auth/2, make_new_auth/3, password_old/2, password_new/2]).
+
+%%--------------------------------------------------------------------
+%% Macros
+%%--------------------------------------------------------------------
+-define(LONG_PASSWORD, 1).
+-define(LONG_FLAG, 4).
+-define(PROTOCOL_41, 512).
+-define(TRANSACTIONS, 8192).
+-define(SECURE_CONNECTION, 32768).
+-define(CONNECT_WITH_DB, 8).
+-define(MAX_PACKET_SIZE, 1000000).
+
+password_old(Password, Salt) ->
+    {P1, P2} = hash(Password),
+    {S1, S2} = hash(Salt),
+    Seed1 = P1 bxor S1,
+    Seed2 = P2 bxor S2,
+    List = rnd(9, Seed1, Seed2),
+    {L, [Extra]} = lists:split(8, List),
+    list_to_binary(lists:map(fun (E) -> E bxor (Extra - 64) end, L)).
+
+%% part of do_old_auth/4, which is part of mysql_init/4
+make_auth(User, Password) ->
+    Caps = ?LONG_PASSWORD bor ?LONG_FLAG bor ?TRANSACTIONS,
+    Maxsize = 0,
+    UserB = list_to_binary(User),
+    PasswordB = Password,
+    <<Caps:16/little, Maxsize:24/little, UserB/binary, 0:8,
+    PasswordB/binary>>.
+
+%% part of do_new_auth/4, which is part of mysql_init/4
+make_new_auth(User, Password, Database) ->
+    DBCaps = case Database of
+		 none ->
+		     0;
+		 _ ->
+		     ?CONNECT_WITH_DB
+	     end,
+    Caps = ?LONG_PASSWORD bor ?LONG_FLAG bor ?TRANSACTIONS bor
+	?PROTOCOL_41 bor ?SECURE_CONNECTION bor DBCaps,
+    Maxsize = ?MAX_PACKET_SIZE,
+    UserB = list_to_binary(User),
+    PasswordL = size(Password),
+    DatabaseB = case Database of
+		    none ->
+			<<>>;
+		    _ ->
+			list_to_binary(Database)
+		end,
+    <<Caps:32/little, Maxsize:32/little, 8:8, 0:23/integer-unit:8,
+    UserB/binary, 0:8, PasswordL:8, Password/binary, DatabaseB/binary>>.
+
+hash(S) ->
+    hash(S, 1345345333, 305419889, 7).
+
+hash([C | S], N1, N2, Add) ->
+    N1_1 = N1 bxor (((N1 band 63) + Add) * C + N1 * 256),
+    N2_1 = N2 + ((N2 * 256) bxor N1_1),
+    Add_1 = Add + C,
+    hash(S, N1_1, N2_1, Add_1);
+hash([], N1, N2, _Add) ->
+    Mask = (1 bsl 31) - 1,
+    {N1 band Mask , N2 band Mask}.
+
+rnd(N, Seed1, Seed2) ->
+    Mod = (1 bsl 30) - 1,
+    rnd(N, [], Seed1 rem Mod, Seed2 rem Mod).
+
+rnd(0, List, _, _) ->
+    lists:reverse(List);
+rnd(N, List, Seed1, Seed2) ->
+    Mod = (1 bsl 30) - 1,
+    NSeed1 = (Seed1 * 3 + Seed2) rem Mod,
+    NSeed2 = (NSeed1 + Seed2 + 33) rem Mod,
+    Float = (float(NSeed1) / float(Mod))*31,
+    Val = trunc(Float)+64,
+    rnd(N - 1, [Val | List], NSeed1, NSeed2).
+
+
+dualmap(_F, [], []) ->
+    [];
+dualmap(F, [E1 | R1], [E2 | R2]) ->
+    [F(E1, E2) | dualmap(F, R1, R2)].
+
+bxor_binary(B1, B2) ->
+    list_to_binary(dualmap(fun (E1, E2) ->
+				   E1 bxor E2
+			   end, binary_to_list(B1), binary_to_list(B2))).
+
+password_new(Password, Salt) ->
+    Stage1 = crypto:sha(Password),
+    Stage2 = crypto:sha(Stage1),
+    Res = crypto:sha_final(
+	    crypto:sha_update(
+	      crypto:sha_update(crypto:sha_init(), Salt),
+	      Stage2)
+	   ),
+    bxor_binary(Res, Stage1).
+

+ 739 - 0
plugins/emysql/src/emysql_conn.erl

@@ -0,0 +1,739 @@
+%%% File    : emysql_conn.erl
+%%% Author  : Ery Lee
+%%% Purpose : connection of mysql driver
+%%% Created : 11 Jan 2010 
+%%% License : http://www.opengoss.com
+%%%
+%%% Copyright (C) 2012, www.opengoss.com 
+%%%----------------------------------------------------------------------
+-module(emysql_conn).
+
+-include("emysql.hrl").
+
+-import(proplists, [get_value/2, get_value/3]).
+
+-behaviour(gen_server).
+
+%% External exports
+-export([start_link/2,
+		info/1,
+		sqlquery/2,
+		sqlquery/3,
+		prepare/3,
+		execute/3,
+		execute/4,
+		unprepare/2]).
+
+%% Callback
+-export([init/1, 
+		handle_call/3, 
+		handle_cast/2, 
+		handle_info/2, 
+		terminate/2, 
+		code_change/3]).
+
+-record(state, {
+		id,
+		host,
+		port,
+		user,
+		password,
+		database,
+		encoding,
+		mysql_version,
+		recv_pid,
+		socket,
+		data}).
+
+%%-define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
+
+-define(SECURE_CONNECTION, 32768).
+
+-define(MYSQL_QUERY_OP, 3).
+
+%CALL > CONNECT
+-define(CALL_TIMEOUT, 301000).
+
+-define(CONNECT_TIMEOUT, 300000).
+
+-define(MYSQL_4_0, 40). %% Support for MySQL 4.0.x
+
+-define(MYSQL_4_1, 41). %% Support for MySQL 4.1.x et 5.0.x
+
+%%--------------------------------------------------------------------
+%% Function: start(Opts)
+%% Descrip.: Starts a mysql_conn process that connects to a MySQL
+%%           server, logs in and chooses a database.
+%% Returns : {ok, Pid} | {error, Reason}
+%%           Pid    = pid()
+%%           Reason = string()
+%%--------------------------------------------------------------------
+start_link(Id, Opts) ->
+    gen_server:start_link(?MODULE, [Id, Opts], []).
+
+info(Conn) ->
+	gen_server:call(Conn, info).
+
+%%--------------------------------------------------------------------
+%% Function: sqlquery(Query)
+%%           Queries   = A single binary() query or a list of binary() queries.
+%%                     If a list is provided, the return value is the return
+%%                     of the last query, or the first query that has
+%%                     returned an error. If an error occurs, execution of
+%%                     the following queries is aborted.
+%%           From    = pid() or term(), use a From of self() when
+%%                     using this module for a single connection,
+%%                     or pass the gen_server:call/3 From argument if
+%%                     using a gen_server to do the querys (e.g. the
+%%                     mysql_dispatcher)
+%%           Timeout = integer() | infinity, gen_server timeout value
+%% Descrip.: Send a query or a list of queries and wait for the result
+%%           if running stand-alone (From = self()), but don't block
+%%           the caller if we are not running stand-alone
+%%           (From = gen_server From).
+%% Returns : ok                        | (non-stand-alone mode)
+%%           {data, #mysql_result}     | (stand-alone mode)
+%%           {updated, #mysql_result}  | (stand-alone mode)
+%%           {error, #mysql_result}      (stand-alone mode)
+%%           FieldInfo = term()
+%%           Rows      = list() of [string()]
+%%           Reason    = term()
+%%--------------------------------------------------------------------
+sqlquery(Conn, Query) ->
+    sqlquery(Conn, Query, ?CALL_TIMEOUT).
+
+sqlquery(Conn, Query, Timeout)  ->
+    call(Conn, {sqlquery, Query}, Timeout).
+
+prepare(Conn, Name, Stmt) ->
+    call(Conn, {prepare, Name, Stmt}).
+
+execute(Conn, Name, Params) ->
+    execute(Conn, Name, Params, ?CALL_TIMEOUT).
+
+execute(Conn, Name, Params, Timeout) ->
+    call(Conn, {execute, Name, Params}, Timeout).
+
+unprepare(Conn, Name) ->
+    call(Conn, {unprepare, Name}).
+
+%%--------------------------------------------------------------------
+%% Function: init(Host, Port, User, Password, Database, Parent)
+%%           Host     = string()
+%%           Port     = integer()
+%%           User     = string()
+%%           Password = string()
+%%           Database = string()
+%%           Parent   = pid() of process starting this mysql_conn
+%% Descrip.: Connect to a MySQL server, log in and chooses a database.
+%%           Report result of this to Parent, and then enter loop() if
+%%           we were successfull.
+%% Returns : void() | does not return
+%%--------------------------------------------------------------------
+init([Id, Opts]) ->
+	put(queries, 0),
+    Host = get_value(host, Opts, "localhost"),
+    Port = get_value(port, Opts, 3306),
+    UserName = get_value(username, Opts, "root"),
+    Password = get_value(password, Opts, "public"),
+    Database = get_value(database, Opts),
+    Encoding = get_value(encoding, Opts, utf8),
+	case emysql_recv:start_link(Host, Port) of
+	{ok, RecvPid, Sock} ->
+	    case mysql_init(Sock, RecvPid, UserName, Password) of
+		{ok, Version} ->
+		    Db = iolist_to_binary(Database),
+		    case do_query(Sock, RecvPid, <<"use ", Db/binary>>, Version) of
+			{error, #mysql_result{error = Error} = _MySQLRes} ->
+			    error_logger:error_msg("emysql_conn: use '~p' error: ~p", [Database, Error]),
+                {stop, using_db_error};
+			{_ResultType, _MySQLRes} ->
+				emysql:pool(Id), %pool it
+				pg2:create(emysql_conn),
+				pg2:join(emysql_conn, self()),
+                EncodingBinary = list_to_binary(atom_to_list(Encoding)),
+                do_query(Sock, RecvPid, <<"set names '", EncodingBinary/binary, "'">>, Version),
+                State = #state{
+						id = Id,
+                        host = Host, 
+                        port = Port, 
+                        user = UserName, 
+                        password = Password,
+                        database = Database, 
+                        encoding = Encoding, 
+                        mysql_version = Version,
+                        recv_pid = RecvPid,
+                        socket   = Sock,
+                        data     = <<>>},
+			    {ok, State}
+            end;
+		{error, Reason} ->
+            {stop, {login_failed, Reason}}
+        end;
+	{error, Reason} ->
+		{stop, Reason}
+	end.
+
+handle_call(info, _From, #state{id = Id} = State) ->
+	Reply = {Id, self(), get(queries)},
+	{reply, Reply, State};
+
+handle_call({sqlquery, Query}, _From, #state{socket = Socket, 
+        recv_pid = RecvPid, mysql_version = Ver} = State)  ->
+	put(queries, get(queries) + 1),
+    case do_query(Socket, RecvPid, Query, Ver) of
+    {error, mysql_timeout} = Err ->
+        {stop, mysql_timeout, Err, State};
+    Res -> 
+        {reply, Res, State}
+    end;
+
+handle_call({prepare, Name, Stmt}, _From, #state{socket = Socket, 
+	recv_pid = RecvPid, mysql_version = Ver} = State) ->
+
+    case do_prepare(Socket, RecvPid, Name, Stmt, Ver) of
+    {error, mysql_timeout} -> 
+        {stop, mysql_timeout, State};
+    _ ->
+        {reply, ok, State}
+    end;
+
+handle_call({unprepare, Name}, _From, #state{socket = Socket, 
+        recv_pid = RecvPid, mysql_version = Ver} = State) ->
+    case do_unprepare(Socket, RecvPid, Name, Ver) of
+    {error, mysql_timeout} -> 
+        {stop, mysql_timeout, State};
+    _ ->
+        {reply, ok, State}
+    end;
+
+handle_call({execute, Name, Params}, _From, #state{socket = Socket, 
+        recv_pid = RecvPid, mysql_version = Ver} = State) ->
+    case do_execute(Socket, RecvPid, Name, Params, Ver) of
+    {error, mysql_timeout} = Err ->
+        {stop, mysql_timeout, Err, State};
+    Res -> 
+        {reply, Res, State}
+    end;
+
+handle_call(Req, _From, State) ->
+    error_logger:error_msg("badreq to emysql_conn: ~p", [Req]),
+    {reply, {error, badreq}, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({mysql_recv, _RecvPid, data, _Packet, SeqNum}, State) ->
+    error_logger:error_msg("unexpected mysql_recv: seq_num = ~p", [SeqNum]),
+    {noreply, State};
+
+handle_info({mysql_recv, _RecvPid, closed, E}, State) ->
+    error_logger:error_msg("mysql socket closed: ~p", [E]),
+    {stop, socket_closed, State};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+do_queries(Sock, RecvPid, Queries, Version) ->
+    catch
+	lists:foldl(
+	  fun(Query, _LastResponse) ->
+		  case do_query(Sock, RecvPid, Query, Version) of
+		      {error, _} = Err -> throw(Err);
+		      Res -> Res
+		  end
+	  end, ok, Queries).
+
+do_query(Sock, RecvPid, Query, Version) ->
+    Query1 = iolist_to_binary(Query),
+    %?DEBUG("sqlquery ~p (id ~p)", [Query1, RecvPid]),
+    Packet = <<?MYSQL_QUERY_OP, Query1/binary>>,
+    case do_send(Sock, Packet, 0) of
+	ok ->
+	    get_query_response(RecvPid, Version);
+	{error, Reason} ->
+	    {error, Reason}
+    end.
+
+do_prepare(Socket, RecvPid, Name, Stmt, Ver) ->
+    NameBin = atom_to_binary(Name),
+    StmtBin = <<"PREPARE ", NameBin/binary, " FROM '", Stmt/binary, "'">>,
+    do_query(Socket, RecvPid, StmtBin, Ver).
+
+do_execute(Socket, RecvPid, Name, Params, Ver) ->
+    Stmts = make_statements(Name, Params),
+    do_queries(Socket, RecvPid, Stmts, Ver).
+
+do_unprepare(Socket, RecvPid, Name, Ver) ->
+    NameBin = atom_to_binary(Name),
+    StmtBin = <<"UNPREPARE ", NameBin/binary>>,
+    do_query(Socket, RecvPid, StmtBin, Ver).
+
+make_statements(Name, []) ->
+    NameBin = atom_to_binary(Name),
+    [<<"EXECUTE ", NameBin/binary>>];
+
+make_statements(Name, Params) ->
+    NumParams = length(Params),
+    ParamNums = lists:seq(1, NumParams),
+    NameBin = atom_to_binary(Name),
+    ParamNames =
+	lists:foldl(
+	  fun(Num, Acc) ->
+		  ParamName = [$@ | integer_to_list(Num)],
+		  if Num == 1 ->
+			  ParamName ++ Acc;
+		     true ->
+			  [$, | ParamName] ++ Acc
+		  end
+	  end, [], lists:reverse(ParamNums)),
+    ParamNamesBin = list_to_binary(ParamNames),
+    ExecStmt = <<"EXECUTE ", NameBin/binary, " USING ",
+		ParamNamesBin/binary>>,
+
+    ParamVals = lists:zip(ParamNums, Params),
+    Stmts = lists:foldl(
+	      fun({Num, Val}, Acc) ->
+		      NumBin = emysql:encode(Num, true),
+		      ValBin = emysql:encode(Val, true),
+		      [<<"SET @", NumBin/binary, "=", ValBin/binary>> | Acc]
+	       end, [ExecStmt], lists:reverse(ParamVals)),
+    Stmts.
+
+atom_to_binary(Val) ->
+    <<_:4/binary, Bin/binary>> = term_to_binary(Val),
+    Bin.
+
+%%--------------------------------------------------------------------
+%% authentication
+%%--------------------------------------------------------------------
+do_old_auth(Sock, RecvPid, SeqNum, User, Password, Salt1) ->
+    Auth = emysql_auth:password_old(Password, Salt1),
+    Packet = emysql_auth:make_auth(User, Auth),
+    do_send(Sock, Packet, SeqNum),
+    do_recv(RecvPid, SeqNum).
+
+do_new_auth(Sock, RecvPid, SeqNum, User, Password, Salt1, Salt2) ->
+    Auth = emysql_auth:password_new(Password, Salt1 ++ Salt2),
+    Packet2 = emysql_auth:make_new_auth(User, Auth, none),
+    do_send(Sock, Packet2, SeqNum),
+    case do_recv(RecvPid, SeqNum) of
+    {ok, Packet3, SeqNum2} ->
+        case Packet3 of
+        <<254:8>> ->
+            AuthOld = emysql_auth:password_old(Password, Salt1),
+            do_send(Sock, <<AuthOld/binary, 0:8>>, SeqNum2 + 1),
+            do_recv(RecvPid, SeqNum2 + 1);
+        _ -> 
+            {ok, Packet3, SeqNum2}
+        end;
+    {error, Reason} ->
+        {error, Reason}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: mysql_init(Sock, RecvPid, User, Password)
+%%           Sock     = term(), gen_tcp socket
+%%           RecvPid  = pid(), mysql_recv process
+%%           User     = string()
+%%           Password = string()
+%%           LogFun   = undefined | function() with arity 3
+%% Descrip.: Try to authenticate on our new socket.
+%% Returns : ok | {error, Reason}
+%%           Reason = string()
+%%--------------------------------------------------------------------
+mysql_init(Sock, RecvPid, User, Password) ->
+    case do_recv(RecvPid, undefined) of
+	{ok, Packet, InitSeqNum} ->
+	    {Version, Salt1, Salt2, Caps} = greeting(Packet),
+        %?DEBUG("version: ~p, ~p, ~p, ~p", [Version, Salt1, Salt2, Caps]),
+	    AuthRes =
+		case Caps band ?SECURE_CONNECTION of
+        ?SECURE_CONNECTION ->
+			do_new_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1, Salt2);
+        _ ->
+			do_old_auth(Sock, RecvPid, InitSeqNum + 1, User, Password, Salt1)
+		end,
+	    case AuthRes of
+		{ok, <<0:8, _Rest/binary>>, _RecvNum} ->
+		    {ok,Version};
+		{ok, <<255:8, _Code:16/little, Message/binary>>, _RecvNum} ->
+		    {error, binary_to_list(Message)};
+		{ok, RecvPacket, _RecvNum} ->
+		    {error, binary_to_list(RecvPacket)};
+		{error, Reason} ->
+		    %?ERROR("init failed receiving data : ~p", [Reason]),
+		    {error, Reason}
+	    end;
+	{error, Reason} ->
+	    {error, Reason}
+    end.
+
+greeting(Packet) ->
+    <<_Protocol:8, Rest/binary>> = Packet,
+    {Version, Rest2} = asciz(Rest),
+    <<_TreadID:32/little, Rest3/binary>> = Rest2,
+    {Salt, Rest4} = asciz(Rest3),
+    <<Caps:16/little, Rest5/binary>> = Rest4,
+    <<_ServerChar:16/binary-unit:8, Rest6/binary>> = Rest5,
+    {Salt2, _Rest7} = asciz(Rest6),
+    %?DEBUG("greeting version ~p (protocol ~p) salt ~p caps ~p serverchar ~p"
+	  %"salt2 ~p",
+	  %[Version, Protocol, Salt, Caps, ServerChar, Salt2]),
+    {normalize_version(Version), Salt, Salt2, Caps}.
+
+%% part of greeting/2
+asciz(Data) when is_binary(Data) ->
+    asciz_binary(Data, []);
+asciz(Data) when is_list(Data) ->
+    {String, [0 | Rest]} = lists:splitwith(fun (C) ->
+						   C /= 0
+					   end, Data),
+    {String, Rest}.
+
+%% @doc Find the first zero-byte in Data and add everything before it
+%%   to Acc, as a string.
+%%
+%% @spec asciz_binary(Data::binary(), Acc::list()) ->
+%%   {NewList::list(), Rest::binary()}
+asciz_binary(<<>>, Acc) ->
+    {lists:reverse(Acc), <<>>};
+asciz_binary(<<0:8, Rest/binary>>, Acc) ->
+    {lists:reverse(Acc), Rest};
+asciz_binary(<<C:8, Rest/binary>>, Acc) ->
+    asciz_binary(Rest, [C | Acc]).
+
+%%--------------------------------------------------------------------
+%% Function: get_query_response(RecvPid)
+%%           RecvPid = pid(), mysql_recv process
+%%           Version = integer(), Representing MySQL version used
+%% Descrip.: Wait for frames until we have a complete query response.
+%% Returns :   {data, #mysql_result}
+%%             {updated, #mysql_result}
+%%             {error, #mysql_result}
+%%           FieldInfo    = list() of term()
+%%           Rows         = list() of [string()]
+%%           AffectedRows = int()
+%%           Reason       = term()
+%%--------------------------------------------------------------------
+get_query_response(RecvPid, Version) ->
+    case do_recv(RecvPid, undefined) of
+	{ok, <<Fieldcount:8, Rest/binary>>, _} ->
+	    case Fieldcount of
+		0 ->
+		    %% No Tabular data
+            {AffectedRows, Rest1} = decode_length_binary(Rest),
+            {InsertId, _} = decode_length_binary(Rest1),
+		    {updated, #mysql_result{insert_id = InsertId, affectedrows=AffectedRows}};
+		255 ->
+		    <<_Code:16/little, Message/binary>>  = Rest,
+		    {error, #mysql_result{error=Message}};
+		_ ->
+		    %% Tabular data received
+		    case get_fields(RecvPid, [], Version) of
+			{ok, Fields} ->
+			    case get_rows(Fields, RecvPid, []) of
+				{ok, Rows} ->
+				    {data, #mysql_result{fieldinfo=Fields,
+							 rows=Rows}};
+				{error, Reason} ->
+				    {error, Reason}
+			    end;
+			{error, Reason} ->
+			    {error, Reason}
+		    end
+	    end;
+	{error, Reason} ->
+	    {error, Reason}
+    end.
+
+decode_length_binary(<<Len:8, Rest/binary>>) ->
+    if
+    Len =< 251 -> 
+        {Len, Rest};
+    Len == 252 -> %two bytes
+        <<Val:16/little, Rest1/binary>> = Rest,
+        {Val, Rest1};
+    Len == 253 -> %three
+        <<Val:24/little, Rest1/binary>> = Rest,
+        {Val, Rest1};
+    Len == 254 -> %eight
+        <<Val:64/little, Rest1/binary>> = Rest,
+        {Val, Rest1};
+    true ->
+        %?ERROR("affectedrows: ~p", [Len]),
+        {0, Rest}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: do_recv(RecvPid, SeqNum)
+%%           RecvPid = pid(), mysql_recv process
+%%           SeqNum  = undefined | integer()
+%% Descrip.: Wait for a frame decoded and sent to us by RecvPid.
+%%           Either wait for a specific frame if SeqNum is an integer,
+%%           or just any frame if SeqNum is undefined.
+%% Returns : {ok, Packet, Num} |
+%%           {error, Reason}
+%%           Reason = term()
+%%
+%% Note    : Only to be used externally by the 'mysql_auth' module.
+%%--------------------------------------------------------------------
+do_recv(RecvPid, SeqNum) when SeqNum == undefined ->
+    receive
+    {mysql_recv, RecvPid, data, Packet, Num} ->
+	    {ok, Packet, Num};
+	{mysql_recv, RecvPid, closed, _E} ->
+	    {error, socket_closed}
+    after ?CONNECT_TIMEOUT ->
+        {error, mysql_timeout}
+    end;
+
+do_recv(RecvPid, SeqNum) when is_integer(SeqNum) ->
+    ResponseNum = SeqNum + 1,
+    receive
+    {mysql_recv, RecvPid, data, Packet, ResponseNum} ->
+	    {ok, Packet, ResponseNum};
+	{mysql_recv, RecvPid, closed, _E} ->
+	    {error, socket_closed}
+    after ?CONNECT_TIMEOUT ->
+        {error, mysql_timeout}
+    end.
+
+call(Conn, Req) ->
+    gen_server:call(Conn, Req).
+
+call(Conn, Req, Timeout) ->
+    gen_server:call(Conn, Req, Timeout).
+
+%%--------------------------------------------------------------------
+%% Function: get_fields(RecvPid, [], Version)
+%%           RecvPid = pid(), mysql_recv process
+%%           Version = integer(), Representing MySQL version used
+%% Descrip.: Received and decode field information.
+%% Returns : {ok, FieldInfo} |
+%%           {error, Reason}
+%%           FieldInfo = list() of term()
+%%           Reason    = term()
+%%--------------------------------------------------------------------
+%% Support for MySQL 4.0.x:
+get_fields(RecvPid, Res, ?MYSQL_4_0) ->
+    case do_recv(RecvPid, undefined) of
+	{ok, Packet, _Num} ->
+	    case Packet of
+		<<254:8>> ->
+		    {ok, lists:reverse(Res)};
+		<<254:8, Rest/binary>> when size(Rest) < 8 ->
+		    {ok, lists:reverse(Res)};
+		_ ->
+		    {Table, Rest} = get_with_length(Packet),
+		    {Field, Rest2} = get_with_length(Rest),
+		    {LengthB, Rest3} = get_with_length(Rest2),
+		    LengthL = size(LengthB) * 8,
+		    <<Length:LengthL/little>> = LengthB,
+		    {Type, Rest4} = get_with_length(Rest3),
+		    {_Flags, _Rest5} = get_with_length(Rest4),
+		    This = {Table,
+			    Field,
+			    Length,
+			    %% TODO: Check on MySQL 4.0 if types are specified
+			    %%       using the same 4.1 formalism and could 
+			    %%       be expanded to atoms:
+			    Type},
+		    get_fields(RecvPid, [This | Res], ?MYSQL_4_0)
+	    end;
+	{error, Reason} ->
+	    {error, Reason}
+    end;
+%% Support for MySQL 4.1.x and 5.x:
+get_fields(RecvPid, Res, ?MYSQL_4_1) ->
+    case do_recv(RecvPid, undefined) of
+	{ok, Packet, _Num} ->
+	    case Packet of
+		<<254:8>> ->
+		    {ok, lists:reverse(Res)};
+		<<254:8, Rest/binary>> when size(Rest) < 8 ->
+		    {ok, lists:reverse(Res)};
+		_ ->
+		    {_Catalog, Rest} = get_with_length(Packet),
+		    {_Database, Rest2} = get_with_length(Rest),
+		    {Table, Rest3} = get_with_length(Rest2),
+		    %% OrgTable is the real table name if Table is an alias
+		    {_OrgTable, Rest4} = get_with_length(Rest3),
+		    {Field, Rest5} = get_with_length(Rest4),
+		    %% OrgField is the real field name if Field is an alias
+		    {_OrgField, Rest6} = get_with_length(Rest5),
+
+		    <<_Metadata:8/little, _Charset:16/little,
+		     Length:32/little, Type:8/little,
+		     _Flags:16/little, _Decimals:8/little,
+		     _Rest7/binary>> = Rest6,
+		    
+		    This = {Table,
+			    Field,
+			    Length,
+			    get_field_datatype(Type)},
+		    get_fields(RecvPid, [This | Res], ?MYSQL_4_1)
+	    end;
+	{error, Reason} ->
+	    {error, Reason}
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: get_rows(N, RecvPid, [])
+%%           N       = integer(), number of rows to get
+%%           RecvPid = pid(), mysql_recv process
+%% Descrip.: Receive and decode a number of rows.
+%% Returns : {ok, Rows} |
+%%           {error, Reason}
+%%           Rows = list() of [string()]
+%%--------------------------------------------------------------------
+get_rows(Fields, RecvPid, Res) ->
+    case do_recv(RecvPid, undefined) of
+	{ok, Packet, _Num} ->
+	    case Packet of
+		<<254:8, Rest/binary>> when size(Rest) < 8 ->
+		    {ok, lists:reverse(Res)};
+		_ ->
+		    {ok, This} = get_row(Fields, Packet, []),
+		    get_rows(Fields, RecvPid, [This | Res])
+	    end;
+	{error, Reason} ->
+	    {error, Reason}
+    end.
+
+%% part of get_rows/4
+get_row([], _Data, Res) ->
+    {ok, lists:reverse(Res)};
+get_row([Field | OtherFields], Data, Res) ->
+    {Col, Rest} = get_with_length(Data),
+    This = case Col of
+	       null ->
+		   undefined;
+	       _ ->
+		   convert_type(Col, element(4, Field))
+	   end,
+    get_row(OtherFields, Rest, [This | Res]).
+
+get_with_length(<<251:8, Rest/binary>>) ->
+    {null, Rest};
+get_with_length(<<252:8, Length:16/little, Rest/binary>>) ->
+    split_binary(Rest, Length);
+get_with_length(<<253:8, Length:24/little, Rest/binary>>) ->
+    split_binary(Rest, Length);
+get_with_length(<<254:8, Length:64/little, Rest/binary>>) ->
+    split_binary(Rest, Length);
+get_with_length(<<Length:8, Rest/binary>>) when Length < 251 ->
+    split_binary(Rest, Length).
+
+
+%%--------------------------------------------------------------------
+%% Function: do_send(Sock, Packet, SeqNum)
+%%           Sock   = term(), gen_tcp socket
+%%           Packet = binary()
+%%           SeqNum = integer(), packet sequence number
+%% Descrip.: Send a packet to the MySQL server.
+%% Returns : result of gen_tcp:send/2
+%%--------------------------------------------------------------------
+do_send(Sock, Packet, SeqNum) when is_binary(Packet), is_integer(SeqNum) ->
+    Data = <<(size(Packet)):24/little, SeqNum:8, Packet/binary>>,
+    gen_tcp:send(Sock, Data).
+
+%%--------------------------------------------------------------------
+%% Function: normalize_version(Version)
+%%           Version  = string()
+%% Descrip.: Return a flag corresponding to the MySQL version used.
+%%           The protocol used depends on this flag.
+%% Returns : Version = string()
+%%--------------------------------------------------------------------
+normalize_version([$4,$.,$0|_T]) ->
+    %?DEBUG("switching to MySQL 4.0.x protocol.", []),
+    ?MYSQL_4_0;
+normalize_version([$4,$.,$1|_T]) ->
+    ?MYSQL_4_1;
+normalize_version([$5|_T]) ->
+    %% MySQL version 5.x protocol is compliant with MySQL 4.1.x:
+    ?MYSQL_4_1; 
+normalize_version([$6|_T]) ->
+    %% MySQL version 6.x protocol is compliant with MySQL 4.1.x:
+    ?MYSQL_4_1; 
+normalize_version(_Other) ->
+    %?ERROR("MySQL version '~p' not supported: MySQL Erlang module "
+	% "might not work correctly.", [Other]),
+    %% Error, but trying the oldest protocol anyway:
+    ?MYSQL_4_0.
+
+%%--------------------------------------------------------------------
+%% Function: get_field_datatype(DataType)
+%%           DataType = integer(), MySQL datatype
+%% Descrip.: Return MySQL field datatype as description string
+%% Returns : String, MySQL datatype
+%%--------------------------------------------------------------------
+get_field_datatype(0) ->   'DECIMAL';
+get_field_datatype(1) ->   'TINY';
+get_field_datatype(2) ->   'SHORT';
+get_field_datatype(3) ->   'LONG';
+get_field_datatype(4) ->   'FLOAT';
+get_field_datatype(5) ->   'DOUBLE';
+get_field_datatype(6) ->   'NULL';
+get_field_datatype(7) ->   'TIMESTAMP';
+get_field_datatype(8) ->   'LONGLONG';
+get_field_datatype(9) ->   'INT24';
+get_field_datatype(10) ->  'DATE';
+get_field_datatype(11) ->  'TIME';
+get_field_datatype(12) ->  'DATETIME';
+get_field_datatype(13) ->  'YEAR';
+get_field_datatype(14) ->  'NEWDATE';
+get_field_datatype(246) -> 'NEWDECIMAL';
+get_field_datatype(247) -> 'ENUM';
+get_field_datatype(248) -> 'SET';
+get_field_datatype(249) -> 'TINYBLOB';
+get_field_datatype(250) -> 'MEDIUM_BLOG';
+get_field_datatype(251) -> 'LONG_BLOG';
+get_field_datatype(252) -> 'BLOB';
+get_field_datatype(253) -> 'VAR_STRING';
+get_field_datatype(254) -> 'STRING';
+get_field_datatype(255) -> 'GEOMETRY'.
+
+convert_type(Val, ColType) ->
+    case ColType of
+	T when T == 'TINY';
+	       T == 'SHORT';
+	       T == 'LONG';
+	       T == 'LONGLONG';
+	       T == 'INT24';
+	       T == 'YEAR' ->
+	    list_to_integer(binary_to_list(Val));
+	T when T == 'TIMESTAMP';
+	       T == 'DATETIME' ->
+	    {ok, [Year, Month, Day, Hour, Minute, Second], _Leftovers} =
+		io_lib:fread("~d-~d-~d ~d:~d:~d", binary_to_list(Val)),
+	    {datetime, {{Year, Month, Day}, {Hour, Minute, Second}}};
+	'TIME' ->
+	    {ok, [Hour, Minute, Second], _Leftovers} =
+		io_lib:fread("~d:~d:~d", binary_to_list(Val)),
+	    {time, {Hour, Minute, Second}};
+	'DATE' ->
+	    {ok, [Year, Month, Day], _Leftovers} =
+		io_lib:fread("~d-~d-~d", binary_to_list(Val)),
+	    {date, {Year, Month, Day}};
+	T when T == 'DECIMAL';
+	       T == 'NEWDECIMAL';
+	       T == 'FLOAT';
+	       T == 'DOUBLE' ->
+	    {ok, [Num], _Leftovers} =
+		case io_lib:fread("~f", binary_to_list(Val)) of
+		    {error, _} ->
+			io_lib:fread("~d", binary_to_list(Val));
+		    Res ->
+			Res
+		end,
+	    Num;
+	_Other ->
+	    Val
+    end.

+ 130 - 0
plugins/emysql/src/emysql_recv.erl

@@ -0,0 +1,130 @@
+%%%-------------------------------------------------------------------
+%%% File    : emysql_recv.erl
+%%% Author  : Fredrik Thulin <ft@it.su.se>
+%%% Descrip.: Handles data being received on a MySQL socket. Decodes
+%%%           per-row framing and sends each row to parent.
+%%%
+%%% Created :  4 Aug 2005 by Fredrik Thulin <ft@it.su.se>
+%%%
+%%% Note    : All MySQL code was written by Magnus Ahltorp, originally
+%%%           in the file mysql.erl - I just moved it here.
+%%%
+%%% Copyright (c) 2001-2004 Kungliga Tekniska 
+%%% See the file COPYING
+%%%
+%%%           Signals this receiver process can send to it's parent
+%%%             (the parent is a mysql_conn connection handler) :
+%%%
+%%%             {mysql_recv, self(), data, Packet, Num}
+%%%             {mysql_recv, self(), closed, {error, Reason}}
+%%%             {mysql_recv, self(), closed, normal}
+%%%
+%%%           Internally (from inside init/4 to start_link/4) the
+%%%           following signals may be sent to the parent process :
+%%%
+%%%             {mysql_recv, self(), init, {ok, Sock}}
+%%%             {mysql_recv, self(), init, {error, E}}
+%%%
+%%%-------------------------------------------------------------------
+-module(emysql_recv).
+
+%%--------------------------------------------------------------------
+%% External exports (should only be used by the 'mysql_conn' module)
+%%--------------------------------------------------------------------
+-export([start_link/2]).
+
+%callback
+-export([init/3]).
+
+-record(state, {
+		socket,
+		parent,
+		log_fun,
+		data}).
+
+-define(SECURE_CONNECTION, 32768).
+
+-define(CONNECT_TIMEOUT, 10000).
+
+%%--------------------------------------------------------------------
+%% Function: start_link(Host, Port, Parent)
+%%           Host = string()
+%%           Port = integer()
+%%           Parent = pid(), process that should get received frames
+%% Descrip.: Start a process that connects to Host:Port and waits for
+%%           data. When it has received a MySQL frame, it sends it to
+%%           Parent and waits for the next frame.
+%% Returns : {ok, RecvPid, Socket} |
+%%           {error, Reason}
+%%           RecvPid = pid(), receiver process pid
+%%           Socket  = term(), gen_tcp socket
+%%           Reason  = atom() | string()
+%%--------------------------------------------------------------------
+start_link(Host, Port) ->
+	proc_lib:start_link(?MODULE, init, [self(), Host, Port]).
+
+%%--------------------------------------------------------------------
+%% Function: init((Host, Port, Parent)
+%%           Host = string()
+%%           Port = integer()
+%%           Parent = pid(), process that should get received frames
+%% Descrip.: Connect to Host:Port and then enter receive-loop.
+%% Returns : error | never returns
+%%--------------------------------------------------------------------
+init(Parent, Host, Port) ->
+    case gen_tcp:connect(Host, Port, [binary, {packet, 0}]) of
+	{ok, Sock} ->
+		proc_lib:init_ack(Parent, {ok, self(), Sock}),
+		loop(#state{socket = Sock, parent = Parent, data = <<>>});
+	{error, Reason} ->
+		proc_lib:init_ack(Parent, {error, Reason})
+	end.
+
+%%--------------------------------------------------------------------
+%% Function: loop(State)
+%%           State = state record()
+%% Descrip.: The main loop. Wait for data from our TCP socket and act
+%%           on received data or signals that our socket was closed.
+%% Returns : error | never returns
+%%--------------------------------------------------------------------
+loop(State) ->
+    Sock = State#state.socket,
+    receive
+	{tcp, Sock, InData} ->
+	    NewData = list_to_binary([State#state.data, InData]),
+	    %% send data to parent if we have enough data
+	    Rest = sendpacket(State#state.parent, NewData),
+	    loop(State#state{data = Rest});
+	{tcp_error, Sock, Reason} ->
+	    State#state.parent ! {mysql_recv, self(), closed, {error, Reason}},
+	    error;
+	{tcp_closed, Sock} ->
+	    State#state.parent ! {mysql_recv, self(), closed, normal},
+	    error;
+	_Other -> %maybe system message
+		loop(State)
+    end.
+
+%%--------------------------------------------------------------------
+%% Function: sendpacket(Parent, Data)
+%%           Parent = pid()
+%%           Data   = binary()
+%% Descrip.: Check if we have received one or more complete frames by
+%%           now, and if so - send them to Parent.
+%% Returns : Rest = binary()
+%%--------------------------------------------------------------------
+%% send data to parent if we have enough data
+sendpacket(Parent, Data) ->
+    case Data of
+	<<Length:24/little, Num:8, D/binary>> ->
+	    if
+		Length =< size(D) ->
+		    {Packet, Rest} = split_binary(D, Length),
+		    Parent ! {mysql_recv, self(), data, Packet, Num},
+		    sendpacket(Parent, Rest);
+		true ->
+		    Data
+	    end;
+	_ ->
+	    Data
+    end.

+ 33 - 0
plugins/emysql/src/emysql_sup.erl

@@ -0,0 +1,33 @@
+%%%----------------------------------------------------------------------
+%%% File    : emysql_sup.erl
+%%% Author  : Ery Lee
+%%% Purpose : Mysql driver supervisor
+%%% Created : 21 May 2009 
+%%% Updated : 11 Jan 2010 
+%%% License : http://www.opengoss.com
+%%%
+%%% Copyright (C) 2012, www.opengoss.com 
+%%%----------------------------------------------------------------------
+-module(emysql_sup).
+
+-author('ery.lee@gmail.com').
+
+-behavior(supervisor).
+
+%% API
+-export([start_link/1, init/1]).
+
+start_link(Opts) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, Opts).  
+
+init(Opts) ->
+    PoolSize = proplists:get_value(pool_size, Opts,
+                                   erlang:system_info(schedulers)),
+    {ok, {{one_for_one, 10, 10},
+		  [{emysql, {emysql, start_link, [PoolSize]}, transient,
+            16#ffffffff, worker, [emysql]} |
+		   [{I, {emysql_conn, start_link, [I, Opts]}, transient, 16#ffffffff,
+			worker, [emysql_conn, emysql_recv]} || I <- lists:seq(1, PoolSize)]]
+		}
+	}.
+	

+ 11 - 2
rel/files/plugins.config

@@ -1,6 +1,15 @@
 [
- {emqttd_plugin_demo, [
-    {config, value}
+ {emysql, [
+    {pool_size, 4},
+    {host, "localhost"},
+    {port, 3306},
+    {username, "root"},
+    {password, "public"},
+    {database, "mqtt"},
+    {encoding, utf8}
+ ]},
+ {emqttd_auth_mysql, [
+    {user_table, mqtt_users}
  ]}
 %%
 % {emqttd_dashboard, [