Tarantool nginx upstream module
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ngx_http_tnt_module.c 31KB


  1. /*
  2. * Redistribution and use in source and binary forms, with or
  3. * without modification, are permitted provided that the following
  4. * conditions are met:
  5. *
  6. * 1. Redistributions of source code must retain the above
  7. * copyright notice, this list of conditions and the
  8. * following disclaimer.
  9. *
  10. * 2. Redistributions in binary form must reproduce the above
  11. * copyright notice, this list of conditions and the following
  12. * disclaimer in the documentation and/or other materials
  13. * provided with the distribution.
  14. *
  15. * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
  16. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
  17. * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  18. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
  19. * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
  20. * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  21. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  22. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
  23. * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
  24. * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  25. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
  26. * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  27. * SUCH DAMAGE.
  28. *
  29. * Copyright (C) 2015 Tarantool AUTHORS:
  30. * please see AUTHORS file.
  31. */
  32. #include <ngx_config.h>
  33. #include <ngx_core.h>
  34. #include <ngx_http.h>
  35. #include <tp_transcode.h>
  36. #include <debug.h>
  37. enum ctx_state {
  38. OK = 0,
  39. INPUT_JSON_PARSE_FAILED,
  40. INPUT_TO_LARGE,
  41. INPUT_EMPTY,
  42. READ_PAYLOAD,
  43. READ_BODY,
  44. SEND_REPLY
  45. };
  46. typedef struct ngx_http_tnt_error {
  47. const ngx_str_t msg;
  48. int code;
  49. } ngx_http_tnt_error_t;
  50. typedef struct {
  51. ngx_http_upstream_conf_t upstream;
  52. ngx_int_t index;
  53. size_t in_multiplier;
  54. size_t out_multiplier;
  55. } ngx_http_tnt_loc_conf_t;
  56. typedef struct {
  57. struct {
  58. u_char mem[6];
  59. u_char *p, *e;
  60. } payload;
  61. enum ctx_state state;
  62. ngx_buf_t *in_err, *tp_cache;
  63. ssize_t rest, payload_size;
  64. int rest_batch_size, batch_size;
  65. ngx_int_t greeting:1;
  66. } ngx_http_tnt_ctx_t;
  67. /** Pre-filter functions
  68. */
  69. static inline ngx_http_tnt_ctx_t * ngx_http_tnt_create_ctx(
  70. ngx_http_request_t *r);
  71. static inline void ngx_http_tnt_reset_ctx(ngx_http_tnt_ctx_t *ctx);
  72. static inline ngx_buf_t* ngx_http_set_err(ngx_http_request_t *r,
  73. int errcode, const u_char *msg, size_t msglen);
  74. static inline ngx_int_t ngx_http_tnt_output_err(ngx_http_request_t *r,
  75. ngx_http_tnt_ctx_t *ctx, ngx_int_t code);
  76. /** Filter functions
  77. */
  78. static inline ngx_int_t ngx_http_tnt_read_greeting(ngx_http_request_t *r,
  79. ngx_http_tnt_ctx_t *ctx, ngx_buf_t *b);
  80. static ngx_int_t ngx_http_tnt_send_reply(ngx_http_request_t *r,
  81. ngx_http_upstream_t *u, ngx_http_tnt_ctx_t *ctx);
  82. static ngx_int_t ngx_http_tnt_filter_reply(ngx_http_request_t *r,
  83. ngx_http_upstream_t *u, ngx_buf_t *b);
  84. /** Rest
  85. */
  86. static inline void ngx_http_tnt_cleanup(ngx_http_request_t *r);
  87. static inline ngx_buf_t * ngx_http_tnt_create_mem_buf(ngx_http_request_t *r,
  88. ngx_http_upstream_t *u, size_t size);
  89. static inline ngx_int_t ngx_http_tnt_output(ngx_http_request_t *r,
  90. ngx_http_upstream_t *u, ngx_buf_t *b);
  91. /** Ngx handlers
  92. */
  93. static ngx_int_t ngx_http_tnt_create_request(ngx_http_request_t *r);
  94. static ngx_int_t ngx_http_tnt_reinit_request(ngx_http_request_t *r);
  95. static ngx_int_t ngx_http_tnt_process_header(ngx_http_request_t *r);
  96. static ngx_int_t ngx_http_tnt_filter_init(void *data);
  97. static ngx_int_t ngx_http_tnt_filter(void *data, ssize_t bytes);
  98. static void ngx_http_tnt_abort_request(ngx_http_request_t *r);
  99. static void ngx_http_tnt_finalize_request(ngx_http_request_t *r, ngx_int_t rc);
  100. static void *ngx_http_tnt_create_loc_conf(ngx_conf_t *cf);
  101. static char *ngx_http_tnt_merge_loc_conf(ngx_conf_t *cf,
  102. void *parent, void *child);
  103. static char *ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd,
  104. void *conf);
  105. static const ngx_http_tnt_error_t errors[] = {
  106. { ngx_string("Request too large, consider increasing your "
  107. "server's setting 'client_body_buffer_size'"),
  108. -32001
  109. },
  110. { ngx_string("Unknown parse error"),
  111. -32002
  112. }
  113. };
  114. enum ngx_http_tnt_err_messages_idx {
  115. REQUEST_TOO_LARGE = 0,
  116. UNKNOWN_PARSE_ERROR = 1
  117. };
  118. static size_t OVERHEAD = sizeof("{"
  119. "'error': {"
  120. "'code':-XXXXX,"
  121. "'message':''"
  122. "},"
  123. "'result':{},"
  124. "'id':4294967295"
  125. "}") - 1;
  126. static ngx_conf_bitmask_t ngx_http_tnt_next_upstream_masks[] = {
  127. { ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
  128. { ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
  129. { ngx_string("invalid_response"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
  130. { ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
  131. { ngx_null_string, 0 }
  132. };
  133. static ngx_command_t ngx_http_tnt_commands[] = {
  134. { ngx_string("tnt_pass"),
  135. NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
  136. ngx_http_tnt_pass,
  137. NGX_HTTP_LOC_CONF_OFFSET,
  138. 0,
  139. NULL },
  140. { ngx_string("tnt_connect_timeout"),
  141. NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
  142. ngx_conf_set_msec_slot,
  143. NGX_HTTP_LOC_CONF_OFFSET,
  144. offsetof(ngx_http_tnt_loc_conf_t, upstream.connect_timeout),
  145. NULL },
  146. { ngx_string("tnt_send_timeout"),
  147. NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
  148. ngx_conf_set_msec_slot,
  149. NGX_HTTP_LOC_CONF_OFFSET,
  150. offsetof(ngx_http_tnt_loc_conf_t, upstream.send_timeout),
  151. NULL },
  152. { ngx_string("tnt_read_timeout"),
  153. NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
  154. ngx_conf_set_msec_slot,
  155. NGX_HTTP_LOC_CONF_OFFSET,
  156. offsetof(ngx_http_tnt_loc_conf_t, upstream.read_timeout),
  157. NULL },
  158. { ngx_string("tnt_buffer_size"),
  159. NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
  160. ngx_conf_set_size_slot,
  161. NGX_HTTP_LOC_CONF_OFFSET,
  162. offsetof(ngx_http_tnt_loc_conf_t, upstream.buffer_size),
  163. NULL },
  164. { ngx_string("tnt_next_upstream"),
  165. NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_1MORE,
  166. ngx_conf_set_bitmask_slot,
  167. NGX_HTTP_LOC_CONF_OFFSET,
  168. offsetof(ngx_http_tnt_loc_conf_t, upstream.next_upstream),
  169. &ngx_http_tnt_next_upstream_masks },
  170. { ngx_string("tnt_in_multiplier"),
  171. NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
  172. ngx_conf_set_size_slot,
  173. NGX_HTTP_LOC_CONF_OFFSET,
  174. offsetof(ngx_http_tnt_loc_conf_t, in_multiplier),
  175. NULL },
  176. { ngx_string("tnt_out_multiplier"),
  177. NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
  178. ngx_conf_set_size_slot,
  179. NGX_HTTP_LOC_CONF_OFFSET,
  180. offsetof(ngx_http_tnt_loc_conf_t, out_multiplier),
  181. NULL },
  182. ngx_null_command
  183. };
  184. static ngx_http_module_t ngx_http_tnt_module_ctx = {
  185. NULL, /* preconfiguration */
  186. NULL, /* postconfiguration */
  187. NULL, /* create main configuration */
  188. NULL, /* init main configuration */
  189. NULL, /* create server configuration */
  190. NULL, /* merge server configuration */
  191. ngx_http_tnt_create_loc_conf, /* create location configuration */
  192. ngx_http_tnt_merge_loc_conf /* merge location configuration */
  193. };
  194. ngx_module_t ngx_http_tnt_module = {
  195. NGX_MODULE_V1,
  196. &ngx_http_tnt_module_ctx, /* module context */
  197. ngx_http_tnt_commands, /* module directives */
  198. NGX_HTTP_MODULE, /* module type */
  199. NULL, /* init master */
  200. NULL, /* init module */
  201. NULL, /* init process */
  202. NULL, /* init thread */
  203. NULL, /* exit thread */
  204. NULL, /* exit process */
  205. NULL, /* exit master */
  206. NGX_MODULE_V1_PADDING
  207. };
  208. /** Ngx handlers
  209. */
  210. static ngx_int_t
  211. ngx_http_tnt_handler(ngx_http_request_t *r)
  212. {
  213. ngx_int_t rc;
  214. ngx_http_upstream_t *u;
  215. ngx_http_tnt_loc_conf_t *tlcf;
  216. if (!(r->method & NGX_HTTP_POST)) {
  217. return NGX_HTTP_NOT_ALLOWED;
  218. }
  219. if (ngx_http_set_content_type(r) != NGX_OK
  220. || ngx_http_upstream_create(r) != NGX_OK)
  221. {
  222. return NGX_HTTP_INTERNAL_SERVER_ERROR;
  223. }
  224. u = r->upstream;
  225. ngx_str_set(&u->schema, "tnt://");
  226. u->output.tag = (ngx_buf_tag_t) &ngx_http_tnt_module;
  227. tlcf = ngx_http_get_module_loc_conf(r, ngx_http_tnt_module);
  228. u->conf = &tlcf->upstream;
  229. u->create_request = ngx_http_tnt_create_request;
  230. u->reinit_request = ngx_http_tnt_reinit_request;
  231. u->process_header = ngx_http_tnt_process_header;
  232. u->abort_request = ngx_http_tnt_abort_request;
  233. u->finalize_request = ngx_http_tnt_finalize_request;
  234. u->input_filter_init = ngx_http_tnt_filter_init;
  235. u->input_filter = ngx_http_tnt_filter;
  236. u->input_filter_ctx = r;
  237. u->length = 0;
  238. u->state = 0;
  239. rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
  240. if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
  241. return rc;
  242. }
  243. return NGX_DONE;
  244. }
  245. static ngx_int_t
  246. ngx_http_tnt_create_request(ngx_http_request_t *r)
  247. {
  248. ngx_buf_t *b;
  249. ngx_chain_t *body;
  250. size_t complete_msg_size;
  251. tp_transcode_t tc;
  252. ngx_http_tnt_ctx_t *ctx;
  253. ngx_chain_t *out_chain;
  254. ngx_http_tnt_loc_conf_t *tlcf;
  255. if (r->headers_in.content_length_n == 0) {
  256. /** XXX
  257. * Probably, this case we should handle like 'NOT ALLOWED'?
  258. */
  259. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "empty body");
  260. return NGX_ERROR;
  261. }
  262. ctx = ngx_http_tnt_create_ctx(r);
  263. if (ctx == NULL) {
  264. return NGX_ERROR;
  265. }
  266. tlcf = ngx_http_get_module_loc_conf(r, ngx_http_tnt_module);
  267. out_chain = ngx_alloc_chain_link(r->pool);
  268. if (out_chain == NULL) {
  269. return NGX_ERROR;
  270. }
  271. out_chain->buf = ngx_create_temp_buf(r->pool,
  272. r->headers_in.content_length_n * tlcf->in_multiplier);
  273. if (out_chain->buf == NULL) {
  274. crit("[BUG?] failed to allocate output buffer, size %ui",
  275. r->headers_in.content_length_n * tlcf->in_multiplier);
  276. return NGX_ERROR;
  277. }
  278. out_chain->next = NULL;
  279. out_chain->buf->memory = 1;
  280. out_chain->buf->flush = 1;
  281. out_chain->buf->pos = out_chain->buf->start;
  282. out_chain->buf->last = out_chain->buf->pos;
  283. out_chain->buf->last_in_chain = 1;
  284. /**
  285. * Conv. input json to Tarantool message [
  286. */
  287. if (tp_transcode_init(&tc,
  288. (char *)out_chain->buf->start,
  289. out_chain->buf->end - out_chain->buf->start,
  290. YAJL_JSON_TO_TP,
  291. NULL)
  292. == TP_TRANSCODE_ERROR)
  293. {
  294. crit("[BUG] failed to call tp_transcode_init(input)");
  295. return NGX_ERROR;
  296. }
  297. for (body = r->upstream->request_bufs; body; body = body->next) {
  298. if (body->buf->in_file) {
  299. ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
  300. "tnt: in-file buffer found. aborted. "
  301. "consider increasing your 'client_body_buffer_size' "
  302. "setting");
  303. const ngx_http_tnt_error_t *e = &errors[REQUEST_TOO_LARGE];
  304. ctx->in_err = ngx_http_set_err(r, e->code,
  305. e->msg.data, e->msg.len);
  306. if (ctx->in_err == NULL) {
  307. goto error_exit;
  308. }
  309. ctx->state = INPUT_TO_LARGE;
  310. goto read_input_done;
  311. } else {
  312. b = body->buf;
  313. }
  314. if (tp_transcode(&tc, (char *)b->pos, b->last - b->pos)
  315. == TP_TRANSCODE_ERROR)
  316. {
  317. ctx->in_err = ngx_http_set_err(r, tc.errcode,
  318. (u_char *)tc.errmsg,
  319. ngx_strlen(tc.errmsg));
  320. if (ctx->in_err == NULL) {
  321. goto error_exit;
  322. }
  323. ctx->state = INPUT_JSON_PARSE_FAILED;
  324. goto read_input_done;
  325. }
  326. }
  327. if (tp_transcode_complete(&tc, &complete_msg_size) == TP_TRANSCODE_OK) {
  328. out_chain->buf->last = out_chain->buf->start + complete_msg_size;
  329. if (tc.batch_size > 1) {
  330. ctx->rest_batch_size = ctx->batch_size = tc.batch_size;
  331. }
  332. dd("ctx->batch_size:%i, tc.batch_size:%i, complete_msg_size:%i",
  333. ctx->batch_size,
  334. tc.batch_size,
  335. (int)complete_msg_size);
  336. } else {
  337. dd("[input] failed to complete");
  338. ctx->in_err = ngx_http_set_err(r, tc.errcode,
  339. (u_char *)tc.errmsg,
  340. ngx_strlen(tc.errmsg));
  341. if (ctx->in_err == NULL) {
  342. goto error_exit;
  343. }
  344. ctx->state = INPUT_JSON_PARSE_FAILED;
  345. goto read_input_done;
  346. }
  347. /** ]
  348. */
  349. read_input_done:
  350. tp_transcode_free(&tc);
  351. /**
  352. * Hooking output chain
  353. */
  354. r->upstream->request_bufs = out_chain;
  355. return NGX_OK;
  356. error_exit:
  357. tp_transcode_free(&tc);
  358. return NGX_ERROR;
  359. }
  360. static ngx_int_t
  361. ngx_http_tnt_reinit_request(ngx_http_request_t *r)
  362. {
  363. dd("reinit connection with Tarantool...");
  364. ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
  365. if (ctx == NULL) {
  366. return NGX_OK;
  367. }
  368. ngx_http_tnt_reset_ctx(ctx);
  369. return NGX_OK;
  370. }
  371. static ngx_int_t
  372. ngx_http_tnt_process_header(ngx_http_request_t *r)
  373. {
  374. ngx_http_upstream_t *u = r->upstream;
  375. ngx_buf_t *b = &r->upstream->buffer;
  376. ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
  377. ngx_int_t rc;
  378. dd("process_header-> greeting: '%s', recv: %i",
  379. ctx->greeting ? "yes" : "no",
  380. (int)(b->last - b->pos));
  381. if (!ctx->greeting) {
  382. rc = ngx_http_tnt_read_greeting(r, ctx, b);
  383. if (rc == NGX_ERROR) {
  384. return rc;
  385. /**
  386. * If ctx->state is not OK we did not sent request to Tarantool
  387. * backend but we still must handle ctx->state at this stage --
  388. * so just ignore NGX_AGAIN and pass to next handler.
  389. */
  390. } else if (rc == NGX_AGAIN && ctx->state == OK) {
  391. return rc;
  392. }
  393. }
  394. switch (ctx->state) {
  395. case OK:
  396. break;
  397. case INPUT_TO_LARGE:
  398. case INPUT_JSON_PARSE_FAILED:
  399. case INPUT_EMPTY:
  400. return ngx_http_tnt_output_err(r, ctx, NGX_HTTP_BAD_REQUEST);
  401. default:
  402. crit("[BUG] unexpected ctx->stage(%i)", ctx->state);
  403. return NGX_ERROR;
  404. }
  405. /*
  406. * At this stage we can't get full upstream size,
  407. * since Tarantool could send to us 1 upto N messages
  408. * where each of messages could have X size.
  409. *
  410. * As fix -- just set each upstream mode to chunked.
  411. */
  412. u->headers_in.chunked = 1;
  413. u->headers_in.status_n = 200;
  414. u->state->status = 200;
  415. return NGX_OK;
  416. }
  417. static ngx_int_t
  418. ngx_http_tnt_filter_init(void *data)
  419. {
  420. dd("init filter");
  421. ngx_http_request_t *r = data;
  422. ngx_http_upstream_t *u = r->upstream;
  423. ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
  424. ctx->state = READ_PAYLOAD;
  425. ctx->payload_size = ctx->rest = 0;
  426. if (u->headers_in.status_n != 200) {
  427. u->length = 0;
  428. } else {
  429. u->length = -1;
  430. }
  431. return NGX_OK;
  432. }
  433. static ngx_int_t
  434. ngx_http_tnt_filter(void *data, ssize_t bytes)
  435. {
  436. dd("filter");
  437. ngx_http_request_t *r = data;
  438. ngx_http_upstream_t *u = r->upstream;
  439. ngx_buf_t *b = &u->buffer;
  440. b->last = b->last + bytes;
  441. /**
  442. *
  443. */
  444. ngx_int_t rc = NGX_OK;
  445. for (;;) {
  446. rc = ngx_http_tnt_filter_reply(r, u, b);
  447. if (rc != NGX_AGAIN) break;
  448. dd("Next message in same input buffer -- merge");
  449. }
  450. return rc;
  451. }
  452. static void
  453. ngx_http_tnt_abort_request(ngx_http_request_t *r)
  454. {
  455. dd("abort http tnt request");
  456. ngx_http_tnt_cleanup(r);
  457. }
  458. static void
  459. ngx_http_tnt_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
  460. {
  461. dd("finalize http tnt request");
  462. ngx_http_tnt_cleanup(r);
  463. }
  464. static void *
  465. ngx_http_tnt_create_loc_conf(ngx_conf_t *cf)
  466. {
  467. ngx_http_tnt_loc_conf_t *conf;
  468. conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_tnt_loc_conf_t));
  469. if (conf == NULL) {
  470. return NULL;
  471. }
  472. /*
  473. * set by ngx_pcalloc():
  474. *
  475. * conf->upstream.bufs.num = 0;
  476. * conf->upstream.next_upstream = 0;
  477. * conf->upstream.temp_path = NULL;
  478. * conf->upstream.uri = { 0, NULL };
  479. * conf->upstream.location = NULL;
  480. */
  481. conf->upstream.local = NGX_CONF_UNSET_PTR;
  482. conf->upstream.connect_timeout =
  483. conf->upstream.send_timeout =
  484. conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;
  485. conf->upstream.buffer_size =
  486. conf->in_multiplier =
  487. conf->out_multiplier = NGX_CONF_UNSET_SIZE;
  488. /*
  489. * The hardcoded values
  490. */
  491. conf->upstream.cyclic_temp_file = 0;
  492. conf->upstream.buffering = 0;
  493. conf->upstream.ignore_client_abort = 0;
  494. conf->upstream.send_lowat = 0;
  495. conf->upstream.bufs.num = 0;
  496. conf->upstream.busy_buffers_size = 0;
  497. conf->upstream.max_temp_file_size = 0;
  498. conf->upstream.temp_file_write_size = 0;
  499. conf->upstream.intercept_errors = 1;
  500. conf->upstream.intercept_404 = 1;
  501. conf->upstream.pass_request_headers = 0;
  502. conf->upstream.pass_request_body = 0;
  503. conf->index = NGX_CONF_UNSET;
  504. return conf;
  505. }
  506. static char *
  507. ngx_http_tnt_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
  508. {
  509. ngx_http_tnt_loc_conf_t *prev = parent;
  510. ngx_http_tnt_loc_conf_t *conf = child;
  511. ngx_conf_merge_ptr_value(conf->upstream.local,
  512. prev->upstream.local, NULL);
  513. ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
  514. prev->upstream.connect_timeout, 60000);
  515. ngx_conf_merge_msec_value(conf->upstream.send_timeout,
  516. prev->upstream.send_timeout, 60000);
  517. ngx_conf_merge_msec_value(conf->upstream.read_timeout,
  518. prev->upstream.read_timeout, 60000);
  519. ngx_conf_merge_size_value(conf->upstream.buffer_size,
  520. prev->upstream.buffer_size,
  521. (size_t) ngx_pagesize);
  522. ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
  523. prev->upstream.next_upstream,
  524. (NGX_CONF_BITMASK_SET
  525. |NGX_HTTP_UPSTREAM_FT_ERROR
  526. |NGX_HTTP_UPSTREAM_FT_TIMEOUT));
  527. if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF) {
  528. conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
  529. |NGX_HTTP_UPSTREAM_FT_OFF;
  530. }
  531. if (conf->upstream.upstream == NULL) {
  532. conf->upstream.upstream = prev->upstream.upstream;
  533. }
  534. ngx_conf_merge_size_value(conf->in_multiplier, prev->in_multiplier, 2);
  535. ngx_conf_merge_size_value(conf->out_multiplier, prev->out_multiplier, 2);
  536. return NGX_CONF_OK;
  537. }
  538. static char *
  539. ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  540. {
  541. ngx_http_tnt_loc_conf_t *mlcf = conf;
  542. ngx_str_t *value;
  543. ngx_url_t u;
  544. ngx_http_core_loc_conf_t *clcf;
  545. if (mlcf->upstream.upstream) {
  546. return "is duplicate";
  547. }
  548. value = cf->args->elts;
  549. ngx_memzero(&u, sizeof(ngx_url_t));
  550. u.url = value[1];
  551. u.no_resolve = 1;
  552. mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
  553. if (mlcf->upstream.upstream == NULL) {
  554. return NGX_CONF_ERROR;
  555. }
  556. clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
  557. clcf->handler = ngx_http_tnt_handler;
  558. if (clcf->name.data[clcf->name.len - 1] == '/') {
  559. clcf->auto_redirect = 1;
  560. }
  561. return NGX_CONF_OK;
  562. }
  563. /** Pre-filter functions
  564. */
  565. static inline ngx_http_tnt_ctx_t *
  566. ngx_http_tnt_create_ctx(ngx_http_request_t *r)
  567. {
  568. ngx_http_tnt_ctx_t *ctx;
  569. ctx = ngx_palloc(r->pool, sizeof(ngx_http_tnt_ctx_t));
  570. if (ctx == NULL) {
  571. return NULL;
  572. }
  573. ngx_http_tnt_reset_ctx(ctx);
  574. ngx_http_set_ctx(r, ctx, ngx_http_tnt_module);
  575. ctx->state = OK;
  576. return ctx;
  577. }
  578. static inline void
  579. ngx_http_tnt_reset_ctx(ngx_http_tnt_ctx_t *ctx)
  580. {
  581. ctx->payload.p = &ctx->payload.mem[0];
  582. ctx->payload.e = &ctx->payload.mem[sizeof(ctx->payload.mem) - 1];
  583. ctx->state = OK;
  584. ctx->in_err = ctx->tp_cache = NULL;
  585. ctx->rest = 0;
  586. ctx->payload_size = 0;
  587. ctx->rest_batch_size = 0;
  588. ctx->batch_size = 0;
  589. ctx->greeting = 0;
  590. }
  591. static inline ngx_buf_t*
  592. ngx_http_set_err(ngx_http_request_t *r,
  593. int errcode,
  594. const u_char *msg, size_t len)
  595. {
  596. const size_t msglen = len + sizeof("{"
  597. "'error':{"
  598. "'message':'',"
  599. "'code':-XXXXX"
  600. "}"
  601. "}") - 1;
  602. ngx_buf_t *b = ngx_create_temp_buf(r->pool, msglen);
  603. if (b == NULL) {
  604. return NULL;
  605. }
  606. b->memory = 1;
  607. b->pos = b->start;
  608. b->last = ngx_snprintf(b->start, msglen, "{"
  609. "\"error\":{"
  610. "\"code\":%d,"
  611. "\"message\":\"%s\""
  612. "}"
  613. "}",
  614. errcode,
  615. msg);
  616. return b;
  617. }
  618. static inline ngx_int_t
  619. ngx_http_tnt_output_err(ngx_http_request_t *r,
  620. ngx_http_tnt_ctx_t *ctx,
  621. ngx_int_t code)
  622. {
  623. ngx_http_upstream_t *u;
  624. ngx_chain_t *cl, **ll;
  625. u = r->upstream;
  626. if (ctx->in_err == NULL) {
  627. u->headers_in.status_n = 500;
  628. u->state->status = 500;
  629. u->headers_in.content_length_n = 0;
  630. return NGX_OK;
  631. }
  632. u->headers_in.status_n = code;
  633. u->state->status = code;
  634. u->headers_in.content_length_n = ctx->in_err->last - ctx->in_err->pos;
  635. u->length = 0;
  636. for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
  637. ll = &cl->next;
  638. }
  639. *ll = cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
  640. if (cl == NULL) {
  641. return NGX_ERROR;
  642. }
  643. cl->buf = ctx->in_err;
  644. cl->next = NULL;
  645. cl->buf->pos = cl->buf->start;
  646. cl->buf->end = cl->buf->last;
  647. cl->buf->flush = 1;
  648. cl->buf->memory = 1;
  649. cl->buf->tag = u->output.tag;
  650. cl->buf->last_in_chain = 1;
  651. return NGX_OK;
  652. }
  653. /** Filter functions
  654. */
  655. static inline ngx_int_t
  656. ngx_http_tnt_read_greeting(ngx_http_request_t *r,
  657. ngx_http_tnt_ctx_t *ctx,
  658. ngx_buf_t *b)
  659. {
  660. if (b->last - b->pos < 128) {
  661. crit("[BUG] Tarantool sent invalid greeting len:%i",
  662. b->last - b->pos);
  663. return NGX_AGAIN;
  664. }
  665. if (b->last - b->pos >= (ptrdiff_t)sizeof("Tarantool") - 1
  666. && b->pos[0] == 'T'
  667. && b->pos[1] == 'a'
  668. && b->pos[2] == 'r'
  669. && b->pos[3] == 'a'
  670. && b->pos[4] == 'n'
  671. && b->pos[5] == 't'
  672. && b->pos[6] == 'o'
  673. && b->pos[7] == 'o'
  674. && b->pos[8] == 'l')
  675. {
  676. b->pos = b->pos + 128;
  677. ctx->greeting = 1;
  678. /**
  679. * Sometimes Nginx reads only 'greeting'(i.e. 128 bytes) -- to avoid
  680. * side effects (inside 'init'/'filter') we must return to
  681. * 'process_header'
  682. */
  683. if (b->pos == b->last) {
  684. return NGX_AGAIN;
  685. }
  686. return NGX_OK;
  687. }
  688. crit("[BUG] Tarantool sent strange greeting: '%.*s',"
  689. " expected 'Tarantool' with len. == 128",
  690. 128, b->pos);
  691. return NGX_ERROR;
  692. }
  693. static ngx_int_t
  694. ngx_http_tnt_send_reply(ngx_http_request_t *r,
  695. ngx_http_upstream_t *u,
  696. ngx_http_tnt_ctx_t *ctx)
  697. {
  698. tp_transcode_t tc;
  699. ngx_int_t rc;
  700. ngx_http_tnt_loc_conf_t *tlcf;
  701. ngx_buf_t *output;
  702. tlcf = ngx_http_get_module_loc_conf(r, ngx_http_tnt_module);
  703. output = ngx_http_tnt_create_mem_buf(r, u,
  704. (ctx->tp_cache->end - ctx->tp_cache->start)
  705. * tlcf->out_multiplier + OVERHEAD);
  706. if (output == NULL) {
  707. return NGX_ERROR;
  708. }
  709. if (ctx->batch_size > 0
  710. && ctx->rest_batch_size == ctx->batch_size)
  711. {
  712. *output->pos = '[';
  713. ++output->pos;
  714. }
  715. rc = tp_transcode_init(&tc,
  716. (char *)output->pos, output->end - output->pos,
  717. TP_REPLY_TO_JSON,
  718. NULL);
  719. if (rc == TP_TRANSCODE_ERROR) {
  720. crit("[BUG] failed to call tp_transcode_init(output)");
  721. return NGX_ERROR;
  722. }
  723. rc = tp_transcode(&tc, (char *)ctx->tp_cache->start,
  724. ctx->tp_cache->end - ctx->tp_cache->start);
  725. if (rc == TP_TRANSCODE_OK) {
  726. size_t complete_msg_size = 0;
  727. rc = tp_transcode_complete(&tc, &complete_msg_size);
  728. if (rc == TP_TRANSCODE_ERROR) {
  729. crit("[BUG] failed to complete output transcoding");
  730. ngx_pfree(r->pool, output);
  731. const ngx_http_tnt_error_t *e = &errors[UNKNOWN_PARSE_ERROR];
  732. output = ngx_http_set_err(r, e->code, e->msg.data, e->msg.len);
  733. if (output == NULL) {
  734. goto error_exit;
  735. }
  736. goto done;
  737. }
  738. output->last = output->pos + complete_msg_size;
  739. } else if (rc == TP_TRANSCODE_ERROR) {
  740. crit("[BUG] failed to transcode output, err: '%s'", tc.errmsg);
  741. ngx_pfree(r->pool, output);
  742. output = ngx_http_set_err(r,
  743. tc.errcode,
  744. (u_char *)tc.errmsg,
  745. ngx_strlen(tc.errmsg));
  746. if (output == NULL) {
  747. goto error_exit;
  748. }
  749. }
  750. done:
  751. tp_transcode_free(&tc);
  752. if (ctx->batch_size > 0) {
  753. if (ctx->rest_batch_size == 1)
  754. {
  755. *output->last = ']';
  756. ++output->last;
  757. }
  758. else if (ctx->rest_batch_size <= ctx->batch_size)
  759. {
  760. *output->last = ',';
  761. ++output->last;
  762. }
  763. }
  764. return ngx_http_tnt_output(r, u, output);
  765. error_exit:
  766. tp_transcode_free(&tc);
  767. return NGX_ERROR;
  768. }
  769. static ngx_int_t
  770. ngx_http_tnt_filter_reply(ngx_http_request_t *r,
  771. ngx_http_upstream_t *u,
  772. ngx_buf_t *b)
  773. {
  774. ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
  775. ssize_t bytes = b->last - b->pos;
  776. dd("filter_reply -> recv bytes: %i, rest: %i", (int)bytes, (int)ctx->rest);
  777. if (ctx->state == READ_PAYLOAD) {
  778. ssize_t payload_rest = ngx_min(ctx->payload.e - ctx->payload.p, bytes);
  779. if (payload_rest > 0) {
  780. ctx->payload.p = ngx_copy(ctx->payload.p, b->pos, payload_rest);
  781. bytes -= payload_rest;
  782. b->pos += payload_rest;
  783. payload_rest = ctx->payload.e - ctx->payload.p;
  784. dd("filter_reply -> payload rest:%i", (int)payload_rest);
  785. }
  786. if (payload_rest == 0) {
  787. ctx->payload_size = tp_read_payload((char *)&ctx->payload.mem[0],
  788. (char *)ctx->payload.e);
  789. if (ctx->payload_size <= 0) {
  790. crit("[BUG] tp_read_payload failed, ret:%i",
  791. (int)ctx->payload_size);
  792. return NGX_ERROR;
  793. }
  794. ctx->rest = ctx->payload_size - 5 /* - header size */;
  795. dd("filter_reply -> got header payload:%i, rest:%i",
  796. (int)ctx->payload_size,
  797. (int)ctx->rest);
  798. ctx->tp_cache = ngx_create_temp_buf(r->pool, ctx->payload_size);
  799. if (ctx->tp_cache == NULL) {
  800. return NGX_ERROR;
  801. }
  802. ctx->tp_cache->pos = ctx->tp_cache->start;
  803. ctx->tp_cache->memory = 1;
  804. ctx->tp_cache->pos = ngx_copy(ctx->tp_cache->pos,
  805. &ctx->payload.mem[0],
  806. sizeof(ctx->payload.mem) - 1);
  807. ctx->payload.p = &ctx->payload.mem[0];
  808. ctx->state = READ_BODY;
  809. } else {
  810. return NGX_OK;
  811. }
  812. }
  813. ngx_int_t rc = NGX_OK;
  814. if (ctx->state == READ_BODY) {
  815. ssize_t rest = ctx->rest - bytes, read_on = bytes;
  816. if (rest < 0) {
  817. rest *= -1;
  818. read_on = bytes - rest;
  819. ctx->rest = 0;
  820. ctx->state = SEND_REPLY;
  821. rc = NGX_AGAIN;
  822. } else if (rest == 0) {
  823. ctx->state = SEND_REPLY;
  824. ctx->rest = 0;
  825. } else {
  826. ctx->rest -= bytes;
  827. }
  828. ctx->tp_cache->pos = ngx_copy(ctx->tp_cache->pos, b->pos, read_on);
  829. b->pos += read_on;
  830. dd("filter_reply -> read_on:%i, rest:%i, cache rest:%i, buf size:%i",
  831. (int)read_on,
  832. (int)ctx->rest,
  833. (int)(ctx->tp_cache->end - ctx->tp_cache->pos),
  834. (int)(b->last - b->pos));
  835. }
  836. if (ctx->state == SEND_REPLY) {
  837. rc = ngx_http_tnt_send_reply(r, u, ctx);
  838. ctx->state = READ_PAYLOAD;
  839. ctx->rest = ctx->payload_size = 0;
  840. --ctx->rest_batch_size;
  841. if (ctx->rest_batch_size <= 0) {
  842. u->length = 0;
  843. ctx->rest_batch_size = 0;
  844. ctx->batch_size = 0;
  845. }
  846. ngx_pfree(r->pool, ctx->tp_cache);
  847. ctx->tp_cache = NULL;
  848. if (b->last - b->pos > 0) {
  849. rc = NGX_AGAIN;
  850. }
  851. }
  852. return rc;
  853. }
  854. /** Rest
  855. */
  856. static inline ngx_buf_t *
  857. ngx_http_tnt_create_mem_buf(ngx_http_request_t *r,
  858. ngx_http_upstream_t *u,
  859. size_t size)
  860. {
  861. ngx_buf_t *b = ngx_create_temp_buf(r->pool, size);
  862. if (b == NULL) {
  863. return NULL;
  864. }
  865. b->pos = b->start;
  866. b->memory = 1;
  867. b->flush = 1;
  868. b->tag = u->output.tag;
  869. b->last = b->end;
  870. return b;
  871. }
  872. static inline ngx_int_t
  873. ngx_http_tnt_output(ngx_http_request_t *r,
  874. ngx_http_upstream_t *u,
  875. ngx_buf_t *b)
  876. {
  877. ngx_chain_t *cl, **ll;
  878. for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
  879. ll = &cl->next;
  880. }
  881. cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
  882. if (cl == NULL) {
  883. return NGX_ERROR;
  884. }
  885. b->pos = b->start;
  886. b->flush = 1;
  887. b->last_in_chain = 1;
  888. b->tag = u->output.tag;
  889. cl->buf = b;
  890. cl->next = NULL;
  891. *ll = cl;
  892. return NGX_OK;
  893. }
  894. static inline void
  895. ngx_http_tnt_cleanup(ngx_http_request_t *r)
  896. {
  897. ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
  898. if (ctx == NULL) {
  899. crit("[BUG] nothing to cleanup");
  900. return;
  901. }
  902. ngx_pfree(r->pool, ctx);
  903. if (ctx->tp_cache != NULL) {
  904. ngx_pfree(r->pool, ctx->tp_cache);
  905. }
  906. ngx_http_set_ctx(r, NULL, ngx_http_tnt_module);
  907. }