Articles

Best Practices für das Caching in Spark SQL

In Spark ist SQL-Caching eine gängige Technik zur Wiederverwendung einiger Berechnungen. Es hat das Potenzial, andere Abfragen zu beschleunigen, die dieselben Daten verwenden, aber es gibt einige Einschränkungen, die zu beachten sind, wenn wir eine gute Leistung erzielen möchten. In diesem Artikel werfen wir einen Blick unter die Haube, um zu sehen, wie das Caching intern funktioniert, und wir werden versuchen, das Verhalten von Spark in Bezug auf die Datenpersistenz zu entmystifizieren.

In der DataFrame-API gibt es zwei Funktionen, mit denen ein DataFrame zwischengespeichert werden kann: cache() und persist():

df.cache() # see in PySpark docs heredf.persist() # see in PySpark docs here

Sie sind fast gleichwertig, der Unterschied besteht darin, dass persist ein optionales Argument storageLevel mit dem wir angeben können, wo die Daten gespeichert werden. Der Standardwert von storageLevel für beide Funktionen ist MEMORY_AND_DISK, was bedeutet, dass die Daten im Speicher gespeichert werden, wenn Platz dafür vorhanden ist. Hier sehen Sie die (PySpark) Dokumentation für andere mögliche Speicherebenen.

Caching ist eine verzögerte Transformation, daher passiert unmittelbar nach dem Aufruf der Funktion nichts mit den Daten, aber der Abfrageplan wird vom Cache—Manager aktualisiert, indem ein neuer Operator hinzugefügt wird – InMemoryRelation . Dies sind also nur einige Informationen, die später während der Abfrageausführung verwendet werden, wenn eine Aktion aufgerufen wird. Spark sucht nach den Daten in der Caching-Ebene und liest sie von dort aus, wenn sie verfügbar sind. Wenn die Daten nicht in der Caching-Ebene gefunden werden (was bei der ersten Ausführung der Abfrage mit Sicherheit der Fall ist), ist sie dafür verantwortlich, die Daten dorthin zu bringen, und verwendet sie unmittelbar danach.

Cache-Manager

Der Cache-Manager ist dafür verantwortlich, zu verfolgen, welche Berechnungen im Hinblick auf den Abfrageplan bereits zwischengespeichert wurden. Wenn die Caching-Funktion aufgerufen wird, wird der Cache-Manager direkt unter der Haube aufgerufen und ruft den analysierten logischen Plan des Datenrahmens ab, in dem die Caching-Funktion aufgerufen wird, und speichert diesen Plan in einer indizierten Sequenz namens cachedData.

Die Phase des Cache Managers ist Teil der logischen Planung und findet nach dem Analyzer und vor dem Optimizer statt:

Wenn Sie eine Abfrage mit einer Aktion ausführen, wird der Abfrageplan verarbeitet und transformiert. Im Schritt des Cache-Managers (kurz vor dem Optimierer) überprüft Spark für jeden Teilbaum des analysierten Plans, ob er in der cachedData-Sequenz gespeichert ist. Wenn es eine Übereinstimmung findet, bedeutet dies, dass derselbe Plan (dieselbe Berechnung) bereits zwischengespeichert wurde (möglicherweise in einer früheren Abfrage), und Spark kann dies verwenden, und fügt diese Informationen mithilfe des InMemoryRelation Operators zum Abfrageplan hinzu, der Informationen zu diesem zwischengespeicherten Plan enthält. Diese InMemoryRelation wird dann in der Phase der physischen Planung verwendet, um einen physischen Operator zu erstellen — InMemoryTableScan.

df = spark.table("users").filter(col(col_name) > x).cache()df.count() # now check the query plan in Spark UI

Hier im obigen Bild sehen Sie eine grafische und String-Darstellung einer Abfrage, die Caching verwendete. Um zu sehen, welche Transformationen zwischengespeichert wurden, müssen Sie sich die Zeichenfolgendarstellung des Plans ansehen, da die grafische Darstellung diese Informationen nicht anzeigt.

Grundlegendes Beispiel

Sehen wir uns ein einfaches Beispiel an, um besser zu verstehen, wie der Cache-Manager funktioniert:

df = spark.read.parquet(data_path)df.select(col1, col2).filter(col2 > 0).cache()

Betrachten Sie die folgenden drei Abfragen. Welcher von ihnen wird die zwischengespeicherten Daten nutzen?

1) df.filter(col2 > 0).select(col1, col2)2) df.select(col1, col2).filter(col2 > 10)3) df.select(col1).filter(col2 > 0)

Entscheidend ist der analysierte logische Plan. Wenn es mit dem analysierten Plan der zwischengespeicherten Abfrage übereinstimmt, wird der Cache genutzt. Für Abfrage Nummer 1 könnten Sie versucht sein zu sagen, dass es den gleichen Plan hat, weil der Filter in beiden Fällen sowieso vom Optimierer geschoben wird. Aber das ist eigentlich nicht ganz richtig. Wichtig zu verstehen ist, dass die Phase des Cache-Managers vor dem Optimierer stattfindet. Was gleich wäre, sind die optimierten Pläne, aber keine analysierten Pläne. Also Abfrage n. 1 wird den Cache nicht nutzen, nur weil die analysierten Pläne unterschiedlich sind.

Für Abfrage n. 2 könnten Sie erneut versucht sein anzunehmen, dass die zwischengespeicherten Daten verwendet werden, da der Filter restriktiver ist als der Filter in der zwischengespeicherten Abfrage. Wir können logisch sehen, dass sich die abgefragten Daten im Cache befinden, aber Spark liest sie aus demselben Grund wie zuvor nicht von dort — die analysierten Pläne sind unterschiedlich — diesmal ist die Filterbedingung nicht dieselbe. Um die zwischengespeicherten Daten zu verwenden, können wir die zweite Abfrage jedoch beheben, indem wir dort den Filter hinzufügen:

df.select(col1, col2).filter(col2 > 0).filter(col2 > 10)

Auf den ersten Blick scheint der Filter col2 > 0 hier nutzlos zu sein, aber nicht, weil jetzt ein Teil des analysierten logischen Plans mit dem zwischengespeicherten Plan identisch ist und der Cache-Manager ihn finden und verwenden kann die InMemoryRelation im Abfrageplan.

Abfrage Nummer 3 ist schwierig, auf den ersten Blick scheint es, dass es auch einen anderen Analyseplan haben wird, weil die Abfrage anders ist — wir wählen nur col1. Die Filterbedingung verwendet jedoch col2 , das in der vorherigen Projektion nicht vorhanden ist, und daher ruft der Analysator eine Regel ResolveMissingReferences auf und fügt der Projektion col2 hinzu, und der analysierte Plan wird tatsächlich identisch mit dem zwischengespeicherten Plan. Dieses Mal wird der Cache-Manager es finden und verwenden.

Die endgültige Antwort lautet also, dass Abfrage n. 3 die zwischengespeicherten Daten nutzt.

Best Practices

Lassen Sie uns ein paar Faustregeln für das Caching auflisten:

  • Wenn Sie einen Datenrahmen zwischenspeichern, erstellen Sie eine neue Variable dafür cachedDF = df .Cache(). Auf diese Weise können Sie die Probleme umgehen, die wir in unserem Beispiel gelöst haben, und manchmal ist nicht klar, was der analysierte Plan ist und was tatsächlich zwischengespeichert wurde. Hier, wenn Sie cachedDF aufrufen.select(…) es wird die zwischengespeicherten Daten nutzen.
  • Deaktivieren Sie den Datenrahmen, nachdem er mit cachedDF nicht mehr benötigt wird.unpersist(). Wenn die Caching-Schicht voll wird, beginnt Spark, die Daten mithilfe der LRU-Strategie (Least Recently Used) aus dem Speicher zu entfernen. Daher ist es eine gute Praxis, unpersist zu verwenden, um mehr Kontrolle darüber zu behalten, was vertrieben werden sollte. Je mehr Speicherplatz Sie im Speicher haben, desto mehr kann Spark für die Ausführung verwenden, z. B. zum Erstellen von Hash-Maps usw.
  • Stellen Sie vor dem Zwischenspeichern sicher, dass Sie nur das zwischenspeichern, was Sie in Ihren Abfragen benötigen. Wenn beispielsweise eine Abfrage (col1, col2, col3) und die zweite Abfrage (col2, col3, col4) verwendet, wählen Sie eine Obermenge dieser Spalten aus: cachedDF = df .wählen Sie (col1, col2, col3, col4).Cache(). Es ist nicht sehr nützlich, cachedDF = df aufzurufen.cache() wenn der df viele Spalten enthält und nur eine kleine Teilmenge in Folgeabfragen benötigt wird.
  • Verwenden Sie das Caching nur, wenn es sinnvoll ist. Das heißt, wenn die zwischengespeicherte Berechnung mehrmals verwendet wird. Es ist gut zu verstehen, dass das Speichern der Daten auch mit einem gewissen Overhead verbunden ist, so dass es in einigen Fällen sogar noch schneller sein kann, die Berechnung einfach erneut auszuführen, wenn sie schnell genug ist, und überhaupt kein Caching zu verwenden, wie Sie sehen können im nächsten Absatz.

Schneller als Caching

Es gibt Situationen, in denen Caching überhaupt nicht hilft und im Gegenteil die Ausführung verlangsamt. Dies bezieht sich beispielsweise auf Abfragen, die auf großen Datensätzen basieren, die in einem spaltenförmigen Dateiformat gespeichert sind, das Spaltenbereinigung und Prädikat-Pushdown wie Parquet unterstützt. Betrachten wir das folgende Beispiel, in dem wir das gesamte Dataset zwischenspeichern und dann einige Abfragen darüber ausführen. Wir werden die folgenden Dataset- und Cluster-Eigenschaften verwenden:

dataset size: 14.3GB in compressed parquet sitting on S3
cluster size: 2 workers c5.4xlarge (32 cores together)
platform: Databricks (runtime 6.6 wit Spark 2.4.5)

Messen wir zunächst die Ausführungszeiten für die Abfragen, bei denen das Caching nicht verwendet wird:

df = spark.table(table_name)df.count() # runs 7.9sdf.filter(col("id") > xxx).count() # runs 18.2s

Führen Sie nun dieselben Abfragen mit Caching aus (das gesamte Dataset passt nicht in den Speicher und etwa 30% werden auf der Festplatte zwischengespeichert):

df = spark.table(table_name).cache()# this count will take long because it is putting data to memory
df.count() # runs 1.28mindf.count() # runs 14sdf.filter(col("id") > xxx).count() # runs 20.6s

Kein Wunder, dass die erste Zählung 1,3 Minuten dauert. Wie Sie jedoch sehen können, dauern auch die zweite Zählung und die Abfrage mit dem Filter für das zwischengespeicherte Dataset länger als das direkte Lesen aus Parkett. Es ist eine Kombination aus zwei Hauptgründen. Die erste sind die Eigenschaften des Parquet—Dateiformats – Abfragen, die auf Parquet basieren, sind für sich genommen schnell. Beim Lesen von Parquet liest Spark nur die Metadaten, um die Anzahl abzurufen, sodass nicht das gesamte Dataset gescannt werden muss. Für die Filterabfrage wird column Pruning verwendet und nur die ID-Spalte gescannt. Andererseits liest Spark beim Lesen der Daten aus dem Cache den gesamten Datensatz. Dies ist in der Spark-Benutzeroberfläche zu sehen, in der Sie die Eingabegröße für die erste Stufe überprüfen können (siehe Bild unten).

Der zweite Grund ist, dass der Datensatz groß ist und nicht vollständig in den Arbeitsspeicher passt. Ein Teil der Daten wird ebenfalls auf der Festplatte gespeichert und das Lesen von der Festplatte ist viel langsamer als das Lesen aus dem RAM.

Caching in SQL

Wenn Sie direkt SQL anstelle von DataFrame DSL bevorzugen, können Sie immer noch Caching verwenden, es gibt jedoch einige Unterschiede.

spark.sql("cache table table_name")

Der Hauptunterschied besteht darin, dass bei Verwendung von SQL das Caching standardmäßig eifrig ist, sodass ein Job sofort ausgeführt wird und die Daten in die Caching-Ebene verschoben werden. Um es so faul zu machen, wie es im DataFrame DSL ist, können wir das Schlüsselwort lazy explizit verwenden:

spark.sql("cache lazy table table_name")

Um die Daten aus dem Cache zu entfernen, rufen Sie einfach an:

spark.sql("uncache table table_name")

Siehe die zwischengespeicherten Daten

Manchmal fragen Sie sich vielleicht, welche Daten bereits zwischengespeichert sind. Eine Möglichkeit besteht darin, die Spark-Benutzeroberfläche zu überprüfen, die einige grundlegende Informationen zu Daten enthält, die bereits im Cluster zwischengespeichert sind.

Hier können Sie für jedes zwischengespeicherte Dataset sehen, wie viel Speicherplatz im Arbeitsspeicher oder auf der Festplatte belegt ist. Sie können sogar mehr zoomen und auf den Datensatz in der Tabelle klicken, um zu einer anderen Seite mit Details zu jeder Partition zu gelangen.

Um zu überprüfen, ob die gesamte Tabelle zwischengespeichert ist, können wir die Katalog-API verwenden:

spark.catalog.isCached("table_name")

Die Katalog-API kann auch verwendet werden, um alle Daten wie folgt aus dem Cache zu entfernen:

spark.catalog.clearCache()

In der Scala-API können Sie auch die interne API des Cache-Managers verwenden, die einige Funktionen bereitstellt, z. B. können Sie fragen, ob der Cache-Manager leer ist:

// In Scala API:val cm = spark.sharedState.cacheManagercm.isEmpty

Andere Möglichkeiten zur Datenpersistenz

Caching ist eine von mehreren Techniken, die zur Wiederverwendung von Berechnungen verwendet werden können. Neben dem Caching gibt es auch Checkpointing und Exchange-Reuse.

Das Checkpointing ist zum Beispiel in Situationen nützlich, in denen wir den Abfrageplan unterbrechen müssen, weil er zu groß ist. Ein großer Abfrageplan kann in dem Treiber, in dem er verarbeitet wird, zu einem Engpass werden, da die Verarbeitung eines sehr großen Plans zu lange dauert. Der Checkpoint bricht jedoch den Plan und materialisiert die Abfrage. Für die nächsten Transformationen wird Spark mit der Erstellung eines neuen Plans beginnen. Das Checkpointing bezieht sich auf zwei Funktionen checkpoint und localCheckpoint, die sich durch den für die Daten verwendeten Speicher unterscheiden.

Die Exchange-Wiederverwendung, bei der Spark die Ausgabe eines Shuffles auf der Festplatte beibehält, ist andererseits eine Technik, die nicht direkt von einer API-Funktion gesteuert werden kann, sondern eine interne Funktion, die Spark selbst verarbeitet. In einigen speziellen Situationen kann es indirekt gesteuert werden, indem die Abfrage neu geschrieben wird, um identische Austauschzweige zu erreichen. Um mehr über Exchange-Reuse zu erfahren, können Sie meinen anderen Artikel lesen, in dem ich ihn ausführlicher beschreibe.

Fazit

In diesem Artikel haben wir versucht, das Verhalten von Spark in Bezug auf das Caching zu entmystifizieren. Wir haben gesehen, wie es unter der Haube funktioniert und was die Unterschiede zwischen der Verwendung von DSL und SQL sind. Wir haben einige Best Practices besprochen, um das Caching so effizient wie möglich zu gestalten. In einem Beispiel haben wir gezeigt, dass es bei großen Datensätzen, die nicht in den Speicher passen, möglicherweise schneller ist, das Zwischenspeichern zu vermeiden, insbesondere wenn die Daten im spaltenförmigen Dateiformat gespeichert sind. Wir haben auch einige Alternativen zum Caching erwähnt, z. B. Checkpointing oder wiederverwendeter Austausch, die in einigen Situationen für die Datenpersistenz nützlich sein können.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.