summaryrefslogtreecommitdiffstats
path: root/channel.h
diff options
context:
space:
mode:
authorRemi Collet <remi@remirepo.net>2018-08-31 14:02:25 +0200
committerRemi Collet <remi@remirepo.net>2018-08-31 14:02:25 +0200
commit10ce346530a757323bd9ea0aeae3cac1de5cd960 (patch)
treedb41d6e0904a1bd91689e28bf07f59b5427c1b98 /channel.h
parent0f701e29462ce5c4e2aedc9bde0f1aa8fae39bd2 (diff)
v4.1.0
Diffstat (limited to 'channel.h')
-rw-r--r--channel.h107
1 files changed, 107 insertions, 0 deletions
diff --git a/channel.h b/channel.h
new file mode 100644
index 0000000..ab4fb5f
--- /dev/null
+++ b/channel.h
@@ -0,0 +1,107 @@
+#pragma once
+
+#include "swoole.h"
+#include "context.h"
+#include "coroutine.h"
+#include <string>
+#include <iostream>
+#include <list>
+#include <queue>
+#include <sys/stat.h>
+
+namespace swoole {
+
+enum channel_op
+{
+ PRODUCER = 1,
+ CONSUMER = 2,
+};
+
+class Channel;
+
+struct notify_msg_t
+{
+ Channel *chan;
+ enum channel_op type;
+};
+
+struct timeout_msg_t
+{
+ Channel *chan;
+ coroutine_t *co;
+ bool error;
+ swTimer_node *timer;
+};
+
+class Channel
+{
+private:
+ std::list<coroutine_t *> producer_queue;
+ std::list<coroutine_t *> consumer_queue;
+ std::queue<void *> data_queue;
+ size_t capacity;
+ uint32_t notify_producer_count;
+ uint32_t notify_consumer_count;
+
+public:
+ bool closed;
+ inline bool is_empty()
+ {
+ return data_queue.size() == 0;
+ }
+
+ inline bool is_full()
+ {
+ return data_queue.size() == capacity;
+ }
+
+ inline size_t length()
+ {
+ return data_queue.size();
+ }
+
+ inline size_t consumer_num()
+ {
+ return consumer_queue.size();
+ }
+
+ inline size_t producer_num()
+ {
+ return producer_queue.size();
+ }
+
+ inline void remove(coroutine_t *co)
+ {
+ consumer_queue.remove(co);
+ }
+
+ inline coroutine_t* pop_coroutine(enum channel_op type)
+ {
+ coroutine_t* co;
+ if (type == PRODUCER)
+ {
+ co = producer_queue.front();
+ producer_queue.pop_front();
+ notify_producer_count--;
+ swDebug("resume producer[%d]", coroutine_get_cid(co));
+ }
+ else
+ {
+ co = consumer_queue.front();
+ consumer_queue.pop_front();
+ notify_consumer_count--;
+ swDebug("resume consumer[%d]", coroutine_get_cid(co));
+ }
+ return co;
+ }
+
+ Channel(size_t _capacity);
+ void yield(enum channel_op type);
+ void notify(enum channel_op type);
+ void* pop(double timeout = 0);
+ bool push(void *data);
+ bool close();
+};
+
+};
+