Articles

cele mai bune practici pentru cache în Spark SQL

în cache-ul Spark SQL este o tehnică comună pentru reutilizarea unor calcule. Are potențialul de a accelera alte interogări care utilizează aceleași date, dar există unele avertismente care sunt bune de reținut dacă dorim să obținem performanțe bune. În acest articol, vom arunca o privire sub capotă pentru a vedea cum funcționează cache-ul intern și vom încerca să demistificăm comportamentul Spark legat de persistența datelor.

în DATAFRAME API, există două funcții care pot fi utilizate pentru cache-ul unui DataFrame, cache () și persist():

df.cache() # see in PySpark docs heredf.persist() # see in PySpark docs here

ele sunt aproape echivalente, diferența este că persistă poate lua un storageLevel argument opțional prin care putem specifica în cazul în care datele vor fi persistat. Valoarea implicită a storageLevel pentru ambele funcții este MEMORY_AND_DISK, ceea ce înseamnă că datele vor fi stocate în memorie dacă există spațiu pentru aceasta, în caz contrar, vor fi stocate pe disc. Aici puteți vedea documentația (PySpark) pentru alte niveluri de stocare posibile.

cache — ul este o transformare leneșă, deci imediat după apelarea funcției nu se întâmplă nimic cu datele, dar planul de interogare este actualizat de managerul Cache prin adăugarea unui nou operator-InMemoryRelation. Deci, aceasta este doar câteva informații care vor fi utilizate în timpul executării interogării mai târziu, atunci când se numește o acțiune. Spark va căuta datele din stratul de cache și le va citi de acolo dacă sunt disponibile. Dacă nu găsește datele în stratul de cache (ceea ce se întâmplă cu siguranță prima dată când interogarea rulează), va deveni responsabil pentru obținerea datelor acolo și le va folosi imediat după aceea.

Manager Cache

managerul Cache este responsabil pentru a urmări ce calcul a fost deja memorat în cache în ceea ce privește planul de interogare. Când se apelează funcția de cache, managerul de Cache este invocat direct sub capotă și scoate planul logic analizat al Cadrului de date pe care se numește funcția de cache și stochează acel plan într-o secvență indexată numită cachedData.

faza managerului Cache face parte din planificarea logică și are loc după analizor și înainte de optimizator:

când executați o interogare cu o acțiune, planul de interogare va fi procesat și transformat. În etapa managerului Cache (chiar înainte de optimizator) Spark va verifica pentru fiecare subarbore al planului analizat dacă este stocat în secvența cachedData. Dacă găsește o potrivire înseamnă că același plan (același calcul) a fost deja memorat în cache (probabil în unele interogări anterioare) și astfel Spark poate folosi asta și astfel adaugă acele informații la planul de interogare folosind operatorul InMemoryRelation care va transporta informații despre acest plan cache. Acest InMemoryRelation este apoi utilizat în faza de planificare fizică pentru a crea un operator fizic— InMemoryTableScan.

df = spark.table("users").filter(col(col_name) > x).cache()df.count() # now check the query plan in Spark UI

aici, în imaginea de mai sus, puteți vedea reprezentarea grafică și șir de o interogare care a fost folosind cache. Pentru a vedea ce transformări au fost memorate în cache, trebuie să analizați reprezentarea șirului planului, deoarece reprezentarea grafică nu afișează aceste informații.

exemplu de bază

să vedem un exemplu simplu pentru a înțelege mai bine cum funcționează managerul Cache:

df = spark.read.parquet(data_path)df.select(col1, col2).filter(col2 > 0).cache()

luați în considerare următoarele trei întrebări. Care dintre ele va folosi datele din cache?

1) df.filter(col2 > 0).select(col1, col2)2) df.select(col1, col2).filter(col2 > 10)3) df.select(col1).filter(col2 > 0)

factorul decisiv este planul logic analizat. Dacă este același cu planul analizat al interogării cache, atunci memoria cache va fi utilizată. Pentru interogarea numărul 1 s-ar putea să fiți tentat să spuneți că are același plan, deoarece filtrul va fi împins de optimizator în ambele cazuri oricum. Dar acest lucru nu este de fapt complet precis. Cel mai important lucru de înțeles este că faza managerului Cache are loc înaintea optimizatorului. Ceea ce ar fi la fel sunt planurile optimizate, dar nu planurile analizate. Deci interogarea n. 1 nu va folosi memoria cache pur și simplu pentru că planurile analizate sunt diferite.

pentru interogarea n. 2 s-ar putea să fiți tentat din nou să presupuneți că va folosi datele din cache, deoarece filtrul este mai restrictiv decât filtrul din interogarea din cache. Putem vedea logic că datele interogate sunt în cache, dar Spark nu le va citi de acolo din același motiv ca înainte — planurile analizate sunt diferite — de data aceasta condiția de filtrare nu este aceeași. Pentru a utiliza datele din cache, putem totuși să remediem a doua interogare doar adăugând filtrul acolo:

df.select(col1, col2).filter(col2 > 0).filter(col2 > 10)

la prima vedere, filtrul col2 > 0 pare a fi inutil aici, dar nu este pentru că acum o parte din planul logic analizat va fi identic cu planul cache și managerul Cache va fi capabil să-l găsească și să utilizeze InMemoryRelation în planul de interogare.

interogarea numărul 3 este complicat, la prima vedere, se pare că, de asemenea, va avea un plan diferit analizat, deoarece interogarea este diferită — vom selecta doar col1. Condiția de filtrare este, totuși, folosind col2, care nu este prezent în proiecția anterioară și astfel analizorul va invoca o regulă ResolveMissingReferences și se va adăuga col2 proiecției și planul analizat va deveni de fapt identic cu planul cache. De data aceasta managerul de Cache îl va găsi și îl va folosi.

deci răspunsul final este că interogarea n. 3 va folosi datele din cache.

cele mai bune practici

să enumerăm câteva reguli generale legate de cache:

  • când cache un cadru de date a crea o nouă variabilă pentru ea cachedDF = df.cache (). Acest lucru vă va permite să ocoliți problemele pe care le rezolvam în exemplul nostru, că uneori nu este clar care este planul analizat și ce a fost de fapt memorat în cache. Aici ori de câte ori apelați cachedDF.selectați ( … ) va utiliza datele din cache.
  • Unpersist cadrul de date după ce nu mai este necesar folosind cachedDF.nepersist (). Dacă stratul de cache devine plin, Spark va începe să evacueze datele din memorie folosind strategia LRU (cel mai puțin utilizat recent). Deci, este o practică bună să folosiți unpersist pentru a rămâne mai mult în control cu privire la ceea ce ar trebui evacuat. De asemenea, cu cât aveți mai mult spațiu în memorie, cu atât mai mult poate declanșa utilizarea pentru execuție, de exemplu, pentru construirea hărților hash și așa mai departe.
  • înainte de cache, asigurați-vă că memorați în cache doar ceea ce veți avea nevoie în interogările dvs. De exemplu, dacă o interogare va folosi (col1, col2, col3) și a doua interogare va folosi (col2, col3, col4), selectați un superset al acestor coloane: cachedDF = df.selectați (col1, col2, col3, col4).cache (). Nu este foarte util să apelați cachedDF = df.cache() dacă df conține o mulțime de coloane și doar un subset mic va fi necesar în interogările de urmărire.
  • utilizați cache-ul numai dacă are sens. Aceasta este în cazul în care calculul cache va fi folosit de mai multe ori. Este bine să înțelegeți că punerea datelor în memorie este, de asemenea, legată de unele cheltuieli generale, astfel încât, în unele cazuri, ar putea fi chiar mai rapid să rulați din nou calculul dacă este suficient de rapid și să nu utilizați deloc cache-ul așa cum puteți vedea în paragraful următor.

mai rapid decât cache-ul

există situații în care cache-ul nu ajută deloc și, dimpotrivă, încetinește execuția. Acest lucru este legat, de exemplu, de interogări bazate pe seturi de date mari stocate într-un format de fișier coloană care acceptă tăierea coloanei și împingerea predicatului, cum ar fi parchetul. Să luăm în considerare următorul exemplu, în care vom cache întregul set de date și apoi vom rula câteva interogări deasupra acestuia. Vom folosi următoarele seturi de date și proprietăți de cluster:

dataset size: 14.3GB in compressed parquet sitting on S3
cluster size: 2 workers c5.4xlarge (32 cores together)
platform: Databricks (runtime 6.6 wit Spark 2.4.5)

în primul rând, să măsurăm timpii de execuție pentru interogările în care cache-ul nu este utilizat:

df = spark.table(table_name)df.count() # runs 7.9sdf.filter(col("id") > xxx).count() # runs 18.2s

acum rulați aceleași interogări cu cache (întregul set de date nu se încadrează în memorie și aproximativ 30% este memorat în cache pe disc):

df = spark.table(table_name).cache()# this count will take long because it is putting data to memory
df.count() # runs 1.28mindf.count() # runs 14sdf.filter(col("id") > xxx).count() # runs 20.6s

nu e de mirare că primul număr durează 1,3 minute, există cheltuielile legate de punerea datelor în memorie. Cu toate acestea, după cum puteți vedea, de asemenea, al doilea număr și interogarea cu filtrul dura mai mult pentru setul de date cache în comparație cu citirea direct de la Parchet. Este o combinație de două motive majore. Primul este proprietățile formatului de fișier parchet — interogările bazate pe partea de sus a parchetului sunt rapide pe cont propriu. În cazul citirii de la Parchet, Spark va citi doar metadatele pentru a obține numărul, astfel încât nu trebuie să scaneze întregul set de date. Pentru interogarea de filtrare, va utiliza tăierea coloanei și va scana doar coloana id. Pe de altă parte, când citiți datele din cache, Spark va citi întregul set de date. Acest lucru poate fi văzut din Spark UI, unde puteți verifica dimensiunea de intrare pentru prima etapă (a se vedea imaginea de mai jos).

al doilea motiv este că setul de date este mare și nu se potrivește în întregime în memoria ram. O parte din date sunt stocate și pe disc, iar citirea de pe disc este mult mai lentă decât citirea din memoria ram.

Caching în SQL

dacă preferați să utilizați direct SQL în loc de DATAFRAME DSL, puteți utiliza în continuare caching, există unele diferențe, cu toate acestea.

spark.sql("cache table table_name")

principala diferență este că folosind SQL cache-ul este dornic în mod implicit, astfel încât o lucrare va rula imediat și va pune datele în stratul de cache. Pentru a face leneș așa cum este în DATAFRAME DSL putem folosi cuvântul cheie leneș în mod explicit:

spark.sql("cache lazy table table_name")

pentru a elimina datele din memoria cache, trebuie doar să apelați:

spark.sql("uncache table table_name")

vedeți datele din cache

uneori vă puteți întreba ce date sunt deja memorate în cache. O posibilitate este de a verifica Spark UI, care oferă câteva informații de bază despre datele care sunt deja memorate în cache pe cluster.

aici pentru fiecare set de date cache, puteți vedea cât spațiu este nevoie în memorie sau pe disc. Puteți chiar să măriți mai mult și să faceți clic pe înregistrarea din tabel, care vă va duce la o altă pagină cu detalii despre fiecare partiție.

pentru a verifica dacă întregul tabel este memorat în cache, putem folosi API-ul catalogului:

spark.catalog.isCached("table_name")

API-ul catalogului poate fi, de asemenea, utilizat pentru a elimina toate datele din memoria cache după cum urmează:

spark.catalog.clearCache()

în Scala API puteți utiliza, de asemenea, API-ul intern al managerului Cache care oferă unele funcții, de exemplu, puteți întreba dacă managerul Cache este gol:

// In Scala API:val cm = spark.sharedState.cacheManagercm.isEmpty

alte posibilități de persistență a datelor

cache-ul este una dintre mai multe tehnici care pot fi utilizate pentru reutilizarea unor calcule. În afară de cache există, de asemenea, checkpointing și schimb-reutilizare.

checkpointing este util, de exemplu, în situațiile în care avem nevoie pentru a rupe planul de interogare, deoarece este prea mare. Un plan de interogare mare poate deveni un blocaj în driverul în care este procesat, deoarece procesarea unui plan foarte mare va dura mult timp. Cu toate acestea, punctul de control va rupe planul și va materializa interogarea. Pentru următoarele transformări, Spark va începe să construiască un nou plan. Checkpointing este legat de două funcții checkpoint și localCheckpoint care diferă de stocarea utilizată pentru date.

schimbul de reutilizare în cazul în care Spark persistă ieșirea unui shuffle pe disc, este, pe de altă parte, o tehnică care nu poate fi controlată direct de o anumită funcție API, dar în schimb, este o caracteristică internă care Spark se ocupă pe cont propriu. În unele situații speciale, acesta poate fi controlat indirect prin rescrierea interogării încercând să obțină ramuri de schimb identice. Pentru a citi mai multe despre schimb-reutilizare, puteți verifica celălalt articol al meu, unde îl descriu mai detaliat.

concluzie

în acest articol, am încercat să demistificăm comportamentul Spark legat de cache. Am văzut cum funcționează sub capotă și care sunt diferențele dintre utilizarea DSL vs SQL. Am discutat câteva dintre cele mai bune practici cu privire la modul de a face cache-ul cât mai eficient posibil. Pe un exemplu am arătat că pentru seturile de date mari care nu se încadrează în memorie, ar putea fi mai rapid să evitați cache-ul, mai ales dacă datele sunt stocate în format de fișier coloană. Am menționat, de asemenea, câteva alternative la cache, cum ar fi checkpointing sau schimb reutilizat, care pot fi utile pentru persistența datelor în anumite situații.

Lasă un răspuns

Adresa ta de email nu va fi publicată.