Thursday, July 7, 2022

How to Monitor Streaming Queries in PySpark

Streaming is one of the most important data processing techniques for ingestion and analysis. It gives users and developers low-latency, real-time data processing for analytics and for triggering actions. However, monitoring streaming workloads is challenging because the data is processed continuously as it arrives. Because stream processing is always on, it is harder to troubleshoot problems during development and in production without real-time metrics, alerting and dashboarding.

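Structured Streaming in Apache Spark™ addresses the problem of monitoring by providing:

  • A dedicated UI with real-time metrics and statistics.
  • An Observable API that allows advanced monitoring, such as alerting and dashboarding with an external system.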

Until now, the Observable API has been missing in PySpark. This forced users to write their streaming queries with the Scala API to get alerting and dashboarding with external systems. The lack of this functionality in Python has become more critical as Python's importance grows, given that almost 70% of notebook commands run on Databricks are in Python.

In Databricks Runtime 11, we're happy to announce that the Observable API is now available in PySpark. In this blog post, we introduce the Python Observable API for Structured Streaming, along with a step-by-step example of a scenario that adds alerting logic to a streaming query.

Observable API

Developers can now send streaming metrics to external systems, for example for alerting and dashboarding with custom metrics, by combining the streaming query listener interface with the Observable API in PySpark. The StreamingQueryListener interface is an abstract class that must be inherited, and the subclass should implement all of its methods as shown below:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are the same as in the Scala API.

        This is called synchronously with
        :meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStarted`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are the same as in the Scala API.

        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` will always be
        the latest no matter when this method is called. Therefore, the status
        of :class:`pyspark.sql.streaming.StreamingQuery`
        may be changed before or while you process the event.
        For example, you may find that :class:`StreamingQuery`
        is terminated when you are processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are the same as in the Scala API.
        """
        pass

my_listener = MyListener()

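Note that onQueryProgress and onQueryTerminated are invoked asynchronously, while onQueryStarted is called synchronously with DataStreamWriter.start(), as its docstring says. In all cases, avoid blocking inside these methods.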

  • StreamingQueryListener.onQueryStarted is triggered when a streaming query is started, e.g., DataStreamWriter.start.
  • StreamingQueryListener.onQueryProgress is invoked when each micro-batch execution is finished.
  • StreamingQueryListener.onQueryTerminated is called when the query is stopped, e.g., StreamingQuery.stop.

To activate the listener, register it with the StreamingQueryManager through spark.streams.addListener(my_listener). You can remove it later with spark.streams.removeListener(my_listener).
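A listener keeps receiving events until it is removed, so one way to guarantee that it is always detached is to wrap registration in a context manager. The sketch below relies only on `spark.streams.addListener` and `spark.streams.removeListener`. The helper name `attached_listener` is hypothetical and not part of PySpark.

```python
from contextlib import contextmanager


@contextmanager
def attached_listener(spark, listener):
    """Hypothetical helper: register `listener` for the duration of a block.

    Only spark.streams.addListener / removeListener are used, so any object
    exposing those two methods (for example, a test stub) also works.
    """
    spark.streams.addListener(listener)
    try:
        yield listener
    finally:
        # Detach even if the body raises, so the listener does not leak
        # into later queries on the same session.
        spark.streams.removeListener(listener)


# Usage (assumes an active SparkSession `spark` and a streaming DataFrame `df`):
# with attached_listener(spark, MyListener()):
#     query = df.writeStream.format("console").start()
#     query.awaitTermination(30)
```

The `finally` clause means the listener is detached even when the monitored code fails, which is easy to forget when add and remove calls sit far apart.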


To capture custom metrics, add them through DataFrame.observe. The first argument names the observation, and the custom metrics are defined as arbitrary aggregate functions such as count("value"), as shown below.

df.observe("name", count(column), ...)
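In a listener, the values computed by `observe` are exposed through `event.progress.observedMetrics`, a dictionary keyed by the observation name whose values are `Row` objects. The hypothetical function below is a minimal sketch of forwarding those values to an external system: it flattens the dictionary into a JSON string. It accepts anything with an `asDict()` method (such as a PySpark `Row`) or a plain mapping, so it can be tested without a Spark session. How the string is then shipped (HTTP, a message queue, a metrics agent) is left open.

```python
import json
from typing import Any, Mapping


def observed_metrics_to_json(observed_metrics: Mapping[str, Any]) -> str:
    """Hypothetical helper: flatten observedMetrics into a JSON document.

    `observed_metrics` maps an observation name (the first argument passed
    to DataFrame.observe) to a Row-like value. Keys become "name.metric".
    """
    flat = {}
    for name, row in observed_metrics.items():
        # A PySpark Row exposes asDict(); plain mappings are accepted for tests.
        values = row.asDict() if hasattr(row, "asDict") else dict(row)
        for key, value in values.items():
            flat[f"{name}.{key}"] = value
    # default=str covers values json cannot encode natively (e.g. Decimal).
    return json.dumps(flat, sort_keys=True, default=str)


# Usage inside a listener (assumption: called from onQueryProgress):
#     payload = observed_metrics_to_json(event.progress.observedMetrics)
```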

Error Alert Scenario

In this section, we describe a real-world use case for the Observable API. Suppose you have a directory where new CSV files continuously arrive from another system, and you have to ingest them as a stream. For simplicity, this example uses a local file system so that the API is easy to follow. You can copy and paste the code snippets below into the pyspark shell to run them yourself.

First, let's import the necessary Python classes and packages, then create a directory called my_csv_dir that will be used in this scenario.

import os
import shutil
import time

from pyspark.sql.functions import count, col, lit
from pyspark.sql.streaming import StreamingQueryListener

# NOTE: replace `basedir` with the FUSE path, e.g., "/dbfs/tmp" in a Databricks
# notebook.
basedir = os.getcwd()  # "/dbfs/tmp"

# My CSV files will be created in this directory later. Clean up 'my_csv_dir'
# first in case you already ran the example below, then recreate it.
my_csv_dir = os.path.join(basedir, "my_csv_dir")
shutil.rmtree(my_csv_dir, ignore_errors=True)
os.makedirs(my_csv_dir)

Next, we define our own custom streaming query listener. The listener raises an alert when a micro-batch contains too many malformed records during CSV ingestion. If malformed records make up more than 50% of the processed records, it prints a log message. In production, you could send this to an external system instead of simply printing it.

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        # Metrics registered via DataFrame.observe("metric", ...) arrive here.
        row = event.progress.observedMetrics.get("metric")
        if row is not None and row.cnt > 0:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")

    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
my_listener = MyListener()
spark.streams.addListener(my_listener)
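The alerting rule inside `onQueryProgress` is easy to get subtly wrong, for example by dividing by zero on a micro-batch with no rows. Factoring it out into a pure function makes it unit-testable without Spark. The sketch below is illustrative: the function name `should_alert` and the `threshold` parameter are assumptions, not part of the original listener.

```python
def should_alert(malformed: int, cnt: int, threshold: float = 0.5) -> bool:
    """Hypothetical helper: True when the malformed fraction exceeds threshold.

    Mirrors the rule in MyListener.onQueryProgress: alert when more than
    `threshold` (50% by default) of the processed rows are malformed.
    """
    if cnt <= 0:
        # An empty micro-batch has nothing to judge, so never alert.
        return False
    return malformed / cnt > threshold


# Inside the listener this would be used as:
#     if should_alert(row.malformed, row.cnt): ...
```

With the data in this scenario, the second file yields `should_alert(2, 3)`, which is `True` because 2/3 is above 0.5, while exactly half malformed (`should_alert(1, 2)`) does not alert, since the rule is "more than 50%".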

To activate the listener, we add it before starting the query in this example. Note, however, that you can add or remove a listener regardless of when queries start and terminate, because listeners are registered with the session's StreamingQueryManager rather than with an individual query. This lets you attach monitoring logic to running streaming queries, and detach it again, without halting them.

Now we start a streaming query that ingests the files in the my_csv_dir directory. During processing, we also observe the number of processed records and the number of malformed records. By default, the CSV data source stores malformed records in the _corrupt_record column, so counting the non-null values of that column gives the number of malformed records.

# Now, start a streaming query that monitors the 'my_csv_dir' directory.
# Every time new CSV files arrive here, we will process them.
my_csv = spark.readStream.schema(
    "my_key INT, my_val DOUBLE, _corrupt_record STRING"
).csv(my_csv_dir)
# `DataFrame.observe` computes the counts of processed and malformed records,
# and sends an event to the listener.
my_observed_csv = my_csv.observe(
    "metric",
    count(lit(1)).alias("cnt"),  # number of processed rows
    count(col("_corrupt_record")).alias("malformed"))  # number of malformed rows
my_query = my_observed_csv.writeStream.format(
    "console").queryName("My observer").start()

Now that we have defined the streaming query and the alerting logic, let's create CSV files so they can be ingested as a stream:

# Now, we write CSV files to be processed as a stream over time.
# This CSV file is all well-formed.
with open(os.path.join(my_csv_dir, "my_csv_1.csv"), "w") as f:
    _ = f.write("1,1.1\n")
    _ = f.write("123,123.123\n")

time.sleep(5)  # Assume that another CSV file arrives 5 seconds later.

# Ouch! It has two malformed records out of 3. My observer query should alert!
with open(os.path.join(my_csv_dir, "my_csv_error.csv"), "w") as f:
    _ = f.write("1,1.123\n")
    _ = f.write("Ouch! malformed record!\n")
    _ = f.write("Arrgggh!\n")

time.sleep(5)  # OK, all done. Let's stop the query after 5 seconds.
my_query.stop()
spark.streams.removeListener(my_listener)
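Writing straight into the watched directory, as above, is fine for a quick demo. The Structured Streaming programming guide, however, notes that files for the file source must be placed in the directory atomically, which on most file systems can be done with a move operation. Below is a minimal plain-Python sketch of that pattern. It assumes a hypothetical staging directory on the same file system as `my_csv_dir`, since a rename across file systems is not atomic. The helper name `write_csv_atomically` is illustrative only.

```python
import os
import tempfile
from typing import Iterable


def write_csv_atomically(target_dir: str, filename: str,
                         lines: Iterable[str], staging_dir: str) -> str:
    """Hypothetical helper: write a CSV in `staging_dir`, then move it in.

    `staging_dir` must be on the same file system as `target_dir` so that
    os.replace is an atomic rename.
    """
    os.makedirs(staging_dir, exist_ok=True)
    # Write the complete file somewhere the stream is not watching.
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".csv.tmp")
    try:
        with os.fdopen(fd, "w") as f:
            for line in lines:
                f.write(line + "\n")
        final_path = os.path.join(target_dir, filename)
        # The streaming source sees either no file or the whole file.
        os.replace(tmp_path, final_path)
    except BaseException:
        # Remove the partial temp file if anything went wrong, then re-raise.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path


# Usage (assumption: a sibling staging directory next to my_csv_dir):
# write_csv_atomically(my_csv_dir, "my_csv_error.csv",
#                      ["1,1.123", "Ouch! malformed record!", "Arrgggh!"],
#                      staging_dir=os.path.join(basedir, "my_csv_staging"))
```

The staging directory is kept outside `my_csv_dir` so the stream never lists a half-written temporary file.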

You will see that the query's start, progress and termination are logged. Because the second CSV file contains two malformed records, the alert is raised with the following message:

ALERT! Ouch! there are too many malformed records 2 out of 3!


PySpark users can now define custom metrics and observe them through the streaming query listener interface and the Observable API. They can attach such logic to running queries, and detach it, dynamically when needed. This feature addresses the need for dashboarding, alerting and reporting to external systems.

The Streaming Query Listener interface and the Observable API are available in DBR 11 Beta and are expected to be available in a future Apache Spark release. Try out both new capabilities today on Databricks with DBR 11 Beta.


