PostgreSQL native library for Tarantool programming language
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

init.lua 6.2KB


  1. -- init.lua (internal file)
  2. local fiber = require('fiber')
  3. local driver = require('pg.driver')
  4. local ffi = require('ffi')
  5. local pool_mt
  6. local conn_mt
  7. --create a new connection
  8. local function conn_create(pg_conn)
  9. local queue = fiber.channel(1)
  10. queue:put(true)
  11. local conn = setmetatable({
  12. usable = true,
  13. conn = pg_conn,
  14. queue = queue,
  15. }, conn_mt)
  16. return conn
  17. end
  18. -- get connection from pool
  19. local function conn_get(pool)
  20. local pg_conn = pool.queue:get()
  21. local status
  22. if pg_conn == nil then
  23. status, pg_conn = driver.connect(pool.conn_string)
  24. if status < 0 then
  25. return error(pg_conn)
  26. end
  27. end
  28. local conn = conn_create(pg_conn, pool)
  29. conn.__gc_hook = ffi.gc(ffi.new('void *'),
  30. function(self)
  31. pg_conn:close()
  32. pool.queue:put(nil)
  33. end)
  34. return conn
  35. end
  36. local function conn_put(conn)
  37. local pgconn = conn.conn
  38. ffi.gc(conn.__gc_hook, nil)
  39. if not conn.queue:get() then
  40. conn.usable = false
  41. return nil
  42. end
  43. conn.usable = false
  44. return pgconn
  45. end
  46. conn_mt = {
  47. __index = {
  48. execute = function(self, sql, ...)
  49. if not self.usable then
  50. return get_error(self.raise.pool, 'Connection is not usable')
  51. end
  52. if not self.queue:get() then
  53. self.queue:put(false)
  54. return get_error(self.raise.pool, 'Connection is broken')
  55. end
  56. local status, datas = self.conn:execute(sql, ...)
  57. if status ~= 0 then
  58. self.queue:put(status > 0)
  59. return error(datas)
  60. end
  61. self.queue:put(true)
  62. return datas, true
  63. end,
  64. begin = function(self)
  65. return self:execute('BEGIN') ~= nil
  66. end,
  67. commit = function(self)
  68. return self:execute('COMMIT') ~= nil
  69. end,
  70. rollback = function(self)
  71. return self:execute('ROLLBACK') ~= nil
  72. end,
  73. ping = function(self)
  74. local status, data, msg = pcall(self.execute, self, 'SELECT 1 AS code')
  75. return msg and data[1][1].code == 1
  76. end,
  77. close = function(self)
  78. if not self.usable then
  79. return error('Connection is not usable')
  80. end
  81. if not self.queue:get() then
  82. self.queue:put(false)
  83. return error('Connection is broken')
  84. end
  85. self.usable = false
  86. self.conn:close()
  87. self.queue:put(false)
  88. return true
  89. end,
  90. active = function(self)
  91. if not self.usable then
  92. return get_error(self.raise.pool, 'Connection is not usable')
  93. end
  94. if not self.queue:get() then
  95. self.queue:put(false)
  96. return get_error(self.raise.pool, 'Connection is broken')
  97. end
  98. local status, msg = self.conn:active()
  99. if status ~= 1 then
  100. self.queue:put(false)
  101. return get_error(self.raise.pool, msg)
  102. end
  103. self.queue:put(true)
  104. return msg
  105. end
  106. }
  107. }
  108. local function build_conn_string(opts)
  109. if opts.conn_string then
  110. return opts.conn_string
  111. end
  112. local connb = {}
  113. if opts.host then
  114. table.insert(connb, string.format(" host='%s'", opts.host))
  115. end
  116. if opts.port then
  117. table.insert(connb, string.format(" port='%s'", opts.port))
  118. end
  119. if opts.user then
  120. table.insert(connb, string.format(" user='%s'", opts.user))
  121. end
  122. if opts.pass or opts.password then
  123. table.insert(connb, string.format(" password='%s'",
  124. opts.pass or opts.password))
  125. end
  126. if opts.db then
  127. table.insert(connb, string.format(" dbname='%s'", opts.db))
  128. end
  129. return table.concat(connb)
  130. end
  131. -- Create connection pool. Accepts pg connection params (host, port, user,
  132. -- password, dbname) separatelly or in one string, size and raise flag.
  133. local function pool_create(opts)
  134. opts = opts or {}
  135. local conn_string = build_conn_string(opts)
  136. opts.size = opts.size or 1
  137. local queue = fiber.channel(opts.size)
  138. for i = 1, opts.size do
  139. local status, conn = driver.connect(conn_string)
  140. if status < 0 then
  141. while queue:count() > 0 do
  142. local pg_conn = queue:get()
  143. pg_conn:close()
  144. end
  145. if status < 0 then
  146. return error(conn)
  147. end
  148. end
  149. queue:put(conn)
  150. end
  151. return setmetatable({
  152. -- connection variables
  153. host = opts.host,
  154. port = opts.port,
  155. user = opts.user,
  156. pass = opts.pass,
  157. db = opts.db,
  158. size = opts.size,
  159. conn_string = conn_string,
  160. -- private variables
  161. queue = queue,
  162. usable = true
  163. }, pool_mt)
  164. end
  165. -- Close pool
  166. local function pool_close(self)
  167. self.usable = false
  168. for i = 1, self.size do
  169. local pg_conn = self.queue:get()
  170. if pg_conn ~= nil then
  171. pg_conn:close()
  172. end
  173. end
  174. end
  175. -- Returns connection
  176. local function pool_get(self)
  177. if not self.usable then
  178. return get_error(self.raise, 'Pool is not usable')
  179. end
  180. local conn = conn_get(self)
  181. local reset_sql = 'BEGIN; RESET ALL; COMMIT;'
  182. if conn:active() then
  183. reset_sql = 'ROLLBACK; ' .. reset_sql
  184. end
  185. conn:execute(reset_sql)
  186. return conn
  187. end
  188. -- Free binded connection
  189. local function pool_put(self, conn)
  190. if conn.usable then
  191. self.queue:put(conn_put(conn))
  192. end
  193. end
  194. pool_mt = {
  195. __index = {
  196. get = pool_get;
  197. put = pool_put;
  198. close = pool_close;
  199. }
  200. }
  201. -- Create connection. Accepts pg connection params (host, port, user,
  202. -- password, dbname) separatelly or in one string and raise flag.
  203. local function connect(opts)
  204. opts = opts or {}
  205. local conn_string = build_conn_string(opts)
  206. local status, pg_conn = driver.connect(conn_string)
  207. if status < 0 then
  208. return error(pg_conn)
  209. end
  210. return conn_create(pg_conn)
  211. end
  212. return {
  213. connect = connect;
  214. pool_create = pool_create;
  215. }