From 10ce346530a757323bd9ea0aeae3cac1de5cd960 Mon Sep 17 00:00:00 2001 From: Remi Collet Date: Fri, 31 Aug 2018 14:02:25 +0200 Subject: v4.1.0 --- channel.h | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 channel.h (limited to 'channel.h') diff --git a/channel.h b/channel.h new file mode 100644 index 0000000..ab4fb5f --- /dev/null +++ b/channel.h @@ -0,0 +1,107 @@ +#pragma once + +#include "swoole.h" +#include "context.h" +#include "coroutine.h" +#include +#include +#include +#include +#include + +namespace swoole { + +enum channel_op +{ + PRODUCER = 1, + CONSUMER = 2, +}; + +class Channel; + +struct notify_msg_t +{ + Channel *chan; + enum channel_op type; +}; + +struct timeout_msg_t +{ + Channel *chan; + coroutine_t *co; + bool error; + swTimer_node *timer; +}; + +class Channel +{ +private: + std::list producer_queue; + std::list consumer_queue; + std::queue data_queue; + size_t capacity; + uint32_t notify_producer_count; + uint32_t notify_consumer_count; + +public: + bool closed; + inline bool is_empty() + { + return data_queue.size() == 0; + } + + inline bool is_full() + { + return data_queue.size() == capacity; + } + + inline size_t length() + { + return data_queue.size(); + } + + inline size_t consumer_num() + { + return consumer_queue.size(); + } + + inline size_t producer_num() + { + return producer_queue.size(); + } + + inline void remove(coroutine_t *co) + { + consumer_queue.remove(co); + } + + inline coroutine_t* pop_coroutine(enum channel_op type) + { + coroutine_t* co; + if (type == PRODUCER) + { + co = producer_queue.front(); + producer_queue.pop_front(); + notify_producer_count--; + swDebug("resume producer[%d]", coroutine_get_cid(co)); + } + else + { + co = consumer_queue.front(); + consumer_queue.pop_front(); + notify_consumer_count--; + swDebug("resume consumer[%d]", coroutine_get_cid(co)); + } + return co; + } + + Channel(size_t _capacity); + void yield(enum channel_op type); + void notify(enum channel_op type); + void* pop(double timeout = 0); + bool push(void *data); + bool close(); +}; + +}; + -- cgit