| 
<?phpnamespace Aws\Multipart;
 
 use Aws\AwsClientInterface as Client;
 use GuzzleHttp\Psr7;
 use InvalidArgumentException as IAE;
 use Psr\Http\Message\StreamInterface as Stream;
 
 abstract class AbstractUploader extends AbstractUploadManager
 {
     /** @var Stream Source of the data to be uploaded. */
     protected $source;

     /**
      * Resolves the source into a readable stream, then delegates the rest of
      * the setup to the parent upload manager.
      *
      * @param Client $client Client handed to the parent constructor.
      * @param mixed  $source A filename (string) or any other value accepted
      *                       by `Psr7\stream_for()`.
      * @param array  $config Configuration handed to the parent constructor.
      *
      * @throws IAE If the resolved source stream is not readable.
      */
     public function __construct(Client $client, $source, array $config = [])
     {
         // NOTE(review): the source is resolved before the parent constructor
         // runs; presumably the parent's setup may rely on it - keep this order.
         $this->source = $this->determineSource($source);
         parent::__construct($client, $config);
     }

     /**
      * Create a stream for a part that starts at the current position and
      * has a length of the upload part size (or less with the final part).
      *
      * Note that the offset is taken from the uploader's own source stream
      * (`$this->source`), not from the `$stream` argument.
      *
      * @param Stream $stream Stream to wrap.
      *
      * @return Psr7\LimitStream
      */
     protected function limitPartStream(Stream $stream)
     {
         // Limit what is read from the stream to the part size.
         return new Psr7\LimitStream(
             $stream,
             $this->state->getPartSize(),
             $this->source->tell()
         );
     }

     /**
      * Lazily yields one upload command for every part of the source that the
      * upload state does not already report as uploaded.
      *
      * Parts are numbered from 1. Parts that were already uploaded are skipped
      * by advancing the source by one part size, so that the following part
      * starts at the right offset.
      *
      * @param callable $resultHandler Handler added to each yielded command
      *                                via `appendSign()` under the name 'mup'.
      *
      * @return \Generator Yields the commands created by the client.
      */
     protected function getUploadCommands(callable $resultHandler)
     {
         // Determine if the source can be seeked.
         $seekable = $this->source->isSeekable()
             && $this->source->getMetadata('wrapper_type') === 'plainfile';

         // isEof() returns true while data remains (see its docblock).
         for ($partNumber = 1; $this->isEof($seekable); $partNumber++) {
             // If we haven't already uploaded this part, yield a new part.
             if (!$this->state->hasPartBeenUploaded($partNumber)) {
                 $partStartPos = $this->source->tell();
                 if (!($data = $this->createPart($seekable, $partNumber))) {
                     break;
                 }
                 $command = $this->client->getCommand(
                     $this->info['command']['upload'],
                     $data + $this->state->getId()
                 );
                 $command->getHandlerList()->appendSign($resultHandler, 'mup');
                 yield $command;

                 // If the source already moved past the start of this part,
                 // there is nothing left to skip for it.
                 if ($this->source->tell() > $partStartPos) {
                     continue;
                 }
             }

             // Advance the source's offset if not already advanced.
             if ($seekable) {
                 $this->source->seek(min(
                     $this->source->tell() + $this->state->getPartSize(),
                     $this->source->getSize()
                 ));
             } else {
                 // A single read() may legally return fewer bytes than asked
                 // for, so keep reading until the whole part was consumed.
                 $this->skipPartData($this->state->getPartSize());
             }
         }
     }

     /**
      * Generates the parameters for an upload part by analyzing a range of the
      * source starting from the current offset up to the part size.
      *
      * A falsy return value makes `getUploadCommands()` stop producing parts.
      *
      * @param bool $seekable
      * @param int  $number
      *
      * @return array|null
      */
     abstract protected function createPart($seekable, $number);

     /**
      * Checks whether the source still has data left to read.
      *
      * Despite its name, this returns true while the source is NOT yet
      * exhausted (it is used as the continue-condition of the part loop) and
      * false once the end of the source has been reached.
      *
      * @param bool $seekable Whether to compare the position against the size
      *                       (true) or to rely on the stream's EOF flag (false).
      *
      * @return bool
      */
     private function isEof($seekable)
     {
         return $seekable
             ? $this->source->tell() < $this->source->getSize()
             : !$this->source->eof();
     }

     /**
      * Reads and discards up to `$length` bytes from the source.
      *
      * Reading is done in bounded chunks and repeated until the requested
      * amount has been consumed, the source reports EOF, or a read returns no
      * data (which guards against spinning on a stream that yields nothing).
      *
      * @param int $length Number of bytes to skip.
      */
     private function skipPartData($length)
     {
         // Upper bound per read so a whole part is never held in memory.
         $chunkSize = 1048576;
         $remaining = $length;

         while ($remaining > 0 && !$this->source->eof()) {
             $read = strlen($this->source->read(min($chunkSize, $remaining)));
             if ($read === 0) {
                 break;
             }
             $remaining -= $read;
         }
     }

     /**
      * Turns the provided source into a stream and returns it.
      *
      * If a string is provided, it is assumed to be a filename and is opened
      * for reading; otherwise the value is passed directly to
      * `Psr7\stream_for()`.
      *
      * @param mixed $source
      *
      * @return Stream
      * @throws IAE If the resulting stream is not readable.
      */
     private function determineSource($source)
     {
         // Use the contents of a file as the data source.
         if (is_string($source)) {
             $source = Psr7\try_fopen($source, 'r');
         }

         // Create a source stream.
         $stream = Psr7\stream_for($source);
         if (!$stream->isReadable()) {
             throw new IAE('Source stream must be readable.');
         }

         return $stream;
     }
 }
 
 |