HTTP反向代理,实现并发请求排队

操作大量http协议的tcp连接,截取需要的数据并定义相关逻辑。可以做连接池,也可以做监听等任意自定义逻辑。

使用workerman实现。本实例根据host限制到后端的并发请求数(不是连接数!),超出的请求排队;上下游可以任意使用长连接或短连接。

协议部分如下:

<?php
namespace Protocols;

use Workerman\Connection\TcpConnection;

/**
 * Workerman protocol that proxies raw HTTP traffic and limits, per Host name,
 * how many requests are forwarded to the backend at the same time.
 *
 * Requests above the limit are queued: the client connection has its receive
 * side paused and is resumed once a running request for the same host ends.
 *
 * State is kept in two places:
 *  - $connection->httpRequest / $connection->httpResponse: per-connection
 *    parsing state of the request/response currently in flight;
 *  - $connection->worker->connectionsRunning / connectionsPaused: maps of
 *    host name => list of connection ids (running / queued).
 */
class HttpProxy
{

    /**
     * Host name => maximum number of concurrently running requests.
     * A null value means "no limit" for that host.
     *
     * @var array
     */
    private static $config = array();

    /**
     * HTTP status code => reason phrase.
     *
     * @var array
     */
    public static $codes = array(
        100 => 'Continue',
        101 => 'Switching Protocols',
        200 => 'OK',
        201 => 'Created',
        202 => 'Accepted',
        203 => 'Non-Authoritative Information',
        204 => 'No Content',
        205 => 'Reset Content',
        206 => 'Partial Content',
        300 => 'Multiple Choices',
        301 => 'Moved Permanently',
        302 => 'Found',
        303 => 'See Other',
        304 => 'Not Modified',
        305 => 'Use Proxy',
        306 => '(Unused)',
        307 => 'Temporary Redirect',
        400 => 'Bad Request',
        401 => 'Unauthorized',
        402 => 'Payment Required',
        403 => 'Forbidden',
        404 => 'Not Found',
        405 => 'Method Not Allowed',
        406 => 'Not Acceptable',
        407 => 'Proxy Authentication Required',
        408 => 'Request Timeout',
        409 => 'Conflict',
        410 => 'Gone',
        411 => 'Length Required',
        412 => 'Precondition Failed',
        413 => 'Request Entity Too Large',
        414 => 'Request-URI Too Long',
        415 => 'Unsupported Media Type',
        416 => 'Requested Range Not Satisfiable',
        417 => 'Expectation Failed',
        422 => 'Unprocessable Entity',
        423 => 'Locked',
        500 => 'Internal Server Error',
        501 => 'Not Implemented',
        502 => 'Bad Gateway',
        503 => 'Service Unavailable',
        504 => 'Gateway Timeout',
        505 => 'HTTP Version Not Supported'
    );

    /**
     * Install the per-host concurrency limits.
     *
     * @param array $config
     *            Host name => limit (null means unlimited)
     */
    public static function setNameConfig(array $config)
    {
        self::$config = $config;
    }

    /**
     * Check the integrity of the package.
     *
     * On the first bytes of a request the header is parsed, the request is
     * registered as running or queued for its host, and the parsing state is
     * stored on the connection. Afterwards every buffer is passed through
     * as-is.
     *
     * @param string $recv_buffer
     * @param TcpConnection $connection
     * @return int Number of bytes to hand to decode(), or 0 to wait
     */
    public static function input($recv_buffer, TcpConnection $connection)
    {
        if (! isset($connection->httpRequest)) {
            // Compare against false explicitly: a match at offset 0 is falsy.
            if (false === strpos($recv_buffer, "\r\n\r\n")) {
                return 0;
            }
            list ($header) = explode("\r\n\r\n", $recv_buffer, 2);
            $method = substr($header, 0, (int) strpos($header, ' '));

            // Header names are case-insensitive, and a missing Host header
            // must not be turned into a bogus name taken from the request line.
            $matchHost = array();
            if (! preg_match("/\r\nHost:[ \t]*([^\r\n]*)/i", $header, $matchHost)) {
                self::responseError($connection, 400);
                return 0;
            }
            // Drop an optional ":port" suffix.
            $name = explode(':', trim($matchHost[1]))[0];

            $httpRequest = array();
            $httpRequest['name'] = $name;
            $httpRequest['sizeTotal'] = self::getRequestSize($header, $method);
            if (! isset($httpRequest['sizeTotal'])) {
                // A body-carrying method without Content-Length.
                self::responseError($connection, 411);
                return 0;
            }
            $httpRequest['size'] = 0;
            $connection->httpRequest = $httpRequest;

            $running = &$connection->worker->connectionsRunning;
            $paused = &$connection->worker->connectionsPaused;
            if (! isset($running[$name])) {
                $running[$name] = array();
            }
            if (! isset($paused[$name])) {
                $paused[$name] = array();
            }
            // NOTE(review): hosts missing from the config get limit 0, so
            // their requests are queued and never started. Confirm this is
            // intended; use null as the default to let unknown hosts through.
            $limit = array_key_exists($name, self::$config) ? self::$config[$name] : 0;
            // The limit is per host, so count this host's running requests,
            // not the number of hosts that currently have an entry.
            $count = count($running[$name]);
            if (null !== $limit && $count >= $limit) {
                $paused[$name][] = $connection->id;
                $connection->pauseRecv();
                return 0;
            }
            $running[$name][] = $connection->id;
        }
        // The request header has been parsed; forward the buffer unchanged.
        return strlen($recv_buffer);
    }

    /**
     * Send an empty-bodied error response to the client, discarding whatever
     * is left in its receive buffer.
     *
     * @param TcpConnection $connection
     * @param int $code
     *            HTTP status code
     * @param string $msg
     *            Reason phrase; defaults to the entry of self::$codes
     * @param bool $close
     *            Close the connection after sending when true
     */
    protected static function responseError(TcpConnection $connection, $code,
        $msg = null, $close = null)
    {
        if (! isset($msg)) {
            if (! isset(self::$codes[$code])) {
                user_error('http code is invalid, code=' . $code,
                    E_USER_WARNING);
                return;
            }
            $msg = self::$codes[$code];
        }
        $connection->consumeRecvBuffer($connection->getRecvBufferQueueSize());
        // Raw send: the response must not run through encode().
        $connection->send(
            "HTTP/1.1 $code $msg\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n",
            true);
        if (true === $close) {
            $connection->close();
        }
    }

    /**
     * Get whole size of the request
     * includes the request headers and request body.
     *
     * @param string $header
     *            The request headers (without the terminating blank line)
     * @param string $method
     *            The request method
     * @return integer|null Null when the size cannot be determined
     */
    protected static function getRequestSize($header, $method)
    {
        // +4 accounts for the "\r\n\r\n" that terminates the header.
        $sizeHeader = strlen($header) + 4;
        if ($method === 'GET' || $method === 'OPTIONS' || $method === 'HEAD') {
            return $sizeHeader;
        }
        $match = array();
        if (preg_match("/\r\nContent-Length: ?(\d+)/i", $header, $match)) {
            return (int) $match[1] + $sizeHeader;
        }
        return null;
    }

    /**
     * Account for the received request bytes and stop reading from the client
     * once the whole request has arrived, so that the next request on the same
     * connection is not read before the current response has finished.
     *
     * @param string $recv_buffer
     * @param TcpConnection $connection
     * @return string The buffer, unchanged
     */
    public static function decode($recv_buffer, TcpConnection $connection)
    {
        $httpRequest = &$connection->httpRequest;
        $httpRequest['size'] += strlen($recv_buffer);
        // The current request has been received completely. ">=" also covers
        // a buffer that carries more bytes than the announced request size.
        if ($httpRequest['size'] >= $httpRequest['sizeTotal']) {
            $connection->pauseRecv();
        }
        return $recv_buffer;
    }

    /**
     * Track the response flowing back to the client and release the request's
     * slot once the response is complete.
     *
     * Supports responses framed by Content-Length or by chunked transfer
     * encoding, plus interim "100" responses. Any other framing is answered
     * with a 503.
     *
     * @param string $content
     *            Bytes received from the upstream
     * @param TcpConnection $connection
     *            The client connection
     * @return string Bytes to send to the client now (may be empty)
     */
    public static function encode($content, TcpConnection $connection)
    {
        if (! isset($connection->httpResponse)) {
            $connection->httpResponse = array();
        }
        $httpResponse = &$connection->httpResponse;
        // Prepend bytes held back by a previous call and clear the buffer so
        // they cannot be emitted a second time.
        $buffered = isset($httpResponse['content']) ? $httpResponse['content'] : '';
        $content = $buffered . $content;
        $httpResponse['content'] = '';

        // Parse the response header once per response.
        if (! array_key_exists('type', $httpResponse)) {
            if (false === strpos($content, "\r\n\r\n")) {
                if (strlen($content) > $connection->maxSendBufferSize) {
                    self::responseError($connection, 502);
                    self::responseFinish($connection);
                    return '';
                }
                // Header incomplete: hold the bytes back until more arrive.
                $httpResponse['content'] = $content;
                return '';
            }
            list ($header) = explode("\r\n\r\n", $content, 2);
            $sizeHeader = strlen($header) + 4;
            $matchLength = array();
            if (preg_match("/\r\nContent-Length: ?(\d+)/i", $header,
                $matchLength)) {
                $httpResponse['type'] = 0;
                $httpResponse['sizeTotal'] = (int) $matchLength[1] + $sizeHeader;
            } elseif (preg_match("/\r\nTransfer-Encoding: ?chunked/i", $header)) {
                $httpResponse['type'] = 1;
                $httpResponse['sizeHeader'] = $sizeHeader;
            } elseif ('100' === substr($header, 9, 3)) {
                // Interim response: forward it and leave the state untouched
                // so that the final response header is parsed as well.
                $head = substr($content, 0, $sizeHeader);
                $rest = (string) substr($content, $sizeHeader);
                if ('' === $rest) {
                    return $head;
                }
                return $head . self::encode($rest, $connection);
            } else {
                $httpResponse['type'] = null;
            }
            $httpResponse['size'] = 0;
        }

        if ($httpResponse['type'] === 0) {
            $httpResponse['size'] += strlen($content);
            if ($httpResponse['size'] >= $httpResponse['sizeTotal']) {
                self::responseFinish($connection);
            }
            return $content;
        }

        if ($httpResponse['type'] === 1) {
            if (isset($httpResponse['sizeHeader'])) {
                $header = substr($content, 0, $httpResponse['sizeHeader']);
                $body = (string) substr($content, $httpResponse['sizeHeader']);
                unset($httpResponse['sizeHeader']);
            } else {
                $header = '';
                $body = $content;
            }
            $bodyLength = strlen($body);
            // Number of leading body bytes that form complete chunks.
            $length = 0;
            $finished = false;
            while ($length < $bodyLength) {
                $pos = strpos($body, "\r\n", $length);
                if (false === $pos) {
                    break;
                }
                $chunkLength = substr($body, $length, $pos - $length);
                // size line + CRLF + chunk data + CRLF
                $chunkLengthTotal = strlen($chunkLength) + 2 +
                     hexdec($chunkLength) + 2;
                if ($length + $chunkLengthTotal > $bodyLength) {
                    // The chunk is still incomplete; wait for more data
                    // instead of re-scanning the same position forever.
                    break;
                }
                $length += $chunkLengthTotal;
                if ('0' === $chunkLength && "\r\n\r\n" === substr($body, - 4)) {
                    $finished = true;
                    break;
                }
            }
            $complete = $header . substr($body, 0, $length);
            $httpResponse['content'] = (string) substr($body, $length);
            if ($finished) {
                self::responseFinish($connection);
            }
            return $complete;
        }

        // Neither Content-Length nor chunked: not supported.
        self::responseError($connection, 503);
        self::responseFinish($connection);
        return '';
    }

    /**
     * End the request currently tracked on the connection: release its slot
     * (or its place in the queue), clear the per-connection state and start
     * the next queued request for the same host if a slot was freed.
     *
     * Does nothing when the connection has no request in flight.
     *
     * @param TcpConnection $connection
     * @param bool $close
     *            True when the connection is being closed; its receive side
     *            is then not resumed
     */
    public static function responseFinish(TcpConnection $connection,
        $close = false)
    {
        if (! isset($connection->httpRequest)) {
            return;
        }
        $name = $connection->httpRequest['name'];
        $connectionRunning = &$connection->worker->connectionsRunning;
        $connectionPaused = &$connection->worker->connectionsPaused;

        // Release the running slot, if this connection actually holds one.
        // A queued connection must not remove somebody else's slot.
        $slotFreed = false;
        if (isset($connectionRunning[$name])) {
            $keyRunning = array_search($connection->id,
                $connectionRunning[$name], true);
            if (false !== $keyRunning) {
                unset($connectionRunning[$name][$keyRunning]);
                $slotFreed = true;
            }
            if (0 === count($connectionRunning[$name])) {
                unset($connectionRunning[$name]);
            }
        }

        // A connection that ends while still queued leaves the queue.
        if (isset($connectionPaused[$name])) {
            $keyPaused = array_search($connection->id,
                $connectionPaused[$name], true);
            if (false !== $keyPaused) {
                unset($connectionPaused[$name][$keyPaused]);
                $connectionPaused[$name] = array_values(
                    $connectionPaused[$name]);
            }
            if (0 === count($connectionPaused[$name])) {
                unset($connectionPaused[$name]);
            }
        }

        // Clear the state; the connection is free for its next request.
        unset($connection->httpRequest, $connection->httpResponse);
        if (false === $close) {
            $connection->resumeRecv();
        }

        if (! $slotFreed) {
            return;
        }

        // Hand the freed slot to the first queued connection still alive.
        $worker = $connection->worker;
        while (isset($connectionPaused[$name]) &&
             0 < count($connectionPaused[$name])) {
            $idNext = array_shift($connectionPaused[$name]);
            if (0 === count($connectionPaused[$name])) {
                unset($connectionPaused[$name]);
            }
            if (! isset($worker->connections[$idNext])) {
                continue;
            }
            $connectionRunning[$name][] = $idNext;
            $worker->connections[$idNext]->resumeRecv();
            break;
        }
    }
}

启动代码:

<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Ares333\YafLib\Helper\Error;
use Ares333\YafLib\Helper\System;
use Ares333\YafLib\Tool\Functions;
use Workerman\Connection\AsyncTcpConnection;
use Protocols\HttpProxy;

// Bootstrap of the proxy worker: logging setup, worker configuration and the
// callbacks that pair every client connection with one upstream connection.
$dirLogs = __DIR__ . '/../logs';
error_reporting(E_ALL);
ini_set('display_errors', true);
ini_set('memory_limit', '1024m');
ini_set('error_log', $dirLogs . '/php_error.log');
Error::error2exception();
new Functions();
global $argv;

Worker::$logFile = $dirLogs . '/worker.log';
Worker::$stdoutFile = $dirLogs . '/out.log';
Worker::$pidFile = $dirLogs . '/worker.pid';

$sock = '/var/run/workerman-api.sock';
$worker = new Worker('httpProxy://' . $sock);
// Remove a socket file left over from a previous run before (re)starting.
if (array_key_exists(1, $argv) &&
     ($argv[1] === 'start' || $argv[1] === 'restart')) {
    if (file_exists($sock)) {
        unlink($sock);
    }
}
$worker->user = 'apache';
$worker->group = 'apache';
$worker->transport = 'unix';
$worker->count = System::cpuNum();
$worker->reloadable = false;
// Host name => ids of the connections whose request is currently running.
$worker->connectionsRunning = array();
// Host name => ids of the connections whose request is queued.
$worker->connectionsPaused = array();

$worker->onWorkerReload = function () {};
$worker->onWorkerStart = function () {
    HttpProxy::setNameConfig(
        [
            'test1' => 1000,
            'test' => 1
        ]);
};
$worker->onMessage = function (TcpConnection $connection, $buffer) {
    // Client and upstream connections are paired one to one, which avoids
    // any keep-alive bookkeeping.
    if (! isset($connection->connectionProxyUpstream)) {
        $connectionUpstream = new AsyncTcpConnection(
            'unix:///var/run/nginx.sock');
        $connectionUpstream->connectionProxy = $connection;
        $connectionUpstream->onMessage = function (
            TcpConnection $connectionUpstream, $buffer) {
            $connectionUpstream->connectionProxy->send($buffer);
        };
        $connectionUpstream->onClose = function (
            TcpConnection $connectionUpstream) {
            if (! isset($connectionUpstream->connectionProxy)) {
                return;
            }
            $connectionProxy = $connectionUpstream->connectionProxy;
            HttpProxy::responseFinish($connectionProxy);
            // Forget the dead upstream so that the next request on a
            // keep-alive client connection opens a fresh one instead of
            // writing to a closed connection.
            if (isset($connectionProxy->connectionProxyUpstream) &&
                 $connectionProxy->connectionProxyUpstream === $connectionUpstream) {
                unset($connectionProxy->connectionProxyUpstream);
            }
            unset($connectionUpstream->connectionProxy);
        };
        $connectionUpstream->onError = function (
            TcpConnection $connectionUpstream, $code, $msg) {
            user_error($code . ': ' . $msg, E_USER_WARNING);
        };
        // Back-pressure: stop reading from the client while the upstream
        // send buffer is full.
        $connectionUpstream->onBufferFull = [
            $connection,
            'pauseRecv'
        ];
        $connectionUpstream->onBufferDrain = [
            $connection,
            'resumeRecv'
        ];
        $connectionUpstream->connect();
        $connection->connectionProxyUpstream = $connectionUpstream;
    } else {
        $connectionUpstream = $connection->connectionProxyUpstream;
    }
    $connectionUpstream->send($buffer, true);
};
$worker->onClose = function (TcpConnection $connection) {
    // Release the request state held for this connection.
    HttpProxy::responseFinish($connection, true);
    // Close the paired upstream connection.
    if (isset($connection->connectionProxyUpstream)) {
        $connection->connectionProxyUpstream->close();
        unset($connection->connectionProxyUpstream);
    }
};
$worker->onError = function ($connection, $code, $msg) {
    if ($msg !== 'client closed') {
        user_error($code . ': ' . $msg, E_USER_WARNING);
    }
};
// Back-pressure in the other direction: stop reading from the upstream while
// the client send buffer is full. The upstream may not exist (yet or anymore).
$worker->onBufferFull = function (TcpConnection $connection) {
    if (isset($connection->connectionProxyUpstream)) {
        $connection->connectionProxyUpstream->pauseRecv();
    }
};
$worker->onBufferDrain = function (TcpConnection $connection) {
    if (isset($connection->connectionProxyUpstream)) {
        $connection->connectionProxyUpstream->resumeRecv();
    }
};

Worker::runAll();

单进程下完美运行;多进程、多机情况下需要做一个异步tcp中控来统一操作running和paused连接。为了更好地管理paused连接,还需要用Timer做超时控制。

发表评论

电子邮件地址不会被公开。

*