Browse Source

Release v1.0-rc1

Many simplifications and little optimizations
Now using default host '127.0.0.1' and port 5432 values
Small changes in interface, API stabilization

Release candidate
master
annelin 3 months ago
parent
commit
45c2932112
3 changed files with 60 additions and 108 deletions
  1. 6
    6
      README.md
  2. 48
    97
      pg_tarantool/init.lua
  3. 6
    5
      pg_tarantool/types/init.lua

+ 6
- 6
README.md View File

@@ -31,13 +31,13 @@ luarocks install http://dev.narayana.im/pg-tarantool/raw/tip/pg-tarantool-hg-999

``` lua
pg = require('pg_tarantool')
link = pg.connect({host = localhost, user = 'user', pass = 'pass', db = 'db', sslmode = 'required', connections = 10})
link = pg.connect{host = '127.0.0.1', port = 5432, user = 'user', pass = 'pass', db = 'db', sslmode = 'required', connections = 1}
data = link:query("SELECT 1 as ONE, '2' as TWOSTRING")
```

## API Documentation

### `link = pg.connect(opts = {connections = 1})`
### `link = pg.connectt{host = '127.0.0.1', port = 5432, user = 'user', pass = 'pass', db = 'db', sslmode = 'required', connections = 1}`

Connect to a database.
Returns _link_ (connections pool).
@@ -75,10 +75,10 @@ Arguments can be: numeric, string or table types. They will be escaped and forma
...
```

### `link:prepare(statement, persist)`
### `link:prepare(statement, persistent)`

Prepare a SQL statement.
If `persist` is true, this statement won't be deallocated after execution.
If `persistent` set as true, this statement won't be deallocated after execution.

*Returns*:

@@ -109,8 +109,8 @@ Closes database link.
### Writable variables

- `pg.types.null` -- convert postgresql `NULL` to this value (default: `nil`).
- `link.error` -- function called on SQL query failure (default: `function() end`)
- `link.debug` -- function called on each SQL query (default: `function() end`) .
- `pg.catch.error` -- function called on SQL query failure (default: `function() end`)
- `pg.catch.debug` -- function called on each SQL query (default: `function() end`) .

# See Also

+ 48
- 97
pg_tarantool/init.lua View File

@@ -1,117 +1,62 @@
--
local fiber = require 'fiber'
local types = require 'pg_tarantool.types'
local driver = require 'pg_tarantool.driver'
driver.decode = types.decode
driver.decode = types.decode

local SQL_RESET = "begin; reset all; commit;"
local SQL_PARAMS = {host = '127.0.0.1', port = 5432, user = false, password = false, dbname = false, sslmode = 'disable'}
------------------------------------------------------------------------
local handlers = {error=driver.noop, debug=driver.noop}

-----------------------------------------------------------------------------------------------------------------
--
local function prep_deallocate(stmt, no_dealloc) -- deallocated prepared statement. --
local sql_query, result, state, err
-- ...
if not stmt.persist or no_dealloc == nil then -- if `persist` attribute is not true or it is manual call (without an argument ) --
sql_query = "deallocate "..stmt.uuid
result, state, err = stmt.conn:execute("deallocate "..stmt.uuid)
stmt:debug(sql_query, state, result, err)
stmt.conn, stmt.execute, stmt.queue = stmt.queue:put(state > -127 and stmt.conn)
setmetatable(stmt, nil)
end
--
return stmt
end
--
local function prep_execute(stmt, ...) -- execute prepared statement --
local params, sql_query, result, state, err --
--
params = types.parse({...}, function(s) return stmt.conn:quote(s) end)
sql_query = string.format(#params>0 and "execute %s (%s)" or "execute %s", stmt.uuid, table.concat(params, ','))
result, state, err = stmt.conn:execute(sql_query) stmt:deallocate(true)
stmt:debug(sql_query, state, result, err)
-- ...
return (state > 0) and unpack(result) or false, err, err and stmt.error(err)
-------------------------------------
--- sql queries --
--
local function escape(this) return function(...) return this:quote(...) end end
local function query(this, sql, ...) --
local conn = (this.execute) and this or this.queue:get() -- get connection from pool
local sql = string.format(sql, unpack(types.parse({...}, escape(conn)))) -- build query
local result, state, reason = conn:execute(sql) -- fetch result
handlers.debug(sql, state, result)
conn = conn == this and conn or this.queue:put(state>-127 and conn) -- return connection to pool
return (state > 0) and unpack(result) or false, reason, reason and handlers.error(reason) --
end

--
local function prepare(this, sql_query, persist) -- prepare statement --
local conn, params, result, state, err, uuid --
--
--
conn = this.queue:get() -- get connection from pool --
uuid = "pg_tarantool_" .. conn:uuid() -- create uuid for prepared statement --
sql_query = string.format("prepare %s as %s", uuid, sql_query) -- create statement --
result, state, err = conn:execute(sql_query) --
this.debug(sql_query, state, result, err) --
if state < 0 then this.queue:put(state > -127 and conn) return false, err, err and this.error(err) end -- failed to prepare statement -- return false --
--
-- .. statement
return setmetatable({uuid = uuid, persist = persist, conn = conn, queue = this.queue, error = this.error, debug = this.debug, execute = prep_execute, deallocate = prep_deallocate}, {__call = prep_execute})
-------------------------------------
----- sql prepared statements --
--
local function prepare(this, sql, persist) --
local prep_dealloc = function(s) this.query(s.conn, "deallocate "..s.uuid) this.queue:put(s.conn) setmetatable(s, nil) s.execute, s.deallocate, s.conn = nil return s end
local prep_execute = function(s,...) local res, err = this.query(s.conn, "execute "..s.uuid..types.parse({...}, escape(s.conn), true)) s = s.persist and s or s:deallocate() return res, err end
local conn = this.queue:get() -- get connection from pool
local uuid = "pg_trntl_"..driver.uuid() -- uuid
local _, reason = this.query(conn, "prepare "..uuid.." as "..sql) -- prepare statement --
if reason then this.queue:put(conn) return false, reason end -- return connection on error --
return setmetatable({conn=conn,uuid=uuid,persist=persist,execute=prep_execute,deallocate=prep_dealloc},{__call=prep_execute}) --
end

-----------------------------------------------------------------------------------------------------------------
--
local function query(this, sql_query, ...)
local conn, params, result, state, err --
--
--
conn = this.queue:get() -- get connection from pool --
params = types.parse({...}, function(s) return conn:quote(s) end) -- parse and params --
sql_query = (#params == 0) and sql_query or string.format(sql_query, unpack(params)) -- format query --
result, state, err = conn:execute(sql_query) -- execute query --
this.debug(sql_query, state, result, err) -- debug function --
this.queue:put(state > -127 and conn) -- put connection to pool --
-------------------------------------
----- connection pools ---
--
local queue_t = {
get = function(qu,c) c=qu.pool:get(qu.timeout) or driver.connect(qu.conn_string) or error 'db link is broken' c:execute(c:active() and SQL_RESET) return c end,
put = function(qu,c) return qu.pool:put(c and c:active() ~= nil and c, qu.timeout) end,
close= function(qu,c) while qu.pool:count()>0 do qu.pool:get():close() end return qu.pool:close() end,
}

----------------------------------------
----- connect / disconnect ---
--
local function disconnect(t) t.queue:close() t.queue.conn_string = nil return t end
local function connect(opts)
local conn_string = '' -- we will build queue string here
for param,default in pairs(SQL_PARAMS) do conn_string = (opts[param] or default) and string.format("%s%s='%s' ", conn_string, param, opts[param] or default) or conn_string end --
--
-- return result if succeed, else -- return false and error --
return (state > 0) and unpack(result) or false, err, err and this.error(err)
end


--------------------------------------------------
---
local queue = {
-- get connection from pool --
get = function(this) --
local conn = this.pool:get() or driver.connect(this.conn_str) or error 'db link broken'
return conn, conn:active() and conn:execute("begin; reset all; commit;")
end,
-- return connection to pool --
put = function(this, conn) --
this.pool:put(conn and conn:active() ~= nil and conn)
end,
-- close all connections --
close = function(this) --
while not this.pool:is_empty() do this.pool:get():close() end
this.pool:close()
this.conn_str = nil
end,
}

-----------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------
local function connect(cfg)
local queue = setmetatable({pool=fiber.channel(opts.connections or 1), conn_string=conn_string, timeout=opts.timeout or 10},{__index = queue_t}) -- make queue
for n=1,opts.connections or 1 do local conn = driver.connect(conn_string) or error 'failed to establish db link' queue.pool:put(conn) end -- add connections
--
local conn_str = ''
for _,param in pairs{'host','port','user','password','dbname','sslmode'} do conn_str = cfg[param] and conn_str .. string.format("%s='%s' ", param, cfg[param]) or conn_str end
--
local pool_sz = (tonumber(cfg.connections) and cfg.connections>0) and cfg.connections or 1
local pool = fiber.channel(pool_sz)
for i=1,pool_sz do
local conn = driver.connect(conn_str)
if conn then pool:put(conn) end
end
if pool:count() == 0 then error "database connection error" end
--
return setmetatable({queue={pool=pool,conn_str=conn_str,get=queue.get,put=queue.put},close=queue.close,prepare=prepare,query=query,error=driver.noop,debug=driver.noop}, {__call=query})
return (queue.pool:count() > 0) and setmetatable({queue=queue, close=disconnect, query=query, prepare=prepare}, {__call=query}) -- db link object
end


--
return { connect = connect, types = types }
return { connect=connect, types=types, catch=handlers }

+ 6
- 5
pg_tarantool/types/init.lua View File

@@ -40,14 +40,15 @@ _M.decode = function(var, id)
return (enum[id] or enum.default)(var)
end
_M.encode = function(var, escape)
local escape = type(escape) == 'function' and escape or function(s) return "'"..s.."'" end
local id = ( type(var) == 'table' and #var == 0 and next(var) ) and 'json' or type(var)
return (enum[id] or enum.default)(var, escape)
escape = (type(escape) == 'function') and escape or function(s) return "'"..s.."'" end
local type_ = ( type(var) == 'table' and #var == 0 and next(var) ) and 'json' or type(var)
return (enum[type_] or enum.default)(var, escape)
end
_M.parse = function(arr, escape)
_M.parse = function(arr, escape, as_text)
local r = {}
for i=1,#arr do r[i] = _M.encode(arr[i], escape) end
return r
--
return (as_text) and (#r>0 and '('..table.concat(r,',')..')' or '') or r
end

-- return pg-tarantool types handler --

Loading…
Cancel
Save