From 10ce346530a757323bd9ea0aeae3cac1de5cd960 Mon Sep 17 00:00:00 2001 From: Remi Collet Date: Fri, 31 Aug 2018 14:02:25 +0200 Subject: v4.1.0 --- PHPINFO | 10 +- REFLECTION | 272 +++++++++- channel.cc | 151 ++++++ channel.h | 107 ++++ php-pecl-swoole4.spec | 43 +- socket.cc | 1441 +++++++++++++++++++++++++++++++++++++++++++++++++ socket.h | 142 +++++ 7 files changed, 2138 insertions(+), 28 deletions(-) create mode 100644 channel.cc create mode 100644 channel.h create mode 100644 socket.cc create mode 100644 socket.h diff --git a/PHPINFO b/PHPINFO index 884fd1b..177993c 100644 --- a/PHPINFO +++ b/PHPINFO @@ -2,29 +2,31 @@ swoole swoole support => enabled -Version => 4.0.4 +Version => 4.1.0 Author => Swoole Group[email: team@swoole.com] coroutine => enabled +trace-log => enabled epoll => enabled eventfd => enabled signalfd => enabled cpu affinity => enabled spinlock => enabled rwlock => enabled -async postgresql => enabled -async redis client => enabled -async http/websocket client => enabled sockets => enabled openssl => enabled http2 => enabled pcre => enabled zlib => enabled +brotli => enabled mutex_timedlock => enabled pthread_barrier => enabled futex => enabled mysqlnd => enabled +redis client => enabled +postgresql client => enabled Directive => Local Value => Master Value +swoole.enable_coroutine => On => On swoole.aio_thread_num => 2 => 2 swoole.display_errors => On => On swoole.use_namespace => On => On diff --git a/REFLECTION b/REFLECTION index 4997035..8075de9 100644 --- a/REFLECTION +++ b/REFLECTION @@ -1,6 +1,9 @@ -Extension [ extension #147 swoole version 4.0.4 ] { +Extension [ extension #148 swoole version 4.1.0 ] { - INI { + Entry [ swoole.enable_coroutine ] + Current = 'On' + } Entry [ swoole.aio_thread_num ] Current = '2' } @@ -21,7 +24,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { } } - - Constants [164] { + - Constants [192] { Constant [ integer SWOOLE_BASE ] { 4 } Constant [ integer SWOOLE_THREAD ] { 2 } Constant [ integer SWOOLE_PROCESS ] { 3 } @@ -66,7 +69,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { Constant [ integer SWOOLE_DTLSv1_CLIENT_METHOD ] { 17 } Constant [ integer SWOOLE_EVENT_READ ] { 512 } Constant [ integer SWOOLE_EVENT_WRITE ] { 1024 } - Constant [ string SWOOLE_VERSION ] { 4.0.4 } + Constant [ string SWOOLE_VERSION ] { 4.1.0 } Constant [ integer SWOOLE_ERROR_MALLOC_FAIL ] { 501 } Constant [ integer SWOOLE_ERROR_SYSTEM_CALL_FAIL ] { 502 } Constant [ integer SWOOLE_ERROR_PHP_FATAL_ERROR ] { 503 } @@ -156,6 +159,8 @@ Extension [ extension #147 swoole version 4.0.4 ] { Constant [ integer SW_PGSQL_ASSOC ] { 1 } Constant [ integer SW_PGSQL_NUM ] { 2 } Constant [ integer SW_PGSQL_BOTH ] { 3 } + Constant [ integer SWOOLE_EXIT_IN_COROUTINE ] { 2 } + Constant [ integer SWOOLE_EXIT_IN_SERVER ] { 4 } Constant [ integer SWOOLE_AIO_BASE ] { 0 } Constant [ integer SWOOLE_AIO_LINUX ] { 0 } Constant [ integer SWOOLE_FILELOCK ] { 2 } @@ -163,13 +168,39 @@ Extension [ extension #147 swoole version 4.0.4 ] { Constant [ integer SWOOLE_SEM ] { 4 } Constant [ integer SWOOLE_RWLOCK ] { 1 } Constant [ integer SWOOLE_SPINLOCK ] { 5 } - Constant [ integer WEBSOCKET_OPCODE_TEXT ] { 1 } - Constant [ integer WEBSOCKET_OPCODE_BINARY ] { 2 } - Constant [ integer WEBSOCKET_OPCODE_PING ] { 9 } Constant [ integer WEBSOCKET_STATUS_CONNECTION ] { 1 } Constant [ integer WEBSOCKET_STATUS_HANDSHAKE ] { 2 } Constant [ integer WEBSOCKET_STATUS_FRAME ] { 3 } Constant [ integer WEBSOCKET_STATUS_ACTIVE ] { 3 } + Constant [ integer WEBSOCKET_STATUS_CLOSING ] { 4 } + Constant [ integer WEBSOCKET_OPCODE_CONTINUATION ] { 0 } + Constant [ integer WEBSOCKET_OPCODE_TEXT ] { 1 } + Constant [ integer WEBSOCKET_OPCODE_BINARY ] { 2 } + Constant [ integer WEBSOCKET_OPCODE_CLOSE ] { 8 } + Constant [ integer WEBSOCKET_OPCODE_PING ] { 9 } + Constant [ integer WEBSOCKET_OPCODE_PONG ] { 10 } + Constant [ integer WEBSOCKET_CLOSE_NORMAL ] { 1000 } + Constant [ integer WEBSOCKET_CLOSE_GOING_AWAY ] { 1001 } + Constant [ integer WEBSOCKET_CLOSE_PROTOCOL_ERROR ] { 1002 } + Constant [ integer WEBSOCKET_CLOSE_DATA_ERROR ] { 1003 } + Constant [ integer WEBSOCKET_CLOSE_STATUS_ERROR ] { 1005 } + Constant [ integer WEBSOCKET_CLOSE_ABNORMAL ] { 1006 } + Constant [ integer WEBSOCKET_CLOSE_MESSAGE_ERROR ] { 1007 } + Constant [ integer WEBSOCKET_CLOSE_POLICY_ERROR ] { 1008 } + Constant [ integer WEBSOCKET_CLOSE_MESSAGE_TOO_BIG ] { 1009 } + Constant [ integer WEBSOCKET_CLOSE_EXTENSION_MISSING ] { 1010 } + Constant [ integer WEBSOCKET_CLOSE_SERVER_ERROR ] { 1011 } + Constant [ integer WEBSOCKET_CLOSE_TLS ] { 1015 } + Constant [ integer SWOOLE_HTTP2_TYPE_DATA ] { 0 } + Constant [ integer SWOOLE_HTTP2_TYPE_HEADERS ] { 1 } + Constant [ integer SWOOLE_HTTP2_TYPE_PRIORITY ] { 2 } + Constant [ integer SWOOLE_HTTP2_TYPE_RST_STREAM ] { 3 } + Constant [ integer SWOOLE_HTTP2_TYPE_SETTINGS ] { 4 } + Constant [ integer SWOOLE_HTTP2_TYPE_PUSH_PROMISE ] { 5 } + Constant [ integer SWOOLE_HTTP2_TYPE_PING ] { 6 } + Constant [ integer SWOOLE_HTTP2_TYPE_GOAWAY ] { 7 } + Constant [ integer SWOOLE_HTTP2_TYPE_WINDOW_UPDATE ] { 8 } + Constant [ integer SWOOLE_HTTP2_TYPE_CONTINUATION ] { 9 } Constant [ integer SWOOLE_HTTP2_ERROR_NO_ERROR ] { 0 } Constant [ integer SWOOLE_HTTP2_ERROR_PROTOCOL_ERROR ] { 1 } Constant [ integer SWOOLE_HTTP2_ERROR_INTERNAL_ERROR ] { 2 } @@ -428,7 +459,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { } } - - Classes [48] { + - Classes [51] { Class [ class Swoole\Server ] { - Constants [0] { @@ -995,7 +1026,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { } } - Class [ class Swoole\Connection\Iterator implements Iterator, Traversable, Countable, ArrayAccess ] { + Class [ class Swoole\Connection\Iterator implements Iterator, Traversable, ArrayAccess, Countable ] { - Constants [0] { } @@ -1591,16 +1622,15 @@ Extension [ extension #147 swoole version 4.0.4 ] { - Static methods [0] { } - - Properties [6] { + - Properties [5] { Property [ public $errCode ] Property [ public $sock ] Property [ public $type ] - Property [ public $id ] Property [ public $setting ] Property [ public $connected ] } - - Methods [17] { + - Methods [18] { Method [ public method __construct ] { - Parameters [1] { @@ -1646,9 +1676,8 @@ Extension [ extension #147 swoole version 4.0.4 ] { Method [ public method send ] { - - Parameters [2] { + - Parameters [1] { Parameter #0 [ $data ] - Parameter #1 [ $flag ] } } @@ -1664,12 +1693,21 @@ Extension [ extension #147 swoole version 4.0.4 ] { Method [ public method sendto ] { - Parameters [3] { - Parameter #0 [ $ip ] + Parameter #0 [ $address ] Parameter #1 [ $port ] Parameter #2 [ $data ] } } + Method [ public method recvfrom ] { + + - Parameters [3] { + Parameter #0 [ $length ] + Parameter #1 [ &$address ] + Parameter #2 [ &$port ] + } + } + Method [ public method enableSSL ] { - Parameters [0] { @@ -2801,7 +2839,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { Property [ public $body ] } - - Methods [19] { + - Methods [20] { Method [ public method __construct ] { - Parameters [3] { @@ -2902,6 +2940,16 @@ Extension [ extension #147 swoole version 4.0.4 ] { } } + Method [ public method addData ] { + + - Parameters [4] { + Parameter #0 [ $path ] + Parameter #1 [ $name ] + Parameter #2 [ $type ] + Parameter #3 [ $filename ] + } + } + Method [ public method isConnected ] { - Parameters [0] { @@ -2953,7 +3001,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { - Static properties [0] { } - - Static methods [16] { + - Static methods [18] { Method [ static public method create ] { - Parameters [1] { @@ -3071,6 +3119,21 @@ Extension [ extension #147 swoole version 4.0.4 ] { Parameter #4 [ $service ] } } + + Method [ static public method getBackTrace ] { + + - Parameters [3] { + Parameter #0 [ $cid ] + Parameter #1 [ $options ] + Parameter #2 [ $limit ] + } + } + + Method [ static public method listCoroutines ] { + + - Parameters [0] { + } + } } - Properties [0] { @@ -3080,6 +3143,134 @@ Extension [ extension #147 swoole version 4.0.4 ] { } } + Class [ class Swoole\Coroutine\Iterator implements Iterator, Traversable, Countable ] { + + - Constants [0] { + } + + - Static properties [0] { + } + + - Static methods [0] { + } + + - Properties [0] { + } + + - Methods [7] { + Method [ public method rewind ] { + + - Parameters [0] { + } + } + + Method [ public method next ] { + + - Parameters [0] { + } + } + + Method [ public method current ] { + + - Parameters [0] { + } + } + + Method [ public method key ] { + + - Parameters [0] { + } + } + + Method [ public method valid ] { + + - Parameters [0] { + } + } + + Method [ public method count ] { + + - Parameters [0] { + } + } + + Method [ public method __destruct ] { + + - Parameters [0] { + } + } + } + } + + Class [ class Swoole\ExitException extends Exception implements Throwable ] { + + - Constants [0] { + } + + - Static properties [0] { + } + + - Static methods [0] { + } + + - Properties [4] { + Property [ protected $message ] + Property [ protected $code ] + Property [ protected $file ] + Property [ protected $line ] + } + + - Methods [12] { + Method [ public method getFlags ] { + + - Parameters [0] { + } + } + + Method [ public method getStatus ] { + + - Parameters [0] { + } + } + + Method [ public method __construct ] { + + - Parameters [3] { + Parameter #0 [ $message ] + Parameter #1 [ $code ] + Parameter #2 [ $previous ] + } + } + + Method [ public method __wakeup ] { + } + + Method [ final public method getMessage ] { + } + + Method [ final public method getCode ] { + } + + Method [ final public method getFile ] { + } + + Method [ final public method getLine ] { + } + + Method [ final public method getTrace ] { + } + + Method [ final public method getPrevious ] { + } + + Method [ final public method getTraceAsString ] { + } + + Method [ public method __toString ] { + } + } + } + Class [ class Swoole\Http\Client ] { - Constants [0] { @@ -3495,7 +3686,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { } } - Class [ class Swoole\Table implements ArrayAccess, Iterator, Traversable, Countable ] { + Class [ class Swoole\Table implements Iterator, Traversable, ArrayAccess, Countable ] { - Constants [3] { Constant [ public integer TYPE_INT ] { 1 } @@ -3717,6 +3908,36 @@ Extension [ extension #147 swoole version 4.0.4 ] { } } + Class [ class Swoole\Runtime ] { + + - Constants [0] { + } + + - Static properties [0] { + } + + - Static methods [2] { + Method [ static public method enableStrictMode ] { + + - Parameters [0] { + } + } + + Method [ static public method enableCoroutine ] { + + - Parameters [1] { + Parameter #0 [ $enable ] + } + } + } + + - Properties [0] { + } + + - Methods [0] { + } + } + Class [ class Swoole\Lock ] { - Constants [5] { @@ -4344,8 +4565,9 @@ Extension [ extension #147 swoole version 4.0.4 ] { Method [ public method status ] { - - Parameters [1] { + - Parameters [2] { Parameter #0 [ $http_code ] + Parameter #1 [ $reason ] } } @@ -4430,8 +4652,9 @@ Extension [ extension #147 swoole version 4.0.4 ] { - Static methods [0] { } - - Properties [9] { + - Properties [10] { Property [ public $fd ] + Property [ public $streamId ] Property [ public $header ] Property [ public $server ] Property [ public $request ] @@ -5668,7 +5891,7 @@ Extension [ extension #147 swoole version 4.0.4 ] { Property [ public $port ] } - - Methods [10] { + - Methods [11] { Method [ public method __construct ] { - Parameters [3] { @@ -5700,7 +5923,14 @@ Extension [ extension #147 swoole version 4.0.4 ] { Method [ public method stats ] { - Parameters [1] { - Parameter #0 [ $key ] + Parameter #0 [ $key ] + } + } + + Method [ public method isStreamExist ] { + + - Parameters [1] { + Parameter #0 [ $stream_id ] } } diff --git a/channel.cc b/channel.cc new file mode 100644 index 0000000..88423fc --- /dev/null +++ b/channel.cc @@ -0,0 +1,151 @@ +#include "channel.h" + +using namespace swoole; + +static void channel_defer_callback(void *data) +{ + notify_msg_t *msg = (notify_msg_t*) data; + coroutine_t *co = msg->chan->pop_coroutine(msg->type); + coroutine_resume(co); + delete msg; +} + +static void channel_pop_timeout(swTimer *timer, swTimer_node *tnode) +{ + timeout_msg_t *msg = (timeout_msg_t *) tnode->data; + msg->error = true; + msg->timer = nullptr; + msg->chan->remove(msg->co); + coroutine_resume(msg->co); +} + +Channel::Channel(size_t _capacity) +{ + capacity = _capacity; + closed = false; + notify_producer_count = 0; + notify_consumer_count = 0; +} + +void Channel::yield(enum channel_op type) +{ + int _cid = coroutine_get_current_cid(); + if (_cid == -1) + { + swError("Socket::yield() must be called in the coroutine."); + } + coroutine_t *co = coroutine_get_by_id(_cid); + if (type == PRODUCER) + { + producer_queue.push_back(co); + swDebug("producer[%d]", coroutine_get_cid(co)); + } + else + { + consumer_queue.push_back(co); + swDebug("consumer[%d]", coroutine_get_cid(co)); + } + coroutine_yield(co); +} + +void Channel::notify(enum channel_op type) +{ + notify_msg_t *msg = new notify_msg_t; + msg->chan = this; + msg->type = type; + if (type == PRODUCER) + { + notify_producer_count++; + } + else + { + notify_consumer_count++; + } + SwooleG.main_reactor->defer(SwooleG.main_reactor, channel_defer_callback, msg); +} + +void* Channel::pop(double timeout) +{ + timeout_msg_t msg; + msg.error = false; + if (timeout > 0) + { + int msec = (int) (timeout * 1000); + if (SwooleG.timer.fd == 0) + { + swTimer_init (msec); + } + msg.chan = this; + msg.co = coroutine_get_by_id(coroutine_get_current_cid()); + msg.timer = SwooleG.timer.add(&SwooleG.timer, msec, 0, &msg, channel_pop_timeout); + } + else + { + msg.timer = NULL; + } + if (is_empty() || consumer_queue.size() > 0) + { + yield(CONSUMER); + } + if (msg.error) + { + return nullptr; + } + if (msg.timer) + { + swTimer_del(&SwooleG.timer, msg.timer); + } + /** + * pop data + */ + void *data = data_queue.front(); + data_queue.pop(); + /** + * notify producer + */ + if (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) + { + notify(PRODUCER); + } + return data; +} + +bool Channel::push(void *data) +{ + if (is_full() || producer_queue.size() > 0) + { + yield(PRODUCER); + } + /** + * push data + */ + data_queue.push(data); + swDebug("push data, count=%ld", length()); + /** + * notify consumer + */ + if (consumer_queue.size() > 0 && notify_consumer_count < consumer_queue.size()) + { + notify(CONSUMER); + } + return true; +} + +bool Channel::close() +{ + if (closed) + { + return false; + } + swDebug("closed"); + closed = true; + while (producer_queue.size() > 0 && notify_producer_count < producer_queue.size()) + { + notify(PRODUCER); + } + while (consumer_queue.size() > 0 && notify_consumer_count < producer_queue.size()) + { + notify(CONSUMER); + } + return true; +} diff --git a/channel.h b/channel.h new file mode 100644 index 0000000..ab4fb5f --- /dev/null +++ b/channel.h @@ -0,0 +1,107 @@ +#pragma once + +#include "swoole.h" +#include "context.h" +#include "coroutine.h" +#include +#include +#include +#include +#include + +namespace swoole { + +enum channel_op +{ + PRODUCER = 1, + CONSUMER = 2, +}; + +class Channel; + +struct notify_msg_t +{ + Channel *chan; + enum channel_op type; +}; + +struct timeout_msg_t +{ + Channel *chan; + coroutine_t *co; + bool error; + swTimer_node *timer; +}; + +class Channel +{ +private: + std::list producer_queue; + std::list consumer_queue; + std::queue data_queue; + size_t capacity; + uint32_t notify_producer_count; + uint32_t notify_consumer_count; + +public: + bool closed; + inline bool is_empty() + { + return data_queue.size() == 0; + } + + inline bool is_full() + { + return data_queue.size() == capacity; + } + + inline size_t length() + { + return data_queue.size(); + } + + inline size_t consumer_num() + { + return consumer_queue.size(); + } + + inline size_t producer_num() + { + return producer_queue.size(); + } + + inline void remove(coroutine_t *co) + { + consumer_queue.remove(co); + } + + inline coroutine_t* pop_coroutine(enum channel_op type) + { + coroutine_t* co; + if (type == PRODUCER) + { + co = producer_queue.front(); + producer_queue.pop_front(); + notify_producer_count--; + swDebug("resume producer[%d]", coroutine_get_cid(co)); + } + else + { + co = consumer_queue.front(); + consumer_queue.pop_front(); + notify_consumer_count--; + swDebug("resume consumer[%d]", coroutine_get_cid(co)); + } + return co; + } + + Channel(size_t _capacity); + void yield(enum channel_op type); + void notify(enum channel_op type); + void* pop(double timeout = 0); + bool push(void *data); + bool close(); +}; + +}; + diff --git a/php-pecl-swoole4.spec b/php-pecl-swoole4.spec index 0555a2c..2ceed15 100644 --- a/php-pecl-swoole4.spec +++ b/php-pecl-swoole4.spec @@ -12,7 +12,7 @@ %if 0%{?scl:1} %global sub_prefix %{scl_prefix} -%scl_package php-pecl-swoole +%scl_package php-pecl-swoole4 %endif %global with_zts 0%{!?_without_zts:%{?__ztsphp:1}} @@ -27,17 +27,32 @@ %endif %global with_nghttpd2 1 %global with_hiredis 1 +%if 0%{?fedora} >= 25 || 0%{?rhel} >= 8 +%global with_brotli 1 +%else +%global with_brotli 0 +%endif + Summary: PHP's asynchronous concurrent distributed networking framework Name: %{?sub_prefix}php-pecl-%{pecl_name}4 -Version: 4.0.4 -Release: 2%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}} +Version: 4.1.0 +Release: 1%{?dist}%{!?scl:%{!?nophptag:%(%{__php} -r 'echo ".".PHP_MAJOR_VERSION.".".PHP_MINOR_VERSION;')}} License: BSD URL: http://pecl.php.net/package/%{pecl_name} Source0: http://pecl.php.net/get/%{pecl_name}-%{version}.tgz +Source1: https://raw.githubusercontent.com/swoole/swoole-src/master/include/socket.h +Source2: https://raw.githubusercontent.com/swoole/swoole-src/master/include/channel.h +Source3: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/socket.cc +Source4: https://raw.githubusercontent.com/swoole/swoole-src/master/src/coroutine/channel.cc + +%if 0%{?rhel} == 6 +BuildRequires: devtoolset-6-toolchain +%else BuildRequires: %{?dtsprefix}gcc BuildRequires: %{?dtsprefix}gcc-c++ +%endif BuildRequires: %{?scl_prefix}php-devel > 7 BuildRequires: %{?scl_prefix}php-pear BuildRequires: %{?scl_prefix}php-sockets @@ -53,6 +68,9 @@ BuildRequires: postgresql-devel > 9.5 %if %{with_hiredis} BuildRequires: hiredis-devel %endif +%if %{with_brotli} +BuildRequires: brotli-devel +%endif Requires: %{?scl_prefix}php(zend-abi) = %{php_zend_api} Requires: %{?scl_prefix}php(api) = %{php_core_api} @@ -152,6 +170,9 @@ sed -e '/examples/s/role="src"/role="doc"/' \ cd NTS +cp %{SOURCE1} %{SOURCE2} include/ +cp %{SOURCE3} %{SOURCE4} src/coroutine/ + # Sanity check, really often broken extver=$(sed -n '/#define PHP_SWOOLE_VERSION/{s/.* "//;s/".*$//;p}' php_swoole.h) if test "x${extver}" != "x%{version}%{?prever:-%{prever}}"; then @@ -171,6 +192,7 @@ cat << 'EOF' | tee %{ini_name} extension=%{pecl_name}.so ; Configuration +;swoole.enable_coroutine = On ;swoole.aio_thread_num = 2 ;swoole.display_errors = On ;swoole.use_namespace = On @@ -181,7 +203,12 @@ EOF %build +%if 0%{?rhel} == 6 +source /opt/rh/devtoolset-6/enable +g++ --version +%else %{?dtsenable} +%endif peclbuild() { %configure \ @@ -218,7 +245,12 @@ peclbuild %{_bindir}/zts-php-config %install +%if 0%{?rhel} == 6 +source /opt/rh/devtoolset-6/enable +g++ --version +%else %{?dtsenable} +%endif make -C NTS \ install INSTALL_ROOT=%{buildroot} @@ -312,6 +344,11 @@ cd ../ZTS %changelog +* Fri Aug 31 2018 Remi Collet - 4.1.0-1 +- update to 4.1.0 +- add dependency on brotli library (Fedora) +- open https://github.com/swoole/swoole-src/issues/1931 missing files + * Thu Aug 16 2018 Remi Collet - 4.0.4-2 - rebuild for 7.3.0beta2 new ABI diff --git a/socket.cc b/socket.cc new file mode 100644 index 0000000..7edb58f --- /dev/null +++ b/socket.cc @@ -0,0 +1,1441 @@ +#include "socket.h" +#include "context.h" +#include "async.h" +#include "buffer.h" + +#include +#include +#include + +using namespace swoole; +using namespace std; + +static int socket_onRead(swReactor *reactor, swEvent *event); +static int socket_onWrite(swReactor *reactor, swEvent *event); +static void socket_onTimeout(swTimer *timer, swTimer_node *tnode); +static void socket_onResolveCompleted(swAio_event *event); + +bool Socket::socks5_handshake() +{ + swSocks5 *ctx = socks5_proxy; + char *buf = ctx->buf; + int n; + + /** + * handshake + */ + swSocks5_pack(buf, socks5_proxy->username == NULL ? 0x00 : 0x02); + socks5_proxy->state = SW_SOCKS5_STATE_HANDSHAKE; + if (send(buf, 3) <= 0) + { + return false; + } + n = recv(buf, sizeof(ctx->buf)); + if (n <= 0) + { + return false; + } + uchar version = buf[0]; + uchar method = buf[1]; + if (version != SW_SOCKS5_VERSION_CODE) + { + swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); + return SW_ERR; + } + if (method != ctx->method) + { + swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_METHOD, "SOCKS authentication method not supported."); + return SW_ERR; + } + //authenticate request + if (method == SW_SOCKS5_METHOD_AUTH) + { + buf[0] = 0x01; + buf[1] = ctx->l_username; + + buf += 2; + memcpy(buf, ctx->username, ctx->l_username); + buf += ctx->l_username; + buf[0] = ctx->l_password; + memcpy(buf + 1, ctx->password, ctx->l_password); + + ctx->state = SW_SOCKS5_STATE_AUTH; + + if (send(ctx->buf, ctx->l_username + ctx->l_password + 3) < 0) + { + return false; + } + + n = recv(buf, sizeof(ctx->buf)); + if (n <= 0) + { + return false; + } + + uchar version = buf[0]; + uchar status = buf[1]; + if (version != 0x01) + { + swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); + return false; + } + if (status != 0) + { + swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_AUTH_FAILED, + "SOCKS username/password authentication failed."); + return false; + } + goto send_connect_request; + } + //send connect request + else + { + send_connect_request: buf[0] = SW_SOCKS5_VERSION_CODE; + buf[1] = 0x01; + buf[2] = 0x00; + + ctx->state = SW_SOCKS5_STATE_CONNECT; + + if (ctx->dns_tunnel) + { + buf[3] = 0x03; + buf[4] = ctx->l_target_host; + buf += 5; + memcpy(buf, ctx->target_host, ctx->l_target_host); + buf += ctx->l_target_host; + *(uint16_t *) buf = htons(ctx->target_port); + + if (send(ctx->buf, ctx->l_target_host + 7) < 0) + { + return false; + } + } + else + { + buf[3] = 0x01; + buf += 4; + *(uint32_t *) buf = htons(ctx->l_target_host); + buf += 4; + *(uint16_t *) buf = htons(ctx->target_port); + + if (send(ctx->buf, ctx->l_target_host + 7) < 0) + { + return false; + } + } + + /** + * response + */ + n = recv(buf, sizeof(ctx->buf)); + if (n <= 0) + { + return false; + } + + uchar version = buf[0]; + if (version != SW_SOCKS5_VERSION_CODE) + { + swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_UNSUPPORT_VERSION, "SOCKS version is not supported."); + return false; + } + uchar result = buf[1]; +#if 0 + uchar reg = buf[2]; + uchar type = buf[3]; + uint32_t ip = *(uint32_t *) (buf + 4); + uint16_t port = *(uint16_t *) (buf + 8); +#endif + if (result == 0) + { + ctx->state = SW_SOCKS5_STATE_READY; + } + else + { + swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SOCKS5_SERVER_ERROR, "Socks5 server error, reason :%s.", + swSocks5_strerror(result)); + } + return result; + } +} + +bool Socket::http_proxy_handshake() +{ +#ifdef SW_USE_OPENSSL + if (socket->ssl) + { + if (ssl_handshake() == false) + { + return false; + } + } + else + { + return true; + } +#else + return true; +#endif + + //CONNECT + int n = snprintf(http_proxy->buf, sizeof(http_proxy->buf), "CONNECT %*s:%d HTTP/1.1\r\n\r\n", + http_proxy->l_target_host, http_proxy->target_host, http_proxy->target_port); + if (send(http_proxy->buf, n) <= 0) + { + return false; + } + + n = recv(http_proxy->buf, sizeof(http_proxy->buf)); + if (n <= 0) + { + return false; + } + char *buf = http_proxy->buf; + int len = n; + int state = 0; + char *p = buf; + for (p = buf; p < buf + len; p++) + { + if (state == 0) + { + if (strncasecmp(p, "HTTP/1.1", 8) == 0 || strncasecmp(p, "HTTP/1.0", 8) == 0) + { + state = 1; + p += 8; + } + else + { + break; + } + } + else if (state == 1) + { + if (isspace(*p)) + { + continue; + } + else + { + if (strncasecmp(p, "200", 3) == 0) + { + state = 2; + p += 3; + } + else + { + break; + } + } + } + else if (state == 2) + { + if (isspace(*p)) + { + continue; + } + else + { + if (strncasecmp(p, "Connection established", sizeof("Connection established") - 1) == 0) + { + return true; + } + else + { + break; + } + } + } + } + return false; +} + +static inline int socket_connect(int fd, struct sockaddr *addr, socklen_t len) +{ + int retval; + while (1) + { + retval = ::connect(fd, addr, len); + if (retval < 0) + { + if (errno == EINTR) + { + continue; + } + } + break; + } + return retval; +} + +Socket::Socket(enum swSocket_type _type) +{ + type = _type; + switch (type) + { + case SW_SOCK_TCP6: + _sock_domain = AF_INET6; + _sock_type = SOCK_STREAM; + break; + case SW_SOCK_UNIX_STREAM: + _sock_domain = AF_UNIX; + _sock_type = SOCK_STREAM; + break; + case SW_SOCK_UDP: + _sock_domain = AF_INET; + _sock_type = SOCK_DGRAM; + break; + case SW_SOCK_UDP6: + _sock_domain = AF_INET6; + _sock_type = SOCK_DGRAM; + break; + case SW_SOCK_UNIX_DGRAM: + _sock_domain = AF_UNIX; + _sock_type = SOCK_DGRAM; + break; + case SW_SOCK_TCP: + default: + _sock_domain = AF_INET; + _sock_type = SOCK_STREAM; + break; + } + +#ifdef SOCK_CLOEXEC + int sockfd = ::socket(_sock_domain, _sock_type | SOCK_CLOEXEC, 0); +#else + int sockfd = ::socket(_sock_domain, _sock_type, 0); +#endif + if (sockfd < 0) + { + swWarn("socket() failed. Error: %s[%d]", strerror(errno), errno); + return; + } + + if (swIsMaster() && SwooleTG.type == SW_THREAD_REACTOR) + { + reactor = SwooleTG.reactor; + } + else + { + reactor = SwooleG.main_reactor; + } + socket = swReactor_get(reactor, sockfd); + + bzero(socket, sizeof(swConnection)); + socket->fd = sockfd; + socket->object = this; + socket->socket_type = type; + + swSetNonBlock(socket->fd); + if (!swReactor_handle_isset(reactor, SW_FD_CORO_SOCKET)) + { + reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_READ, socket_onRead); + reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_WRITE, socket_onWrite); + reactor->setHandle(reactor, SW_FD_CORO_SOCKET | SW_EVENT_ERROR, socket_onRead); + } + init(); +} + +Socket::Socket(int _fd, Socket *sock) +{ + reactor = sock->reactor; + + socket = swReactor_get(reactor, _fd); + bzero(socket, sizeof(swConnection)); + socket->fd = _fd; + socket->object = this; + socket->socket_type = sock->type; + + _sock_domain = sock->_sock_domain; + _sock_type = sock->_sock_type; + init(); +} + +bool Socket::connect(string host, int port, int flags) +{ + //enable socks5 proxy + if (socks5_proxy) + { + socks5_proxy->target_host = (char *) host.c_str(); + socks5_proxy->l_target_host = host.size(); + socks5_proxy->target_port = port; + + host = socks5_proxy->host; + port = socks5_proxy->port; + } + + //enable http proxy + if (http_proxy) + { + http_proxy->target_host = (char *) host.c_str(); + http_proxy->l_target_host = host.size(); + http_proxy->target_port = port; + + host = http_proxy->proxy_host; + port = http_proxy->proxy_port; + } + + if (_sock_domain == AF_INET6 || _sock_domain == AF_INET) + { + if (port == -1) + { + swWarn("Socket of type AF_INET/AF_INET6 requires port argument"); + return false; + } + else if (port == 0 || port >= 65536) + { + swWarn("Invalid port argument[%d]", port); + return false; + } + } + + if (unlikely(_cid && _cid != coroutine_get_current_cid())) + { + swWarn( "socket has already been bound to another coroutine."); + return false; + } + + int retval = 0; + _host = host; + _port = port; + + for (int i = 0; i < 2; i++) + { + if (_sock_domain == AF_INET) + { + socket->info.addr.inet_v4.sin_family = AF_INET; + socket->info.addr.inet_v4.sin_port = htons(port); + + if (!inet_pton(AF_INET, _host.c_str(), & socket->info.addr.inet_v4.sin_addr)) + { + _host = resolve(_host); + if (_host.size() == 0) + { + return false; + } + continue; + } + else + { + socket->info.len = sizeof( socket->info.addr.inet_v4); + retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v4, socket->info.len); + break; + } + } + else if (_sock_domain == AF_INET6) + { + socket->info.addr.inet_v6.sin6_family = AF_INET6; + socket->info.addr.inet_v6.sin6_port = htons(port); + + if (!inet_pton(AF_INET6, _host.c_str(), &socket->info.addr.inet_v6.sin6_addr)) + { + _host = resolve(_host); + if (_host.size() == 0) + { + return false; + } + continue; + } + else + { + socket->info.len = sizeof(socket->info.addr.inet_v6); + retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.inet_v6, socket->info.len); + break; + } + } + else if (_sock_domain == AF_UNIX) + { + if (_host.size() >= sizeof(socket->info.addr.un.sun_path)) + { + return false; + } + socket->info.addr.un.sun_family = AF_UNIX; + memcpy(&socket->info.addr.un.sun_path, _host.c_str(), _host.size()); + retval = socket_connect(socket->fd, (struct sockaddr *) &socket->info.addr.un, + (socklen_t) (offsetof(struct sockaddr_un, sun_path) + _host.size())); + break; + } + else + { + return false; + } + } + + if (retval == -1) + { + if (errno != EINPROGRESS) + { + _error: errCode = errno; + return false; + } + if (!wait_events(SW_EVENT_WRITE)) + { + goto _error; + } + yield(); + //Connection has timed out + if (errCode == ETIMEDOUT) + { + errMsg = strerror(errCode); + return false; + } + socklen_t len = sizeof(errCode); + if (getsockopt(socket->fd, SOL_SOCKET, SO_ERROR, &errCode, &len) < 0 || errCode != 0) + { + errMsg = strerror(errCode); + return false; + } + } + socket->active = 1; + //socks5 proxy + if (socks5_proxy && socks5_handshake() == false) + { + return false; + } + //http proxy + if (http_proxy && http_proxy_handshake() == false) + { + return false; + } + return true; +} + +static void socket_onResolveCompleted(swAio_event *event) +{ + Socket *sock = (Socket *) event->object; + if (event->error != 0) + { + sock->errCode = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED; + } + sock->resume(); +} + +static void socket_onTimeout(swTimer *timer, swTimer_node *tnode) +{ + Socket *sock = (Socket *) tnode->data; + sock->timer = NULL; + sock->errCode = ETIMEDOUT; + swDebug("socket[%d] timeout", sock->socket->fd); + sock->reactor->del(sock->reactor, sock->socket->fd); + sock->resume(); +} + +static int socket_onRead(swReactor *reactor, swEvent *event) +{ + Socket *sock = (Socket *) event->socket->object; + reactor->del(reactor, event->fd); + sock->resume(); + return SW_OK; +} + +static int socket_onWrite(swReactor *reactor, swEvent *event) +{ + Socket *sock = (Socket *) event->socket->object; + reactor->del(reactor, event->fd); + sock->resume(); + return SW_OK; +} + +ssize_t Socket::peek(void *__buf, size_t __n) +{ + return swConnection_peek(socket, __buf, __n, 0); +} + +ssize_t Socket::recv(void *__buf, size_t __n) +{ + ssize_t retval = swConnection_recv(socket, __buf, __n, 0); + if (retval >= 0) + { + return retval; + } + + if (swConnection_error(errno) != SW_WAIT) + { + errCode = errno; + return -1; + } + + while (true) + { + int events = SW_EVENT_READ; +#ifdef SW_USE_OPENSSL + if (socket->ssl && socket->ssl_want_write) + { + events = SW_EVENT_WRITE; + } +#endif + if (!wait_events(events)) + { + return -1; + } + yield(); + if (errCode == ETIMEDOUT) + { + return -1; + } + retval = swConnection_recv(socket, __buf, __n, 0); + if (retval < 0) + { + if (swConnection_error(errno) == SW_WAIT) + { + continue; + } + errCode = errno; + } + break; + } + return retval; +} + +ssize_t Socket::recv_all(void *__buf, size_t __n) +{ + ssize_t retval, total_bytes = 0; + while (true) + { + retval = recv((char*) __buf + total_bytes, __n - total_bytes); + if (retval <= 0) + { + if (total_bytes == 0) + { + total_bytes = retval; + } + break; + } + total_bytes += retval; + if ((size_t) total_bytes == __n) + { + break; + } + } + return total_bytes; +} + +ssize_t Socket::send_all(const void *__buf, size_t __n) +{ + ssize_t retval, total_bytes = 0; + while (true) + { + retval = send((char*) __buf + total_bytes, __n - total_bytes); + if (retval <= 0) + { + if (total_bytes == 0) + { + total_bytes = retval; + } + break; + } + total_bytes += retval; + if ((size_t) total_bytes == __n) + { + break; + } + } + return total_bytes; +} + +ssize_t Socket::send(const void *__buf, size_t __n) +{ + ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); + if (retval >= 0) + { + return retval; + } + + if (swConnection_error(errno) != SW_WAIT) + { + errCode = errno; + return -1; + } + + while (true) + { + int events = SW_EVENT_WRITE; +#ifdef SW_USE_OPENSSL + if (socket->ssl && socket->ssl_want_read) + { + events = SW_EVENT_READ; + } +#endif + if (!wait_events(events)) + { + return -1; + } + yield(); + if (errCode == ETIMEDOUT) + { + return -1; + } + ssize_t retval = swConnection_send(socket, (void *) __buf, __n, 0); + if (retval < 0) + { + if (swConnection_error(errno) == SW_WAIT) + { + continue; + } + errCode = errno; + } + break; + } + + return retval; +} + +void Socket::yield() +{ + if (suspending) + { + swError("socket has already been bound to another coroutine."); + } + errCode = 0; + if (_timeout > 0) + { + int ms = (int) (_timeout * 1000); + if (SwooleG.timer.fd == 0) + { + swTimer_init(ms); + } + timer = SwooleG.timer.add(&SwooleG.timer, ms, 0, this, socket_onTimeout); + } + _cid = coroutine_get_current_cid(); + if (_cid == -1) + { + swError("Socket::yield() must be called in the coroutine."); + } + //suspend + suspending = true; + coroutine_yield(coroutine_get_by_id(_cid)); + suspending = false; + //clear timer + if (timer) + { + swTimer_del(&SwooleG.timer, timer); + timer = nullptr; + } +} + +void Socket::resume() +{ + coroutine_resume(coroutine_get_by_id(_cid)); +} + +bool Socket::bind(std::string address, int port) +{ + bind_address = address; + bind_port = port; + + struct sockaddr_storage sa_storage = { 0 }; + struct sockaddr *sock_type = (struct sockaddr*) &sa_storage; + + int retval; + switch (_sock_domain) + { + case AF_UNIX: + { + struct sockaddr_un *sa = (struct sockaddr_un *) sock_type; + sa->sun_family = AF_UNIX; + + if (bind_address.size() >= sizeof(sa->sun_path)) + { + return false; + } + memcpy(&sa->sun_path, bind_address.c_str(), bind_address.size()); + + retval = ::bind(socket->fd, (struct sockaddr *) sa, + offsetof(struct sockaddr_un, sun_path) + bind_address.size()); + break; + } + + case AF_INET: + { + struct sockaddr_in *sa = (struct sockaddr_in *) sock_type; + sa->sin_family = AF_INET; + sa->sin_port = htons((unsigned short) bind_port); + if (!inet_aton(bind_address.c_str(), &sa->sin_addr)) + { + return false; + } + retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in)); + break; + } + + case AF_INET6: + { + struct sockaddr_in6 *sa = (struct sockaddr_in6 *) sock_type; + sa->sin6_family = AF_INET6; + sa->sin6_port = htons((unsigned short) bind_port); + + if (!inet_pton(AF_INET6, bind_address.c_str(), &sa->sin6_addr)) + { + return false; + } + retval = ::bind(socket->fd, (struct sockaddr *) sa, sizeof(struct sockaddr_in6)); + break; + } + default: + return false; + } + + if (retval != 0) + { + errCode = errno; + return false; + } + + return true; +} + +bool Socket::listen(int backlog) +{ + _backlog = backlog; + if (::listen(socket->fd, backlog) != 0) + { + errCode = errno; + return false; + } + return true; +} + +Socket* Socket::accept() +{ + if (!wait_events(SW_EVENT_READ)) + { + return nullptr; + } + yield(); + if (errCode == ETIMEDOUT) + { + return nullptr; + } + int conn; + swSocketAddress client_addr; + socklen_t client_addrlen = sizeof(client_addr); + +#ifdef HAVE_ACCEPT4 + conn = ::accept4(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); +#else + conn = ::accept(socket->fd, (struct sockaddr *) &client_addr, &client_addrlen); + if (conn >= 0) + { + swoole_fcntl_set_option(conn, 1, 1); + } +#endif + if (conn >= 0) + { + return new Socket(conn, this); + } + else + { + errCode = errno; + return nullptr; + } +} + +string Socket::resolve(string domain_name) +{ + swAio_event ev; + bzero(&ev, sizeof(swAio_event)); + ev.nbytes = SW_IP_MAX_LENGTH; + ev.buf = sw_malloc(ev.nbytes); + if (!ev.buf) + { + errCode = errno; + return ""; + } + + memcpy(ev.buf, domain_name.c_str(), domain_name.size()); + ((char *) ev.buf)[domain_name.size()] = 0; + ev.flags = _sock_domain; + ev.type = SW_AIO_GETHOSTBYNAME; + ev.object = this; + ev.callback = socket_onResolveCompleted; + + if (SwooleAIO.init == 0) + { + swAio_init(); + } + + if (swAio_dispatch(&ev) < 0) + { + errCode = SwooleG.error; + sw_free(ev.buf); + return ""; + } + /** + * cannot timeout + */ + double tmp_timeout = _timeout; + _timeout = -1; + yield(); + _timeout = tmp_timeout; + + if (errCode == SW_ERROR_DNSLOOKUP_RESOLVE_FAILED) + { + errMsg = hstrerror(ev.error); + return ""; + } + else + { + string addr((char *) ev.buf); + sw_free(ev.buf); + return addr; + } +} + +bool Socket::shutdown(int __how) +{ + if (!socket || socket->closed) + { + return false; + } + if (__how == SHUT_RD) + { + if (shutdown_read || shutdow_rw || ::shutdown(socket->fd, SHUT_RD)) + { + return false; + } + else + { + shutdown_read = 1; + return true; + } + } + else if (__how == SHUT_WR) + { + if (shutdown_write || shutdow_rw || ::shutdown(socket->fd, SHUT_RD) < 0) + { + return false; + } + else + { + shutdown_write = 1; + return true; + } + } + else if (__how == SHUT_RDWR) + { + if (shutdow_rw || ::shutdown(socket->fd, SHUT_RDWR) < 0) + { + return false; + } + else + { + shutdown_read = 1; + return true; + } + } + else + { + return false; + } +} + +bool Socket::close() +{ + if (socket == NULL || socket->closed) + { + return false; + } + socket->closed = 1; + + int fd = socket->fd; + + if (_sock_type == SW_SOCK_UNIX_DGRAM) + { + unlink(socket->info.addr.un.sun_path); + } + +#ifdef SW_USE_OPENSSL + if (open_ssl && ssl_context) + { + if (socket->ssl) + { + swSSL_close(socket); + } + swSSL_free_context(ssl_context); + if (ssl_option.cert_file) + { + sw_free(ssl_option.cert_file); + } + if (ssl_option.key_file) + { + sw_free(ssl_option.key_file); + } + if (ssl_option.passphrase) + { + sw_free(ssl_option.passphrase); + } +#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME + if (ssl_option.tls_host_name) + { + sw_free(ssl_option.tls_host_name); + } +#endif + if (ssl_option.cafile) + { + sw_free(ssl_option.cafile); + } + if (ssl_option.capath) + { + sw_free(ssl_option.capath); + } + } +#endif + if (_sock_type == SW_SOCK_UNIX_DGRAM) + { + unlink(socket->info.addr.un.sun_path); + } + if (timer) + { + swTimer_del(&SwooleG.timer, timer); + timer = NULL; + } + socket->active = 0; + ::close(fd); + return true; +} + +#ifdef SW_USE_OPENSSL +bool Socket::ssl_handshake() +{ + if (socket->ssl) + { + return false; + } + + ssl_context = swSSL_get_context(&ssl_option); + if (ssl_context == NULL) + { + return false; + } + + if (ssl_option.verify_peer) + { + if (swSSL_set_capath(&ssl_option, ssl_context) < 0) + { + return false; + } + } + + socket->ssl_send = 1; +#if defined(SW_USE_HTTP2) && defined(SW_USE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10002000L + if (http2) + { + if (SSL_CTX_set_alpn_protos(ssl_context, (const unsigned char *) "\x02h2", 3) < 0) + { + return false; + } + } +#endif + + if (swSSL_create(socket, ssl_context, SW_SSL_CLIENT) < 0) + { + return false; + } +#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME + if (ssl_option.tls_host_name) + { + SSL_set_tlsext_host_name(socket->ssl, ssl_option.tls_host_name); + } +#endif + + while (true) + { + int retval = swSSL_connect(socket); + if (retval < 0) + { + errCode = SwooleG.error; + return false; + } + if (socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) + { + int events = socket->ssl_want_write ? SW_EVENT_WRITE : SW_EVENT_READ; + if (!wait_events(events)) + { + return false; + } + yield(); + if (errCode == ETIMEDOUT) + { + return false; + } + } + else if (socket->ssl_state == SW_SSL_STATE_READY) + { + return true; + } + } + + if (socket->ssl_state == SW_SSL_STATE_READY && ssl_option.verify_peer) + { + if (ssl_verify(ssl_option.allow_self_signed) < 0) + { + return false; + } + } + return true; +} + +int Socket::ssl_verify(bool allow_self_signed) +{ + if (swSSL_verify(socket, allow_self_signed) < 0) + { + return SW_ERR; + } + if (ssl_option.tls_host_name && swSSL_check_host(socket, ssl_option.tls_host_name) < 0) + { + return SW_ERR; + } + return SW_OK; +} +#endif + +bool Socket::sendfile(char *filename, off_t offset, size_t length) +{ + int file_fd = open(filename, O_RDONLY); + if (file_fd < 0) + { + swSysError("open(%s) failed.", filename); + return false; + } + + if (length == 0) + { + struct stat file_stat; + if (::fstat(file_fd, &file_stat) < 0) + { + swSysError("fstat(%s) failed.", filename); + ::close(file_fd); + return false; + } + length = file_stat.st_size; + } + else + { + // total length of the file + length = offset + length; + } + + int n, sendn; + while ((size_t) offset < length) + { + sendn = (length - offset > SW_SENDFILE_CHUNK_SIZE) ? SW_SENDFILE_CHUNK_SIZE : length - offset; +#ifdef SW_USE_OPENSSL + if (socket->ssl) + { + n = swSSL_sendfile(socket, file_fd, &offset, sendn); + } + else +#endif + { + n = ::swoole_sendfile(socket->fd, file_fd, &offset, sendn); + } + if (n > 0) + { + continue; + } + else if (n == 0) + { + swWarn("sendfile return zero."); + return false; + } + else if (errno != EAGAIN) + { + swSysError("sendfile(%d, %s) failed.", socket->fd, filename); + _error: errCode = errno; + ::close(file_fd); + return false; + } + if (!wait_events(SW_EVENT_WRITE)) + { + goto _error; + } + yield(); + if (errCode == ETIMEDOUT) + { + goto _error; + } + } + ::close(file_fd); + return true; +} + +int Socket::sendto(char *address, int port, char *data, int len) +{ + if (type == SW_SOCK_UDP) + { + return swSocket_udp_sendto(socket->fd, address, port, data, len); + } + else if (type == SW_SOCK_UDP6) + { + return swSocket_udp_sendto6(socket->fd, address, port, data, len); + } + else + { + swWarn("only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6."); + return -1; + } +} + +int Socket::recvfrom(void *__buf, size_t __n, char *address, int *port) +{ + socket->info.len = sizeof(socket->info.addr); + int retval; + + _recv: retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); + if (retval < 0) + { + if (errno == EINTR) + { + goto _recv; + } + else if (swConnection_error(errno) == SW_WAIT) + { + if (!wait_events(SW_EVENT_READ)) + { + return -1; + } + yield(); + if (errCode == ETIMEDOUT) + { + return -1; + } + retval = ::recvfrom(socket->fd, __buf, __n, 0, (struct sockaddr *) &socket->info.addr, &socket->info.len); + if (retval < 0) + { + errCode = errno; + } + else + { + strcpy(address, swConnection_get_ip(socket)); + *port = swConnection_get_port(socket); + } + } + else + { + errCode = errno; + } + } + return retval; +} + +/** + * recv packet with protocol + */ +ssize_t Socket::recv_packet() +{ + get_buffer(); + ssize_t buf_len = SW_BUFFER_SIZE_STD; + ssize_t retval; + + if (open_length_check) + { + uint32_t header_len; + + _get_header_len: header_len = protocol.package_length_offset + protocol.package_length_size; + if (buffer->length > 0) + { + if (buffer->length < header_len) + { + goto _recv_header; + } + else + { + goto _get_length; + } + } + + _recv_header: retval = recv(buffer->str + buffer->length, header_len - buffer->length); + if (retval <= 0) + { + return 0; + } + else if (retval < 0 || retval != header_len) + { + return 0; + } + else + { + buffer->length += retval; + } + + _get_length: buf_len = protocol.get_package_length(&protocol, socket, buffer->str, (uint32_t) buffer->length); + swDebug("packet_len=%ld, length=%ld", buf_len, buffer->length); + //error package + if (buf_len < 0) + { + return 0; + } + else if (buf_len == 0) + { + header_len = protocol.real_header_length; + goto _recv_header; + } + //empty package + else if (buf_len == header_len) + { + buffer->length = 0; + return header_len; + } + else if (buf_len > protocol.package_max_length) + { + swoole_error_log(SW_LOG_WARNING, SW_ERROR_PACKAGE_LENGTH_TOO_LARGE, "packet[length=%d] is too big.", (int )buf_len); + return 0; + } + + if ((size_t) buf_len == buffer->length) + { + buffer->length = 0; + return buf_len; + } + else if ((size_t) buf_len < buffer->length) + { + buffer->length = buffer->length - buf_len; + memmove(buffer->str, buffer->str + buf_len, buffer->length); + goto _get_header_len; + } + + if ((size_t) buf_len >= buffer->size) + { + if (swString_extend(buffer, buf_len) < 0) + { + buffer->length = 0; + return -1; + } + } + + retval = recv_all(buffer->str + buffer->length, buf_len - buffer->length); + if (retval > 0) + { + buffer->length += retval; + if (buffer->length != (size_t) buf_len) + { + retval = 0; + } + else + { + buffer->length = 0; + return buf_len; + } + } + } + else if (open_eof_check) + { + int eof = -1; + char *buf; + + if (buffer->length > 0) + { + goto find_eof; + } + + while (1) + { + buf = buffer->str + buffer->length; + buf_len = buffer->size - buffer->length; + + if (buf_len > SW_BUFFER_SIZE_BIG) + { + buf_len = SW_BUFFER_SIZE_BIG; + } + + retval = recv(buf, buf_len); + if (retval < 0) + { + buffer->length = 0; + return -1; + } + else if (retval == 0) + { + buffer->length = 0; + return 0; + } + + buffer->length += retval; + + if (buffer->length < protocol.package_eof_len) + { + continue; + } + + find_eof: eof = swoole_strnpos(buffer->str, buffer->length, protocol.package_eof, protocol.package_eof_len); + if (eof >= 0) + { + eof += protocol.package_eof_len; + if (buffer->length > (uint32_t) eof) + { + buffer->length -= eof; + memmove(buffer->str, buffer->str + eof, buffer->length); + } + else + { + buffer->length = 0; + } + return eof; + } + else + { + if (buffer->length == protocol.package_max_length) + { + swWarn("no package eof"); + buffer->length = 0; + return -1; + } + else if (buffer->length == buffer->size) + { + if (buffer->size < protocol.package_max_length) + { + size_t new_size = buffer->size * 2; + if (new_size > protocol.package_max_length) + { + new_size = protocol.package_max_length; + } + if (swString_extend(buffer, new_size) < 0) + { + buffer->length = 0; + return -1; + } + } + } + } + } + buffer->length = 0; + } + else + { + return -1; + } + + return retval; +} + +swString* Socket::get_buffer() +{ + if (unlikely(buffer == nullptr)) + { + buffer = swString_new(SW_BUFFER_SIZE_STD); + } + return buffer; +} + +Socket::~Socket() +{ + if (!socket->closed) + { + close(); + } + if (socket->out_buffer) + { + swBuffer_free(socket->out_buffer); + socket->out_buffer = NULL; + } + if (socket->in_buffer) + { + swBuffer_free(socket->in_buffer); + socket->in_buffer = NULL; + } + if (buffer) + { + swString_free(buffer); + } + bzero(socket, sizeof(swConnection)); + socket->removed = 1; +} diff --git a/socket.h b/socket.h new file mode 100644 index 0000000..7a3d039 --- /dev/null +++ b/socket.h @@ -0,0 +1,142 @@ +#pragma once + +#include "swoole.h" +#include "connection.h" +#include "socks5.h" +#include + +namespace swoole +{ +class Socket +{ +public: + Socket(enum swSocket_type type); + Socket(int _fd, Socket *sock); + ~Socket(); + bool connect(std::string host, int port, int flags = 0); + bool shutdown(int how); + bool close(); + ssize_t send(const void *__buf, size_t __n); + ssize_t peek(void *__buf, size_t __n); + ssize_t recv(void *__buf, size_t __n); + ssize_t recv_all(void *__buf, size_t __n); + ssize_t send_all(const void *__buf, size_t __n); + ssize_t recv_packet(); + Socket* accept(); + void resume(); + void yield(); + bool bind(std::string address, int port = 0); + std::string resolve(std::string host); + bool listen(int backlog = 0); + bool sendfile(char *filename, off_t offset, size_t length); + int sendto(char *address, int port, char *data, int len); + int recvfrom(void *__buf, size_t __n, char *address, int *port = nullptr); + swString* get_buffer(); + + void setTimeout(double timeout) + { + _timeout = timeout; + } + +#ifdef SW_USE_OPENSSL + bool ssl_handshake(); + int ssl_verify(bool allow_self_signed); +#endif + +protected: + inline void init() + { + _cid = 0; + suspending = false; + _timeout = 0; + _port = 0; + errCode = 0; + errMsg = nullptr; + timer = nullptr; + bind_port = 0; + _backlog = 0; + + http2 = 0; + shutdow_rw = 0; + shutdown_read = 0; + shutdown_write = 0; + open_length_check = 0; + open_eof_check = 0; + + socks5_proxy = nullptr; + http_proxy = nullptr; + + buffer = nullptr; + protocol = {0}; + + protocol.package_length_type = 'N'; + protocol.package_length_size = 4; + protocol.package_body_offset = 0; + protocol.package_max_length = SW_BUFFER_INPUT_SIZE; + +#ifdef SW_USE_OPENSSL + open_ssl = 0; + ssl_wait_handshake = 0; + ssl_context = NULL; + ssl_option = {0}; +#endif + } + + inline bool wait_events(int events) + { + if (reactor->add(reactor, socket->fd, SW_FD_CORO_SOCKET | events) < 0) + { + errCode = errno; + return false; + } + else + { + return true; + } + } + + bool socks5_handshake(); + bool http_proxy_handshake(); + +public: + swTimer_node *timer; + swReactor *reactor; + std::string _host; + std::string bind_address; + int bind_port; + int _port; + int _cid; + bool suspending; + swConnection *socket; + enum swSocket_type type; + int _sock_type; + int _sock_domain; + double _timeout; + int _backlog; + int errCode; + const char *errMsg; + uint32_t http2 :1; + uint32_t shutdow_rw :1; + uint32_t shutdown_read :1; + uint32_t shutdown_write :1; + /** + * one package: length check + */ + uint32_t open_length_check :1; + uint32_t open_eof_check :1; + + swProtocol protocol; + swString *buffer; + + struct _swSocks5 *socks5_proxy; + struct _http_proxy* http_proxy; + +#ifdef SW_USE_OPENSSL + uint8_t open_ssl :1; + uint8_t ssl_wait_handshake :1; + SSL_CTX *ssl_context; + swSSL_option ssl_option; +#endif +}; + +}; -- cgit