Kafka / KAFKA-17099

Improve the process exception logs with the exact processor node name in which processing exceptions occur


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • 3.9.0
    • 4.0.0
    • Component/s: streams
    • None

    Description

      Current Behaviour

      When an exception occurs in a processor node, the task executor does not log the name of the processor node in which it occurred. Instead, the logged StreamsException names the source node of the sub-topology.
       
      For example, consider the following topology:
       
      Topologies:
         Sub-topology: 0
          Source: KSTREAM-SOURCE-0000000000 (topics: [PERSON_TOPIC])
            --> KSTREAM-PEEK-0000000001
          Processor: KSTREAM-PEEK-0000000001 (stores: [])
            --> KSTREAM-MAP-0000000002
            <-- KSTREAM-SOURCE-0000000000
          Processor: KSTREAM-MAP-0000000002 (stores: [])
            --> KSTREAM-SINK-0000000003
            <-- KSTREAM-PEEK-0000000001
          Sink: KSTREAM-SINK-0000000003 (topic: PERSON_MAP_TOPIC)
            <-- KSTREAM-MAP-0000000002
       
      When an exception is thrown in the processor KSTREAM-MAP-0000000002, the task executor logs the following:
       
      2024-07-08T22:17:19.926+02:00  INFO 10552 --- [-StreamThread-1] i.g.l.s.map.app.KafkaStreamsTopology     : Received key = 0, value = {"id": 0, "firstName": "Ethan", "lastName": "Moore", "nationality": "CH", "birthDate": "2011-02-21T15:45:12Z"}
      2024-07-08T22:17:30.082+02:00 ERROR 10552 --- [-StreamThread-1] o.a.k.s.p.internals.TaskExecutor         : stream-thread [streams-map-StreamThread-1] Failed to process stream task 0_0 due to the following error:
       
      org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
      at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33)
      at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
      at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
      at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
      at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
      at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
       
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na]
      Caused by: java.lang.RuntimeException: Something bad happened...
      at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33) ~[classes/:na]
      at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na]
      ... 6 common frames omitted
       
      The first line of the stack trace states that the exception was caught in the processor KSTREAM-SOURCE-0000000000, whereas it actually occurred in KSTREAM-MAP-0000000002.

      Expected Behaviour

      The stack trace should report the exact processor node in which the exception occurred (here, KSTREAM-MAP-0000000002).

      Current Limitation

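      Processing exceptions are currently caught in StreamTask#process, where the exact processor node in which the exception occurred is no longer known. While a record is forwarded through the topology, the processor context restores the previous node whenever a forward call returns or throws, so by the time the exception reaches the stream task, the current node is the source node again.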
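      The limitation can be reproduced with a small plain-Java model that does not depend on Kafka Streams. LimitationDemo, Node, Context and processTask are hypothetical stand-ins, not Kafka classes. The model assumes that forwarding a record sets the context's current node to each child and restores the previous node in a finally block, so the task-level catch block only ever observes the source node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class LimitationDemo {

    // Hypothetical stand-in for a processor node: a name, a per-record function, and its children.
    static final class Node {
        final String name;
        final UnaryOperator<String> fn;
        final List<Node> children = new ArrayList<>();

        Node(String name, UnaryOperator<String> fn) {
            this.name = name;
            this.fn = fn;
        }
    }

    // Hypothetical stand-in for the processor context: it tracks the node currently processing.
    static final class Context {
        Node currentNode;

        void forward(String value) {
            Node previous = currentNode;
            try {
                for (Node child : previous.children) {
                    currentNode = child;
                    process(child, value);
                }
            } finally {
                // The previous node is restored even when a child throws.
                currentNode = previous;
            }
        }

        void process(Node node, String value) {
            String out = node.fn.apply(value);
            if (!node.children.isEmpty()) {
                forward(out);
            }
        }
    }

    // Task-level processing: the catch block can only consult the context's current node.
    static String processTask(Context ctx, Node source, String value) {
        ctx.currentNode = source;
        try {
            ctx.process(source, value);
            return "ok";
        } catch (RuntimeException e) {
            return "Exception caught in process. processor=" + ctx.currentNode.name
                + ", cause=" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Node source = new Node("KSTREAM-SOURCE-0000000000", v -> v);
        Node peek = new Node("KSTREAM-PEEK-0000000001", v -> v);
        Node map = new Node("KSTREAM-MAP-0000000002", v -> {
            throw new RuntimeException("Something bad happened...");
        });
        source.children.add(peek);
        peek.children.add(map);

        System.out.println(processTask(new Context(), source, "value"));
    }
}
```

      Running main prints "Exception caught in process. processor=KSTREAM-SOURCE-0000000000, cause=Something bad happened...", even though the exception is thrown by KSTREAM-MAP-0000000002, which mirrors the log above.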

      Improvement Proposal

      With the changes introduced by KAFKA-16448, processing exceptions will be caught at the processor node level and wrapped in an internal exception named FailedProcessingException before being rethrown to the stream task.

       

      This change makes it possible to identify the exact processor node in which a processing exception occurs and to propagate its name up to the stream task, where it is used to build the StreamsException that appears in the logs.
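      The proposed flow can be sketched as a simplified plain-Java model. It is not the actual Kafka Streams code: NodeLevelWrappingDemo, Node and processTask are hypothetical, and the nested FailedProcessingException is a stand-in whose constructor and failedProcessorNodeName() accessor are assumptions for illustration. Each node wraps a failure with its own name, rethrows an already-wrapped exception unchanged so that the name of the innermost failing node survives, and the task-level catch reads that name when building the message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class NodeLevelWrappingDemo {

    // Hypothetical stand-in for the internal FailedProcessingException:
    // it carries the failing node's name alongside the original cause.
    static final class FailedProcessingException extends RuntimeException {
        private final String failedProcessorNodeName;

        FailedProcessingException(String failedProcessorNodeName, RuntimeException cause) {
            super(cause);
            this.failedProcessorNodeName = failedProcessorNodeName;
        }

        String failedProcessorNodeName() {
            return failedProcessorNodeName;
        }
    }

    // Hypothetical processor node: applies its function, then forwards to its children.
    static final class Node {
        final String name;
        final UnaryOperator<String> fn;
        final List<Node> children = new ArrayList<>();

        Node(String name, UnaryOperator<String> fn) {
            this.name = name;
            this.fn = fn;
        }

        void process(String value) {
            try {
                String out = fn.apply(value);
                for (Node child : children) {
                    child.process(out);
                }
            } catch (FailedProcessingException e) {
                // Already wrapped by a downstream node: keep the innermost node name.
                throw e;
            } catch (RuntimeException e) {
                // First node to see the failure: record its own name.
                throw new FailedProcessingException(name, e);
            }
        }
    }

    // Task-level processing: the failing node's name comes from the wrapper exception.
    static String processTask(Node source, String value) {
        try {
            source.process(value);
            return "ok";
        } catch (FailedProcessingException e) {
            return "Exception caught in process. processor=" + e.failedProcessorNodeName()
                + ", cause=" + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        Node source = new Node("KSTREAM-SOURCE-0000000000", v -> v);
        Node peek = new Node("KSTREAM-PEEK-0000000001", v -> v);
        Node map = new Node("KSTREAM-MAP-0000000002", v -> {
            throw new RuntimeException("Something bad happened...");
        });
        source.children.add(peek);
        peek.children.add(map);

        System.out.println(processTask(source, "value"));
    }
}
```

      Running main prints "Exception caught in process. processor=KSTREAM-MAP-0000000002, cause=Something bad happened...". The rethrow branch is the key design choice: without it, every upstream node would re-wrap the exception and the task would again see the source node's name.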

       

      This improvement originates from PR https://github.com/apache/kafka/pull/16093 and the discussion at https://github.com/apache/kafka/pull/16093#issuecomment-2200265168.


            People

              Assignee: Loïc Greffier (loicgreffier)
              Reporter: Loïc Greffier (loicgreffier)