Log all tasks the Shopware 6 queue processes

The Shopware 6 software architecture relies heavily on a message queue to process tasks in the background. This is a fundamental change in how Shopware works compared to version 5, which did not have a message queue.

To fully leverage the benefits of Shopware 6 you should make use of the message queue as much as possible. Shopware can already do the following tasks in the background out of the box:

  • Store and index imported products and categories via the Sync API
  • Index all entities
  • Warm up product and category pages in the HTTP cache
  • Generate thumbnails for images
  • Generate the sitemap
  • Product exports (Google Shopping Feed, …)
  • several others…

This has a lot of performance benefits, but it also introduces an operational risk: What if there is a bug, an error or any kind of problem in the queue? Will I see this and can I debug it?

You can use the canonical log line pattern to generate a more powerful equivalent of the web server's “access.log” for every task that the message queue processes. The output will look like this:

2021-08-15 10:39:58 ProductIndexing data=1791edec1f04464a8a4074423501e781 duration=0.190
2021-08-15 11:30:48 UpdateThumbnails mediaIds=102ac62ba27347a688030a05c1790db7 duration=0.015
2021-08-15 11:32:16 DeleteFile files=shirt_red_600x600_800x800.jpg duration=0.002
2021-08-15 12:08:25 WarmUp route=frontend.detail.page domain=http://shopware6.tideways.io cache_id=cc3e1493bc874b62b16c11cd291826f9 duration=0.003
2021-08-15 12:08:25 WarmUp route=frontend.navigation.page domain=http://shopware6.tideways.io cache_id=cc3e1493bc874b62b16c11cd291826f9 duration=0.002

This is easy to achieve because the message queue is based on the Symfony Messenger component, which can be extended via so-called middleware.

We create a middleware class with the following structure:

namespace Shopware\Production\Messenger;

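use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

class TaskLoggingMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Pass the message on to the next middleware in the stack unchanged.
        return $stack->next()->handle($envelope, $stack);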
    }
}

We then add it to the middleware list of the Shopware message bus and register it as a service in the Symfony container:

# config/packages/framework.yaml
framework:
  messenger:
    buses:
      messenger.bus.shopware:
        middleware:
          - "Shopware\\Core\\Framework\\MessageQueue\\Middleware\\RetryMiddleware"
          - "Shopware\\Production\\Messenger\\TaskLoggingMiddleware"

services:
  "Shopware\\Production\\Messenger\\TaskLoggingMiddleware": ~

This runs the middleware whenever a task is processed in the Shopware message queue. Next up is the implementation of the actual logging:

  • map each task into a task name
  • extract arguments from all known message task types to enhance the log output
  • handle errors in tasks
  • write the task processing to a file

On the macro level I use the following implementation for handle:

public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
    $message = $envelope->getMessage();
    $taskName = $this->getTaskName($message);
    $args = $this->extractArgumentsFromMessage($message);

    $start = microtime(true);
    try {
        return $stack->next()->handle($envelope, $stack);
    } catch (HandlerFailedException $e) {
        $args = $this->addExceptionToArgs($e, $args);

        throw $e;
    } finally {
        $this->logTaskProcessing($taskName, $args, $start);
    }
}

The try/catch/finally block makes sure that “logTaskProcessing” always runs and stores information about the processed task, whether or not an exception was thrown. Before the task runs, two methods map the message to a human-readable task name and extract its arguments. When an exception occurs, its class and message are added to the arguments before it is rethrown.
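The helper methods called in handle are not shown in this post. As a rough idea of what they could look like, here is a minimal sketch written as standalone functions so it runs without Symfony. The function bodies, the “formatLogLine” helper, the “$logFile” and “$now” parameters, the “exception” and “message” keys and the quoting of values that contain whitespace are all assumptions for illustration, not the code from the Gist.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: derive a short task name from the message class,
// e.g. "Foo\Bar\WarmUpMessage" becomes "WarmUp".
function getTaskName(object $message): string
{
    $class = get_class($message);
    $pos = strrpos($class, '\\');
    $shortName = $pos === false ? $class : substr($class, $pos + 1);

    // Fall back to the full short name if stripping the suffix leaves nothing.
    return preg_replace('/Message$/', '', $shortName) ?: $shortName;
}

// Hypothetical sketch: record the exception class and message as extra arguments.
function addExceptionToArgs(\Throwable $e, array $args): array
{
    $args['exception'] = get_class($e);
    $args['message'] = $e->getMessage();

    return $args;
}

// Build one canonical log line: timestamp, task name, key=value pairs, duration.
// $now is injectable only to keep the function easy to test (assumption).
function formatLogLine(string $taskName, array $args, float $start, ?float $now = null): string
{
    $now = $now ?? microtime(true);
    $args['duration'] = sprintf('%.3f', $now - $start);

    $pairs = [];
    foreach ($args as $key => $value) {
        $value = (string) $value;
        // Quote values containing whitespace so the line stays parseable.
        if (preg_match('/\s/', $value)) {
            $value = '"' . addcslashes($value, '"\\') . '"';
        }
        $pairs[] = $key . '=' . $value;
    }

    return sprintf('%s %s %s', date('Y-m-d H:i:s', (int) $now), $taskName, implode(' ', $pairs));
}

// Append the line to a log file; LOCK_EX avoids interleaved writes
// when several workers log to the same file.
function logTaskProcessing(string $logFile, string $taskName, array $args, float $start): void
{
    file_put_contents($logFile, formatLogLine($taskName, $args, $start) . PHP_EOL, FILE_APPEND | LOCK_EX);
}
```

Stripping the “Message” suffix is only one plausible way to arrive at names such as “WarmUp” or “DeleteFile” from the example output above; an explicit map from class name to task name would work just as well.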

The interesting part is “extractArgumentsFromMessage”, which has special code for a lot of the different Shopware internal messages, for example “EntityIndexingMessage”, “WarmUpMessage” or “ScheduledTask”.

private function extractArgumentsFromMessage($message): array
{
    if ($message instanceof EntityIndexingMessage) {
        $data = $message->getData();

        if (is_array($data)) {
            return ['data' => implode(',', $data)];
        }
    } elseif ($message instanceof ScheduledTask) {
        return ['taskId' => $message->getTaskId()];
    } elseif ($message instanceof DeleteImportExportFile
        || $message instanceof DeleteFileMessage) {
        // Log only the file names, not the full paths.
        return ['files' => implode(',', array_map(
            function ($f) { return basename($f); },
            $message->getFiles()
        ))];
    } elseif ($message instanceof GenerateThumbnailsMessage) {
        return ['mediaIds' => implode(',', $message->getMediaIds())];
    } elseif ($message instanceof WarmUpMessage) {
        return [
            'route' => $message->getRoute(),
            'domain' => $message->getDomain(),
            'cache_id' => $message->getCacheId(),
        ];
    }

    // Unknown message types are logged without arguments.
    return [];
}

When you add your own messages to Shopware, you should extend this method so that the arguments of your tasks show up in the log file as well.
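As an illustration of such an extension, the following sketch adds a branch for one custom message. “StockSyncMessage”, its getters and the standalone function “extractCustomArguments” are hypothetical names made up for this example; they are not part of Shopware or of the Gist. In the middleware itself this would simply be one more elseif branch in “extractArgumentsFromMessage”.

```php
<?php

declare(strict_types=1);

// Hypothetical custom message dispatched by your own plugin or project code.
final class StockSyncMessage
{
    /** @var string[] */
    private array $productNumbers;

    private string $warehouse;

    /** @param string[] $productNumbers */
    public function __construct(array $productNumbers, string $warehouse)
    {
        $this->productNumbers = $productNumbers;
        $this->warehouse = $warehouse;
    }

    /** @return string[] */
    public function getProductNumbers(): array
    {
        return $this->productNumbers;
    }

    public function getWarehouse(): string
    {
        return $this->warehouse;
    }
}

// Same pattern as extractArgumentsFromMessage: one instanceof check per
// message type, returning a flat key => value array for the log line.
function extractCustomArguments(object $message): array
{
    if ($message instanceof StockSyncMessage) {
        return [
            'productNumbers' => implode(',', $message->getProductNumbers()),
            'warehouse' => $message->getWarehouse(),
        ];
    }

    // Unknown message types are logged without arguments.
    return [];
}
```

Keeping the returned values short and flat (IDs, names, counts) keeps each log line readable and easy to search.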

You can find the full code of the TaskLoggingMiddleware in this Gist. We are working to turn this into a Composer package in the near future.

Combined with Tideways, you can also get aggregated performance insights and profiling for the message queue by using the “tideways/symfony-messenger-middleware” package.

Benjamin, 23.08.2021