【轻知识】Oss Sdk request 使用 Guzzle & guzzle-swoole ,协程化

环境: hyperf框架

需求

减少阻塞调用。支持swoole 协程。

Cos sdk 用的是guzzle。用guzzle-swoole开启协程。

Oss sdk 用的是curl。但是swoole 一键协程。跟Cos sdk 冲突。于是把Oss sdk改成使用guzzle。对比改Cos成本比较小。

改动点

主要改动点如下。

1.prep_request方法。改成返回一个 guzzle的request对象。
2.send_request方法,guzzle client 发送request。
3.process_response方法,处理响应体。
4.框架代码不用改命名空间。单独拎出到一个目录(这个可灵活处理,或者弄个composer包都可以)。composer.json。放到autoload 里面。

autoload": {
        "psr-4": {
            "App\\": "app/",
            "OSS\\": "lib/aliyuncs/oss-sdk-php/src/OSS"
        },
        "files": []
    },

注意点

1.guzzle 对非200操作会抛异常。但是oss是restful设计。其他状态码也表示接口语义。所以抛异常catch下。赋值response。

        try {
               $client->send(xxxxxx);
        } catch (GuzzleHttp\Exception\ClientException $e) {
            // guzzle 非200会抛异常。所以restful接口。下面赋值不做其他处理
            $this->response = $e->getResponse();
        }

2.对接口返回100状态码的处理。

  $temp_headers['Expect'] = '';

3.响应内容拼接。参考下面的getRawResponse方法。

4.对项目中用到的接口功能进行测试。确保兼容功能没有异常。

贴一个自测代码。可能不常用fopen。测试getObject写入文件。

    $resource = fopen($downPath, 'rw+');
    $options = [
        OssClient::OSS_FILE_DOWNLOAD => $resource,
    ];
    $getResult = $ossClient->getObject('xes-private-test', $object,$options);
    var_dump(file_get_contents($downPath));

贴代码,也不多。

<?php
namespace OSS\Http;
use Swoole\Coroutine;
use Swoole\Curl\Handler;
use GuzzleHttp;
use GuzzleHttp\Psr7\Request;
class RequestCore
{
    public $request_url;

    public $request_headers;

    public $response_raw_headers;

    public $response_error_body;

    public $write_file_handle;

    public $request_body;

    public $response;

    public $response_headers;

    public $response_body;

    public $response_code;

    public $response_info;

    public $method;

    public $proxy = null;

    public $username = null;

    public $password = null;

    public $curlopts = null;

    public $debug_mode = false;

    public $request_class = 'OSS\Http\RequestCore';

    public $response_class = 'OSS\Http\ResponseCore';

    public $useragent = 'RequestCore/1.4.3';

    public $read_file = null;

    public $read_stream = null;

    public $read_stream_size = null;

    public $read_stream_read = 0;

    public $write_file = null;

    public $write_stream = null;

    public $seek_position = null;

    public $cacert_location = false;

    public $ssl_verification = true;

    public $registered_streaming_read_callback = null;

    public $registered_streaming_write_callback = null;

    public $timeout = 5184000;

    public $connect_timeout = 10;

    // CONSTANTS

    const HTTP_GET = 'GET';

    const HTTP_POST = 'POST';

    const HTTP_PUT = 'PUT';

    const HTTP_DELETE = 'DELETE';

    const HTTP_HEAD = 'HEAD';


    private $requestAddInfo  = []; // 请求的附加信息。

    // CONSTRUCTOR/DESTRUCTOR

    public function __construct($url = null, $proxy = null, $helpers = null)
    {
        // Set some default values.
        $this->request_url = $url;
        $this->method = self::HTTP_GET;
        $this->request_headers = array();
        $this->request_body = '';

        // Set a new Request class if one was set.
        if (isset($helpers['request']) && !empty($helpers['request'])) {
            $this->request_class = $helpers['request'];
        }

        // Set a new Request class if one was set.
        if (isset($helpers['response']) && !empty($helpers['response'])) {
            $this->response_class = $helpers['response'];
        }

        if ($proxy) {
            $this->set_proxy($proxy);
        }

        return $this;
    }

    public function __destruct()
    {


//        if (isset($this->read_file) && isset($this->read_stream)) {
//            fclose($this->read_stream);
//        }

        if (isset($this->write_file) && isset($this->write_stream)) {
             fclose($this->write_stream);
        }

        return $this;
    }


    // REQUEST METHODS

    public function set_credentials($user, $pass)
    {
        $this->username = $user;
        $this->password = $pass;
        return $this;
    }

    public function add_header($key, $value)
    {
        $this->request_headers[$key] = $value;
        return $this;
    }

    public function remove_header($key)
    {
        if (isset($this->request_headers[$key])) {
            unset($this->request_headers[$key]);
        }
        return $this;
    }

    public function set_method($method)
    {
        $this->method = strtoupper($method);
        return $this;
    }

    public function set_useragent($ua)
    {
        $this->useragent = $ua;
        return $this;
    }

    public function set_body($body)
    {
        $this->request_body = $body;
        return $this;
    }

    public function set_request_url($url)
    {
        $this->request_url = $url;
        return $this;
    }

    public function set_curlopts($curlopts)
    {
        $this->curlopts = $curlopts;
        return $this;
    }

    public function set_read_stream_size($size)
    {
        $this->read_stream_size = $size;

        return $this;
    }

    public function set_read_stream($resource, $size = null)
    {
        if (!isset($size) || $size < 0) {
            $stats = fstat($resource);

            if ($stats && $stats['size'] >= 0) {
                $position = ftell($resource);

                if ($position !== false && $position >= 0) {
                    $size = $stats['size'] - $position;
                }
            }
        }

        $this->read_stream = $resource;

        return $this->set_read_stream_size($size);
    }

    public function set_read_file($location)
    {
        $this->read_file = $location;
        $read_file_handle = fopen($location, 'r');

        return $this->set_read_stream($read_file_handle);
    }

    public function set_write_stream($resource)
    {
        $this->write_stream = $resource;

        return $this;
    }

    public function set_write_file($location)
    {
        $this->write_file = $location;
    }

    public function set_proxy($proxy)
    {
        $proxy = parse_url($proxy);
        $proxy['user'] = isset($proxy['user']) ? $proxy['user'] : null;
        $proxy['pass'] = isset($proxy['pass']) ? $proxy['pass'] : null;
        $proxy['port'] = isset($proxy['port']) ? $proxy['port'] : null;
        $this->proxy = $proxy;
        return $this;
    }

    public function set_seek_position($position)
    {
        $this->seek_position = isset($position) ? (integer)$position : null;

        return $this;
    }

    public function streaming_header_callback($curl_handle, $header_content)
    {
        $code = curl_getinfo($curl_handle, CURLINFO_HTTP_CODE);

        if (isset($this->write_file) && intval($code) / 100 == 2 && !isset($this->write_file_handle))
        {
            $this->write_file_handle = fopen($this->write_file, 'w');
            $this->set_write_stream($this->write_file_handle);
        }

        $this->response_raw_headers .= $header_content;
        return strlen($header_content);
    }


    public function register_streaming_read_callback($callback)
    {
        $this->registered_streaming_read_callback = $callback;

        return $this;
    }

    public function register_streaming_write_callback($callback)
    {
        $this->registered_streaming_write_callback = $callback;

        return $this;
    }


    // PREPARE, SEND, AND PROCESS REQUEST

    public function streaming_read_callback($curl_handle, $file_handle, $length)
    {
        // Once we've sent as much as we're supposed to send...
        if ($this->read_stream_read >= $this->read_stream_size) {
            // Send EOF
            return '';
        }

        // If we're at the beginning of an upload and need to seek...
        if ($this->read_stream_read == 0 && isset($this->seek_position) && $this->seek_position !== ftell($this->read_stream)) {
            if (fseek($this->read_stream, $this->seek_position) !== 0) {
                throw new RequestCore_Exception('The stream does not support seeking and is either not at the requested position or the position is unknown.');
            }
        }

        $read = fread($this->read_stream, min($this->read_stream_size - $this->read_stream_read, $length)); // Remaining upload data or cURL's requested chunk size
        $this->read_stream_read += strlen($read);

        $out = $read === false ? '' : $read;

        // Execute callback function
        if ($this->registered_streaming_read_callback) {
            call_user_func($this->registered_streaming_read_callback, $curl_handle, $file_handle, $out);
        }

        return $out;
    }

    public function streaming_write_callback($curl_handle, $data)
    {
        $code = curl_getinfo($curl_handle, CURLINFO_HTTP_CODE);

        if (intval($code) / 100 != 2)
        {
            $this->response_error_body .= $data;
            return strlen($data);
        }

        $length = strlen($data);
        $written_total = 0;
        $written_last = 0;

        while ($written_total < $length) {
            $written_last = fwrite($this->write_stream, substr($data, $written_total));

            if ($written_last === false) {
                return $written_total;
            }

            $written_total += $written_last;
        }

        // Execute callback function
        if ($this->registered_streaming_write_callback) {
            call_user_func($this->registered_streaming_write_callback, $curl_handle, $written_total);
        }

        return $written_total;
    }

    public function prep_request()
    {

        $this->requestAddInfo = [
            'url'=>$this->request_url,
        ];

        // Process custom headers
        if (isset($this->request_headers) && count($this->request_headers)) {
            $temp_headers = array();

            foreach ($this->request_headers as $k => $v) {
                $temp_headers[$k] =  $v;
            }
        }
        $temp_headers['Expect'] = '';
        $temp_headers['Referer'] =  $this->request_url;
        $temp_headers['User-Agent'] =  $this->useragent;
        $body = null;
        switch ($this->method) {
            case self::HTTP_POST:
            case self::HTTP_PUT:
                if (isset($this->read_stream)) {
                    if (!isset($this->read_stream_size) || $this->read_stream_size < 0) {
                        throw new RequestCore_Exception('The stream size for the streaming upload cannot be determined.');
                    }
                    $this->requestAddInfo['size_upload'] = $this->read_stream_size;
                } else {
                    $this->requestAddInfo['size_upload'] = strlen($this->request_body);
                }
                break;

            case self::HTTP_HEAD:
                $this->request_body = null;
                break;

            default: // Assumed GET
//                if (isset($this->write_stream) || isset($this->write_file)) {
//
//                } else {
//                }
                break;
        }
        $this->requestAddInfo['method'] = $this->method;
        if (isset($this->read_stream)) {// 使用 stream 的方式
            if (!isset($this->read_stream_size) || $this->read_stream_size < 0) {
                throw new RequestCore_Exception('The stream size for the streaming upload cannot be determined.');
            }

            $bodyStream = \GuzzleHttp\Psr7\stream_for($this->read_stream);
            $request = new Request($this->method, $this->request_url, $temp_headers, $bodyStream);
        } else {
            $request = new Request($this->method, $this->request_url, $temp_headers, $this->request_body);
        }
        return $request;
    }

    public function process_response(Request $request = null, GuzzleHttp\Psr7\Response $response = null)
    {
        // Accept a custom one if it's passed.
        if ($request && $response) {
            $this->response = $this->getRawResponse($response);
        }

        $this->response_headers = $response->getHeaders();
        $this->response_code = $response->getStatusCode();
        $this->requestAddInfo['http_code'] = $response->getStatusCode();

        $this->response_info = $this->requestAddInfo; // 为空白
        $header_assoc = [];
        foreach ($this->response_headers as $key=>$header) {
            $header_assoc[strtolower($key)] = $header[0] ?? '';
        }

        // Reset the headers to the appropriate property.
        $this->response_headers = $header_assoc;
        $this->response_headers['info'] = $this->response_info;
        $this->response_headers['info']['method'] = $this->method;

        if ($request && $response) {
            return new ResponseCore($this->response_headers, $this->response_body, $this->response_code);
        }

        if (intval($this->response_code) / 100 != 2 && isset($this->write_file))
        {
            $this->response_headers = $this->response_raw_headers;
            $this->response_body = $this->response_error_body;
        }

        return false;
    }

    public function send_request($parse = false)
    {
        set_time_limit(0);
        $request = $this->prep_request();
        $client = new GuzzleHttp\Client();
        try {
            if ($this->method === self::HTTP_GET && (isset($this->write_stream) || isset($this->write_file))) {
                if (isset($this->write_file)) {
                    $this->response = $client->send($request, [
                        'save_to'=>$this->write_file
                    ]);
                }
                if (isset($this->write_stream)) {
                    $this->response = $client->send($request, [
                        'save_to'=> GuzzleHttp\Psr7\stream_for($this->write_stream)
                    ]);
                }

            } else {
                $this->response = $client->send($request);
            }
        } catch (GuzzleHttp\Exception\ClientException $e) {
            // guzzle 非200会抛异常。所以restful接口。下面赋值不做其他处理
            $this->response = $e->getResponse();
        }

        $parsed_response = $this->process_response($request, $this->response);

        if ($parse) {
            return $parsed_response;
        }
        return $this->response;
    }

    // RESPONSE METHODS

    public function get_response_header($header = null)
    {
        if ($header) {
            return $this->response_headers[strtolower($header)];
        }
        return $this->response_headers;
    }

    public function get_response_body()
    {
        return $this->response_body;
    }

    public function get_response_code()
    {
        return $this->response_code;
    }


    // 拼接 响应报文
    //string(335) "HTTP/1.1 100 Continue
//
//HTTP/1.1 200 OK
//Server: AliyunOSS
//Date: Wed, 26 Aug 2020 21:25:37 GMT
//Content-Length: 0
//Connection: keep-alive
//x-oss-request-id: 5F46D3509EB80732383B67E1
//ETag: "8FA01A1F7C0811B252FF4DF1973F32CE"
//Content-MD5: j6AaH3wIEbJS/03xlz8yzg==
//x-oss-hash-crc64ecma: 16545178446529799215
//x-oss-server-time: 38
    public function getRawResponse(GuzzleHttp\Psr7\Response $response) {

        $str = sprintf("HTTP/%s %s %s",$response->getProtocolVersion(),$response->getStatusCode(), $response->getReasonPhrase());
        $str .= "\r\n";
        if (!empty($response->getHeaders())) {
            foreach ($response->getHeaders() as $key=>$header) {
                $str.= sprintf("%s: %s\r\n", $key, $header[0] ?? '');
            }
            $str .= "\r\n";
        }else {
            $str .= "\r\n";
        }
        $body = $response->getBody()->getContents();
        $this->response_body = $body;
        $str.= $body;
        return $str;
    }
}

参考资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352