操作大量http协议的tcp连接,截取需要的数据并定义相关逻辑。可以做连接池,也可以做监听等任意自定义逻辑。
使用 Workerman 实现。本示例实现的是根据 Host 限制到后端的并发请求数(不是连接数!),超出限制的请求进入队列等待;上下游均可任意使用长连接或短连接。
协议部分如下:
<?php
namespace Protocols;

use Workerman\Connection\TcpConnection;

/**
 * Workerman protocol that limits the number of concurrently running HTTP
 * requests per Host name and queues (pauses) the requests above the limit.
 *
 * Per-connection state is kept in two dynamic properties of the connection:
 *  - httpRequest : array with keys 'name' (host), 'sizeTotal', 'size'
 *  - httpResponse: array with keys 'content' (pending bytes), 'type',
 *                  'size', 'sizeTotal', 'sizeHeader'
 * Per-worker state is kept in $worker->connectionsRunning and
 * $worker->connectionsPaused, both shaped as array(host => list of ids).
 */
class HttpProxy
{

    /**
     * Concurrency limit per host name: array(host => int|null).
     *
     * @var array
     */
    private static $config = array();

    /**
     * Reason phrases used by responseError().
     *
     * @var array
     */
    public static $codes = array(
        100 => 'Continue',
        101 => 'Switching Protocols',
        200 => 'OK',
        201 => 'Created',
        202 => 'Accepted',
        203 => 'Non-Authoritative Information',
        204 => 'No Content',
        205 => 'Reset Content',
        206 => 'Partial Content',
        300 => 'Multiple Choices',
        301 => 'Moved Permanently',
        302 => 'Found',
        303 => 'See Other',
        304 => 'Not Modified',
        305 => 'Use Proxy',
        306 => '(Unused)',
        307 => 'Temporary Redirect',
        400 => 'Bad Request',
        401 => 'Unauthorized',
        402 => 'Payment Required',
        403 => 'Forbidden',
        404 => 'Not Found',
        405 => 'Method Not Allowed',
        406 => 'Not Acceptable',
        407 => 'Proxy Authentication Required',
        408 => 'Request Timeout',
        409 => 'Conflict',
        410 => 'Gone',
        411 => 'Length Required',
        412 => 'Precondition Failed',
        413 => 'Request Entity Too Large',
        414 => 'Request-URI Too Long',
        415 => 'Unsupported Media Type',
        416 => 'Requested Range Not Satisfiable',
        417 => 'Expectation Failed',
        422 => 'Unprocessable Entity',
        423 => 'Locked',
        500 => 'Internal Server Error',
        501 => 'Not Implemented',
        502 => 'Bad Gateway',
        503 => 'Service Unavailable',
        504 => 'Gateway Timeout',
        505 => 'HTTP Version Not Supported'
    );

    /**
     * Replace the per-host concurrency limits.
     *
     * @param array $config
     *            array(host => limit). A null limit disables the check for
     *            that host.
     */
    public static function setNameConfig(array $config)
    {
        self::$config = $config;
    }

    /**
     * Check the integrity of the package.
     *
     * On the first call for a request the headers are parsed, the request
     * is registered as running or queued for its host, and the per-request
     * state is stored on the connection. Once that state exists the whole
     * buffer is handed on as one package.
     *
     * @param string $recv_buffer
     * @param TcpConnection $connection
     * @return int 0 while the headers are incomplete, the request was
     *         rejected or the request is queued; otherwise the buffer length
     */
    public static function input($recv_buffer, TcpConnection $connection)
    {
        if (! isset($connection->httpRequest)) {
            if (false === strpos($recv_buffer, "\r\n\r\n")) {
                return 0;
            }
            list ($header) = explode("\r\n\r\n", $recv_buffer, 2);
            $method = substr($header, 0, (int) strpos($header, ' '));

            // Header names are case-insensitive, and only the header section
            // may be searched: the request target or the body can contain
            // the text "Host:" as well.
            $name = '';
            $matchHost = array();
            if (preg_match("/\r\nHost:[ \t]*([^\r\n]*)/i", $header, $matchHost)) {
                $hostParts = explode(':', trim($matchHost[1]));
                $name = $hostParts[0];
            }

            $sizeTotal = self::getRequestSize($header, $method);
            if (null === $sizeTotal) {
                self::responseError($connection, 411);
                return 0;
            }
            $connection->httpRequest = array(
                'name' => $name,
                'sizeTotal' => $sizeTotal,
                'size' => 0
            );

            $running = &$connection->worker->connectionsRunning;
            $paused = &$connection->worker->connectionsPaused;
            if (! isset($running[$name])) {
                $running[$name] = array();
            }
            if (! isset($paused[$name])) {
                $paused[$name] = array();
            }
            // NOTE(review): a host without a config entry gets limit 0 and is
            // therefore queued indefinitely; an explicit null means
            // "unlimited". Confirm that blocking unknown hosts is intended.
            $limit = array_key_exists($name, self::$config) ? self::$config[$name] : 0;
            // The limit applies per host, so count this host's requests only
            // (counting $running itself would count distinct hosts).
            if (null !== $limit && count($running[$name]) >= $limit) {
                $paused[$name][] = $connection->id;
                $connection->pauseRecv();
                return 0;
            }
            $running[$name][] = $connection->id;
        }
        // The request headers have been parsed successfully.
        return strlen($recv_buffer);
    }

    /**
     * Send a bodyless error response and discard whatever is left in the
     * receive buffer.
     *
     * @param TcpConnection $connection
     * @param int $code
     * @param string $msg
     *            Reason phrase; looked up in self::$codes when omitted
     * @param bool $close
     *            Close the connection afterwards when exactly true
     */
    protected static function responseError(TcpConnection $connection, $code, $msg = null, $close = null)
    {
        if (! isset($msg)) {
            if (! isset(self::$codes[$code])) {
                user_error('http code is invalid, code=' . $code, E_USER_WARNING);
                return;
            }
            $msg = self::$codes[$code];
        }
        $connection->consumeRecvBuffer($connection->getRecvBufferQueueSize());
        // Raw send: the reply must not pass through encode().
        $connection->send(
            "HTTP/1.1 $code $msg\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n",
            true);
        if (true === $close) {
            $connection->close();
        }
    }

    /**
     * Get whole size of the request
     * includes the request headers and request body.
     *
     * @param string $header
     *            The request headers
     * @param string $method
     *            The request method
     * @return int|null null when a method other than GET/OPTIONS/HEAD
     *         carries no Content-Length header
     */
    protected static function getRequestSize($header, $method)
    {
        $sizeHeader = strlen($header) + 4;
        if ($method === 'GET' || $method === 'OPTIONS' || $method === 'HEAD') {
            return $sizeHeader;
        }
        $match = array();
        if (preg_match("/\r\nContent-Length: ?(\d+)/i", $header, $match)) {
            return (int) $match[1] + $sizeHeader;
        }
        return null;
    }

    /**
     * Account for the received request bytes and stop reading from the
     * client once the whole request has arrived.
     *
     * @param string $recv_buffer
     * @param TcpConnection $connection
     * @return string The buffer, unchanged
     */
    public static function decode($recv_buffer, TcpConnection $connection)
    {
        $httpRequest = &$connection->httpRequest;
        $httpRequest['size'] += strlen($recv_buffer);
        // The current HTTP request has been received completely. ">=" so
        // that surplus bytes in the same buffer cannot skip the pause.
        if ($httpRequest['size'] >= $httpRequest['sizeTotal']) {
            $connection->pauseRecv();
        }
        return $recv_buffer;
    }

    /**
     * Track the response that is relayed to the client and call
     * responseFinish() when it is complete.
     *
     * Response kinds ('type'): 0 = Content-Length delimited, 1 = chunked,
     * 3 = status 100 (passed through untouched), null = unsupported framing
     * (answered with 503).
     *
     * @param string $content
     * @param TcpConnection $connection
     * @return string Bytes that may be written to the client now
     */
    public static function encode($content, TcpConnection $connection)
    {
        if (! isset($connection->httpResponse)) {
            $connection->httpResponse = array();
        }
        $httpResponse = &$connection->httpResponse;
        if (isset($httpResponse['content'])) {
            $content = $httpResponse['content'] . $content;
        }
        // The pending bytes are part of $content now; clear them so that
        // they cannot be prepended a second time on the next call.
        $httpResponse['content'] = '';

        // Parse the response headers.
        if (! array_key_exists('type', $httpResponse)) {
            if (false === strpos($content, "\r\n\r\n")) {
                if (strlen($content) > $connection->maxSendBufferSize) {
                    self::responseError($connection, 502);
                    self::responseFinish($connection);
                    return '';
                }
                // Headers are incomplete: keep the bytes until more arrive.
                $httpResponse['content'] = $content;
                return '';
            }
            list ($header) = explode("\r\n\r\n", $content, 2);
            $sizeHeader = strlen($header) + 4;
            $matchLength = array();
            if (preg_match("/\r\nContent-Length: ?(\d+)/i", $header, $matchLength)) {
                $httpResponse['type'] = 0;
                $httpResponse['sizeTotal'] = (int) $matchLength[1] + $sizeHeader;
            } elseif (preg_match("/\r\nTransfer-Encoding: ?chunked/i", $header)) {
                $httpResponse['type'] = 1;
                $httpResponse['sizeHeader'] = $sizeHeader;
            } elseif ('100' === substr($header, 9, 3)) {
                $httpResponse['type'] = 3;
            } else {
                $httpResponse['type'] = null;
            }
            $httpResponse['size'] = 0;
        }

        if ($httpResponse['type'] === 0) {
            $httpResponse['size'] += strlen($content);
            if ($httpResponse['size'] >= $httpResponse['sizeTotal']) {
                self::responseFinish($connection);
            }
            return $content;
        }

        if ($httpResponse['type'] === 1) {
            if (isset($httpResponse['sizeHeader'])) {
                $header = substr($content, 0, $httpResponse['sizeHeader']);
                $body = substr($content, $httpResponse['sizeHeader']);
                unset($httpResponse['sizeHeader']);
            } else {
                $header = '';
                $body = $content;
            }
            $bodyLength = strlen($body);
            // $length is the number of body bytes that form complete chunks.
            $length = 0;
            while (false !== ($pos = strpos($body, "\r\n", $length))) {
                $chunkLength = substr($body, $length, $pos - $length);
                // size line + CRLF + chunk data + CRLF
                $chunkLengthTotal = strlen($chunkLength) + 2 + (int) hexdec($chunkLength) + 2;
                if ($length + $chunkLengthTotal > $bodyLength) {
                    // The chunk is not complete yet. Stop here; without this
                    // break the loop would never terminate.
                    break;
                }
                if ('0' === $chunkLength && "\r\n\r\n" === substr($body, - 4)) {
                    self::responseFinish($connection);
                }
                $length += $chunkLengthTotal;
            }
            // Keep the incomplete tail for the next call.
            $httpResponse['content'] = substr($body, $length);
            return $header . substr($body, 0, $length);
        }

        if ($httpResponse['type'] === 3) {
            // NOTE(review): the type stays 3 for the rest of this exchange,
            // so the response following a 100 is relayed without being
            // tracked - confirm this is acceptable.
            return $content;
        }

        self::responseError($connection, 503);
        self::responseFinish($connection);
        return '';
    }

    /**
     * Release the bookkeeping of the connection's current request and, when
     * that frees a slot, start the longest waiting request of the same host.
     *
     * Does nothing when the connection has no request state.
     *
     * @param TcpConnection $connection
     * @param bool $close
     *            Pass true when the connection is going away; it is then not
     *            resumed
     */
    public static function responseFinish(TcpConnection $connection, $close = false)
    {
        if (! isset($connection->httpRequest)) {
            return;
        }
        $name = $connection->httpRequest['name'];
        $worker = $connection->worker;
        $running = &$worker->connectionsRunning;
        $paused = &$worker->connectionsPaused;

        // Clear the state so the connection becomes a free connection again.
        // The id is only removed when it is really present: array_search()
        // returns false otherwise and unset($list[false]) would drop key 0,
        // i.e. the slot of another connection.
        $slotFreed = false;
        if (isset($running[$name])) {
            $key = array_search($connection->id, $running[$name], true);
            if (false !== $key) {
                unset($running[$name][$key]);
                $slotFreed = true;
            }
            if (0 === count($running[$name])) {
                unset($running[$name]);
            }
        }
        // A connection that goes away while it is still queued must leave
        // the queue, otherwise it would be promoted later on.
        if (isset($paused[$name])) {
            $key = array_search($connection->id, $paused[$name], true);
            if (false !== $key) {
                unset($paused[$name][$key]);
            }
            if (0 === count($paused[$name])) {
                unset($paused[$name]);
            }
        }
        unset($connection->httpRequest, $connection->httpResponse);

        // Take one paused connection, but only if a slot became available.
        while ($slotFreed && isset($paused[$name]) && 0 < count($paused[$name])) {
            $nextId = array_shift($paused[$name]);
            if (0 === count($paused[$name])) {
                unset($paused[$name]);
            }
            if (! isset($worker->connections[$nextId])) {
                // The queued connection no longer exists; try the next one.
                continue;
            }
            $connectionNext = $worker->connections[$nextId];
            // Register before resuming so the slot is already accounted for
            // while the resumed connection processes its request.
            $running[$name][] = $connectionNext->id;
            $connectionNext->resumeRecv();
            break;
        }

        // Resume the finished connection last, so that a request already
        // waiting on it is checked against the updated running list.
        if (false === $close) {
            $connection->resumeRecv();
        }
    }
}
启动代码:
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Ares333\YafLib\Helper\Error;
use Ares333\YafLib\Helper\System;
use Ares333\YafLib\Tool\Functions;
use Workerman\Connection\AsyncTcpConnection;
use Protocols\HttpProxy;

// Startup script: listens on a unix socket with the HttpProxy protocol and
// relays every client connection to its own upstream connection.

$dirLogs = __DIR__ . '/../logs';
error_reporting(E_ALL);
ini_set('display_errors', true);
ini_set('memory_limit', '1024m');
ini_set('error_log', $dirLogs . '/php_error.log');
Error::error2exception();
new Functions();
global $argv;

Worker::$logFile = $dirLogs . '/worker.log';
Worker::$stdoutFile = $dirLogs . '/out.log';
Worker::$pidFile = $dirLogs . '/worker.pid';

$sock = '/var/run/workerman-api.sock';
$worker = new Worker('httpProxy://' . $sock);
// Remove a socket file left behind by a previous run before (re)starting.
if (array_key_exists(1, $argv) && ($argv[1] === 'start' || $argv[1] === 'restart')) {
    if (file_exists($sock)) {
        unlink($sock);
    }
}
$worker->user = 'apache';
$worker->group = 'apache';
$worker->transport = 'unix';
$worker->count = System::cpuNum();
$worker->reloadable = false;
// Requests currently running, array(host => list of connection ids).
$worker->connectionsRunning = array();
// Queued connections, array(host => list of connection ids).
$worker->connectionsPaused = array();
$worker->onWorkerReload = function () {};
$worker->onWorkerStart = function () {
    HttpProxy::setNameConfig(
        [
            'test1' => 1000,
            'test' => 1
        ]);
};

$worker->onMessage = function (TcpConnection $connection, $buffer) {
    // Client and upstream connections map one-to-one, which avoids any
    // keep-alive handling of our own.
    if (! isset($connection->connectionProxyUpstream)) {
        $connectionUpstream = new AsyncTcpConnection(
            'unix:///var/run/nginx.sock');
        $connectionUpstream->connectionProxy = $connection;
        $connectionUpstream->onMessage = function (
            TcpConnection $connectionUpstream, $buffer) {
            $connectionUpstream->connectionProxy->send($buffer);
        };
        $connectionUpstream->onClose = function (
            TcpConnection $connectionUpstream) {
            if (! isset($connectionUpstream->connectionProxy)) {
                return;
            }
            $client = $connectionUpstream->connectionProxy;
            HttpProxy::responseFinish($client);
            // Drop the reference to the dead upstream; otherwise the next
            // request on this client connection would be written to a
            // closed connection instead of opening a new one.
            if (isset($client->connectionProxyUpstream) &&
                 $client->connectionProxyUpstream === $connectionUpstream) {
                unset($client->connectionProxyUpstream);
            }
            unset($connectionUpstream->connectionProxy);
        };
        $connectionUpstream->onError = function (
            TcpConnection $connectionUpstream, $code, $msg) {
            user_error($code . ': ' . $msg, E_USER_WARNING);
        };
        // Back-pressure: stop reading from the client while the upstream
        // send buffer is full.
        $connectionUpstream->onBufferFull = [
            $connection,
            'pauseRecv'
        ];
        $connectionUpstream->onBufferDrain = [
            $connection,
            'resumeRecv'
        ];
        $connectionUpstream->connect();
        $connection->connectionProxyUpstream = $connectionUpstream;
    } else {
        $connectionUpstream = $connection->connectionProxyUpstream;
    }
    $connectionUpstream->send($buffer, true);
};

$worker->onClose = function (TcpConnection $connection) {
    // Clear the request bookkeeping.
    HttpProxy::responseFinish($connection, true);
    // Close the associated upstream connection. The reference is detached
    // first because close() may run the upstream onClose handler at once.
    if (isset($connection->connectionProxyUpstream)) {
        $connectionUpstream = $connection->connectionProxyUpstream;
        unset($connection->connectionProxyUpstream);
        $connectionUpstream->close();
    }
};

$worker->onError = function ($connection, $code, $msg) {
    if ($msg !== 'client closed') {
        user_error($code . ': ' . $msg, E_USER_WARNING);
    }
};

// Back-pressure towards the upstream. A client may have no upstream yet,
// e.g. when an error reply was sent before any request was forwarded.
$worker->onBufferFull = function (TcpConnection $connection) {
    if (isset($connection->connectionProxyUpstream)) {
        $connection->connectionProxyUpstream->pauseRecv();
    }
};
$worker->onBufferDrain = function (TcpConnection $connection) {
    if (isset($connection->connectionProxyUpstream)) {
        $connection->connectionProxyUpstream->resumeRecv();
    }
};

Worker::runAll();
单进程下可以正常运行;多进程、多机部署时,需要增加一个异步 TCP 中控服务来统一维护 running 和 paused 连接。为了更好地处理 paused 连接,还需要用 Timer 做超时控制。