Vasiliy Soshnikov 4 years ago
parent
commit
c9ab0d85c8
9 changed files with 1265 additions and 639 deletions
  1. 7
    2
      misc/json2tp.c
  2. 38
    20
      misc/nginx.dev.conf
  3. 9
    2
      misc/tp_dump.c
  4. 818
    0
      ngx_http_tnt_handlers.c
  5. 122
    0
      ngx_http_tnt_handlers.h
  6. 90
    596
      ngx_http_tnt_module.c
  7. 18
    1
      tp_ext.h
  8. 129
    14
      tp_transcode.c
  9. 34
    4
      tp_transcode.h

+ 7
- 2
misc/json2tp.c View File

@@ -60,8 +60,13 @@ main(int argc, char **argv)
}

tp_transcode_t t;
if (tp_transcode_init(&t, output, size, YAJL_JSON_TO_TP, NULL)
== TP_TRANSCODE_ERROR)
tp_transcode_init_args_t args = {
.output = output,
.output_size = size,
.method = NULL, .method_len = 0,
.codec = YAJL_JSON_TO_TP,
.mf = NULL };
if (tp_transcode_init(&t, &args) == TP_TRANSCODE_ERROR)
{
fprintf(stderr, "json2tp: failed to initialize transcode, exiting\n");
exit(2);

+ 38
- 20
misc/nginx.dev.conf View File

@@ -1,6 +1,6 @@
#daemon off;
#master_process off;
worker_processes 4;
daemon off;
master_process off;
#worker_processes 4;

worker_rlimit_core 500M;

@@ -10,14 +10,9 @@ error_log logs/notice.log notice;
error_log logs/info.log info;
error_log logs/crit.log crit;
error_log logs/debug.log debug;
#error_log stderr;
error_log stderr;

events {
# use kqueue;
# use epoll;
# use poll;
worker_connections 10000;
}
events {}

http {

@@ -37,15 +32,8 @@ http {
keepalive_timeout 15;
send_timeout 10;

upstream echo {
server 127.0.0.1:10001 max_fails=1 fail_timeout=30s;
keepalive 1024;
}

upstream tnt {
server 127.0.0.1:9999 max_fails=1 fail_timeout=30s;
server 127.0.0.1:10000 max_fails=1 fail_timeout=30s;
server 127.0.0.1:10001 backup;
keepalive 1024;
}

@@ -54,11 +42,41 @@ http {

server_name tnt_test;

location = /echo {
tnt_pass echo;
location = /tnt {
tnt_pass tnt;
}

location = /tnt {
location /rest_api {
tnt_pass tnt;
tnt_pass_http_request on;
}

location /preset_method {
tnt_method 'echo_2';
tnt_pass tnt;
}

location = /post_pass_http_request {
tnt_pass_http_request on;
tnt_pass tnt;
}

location /post_preset_method {
tnt_method 'echo_2';
tnt_pass_http_request on;
tnt_pass tnt;
}

location /overflow_rest_api {
tnt_pass_http_request_buffer_size 30;
tnt_pass tnt;
tnt_pass_http_request on;
}

location /overflow_post_pass_http_request {
tnt_pass_http_request_buffer_size 30;
tnt_method 'echo_2';
tnt_pass_http_request on;
tnt_pass tnt;
}
}

+ 9
- 2
misc/tp_dump.c View File

@@ -57,8 +57,15 @@ main(int argc, char ** argv)
memset(ibuf, 0, sizeof(ibuf));

tp_transcode_t tc;
if (tp_transcode_init(&tc, obuf, sizeof(obuf) - 1, TP_TO_JSON, NULL)
== TP_TRANSCODE_ERROR)
tp_transcode_init_args_t args = {
.output = obuf,
.output_size = sizeof(obuf) - 1,
.method = NULL,
.method_len = 0,
.codec = TP_TO_JSON,
.mf = NULL
};
if (tp_transcode_init(&tc, &args) == TP_TRANSCODE_ERROR)
{
fprintf(stderr, "tp_dump: failed to initialize transcode\n");
exit(2);

+ 818
- 0
ngx_http_tnt_handlers.c View File

@@ -0,0 +1,818 @@
/*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* Copyright (C) 2015-2016 Tarantool AUTHORS:
* please see AUTHORS file.
*/

#include <tp_ext.h>
#include <ngx_http_tnt_handlers.h>
#include <debug.h>

extern ngx_module_t ngx_http_tnt_module;

static inline size_t
ngx_http_tnt_get_output_size(
ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx,
ngx_http_tnt_loc_conf_t *tlcf,
ngx_buf_t *request_b)
{
(void)ctx;
size_t output_size = 0;

if (r->method & NGX_HTTP_POST) {
output_size += r->headers_in.content_length_n;
}

if (tlcf->method.len) {
output_size += tlcf->method.len;
}

output_size *= tlcf->in_multiplier;

if (request_b != NULL) {
output_size += request_b->last - request_b->start;
}

return output_size;
}


static inline ngx_int_t
ngx_http_tnt_output_err(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx,
ngx_int_t code)
{
ngx_http_upstream_t *u;
ngx_chain_t *cl, **ll;

u = r->upstream;

if (ctx->in_err == NULL) {
u->headers_in.status_n = 500;
u->state->status = 500;
u->headers_in.content_length_n = 0;
return NGX_OK;
}

u->headers_in.status_n = code;
u->state->status = code;
u->headers_in.content_length_n = ctx->in_err->last - ctx->in_err->pos;
u->length = 0;

for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
}

*ll = cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
}

cl->buf = ctx->in_err;
cl->next = NULL;

cl->buf->pos = cl->buf->start;
cl->buf->end = cl->buf->last;

cl->buf->flush = 1;
cl->buf->memory = 1;
cl->buf->tag = u->output.tag;
cl->buf->last_in_chain = 1;

return NGX_OK;
}


static inline ngx_int_t
ngx_http_tnt_read_greeting(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx,
ngx_buf_t *b)
{
if (b->last - b->pos >= (ptrdiff_t)sizeof("Tarantool") - 1
&& b->pos[0] == 'T'
&& b->pos[1] == 'a'
&& b->pos[2] == 'r'
&& b->pos[3] == 'a'
&& b->pos[4] == 'n'
&& b->pos[5] == 't'
&& b->pos[6] == 'o'
&& b->pos[7] == 'o'
&& b->pos[8] == 'l')
{
b->pos = b->pos + 128;
/**
* Sometimes Nginx reads only 'greeting'(i.e. 128 bytes) -- to avoid
* side effects (inside 'init'/'filter') we must return to
* 'process_header'
*/
if (b->pos == b->last) {
return NGX_AGAIN;
}
}

ctx->greeting = 1;

return NGX_OK;
}


static ngx_int_t
ngx_http_tnt_send_once(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx,
ngx_chain_t *out_chain,
const u_char *buf, size_t len)
{
tp_transcode_t tc;
size_t complete_msg_size;
tp_transcode_init_args_t args = {
.output = (char *)out_chain->buf->start,
.output_size = out_chain->buf->end - out_chain->buf->start,
.method = NULL, .method_len = 0,
.codec = YAJL_JSON_TO_TP,
.mf = NULL
};

if (tp_transcode_init(&tc, &args) == TP_TRANSCODE_ERROR) {
goto error_exit;
}

if (tp_transcode(&tc, (char *)buf, len) == TP_TRANSCODE_ERROR) {
dd("ngx_http_tnt_send:tp_transcode error: %s, code:%d",
tc.errmsg, tc.errcode);
goto error_exit;
}

if (tp_transcode_complete(&tc, &complete_msg_size) == TP_TRANSCODE_OK) {

out_chain->buf->last = out_chain->buf->start + complete_msg_size;

if (tc.batch_size > 1) {
ctx->rest_batch_size = ctx->batch_size = tc.batch_size;
}
} else {
goto error_exit;
}

tp_transcode_free(&tc);
return NGX_OK;

error_exit:
tp_transcode_free(&tc);
return NGX_ERROR;
}


static inline void
ngx_http_tnt_cleanup(ngx_http_request_t *r, ngx_http_tnt_ctx_t *ctx)
{
if (ctx == NULL) {
return;
}

if (ctx->tp_cache != NULL) {
ngx_pfree(r->pool, ctx->tp_cache);
ctx->tp_cache = NULL;
}
}


static inline ngx_str_t
ngx_http_tnt_get_method(ngx_http_request_t *r, ngx_http_tnt_loc_conf_t *tlcf)
{
size_t len;
u_char *pos, *end;
ngx_str_t method = { .data = NULL, .len = 0 };
if (tlcf->method.data && tlcf->method.len)
method = tlcf->method;
else if (r->uri.data && r->uri.len)
{
len = r->uri.len;
end = pos = r->uri.data + len;
for (;len; --len, --pos) {
if (*pos == '/') {
method.data = pos + 1;
method.len = end - method.data;
break;
}
}
}
return method;
}


static inline ngx_int_t
ngx_http_tnt_get_request_data(ngx_http_request_t *r, struct tp *tp)
{
char *root_map_place;
char *headers_map_place;
size_t root_items;
size_t headers_items;
ngx_list_part_t *part;
ngx_table_elt_t *h;

root_items = 0;
root_map_place = tp->p;
tp_add(tp, 1 + sizeof(uint32_t));

++root_items;
if (!tp_encode_str_map_item(tp,
"uri", sizeof("uri")-1,
(const char*)r->unparsed_uri.data,
r->unparsed_uri.len))
{
return NGX_ERROR;
}

++root_items;
if (!tp_encode_str(tp, "headers", sizeof("headers")-1)) {
return NGX_ERROR;
}

headers_items = 0;
headers_map_place = tp->p;
if (!tp_add(tp, 1 + sizeof(uint32_t))) {
return NGX_ERROR;
}

if (r->headers_in.headers.size) {

part = &r->headers_in.headers.part;
h = part->elts;

size_t i = 0;
for (;; i++) {

if (i >= part->nelts) {
if (part->next == NULL) {
break;
}
part = part->next;
h = part->elts;
i = 0;
}

++headers_items;
if (!tp_encode_str_map_item(tp,
(const char *)h[i].key.data,
h[i].key.len,
(const char *)h[i].value.data,
h[i].value.len))
{
return NGX_ERROR;
}
}
}

*(headers_map_place++) = 0xdf;
*(uint32_t *) headers_map_place = mp_bswap_u32(headers_items);

*(root_map_place++) = 0xdf;
*(uint32_t *) root_map_place = mp_bswap_u32(root_items);

return NGX_OK;
}


static inline ngx_buf_t *
ngx_http_tnt_get_request_data_map(
ngx_http_request_t *r,
ngx_http_tnt_loc_conf_t *tlcf)
{
ngx_int_t rc;
struct tp tp;
ngx_buf_t *b;

b = ngx_create_temp_buf(r->pool, tlcf->pass_http_request_buffer_size);
if (b == NULL) {
crit("[BUG?] ngx_http_tnt_get_request_data_map - "
"failed to allocate output buffer, size");
return NULL;
}

b->memory = 1;
b->flush = 1;

b->pos = b->start;

tp_init(&tp, (char *)b->start, b->end - b->start, NULL, NULL);
tp.size = tp.p;

rc = ngx_http_tnt_get_request_data(r, &tp);
if (rc != NGX_OK) {
return NULL;
}

b->last = (u_char *)tp.p;

return b;
}


ngx_http_tnt_ctx_t *
ngx_http_tnt_create_ctx(ngx_http_request_t *r)
{
ngx_http_tnt_ctx_t *ctx;

ctx = ngx_palloc(r->pool, sizeof(ngx_http_tnt_ctx_t));
if (ctx == NULL) {
return NULL;
}

ngx_http_tnt_reset_ctx(ctx);

ngx_http_set_ctx(r, ctx, ngx_http_tnt_module);

ctx->state = OK;

return ctx;
}


void
ngx_http_tnt_reset_ctx(ngx_http_tnt_ctx_t *ctx)
{
ctx->payload.p = &ctx->payload.mem[0];
ctx->payload.e = &ctx->payload.mem[sizeof(ctx->payload.mem) - 1];

ctx->state = OK;

ctx->in_err = ctx->tp_cache = NULL;

ctx->rest = 0;
ctx->payload_size = 0;

ctx->rest_batch_size = 0;
ctx->batch_size = 0;

ctx->greeting = 0;
}


void ngx_http_tnt_set_handlers(
ngx_http_request_t *r,
ngx_http_upstream_t *u,
ngx_http_tnt_loc_conf_t *tlcf)
{
(void)tlcf;

u->reinit_request = ngx_http_tnt_reinit_request;
u->process_header = ngx_http_tnt_process_header;
u->abort_request = ngx_http_tnt_abort_request;
u->finalize_request = ngx_http_tnt_finalize_request;

switch (r->method) {
case NGX_HTTP_POST:
u->create_request = ngx_http_tnt_post_json_handler;
break;
default:
u->create_request = ngx_http_tnt_query_handler;
break;
}
}


ngx_int_t
ngx_http_tnt_post_json_handler(ngx_http_request_t *r)
{
ngx_buf_t *b, *request_b = NULL;
ngx_chain_t *body;
size_t complete_msg_size;
tp_transcode_t tc;
ngx_http_tnt_ctx_t *ctx;
ngx_chain_t *out_chain;
ngx_http_tnt_loc_conf_t *tlcf;

if (r->headers_in.content_length_n == 0) {
/** XXX
* Probably, this case we should handle like 'NOT ALLOWED'?
*/
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "empty body");
return NGX_ERROR;
}

ctx = ngx_http_tnt_create_ctx(r);
if (ctx == NULL) {
return NGX_ERROR;
}

tlcf = ngx_http_get_module_loc_conf(r, ngx_http_tnt_module);

out_chain = ngx_alloc_chain_link(r->pool);
if (out_chain == NULL) {
return NGX_ERROR;
}

if (tlcf->pass_http_request == NGX_TNT_CONF_ON) {
request_b = ngx_http_tnt_get_request_data_map(r, tlcf);
if (request_b == NULL) {
return NGX_ERROR;
}
}

out_chain->buf = ngx_create_temp_buf(r->pool,
ngx_http_tnt_get_output_size(r, ctx, tlcf, request_b));
if (out_chain->buf == NULL) {
crit("[BUG?] ngx_http_tnt_post_json_handler -- "
"failed to allocate output buffer, size %ui",
r->headers_in.content_length_n * tlcf->in_multiplier);
return NGX_ERROR;
}

out_chain->next = NULL;
out_chain->buf->memory = 1;
out_chain->buf->flush = 1;

out_chain->buf->pos = out_chain->buf->start;
out_chain->buf->last = out_chain->buf->pos;
out_chain->buf->last_in_chain = 1;

/**
* Conv. input json to Tarantool message [
*/
tp_transcode_init_args_t args = {
.output = (char *)out_chain->buf->start,
.output_size = out_chain->buf->end - out_chain->buf->start,
.method = (char *)tlcf->method.data,
.method_len = tlcf->method.len,
.codec = YAJL_JSON_TO_TP,
.mf = NULL
};

if (tp_transcode_init(&tc, &args) == TP_TRANSCODE_ERROR) {
crit("[BUG] failed to call tp_transcode_init(input)");
return NGX_ERROR;
}

if (request_b != NULL)
{
tp_transcode_bind_data(&tc,
(const char *)request_b->start,
(const char *)request_b->last);
}

for (body = r->upstream->request_bufs; body; body = body->next) {

if (body->buf->in_file) {

ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"tnt: in-file buffer found. aborted. "
"consider increasing your 'client_body_buffer_size' "
"setting");

const ngx_http_tnt_error_t *e = get_error_text(REQUEST_TOO_LARGE);
ctx->in_err = ngx_http_tnt_set_err(r,
e->code,
e->msg.data, e->msg.len);
if (ctx->in_err == NULL) {
goto error_exit;
}

ctx->state = INPUT_TO_LARGE;

goto read_input_done;

} else {
b = body->buf;
}

if (tp_transcode(&tc, (char *)b->pos, b->last - b->pos)
== TP_TRANSCODE_ERROR)
{
ctx->state = INPUT_JSON_PARSE_FAILED;
goto read_input_done;
}
}

if (tp_transcode_complete(&tc, &complete_msg_size) == TP_TRANSCODE_OK) {

out_chain->buf->last = out_chain->buf->start + complete_msg_size;

if (tc.batch_size > 1) {
ctx->rest_batch_size = ctx->batch_size = tc.batch_size;
}

dd("ctx->batch_size:%i, tc.batch_size:%i, complete_msg_size:%i",
ctx->batch_size,
tc.batch_size,
(int)complete_msg_size);

} else {
ctx->state = INPUT_JSON_PARSE_FAILED;
goto read_input_done;
}
/** ]
*/
read_input_done:

if (ctx->state != OK) {

if (ctx->in_err == NULL) {
ctx->in_err = ngx_http_tnt_set_err(r,
tc.errcode,
(u_char *)tc.errmsg,
ngx_strlen(tc.errmsg));
if (ctx->in_err == NULL) {
goto error_exit;
}
}

/* Rewrite output buffer since it may be less then needed
*/
static const u_char fd_event[] =
"{\"method\":\"__nginx_needs_fd_event\",\"params\":[]}";

out_chain->buf = ngx_create_temp_buf(r->pool,
ngx_strlen(fd_event) * tlcf->in_multiplier);
if (out_chain->buf == NULL) {
goto error_exit;
}

out_chain->next = NULL;
out_chain->buf->memory = 1;
out_chain->buf->flush = 1;

out_chain->buf->pos = out_chain->buf->start;
out_chain->buf->last = out_chain->buf->pos;
out_chain->buf->last_in_chain = 1;

/** Fire event manualy on tarantool socket.
* This is need for run all parts of sequence
*/
if (ngx_http_tnt_send_once(r,
ctx,
out_chain,
fd_event, ngx_strlen(fd_event))
!= NGX_OK)
{
dd("ngx_http_tnt_send (i.e. file fd event) failed");
goto error_exit;
}
}

/**
* Hooking output chain
*/
r->upstream->request_bufs = out_chain;

tp_transcode_free(&tc);
return NGX_OK;

error_exit:
tp_transcode_free(&tc);
return NGX_ERROR;
}


ngx_int_t
ngx_http_tnt_query_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_buf_t *buf;
ngx_http_tnt_ctx_t *ctx;
ngx_chain_t *out_chain;
ngx_http_tnt_loc_conf_t *tlcf;
struct tp tp;
const ngx_http_tnt_error_t *err = NULL;

ctx = ngx_http_tnt_create_ctx(r);
if (ctx == NULL) {
return NGX_ERROR;
}

tlcf = ngx_http_get_module_loc_conf(r, ngx_http_tnt_module);

out_chain = ngx_alloc_chain_link(r->pool);
if (out_chain == NULL) {
return NGX_ERROR;
}

out_chain->buf = ngx_create_temp_buf(r->pool,
tlcf->pass_http_request_buffer_size);
if (out_chain->buf == NULL) {
return NGX_ERROR;
}

out_chain->next = NULL;
out_chain->buf->memory = 1;
out_chain->buf->flush = 1;

out_chain->buf->pos = out_chain->buf->start;
out_chain->buf->last = out_chain->buf->pos;
out_chain->buf->last_in_chain = 1;

/**
* Conv. GET/PUT/DELETE to Tarantool message [
*/
buf = out_chain->buf;
tp_init(&tp, (char *)buf->start, buf->end - buf->start, NULL, NULL);

ngx_str_t method = ngx_http_tnt_get_method(r, tlcf);
if (!method.data && !method.len) {
crit("[BUG?] ngx_http_tnt_query_handler - ngx_http_tnt_get_method failed");
return NGX_ERROR;
}

if (!tp_call_nargs(&tp,
(const char *)method.data,
(size_t)method.len, 1))
{
err = get_error_text(HTTP_REQUEST_TOO_LARGE);
crit("ngx_http_tnt_query_handler - %s", err->msg.data);
return NGX_ERROR;
}

rc = ngx_http_tnt_get_request_data(r, &tp);
if (rc != NGX_OK) {
err = get_error_text(HTTP_REQUEST_TOO_LARGE);
crit("ngx_http_tnt_query_handler - %s", err->msg.data);
return rc;
}

out_chain->buf->last = (u_char *)tp.p;
/** ]
*/

/**
* Hooking output chain
*/
r->upstream->request_bufs = out_chain;

return NGX_OK;
}


ngx_int_t
ngx_http_tnt_reinit_request(ngx_http_request_t *r)
{
dd("reinit connection with Tarantool...");

ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
if (ctx == NULL) {
return NGX_OK;
}

ngx_http_tnt_cleanup(r, ctx);
ngx_http_tnt_reset_ctx(ctx);

return NGX_OK;
}


ngx_int_t
ngx_http_tnt_process_header(ngx_http_request_t *r)
{
ngx_http_upstream_t *u = r->upstream;
ngx_buf_t *b = &r->upstream->buffer;
ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);

ngx_int_t rc;

dd("process_header-> greeting: '%s', recv: %i",
ctx->greeting ? "yes" : "no",
(int)(b->last - b->pos));

if (!ctx->greeting) {

rc = ngx_http_tnt_read_greeting(r, ctx, b);
if (rc == NGX_ERROR) {
return rc;

/**
* If ctx->state is not OK we did not sent request to Tarantool
* backend but we still must handle ctx->state at this stage --
* so just ignore NGX_AGAIN and pass to next handler.
*/
} else if (rc == NGX_AGAIN && ctx->state == OK) {
return rc;
}
}

switch (ctx->state) {
case OK:
break;
case INPUT_TO_LARGE:
case INPUT_JSON_PARSE_FAILED:
case INPUT_EMPTY:
return ngx_http_tnt_output_err(r, ctx, NGX_HTTP_BAD_REQUEST);
default:
crit("[BUG] unexpected ctx->stage(%i)", ctx->state);
return NGX_ERROR;
}

/*
* At this stage we can't get full upstream size,
* since Tarantool could send to us 1 upto N messages
* where each of messages could have X size.
*
* As fix -- just set each upstream mode to chunked.
*/
u->headers_in.chunked = 1;
u->headers_in.status_n = 200;
u->state->status = 200;

return NGX_OK;
}


void
ngx_http_tnt_abort_request(ngx_http_request_t *r)
{
dd("abort http tnt request");
ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
ngx_http_tnt_cleanup(r, ctx);
}


void
ngx_http_tnt_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
dd("finalize http tnt request");
ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
ngx_http_tnt_cleanup(r, ctx);
}



ngx_buf_t*
ngx_http_tnt_set_err(ngx_http_request_t *r,
int errcode,
const u_char *msg, size_t len)
{
const size_t msglen = len + sizeof("{"
"'error':{"
"'message':'',"
"'code':-XXXXX"
"}"
"}");

ngx_buf_t *b = ngx_create_temp_buf(r->pool, msglen);
if (b == NULL) {
return NULL;
}

b->memory = 1;
b->pos = b->start;

b->last = ngx_snprintf(b->start, msglen, "{"
"\"error\":{"
"\"code\":%d,"
"\"message\":\"%s\""
"}"
"}",
errcode,
msg);

return b;
}

/**
*/
static const ngx_http_tnt_error_t errors[] = {

{ ngx_string("Request too large, consider increasing your "
"server's setting 'client_body_buffer_size'"),
-32001
},

{ ngx_string("Unknown parse error"),
-32002
},

{ ngx_string("Request too largs, consider increasing your "
"server's setting 'tnt_pass_http_request_buffer_size'"),
-32001
}
};

const ngx_http_tnt_error_t *
get_error_text(int type)
{
return &errors[type];
}

+ 122
- 0
ngx_http_tnt_handlers.h View File

@@ -0,0 +1,122 @@
/*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* Copyright (C) 2015-2016 Tarantool AUTHORS:
* please see AUTHORS file.
*/

#ifndef NGX_HTTP_TNT_CREATE_REQUEST_H_INCLUDED
#define NGX_HTTP_TNT_CREATE_REQUEST_H_INCLUDED

#include <ngx_core.h>
#include <ngx_http.h>
#include <tp_transcode.h>

typedef enum ngx_tnt_conf_states {
NGX_TNT_CONF_ON = 0,
NGX_TNT_CONF_OFF = 1
} ngx_tnt_conf_states_e;

typedef struct {
ngx_http_upstream_conf_t upstream;
ngx_int_t index;

size_t in_multiplier;
size_t out_multiplier;

ngx_str_t method;
size_t pass_http_request_buffer_size;
ngx_uint_t pass_http_request;
} ngx_http_tnt_loc_conf_t;

enum ctx_state {
OK = 0,

INPUT_JSON_PARSE_FAILED,
INPUT_TO_LARGE,
INPUT_EMPTY,

READ_PAYLOAD,
READ_BODY,
SEND_REPLY
};

typedef struct ngx_http_tnt_ctx {

struct {
u_char mem[6];
u_char *p, *e;
} payload;

enum ctx_state state;
ngx_buf_t *in_err, *tp_cache;
ssize_t rest, payload_size;
int rest_batch_size, batch_size;
ngx_int_t greeting:1;

} ngx_http_tnt_ctx_t;

ngx_http_tnt_ctx_t * ngx_http_tnt_create_ctx(ngx_http_request_t *r);
void ngx_http_tnt_reset_ctx(ngx_http_tnt_ctx_t *ctx);

void ngx_http_tnt_set_handlers(ngx_http_request_t *r,
ngx_http_upstream_t *u,
ngx_http_tnt_loc_conf_t *tlcf);

/** create tarantool requests handlers [
*/
ngx_int_t ngx_http_tnt_post_json_handler(ngx_http_request_t *r);
ngx_int_t ngx_http_tnt_query_handler(ngx_http_request_t *r);
/* ] */

ngx_int_t ngx_http_tnt_reinit_request(ngx_http_request_t *r);
ngx_int_t ngx_http_tnt_process_header(ngx_http_request_t *r);
void ngx_http_tnt_abort_request(ngx_http_request_t *r);
void ngx_http_tnt_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

ngx_buf_t* ngx_http_tnt_set_err(ngx_http_request_t *r,
int errcode,
const u_char *msg,
size_t msglen);

/**
*/
typedef struct ngx_http_tnt_error {
const ngx_str_t msg;
int code;
} ngx_http_tnt_error_t;

enum ngx_http_tnt_err_messages_idx {
REQUEST_TOO_LARGE = 0,
UNKNOWN_PARSE_ERROR = 1,
HTTP_REQUEST_TOO_LARGE = 2
};

const ngx_http_tnt_error_t *get_error_text(int type);

#endif

+ 90
- 596
ngx_http_tnt_module.c View File

@@ -26,125 +26,57 @@
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* Copyright (C) 2015 Tarantool AUTHORS:
* Copyright (C) 2015-2016 Tarantool AUTHORS:
* please see AUTHORS file.
*/

#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>

#include <tp_transcode.h>
#include <debug.h>

enum ctx_state {
OK = 0,

INPUT_JSON_PARSE_FAILED,
INPUT_TO_LARGE,
INPUT_EMPTY,

READ_PAYLOAD,
READ_BODY,
SEND_REPLY
};


typedef struct ngx_http_tnt_error {
const ngx_str_t msg;
int code;
} ngx_http_tnt_error_t;


typedef struct {
ngx_http_upstream_conf_t upstream;
ngx_int_t index;

size_t in_multiplier;
size_t out_multiplier;
} ngx_http_tnt_loc_conf_t;


typedef struct {

struct {
u_char mem[6];
u_char *p, *e;
} payload;

enum ctx_state state;
ngx_buf_t *in_err, *tp_cache;
ssize_t rest, payload_size;
int rest_batch_size, batch_size;
ngx_int_t greeting:1;

} ngx_http_tnt_ctx_t;


/** Pre-filter functions
*/
static inline ngx_http_tnt_ctx_t * ngx_http_tnt_create_ctx(
ngx_http_request_t *r);
static inline void ngx_http_tnt_reset_ctx(ngx_http_tnt_ctx_t *ctx);
static inline ngx_buf_t* ngx_http_tnt_set_err(ngx_http_request_t *r,
int errcode, const u_char *msg, size_t msglen);
static inline ngx_int_t ngx_http_tnt_output_err(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx, ngx_int_t code);
#include <ngx_http_tnt_handlers.h>

/** Filter functions
*/
static inline ngx_int_t ngx_http_tnt_read_greeting(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx, ngx_buf_t *b);
static ngx_int_t ngx_http_tnt_send_reply(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_http_tnt_ctx_t *ctx);
static ngx_int_t ngx_http_tnt_send_once(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx, ngx_chain_t *out_chain,
const u_char *buf, size_t len);
static ngx_int_t ngx_http_tnt_filter_reply(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_buf_t *b);
static ngx_int_t ngx_http_tnt_send_reply(
ngx_http_request_t *r,
ngx_http_upstream_t *u,
ngx_http_tnt_ctx_t *ctx);

static ngx_int_t ngx_http_tnt_filter_reply(
ngx_http_request_t *r,
ngx_http_upstream_t *u,
ngx_buf_t *b);

/** Rest
*/
static inline void ngx_http_tnt_cleanup(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx);
static inline ngx_buf_t * ngx_http_tnt_create_mem_buf(ngx_http_request_t *r,
ngx_http_upstream_t *u, size_t size);
static inline ngx_int_t ngx_http_tnt_output(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_buf_t *b);
static inline ngx_buf_t * ngx_http_tnt_create_mem_buf(
ngx_http_request_t *r,
ngx_http_upstream_t *u,
size_t size);

/** Ngx handlers
static inline ngx_int_t ngx_http_tnt_output(
ngx_http_request_t *r,
ngx_http_upstream_t *u,
ngx_buf_t *b);

/** Filters
*/
static ngx_int_t ngx_http_tnt_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_tnt_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_tnt_process_header(ngx_http_request_t *r);
static ngx_int_t ngx_http_tnt_filter_init(void *data);
static ngx_int_t ngx_http_tnt_filter(void *data, ssize_t bytes);
static void ngx_http_tnt_abort_request(ngx_http_request_t *r);
static void ngx_http_tnt_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

/** Confs
*/
static void *ngx_http_tnt_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_tnt_merge_loc_conf(ngx_conf_t *cf,
void *parent, void *child);

static char *ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd,
static char *ngx_http_tnt_merge_loc_conf(
ngx_conf_t *cf,
void *parent,
void *child);

static char *ngx_http_tnt_pass(
ngx_conf_t *cf,
ngx_command_t *cmd,
void *conf);

static const ngx_http_tnt_error_t errors[] = {

{ ngx_string("Request too large, consider increasing your "
"server's setting 'client_body_buffer_size'"),
-32001
},

{ ngx_string("Unknown parse error"),
-32002
}
};

enum ngx_http_tnt_err_messages_idx {
REQUEST_TOO_LARGE = 0,
UNKNOWN_PARSE_ERROR = 1
};

static ngx_conf_bitmask_t ngx_http_tnt_next_upstream_masks[] = {
{ ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
@@ -154,6 +86,11 @@ static ngx_conf_bitmask_t ngx_http_tnt_next_upstream_masks[] = {
{ ngx_null_string, 0 }
};

static ngx_conf_enum_t ngx_http_tnt_pass_http_request[] = {
{ ngx_string("off"), NGX_TNT_CONF_OFF },
{ ngx_string("on"), NGX_TNT_CONF_ON },
{ ngx_null_string, 0 }
};

static ngx_command_t ngx_http_tnt_commands[] = {

@@ -227,6 +164,28 @@ static ngx_command_t ngx_http_tnt_commands[] = {
offsetof(ngx_http_tnt_loc_conf_t, out_multiplier),
NULL },

{ ngx_string("tnt_method"),
NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_tnt_loc_conf_t, method),
NULL },

{ ngx_string("tnt_pass_http_request_buffer_size"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_tnt_loc_conf_t, pass_http_request_buffer_size),
NULL },

{ ngx_string("tnt_pass_http_request"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF
|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
ngx_conf_set_enum_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_tnt_loc_conf_t, pass_http_request),
&ngx_http_tnt_pass_http_request },

ngx_null_command
};

@@ -261,8 +220,9 @@ ngx_module_t ngx_http_tnt_module = {
NGX_MODULE_V1_PADDING
};

/** Ngx handlers
/** Handlers
*/

static ngx_int_t
ngx_http_tnt_handler(ngx_http_request_t *r)
{
@@ -270,7 +230,11 @@ ngx_http_tnt_handler(ngx_http_request_t *r)
ngx_http_upstream_t *u;
ngx_http_tnt_loc_conf_t *tlcf;

if (!(r->method & NGX_HTTP_POST)) {
if (!(r->method & NGX_HTTP_POST
|| r->method & NGX_HTTP_GET
|| r->method & NGX_HTTP_PUT
|| r->method & NGX_HTTP_DELETE))
{
return NGX_HTTP_NOT_ALLOWED;
}

@@ -289,11 +253,7 @@ ngx_http_tnt_handler(ngx_http_request_t *r)

u->conf = &tlcf->upstream;

u->create_request = ngx_http_tnt_create_request;
u->reinit_request = ngx_http_tnt_reinit_request;
u->process_header = ngx_http_tnt_process_header;
u->abort_request = ngx_http_tnt_abort_request;
u->finalize_request = ngx_http_tnt_finalize_request;
ngx_http_tnt_set_handlers(r, u, tlcf);

u->input_filter_init = ngx_http_tnt_filter_init;
u->input_filter = ngx_http_tnt_filter;
@@ -311,257 +271,6 @@ ngx_http_tnt_handler(ngx_http_request_t *r)
}


static ngx_int_t
ngx_http_tnt_create_request(ngx_http_request_t *r)
{

static const u_char fd_event[] = "{\"method\":\"__nginx_needs_fd_event\",\"params\":[]}";

ngx_buf_t *b;
ngx_chain_t *body;
size_t complete_msg_size;
tp_transcode_t tc;
ngx_http_tnt_ctx_t *ctx;
ngx_chain_t *out_chain;
ngx_http_tnt_loc_conf_t *tlcf;

if (r->headers_in.content_length_n == 0) {
/** XXX
* Probably, this case we should handle like 'NOT ALLOWED'?
*/
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "empty body");
return NGX_ERROR;
}

ctx = ngx_http_tnt_create_ctx(r);
if (ctx == NULL) {
return NGX_ERROR;
}

tlcf = ngx_http_get_module_loc_conf(r, ngx_http_tnt_module);

out_chain = ngx_alloc_chain_link(r->pool);
if (out_chain == NULL) {
return NGX_ERROR;
}

out_chain->buf = ngx_create_temp_buf(r->pool,
r->headers_in.content_length_n * tlcf->in_multiplier);
if (out_chain->buf == NULL) {
crit("[BUG?] failed to allocate output buffer, size %ui",
r->headers_in.content_length_n * tlcf->in_multiplier);
return NGX_ERROR;
}

out_chain->next = NULL;
out_chain->buf->memory = 1;
out_chain->buf->flush = 1;

out_chain->buf->pos = out_chain->buf->start;
out_chain->buf->last = out_chain->buf->pos;
out_chain->buf->last_in_chain = 1;

/**
* Conv. input json to Tarantool message [
*/
if (tp_transcode_init(&tc,
(char *)out_chain->buf->start,
out_chain->buf->end - out_chain->buf->start,
YAJL_JSON_TO_TP,
NULL)
== TP_TRANSCODE_ERROR)
{
crit("[BUG] failed to call tp_transcode_init(input)");
return NGX_ERROR;
}

for (body = r->upstream->request_bufs; body; body = body->next) {

if (body->buf->in_file) {

ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"tnt: in-file buffer found. aborted. "
"consider increasing your 'client_body_buffer_size' "
"setting");

const ngx_http_tnt_error_t *e = &errors[REQUEST_TOO_LARGE];
ctx->in_err = ngx_http_tnt_set_err(r,
e->code,
e->msg.data, e->msg.len);
if (ctx->in_err == NULL) {
goto error_exit;
}

ctx->state = INPUT_TO_LARGE;

goto read_input_done;

} else {
b = body->buf;
}

if (tp_transcode(&tc, (char *)b->pos, b->last - b->pos)
== TP_TRANSCODE_ERROR)
{
ctx->state = INPUT_JSON_PARSE_FAILED;

goto read_input_done;
}
}

if (tp_transcode_complete(&tc, &complete_msg_size) == TP_TRANSCODE_OK) {

out_chain->buf->last = out_chain->buf->start + complete_msg_size;

if (tc.batch_size > 1) {
ctx->rest_batch_size = ctx->batch_size = tc.batch_size;
}

dd("ctx->batch_size:%i, tc.batch_size:%i, complete_msg_size:%i",
ctx->batch_size,
tc.batch_size,
(int)complete_msg_size);

} else {

ctx->state = INPUT_JSON_PARSE_FAILED;

goto read_input_done;
}
/** ]
*/

read_input_done:

if (ctx->state != OK) {

if (ctx->in_err == NULL) {
ctx->in_err = ngx_http_tnt_set_err(r,
tc.errcode,
(u_char *)tc.errmsg,
ngx_strlen(tc.errmsg));
if (ctx->in_err == NULL) {
goto error_exit;
}
}

/* Rewrite output buffer since it may be less then needed
*/
out_chain->buf = ngx_create_temp_buf(r->pool,
ngx_strlen(fd_event) * tlcf->in_multiplier);
if (out_chain->buf == NULL) {
goto error_exit;
}

out_chain->next = NULL;
out_chain->buf->memory = 1;
out_chain->buf->flush = 1;

out_chain->buf->pos = out_chain->buf->start;
out_chain->buf->last = out_chain->buf->pos;
out_chain->buf->last_in_chain = 1;

/** Fire event manualy on tarantool socket.
* This is need for run all parts of sequence
*/
if (ngx_http_tnt_send_once(r,
ctx,
out_chain,
fd_event, ngx_strlen(fd_event))
!= NGX_OK)
{
dd("ngx_http_tnt_send (i.e. file fd event) failed");
goto error_exit;
}
}

/**
* Hooking output chain
*/
r->upstream->request_bufs = out_chain;

tp_transcode_free(&tc);
return NGX_OK;

error_exit:
tp_transcode_free(&tc);
return NGX_ERROR;
}


static ngx_int_t
ngx_http_tnt_reinit_request(ngx_http_request_t *r)
{
dd("reinit connection with Tarantool...");

ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
if (ctx == NULL) {
return NGX_OK;
}

ngx_http_tnt_cleanup(r, ctx);
ngx_http_tnt_reset_ctx(ctx);

return NGX_OK;
}


static ngx_int_t
ngx_http_tnt_process_header(ngx_http_request_t *r)
{
ngx_http_upstream_t *u = r->upstream;
ngx_buf_t *b = &r->upstream->buffer;
ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);

ngx_int_t rc;

dd("process_header-> greeting: '%s', recv: %i",
ctx->greeting ? "yes" : "no",
(int)(b->last - b->pos));

if (!ctx->greeting) {

rc = ngx_http_tnt_read_greeting(r, ctx, b);
if (rc == NGX_ERROR) {
return rc;

/**
* If ctx->state is not OK we did not sent request to Tarantool
* backend but we still must handle ctx->state at this stage --
* so just ignore NGX_AGAIN and pass to next handler.
*/
} else if (rc == NGX_AGAIN && ctx->state == OK) {
return rc;
}
}

switch (ctx->state) {
case OK:
break;
case INPUT_TO_LARGE:
case INPUT_JSON_PARSE_FAILED:
case INPUT_EMPTY:
return ngx_http_tnt_output_err(r, ctx, NGX_HTTP_BAD_REQUEST);
default:
crit("[BUG] unexpected ctx->stage(%i)", ctx->state);
return NGX_ERROR;
}

/*
* At this stage we can't get full upstream size,
* since Tarantool could send to us 1 upto N messages
* where each of messages could have X size.
*
* As fix -- just set each upstream mode to chunked.
*/
u->headers_in.chunked = 1;
u->headers_in.status_n = 200;
u->state->status = 200;

return NGX_OK;
}


static ngx_int_t
ngx_http_tnt_filter_init(void *data)
{
@@ -608,26 +317,6 @@ ngx_http_tnt_filter(void *data, ssize_t bytes)
}


static void
ngx_http_tnt_abort_request(ngx_http_request_t *r)
{
dd("abort http tnt request");

ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
ngx_http_tnt_cleanup(r, ctx);
}


static void
ngx_http_tnt_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
dd("finalize http tnt request");

ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
ngx_http_tnt_cleanup(r, ctx);
}


static void *
ngx_http_tnt_create_loc_conf(ngx_conf_t *cf)
{
@@ -646,20 +335,24 @@ ngx_http_tnt_create_loc_conf(ngx_conf_t *cf)
* conf->upstream.temp_path = NULL;
* conf->upstream.uri = { 0, NULL };
* conf->upstream.location = NULL;
* conf->method = { 0, NULL };
* conf->queries = NULL;
*/

conf->upstream.local = NGX_CONF_UNSET_PTR;

conf->upstream.connect_timeout =
conf->upstream.send_timeout =
conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;

conf->upstream.read_timeout =
conf->upstream.next_upstream_timeout = NGX_CONF_UNSET_MSEC;
conf->upstream.next_upstream_tries = NGX_CONF_UNSET;

conf->upstream.buffer_size =
conf->in_multiplier =
conf->out_multiplier = NGX_CONF_UNSET_SIZE;
conf->out_multiplier =
conf->pass_http_request_buffer_size = NGX_CONF_UNSET_SIZE;

conf->pass_http_request = NGX_CONF_UNSET_UINT;

/*
* The hardcoded values
@@ -724,6 +417,14 @@ ngx_http_tnt_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)

ngx_conf_merge_size_value(conf->out_multiplier, prev->out_multiplier, 2);

ngx_conf_merge_str_value(conf->method, prev->method, "");

ngx_conf_merge_size_value(conf->pass_http_request_buffer_size,
prev->pass_http_request_buffer_size, 4096*2);

ngx_conf_merge_uint_value(conf->pass_http_request,
prev->pass_http_request, NGX_TNT_CONF_OFF);

return NGX_CONF_OK;
}

@@ -764,160 +465,8 @@ ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
return NGX_CONF_OK;
}


/** Pre-filter functions
*/
static inline ngx_http_tnt_ctx_t *
ngx_http_tnt_create_ctx(ngx_http_request_t *r)
{
ngx_http_tnt_ctx_t *ctx;

ctx = ngx_palloc(r->pool, sizeof(ngx_http_tnt_ctx_t));
if (ctx == NULL) {
return NULL;
}

ngx_http_tnt_reset_ctx(ctx);

ngx_http_set_ctx(r, ctx, ngx_http_tnt_module);

ctx->state = OK;

return ctx;
}


static inline void
ngx_http_tnt_reset_ctx(ngx_http_tnt_ctx_t *ctx)
{
ctx->payload.p = &ctx->payload.mem[0];
ctx->payload.e = &ctx->payload.mem[sizeof(ctx->payload.mem) - 1];

ctx->state = OK;

ctx->in_err = ctx->tp_cache = NULL;

ctx->rest = 0;
ctx->payload_size = 0;

ctx->rest_batch_size = 0;
ctx->batch_size = 0;

ctx->greeting = 0;
}


static inline ngx_buf_t*
ngx_http_tnt_set_err(ngx_http_request_t *r,
int errcode,
const u_char *msg, size_t len)
{
const size_t msglen = len + sizeof("{"
"'error':{"
"'message':'',"
"'code':-XXXXX"
"}"
"}");

ngx_buf_t *b = ngx_create_temp_buf(r->pool, msglen);
if (b == NULL) {
return NULL;
}

b->memory = 1;
b->pos = b->start;

b->last = ngx_snprintf(b->start, msglen, "{"
"\"error\":{"
"\"code\":%d,"
"\"message\":\"%s\""
"}"
"}",
errcode,
msg);

return b;
}

static inline ngx_int_t
ngx_http_tnt_output_err(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx,
ngx_int_t code)
{
ngx_http_upstream_t *u;
ngx_chain_t *cl, **ll;

u = r->upstream;

if (ctx->in_err == NULL) {
u->headers_in.status_n = 500;
u->state->status = 500;
u->headers_in.content_length_n = 0;
return NGX_OK;
}

u->headers_in.status_n = code;
u->state->status = code;
u->headers_in.content_length_n = ctx->in_err->last - ctx->in_err->pos;
u->length = 0;

for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
}

*ll = cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
}

cl->buf = ctx->in_err;
cl->next = NULL;

cl->buf->pos = cl->buf->start;
cl->buf->end = cl->buf->last;

cl->buf->flush = 1;
cl->buf->memory = 1;
cl->buf->tag = u->output.tag;
cl->buf->last_in_chain = 1;

return NGX_OK;
}

/** Filter functions
*/
static inline ngx_int_t
ngx_http_tnt_read_greeting(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx,
ngx_buf_t *b)
{
if (b->last - b->pos >= (ptrdiff_t)sizeof("Tarantool") - 1
&& b->pos[0] == 'T'
&& b->pos[1] == 'a'
&& b->pos[2] == 'r'
&& b->pos[3] == 'a'
&& b->pos[4] == 'n'
&& b->pos[5] == 't'
&& b->pos[6] == 'o'
&& b->pos[7] == 'o'
&& b->pos[8] == 'l')
{
b->pos = b->pos + 128;
/**
* Sometimes Nginx reads only 'greeting'(i.e. 128 bytes) -- to avoid
* side effects (inside 'init'/'filter') we must return to
* 'process_header'
*/
if (b->pos == b->last) {
return NGX_AGAIN;
}
}

ctx->greeting = 1;

return NGX_OK;
}


static ngx_int_t
ngx_http_tnt_send_reply(ngx_http_request_t *r,
@@ -955,10 +504,14 @@ ngx_http_tnt_send_reply(ngx_http_request_t *r,
++output->pos;
}

rc = tp_transcode_init(&tc,
(char *)output->pos, output->end - output->pos,
TP_REPLY_TO_JSON,
NULL);
tp_transcode_init_args_t args = {
.output = (char *)output->pos,
.output_size = output->end - output->pos,
.method = NULL, .method_len = 0,
.codec = TP_REPLY_TO_JSON,
.mf = NULL
};
rc = tp_transcode_init(&tc, &args);
if (rc == TP_TRANSCODE_ERROR) {
crit("[BUG] failed to call tp_transcode_init(output)");
return NGX_ERROR;
@@ -976,7 +529,7 @@ ngx_http_tnt_send_reply(ngx_http_request_t *r,

ngx_pfree(r->pool, output);

const ngx_http_tnt_error_t *e = &errors[UNKNOWN_PARSE_ERROR];
const ngx_http_tnt_error_t *e = get_error_text(UNKNOWN_PARSE_ERROR);
output = ngx_http_tnt_set_err(r, e->code, e->msg.data, e->msg.len);
if (output == NULL) {
goto error_exit;
@@ -1027,51 +580,6 @@ error_exit:
}


static ngx_int_t
ngx_http_tnt_send_once(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx,
ngx_chain_t *out_chain,
const u_char *buf, size_t len)
{
tp_transcode_t tc;
size_t complete_msg_size;

if (tp_transcode_init(&tc,
(char *)out_chain->buf->start,
out_chain->buf->end - out_chain->buf->start,
YAJL_JSON_TO_TP,
NULL)
== TP_TRANSCODE_ERROR)
{
goto error_exit;
}

if (tp_transcode(&tc, (char *)buf, len) == TP_TRANSCODE_ERROR) {
dd("ngx_http_tnt_send:tp_transcode error: %s, code:%d",
tc.errmsg, tc.errcode);
goto error_exit;
}

if (tp_transcode_complete(&tc, &complete_msg_size) == TP_TRANSCODE_OK) {

out_chain->buf->last = out_chain->buf->start + complete_msg_size;

if (tc.batch_size > 1) {
ctx->rest_batch_size = ctx->batch_size = tc.batch_size;
}
} else {
goto error_exit;
}

tp_transcode_free(&tc);
return NGX_OK;

error_exit:
tp_transcode_free(&tc);
return NGX_ERROR;
}


static ngx_int_t
ngx_http_tnt_filter_reply(ngx_http_request_t *r,
ngx_http_upstream_t *u,
@@ -1237,17 +745,3 @@ ngx_http_tnt_output(ngx_http_request_t *r,
return NGX_OK;
}


static inline void
ngx_http_tnt_cleanup(ngx_http_request_t *r, ngx_http_tnt_ctx_t *ctx)
{
if (ctx == NULL) {
return;
}

if (ctx->tp_cache != NULL) {
ngx_pfree(r->pool, ctx->tp_cache);
ctx->tp_cache = NULL;
}
}


+ 18
- 1
tp_ext.h View File

@@ -89,5 +89,22 @@ tp_call_wof_add_params(struct tp *p)
return tp_add(p, sz);
}

#endif
static inline char *
tp_call_nargs(struct tp *p, const char *method, size_t method_len,
size_t nargs)
{
if (!tp_call(p, (const char *)method, (int)method_len))
return NULL;
return tp_encode_array(p, nargs);
}

static inline char *
tp_encode_str_map_item(struct tp *p, const char *key, size_t key_len,
const char * value, size_t value_len)
{
if (!tp_encode_str(p, key, key_len))
return NULL;
return tp_encode_str(p, value, value_len);
}

#endif

+ 129
- 14
tp_transcode.c View File

@@ -142,6 +142,8 @@ typedef struct {
uint32_t id;

tp_transcode_t *tc;

bool read_method;
} yajl_ctx_t;

static inline bool
@@ -315,7 +317,7 @@ yajl_string(void *ctx, const unsigned char * str, size_t len)
say_overflow_r_2(s_ctx);
}

} else if (s_ctx->stage == METHOD) {
} else if (s_ctx->read_method && s_ctx->stage == METHOD) {

dd("METHOD: '%.*s' END\n", (int)len, str);

@@ -327,7 +329,8 @@ yajl_string(void *ctx, const unsigned char * str, size_t len)

s_ctx->stage = WAIT_NEXT;
s_ctx->been_stages |= METHOD;
}
} else
s_ctx->stage = WAIT_NEXT;

return 1;
}
@@ -353,6 +356,13 @@ yajl_map_key(void *ctx, const unsigned char * key, size_t len)
if (unlikely(!tp_call_wof(&s_ctx->tp)))
say_overflow_r_2(s_ctx);

if (!s_ctx->read_method) {
tp_transcode_t *tc = s_ctx->tc;
if (unlikely(!tp_call_wof_add_func(&s_ctx->tp,
tc->method, tc->method_len)))
say_overflow_r_2(s_ctx);
}

s_ctx->stage = WAIT_NEXT;
++s_ctx->tc->batch_size;
}
@@ -390,7 +400,8 @@ yajl_map_key(void *ctx, const unsigned char * key, size_t len)
dd("ID STAGE\n");
s_ctx->stage = ID;
}
else if (len == sizeof("method") - 1
else if (s_ctx->read_method
&& len == sizeof("method") - 1
&& key[0] == 'm'
&& key[1] == 'e'
&& key[2] == 't'
@@ -448,6 +459,13 @@ yajl_end_map(void *ctx)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

if (!s_ctx->read_method
&& (s_ctx->been_stages & (PARAMS | ID))
== (PARAMS | ID))
{
s_ctx->been_stages |= METHOD;
}

if ((s_ctx->been_stages & (PARAMS | ID | METHOD))
== (PARAMS | ID | METHOD))
{
@@ -497,9 +515,11 @@ yajl_start_array(void *ctx)

stack_grow_array(s_ctx);

bool r = false;
if (unlikely(s_ctx->size == 0))
bool r = false, is_params = false;
if (unlikely(s_ctx->size == 0)) {
r = stack_push(s_ctx, s_ctx->tp.p, TYPE_ARRAY | PARAMS);
is_params = true;
}
else
r = stack_push(s_ctx, s_ctx->tp.p, TYPE_ARRAY);

@@ -513,6 +533,18 @@ yajl_start_array(void *ctx)

tp_add(&s_ctx->tp, 1 + sizeof(uint32_t));

// Bind data here [
if (unlikely(is_params)) {
tp_transcode_t *tc = s_ctx->tc;
if (tc->data.pos && tc->data.len) {
if (s_ctx->tp.e - s_ctx->tp.p < tc->data.len)
say_overflow_r_2(s_ctx);
memcpy(s_ctx->tp.p, tc->data.pos, tc->data.len);
tp_add(&s_ctx->tp, tc->data.len);
}
}
// ]

return 1;
}

@@ -533,6 +565,11 @@ yajl_end_array(void *ctx)
dd("PARAMS END\n");
s_ctx->stage = WAIT_NEXT;
s_ctx->been_stages |= PARAMS;
// Increase number of args for binded data [
tp_transcode_t *tc = s_ctx->tc;
if (tc->data.pos && tc->data.len)
++item->count;
// ]
}

*(item->ptr++) = 0xdd;
@@ -604,6 +641,10 @@ yajl_json2tp_create(tp_transcode_t *tc, char *output, size_t output_size)
if (unlikely(!ctx->hand))
goto error_exit;

ctx->read_method = true;
if (tc->method && tc->method_len)
ctx->read_method = false;

ctx->tc = tc;

return ctx;
@@ -690,6 +731,56 @@ yajl_json2tp_complete(void *ctx, size_t *complete_msg_size)
return TP_TRANSCODE_ERROR;
}


/**
* CODEC - uri query to Tarantool message
*/

typedef struct query2tp_ctx {
char *pos;
char *end;
tp_transcode_t *tc;
} query2tp_ctx_t;

static void*
query2tp_create(tp_transcode_t *tc, char *output, size_t output_size)
{
query2tp_ctx_t *ctx = tc->mf.alloc(tc->mf.ctx, sizeof(query2tp_ctx_t));
if (unlikely(!ctx))
return NULL;

ctx->pos = output;
ctx->end = output + output_size;
ctx->tc = tc;

return ctx;
}

static void
query2tp_free(void *ctx_)
{
if (unlikely(!ctx_))
return;
query2tp_ctx_t *ctx = ctx_;
tp_transcode_t * tc = ctx->tc;
tc->mf.free(tc->mf.ctx, ctx);
}

enum tt_result
query2tp_transcode(void *ctx, const char *in, size_t in_size)
{
(void)ctx, (void)in, (void)in_size;
return 0;
}

enum tt_result
query2tp_complete(void *ctx, size_t *cmpl_msg_size)
{
(void)ctx, (void)cmpl_msg_size;
return 0;
}


/**
* CODEC - Tarantool message to JSON RPC
*/
@@ -1044,6 +1135,11 @@ tp_codec_t codecs[TP_CODEC_MAX] = {
&yajl_json2tp_complete,
&yajl_json2tp_free),

CODEC(&query2tp_create,
&query2tp_transcode,
&query2tp_complete,
&query2tp_free),

CODEC(&tp2json_create,
&tp_reply2json_transcode,
&tp2json_complete,
@@ -1083,25 +1179,27 @@ def_free(void *ctx, void *m)
}

enum tt_result
tp_transcode_init(tp_transcode_t *t, char *output, size_t output_size,
enum tp_codec_type codec, mem_fun_t *mf)
tp_transcode_init(tp_transcode_t *t, const tp_transcode_init_args_t *args)
{
memset(t, 0, sizeof(tp_transcode_t));

if (unlikely(codec == TP_CODEC_MAX))
if (unlikely(args->codec == TP_CODEC_MAX))
return TP_TRANSCODE_ERROR;

t->codec = codecs[codec];
t->codec = codecs[args->codec];
if (unlikely(!t->codec.create))
return TP_TRANSCODE_ERROR;

t->mf.alloc = &def_alloc;
t->mf.realloc = &def_realloc;
t->mf.free = &def_free;
if (likely(mf != NULL))
t->mf = *mf;
if (likely(args->mf != NULL))
t->mf = *args->mf;

t->method = args->method;
t->method_len = args->method_len;

t->codec.ctx = t->codec.create(t, output, output_size);
t->codec.ctx = t->codec.create(t, args->output, args->output_size);
if (unlikely(!t->codec.ctx))
return TP_TRANSCODE_ERROR;

@@ -1123,6 +1221,9 @@ tp_transcode_free(tp_transcode_t *t)

t->codec.free(t->codec.ctx);
t->codec.ctx = NULL;

t->method = NULL;
t->method_len = 0;
}

enum tt_result
@@ -1142,13 +1243,27 @@ tp_transcode(tp_transcode_t *t, const char *b, size_t s)
return t->codec.transcode(t->codec.ctx, b, s);
}

void
tp_transcode_bind_data(tp_transcode_t *t,
const char *data_beg, const char *data_end)
{
assert(t);
t->data.pos = data_beg;
t->data.end = data_end;
t->data.len = data_end - data_beg;
}

bool
tp_dump(char *output, size_t output_size,
const char *input, size_t input_size)
{
tp_transcode_t t;
if (tp_transcode_init(&t, output, output_size, TP_TO_JSON, NULL)
== TP_TRANSCODE_ERROR)
tp_transcode_init_args_t args = {
.output = output, .output_size = output_size,
.method = NULL, .method_len = 0,
.codec = TP_TO_JSON, .mf = NULL };