You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
467 lines
17 KiB
467 lines
17 KiB
* This file is part of the Symfony package.
* (c) Fabien Potencier <>
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
namespace Symfony\Component\HttpClient\Response;
use Amp\ByteStream\StreamException;
use Amp\CancellationTokenSource;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Psr\Log\LoggerInterface;
use Symfony\Component\HttpClient\Chunk\FirstChunk;
use Symfony\Component\HttpClient\Chunk\InformationalChunk;
use Symfony\Component\HttpClient\Exception\InvalidArgumentException;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Component\HttpClient\HttpClientTrait;
use Symfony\Component\HttpClient\Internal\AmpBody;
use Symfony\Component\HttpClient\Internal\AmpClientState;
use Symfony\Component\HttpClient\Internal\Canary;
use Symfony\Component\HttpClient\Internal\ClientState;
use Symfony\Contracts\HttpClient\ResponseInterface;
* @author Nicolas Grekas <>
* @internal
final class AmpResponse implements ResponseInterface, StreamableInterface
use CommonResponseTrait;
use TransportResponseTrait;
private static $nextId = 'a';
private $multi;
private $options;
private $onProgress;
private static $delay;
* @internal
public function __construct(AmpClientState $multi, Request $request, array $options, ?LoggerInterface $logger)
$this->multi = $multi;
$this->options = &$options;
$this->logger = $logger;
$this->timeout = $options['timeout'];
$this->shouldBuffer = $options['buffer'];
if ($this->inflate = \extension_loaded('zlib') && !$request->hasHeader('accept-encoding')) {
$request->setHeader('Accept-Encoding', 'gzip');
$this->initializer = static function (self $response) {
return null !== $response->options;
$info = &$this->info;
$headers = &$this->headers;
$canceller = new CancellationTokenSource();
$handle = &$this->handle;
$info['url'] = (string) $request->getUri();
$info['http_method'] = $request->getMethod();
$info['start_time'] = null;
$info['redirect_url'] = null;
$info['redirect_time'] = 0.0;
$info['redirect_count'] = 0;
$info['size_upload'] = 0.0;
$info['size_download'] = 0.0;
$info['upload_content_length'] = -1.0;
$info['download_content_length'] = -1.0;
$info['user_data'] = $options['user_data'];
$info['max_duration'] = $options['max_duration'];
$info['debug'] = '';
$resolve = static function (string $host, ?string $ip = null) use ($multi): ?string {
if (null !== $ip) {
$multi->dnsCache[$host] = $ip;
return $multi->dnsCache[$host] ?? null;
$onProgress = $options['on_progress'] ?? static function () {};
$onProgress = $this->onProgress = static function () use (&$info, $onProgress, $resolve) {
$info['total_time'] = microtime(true) - $info['start_time'];
$onProgress((int) $info['size_download'], ((int) (1 + $info['download_content_length']) ?: 1) - 1, (array) $info, $resolve);
$pauseDeferred = new Deferred();
$pause = new Success();
$throttleWatcher = null;
$this->id = $id = self::$nextId++;
Loop::defer(static function () use ($request, $multi, &$id, &$info, &$headers, $canceller, &$options, $onProgress, &$handle, $logger, &$pause) {
return new Coroutine(self::generateResponse($request, $multi, $id, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause));
$info['pause_handler'] = static function (float $duration) use (&$throttleWatcher, &$pauseDeferred, &$pause) {
if (null !== $throttleWatcher) {
$pause = $pauseDeferred->promise();
if ($duration <= 0) {
$deferred = $pauseDeferred;
$pauseDeferred = new Deferred();
} else {
$throttleWatcher = Loop::delay(ceil(1000 * $duration), static function () use (&$pauseDeferred) {
$deferred = $pauseDeferred;
$pauseDeferred = new Deferred();
$multi->lastTimeout = null;
$multi->openHandles[$id] = $id;
$this->canary = new Canary(static function () use ($canceller, $multi, $id) {
unset($multi->openHandles[$id], $multi->handlesActivity[$id]);
* {@inheritdoc}
public function getInfo(?string $type = null)
return null !== $type ? $this->info[$type] ?? null : $this->info;
public function __sleep(): array
throw new \BadMethodCallException('Cannot serialize '.__CLASS__);
public function __wakeup()
throw new \BadMethodCallException('Cannot unserialize '.__CLASS__);
public function __destruct()
try {
} finally {
// Clear the DNS cache when all requests completed
if (0 >= --$this->multi->responseCount) {
$this->multi->responseCount = 0;
$this->multi->dnsCache = [];
* {@inheritdoc}
private static function schedule(self $response, array &$runningResponses): void
if (isset($runningResponses[0])) {
$runningResponses[0][1][$response->id] = $response;
} else {
$runningResponses[0] = [$response->multi, [$response->id => $response]];
if (!isset($response->multi->openHandles[$response->id])) {
$response->multi->handlesActivity[$response->id][] = null;
$response->multi->handlesActivity[$response->id][] = null !== $response->info['error'] ? new TransportException($response->info['error']) : null;
* {@inheritdoc}
* @param AmpClientState $multi
private static function perform(ClientState $multi, ?array &$responses = null): void
if ($responses) {
foreach ($responses as $response) {
try {
if ($response->info['start_time']) {
$response->info['total_time'] = microtime(true) - $response->info['start_time'];
} catch (\Throwable $e) {
$multi->handlesActivity[$response->id][] = null;
$multi->handlesActivity[$response->id][] = $e;
* {@inheritdoc}
* @param AmpClientState $multi
private static function select(ClientState $multi, float $timeout): int
$timeout += microtime(true);
self::$delay = Loop::defer(static function () use ($timeout) {
if (0 < $timeout -= microtime(true)) {
self::$delay = Loop::delay(ceil(1000 * $timeout), [Loop::class, 'stop']);
} else {
return null === self::$delay ? 1 : 0;
private static function generateResponse(Request $request, AmpClientState $multi, string $id, array &$info, array &$headers, CancellationTokenSource $canceller, array &$options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
$request->setInformationalResponseHandler(static function (Response $response) use ($multi, $id, &$info, &$headers) {
self::addResponseHeaders($response, $info, $headers);
$multi->handlesActivity[$id][] = new InformationalChunk($response->getStatus(), $response->getHeaders());
try {
/* @var Response $response */
if (null === $response = yield from self::getPushedResponse($request, $multi, $info, $headers, $options, $logger)) {
$logger && $logger->info(sprintf('Request: "%s %s"', $info['http_method'], $info['url']));
$response = yield from self::followRedirects($request, $multi, $info, $headers, $canceller, $options, $onProgress, $handle, $logger, $pause);
$options = null;
$multi->handlesActivity[$id][] = new FirstChunk();
if ('HEAD' === $response->getRequest()->getMethod() || \in_array($info['http_code'], [204, 304], true)) {
$multi->handlesActivity[$id][] = null;
$multi->handlesActivity[$id][] = null;
if ($response->hasHeader('content-length')) {
$info['download_content_length'] = (float) $response->getHeader('content-length');
$body = $response->getBody();
while (true) {
yield $pause;
if (null === $data = yield $body->read()) {
$info['size_download'] += \strlen($data);
$multi->handlesActivity[$id][] = $data;
$multi->handlesActivity[$id][] = null;
$multi->handlesActivity[$id][] = null;
} catch (\Throwable $e) {
$multi->handlesActivity[$id][] = null;
$multi->handlesActivity[$id][] = $e;
} finally {
$info['download_content_length'] = $info['size_download'];
private static function followRedirects(Request $originRequest, AmpClientState $multi, array &$info, array &$headers, CancellationTokenSource $canceller, array $options, \Closure $onProgress, &$handle, ?LoggerInterface $logger, Promise &$pause)
yield $pause;
$originRequest->setBody(new AmpBody($options['body'], $info, $onProgress));
$response = yield $multi->request($options, $originRequest, $canceller->getToken(), $info, $onProgress, $handle);
$previousUrl = null;
while (true) {
self::addResponseHeaders($response, $info, $headers);
$status = $response->getStatus();
if (!\in_array($status, [301, 302, 303, 307, 308], true) || null === $location = $response->getHeader('location')) {
return $response;
$urlResolver = new class() {
use HttpClientTrait {
parseUrl as public;
resolveUrl as public;
try {
$previousUrl = $previousUrl ?? $urlResolver::parseUrl($info['url']);
$location = $urlResolver::parseUrl($location);
$location = $urlResolver::resolveUrl($location, $previousUrl);
$info['redirect_url'] = implode('', $location);
} catch (InvalidArgumentException $e) {
return $response;
if (0 >= $options['max_redirects'] || $info['redirect_count'] >= $options['max_redirects']) {
return $response;
$logger && $logger->info(sprintf('Redirecting: "%s %s"', $status, $info['url']));
try {
// Discard body of redirects
while (null !== yield $response->getBody()->read()) {
} catch (HttpException|StreamException $e) {
// Ignore streaming errors on previous responses
$info['url'] = $info['redirect_url'];
$info['redirect_url'] = null;
$previousUrl = $location;
$request = new Request($info['url'], $info['http_method']);
if (\in_array($status, [301, 302, 303], true)) {
// Do like curl and browsers: turn POST to GET on 301, 302 and 303
if ('POST' === $response->getRequest()->getMethod() || 303 === $status) {
$info['http_method'] = 'HEAD' === $response->getRequest()->getMethod() ? 'HEAD' : 'GET';
} else {
foreach ($originRequest->getRawHeaders() as [$name, $value]) {
$request->addHeader($name, $value);
if ($request->getUri()->getAuthority() !== $originRequest->getUri()->getAuthority()) {
yield $pause;
$response = yield $multi->request($options, $request, $canceller->getToken(), $info, $onProgress, $handle);
$info['redirect_time'] = microtime(true) - $info['start_time'];
private static function addResponseHeaders(Response $response, array &$info, array &$headers): void
$info['http_code'] = $response->getStatus();
if ($headers) {
$info['debug'] .= "< \r\n";
$headers = [];
$h = sprintf('HTTP/%s %s %s', $response->getProtocolVersion(), $response->getStatus(), $response->getReason());
$info['debug'] .= "< {$h}\r\n";
$info['response_headers'][] = $h;
foreach ($response->getRawHeaders() as [$name, $value]) {
$headers[strtolower($name)][] = $value;
$h = $name.': '.$value;
$info['debug'] .= "< {$h}\r\n";
$info['response_headers'][] = $h;
$info['debug'] .= "< \r\n";
* Accepts pushed responses only if their headers related to authentication match the request.
private static function getPushedResponse(Request $request, AmpClientState $multi, array &$info, array &$headers, array $options, ?LoggerInterface $logger)
if ('' !== $options['body']) {
return null;
$authority = $request->getUri()->getAuthority();
foreach ($multi->pushedResponses[$authority] ?? [] as $i => [$pushedUrl, $pushDeferred, $pushedRequest, $pushedResponse, $parentOptions]) {
if ($info['url'] !== $pushedUrl || $info['http_method'] !== $pushedRequest->getMethod()) {
foreach ($parentOptions as $k => $v) {
if ($options[$k] !== $v) {
continue 2;
foreach (['authorization', 'cookie', 'range', 'proxy-authorization'] as $k) {
if ($pushedRequest->getHeaderArray($k) !== $request->getHeaderArray($k)) {
continue 2;
$response = yield $pushedResponse;
foreach ($response->getHeaderArray('vary') as $vary) {
foreach (preg_split('/\s*+,\s*+/', $vary) as $v) {
if ('*' === $v || ($pushedRequest->getHeaderArray($v) !== $request->getHeaderArray($v) && 'accept-encoding' !== strtolower($v))) {
$logger && $logger->debug(sprintf('Skipping pushed response: "%s"', $info['url']));
continue 3;
$logger && $logger->debug(sprintf('Accepting pushed response: "%s %s"', $info['http_method'], $info['url']));
self::addResponseHeaders($response, $info, $headers);
if (!$multi->pushedResponses[$authority]) {
return $response;
private static function stopLoop(): void
if (null !== self::$delay) {
self::$delay = null;
Loop::defer([Loop::class, 'stop']);