环境: hyperf框架
需求
减少阻塞调用。支持swoole 协程。
Cos sdk 用的是guzzle。用guzzle-swoole开启协程。
Oss sdk 用的是 curl。但是 swoole 的一键协程化与 Cos sdk(guzzle-swoole)冲突,于是把 Oss sdk 也改成使用 guzzle;相比去改 Cos sdk,这样改动成本更小。
改动点
主要改动点如下。
1.prep_request方法。改成返回一个 guzzle的request对象。
2.send_request方法,guzzle client 发送request。
3.process_response方法,处理响应体。
4.框架代码不用改命名空间。单独拎出到一个目录(这个可灵活处理,或者弄个composer包都可以)。composer.json。放到autoload 里面。
"autoload": {
"psr-4": {
"App\\": "app/",
"OSS\\": "lib/aliyuncs/oss-sdk-php/src/OSS"
},
"files": []
},
注意点
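1.guzzle 默认(http_errors 为 true)对 4xx/5xx 状态码会抛异常。但是 oss 是 restful 设计,这些状态码同样表达接口语义,所以需要把异常 catch 住,并把异常里的 response 赋值回来。注意 ClientException 只覆盖 4xx,5xx 对应的是 ServerException,两者的共同父类是 BadResponseException。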
try {
$client->send(xxxxxx);
} catch (GuzzleHttp\Exception\ClientException $e) {
// guzzle 非200会抛异常。所以restful接口。下面赋值不做其他处理
$this->response = $e->getResponse();
}
2.对接口返回100状态码的处理。
$temp_headers['Expect'] = '';
3.响应内容拼接。参考下面的 getRawResponse 方法。
4.对项目中用到的接口功能进行测试。确保兼容功能没有异常。
贴一个自测代码。可能不常用fopen。测试getObject写入文件。
$resource = fopen($downPath, 'rw+');
$options = [
OssClient::OSS_FILE_DOWNLOAD => $resource,
];
$getResult = $ossClient->getObject('xes-private-test', $object,$options);
var_dump(file_get_contents($downPath));
贴代码,也不多。
<?php
namespace OSS\Http;
use Swoole\Coroutine;
use Swoole\Curl\Handler;
use GuzzleHttp;
use GuzzleHttp\Psr7\Request;
/**
 * HTTP transport of the OSS SDK, re-implemented on top of Guzzle.
 *
 * The snake_case public properties and methods mirror the cURL-based
 * RequestCore this class replaces, so the rest of the SDK can keep calling it
 * unchanged. The request is built as a PSR-7 request in prep_request(), sent
 * with a Guzzle client in send_request() and unpacked in process_response().
 *
 * The streaming_*_callback() methods take a cURL handle and are not invoked
 * anywhere in this class; they are kept only so existing callers do not break.
 */
class RequestCore
{
    /** @var string|null URL the request is sent to. */
    public $request_url;

    /** @var array Request headers as name => value. */
    public $request_headers;

    /** @var string|null Raw header text accumulated by streaming_header_callback(). */
    public $response_raw_headers;

    /** @var string|null Body of a non-2xx response accumulated by streaming_write_callback(). */
    public $response_error_body;

    /** @var resource|null Handle opened for $write_file by streaming_header_callback(). */
    public $write_file_handle;

    /** @var string|null Request body used when no read stream is set. */
    public $request_body;

    /** @var mixed Guzzle response right after sending; the raw response text once processed. */
    public $response;

    /** @var array|string|null Lower-cased response header map plus an 'info' entry. */
    public $response_headers;

    /** @var string|null Response body as read by getRawResponse(). */
    public $response_body;

    /** @var int|null HTTP status code of the response. */
    public $response_code;

    /** @var array|null Request metadata (url, method, size_upload, http_code). */
    public $response_info;

    /** @var string HTTP method, upper-cased by set_method(). */
    public $method;

    /** @var array|null Result of parse_url() on the proxy URL; not read elsewhere in this class. */
    public $proxy = null;

    /** @var string|null Stored by set_credentials(); not read elsewhere in this class. */
    public $username = null;

    /** @var string|null Stored by set_credentials(); not read elsewhere in this class. */
    public $password = null;

    /** @var array|null Stored by set_curlopts(); not read elsewhere in this class. */
    public $curlopts = null;

    public $debug_mode = false;

    /** @var string Request class name; can be overridden through the constructor's $helpers. */
    public $request_class = 'OSS\Http\RequestCore';

    /** @var string Response class name; can be overridden through the constructor's $helpers. */
    public $response_class = 'OSS\Http\ResponseCore';

    /** @var string Value sent as the User-Agent header. */
    public $useragent = 'RequestCore/1.4.3';

    /** @var string|null Path of the file being uploaded (see set_read_file()). */
    public $read_file = null;

    /** @var resource|null Stream the request body is read from. */
    public $read_stream = null;

    /** @var int|null Number of bytes to upload from $read_stream. */
    public $read_stream_size = null;

    /** @var int Bytes already handed out by streaming_read_callback(). */
    public $read_stream_read = 0;

    /** @var string|null Path a GET response body is saved to. */
    public $write_file = null;

    /** @var resource|null Stream a GET response body is written to. */
    public $write_stream = null;

    /** @var int|null Offset in $read_stream at which the upload starts. */
    public $seek_position = null;

    public $cacert_location = false;

    public $ssl_verification = true;

    /** @var callable|null Invoked by streaming_read_callback() after each chunk. */
    public $registered_streaming_read_callback = null;

    /** @var callable|null Invoked by streaming_write_callback() after each chunk. */
    public $registered_streaming_write_callback = null;

    /** @var int|float Total request timeout in seconds, passed to Guzzle as 'timeout'. */
    public $timeout = 5184000;

    /** @var int|float Connection timeout in seconds, passed to Guzzle as 'connect_timeout'. */
    public $connect_timeout = 10;

    // CONSTANTS
    const HTTP_GET = 'GET';
    const HTTP_POST = 'POST';
    const HTTP_PUT = 'PUT';
    const HTTP_DELETE = 'DELETE';
    const HTTP_HEAD = 'HEAD';

    /** @var array Extra information about the request, exposed later as $response_info. */
    private $requestAddInfo = [];

    // CONSTRUCTOR/DESTRUCTOR

    /**
     * @param string|null $url     URL to request.
     * @param string|null $proxy   Proxy URL, forwarded to set_proxy() when truthy.
     * @param array|null  $helpers Optional 'request' / 'response' class-name overrides.
     */
    public function __construct($url = null, $proxy = null, $helpers = null)
    {
        $this->request_url = $url;
        $this->method = self::HTTP_GET;
        $this->request_headers = [];
        $this->request_body = '';

        // empty() already covers the "not set" case.
        if (!empty($helpers['request'])) {
            $this->request_class = $helpers['request'];
        }
        if (!empty($helpers['response'])) {
            $this->response_class = $helpers['response'];
        }

        if ($proxy) {
            $this->set_proxy($proxy);
        }

        // Kept so that code calling __construct() explicitly still gets the instance back.
        return $this;
    }

    /**
     * Closes the write stream, but only when a $write_file is also set.
     *
     * The read stream is deliberately left open here (the original close was
     * commented out) — presumably because the Guzzle stream wrapper created in
     * prep_request() owns that handle; TODO confirm.
     */
    public function __destruct()
    {
        if (isset($this->write_file, $this->write_stream)) {
            fclose($this->write_stream);
        }

        return $this;
    }

    // REQUEST METHODS

    /**
     * Stores credentials on the instance.
     *
     * @return $this
     */
    public function set_credentials($user, $pass)
    {
        $this->username = $user;
        $this->password = $pass;

        return $this;
    }

    /**
     * Adds or replaces a request header.
     *
     * @return $this
     */
    public function add_header($key, $value)
    {
        $this->request_headers[$key] = $value;

        return $this;
    }

    /**
     * Removes a request header if it is set.
     *
     * @return $this
     */
    public function remove_header($key)
    {
        if (isset($this->request_headers[$key])) {
            unset($this->request_headers[$key]);
        }

        return $this;
    }

    /**
     * Sets the HTTP method; the value is upper-cased.
     *
     * @return $this
     */
    public function set_method($method)
    {
        $this->method = strtoupper($method);

        return $this;
    }

    /**
     * Sets the User-Agent string.
     *
     * @return $this
     */
    public function set_useragent($ua)
    {
        $this->useragent = $ua;

        return $this;
    }

    /**
     * Sets the request body used when no read stream is configured.
     *
     * @return $this
     */
    public function set_body($body)
    {
        $this->request_body = $body;

        return $this;
    }

    /**
     * Sets the URL to request.
     *
     * @return $this
     */
    public function set_request_url($url)
    {
        $this->request_url = $url;

        return $this;
    }

    /**
     * Stores cURL options. Nothing in this class reads them back.
     *
     * @return $this
     */
    public function set_curlopts($curlopts)
    {
        $this->curlopts = $curlopts;

        return $this;
    }

    /**
     * Sets how many bytes of the read stream make up the request body.
     *
     * @return $this
     */
    public function set_read_stream_size($size)
    {
        $this->read_stream_size = $size;

        return $this;
    }

    /**
     * Sets the stream the request body is read from.
     *
     * When $size is missing or negative it is derived from fstat() as the
     * number of bytes between the current position and the end of the stream.
     *
     * @param resource $resource Readable stream.
     * @param int|null $size     Number of bytes to send.
     *
     * @return $this
     */
    public function set_read_stream($resource, $size = null)
    {
        if (!isset($size) || $size < 0) {
            $stats = fstat($resource);
            if ($stats && $stats['size'] >= 0) {
                $position = ftell($resource);
                if ($position !== false && $position >= 0) {
                    $size = $stats['size'] - $position;
                }
            }
        }

        $this->read_stream = $resource;

        return $this->set_read_stream_size($size);
    }

    /**
     * Opens a file for reading and uses it as the request body.
     *
     * @param string $location Path of the file to upload.
     *
     * @return $this
     *
     * @throws RequestCore_Exception When the file cannot be opened.
     */
    public function set_read_file($location)
    {
        $this->read_file = $location;

        $read_file_handle = fopen($location, 'r');
        if ($read_file_handle === false) {
            // Fail here with a clear message instead of passing false on to fstat().
            throw new RequestCore_Exception('Unable to open the file for reading: ' . $location);
        }

        return $this->set_read_stream($read_file_handle);
    }

    /**
     * Sets the stream a GET response body is written to.
     *
     * @return $this
     */
    public function set_write_stream($resource)
    {
        $this->write_stream = $resource;

        return $this;
    }

    /**
     * Sets the path a GET response body is saved to.
     *
     * @return $this
     */
    public function set_write_file($location)
    {
        $this->write_file = $location;

        // Returned for consistency with every other setter (previously returned nothing).
        return $this;
    }

    /**
     * Parses a proxy URL and stores its parts, with 'user', 'pass' and 'port'
     * always present (null when absent from the URL).
     *
     * @return $this
     */
    public function set_proxy($proxy)
    {
        $proxy = parse_url($proxy);
        foreach (['user', 'pass', 'port'] as $part) {
            $proxy[$part] = isset($proxy[$part]) ? $proxy[$part] : null;
        }
        $this->proxy = $proxy;

        return $this;
    }

    /**
     * Sets the offset in the read stream at which the upload starts.
     *
     * @param int|string|null $position Offset; null clears it.
     *
     * @return $this
     */
    public function set_seek_position($position)
    {
        $this->seek_position = isset($position) ? (int) $position : null;

        return $this;
    }

    /**
     * cURL header callback (legacy): appends the header line to
     * $response_raw_headers and, on a 2xx status, opens $write_file once and
     * installs it as the write stream.
     *
     * @return int Length of $header_content.
     */
    public function streaming_header_callback($curl_handle, $header_content)
    {
        $code = curl_getinfo($curl_handle, CURLINFO_HTTP_CODE);

        if (isset($this->write_file) && self::is_success_code($code) && !isset($this->write_file_handle)) {
            $this->write_file_handle = fopen($this->write_file, 'w');
            $this->set_write_stream($this->write_file_handle);
        }

        $this->response_raw_headers .= $header_content;

        return strlen($header_content);
    }

    /**
     * Registers a callback run by streaming_read_callback() after each chunk.
     *
     * @return $this
     */
    public function register_streaming_read_callback($callback)
    {
        $this->registered_streaming_read_callback = $callback;

        return $this;
    }

    /**
     * Registers a callback run by streaming_write_callback() after each chunk.
     *
     * @return $this
     */
    public function register_streaming_write_callback($callback)
    {
        $this->registered_streaming_write_callback = $callback;

        return $this;
    }

    // PREPARE, SEND, AND PROCESS REQUEST

    /**
     * cURL read callback (legacy): returns the next chunk of the read stream,
     * never going past $read_stream_size bytes in total.
     *
     * @param mixed $curl_handle Passed through to the registered callback.
     * @param mixed $file_handle Passed through to the registered callback.
     * @param int   $length      Maximum chunk size requested.
     *
     * @return string The chunk, or '' once everything has been sent.
     *
     * @throws RequestCore_Exception When the stream cannot be moved to $seek_position.
     */
    public function streaming_read_callback($curl_handle, $file_handle, $length)
    {
        // Everything has been sent already: signal EOF.
        if ($this->read_stream_read >= $this->read_stream_size) {
            return '';
        }

        // On the very first read, move to the requested start offset if needed.
        if ($this->read_stream_read == 0 && isset($this->seek_position) && $this->seek_position !== ftell($this->read_stream)) {
            if (fseek($this->read_stream, $this->seek_position) !== 0) {
                throw new RequestCore_Exception('The stream does not support seeking and is either not at the requested position or the position is unknown.');
            }
        }

        // Read whichever is smaller: the remaining upload data or the requested chunk size.
        $remaining = $this->read_stream_size - $this->read_stream_read;
        $read = fread($this->read_stream, min($remaining, $length));
        $out = $read === false ? '' : $read;
        $this->read_stream_read += strlen($out);

        if ($this->registered_streaming_read_callback) {
            call_user_func($this->registered_streaming_read_callback, $curl_handle, $file_handle, $out);
        }

        return $out;
    }

    /**
     * cURL write callback (legacy): writes a body chunk to the write stream,
     * or collects it in $response_error_body when the status is not 2xx.
     *
     * @return int Number of bytes consumed.
     */
    public function streaming_write_callback($curl_handle, $data)
    {
        $code = curl_getinfo($curl_handle, CURLINFO_HTTP_CODE);

        if (!self::is_success_code($code)) {
            $this->response_error_body .= $data;

            return strlen($data);
        }

        $length = strlen($data);
        $written_total = 0;

        // fwrite() may write less than asked for, so loop until the chunk is flushed.
        while ($written_total < $length) {
            $written_last = fwrite($this->write_stream, substr($data, $written_total));
            if ($written_last === false) {
                return $written_total;
            }
            $written_total += $written_last;
        }

        if ($this->registered_streaming_write_callback) {
            call_user_func($this->registered_streaming_write_callback, $curl_handle, $written_total);
        }

        return $written_total;
    }

    /**
     * Builds the PSR-7 request from the current state and records request
     * metadata (url, method, upload size) for later use in $response_info.
     *
     * @return Request
     *
     * @throws RequestCore_Exception When a read stream is set but its size is
     *                               unknown, or it cannot be moved to $seek_position.
     */
    public function prep_request()
    {
        $this->requestAddInfo = [
            'url' => $this->request_url,
        ];

        $headers = is_array($this->request_headers) ? $this->request_headers : [];
        // An empty Expect header stops the client from asking for "100 Continue".
        $headers['Expect'] = '';
        $headers['Referer'] = $this->request_url;
        $headers['User-Agent'] = $this->useragent;

        $streaming = isset($this->read_stream);
        if ($streaming && (!isset($this->read_stream_size) || $this->read_stream_size < 0)) {
            throw new RequestCore_Exception('The stream size for the streaming upload cannot be determined.');
        }

        switch ($this->method) {
            case self::HTTP_POST:
            case self::HTTP_PUT:
                $this->requestAddInfo['size_upload'] = $streaming
                    ? $this->read_stream_size
                    : strlen((string) $this->request_body);
                break;
            case self::HTTP_HEAD:
                $this->request_body = null;
                break;
            default:
                // GET and everything else need no preparation here.
                break;
        }
        $this->requestAddInfo['method'] = $this->method;

        $body = $streaming ? $this->build_upload_stream() : $this->request_body;

        return new Request($this->method, $this->request_url, $headers, $body);
    }

    /**
     * Wraps the read stream so that only the configured window is uploaded:
     * $read_stream_size bytes starting at $seek_position (or at the current
     * position when no seek position is set).
     *
     * @return \GuzzleHttp\Psr7\LimitStream
     *
     * @throws RequestCore_Exception When the stream cannot be moved to $seek_position.
     */
    private function build_upload_stream()
    {
        if (isset($this->seek_position) && $this->seek_position !== ftell($this->read_stream)) {
            if (fseek($this->read_stream, $this->seek_position) !== 0) {
                throw new RequestCore_Exception('The stream does not support seeking and is either not at the requested position or the position is unknown.');
            }
        }

        $position = ftell($this->read_stream);
        $offset = $position === false ? 0 : $position;

        return new \GuzzleHttp\Psr7\LimitStream(
            \GuzzleHttp\Psr7\stream_for($this->read_stream),
            (int) $this->read_stream_size,
            $offset
        );
    }

    /**
     * Unpacks a Guzzle response into the response_* properties.
     *
     * @param Request|null                  $request  The request that was sent.
     * @param GuzzleHttp\Psr7\Response|null $response The response that was received.
     *
     * @return ResponseCore|false A ResponseCore when both arguments are given;
     *                            false otherwise (including when there is no
     *                            response to process).
     */
    public function process_response(?Request $request = null, ?GuzzleHttp\Psr7\Response $response = null)
    {
        if ($response === null) {
            // Nothing to unpack; previously this path dereferenced null.
            return false;
        }

        $explicit = $request !== null;
        if ($explicit) {
            // Also fills $response_body as a side effect.
            $this->response = $this->getRawResponse($response);
        }

        $this->response_code = $response->getStatusCode();
        $this->requestAddInfo['http_code'] = $this->response_code;
        $this->response_info = $this->requestAddInfo;

        // Flatten PSR-7 headers into a lower-cased name => value map,
        // joining repeated headers instead of dropping all but the first.
        $header_assoc = [];
        foreach ($response->getHeaders() as $name => $values) {
            $header_assoc[strtolower($name)] = implode(', ', (array) $values);
        }
        $this->response_headers = $header_assoc;
        $this->response_headers['info'] = $this->response_info;
        $this->response_headers['info']['method'] = $this->method;

        if ($explicit) {
            return new ResponseCore($this->response_headers, $this->response_body, $this->response_code);
        }

        // Legacy path: expose the buffers filled by the streaming callbacks
        // when a file download did not succeed.
        if (!self::is_success_code($this->response_code) && isset($this->write_file)) {
            $this->response_headers = $this->response_raw_headers;
            $this->response_body = $this->response_error_body;
        }

        return false;
    }

    /**
     * Sends the request with Guzzle and processes the response.
     *
     * Every HTTP status, including 4xx and 5xx, is handled as a regular
     * response rather than an exception. For GET requests the body is written
     * to $write_file or, failing that, $write_stream when one is set.
     * Transport-level Guzzle exceptions (e.g. connection failures) are not
     * caught and propagate to the caller.
     *
     * @param bool $parse Whether to return the parsed ResponseCore.
     *
     * @return ResponseCore|string|false The parsed response when $parse is
     *                                   truthy, otherwise the raw response text.
     */
    public function send_request($parse = false)
    {
        set_time_limit(0);

        $request = $this->prep_request();

        // Status codes carry API semantics here, so never turn them into exceptions.
        $options = ['http_errors' => false];
        if (is_numeric($this->timeout) && $this->timeout > 0) {
            $options['timeout'] = $this->timeout;
        }
        if (is_numeric($this->connect_timeout) && $this->connect_timeout > 0) {
            $options['connect_timeout'] = $this->connect_timeout;
        }

        // Pick exactly one sink so the request is sent only once.
        if ($this->method === self::HTTP_GET) {
            if (isset($this->write_file)) {
                $options['sink'] = $this->write_file;
            } elseif (isset($this->write_stream)) {
                $options['sink'] = \GuzzleHttp\Psr7\stream_for($this->write_stream);
            }
        }

        $client = new GuzzleHttp\Client();
        try {
            $response = $client->send($request, $options);
        } catch (GuzzleHttp\Exception\BadResponseException $e) {
            // Safety net for 4xx and 5xx alike, should http_errors be re-enabled by a handler.
            $response = $e->getResponse();
            if ($response === null) {
                throw $e;
            }
        }
        $this->response = $response;

        $parsed_response = $this->process_response($request, $response);

        return $parse ? $parsed_response : $this->response;
    }

    // RESPONSE METHODS

    /**
     * Returns one response header, or all of them.
     *
     * @param string|null $header Header name (case-insensitive); omit for all headers.
     *
     * @return mixed The header value, null when that header is absent, or the
     *               whole header map when no name is given.
     */
    public function get_response_header($header = null)
    {
        if (!$header) {
            return $this->response_headers;
        }

        $key = strtolower($header);

        return isset($this->response_headers[$key]) ? $this->response_headers[$key] : null;
    }

    /**
     * @return string|null The response body.
     */
    public function get_response_body()
    {
        return $this->response_body;
    }

    /**
     * @return int|null The HTTP status code.
     */
    public function get_response_code()
    {
        return $this->response_code;
    }

    /**
     * Rebuilds the raw HTTP response text (status line, header lines, blank
     * line, body) from a Guzzle response and stores the body in $response_body.
     *
     * Example of the resulting text:
     *
     *     HTTP/1.1 200 OK
     *     Server: AliyunOSS
     *     Content-Length: 0
     *     x-oss-request-id: 5F46D3509EB80732383B67E1
     *
     * @param GuzzleHttp\Psr7\Response $response
     *
     * @return string
     */
    public function getRawResponse(GuzzleHttp\Psr7\Response $response)
    {
        $raw = sprintf(
            "HTTP/%s %s %s\r\n",
            $response->getProtocolVersion(),
            $response->getStatusCode(),
            $response->getReasonPhrase()
        );

        // One line per value so repeated headers are not lost.
        foreach ($response->getHeaders() as $name => $values) {
            foreach ((array) $values as $value) {
                $raw .= sprintf("%s: %s\r\n", $name, $value);
            }
        }
        $raw .= "\r\n";

        $body = $response->getBody()->getContents();
        $this->response_body = $body;

        return $raw . $body;
    }

    /**
     * Tells whether an HTTP status code is in the 2xx range.
     *
     * Integer division is required: 201 / 100 is 2.01, not 2.
     */
    private static function is_success_code($code): bool
    {
        return intdiv((int) $code, 100) === 2;
    }
}
参考资料