Browse Source

Add yajl, tarantool-c as submodule, c89 compatibility

tags/v0.0.1
bigbes 5 years ago
parent
commit
75d0357f0a
7 changed files with 1291 additions and 266 deletions
  1. 6
    0
      .gitmodules
  2. 18
    5
      Makefile
  3. 263
    261
      ngx_http_tnt_module.c
  4. 1
    0
      third_party/tarantool-c
  5. 1
    0
      third_party/yajl
  6. 859
    0
      tp_transcode.c
  7. 143
    0
      tp_transcode.h

+ 6
- 0
.gitmodules View File

@@ -0,0 +1,6 @@
[submodule "third_party/tarantool-c"]
path = third_party/tarantool-c
url = https://github.com/dedok/tarantool-c.git
[submodule "third_party/yajl"]
path = third_party/yajl
url = https://github.com/lloyd/yajl.git

+ 18
- 5
Makefile View File

@@ -1,20 +1,32 @@
.PHONY = all

CUR_PATH = $(PWD)
YAJL_PATH = $(PWD)/third_party/yajl
TNTC_PATH = $(PWD)/third_party/tarantool-c

NGX_PATH = nginx
MODULE_PATH = $(PWD)
PREFIX_PATH = $(PWD)/test
INC_FLAGS = -I$(TNTC_PATH)/src -I$(TNTC_PATH)/src/msgpuck
INC_FLAGS += -I$(YAJL_PATH)
LDFLALGS = -L$(YAJL_PATH)/build/yajl-2.1.0/lib/

.PHONY: all build
all: build

yajl:
ln -sf src third_party/yajl/yajl
cd $(YAJL_PATH); ./configure; make distro
build:
$(MAKE) -C $(NGX_PATH)

configure-debug:
cd $(NGX_PATH) && \
CFLAGS="-ggdb3 -O0 -Wall -Werror" ./configure \
CFLAGS="-ggdb3 -O0 -Wall -Werror $(INC_FLAGS)" ./configure \
--prefix=$(PREFIX_PATH) \
--add-module=$(MODULE_PATH) \
--without-http_rewrite_module \
--with-debug
--with-debug --with-ld-opt='$(LDFLAGS)'
mkdir -p $(PREFIX_PATH)/conf $(PREFIX_PATH)/logs
cp -Rf $(NGX_PATH)/conf/* $(PREFIX_PATH)/conf
rm -f $(PREFIX_PATH)/conf/nginx.conf $(PREFIX_PATH)/conf/nginx.dev.conf
@@ -30,5 +42,6 @@ configure:
clean:
$(MAKE) -C $(NGX_PATH) clean

build-all: configure build
build-all-debug: configure-debug build
build-all: yajl configure build
build-all-debug: yajl configure-debug build


+ 263
- 261
ngx_http_tnt_module.c View File

@@ -22,22 +22,22 @@


typedef struct {
ngx_http_upstream_conf_t upstream;
ngx_int_t index;
ngx_http_upstream_conf_t upstream;
ngx_int_t index;
} ngx_http_tnt_loc_conf_t;


typedef struct {
tp_transcode_t in_t, out_t;
ngx_buf_t *in_cache;
ngx_chain_t *out_chain;
ssize_t payload;
ngx_int_t greetings:1;
tp_transcode_t in_t, out_t;
ngx_buf_t *in_cache;
ngx_chain_t *out_chain;
ssize_t payload;
ngx_int_t greetings:1;

#define GOOD 0
#define GOOD 0
#define INPUT_JSON_PARSE_FAILED 1
#define INPUT_TO_LARGE 2
ngx_int_t state;
#define INPUT_TO_LARGE 2
ngx_int_t state;

} ngx_http_tnt_ctx_t;

@@ -46,7 +46,7 @@ static inline ngx_int_t ngx_http_tnt_read_greetings(ngx_http_request_t *r,
static ngx_int_t ngx_http_tnt_output_json_parse_error(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx);
static inline ngx_int_t ngx_http_tnt_say(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx, ngx_int_t code, const u_char *message);
ngx_http_tnt_ctx_t *ctx, ngx_int_t code, const u_char *message);

static ngx_int_t ngx_http_tnt_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_tnt_reinit_request(ngx_http_request_t *r);
@@ -64,14 +64,14 @@ static char *ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);

static size_t JSON_RPC_MAGIC = sizeof(
"{"
"'error': {"
"'code':18446744073709551615,"
"'message':''"
"},"
"'result':{}, "
"'id':18446744073709551615"
"}") - 1;
"{"
"'error': {"
"'code':18446744073709551615,"
"'message':''"
"},"
"'result':{}, "
"'id':18446744073709551615"
"}") - 1;

static ngx_command_t ngx_http_tnt_commands[] = {

@@ -87,32 +87,32 @@ static ngx_command_t ngx_http_tnt_commands[] = {


static ngx_http_module_t ngx_http_tnt_module_ctx = {
NULL, /* preconfiguration */
NULL, /* postconfiguration */
NULL, /* preconfiguration */
NULL, /* postconfiguration */

NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create main configuration */
NULL, /* init main configuration */

NULL, /* create server configuration */
NULL, /* merge server configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */

ngx_http_tnt_create_loc_conf, /* create location configuration */
ngx_http_tnt_merge_loc_conf /* merge location configuration */
ngx_http_tnt_create_loc_conf, /* create location configuration */
ngx_http_tnt_merge_loc_conf /* merge location configuration */
};


ngx_module_t ngx_http_tnt_module = {
NGX_MODULE_V1,
&ngx_http_tnt_module_ctx, /* module context */
ngx_http_tnt_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
&ngx_http_tnt_module_ctx, /* module context */
ngx_http_tnt_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};

@@ -120,20 +120,20 @@ ngx_module_t ngx_http_tnt_module = {
static ngx_int_t
ngx_http_tnt_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_tnt_loc_conf_t *mlcf;
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_tnt_loc_conf_t *mlcf;

if (!(r->method & NGX_HTTP_POST)) {
return NGX_HTTP_NOT_ALLOWED;
return NGX_HTTP_NOT_ALLOWED;
}

if (ngx_http_set_content_type(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

if (ngx_http_upstream_create(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

u = r->upstream;
@@ -160,7 +160,7 @@ ngx_http_tnt_handler(ngx_http_request_t *r)

rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
return rc;
}

return NGX_DONE;
@@ -170,117 +170,117 @@ ngx_http_tnt_handler(ngx_http_request_t *r)
static ngx_int_t
ngx_http_tnt_create_request(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t *body;
ngx_http_tnt_ctx_t *ctx;
size_t complete_msg_size = 0;
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t *body;
ngx_http_tnt_ctx_t *ctx;
size_t complete_msg_size = 0;

ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
if (ctx == NULL) {

if (r->headers_in.content_length_n == 0) {
dd("empty body from the client");
return NGX_ERROR;
}
if (r->headers_in.content_length_n == 0) {
dd("empty body from the client");
return NGX_ERROR;
}

ctx = ngx_palloc(r->pool, sizeof(ngx_http_tnt_ctx_t));
if (ctx == NULL) {
return NGX_ERROR;
}
ctx = ngx_palloc(r->pool, sizeof(ngx_http_tnt_ctx_t));
if (ctx == NULL) {
return NGX_ERROR;
}

memset(ctx, 0, sizeof(ngx_http_tnt_ctx_t));
ctx->payload = -1;
memset(ctx, 0, sizeof(ngx_http_tnt_ctx_t));
ctx->payload = -1;

ngx_http_set_ctx(r, ctx, ngx_http_tnt_module);
ngx_http_set_ctx(r, ctx, ngx_http_tnt_module);

ctx->out_chain = ngx_alloc_chain_link(r->pool);
if (ctx->out_chain == NULL) {
return NGX_ERROR;
}
ctx->out_chain = ngx_alloc_chain_link(r->pool);
if (ctx->out_chain == NULL) {
return NGX_ERROR;
}

ctx->out_chain->next = NULL;
ctx->out_chain->next = NULL;

const size_t output_size = r->headers_in.content_length_n * 2;
ctx->out_chain->buf = ngx_create_temp_buf(r->pool, output_size);
if (ctx->out_chain->buf == NULL) {
crit("failed to allocate output buffer, size %ui", output_size);
return NGX_ERROR;
}
ctx->out_chain->buf->last_in_chain = 1;
const size_t output_size = r->headers_in.content_length_n * 2;
ctx->out_chain->buf = ngx_create_temp_buf(r->pool, output_size);
if (ctx->out_chain->buf == NULL) {
crit("failed to allocate output buffer, size %ui", output_size);
return NGX_ERROR;
}
ctx->out_chain->buf->last_in_chain = 1;

rc = tp_transcode_init(&ctx->in_t, (char *) ctx->out_chain->buf->start,
output_size, YAJL_JSON_TO_TP);
if (rc == TP_TRANSCODE_ERROR) {
crit("tp_transcode_init() failed, transcode type: %d",
YAJL_JSON_TO_TP);
return NGX_ERROR;
}
rc = tp_transcode_init(&ctx->in_t, (char *) ctx->out_chain->buf->start,
output_size, YAJL_JSON_TO_TP);
if (rc == TP_TRANSCODE_ERROR) {
crit("tp_transcode_init() failed, transcode type: %d",
YAJL_JSON_TO_TP);
return NGX_ERROR;
}

} else /* return after NGX_AGAIN */ {

if (ctx->in_t.codec.create == NULL) {
crit("[BUG] ngx_http_tnt_create_request w/o valid 'ctx'");
return NGX_ERROR;
}
if (ctx->in_t.codec.create == NULL) {
crit("[BUG] ngx_http_tnt_create_request w/o valid 'ctx'");
return NGX_ERROR;
}

}

for (body = r->upstream->request_bufs; body; body = body->next) {

if (body->buf->in_file) {
if (body->buf->in_file) {

ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"tnt: in-file buffer found. aborted. "
"consider increasing your 'client_body_buffer_size' "
"setting");
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"tnt: in-file buffer found. aborted. "
"consider increasing your 'client_body_buffer_size' "
"setting");

ctx->state = INPUT_TO_LARGE;
ctx->state = INPUT_TO_LARGE;

break;
break;

} else {
b = body->buf;
}
} else {
b = body->buf;
}

rc = tp_transcode(&ctx->in_t, (char *)b->pos, b->last - b->pos);
switch (rc) {
rc = tp_transcode(&ctx->in_t, (char *)b->pos, b->last - b->pos);
switch (rc) {

case TP_TRANSCODE_OK: {
rc = tp_transcode_complete(&ctx->in_t, &complete_msg_size);
if (rc == TP_TRANSCODE_ERROR) {
case TP_TRANSCODE_OK: {
rc = tp_transcode_complete(&ctx->in_t, &complete_msg_size);
if (rc == TP_TRANSCODE_ERROR) {

dd("'input coding' failed to complete");
snprintf(&ctx->in_t.errmsg[0], sizeof(ctx->in_t.errmsg),
"{\"error\":\"Unknown input request parse error\"}");
ctx->state = INPUT_JSON_PARSE_FAILED;
dd("'input coding' failed to complete");
snprintf(&ctx->in_t.errmsg[0], sizeof(ctx->in_t.errmsg),
"{\"error\":\"Unknown input request parse error\"}");
ctx->state = INPUT_JSON_PARSE_FAILED;

} else {
} else {

ctx->out_chain->buf->last =
ctx->out_chain->buf->start + complete_msg_size;
ctx->state = GOOD;
ctx->out_chain->buf->last =
ctx->out_chain->buf->start + complete_msg_size;
ctx->state = GOOD;

}
break;
}
}
break;
}

case TP_TRANSCODE_AGAIN:
/** TODO
* Potential BUG
*/
dd("'input coding' needs mode bytes ... (i.e. NGX_AGAIN)");
return NGX_AGAIN;
case TP_TRANSCODE_AGAIN:
/** TODO
* Potential BUG
*/
dd("'input coding' needs mode bytes ... (i.e. NGX_AGAIN)");
return NGX_AGAIN;

default:
case TP_TRANSCODE_ERROR:
dd("'input coding' failed: '%s'", ctx->in_t.errmsg);
snprintf(&ctx->in_t.errmsg[0], sizeof(ctx->in_t.errmsg),
"{\"error\":\"__TODO_PARSE_INPUT_ERROR_MESSAGE__\"}");
tp_transcode_complete(&ctx->in_t, &complete_msg_size);
ctx->state = INPUT_JSON_PARSE_FAILED;
break;
}
default:
case TP_TRANSCODE_ERROR:
dd("'input coding' failed: '%s'", ctx->in_t.errmsg);
snprintf(&ctx->in_t.errmsg[0], sizeof(ctx->in_t.errmsg),
"{\"error\":\"__TODO_PARSE_INPUT_ERROR_MESSAGE__\"}");
tp_transcode_complete(&ctx->in_t, &complete_msg_size);
ctx->state = INPUT_JSON_PARSE_FAILED;
break;
}
}

/**
@@ -297,8 +297,8 @@ ngx_http_tnt_reinit_request(ngx_http_request_t *r)
{
ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
if (ctx != NULL) {
ngx_pfree(r->pool, ctx);
ngx_http_set_ctx(r, NULL, ngx_http_tnt_module);
ngx_pfree(r->pool, ctx);
ngx_http_set_ctx(r, NULL, ngx_http_tnt_module);
}

dd("reinit connection with Tarantool...");
@@ -312,72 +312,72 @@ ngx_http_tnt_process_header(ngx_http_request_t *r)
{
ngx_http_upstream_t *u = r->upstream;

ngx_int_t rc;
ngx_http_tnt_ctx_t *ctx;
ngx_buf_t *b = &r->upstream->buffer;
struct tpresponse tpresponse;
ngx_int_t rc;
ngx_http_tnt_ctx_t *ctx;
ngx_buf_t *b = &r->upstream->buffer;
struct tpresponse tpresponse;

ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);

rc = ngx_http_tnt_read_greetings(r, ctx, b);
if (rc != NGX_OK) {
return rc;
return rc;
}

switch (ctx->state) {
case GOOD:
break;
break;
case INPUT_JSON_PARSE_FAILED:
return ngx_http_tnt_output_json_parse_error(r, ctx);
return ngx_http_tnt_output_json_parse_error(r, ctx);
case INPUT_TO_LARGE:
return ngx_http_tnt_say(r, ctx, NGX_HTTP_OK, (const u_char *)
"{\"error\":\""
"Request too large, consider increasing your "
"server's setting 'client_body_buffer_size'"
"\"}");
return ngx_http_tnt_say(r, ctx, NGX_HTTP_OK, (const u_char *)
"{\"error\":\""
"Request too large, consider increasing your "
"server's setting 'client_body_buffer_size'"
"\"}");
default:
crit("[BUG] 'ctx' unknown state:%d", ctx->state);
return NGX_ERROR;
crit("[BUG] 'ctx' unknown state:%d", ctx->state);
return NGX_ERROR;
}

if (b->last - b->pos == 0) {
return NGX_AGAIN;
return NGX_AGAIN;
}

r->upstream->length = 0;

ctx->payload = tp_read_payload(&tpresponse,
(const char *)b->pos, (const char *)b->last);
(const char *)b->pos, (const char *)b->last);
switch (ctx->payload) {
case 0:
return NGX_AGAIN;
return NGX_AGAIN;
case -1:
crit("Tarantool sent invalid 'payload' (i.e. message size)");
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
crit("Tarantool sent invalid 'payload' (i.e. message size)");
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
default:
u->headers_in.status_n = 200;
u->state->status = 200;
/** We can't get fair size of outgoing JSON message at this stage - so
* just multiply msgpack payload in hope is more or enough
* for the JSON message.
*/
u->headers_in.content_length_n = JSON_RPC_MAGIC + ctx->payload * 2;
crit("got 'payload' expected input len:%i, _magic_ output len:%i",
ctx->payload, u->headers_in.content_length_n);
u->headers_in.status_n = 200;
u->state->status = 200;
/** We can't get fair size of outgoing JSON message at this stage - so
* just multiply msgpack payload in hope is more or enough
* for the JSON message.
*/
u->headers_in.content_length_n = JSON_RPC_MAGIC + ctx->payload * 2;
crit("got 'payload' expected input len:%i, _magic_ output len:%i",
ctx->payload, u->headers_in.content_length_n);

ctx->in_cache = ngx_create_temp_buf(r->pool, ctx->payload);
if (ctx->in_cache == NULL) {
return NGX_ERROR;
}
ctx->in_cache = ngx_create_temp_buf(r->pool, ctx->payload);
if (ctx->in_cache == NULL) {
return NGX_ERROR;
}

ctx->in_cache->pos = ctx->in_cache->start;
ctx->in_cache->memory = 1;
ctx->in_cache->pos = ctx->in_cache->start;
ctx->in_cache->memory = 1;

r->upstream->length = ctx->payload;
r->upstream->length = ctx->payload;

r->keepalive = 1;
r->keepalive = 1;

break;
break;
}

return NGX_OK;
@@ -390,8 +390,8 @@ ngx_http_tnt_filter_init(void *data)
ngx_http_request_t *r = data;
ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
if (ctx == NULL) {
crit("[BUG] ngx_http_tnt_filter invalid 'ctx'");
return NGX_ERROR;
crit("[BUG] ngx_http_tnt_filter invalid 'ctx'");
return NGX_ERROR;
}

return NGX_OK;
@@ -401,13 +401,13 @@ ngx_http_tnt_filter_init(void *data)
static ngx_int_t
ngx_http_tnt_filter(void *data, ssize_t bytes)
{
ngx_http_request_t *r = data;
ngx_http_request_t *r = data;

ngx_int_t rc;
ngx_http_tnt_ctx_t *ctx;
ngx_buf_t *b;
ngx_int_t rc;
ngx_http_tnt_ctx_t *ctx;
ngx_buf_t *b;
ngx_http_upstream_t *u;
ngx_chain_t *cl, **ll;
ngx_chain_t *cl, **ll;

u = r->upstream;
b = &u->buffer;
@@ -417,13 +417,13 @@ ngx_http_tnt_filter(void *data, ssize_t bytes)
*/
ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
if (ctx == NULL) {
crit("[BUG] ngx_http_tnt_filter invalid 'ctx'");
return NGX_ERROR;
crit("[BUG] ngx_http_tnt_filter invalid 'ctx'");
return NGX_ERROR;
}

if (ctx->state != GOOD) {
crit("[BUG] ngx_http_tnt_filter invalid 'ctx'");
return NGX_ERROR;
crit("[BUG] ngx_http_tnt_filter invalid 'ctx'");
return NGX_ERROR;
}

dd("bytes recv:%d, upstream length:%d", bytes, ctx->payload);
@@ -433,8 +433,8 @@ ngx_http_tnt_filter(void *data, ssize_t bytes)


if (ctx->in_cache->pos == ctx->in_cache->end) {
crit("in_cache overflow");
return NGX_ERROR;
crit("in_cache overflow");
return NGX_ERROR;
}

ctx->in_cache->pos = ngx_copy(ctx->in_cache->pos, b->pos, bytes);
@@ -443,27 +443,27 @@ ngx_http_tnt_filter(void *data, ssize_t bytes)
/* Done with tnt input
*/
if (ctx->payload > 0) {
return NGX_OK;
return NGX_OK;
}

if (ctx->payload < 0) {
crit("[BUG] payload:%i", ctx->payload);
crit("[BUG] payload:%i", ctx->payload);
}

/* Get next output chail buf and output responce
*/
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
ll = &cl->next;
}

*ll = cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
return NGX_ERROR;
}

cl->buf = ngx_create_temp_buf(r->pool, u->headers_in.content_length_n);
if (cl->buf == NULL) {
return NGX_ERROR;
return NGX_ERROR;
}

cl->buf->memory = 1;
@@ -474,46 +474,47 @@ ngx_http_tnt_filter(void *data, ssize_t bytes)
/* Tarantool message transcode & output
*/
rc = tp_transcode_init(&ctx->out_t, (char *)cl->buf->start,
u->headers_in.content_length_n, TP_REPLY_TO_JSON);
u->headers_in.content_length_n, TP_REPLY_TO_JSON);
if (rc == TP_TRANSCODE_ERROR) {
crit("line: %d tp_transcode_init() failed", __LINE__);
return NGX_ERROR;
crit("line: %d tp_transcode_init() failed", __LINE__);
return NGX_ERROR;
}

ngx_int_t result = NGX_OK;
rc = tp_transcode(&ctx->out_t, (char *)ctx->in_cache->start,
ctx->in_cache->end - ctx->in_cache->start);
ctx->in_cache->end - ctx->in_cache->start);
switch (rc) {
case TP_TRANSCODE_OK:
dd("'output coding' OK");
break;
dd("'output coding' OK");
break;
/** TODO
* Since we wait bytes message from the Tarantool we ignore stream parsing.
* This sould be fixed.
* Since we wait bytes message from the Tarantool we ignore stream parsing.
* This sould be fixed.
*/
case TP_TRANSCODE_AGAIN:
case TP_TRANSCODE_ERROR:
default:
crit("output failed: %s", ctx->out_t.errmsg);
result = NGX_ERROR;
break;
crit("output failed: %s", ctx->out_t.errmsg);
result = NGX_ERROR;
break;
}

size_t complete_msg_size = 0;
rc = tp_transcode_complete(&ctx->out_t, &complete_msg_size);
if (rc != TP_TRANSCODE_OK || result == NGX_ERROR) {
crit("'output coding' failed to complete");
result = NGX_ERROR;
crit("'output coding' failed to complete");
result = NGX_ERROR;
} else {

/** 'erase' trailer
*/
for (u_char *p = cl->buf->start + complete_msg_size;
p < cl->buf->end;
++p)
{
*p = '\0';
}
/** 'erase' trailer
*/
u_char *p = NULL;
for (p = cl->buf->start + complete_msg_size;
p < cl->buf->end;
++p)
{
*p = '\0';
}

}

@@ -531,8 +532,8 @@ ngx_http_tnt_abort_request(ngx_http_request_t *r)

ngx_http_tnt_ctx_t *ctx = ngx_http_get_module_ctx(r, ngx_http_tnt_module);
if (ctx != NULL) {
ngx_pfree(r->pool, ctx);
ngx_http_set_ctx(r, NULL, ngx_http_tnt_module);
ngx_pfree(r->pool, ctx);
ngx_http_set_ctx(r, NULL, ngx_http_tnt_module);
}

return;
@@ -554,27 +555,27 @@ ngx_http_tnt_create_loc_conf(ngx_conf_t *cf)

conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_tnt_loc_conf_t));
if (conf == NULL) {
return NULL;
return NULL;
}

/*
* set by ngx_pcalloc():
*
* conf->upstream.bufs.num = 0;
* conf->upstream.next_upstream = 0;
* conf->upstream.temp_path = NULL;
* conf->upstream.uri = { 0, NULL };
* conf->upstream.location = NULL;
* conf->upstream.bufs.num = 0;
* conf->upstream.next_upstream = 0;
* conf->upstream.temp_path = NULL;
* conf->upstream.uri = { 0, NULL };
* conf->upstream.location = NULL;
*/

conf->upstream.local = NGX_CONF_UNSET_PTR;
conf->upstream.next_upstream_tries = NGX_CONF_UNSET_UINT;
conf->upstream.connect_timeout =
conf->upstream.send_timeout =
conf->upstream.read_timeout =
conf->upstream.local = NGX_CONF_UNSET_PTR;
conf->upstream.next_upstream_tries = NGX_CONF_UNSET_UINT;
conf->upstream.connect_timeout =
conf->upstream.send_timeout =
conf->upstream.read_timeout =
conf->upstream.next_upstream_timeout = NGX_CONF_UNSET_MSEC;

conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;
conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;

/* the hardcoded values */
conf->upstream.cyclic_temp_file = 0;
@@ -603,29 +604,29 @@ ngx_http_tnt_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_http_tnt_loc_conf_t *conf = child;

ngx_conf_merge_ptr_value(conf->upstream.local,
prev->upstream.local, NULL);
prev->upstream.local, NULL);

ngx_conf_merge_uint_value(conf->upstream.next_upstream_tries,
prev->upstream.next_upstream_tries, 0);
prev->upstream.next_upstream_tries, 0);

ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
prev->upstream.connect_timeout, 60000);
prev->upstream.connect_timeout, 60000);

ngx_conf_merge_msec_value(conf->upstream.send_timeout,
prev->upstream.send_timeout, 60000);
prev->upstream.send_timeout, 60000);

ngx_conf_merge_msec_value(conf->upstream.read_timeout,
prev->upstream.read_timeout, 60000);
prev->upstream.read_timeout, 60000);

ngx_conf_merge_msec_value(conf->upstream.next_upstream_timeout,
prev->upstream.next_upstream_timeout, 0);
prev->upstream.next_upstream_timeout, 0);

ngx_conf_merge_size_value(conf->upstream.buffer_size,
prev->upstream.buffer_size,
(size_t) ngx_pagesize);
prev->upstream.buffer_size,
(size_t) ngx_pagesize);

if (conf->upstream.upstream == NULL) {
conf->upstream.upstream = prev->upstream.upstream;
conf->upstream.upstream = prev->upstream.upstream;
}

return NGX_CONF_OK;
@@ -637,12 +638,12 @@ ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_tnt_loc_conf_t *mlcf = conf;

ngx_str_t *value;
ngx_url_t u;
ngx_str_t *value;
ngx_url_t u;
ngx_http_core_loc_conf_t *clcf;

if (mlcf->upstream.upstream) {
return "is duplicate";
return "is duplicate";
}

value = cf->args->elts;
@@ -654,7 +655,7 @@ ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)

mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
if (mlcf->upstream.upstream == NULL) {
return NGX_CONF_ERROR;
return NGX_CONF_ERROR;
}

clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
@@ -662,7 +663,7 @@ ngx_http_tnt_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
clcf->handler = ngx_http_tnt_handler;

if (clcf->name.data[clcf->name.len - 1] == '/') {
clcf->auto_redirect = 1;
clcf->auto_redirect = 1;
}

return NGX_CONF_OK;
@@ -674,34 +675,34 @@ ngx_http_tnt_read_greetings(ngx_http_request_t *r, ngx_http_tnt_ctx_t *ctx,
ngx_buf_t *b)
{
if (ctx->greetings) {
return NGX_OK;
return NGX_OK;
}

if (b->last - b->pos < 128) {
crit("Tarantool sent invalid greetings len:%i",
b->last - b->pos);
return NGX_ERROR;
crit("Tarantool sent invalid greetings len:%i",
b->last - b->pos);
return NGX_ERROR;
}

if (b->last - b->pos >= sizeof("Tarantool") - 1
&& b->pos[0] == 'T'
&& b->pos[1] == 'a'
&& b->pos[2] == 'r'
&& b->pos[3] == 'a'
&& b->pos[4] == 'n'
&& b->pos[5] == 't'
&& b->pos[6] == 'o'
&& b->pos[7] == 'o'
&& b->pos[8] == 'l')
&& b->pos[0] == 'T'
&& b->pos[1] == 'a'
&& b->pos[2] == 'r'
&& b->pos[3] == 'a'
&& b->pos[4] == 'n'
&& b->pos[5] == 't'
&& b->pos[6] == 'o'
&& b->pos[7] == 'o'
&& b->pos[8] == 'l')
{
b->pos = b->pos + 128;
ctx->greetings = 1;
return NGX_OK;
b->pos = b->pos + 128;
ctx->greetings = 1;
return NGX_OK;
}

crit("Tarantool sent strange greetings: '%*.s',"
" expected 'Tarantool' with len. == 128",
128, b->pos);
" expected 'Tarantool' with len. == 128",
128, b->pos);

return NGX_ERROR;
}
@@ -712,17 +713,17 @@ ngx_http_tnt_output_json_parse_error(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx)
{
ngx_http_upstream_t *u;
ngx_chain_t *cl, **ll;
ngx_chain_t *cl, **ll;

u = r->upstream;

for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
ll = &cl->next;
}

*ll = cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
return NGX_ERROR;
}

u->headers_in.status_n = 200;
@@ -731,16 +732,17 @@ ngx_http_tnt_output_json_parse_error(ngx_http_request_t *r,

cl->buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n);
if (cl->buf == NULL) {
return NGX_ERROR;
return NGX_ERROR;
}

cl->buf->pos = ngx_snprintf(cl->buf->start,
u->headers_in.content_length_n, "%s", ctx->in_t.errmsg);
u->headers_in.content_length_n, "%s", ctx->in_t.errmsg);

/** 'erase' trailer
*/
for (u_char *p = cl->buf->pos; p < cl->buf->end; ++p) {
*p = '\0';
u_char *p = NULL;
for (p = cl->buf->pos; p < cl->buf->end; ++p) {
*p = '\0';
}

cl->buf->last = cl->buf->end;
@@ -760,7 +762,7 @@ static inline ngx_int_t ngx_http_tnt_say(ngx_http_request_t *r,
ngx_http_tnt_ctx_t *ctx, ngx_int_t code, const u_char *message)
{
ngx_http_upstream_t *u;
ngx_chain_t *cl, **ll;
ngx_chain_t *cl, **ll;

u = r->upstream;

@@ -769,22 +771,22 @@ static inline ngx_int_t ngx_http_tnt_say(ngx_http_request_t *r,
u->headers_in.content_length_n = ngx_strlen(message);

for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;
ll = &cl->next;
}

*ll = cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
return NGX_ERROR;
}

cl->buf = ngx_create_temp_buf(r->pool, r->headers_in.content_length_n);
if (cl->buf == NULL) {
return NGX_ERROR;
return NGX_ERROR;
}

cl->buf->pos = ngx_snprintf(
cl->buf->pos, u->headers_in.content_length_n,
"%s", message);
cl->buf->pos, u->headers_in.content_length_n,
"%s", message);

cl->buf->last = cl->buf->pos;
cl->buf->pos = cl->buf->start;

+ 1
- 0
third_party/tarantool-c

@@ -0,0 +1 @@
Subproject commit 396a7e4306fe0e5393e88d2d0ae2ec4ea7423871

+ 1
- 0
third_party/yajl

@@ -0,0 +1 @@
Subproject commit a0ecdde0c042b9256170f2f8890dd9451a4240aa

+ 859
- 0
tp_transcode.c View File

@@ -0,0 +1,859 @@
#define __STDC_FORMAT_MACROS 1
#include <inttypes.h>
#include <stdio.h>


#define MP_SOURCE 1

#include "tp.h"
#include "tp_transcode.h"

#if defined VERBOSE
#define dd(...) fprintf(stderr, __VA_ARGS__)
#else
#define dd(...)
#endif

#define say_error_r(ctx, ...) do { \
snprintf((ctx)->tc->errmsg, sizeof((ctx)->tc->errmsg) - 1, __VA_ARGS__); \
return TP_TRANSCODE_ERROR; \
} while (0)

#define say_error(ctx, ...) do \
snprintf((ctx)->tc->errmsg, sizeof((ctx)->tc->errmsg) - 1, __VA_ARGS__); \
while (0)

#define say_overflow_r(c) \
say_error_r((c), "line:%d, 'output' buffer overflow", __LINE__)

/*
* CODEC - YAJL_JSON_RPC
*/
#if defined (HAVE_YAJL)

#include <yajl/yajl_parse.h>
#include <yajl/yajl_gen.h>

typedef struct {
char *ptr;
int16_t count;
#define TYPE_MAP 1
#define TYPE_ARRAY 2
int type;
} stack_item_t;


typedef struct {
yajl_handle hand;

#define STACK_SIZE 256
stack_item_t stack[STACK_SIZE];
uint16_t size;
char *b;
size_t output_size;
struct tp tp;

#define PARAMS 4
#define ID 8
#define METHOD 16
int stage;
int been_stages;

uint32_t id;

tp_transcode_t *tc;
} yajl_ctx_t;


static inline bool
spush(yajl_ctx_t *s, char *ptr, int mask)
{
if (mp_likely(s->size < STACK_SIZE)) {
s->stack[s->size].ptr = ptr;
s->stack[s->size].count = 0;
s->stack[s->size].type = mask;
++s->size;
return true;
}
return false;
}

static inline stack_item_t*
stop(yajl_ctx_t *s)
{
if (mp_likely(s->size > 0))
return &s->stack[s->size - 1];
return NULL;
}

static inline stack_item_t*
spop(yajl_ctx_t *s)
{
stack_item_t *ret = NULL;

if (mp_likely(s->size > 0)) {
--s->size;
ret = &s->stack[s->size];
}

return ret;
}

static inline void
sinc_if(yajl_ctx_t *s, int cond)
{
stack_item_t *item = stop(s);
if (mp_likely(item != NULL && item->type & cond))
++item->count;
}
#define sinc_if_array(s) sinc_if((s), TYPE_ARRAY)
#define sinc_if_map(s) sinc_if((s), TYPE_MAP)
#define say_overflow_r_2(c) do { \
say_error((c), "line:%d, 'output' buffer overflow", __LINE__); \
return 0; \
} while (0)

static int
yajl_null(void *ctx)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;
dd("null\n");

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_array(s_ctx);
if (mp_unlikely(!tp_encode_nil(&s_ctx->tp)))
say_overflow_r_2(s_ctx);
}

return 1;
}

static int
yajl_boolean(void * ctx, int v)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

dd("bool: %s\n", v ? "true" : "false");

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_array(s_ctx);
if (mp_unlikely(!tp_encode_bool(&s_ctx->tp, v)))
say_overflow_r_2(s_ctx);
}

return 1;
}

static int
yajl_integer(void *ctx, long long v)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

dd("integer: %lld\n", v);

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_array(s_ctx);

char *r = NULL;
if (v < 0)
r = tp_encode_int(&s_ctx->tp, (int64_t)v);
else
r = tp_encode_uint(&s_ctx->tp, (uint64_t)v);

if (mp_unlikely(!r))
say_overflow_r_2(s_ctx);

} else if (s_ctx->stage == ID) {
if (v > UINT32_MAX) {
say_error(s_ctx, "the 'id' large then UINT32_t");
return 0;
}

tp_reqid(&s_ctx->tp, (uint32_t)v);
s_ctx->stage = 0;
}

return 1;
}

static int
yajl_double(void *ctx, double v)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

dd("double: %g\n", v);

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_array(s_ctx);

if (mp_unlikely(!tp_encode_double(&s_ctx->tp, v)))
say_overflow_r_2(s_ctx);
}

return 1;
}

static int
yajl_string(void *ctx, const unsigned char * str, size_t len)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

dd("string: '%.*s\n", len, str);

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_array(s_ctx);

if (mp_unlikely(!tp_encode_str(&s_ctx->tp, (const char *)str, len)))
say_overflow_r_2(s_ctx);

} else if (s_ctx->stage == METHOD) {

if (mp_unlikely(!tp_call(&s_ctx->tp, (const char *)str, len))) {
say_error(s_ctx, "tp_call() return NULL");
return 0;
}

s_ctx->stage = 0;
}

return 1;
}

static int
yajl_map_key(void *ctx, const unsigned char * key, size_t len)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

dd("key: %.*s\n", len, key);

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_map(s_ctx);

if (mp_unlikely(!tp_encode_str(&s_ctx->tp, (const char *)key, len)))
say_overflow_r_2(s_ctx);

} else if (s_ctx->stage != ID && s_ctx->stage != METHOD) {

if (len == sizeof("params") - 1
&& key[0] == 'p'
&& key[1] == 'a'
&& key[2] == 'r'
&& key[3] == 'a'
&& key[4] == 'm'
&& key[5] == 's')
{
s_ctx->stage = PARAMS;
s_ctx->been_stages |= PARAMS;
}
else if (len == sizeof("id") - 1
&& key[0] == 'i'
&& key[1] == 'd')
{
s_ctx->stage = ID;
s_ctx->been_stages |= ID;
}
else if (len == sizeof("method") - 1
&& key[0] == 'm'
&& key[1] == 'e'
&& key[2] == 't'
&& key[3] == 'h'
&& key[4] == 'o'
&& key[5] == 'd')
{
s_ctx->stage = METHOD;
s_ctx->been_stages |= METHOD;
}
else
{
say_error(s_ctx,
"unknown key '%.*s', allowed: 'id', 'method', 'params'",
(int)len, key);
return 0;
}
}

return 1;
}

static int
yajl_start_map(void *ctx)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

dd("map open '{'\n");

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_array(s_ctx);

bool r = false;
if (mp_unlikely(s_ctx->size == 0))
r = spush(s_ctx, s_ctx->tp.p, TYPE_MAP | PARAMS);
else
r = spush(s_ctx, s_ctx->tp.p, TYPE_MAP);
if (mp_unlikely(!r)) {
say_error(s_ctx, "'stack' overflow");
return 0;
}

if (mp_unlikely(s_ctx->tp.e < s_ctx->tp.p + 1 + sizeof(uint32_t)))
say_overflow_r_2(s_ctx);

tp_add(&s_ctx->tp, 1 + sizeof(uint32_t));
}

return 1;
}


static int
yajl_end_map(void *ctx)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

if (mp_likely(s_ctx->stage == PARAMS)) {

stack_item_t *item = spop(s_ctx);

dd("map close, count %d '}'\n", item->count);

*(item->ptr++) = 0xdf;
*(uint32_t *) item->ptr = mp_bswap_u32(item->count);

if (mp_unlikely(item->type & PARAMS)) {
s_ctx->stage = 0;
}
}

return 1;
}

static int
yajl_start_array(void *ctx)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

dd("array open '['\n");

if (mp_likely(s_ctx->stage == PARAMS)) {

sinc_if_array(s_ctx);

bool r = false;
if (mp_unlikely(s_ctx->size == 0))
r = spush(s_ctx, s_ctx->tp.p, TYPE_ARRAY | PARAMS);
else
r = spush(s_ctx, s_ctx->tp.p, TYPE_ARRAY);
if (mp_unlikely(!r)) {
say_error(s_ctx, "'stack' overflow");
return 0;
}

if (mp_unlikely(s_ctx->tp.e < s_ctx->tp.p + 1 + sizeof(uint32_t)))
say_overflow_r_2(s_ctx);

tp_add(&s_ctx->tp, 1 + sizeof(uint32_t));
}

return 1;
}

static int
yajl_end_array(void *ctx)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

if (mp_likely(s_ctx->stage == PARAMS)) {

stack_item_t *item = spop(s_ctx);
dd("array close, count %d ']'\n", item->count);

*(item->ptr++) = 0xdd;
*(uint32_t *) item->ptr = mp_bswap_u32(item->count);

if (item->type & PARAMS) {
s_ctx->stage = 0;
}
}

return 1;
}

static void *
yajl_json2tp_create(tp_transcode_t *tc, char *output, size_t output_size)
{
static yajl_callbacks callbacks = { yajl_null, yajl_boolean, yajl_integer,
yajl_double, NULL, yajl_string, yajl_start_map, yajl_map_key,
yajl_end_map, yajl_start_array, yajl_end_array
};

yajl_ctx_t *ctx = malloc(sizeof(yajl_ctx_t));
if (mp_unlikely(ctx == NULL)) {
goto error_exit;
}

memset(ctx, 0 , sizeof(yajl_ctx_t));

ctx->output_size = output_size;
tp_init(&ctx->tp, (char *)output, output_size, NULL, NULL);

ctx->size = 0;
size_t i = 0;
for (i = 0; i < STACK_SIZE; ++i) {
ctx->stack[i].ptr = NULL;
ctx->stack[i].count = -1;
ctx->stack[i].type = 0;
}

ctx->hand = yajl_alloc(&callbacks, NULL, (void *)ctx);
if (mp_unlikely(ctx->hand == NULL)) {
goto error_exit;
}

ctx->tc = tc;

return ctx;

error_exit:
if (ctx && ctx->hand) {
yajl_free(ctx->hand);
}

if (ctx) {
free(ctx);
}

return NULL;
}


static void
yajl_json2tp_free(void *ctx)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

if (mp_likely(s_ctx->hand != NULL)) {
yajl_free(s_ctx->hand);
}

free(s_ctx);
}

static int
yajl_json2tp_transcode(void *ctx, const char *input, size_t input_size)
{
#if !defined MIN
# define MIN(a, b) ((a) > (b) ? (b) : (a))
#endif

yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;

const unsigned char *input_ = (const unsigned char *)input;
yajl_status stat = yajl_parse(s_ctx->hand, input_, input_size);
if (mp_unlikely(stat != yajl_status_ok)) {

if (!s_ctx->tc->errmsg[0]) {
unsigned char *err;
stat = yajl_complete_parse(s_ctx->hand);
err = yajl_get_error(s_ctx->hand, 0, input_, input_size);
say_error(s_ctx, "%s", err);
yajl_free_error(s_ctx->hand, err);
}

return TP_TRANSCODE_ERROR;
}

return TP_TRANSCODE_OK;
#undef MIN
}

static int
yajl_json2tp_complete(void *ctx, size_t *complete_msg_size)
{
yajl_ctx_t *s_ctx = (yajl_ctx_t *)ctx;
yajl_status stat = yajl_complete_parse(s_ctx->hand);
if (mp_unlikely(stat != yajl_status_ok))
return TP_TRANSCODE_ERROR;

if (mp_unlikely(
!(s_ctx->been_stages & ID && s_ctx->been_stages & METHOD))
)
{
char *p = &s_ctx->tc->errmsg[0];
char *e = &s_ctx->tc->errmsg[0] + sizeof(s_ctx->tc->errmsg) - 1;
if (!(s_ctx->been_stages & ID))
p += snprintf(p, e - p, "request _MUST_ contain 'id' key");
if (!(s_ctx->been_stages & METHOD))
snprintf(p, e - p, "; request _MUST_ contain 'method' key");
return TP_TRANSCODE_ERROR;
}

*complete_msg_size = tp_used(&s_ctx->tp);

return TP_TRANSCODE_OK;
}

#endif /* HAVE_YAJL */

/**
* CODEC - Tarantool message to JSON RPC
*/

typedef struct tp2json {
char *output;
char *pos;
char *end;

struct tpresponse r;
bool tp_reply_stage;

tp_transcode_t *tc;
} tp2json_t;

static void*
tp2json_create(tp_transcode_t *tc, char *output, size_t output_size)
{
tp2json_t *ctx = malloc(sizeof(tp2json_t));
if (mp_likely(ctx != NULL)) {
ctx->pos = ctx->output = output;
ctx->end = output + output_size;
ctx->tc = tc;
ctx->tp_reply_stage = true;
}
return ctx;
}

static void
tp2json_free(void *ctx_)
{
tp2json_t *ctx = ctx_;
free(ctx);
}

static int
tp2json_transcode_internal(tp2json_t *ctx, const char **beg, const char *end)
{
#define PUT_CHAR(c) { *ctx->pos = (c); ++ctx->pos; }

int rc;
const char *p = *beg;
size_t len = ctx->end - ctx->pos;

if (p == end)
return TP_TRANSCODE_OK;

if (mp_unlikely(ctx->pos == ctx->end))
say_overflow_r(ctx);

switch (mp_typeof(**beg)) {
case MP_NIL:

if (mp_unlikely(len < 4))
say_overflow_r(ctx);

mp_next(beg);
PUT_CHAR('n')
PUT_CHAR('u')
PUT_CHAR('l')
PUT_CHAR('l')
break;
case MP_UINT:
if (mp_unlikely(len < sizeof("18446744073709551615") - 1))
say_overflow_r(ctx);

ctx->pos += snprintf(ctx->pos, len, "%" PRIu64, mp_decode_uint(beg));
break;
case MP_INT:
if (mp_unlikely(len < sizeof("18446744073709551615") - 1))
say_overflow_r(ctx);

ctx->pos += snprintf(ctx->pos, len, "%" PRId64, mp_decode_int(beg));
break;
case MP_STR:
{
uint32_t strlen = 0;
const char *str = mp_decode_str(beg, &strlen);

if (mp_unlikely(len < strlen + 2/*""*/))
say_overflow_r(ctx);

ctx->pos += snprintf(ctx->pos, len, "\"%.*s\"", strlen, str);
break;
}
case MP_BIN:
{
static const char *hex = "0123456789ABCDEF";

uint32_t binlen = 0;
const char *bin = mp_decode_bin(beg, &binlen);

if (mp_unlikely(len < binlen))
say_overflow_r(ctx);
uint32_t i = 0;
for (i = 0; i < binlen; i++) {
unsigned char c = (unsigned char)bin[i];
ctx->pos +=
snprintf(ctx->pos, len, "%c%c", hex[c >> 4], hex[c & 0xF]);
}
break;
}
case MP_ARRAY:
{
const uint32_t size = mp_decode_array(beg);

if (mp_unlikely(len < size + 2/*,[]*/))
say_overflow_r(ctx);

PUT_CHAR('[')
uint32_t i = 0;
for (i = 0; i < size; i++) {
if (i)
PUT_CHAR(',')
rc = tp2json_transcode_internal(ctx, beg, end);
if (rc != TP_TRANSCODE_OK)
return rc;
}
PUT_CHAR(']')
break;
}
case MP_MAP:
{
const uint32_t size = mp_decode_map(beg);

if (mp_unlikely(len < size + 2/*,{}*/))
say_overflow_r(ctx);

PUT_CHAR('{')
uint32_t i = 0;
for (i = 0; i < size; i++) {
if (i)
PUT_CHAR(',')
rc = tp2json_transcode_internal(ctx, beg, end);
if (rc != TP_TRANSCODE_OK)
return rc;

PUT_CHAR(':')
rc = tp2json_transcode_internal(ctx, beg, end);
if (rc != TP_TRANSCODE_OK)
return rc;
}
PUT_CHAR('}')
break;
}
case MP_BOOL:
if (mp_decode_bool(beg)) {

if (mp_unlikely(len < sizeof("true") - 1))
say_overflow_r(ctx);

PUT_CHAR('t')
PUT_CHAR('r')
PUT_CHAR('u')
PUT_CHAR('e')
} else {

if (mp_unlikely(len < sizeof("false") - 1))
say_overflow_r(ctx);

PUT_CHAR('f')
PUT_CHAR('a')
PUT_CHAR('l')
PUT_CHAR('s')
PUT_CHAR('e')
}
break;
case MP_FLOAT:

if (mp_unlikely(len < 7))
say_overflow_r(ctx);

ctx->pos += snprintf(ctx->pos, len, "%f", mp_decode_float(beg));
break;
case MP_DOUBLE:

if (mp_unlikely(len < 15))
say_overflow_r(ctx);

ctx->pos += snprintf(ctx->pos, len, "%f", mp_decode_double(beg));
break;
case MP_EXT:
/* TODO What we should do here? */
mp_next(beg);
break;
default:
return TP_TRANSCODE_ERROR;
}

return TP_TRANSCODE_OK;

#undef PUT_CHAR
}

static int
tp_reply2json_transcode(void *ctx_, const char *in, size_t in_size)
{
int rc;

tp2json_t *ctx = ctx_;

if (ctx->tp_reply_stage) {

rc = tp_reply(&ctx->r, in, in_size);
if (rc == 0)
return TP_TRANSCODE_AGAIN;
else if (rc < 0) {
say_error(ctx, "tarantool message parse error");
goto error_exit;
}

ctx->pos += snprintf(ctx->output, ctx->end - ctx->output,
"{id:%zu,", (size_t)tp_getreqid(&ctx->r));

ctx->tp_reply_stage = false;
}

if (ctx->r.error) {

const int elen = ctx->r.error_end - ctx->r.error;
ctx->pos += snprintf(ctx->pos, ctx->end - ctx->pos,
"error{msg:\"%.*s\", code:%d}",
elen, ctx->r.error,
ctx->r.code);

} else {

const char *it = ctx->r.data;
rc = tp2json_transcode_internal(ctx, &it, ctx->r.data_end);
if (mp_unlikely(rc == TP_TRANSCODE_ERROR))
goto error_exit;
else if (rc == TP_TRANSCODE_AGAIN)
return TP_TRANSCODE_AGAIN;

}

*ctx->pos = '}';
++ctx->pos;

return TP_TRANSCODE_OK;

error_exit:
ctx->pos = ctx->output;
return TP_TRANSCODE_ERROR;
}

static int
tp2json_transcode(void *ctx_, const char *in, size_t in_size)
{
tp2json_t *ctx = ctx_;

const char *it = in, *end = in + in_size;

/* TODO
* Need add tarantool message structure check like in tp_reply
*/

/* Message len */
int rc = tp2json_transcode_internal(ctx, &it, end);
if (mp_unlikely(rc == TP_TRANSCODE_ERROR))
goto error_exit;
if (rc == TP_TRANSCODE_AGAIN)
return rc;

/* Header */
rc = tp2json_transcode_internal(ctx, &it, end);
if (mp_unlikely(rc == TP_TRANSCODE_ERROR))
goto error_exit;
if (rc == TP_TRANSCODE_AGAIN)
return rc;

/* Body */
rc = tp2json_transcode_internal(ctx, &it, end);
if (mp_unlikely(rc == TP_TRANSCODE_ERROR))
goto error_exit;
if (rc == TP_TRANSCODE_AGAIN)
return rc;

return TP_TRANSCODE_OK;

error_exit:
ctx->pos = ctx->output;
return TP_TRANSCODE_ERROR;
}

static int
tp2json_complete(void *ctx_, size_t *complete_msg_size)
{
tp2json_t *ctx = ctx_;

if (mp_unlikely(ctx->pos == ctx->output)) {
*complete_msg_size = 0;
return TP_TRANSCODE_ERROR;
}
*complete_msg_size = ctx->pos - ctx->output;
return TP_TRANSCODE_OK;
}

/**
* Known codecs
*/
#define CODEC(create_, transcode_, complete_, free_) \
(tp_codec_t) { \
.create = (create_), \
.transcode = (transcode_), \
.complete = (complete_), \
.free = (free_) \
}

tp_codec_t codecs[TP_CODEC_MAX] = {

#if defined (HAVE_YAJL)
CODEC(&yajl_json2tp_create,
&yajl_json2tp_transcode,
&yajl_json2tp_complete,
&yajl_json2tp_free),
#else
CODEC(NULL, NULL, NULL, NULL),
#endif

CODEC(&tp2json_create,
&tp_reply2json_transcode,
&tp2json_complete,
&tp2json_free),

CODEC(&tp2json_create,
&tp2json_transcode,
&tp2json_complete,
&tp2json_free)

};
#undef CODEC

/*
* Public API
*/

int
tp_transcode_init(tp_transcode_t *t, char *output, size_t output_size,
enum tp_codec_type codec)
{
memset(t, 0, sizeof(tp_transcode_t));

if (mp_unlikely(codec == TP_CODEC_MAX))
return TP_TRANSCODE_ERROR;

t->codec = codecs[codec];
if (mp_unlikely(!t->codec.create))
return TP_TRANSCODE_ERROR;

t->codec.ctx = t->codec.create(t, output, output_size);
if (mp_unlikely(!t->codec.ctx))
return TP_TRANSCODE_ERROR;

return TP_TRANSCODE_OK;
}

+ 143
- 0
tp_transcode.h View File

@@ -0,0 +1,143 @@
#ifndef TP_TRANSCODE_H_INCLUDED
#define TP_TRANSCODE_H INCLUDED

#ifdef __cplusplus
extern "C" {
#endif

/* {{{ API declaration */
struct tp_transcode;

/**
* Underlying codec functions
*/
typedef void *(*tp_codec_create)(struct tp_transcode*, char *, size_t);
typedef void (*tp_codec_free)(void *);
typedef int (*tp_do_transcode)(void *, const char *, size_t);
typedef int (*tp_do_transcode_complete)(void *, size_t *);

/**
* Underlying codec object
*/
typedef struct tp_codec {
void *ctx; /* Underlying codec context */

tp_codec_create create; /* create codec context(i.e. 'ctx') function */
tp_codec_free free; /* free codec context(i.e. 'ctx') function */

tp_do_transcode transcode; /* transcode function */
tp_do_transcode_complete complete; /* complete function */
} tp_codec_t;

/**
* Avaliable codecs
*/
enum tp_codec_type {

/**
* JSON PRC to Tarantool message (Yajl engine)
*/
YAJL_JSON_TO_TP = 0,

/**
* Tarantool reply message to JSON
*/
TP_REPLY_TO_JSON,

/**
* Tarantool message to JSON
*/
TP_TO_JSON,

TP_CODEC_MAX
};

/**
* tp_transcode obj - underlying codec holder
*/
typedef struct tp_transcode {
tp_codec_t codec;
char errmsg[128];
} tp_transcode_t;

/**
* Returns codes
*/
#define TP_TRANSCODE_OK 1
#define TP_TRANSCODE_ERROR 2
#define TP_TRANSCODE_AGAIN 3

/**
* Initialize struct tp_transcode.
* Returns TP_TRANSCODE_ERROR if codec not found or create codec failed
* Returns TP_TRANSCODE_OK if codec found and initialize well
*/
int tp_transcode_init(tp_transcode_t *t, char *output, size_t output_size,
enum tp_codec_type codec);

/**
* Convert input data to output (see tp_transcode_init), for instance json to msgpack
* Returns TP_TRANSCODE_OK if bytes enought for finish transcoding
* Returns TP_TRANSOCDE_AGAIN if more bytes requered
* Returns TP_TRANSCODE_ERROR if error occurred
*/
static inline int
tp_transcode(tp_transcode_t *t, char *b, size_t size);

/**
* Finalize (including free memory) transcoding.
* Returns TP_TRANSCODE_OK if transcoding done
* Returns TP_TRANSCODE_ERROR if error occurred
*/
static inline int
tp_transcode_complete(tp_transcode_t *t, size_t *complete_msg_size);

/**
* Dump Tarantool message to output in JSON format
* Returns true, false
*/
static inline bool
tp_dump(char *output, size_t output_size, char *input, size_t input_size);

static inline int
tp_transcode_complete(tp_transcode_t *t, size_t *complete_msg_size)
{
if (mp_likely(t != NULL)) {
const int rc = t->codec.complete(t->codec.ctx, complete_msg_size);
t->codec.free(t->codec.ctx);
return rc;
}
return TP_TRANSCODE_ERROR;
}

static inline int
tp_transcode(tp_transcode_t *t, char *b, size_t size)
{
if (mp_likely(t != NULL))
return t->codec.transcode(t->codec.ctx, b, size);
return TP_TRANSCODE_ERROR;
}

static inline bool
tp_dump(char *output, size_t output_size, char *input, size_t input_size)
{
tp_transcode_t t;
if (tp_transcode_init(&t, output, output_size, TP_TO_JSON)
== TP_TRANSCODE_ERROR)
return false;

if (tp_transcode(&t, input, input_size) == TP_TRANSCODE_ERROR)
return false;

size_t complete_msg_size = 0;
tp_transcode_complete(&t, &complete_msg_size);
output[complete_msg_size] = '0';

return complete_msg_size > 0;
}

#ifdef __cplusplus
} /* extern "C" */
#endif

#endif

Loading…
Cancel
Save