1. Introducción
Hola a todos y bienvenidos a un nuevo post de nuestro querido blog!. Recientemente he obtenido la certificación CRT020 Spark Databricks que consta de una parte teórica y otra práctica en donde se pone a prueba la soltura del candidato en la API de Dataframes de Spark. En este post vamos a ver algunos de los ejemplos que me han servido para preparar la parte práctica haciendo un recorrido por las agregaciones, joins, groupby o window.
Tecnologías empleadas:
- Databricks
- Spark 2.4.4
- PySpark 2.4.4
Los siguientes ejemplos han sido ejecutados y verificados en el entorno de test que proporciona Databricks
2. Antes de empezar
Subimos el siguiente fichero al filesystem del entorno de Databricks.
id;name;surname;age 1;Jorge;Hernandez;32 2;Jorge;Hernandez;63 3;Jose;Hernandez;32 4;Barbara;Vazquez;27 5;Jose;Illan;28 6;Jose María;Gutierrez;33 6;Jose María;;80
Esto se hace desde la opción de menú Data
3. Leyendo ficheros
Sin esquema
Utilizamos la opción inferSchema
para que las columnas id y age sean sean inferidas como enteros
from pyspark.sql.functions import * from pyspark.sql.types import * df = spark.read.format('csv')\ .option('sep', ';')\ .option('header', True)\ .option('inferSchema', True)\ .load('/FileStore/tables/user.csv')
Con esquema
Indicamos el esquema de forma programática
from pyspark.sql.functions import * from pyspark.sql.types import * schema = StructType([StructField('id', IntegerType()), StructField('name', StringType()), StructField('surname', StringType()), StructField('age', IntegerType())]) df = spark.read.format('csv')\ .option('sep', ';')\ .option('header', True)\ .schema(schema)\ .load('/FileStore/tables/user.csv')
4. Trabajando con Strings
translate
Resultado
+----------------------------+ |translate(surname, ern, 123)| +----------------------------+ | H123a3d1z| | H123a3d1z| | H123a3d1z| | Vazqu1z| | Illa3| | Guti1221z| | null| +----------------------------+
lpad
Resultado
trim
Resultado
lower
Resultado
upper
Resultado
initcap
Resultado
5. Trabajando con nulos
isNull
Resultado
+---+----------+-------+---+ | id| name|surname|age| +---+----------+-------+---+ | 6|Jose María| null| 80| +---+----------+-------+---+
drop
Resultado
+---+----------+---------+---+ | id| name| surname|age| +---+----------+---------+---+ | 1| Jorge|Hernandez| 32| | 2| Jorge|Hernandez| 63| | 3| Jose|Hernandez| 32| | 4| Barbara| Vazquez| 27| | 5| Jose| Illan| 28| | 6|Jose María|Gutierrez| 33| +---+----------+---------+---+
fill
Resultado
+---+----------+---------+---+ | id| name| surname|age| +---+----------+---------+---+ | 1| Jorge|Hernandez| 32| | 2| Jorge|Hernandez| 63| | 3| Jose|Hernandez| 32| | 4| Barbara| Vazquez| 27| | 5| Jose| Illan| 28| | 6|Jose María|Gutierrez| 33| | 6|Jose María| NULO| 80| +---+----------+---------+---+
replace
Resultado
+---+----------+---------+---+ | id| name| surname|age| +---+----------+---------+---+ | 1| Jorge| APELLIDO| 32| | 2| Jorge| APELLIDO| 63| | 3| Jose| APELLIDO| 32| | 4| Barbara| Vazquez| 27| | 5| Jose| Illan| 28| | 6|Jose María|Gutierrez| 33| | 6|Jose María| null| 80| +---+----------+---------+---+
6. Trabajando con expresiones regulares
regexp_replace
Para reemplazar partes de cadenas
Resultado
+------------------------------------------------+----------+ |regexp_replace(name, (Jorge|Jose|Barbara), PIPO)| name| +------------------------------------------------+----------+ | PIPO| Jorge| | PIPO| Jorge| | PIPO| Jose| | PIPO| Barbara| | PIPO| Jose| | PIPO María|Jose María| | PIPO María|Jose María| +------------------------------------------------+----------+
regexp_extract
Para búsqueda de cadenas a través de expresiones regulares
Resultado
+-------------------------------------------+----------+ |regexp_extract(name, Jorge|Jose|Barbara, 0)| name| +-------------------------------------------+----------+ | Jorge| Jorge| | Jorge| Jorge| | Jose| Jose| | Barbara| Barbara| | Jose| Jose| | Jose|Jose María| | Jose|Jose María| +-------------------------------------------+----------+
7. Trabajando con fechas
date_add, date_sub, datediff, months_between
spark.range(1)\ .select(current_date().alias('date'))\ .select(months_between(date_add(col('date'), 1), date_sub(col('date'), 1)), datediff(date_add(col('date'), 1), date_sub(col('date'), 1))).show()
Resultado
+----------------------------------------------------------+----------------------------------------------+ |months_between(date_add(date, 1), date_sub(date, 1), true)|datediff(date_add(date, 1), date_sub(date, 1))| +----------------------------------------------------------+----------------------------------------------+ | 0.06451613| 2| +----------------------------------------------------------+----------------------------------------------+
Convertir la fecha actual en string y viceserca
spark.range(1)\ .select(current_timestamp().alias('date').cast(StringType()))\ .select(unix_timestamp(col('date')).cast(TimestampType())).show()
Resultado
+------------------------------------------------------------+ |CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)| +------------------------------------------------------------+ | 2019-11-17 17:03:52| +------------------------------------------------------------+
8. Trabajando con arrays
Buscar la palabra más repetida
spark.range(1)\ .select(lit('a b c a a b').alias('value'))\ .select(explode(split(col('value'), " ")).alias('word'))\ .groupby('word')\ .count()\ .sort(desc('count'))\ .limit(1)\ .show()
Resultado
size
spark.range(1)\ .select(split(lit('a b c d e'), ' ').alias('array'))\ .select(size(col('array'))).show()
Resultado
slice
spark.range(1)\ .select(split(lit('a b c d e'), ' ').alias('array'))\ .select(slice(col('array'), 1, 2)).show()
Resultado
array_contains
spark.range(1)\ .select(split(lit('a b c d e'), ' ').alias('array'))\ .select(array_contains(col('array'), 'd')).show()
Resultado
+------------------------+ |array_contains(array, d)| +------------------------+ | true| +------------------------+
9. Trabajando con Window
Para cada usuario obtener la máxima edad del usuario con el que comparta apellido
window = Window.partitionBy(col('surname'))\ .orderBy(col('age').desc()) df.select('name', 'age', max(col('age')).over(window).alias('max_age_family'), rank().over(window).alias('max_age_family')).show()
Resultado
+----------+---+--------------+--------------+ | name|age|max_age_family|max_age_family| +----------+---+--------------+--------------+ |Jose María| 33| 33| 1| | Barbara| 27| 27| 1| |Jose María| 80| 80| 1| | Jose| 28| 28| 1| | Jorge| 63| 63| 1| | Jorge| 32| 63| 2| | Jose| 32| 63| 2| +----------+---+--------------+--------------+
10. Trabajando con Json
get_json_object
spark.range(1)\ .select(expr(""" '{"a": {"b": [1, 2]}}'""").alias('j'))\ .select(get_json_object(col('j'), "$.a.b")).show()
Resultado
+-------------------------+ |get_json_object(j, $.a.b)| +-------------------------+ | [1,2]| +-------------------------+
get_json_object
spark.range(1)\ .select(expr(""" '{"a": {"b": [1, 2]}}'""")\ .alias('j'))\ .select(json_tuple(col('j'), "a")).show()
Resultado
11. Trabajando con Groupby
Calcular la edad máxima por apellido y ordenar descendentemente
df.dropna('any')\ .groupby('surname')\ .agg(max('age').alias('max_date'))\ .sort(col('max_date').desc())\ .show()
Resultado
+---------+--------+ | surname|max_date| +---------+--------+ |Hernandez| 63| |Gutierrez| 33| | Illan| 28| | Vazquez| 27| +---------+--------+
Calcular la edad media por apellido haciendo pivot por el nombre
Resultado
+---------+-------+-----+----+----------+ | surname|Barbara|Jorge|Jose|Jose María| +---------+-------+-----+----+----------+ |Gutierrez| null| null|null| 33.0| | Vazquez| 27.0| null|null| null| | null| null| null|null| 80.0| | Illan| null| null|28.0| null| |Hernandez| null| 47.5|32.0| null| +---------+-------+-----+----+----------+
12. Trabajando con Struct
struct
df.na.drop('any')\ .select(struct(col('name'), col('surname')).alias('struct'))\ .select('struct.*').show()
Resultado
+----------+---------+ | name| surname| +----------+---------+ | Jorge|Hernandez| | Jorge|Hernandez| | Jose|Hernandez| | Barbara| Vazquez| | Jose| Illan| |Jose María|Gutierrez| +----------+---------+
to_json, from_json
schema = StructType([StructField('name', StringType()), StructField('surname', StringType())]) df.na.drop('any')\ .select(struct(col('name'), col('surname')).alias('struct')) .select(to_json(col('struct')).alias('json'))\ .select(from_json(col('json'), schema)).show()
Resultado
+--------------------+ | jsontostructs(json)| +--------------------+ | [Jorge, Hernandez]| | [Jorge, Hernandez]| | [Jose, Hernandez]| | [Barbara, Vazquez]| | [Jose, Illan]| |[Jose María, Guti...| +--------------------+
13. Trabajando con Joins
join
df_city = spark.createDataFrame([[1, 'Las Palmas'], [2, 'Madrid']]).toDF('id', 'name') df.join(df_city, df.id == df_city.id).drop(df_city.id).show()
Resultado
+---+-----+---------+---+----------+ | id| name| surname|age| name| +---+-----+---------+---+----------+ | 1|Jorge|Hernandez| 32|Las Palmas| | 2|Jorge|Hernandez| 63| Madrid| +---+-----+---------+---+----------+
14. Escribiendo en ficheros
Escribir el dataframe en un fichero con formato parquet
df.write\ .format('parquet')\ .mode('overwrite')\ .partitionBy('surname')\ .save('/FileStore/tables/user.parquet')
14. Lista de Acciones vs Transformaciones Narrow vs Transformaciones Wide
Las transformaciones en Spark son lazy. Esto quiere decir que no se ejecutan hasta que una acción es realizada. Por ello es importante conocer la diferencia entre acciones y transformaciones.
Acciones
Son métodos que desencadenan el inicio de un job en el cluster.
- show
- collect
- take
- first
- forEach
- toLocalIterator
- save
- saveAsTable
Transformaciones Narrow
Son aquellas transformaciones en las que no hay shuffle de datos, es decir cada partición es capaz de resolver la tarea actual sin necesidad de comunicarse con otras particiones
- union
- filter / where
- drop
- withColumn
- withColumnRenamed
- limit
- join (cuando hay broadcast)
Transformaciones Wide
Son aquellas transformaciones en las que hay shuffle de datos, es decir cada partición necesita hablar con las otras para resolver la tarea actual
Exchange HashPartitions
- join
- sort
- groupby
- distinct
Exchange RoundRobinPartition
- repartition
Exchange Simple
- Todas las agregaciones, first, max, min, count, avg, …
Otras
- coalesce (Aunque no produce shuffle se le considera transformación wide)