Articles

legjobb gyakorlatok a Spark SQL gyorsítótárazásához

a Sparkban az SQL gyorsítótár egy általános technika bizonyos számítások újrafelhasználására. Lehetősége van felgyorsítani más, ugyanazokat az adatokat használó lekérdezéseket, de vannak olyan figyelmeztetések, amelyeket jó szem előtt tartani, ha jó teljesítményt akarunk elérni. Ebben a cikkben megnézzük a motorháztető alatt, hogy megnézzük, hogyan működik a gyorsítótár belsőleg, és megpróbáljuk demisztifikálni a Spark viselkedését az adatok tartósságával kapcsolatban.

a DataFrame API-ban két függvény használható a DataFrame gyorsítótárazására, a cache () és a persist():

df.cache() # see in PySpark docs heredf.persist() # see in PySpark docs here

szinte egyenértékűek, a különbség az, hogy a persist egy opcionális argumentumot vehet igénybe storageLevel amellyel meghatározhatjuk, hogy az adatok hol maradnak fenn. A storageLevel alapértelmezett értéke mindkét funkció esetében a MEMORY_AND_DISK, ami azt jelenti, hogy az adatok a memóriában tárolódnak, ha van hely számukra, különben a lemezen tárolódnak. Itt láthatja a (PySpark) dokumentációt az egyéb lehetséges tárolási szintekről.

a gyorsítótár egy lusta transzformáció, így közvetlenül a függvény meghívása után semmi sem történik az adatokkal, de a lekérdezési tervet a gyorsítótár — kezelő frissíti egy új operátor hozzáadásával-InMemoryRelation. Tehát ez csak néhány információ, amelyet később a lekérdezés végrehajtása során használnak fel, amikor valamilyen műveletet meghívnak. A Spark megkeresi az adatokat a gyorsítótárazási rétegben, és onnan olvassa el, ha rendelkezésre áll. Ha nem találja az adatokat a gyorsítótárazási rétegben (ami biztosan megtörténik a lekérdezés első futtatásakor), akkor felelős lesz az adatok odajuttatásáért, és azonnal fel fogja használni.

gyorsítótár-kezelő

a gyorsítótár-kezelő felelős annak nyomon követéséért, hogy a lekérdezési terv szempontjából milyen számítások kerültek már gyorsítótárba. A gyorsítótár-függvény meghívásakor a gyorsítótár-kezelő közvetlenül a motorháztető alatt kerül meghívásra, és kihúzza annak az Adatkeretnek az elemzett logikai tervét, amelyen a gyorsítótár-függvényt meghívják, és a tervet egy cachedData nevű indexelt sorrendben tárolja.

a gyorsítótár-kezelő fázisa a logikai tervezés része, és az analizátor után és az optimalizáló előtt zajlik:

amikor egy lekérdezést egy művelettel futtat, a lekérdezési terv feldolgozásra és átalakításra kerül. A gyorsítótár-kezelő lépésében (közvetlenül az optimalizáló előtt) a Spark ellenőrzi az elemzett terv minden egyes részfáját, ha az a cachedData sorrendben van-e tárolva. Ha talál egyezést, az azt jelenti, hogy ugyanazt a tervet (ugyanazt a számítást) már gyorsítótárazták (talán néhány korábbi lekérdezésben), így a Spark ezt felhasználhatja, és így hozzáadja ezt az információt a lekérdezési tervhez az InMemoryRelation operátor segítségével, amely információkat hordoz erről a gyorsítótárazott tervről. Ezt az InMemoryRelation-t ezután a fizikai tervezés szakaszában használják egy fizikai operátor létrehozására— InMemoryTableScan.

df = spark.table("users").filter(col(col_name) > x).cache()df.count() # now check the query plan in Spark UI

itt a fenti képen egy gyorsítótárat használó lekérdezés grafikus és karakterláncos ábrázolása látható. A gyorsítótárban tárolt átalakítások megtekintéséhez meg kell vizsgálnia a terv karakterlánc-ábrázolását, mert a grafikus ábrázolás nem jeleníti meg ezeket az információkat.

alapvető példa

lássunk egy egyszerű példát, hogy jobban megértsük a gyorsítótár-kezelő működését:

df = spark.read.parquet(data_path)df.select(col1, col2).filter(col2 > 0).cache()

fontolja meg a következő három kérdést. Melyikük fogja kihasználni a gyorsítótárazott adatokat?

1) df.filter(col2 > 0).select(col1, col2)2) df.select(col1, col2).filter(col2 > 10)3) df.select(col1).filter(col2 > 0)

a döntő tényező az elemzett logikai terv. Ha megegyezik a gyorsítótárazott lekérdezés elemzett tervével, akkor a gyorsítótár kihasználásra kerül. Az 1. számú lekérdezéshez kísértésbe eshet, hogy azt mondja, hogy ugyanaz a terve, mert a szűrőt mindkét esetben az optimalizáló fogja nyomni. De ez valójában nem teljesen pontos. Fontos megérteni, hogy a gyorsítótár-kezelő fázisa az optimalizáló előtt zajlik. Ugyanazok lennének az optimalizált tervek, de nem elemzett tervek. Tehát lekérdezés n. Az 1 nem fogja kihasználni a gyorsítótárat egyszerűen azért, mert az elemzett tervek eltérőek.

az n. 2 lekérdezéshez ismét kísértés lehet azt feltételezni, hogy a gyorsítótárazott adatokat fogja használni, mert a szűrő korlátozóbb, mint a gyorsítótárazott lekérdezésben szereplő szűrő. Logikusan láthatjuk, hogy a lekérdezett adatok a gyorsítótárban vannak, de a Spark nem fogja onnan olvasni ugyanazon ok miatt, mint korábban — az elemzett tervek eltérőek — ezúttal a szűrési feltétel nem ugyanaz. A gyorsítótárazott adatok használatához azonban a második lekérdezést csak a szűrő hozzáadásával javíthatjuk:

df.select(col1, col2).filter(col2 > 0).filter(col2 > 10)

első látásra a col2 > 0 szűrő itt haszontalannak tűnik, de nem azért, mert most az elemzett logikai terv egy része megegyezik a gyorsítótárazott tervvel, és a gyorsítótár-kezelő képes lesz megtalálni és használni az InMemoryRelation-t a lekérdezési tervben.

lekérdezés száma 3 trükkös, első látásra, úgy tűnik, hogy ez is lesz egy másik elemzett tervet, mert a lekérdezés más — kiválasztjuk csak col1. A szűrési feltétel azonban a col2-t használja, amely nincs jelen az előző vetületben, így az analizátor egy szabály Resolvemissingreferenciákat hív meg, és col2-t ad hozzá a vetítéshez, és az elemzett terv valóban azonos lesz a gyorsítótárazott tervvel. Ezúttal a gyorsítótár-kezelő megtalálja és használja.

tehát a végső válasz az, hogy az N. 3 lekérdezés kihasználja a gyorsítótárazott adatokat.

legjobb gyakorlatok

soroljuk fel a gyorsítótárazással kapcsolatos néhány hüvelykujjszabályt:

  • amikor cache DataFrame hozzon létre egy új változót, hogy cachedDF = df.gyorsítótár (). Ez lehetővé teszi, hogy megkerülje azokat a problémákat, amelyeket példánkban megoldottunk, hogy néha nem világos, hogy mi az elemzett terv, és mi volt a gyorsítótárban. Itt, amikor hívja cachedDF.válassza a (…) lehetőséget, hogy kihasználja a gyorsítótárazott adatokat.
  • szüntesse meg az Adatkeretet, miután már nincs rá szükség a cachedDF használatával.unpersist (). Ha a gyorsítótárazási réteg megtelik, a Spark elkezdi kilakoltatni az adatokat a memóriából az LRU (legkevésbé használt) stratégia segítségével. Tehát jó gyakorlat, ha az unpersist – et használjuk, hogy jobban ellenőrizzük, hogy mit kell kilakoltatni. Továbbá, minél több hely van a memóriában, annál több Spark használható a végrehajtáshoz, például hash térképek készítéséhez stb.
  • a gyorsítótár használata előtt győződjön meg arról, hogy csak azt tárolja a gyorsítótárban, amire szüksége lesz a lekérdezésekben. Ha például az egyik lekérdezés (col1, col2, col3), a második lekérdezés pedig (col2, col3, col4), válassza ki az alábbi oszlopok felülhalmazát: cachedDF = df.válassza a(col1, col2, col3, col4) lehetőséget.gyorsítótár (). Nem nagyon hasznos a cachedDF = df hívása.cache (), ha a df sok oszlopot tartalmaz, és csak egy kis részhalmazra lesz szükség a nyomon követési lekérdezésekben.
  • csak akkor használja a gyorsítótárat, ha van értelme. Ez az, ha a gyorsítótárazott számítást többször használják. Jó megérteni, hogy az adatok memóriába helyezése némi rezsi miatt is összefügg, így bizonyos esetekben még gyorsabb lehet egyszerűen újra futtatni a számítást, ha elég gyors, és egyáltalán nem használja a gyorsítótárat, amint az a következő bekezdésben látható.

gyorsabb, mint a gyorsítótár

vannak olyan helyzetek, amikor a gyorsítótár egyáltalán nem segít, éppen ellenkezőleg, lassítja a végrehajtást. Ez például az oszlopos fájlformátumban tárolt nagy adatkészleteken alapuló lekérdezésekhez kapcsolódik, amelyek támogatják az oszlopmetszést és a predikátum pushdown-t, például a parketta-t. Vizsgáljuk meg a következő példát, amelyben a teljes adatkészletet gyorsítótárazzuk, majd néhány lekérdezést futtatunk rajta. A következő adatkészletet és fürttulajdonságokat fogjuk használni:

dataset size: 14.3GB in compressed parquet sitting on S3
cluster size: 2 workers c5.4xlarge (32 cores together)
platform: Databricks (runtime 6.6 wit Spark 2.4.5)

először mérjük meg azoknak a lekérdezéseknek a végrehajtási idejét, ahol a gyorsítótárat nem használják:

df = spark.table(table_name)df.count() # runs 7.9sdf.filter(col("id") > xxx).count() # runs 18.2s

most futtassa ugyanazokat a lekérdezéseket gyorsítótárazással (a teljes adatkészlet nem fér el a memóriában, és körülbelül 30% – a Gyorsítótárazva van a lemezen):

df = spark.table(table_name).cache()# this count will take long because it is putting data to memory
df.count() # runs 1.28mindf.count() # runs 14sdf.filter(col("id") > xxx).count() # runs 20.6s

nem csoda, hogy az első számlálás 1,3 percet vesz igénybe, ott van az adatok memóriába helyezésével kapcsolatos rezsi. Azonban, mint látható, a második számlálás és a szűrővel történő lekérdezés is hosszabb időt vesz igénybe a gyorsítótárazott adatkészlethez képest, mint a közvetlenül a parketta-ból történő olvasás. Ez két fő ok kombinációja. Az első a parketta fájlformátum tulajdonságai — a parketta tetején alapuló lekérdezések önmagukban gyorsak. Parketta olvasása esetén a Spark csak a metaadatokat olvassa el a számláláshoz, így nem kell beolvasnia a teljes adatkészletet. A szűrési lekérdezéshez oszlopmetszést fog használni,és csak az azonosító oszlopot vizsgálja. Másrészt, amikor az adatokat a gyorsítótárból olvassa, a Spark elolvassa a teljes adatkészletet. Ez látható a Spark UI-ból, ahol ellenőrizheti az első szakasz bemeneti méretét (lásd az alábbi képet).

a második ok az, hogy az adatkészlet nagy, és nem fér el teljesen a ram-ban. Az adatok egy része a lemezen is tárolódik, és a lemezről történő olvasás sokkal lassabb, mint a ram-ból történő olvasás.

gyorsítótár az SQL-ben

ha a DataFrame DSL helyett közvetlenül az SQL-t szeretné használni, akkor is használhatja a gyorsítótárat, vannak azonban különbségek.

spark.sql("cache table table_name")

a fő különbség az, hogy az SQL használatával a gyorsítótár alapértelmezés szerint lelkes, így egy feladat azonnal elindul, és az adatokat a gyorsítótárazási rétegbe helyezi. Annak érdekében, hogy lusta legyen, mint a DataFrame DSL-ben, kifejezetten használhatjuk a lusta kulcsszót:

spark.sql("cache lazy table table_name")

az adatok eltávolításához a gyorsítótárból csak hívjon:

spark.sql("uncache table table_name")

lásd a gyorsítótárazott adatokat

néha elgondolkodhat azon, hogy milyen adatok vannak már Gyorsítótárazva. Az egyik lehetőség a Spark felhasználói felületének ellenőrzése, amely néhány alapvető információt nyújt a fürtön már tárolt adatokról.

itt minden gyorsítótárazott adatkészletnél láthatja, hogy mennyi helyet foglal el a memóriában vagy a lemezen. Még többet is nagyíthat, majd rákattinthat a rekordra a táblázatban, amely egy másik oldalra vezet, amely az egyes partíciók részleteit tartalmazza.

annak ellenőrzéséhez, hogy a teljes táblázat gyorsítótárban van – e, használhatjuk a katalógus API-t:

spark.catalog.isCached("table_name")

a katalógus API az összes adat eltávolítására is használható a gyorsítótárból az alábbiak szerint:

spark.catalog.clearCache()

a Scala API-ban használhatja a gyorsítótár-kezelő belső API-ját is, amely bizonyos funkciókat biztosít, például megkérdezheti, hogy a gyorsítótár-kezelő üres-e:

// In Scala API:val cm = spark.sharedState.cacheManagercm.isEmpty

a

Adatperzisztencia egyéb lehetőségei a gyorsítótár az egyik olyan technika, amely felhasználható egyes számítások újrafelhasználására. A gyorsítótárazáson kívül checkpointing és exchange-reuse is létezik.

a checkpointing hasznos például olyan helyzetekben, amikor meg kell szakítani a lekérdezési tervet, mert túl nagy. A nagy lekérdezési terv szűk keresztmetszetet jelenthet az illesztőprogramban, ahol feldolgozzák, mert egy nagyon nagy terv feldolgozása sokáig tart. Az ellenőrzőpont azonban megtöri a tervet, és megvalósítja a lekérdezést. A következő átalakításokhoz a Spark új tervet kezd építeni. A checkpointing két Checkpoint és localCheckpoint függvényhez kapcsolódik, amelyek különböznek az adatok tárolásától.

a csere-újrafelhasználás, ahol Spark továbbra is fennáll a kimenet egy shuffle lemezen, van, másrészt, egy technika, amely nem vezérelhető közvetlenül néhány API funkció, hanem, ez egy belső jellemzője, hogy a Spark kezeli a saját. Bizonyos speciális helyzetekben közvetett módon vezérelhető a lekérdezés átírásával, amely megpróbálja elérni az azonos csereágakat. Ha többet szeretne megtudni az exchange-újrafelhasználásról, ellenőrizze a másik cikkemet, ahol részletesebben leírom.

következtetés

ebben a cikkben megpróbáltuk demisztifikálni a Spark gyorsítótárazással kapcsolatos viselkedését. Láttuk, hogyan működik a motorháztető alatt, és mi a különbség a DSL vs SQL használata között. Megbeszéltünk néhány bevált gyakorlatot arról, hogyan lehet a gyorsítótárat a lehető leghatékonyabbá tenni. Az egyik példában megmutattuk, hogy a memóriába nem illeszkedő nagy adatkészletek esetében gyorsabb lehet a gyorsítótárazás elkerülése, különösen, ha az adatokat oszlopos fájlformátumban tárolják. Megemlítettünk néhány gyorsítótárazási alternatívát is, például ellenőrző pontozást vagy újrafelhasznált cserét, amelyek bizonyos helyzetekben hasznosak lehetnek az adatok tartóssága szempontjából.

Vélemény, hozzászólás?

Az e-mail-címet nem tesszük közzé.