vendor/symfony/messenger/Command/FailedMessagesRetryCommand.php line 45

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Psr\Log\LoggerInterface;
  12. use Symfony\Component\Console\Attribute\AsCommand;
  13. use Symfony\Component\Console\Exception\RuntimeException;
  14. use Symfony\Component\Console\Input\InputArgument;
  15. use Symfony\Component\Console\Input\InputInterface;
  16. use Symfony\Component\Console\Input\InputOption;
  17. use Symfony\Component\Console\Output\ConsoleOutputInterface;
  18. use Symfony\Component\Console\Output\OutputInterface;
  19. use Symfony\Component\Console\Style\SymfonyStyle;
  20. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  21. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  22. use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
  23. use Symfony\Component\Messenger\MessageBusInterface;
  24. use Symfony\Component\Messenger\Stamp\MessageDecodingFailedStamp;
  25. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  26. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  27. use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
  28. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  29. use Symfony\Component\Messenger\Worker;
  30. use Symfony\Contracts\Service\ServiceProviderInterface;
  31. /**
  32.  * @author Ryan Weaver <ryan@symfonycasts.com>
  33.  */
  34. #[AsCommand(name'messenger:failed:retry'description'Retry one or more messages from the failure transport')]
  35. class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
  36. {
  37.     private EventDispatcherInterface $eventDispatcher;
  38.     private MessageBusInterface $messageBus;
  39.     private ?LoggerInterface $logger;
  40.     public function __construct(?string $globalReceiverNameServiceProviderInterface $failureTransportsMessageBusInterface $messageBusEventDispatcherInterface $eventDispatcherLoggerInterface $logger nullPhpSerializer $phpSerializer null)
  41.     {
  42.         $this->eventDispatcher $eventDispatcher;
  43.         $this->messageBus $messageBus;
  44.         $this->logger $logger;
  45.         parent::__construct($globalReceiverName$failureTransports$phpSerializer);
  46.     }
  47.     protected function configure(): void
  48.     {
  49.         $this
  50.             ->setDefinition([
  51.                 new InputArgument('id'InputArgument::IS_ARRAY'Specific message id(s) to retry'),
  52.                 new InputOption('force'nullInputOption::VALUE_NONE'Force action without confirmation'),
  53.                 new InputOption('transport'nullInputOption::VALUE_OPTIONAL'Use a specific failure transport'self::DEFAULT_TRANSPORT_OPTION),
  54.             ])
  55.             ->setHelp(<<<'EOF'
  56. The <info>%command.name%</info> retries message in the failure transport.
  57.     <info>php %command.full_name%</info>
  58. The command will interactively ask if each message should be retried
  59. or discarded.
  60. Some transports support retrying a specific message id, which comes
  61. from the <info>messenger:failed:show</info> command.
  62.     <info>php %command.full_name% {id}</info>
  63. Or pass multiple ids at once to process multiple messages:
  64. <info>php %command.full_name% {id1} {id2} {id3}</info>
  65. EOF
  66.             )
  67.         ;
  68.     }
  69.     protected function execute(InputInterface $inputOutputInterface $output): int
  70.     {
  71.         $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
  72.         $io = new SymfonyStyle($input$output instanceof ConsoleOutputInterface $output->getErrorOutput() : $output);
  73.         $io->comment('Quit this command with CONTROL-C.');
  74.         if (!$output->isVeryVerbose()) {
  75.             $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
  76.         }
  77.         $failureTransportName $input->getOption('transport');
  78.         if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
  79.             $this->printWarningAvailableFailureTransports($io$this->getGlobalFailureReceiverName());
  80.         }
  81.         if ('' === $failureTransportName || null === $failureTransportName) {
  82.             $failureTransportName $this->interactiveChooseFailureTransport($io);
  83.         }
  84.         $failureTransportName self::DEFAULT_TRANSPORT_OPTION === $failureTransportName $this->getGlobalFailureReceiverName() : $failureTransportName;
  85.         $receiver $this->getReceiver($failureTransportName);
  86.         $this->printPendingMessagesMessage($receiver$io);
  87.         $io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>'$failureTransportName));
  88.         $shouldForce $input->getOption('force');
  89.         $ids $input->getArgument('id');
  90.         if (=== \count($ids)) {
  91.             if (!$input->isInteractive()) {
  92.                 throw new RuntimeException('Message id must be passed when in non-interactive mode.');
  93.             }
  94.             $this->runInteractive($failureTransportName$io$shouldForce);
  95.             return 0;
  96.         }
  97.         $this->retrySpecificIds($failureTransportName$ids$io$shouldForce);
  98.         $io->success('All done!');
  99.         return 0;
  100.     }
  101.     private function runInteractive(string $failureTransportNameSymfonyStyle $iobool $shouldForce)
  102.     {
  103.         $receiver $this->failureTransports->get($failureTransportName);
  104.         $count 0;
  105.         if ($receiver instanceof ListableReceiverInterface) {
  106.             // for listable receivers, find the messages one-by-one
  107.             // this avoids using get(), which for some less-robust
  108.             // transports (like Doctrine), will cause the message
  109.             // to be temporarily "acked", even if the user aborts
  110.             // handling the message
  111.             while (true) {
  112.                 $envelopes = [];
  113.                 $this->phpSerializer?->acceptPhpIncompleteClass();
  114.                 try {
  115.                     foreach ($receiver->all(1) as $envelope) {
  116.                         ++$count;
  117.                         $envelopes[] = $envelope;
  118.                     }
  119.                 } finally {
  120.                     $this->phpSerializer?->rejectPhpIncompleteClass();
  121.                 }
  122.                 // break the loop if all messages are consumed
  123.                 if (=== \count($envelopes)) {
  124.                     break;
  125.                 }
  126.                 $this->retrySpecificEnvelopes($envelopes$failureTransportName$io$shouldForce);
  127.             }
  128.         } else {
  129.             // get() and ask messages one-by-one
  130.             $count $this->runWorker($failureTransportName$receiver$io$shouldForce);
  131.         }
  132.         // avoid success message if nothing was processed
  133.         if (<= $count) {
  134.             $io->success('All failed messages have been handled or removed!');
  135.         }
  136.     }
  137.     private function runWorker(string $failureTransportNameReceiverInterface $receiverSymfonyStyle $iobool $shouldForce): int
  138.     {
  139.         $count 0;
  140.         $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io$receiver$shouldForce, &$count) {
  141.             ++$count;
  142.             $envelope $messageReceivedEvent->getEnvelope();
  143.             $this->displaySingleMessage($envelope$io);
  144.             if ($envelope->last(MessageDecodingFailedStamp::class)) {
  145.                 throw new \RuntimeException(sprintf('The message with id "%s" could not decoded, it can only be shown or removed.'$this->getMessageId($envelope) ?? '?'));
  146.             }
  147.             $shouldHandle $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
  148.             if ($shouldHandle) {
  149.                 return;
  150.             }
  151.             $messageReceivedEvent->shouldHandle(false);
  152.             $receiver->reject($envelope);
  153.         };
  154.         $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
  155.         $worker = new Worker(
  156.             [$failureTransportName => $receiver],
  157.             $this->messageBus,
  158.             $this->eventDispatcher,
  159.             $this->logger
  160.         );
  161.         try {
  162.             $worker->run();
  163.         } finally {
  164.             $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
  165.         }
  166.         return $count;
  167.     }
  168.     private function retrySpecificIds(string $failureTransportName, array $idsSymfonyStyle $iobool $shouldForce)
  169.     {
  170.         $receiver $this->getReceiver($failureTransportName);
  171.         if (!$receiver instanceof ListableReceiverInterface) {
  172.             throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.'$failureTransportName));
  173.         }
  174.         foreach ($ids as $id) {
  175.             $this->phpSerializer?->acceptPhpIncompleteClass();
  176.             try {
  177.                 $envelope $receiver->find($id);
  178.             } finally {
  179.                 $this->phpSerializer?->rejectPhpIncompleteClass();
  180.             }
  181.             if (null === $envelope) {
  182.                 throw new RuntimeException(sprintf('The message "%s" was not found.'$id));
  183.             }
  184.             $singleReceiver = new SingleMessageReceiver($receiver$envelope);
  185.             $this->runWorker($failureTransportName$singleReceiver$io$shouldForce);
  186.         }
  187.     }
  188.     private function retrySpecificEnvelopes(array $envelopesstring $failureTransportNameSymfonyStyle $iobool $shouldForce)
  189.     {
  190.         $receiver $this->getReceiver($failureTransportName);
  191.         foreach ($envelopes as $envelope) {
  192.             $singleReceiver = new SingleMessageReceiver($receiver$envelope);
  193.             $this->runWorker($failureTransportName$singleReceiver$io$shouldForce);
  194.         }
  195.     }
  196. }