Articles

Prácticas recomendadas para el almacenamiento en caché en Spark SQL

En Spark, el almacenamiento en caché SQL es una técnica común para reutilizar algunos cálculos. Tiene el potencial de acelerar otras consultas que utilizan los mismos datos, pero hay algunas advertencias que es bueno tener en cuenta si queremos lograr un buen rendimiento. En este artículo, echaremos un vistazo debajo del capó para ver cómo funciona el almacenamiento en caché internamente e intentaremos desmitificar el comportamiento de Spark relacionado con la persistencia de datos.

En la API de DataFrame, hay dos funciones que se pueden usar para almacenar en caché un DataFrame, cache () y persist():

df.cache() # see in PySpark docs heredf.persist() # see in PySpark docs here

Son casi equivalentes, la diferencia es que persist puede tomar un argumento opcional storageLevel mediante el cual podemos especificar dónde se conservarán los datos. El valor predeterminado del storageLevel para ambas funciones es MEMORY_AND_DISK, lo que significa que los datos se almacenarán en la memoria si hay espacio para ellos, de lo contrario, se almacenarán en el disco. Aquí puede ver la documentación (PySpark) para otros niveles de almacenamiento posibles.

El almacenamiento en caché es una transformación perezosa, por lo que inmediatamente después de llamar a la función no pasa nada con los datos, pero el plan de consulta es actualizado por el Administrador de caché agregando un nuevo operador: InMemoryRelation. Así que esto es solo información que se utilizará durante la ejecución de la consulta más adelante cuando se llame a alguna acción. Spark buscará los datos en la capa de almacenamiento en caché y los leerá desde allí si están disponibles. Si no encuentra los datos en la capa de almacenamiento en caché (lo que sucede con seguridad la primera vez que se ejecuta la consulta), será responsable de obtener los datos allí y los usará inmediatamente después.

Administrador de caché

El Administrador de caché es responsable de realizar un seguimiento de los cálculos que ya se han almacenado en caché en términos del plan de consulta. Cuando se llama a la función de almacenamiento en caché, el Administrador de caché se invoca directamente debajo del capó y extrae el plan lógico analizado del marco de datos en el que se llama a la función de almacenamiento en caché y almacena ese plan en una secuencia indexada llamada cachedData.

La fase del Administrador de caché es parte de la planificación lógica y tiene lugar después del analizador y antes del optimizador:

Cuando ejecuta una consulta con una acción, el plan de consulta se procesará y transformará. En el paso del Administrador de Caché (justo antes del optimizador), Spark comprobará si cada subárbol del plan analizado está almacenado en la secuencia cachedData. Si encuentra una coincidencia, significa que el mismo plan (el mismo cálculo) ya se ha almacenado en caché (tal vez en alguna consulta anterior) y, por lo tanto, Spark puede usar eso y, por lo tanto, agrega esa información al plan de consulta utilizando el operador InMemoryRelation que llevará información sobre este plan en caché. Esta relaciÃ3n inmemorial se utiliza en la fase de planificaciÃ3n física para crear un operador fÃsico: el escáner inmemorial.

df = spark.table("users").filter(col(col_name) > x).cache()df.count() # now check the query plan in Spark UI

Aquí en la imagen de arriba se puede ver la representación gráfica y de cadena de una consulta que estaba utilizando el almacenamiento en caché. Para ver qué transformaciones se almacenaron en caché, debe buscar en la representación de cadena del plan porque la representación gráfica no muestra esta información.

Ejemplo básico

Veamos un ejemplo simple para comprender mejor cómo funciona el Administrador de caché:

df = spark.read.parquet(data_path)df.select(col1, col2).filter(col2 > 0).cache()

Considere las siguientes tres preguntas. ¿Cuál de ellos aprovechará los datos almacenados en caché?

1) df.filter(col2 > 0).select(col1, col2)2) df.select(col1, col2).filter(col2 > 10)3) df.select(col1).filter(col2 > 0)

El factor decisivo es el plan lógico analizado. Si es el mismo que el plan analizado de la consulta en caché, se aprovechará la caché. Para la consulta número 1, podría estar tentado a decir que tiene el mismo plan porque el filtro será empujado por el optimizador en ambos casos de todos modos. Pero esto no es del todo exacto. Lo importante a entender es que la fase del Administrador de Caché tiene lugar antes del optimizador. Lo que sería lo mismo son los planes optimizados pero no los planes analizados. Así que consulta n. 1 no aprovechará la caché simplemente porque los planes analizados son diferentes.

Para la consulta n. 2, es posible que nuevamente se sienta tentado a asumir que utilizará los datos almacenados en caché porque el filtro es más restrictivo que el filtro de la consulta almacenada en caché. Podemos ver lógicamente que los datos consultados están en la caché, pero Spark no los leerá desde allí por la misma razón que antes, los planes analizados son diferentes, esta vez la condición de filtrado no es la misma. Para usar los datos almacenados en caché, podemos, sin embargo, arreglar la segunda consulta simplemente agregando el filtro allí:

df.select(col1, col2).filter(col2 > 0).filter(col2 > 10)

A primera vista, el filtro col2 > 0 parece ser inútil aquí, pero no lo es porque ahora parte del plan lógico analizado será idéntico al plan almacenado en caché y el Administrador de caché podrá encontrarlo y usar la relación Inmemoria en el plan de consulta.

Consulta número 3 es difícil, a primera vista, parece que también tendrá un diferente analizado el plan porque la consulta es diferente — seleccionamos sólo col1. Sin embargo, la condición de filtrado es usar col2, que no está presente en la proyección anterior, por lo que el analizador invocará una regla ResolveMissingReferences y agregará col2 a la proyección y el plan analizado se convertirá en idéntico al plan almacenado en caché. Esta vez el Administrador de Caché lo encontrará y lo usará.

Así que la respuesta final es que la consulta n. 3 aprovechará los datos almacenados en caché.

Mejores prácticas

Vamos a enumerar un par de reglas básicas relacionadas con el almacenamiento en caché:

  • Cuando almacene en caché un DataFrame, cree una nueva variable para él cachedDF = df.cache(). Esto le permitirá evitar los problemas que estábamos resolviendo en nuestro ejemplo, que a veces no está claro cuál es el plan analizado y qué fue realmente almacenado en caché. Aquí cuando llames a cachedDF.seleccione (select) aprovechará los datos almacenados en caché.
  • Desperfecte el DataFrame después de que ya no se necesite usando cachedDF.unpersist (). Si la capa de almacenamiento en caché se llena, Spark comenzará a desalojar los datos de la memoria utilizando la estrategia LRU (menos utilizada recientemente). Por lo tanto, es una buena práctica usar la falta de atención para mantener un mayor control sobre lo que debe ser desalojado. Además, cuanto más espacio tenga en memoria, más se puede activar el uso para la ejecución, por ejemplo, para crear mapas hash, etc.
  • Antes de almacenar en caché, asegúrese de que solo almacena en caché lo que necesitará en sus consultas. Por ejemplo, si una consulta usará (col1, col2, col3) y la segunda consulta usará (col2, col3, col4), seleccione un superconjunto de estas columnas: cachedDF = df.seleccione (col1, col2, col3, col4).cache(). No es muy útil llamar a cachedDF = df.cache() si el df contiene muchas columnas y solo se necesitará un pequeño subconjunto en las consultas de seguimiento.
  • Utilice el almacenamiento en caché solo si tiene sentido. Es decir, si el cálculo en caché se utilizará varias veces. Es bueno entender que poner los datos en memoria también está relacionado con cierta sobrecarga, por lo que en algunos casos, podría ser aún más rápido simplemente ejecutar el cálculo de nuevo si es lo suficientemente rápido y no usar el almacenamiento en caché como se puede ver en el siguiente párrafo.

Más rápido que el almacenamiento en caché

Hay situaciones en las que el almacenamiento en caché no ayuda en absoluto y, por el contrario, ralentiza la ejecución. Esto está relacionado, por ejemplo, con consultas basadas en grandes conjuntos de datos almacenados en un formato de archivo en columnas que admite la poda de columnas y el empuje de predicados, como parquet. Consideremos el siguiente ejemplo, en el que almacenaremos en caché todo el conjunto de datos y luego ejecutaremos algunas consultas encima. Utilizaremos las siguientes propiedades de conjuntos de datos y clústeres:

dataset size: 14.3GB in compressed parquet sitting on S3
cluster size: 2 workers c5.4xlarge (32 cores together)
platform: Databricks (runtime 6.6 wit Spark 2.4.5)

Primero, midamos los tiempos de ejecución de las consultas en las que no se utiliza el almacenamiento en caché:

df = spark.table(table_name)df.count() # runs 7.9sdf.filter(col("id") > xxx).count() # runs 18.2s

Ahora ejecute las mismas consultas con almacenamiento en caché (el conjunto de datos completo no cabe en la memoria y aproximadamente el 30% se almacena en caché en el disco):

df = spark.table(table_name).cache()# this count will take long because it is putting data to memory
df.count() # runs 1.28mindf.count() # runs 14sdf.filter(col("id") > xxx).count() # runs 20.6s

No es de extrañar que el primer recuento tome 1,3 min, existe la sobrecarga relacionada con poner datos en memoria. Sin embargo, como puede ver, también el segundo recuento y la consulta con el filtro tardan más en el conjunto de datos en caché en comparación con la lectura directamente desde parquet. Es una combinación de dos razones principales. La primera es las propiedades del formato de archivo de parquet: las consultas basadas en la parte superior del parquet son rápidas por sí solas. En el caso de la lectura desde parquet, Spark solo leerá los metadatos para obtener el recuento, de modo que no sea necesario escanear todo el conjunto de datos. Para la consulta de filtrado, utilizará el recorte de columnas y escaneará solo la columna id. Por otro lado, al leer los datos de la caché, Spark leerá todo el conjunto de datos. Esto se puede ver en la interfaz de usuario de Spark, donde puede verificar el tamaño de entrada para la primera etapa (vea la imagen a continuación).

La segunda razón es que el conjunto de datos es grande y no encaja completamente en la ram. Parte de los datos también se almacenan en el disco y la lectura desde el disco es mucho más lenta que la lectura desde la ram.

Almacenamiento en caché en SQL

Si prefiere usar directamente SQL en lugar de DataFrame DSL, aún puede usar almacenamiento en caché, sin embargo, hay algunas diferencias.

spark.sql("cache table table_name")

La principal diferencia es que al usar SQL el almacenamiento en caché está ansioso de forma predeterminada, por lo que un trabajo se ejecutará inmediatamente y colocará los datos en la capa de almacenamiento en caché. Para hacerlo perezoso como es en el DataFrame DSL podemos usar la palabra clave lazy explícitamente:

spark.sql("cache lazy table table_name")

Para eliminar los datos de la caché, simplemente llame:

spark.sql("uncache table table_name")

Vea los datos almacenados en caché

A veces puede preguntarse qué datos ya están almacenados en caché. Una posibilidad es comprobar la interfaz de usuario de Spark, que proporciona información básica sobre los datos que ya están almacenados en caché en el clúster.

a continuación para cada conjunto de datos en caché, usted puede ver cuánto espacio ocupa en la memoria o en el disco. Incluso puede hacer zoom más y hacer clic en el registro en la tabla que lo llevará a otra página con detalles sobre cada partición.

Para comprobar si toda la tabla está almacenada en caché, podemos usar la API de catálogo:

spark.catalog.isCached("table_name")

La API de catálogo también se puede usar para eliminar todos los datos de la caché de la siguiente manera:

spark.catalog.clearCache()

En la API de Scala también puede usar la API interna del Administrador de Caché que proporciona algunas funciones, por ejemplo, puede preguntar si el Administrador de Caché está vacío:

// In Scala API:val cm = spark.sharedState.cacheManagercm.isEmpty

Otras posibilidades para la persistencia de datos

El almacenamiento en caché es una de las más técnicas que se pueden usar para reutilizar algunos cálculos. Además del almacenamiento en caché, también hay puntos de verificación y reutilización de intercambio.

El checkpointing es útil, por ejemplo, en situaciones en las que necesitamos romper el plan de consulta porque es demasiado grande. Un plan de consulta grande puede convertirse en un cuello de botella en el conductor donde se procesa porque el procesamiento de un plan muy grande tomará mucho tiempo. Sin embargo, el punto de control romperá el plan y materializará la consulta. Para las próximas transformaciones, Spark comenzará a construir un nuevo plan. El checkpointing está relacionado con dos funciones checkpoint y localCheckpoint que se diferencian por el almacenamiento utilizado para los datos.

La reutilización de intercambio donde Spark persiste la salida de una mezcla en disco, es, por otro lado, una técnica que no puede ser controlada directamente por alguna función API, sino que es una característica interna que Spark maneja por sí sola. En algunas situaciones especiales, se puede controlar indirectamente reescribiendo la consulta tratando de obtener ramas de intercambio idénticas. Para leer más sobre intercambio-reutilización, puedes consultar mi otro artículo, donde lo describo con más detalle.

Conclusión

En este artículo, intentamos desmitificar el comportamiento de Spark relacionado con el almacenamiento en caché. Hemos visto cómo funciona bajo el capó y cuáles son las diferencias entre el uso de DSL vs SQL. Discutimos algunas prácticas recomendadas sobre cómo hacer que el almacenamiento en caché sea lo más eficiente posible. En un ejemplo, mostramos que para grandes conjuntos de datos que no caben en la memoria, podría ser más rápido evitar el almacenamiento en caché, especialmente si los datos se almacenan en formato de archivo en columnas. También mencionamos algunas alternativas al almacenamiento en caché, como el punto de verificación o el intercambio reutilizado, que pueden ser útiles para la persistencia de datos en algunas situaciones.

Deja una respuesta

Tu dirección de correo electrónico no será publicada.