PostgreSQL native library for Tarantool programming language
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

init.lua 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. -- init.lua (internal file)
  2. local fiber = require('fiber')
  3. local ffi = require('ffi')
  4. local types = require('pg_tarantool.types')
  5. local driver = require('pg_tarantool.driver')
  6. driver.decode = types.decode
  7. local noop = function() end
  8. -----------------------------------------------------------------------------------------------------------------
  9. -- postgresql prepared statements --
  10. --
  11. local function prepare(this, sql, ...)
  12. local conn, uuid, result, state, err
  13. conn = this:get() -- get connection from pool --
  14. uuid = "pg_tarantool_" .. conn:uuid() -- create uuid for prepared statement --
  15. sql = string.format("prepare %s as (%s)", uuid, sql, ...)
  16. result, state, err = conn:execute(sql) -- create prepared statement --
  17. this.debug(sql, state, result, err) -- debug function --
  18. if state < 0 then this:put(state > -127 and conn); return false, err, this.error(err) end -- failed to prepare statement -- return false --
  19. -- our statement --
  20. statement = {
  21. conn = conn,
  22. uuid = uuid,
  23. is_persist = false,
  24. persist = function(stmt, set)
  25. stmt.is_persist = (set == true) and true or false
  26. return stmt
  27. end,
  28. close = function(stmt, persist)
  29. if persist == true or not stmt.conn then return stmt end
  30. local _, state = stmt.conn:execute("deallocate "..stmt.uuid)
  31. stmt.conn, stmt.execute = this:put(state > -127 and stmt.conn), nil
  32. end,
  33. execute = function(stmt, ...)
  34. local result, state, err
  35. stmt.args = types.parse({...}, function(s) return stmt.conn:quote(s) end)
  36. stmt.query = string.format(#stmt.args>0 and "execute %s (%s)" or "execute %s", stmt.uuid, table.concat(stmt.args, ','))
  37. result, state, err = stmt.conn:execute(stmt.query)
  38. this.debug(stmt.query, state, result, err)
  39. stmt:close(stmt.is_persist == true)
  40. return (state > 0) and unpack(result) or false, err, this.error(err)
  41. end,
  42. }
  43. --
  44. return setmetatable(statement, {__call = statement.execute})
  45. end
  46. -----------------------------------------------------------------------------------------------------------------
  47. -- get first available connection from connection pool and execute a query --
  48. --
  49. local function query(this, sql, ...)
  50. local conn, params, result, state, err
  51. conn = this:get() -- get connection from pool --
  52. params = types.parse({...}, function(s) return conn:quote(s) end) -- escape params --
  53. sql = string.format(sql, unpack(params)) -- format query --
  54. result, state, err = conn:execute(sql) -- execute query --
  55. this.debug(sql, state, result, state) -- debug function --
  56. this:put(state > -127 and conn) -- put connection to pool --
  57. -- return result if succeed, else -- return false and error --
  58. return (state > 0) and unpack(result) or false, err, this.error(err)
  59. end
  60. -----------------------------------------------------------------------------------------------------------------
  61. -- internal functions for: --
  62. -- - get connection from pool-
  63. local function get(this)
  64. local conn = this.queue:get() or driver.connect(this.conn_string) or error 'db link broken'
  65. if conn:active() then conn:execute('begin; reset all; commit;') end
  66. return conn
  67. end
  68. -- - return connection to pool --
  69. local function put(this, conn)
  70. this.queue:put(conn)
  71. return nil
  72. end
  73. -- - close connection or whole pool --
  74. local function close(this, conn)
  75. if conn then conn:close() else while this.queue:count() > 0 do this:get():close() end end
  76. return nil
  77. end
  78. -----------------------------------------------------------------------------------------------------------------
  79. -- create connection pool. Accepts pg connection params (host, port, user,
  80. -- password, dbname) separatelly or in one string, connections ( pool size or 0 if single ) and raise flag.
  81. local function connect(cfg)
  82. --
  83. cfg.conn_string = ''
  84. for _,param in pairs{'host','port','user','password','dbname','sslmode'} do
  85. cfg.conn_string = cfg[param] and cfg.conn_string .. string.format("%s='%s' ", param, cfg[param]) or conn_string
  86. end
  87. --
  88. local queue = fiber.channel(cfg.connections or 1)
  89. for i = 1, (cfg.connections or 1) do
  90. local conn, status = driver.connect(cfg.conn_string)
  91. if status >= 0 then queue:put(conn) end
  92. end
  93. --
  94. if queue:count() == 0 then return error 'failed to establish db link' end
  95. return setmetatable({ conn_string = cfg.conn_string, queue = queue, get = get, put = put, prepare = prepare, query = query, close = close, error = noop, debug = noop }, {__call = query})
  96. end
  97. --
  98. return { connect = connect, types = types }