From 0b97ec3739d99f0778ff827cb58c011b92d27a74 Mon Sep 17 00:00:00 2001 From: michael-grunder Date: Thu, 18 Oct 2018 09:47:10 -0700 Subject: [PATCH] Update STREAM API to handle STATUS -> BULK reply change Right before Redis 5.0 was released, the api was changed to send message ids as BULK instead of STATUS replies. --- library.c | 35 ++++++++++++++++++++++------------- redis.c | 2 +- redis_cluster.c | 2 +- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/library.c b/library.c index 8fecaef1..d5752f0d 100644 --- a/library.c +++ b/library.c @@ -1269,17 +1269,18 @@ redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret { zval zv, *z_message = &zv; int i, mhdr, fields; - char id[1024]; - size_t idlen; + char *id = NULL; + int idlen; /* Iterate over each message */ for (i = 0; i < count; i++) { /* Consume inner multi-bulk header, message ID itself and finaly * the multi-bulk header for field and values */ if ((read_mbulk_header(redis_sock, &mhdr TSRMLS_CC) < 0 || mhdr != 2) || - redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 || + ((id = redis_sock_read(redis_sock, &idlen TSRMLS_CC)) == NULL) || (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0)) { + if (id) efree(id); return -1; } @@ -1289,6 +1290,7 @@ redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret redis_mbulk_reply_loop(redis_sock, z_message, fields, UNSERIALIZE_VALS TSRMLS_CC); array_zip_values_and_scores(redis_sock, z_message, SCORE_DECODE_NONE TSRMLS_CC); add_assoc_zval_ex(z_ret, id, idlen, z_message); + efree(id); } return 0; @@ -1404,24 +1406,30 @@ PHP_REDIS_API int redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC) { zval zv, *z_msg = &zv; REDIS_REPLY_TYPE type; - char id[1024]; - int i, fields; + char *id; + int i, fields, idlen; long li; - size_t idlen; for (i = 0; i < count; i++) { /* Consume inner reply type */ if (redis_read_reply_type(redis_sock, &type, &li TSRMLS_CC) < 0 || - (type != TYPE_LINE && type != TYPE_MULTIBULK)) return -1; + (type != TYPE_BULK && type != TYPE_MULTIBULK) || + (type == TYPE_BULK && li <= 0)) return -1; - if (type == TYPE_LINE) { - /* JUSTID variant */ - if (redis_sock_gets(redis_sock, id, sizeof(id), &idlen TSRMLS_CC) < 0) + /* TYPE_BULK is the JUSTID variant, otherwise it's standard xclaim response */ + if (type == TYPE_BULK) { + if ((id = redis_sock_read_bulk_reply(redis_sock, (size_t)li TSRMLS_CC)) == NULL) return -1; - add_next_index_stringl(rv, id, idlen); + + add_next_index_stringl(rv, id, li); + efree(id); } else { - if (li != 2 || redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 || - (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0)) return -1; + if ((li != 2 || (id = redis_sock_read(redis_sock, &idlen TSRMLS_CC)) == NULL) || + (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0)) + { + if (id) efree(id); + return -1; + } REDIS_MAKE_STD_ZVAL(z_msg); array_init(z_msg); @@ -1429,6 +1437,7 @@ redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC) redis_mbulk_reply_loop(redis_sock, z_msg, fields, UNSERIALIZE_VALS TSRMLS_CC); array_zip_values_and_scores(redis_sock, z_msg, SCORE_DECODE_NONE TSRMLS_CC); add_assoc_zval_ex(rv, id, idlen, z_msg); + efree(id); } } diff --git a/redis.c b/redis.c index b8244718..2718997c 100644 --- a/redis.c +++ b/redis.c @@ -3592,7 +3592,7 @@ PHP_METHOD(Redis, xack) { } PHP_METHOD(Redis, xadd) { - REDIS_PROCESS_CMD(xadd, redis_single_line_reply); + REDIS_PROCESS_CMD(xadd, redis_read_variant_reply); } PHP_METHOD(Redis, xclaim) { diff --git a/redis_cluster.c b/redis_cluster.c index 130b961a..8f10bcb0 100644 --- a/redis_cluster.c +++ b/redis_cluster.c @@ -2983,7 +2983,7 @@ PHP_METHOD(RedisCluster, xack) { /* {{{ proto string RedisCluster::xadd(string key, string id, array field_values) }}} */ PHP_METHOD(RedisCluster, xadd) { - CLUSTER_PROCESS_CMD(xadd, cluster_single_line_resp, 0); + CLUSTER_PROCESS_CMD(xadd, cluster_bulk_raw_resp, 0); } /* {{{ proto array RedisCluster::xclaim(string key, string group, string consumer,