# Exploración de datos de pedidos

Utilizar el código de este cuaderno para explorar los datos de pedidos y realizar un análisis exploratorio de los datos.



**Ejemplo que carga en un dataframe los pedidos de 2019 para mostrarlo a continuación**. Usa opción de mostrar cabecera, aunque el resultado no parece adecuado.

In [2]:
df = spark.read.format("csv").option("header","true").load("Files/orders/2019.csv")
# df now is a Spark DataFrame containing CSV data from "Files/orders/2019.csv".
display(df) 

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 34163b8a-1002-49b4-a0ac-bf6f944853cc)

**Ejemplo que personaliza el esquema del dataframe**. El ejemplo carga en un dataframe los datos de pedidos de 2019 y lo muestra a continuación.

In [3]:
from pyspark.sql.types import *

orderSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("Email", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("Tax", FloatType())
    ])

df = spark.read.format("csv").schema(orderSchema).load("Files/orders/2019.csv")
display(df)

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 44bd004e-76d8-41e5-815b-a0b7d7fba899)

**Ejemplo que carga una carpeta completa en un dataframe con un esquema personalizado**

In [4]:
from pyspark.sql.types import *

orderSchema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", IntegerType()),
    StructField("OrderDate", DateType()),
    StructField("CustomerName", StringType()),
    StructField("Email", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", FloatType()),
    StructField("Tax", FloatType())
    ])

df = spark.read.format("csv").schema(orderSchema).load("Files/orders/*.csv")
display(df)

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a70bf557-2da0-49be-a304-b58210af9365)

**Uso de `select` y operadores de agregación y valores únicos** 

In [5]:
customers = df.select("CustomerName", "Email")
print("Total de clientes: " + str(customers.count()))
print("Total de clientes únicos: " + str(customers.distinct().count()))
display(customers.distinct())

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 7, Finished, Available, Finished)

Total de clientes: 32718
Total de clientes únicos: 12427


SynapseWidget(Synapse.DataFrame, d9b99ac3-4ce9-4b14-8a30-8327078addc7)

**Uso de `where`**

In [6]:
customers = df.select("CustomerName", "Email").where(df['Item']=='Road-250 Red, 52')
print("Total de clientes filtrados: " + str(customers.count()))
print("Total de clientes filtrados diferentes: " + str(customers.distinct().count()))
display(customers.distinct())

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 8, Finished, Available, Finished)

Total de clientes filtrados: 133
Total de clientes filtrados diferentes: 133


SynapseWidget(Synapse.DataFrame, dcbf2f50-4917-4132-95da-0fb7eb131dcd)

**Uso del `groupby`**: El ejemplo agrupa por la columna `Item` y luego aplica la función de agregación `sum` a todas las columnas numéricas para calcular el total de ventas por producto.

In [7]:
productSales = df.select("Item", "Quantity").groupBy("Item").sum()
display(productSales)

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1b4dbfe9-40c0-41cb-904a-852e066d0a1b)

**Uso de funciones de fecha**: Calcular las ventas anuales y mostrar los resultados en una tabla. El ejemplo agrupa por la columna `OrderDate` y luego aplica la función de agregación `sum` a todas las columnas numéricas para calcular las ventas anuales presentando los datos ordenados por año.

In [8]:
from pyspark.sql.functions import *

yearlySales = df.select(year("OrderDate").alias("Year")).groupBy("Year").count().orderBy("Year")
display(yearlySales)

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a4e111f6-8904-4901-ae9b-1470fa5059e5)

In [9]:
# Añadir una nueva columna TotalPrice al DataFrame que contiene el precio total de cada pedido
transformed_df = df.withColumn("TotalPrice", df["Quantity"] * df["UnitPrice"])

# Crear nuevas columnas Year y Month a partir de la columna OrderDate que muestren el año y el mes de cada pedido
transformed_df = transformed_df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

# Crear nuevas columnas FirstName y LastName a partir de la columna CustomerName que muestren el nombre y el apellido de cada cliente
transformed_df = transformed_df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Seleccionar sólo las columnas necesarias para el análisis
transformed_df = transformed_df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "Email", "Item", "Quantity", "UnitPrice", "Tax", "TotalPrice"]

# Mostrar las primeras 10 filas del DataFrame transformado
display(transformed_df.limit(10))

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a630a948-b020-43ae-acb4-4620a76dfcfd)

**Guardar los datos procesados**: Se usar Parquet para almacenar los datos en una carpeta `transformed_data` del DataLake

In [10]:
transformed_df.write.mode("overwrite").parquet('Files/transformed_data/orders')
print ("Datos transformados guardados en Parquet")

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 12, Finished, Available, Finished)

Datos transformados guardados en Parquet


**Guardar los datos particionados**: A la hora de guardar datos transformados es habitual crear particiones en los datos para mejorar el rendimiento de las consultas. Las particiones dividen los datos en subconjuntos más pequeños que se almacenan en carpetas separadas. En Spark, las particiones se pueden crear mediante el método `partitionBy` que permite particionar los datos por una o varias columnas.

In [11]:
transformed_df.write.mode("overwrite").partitionBy("Year", "Month").parquet('Files/partitioned_data/orders')
print ("Datos transformados guardados en Parquet particionado")

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 13, Finished, Available, Finished)

Datos transformados guardados en Parquet particionado


**Uso de archivos particionados**: Los archivos particionados proporcionan una forma más eficiente de trabajar con datos procesados.

In [13]:
orders_2019_df = spark.read.format("parquet").load("Files/partitioned_data/orders/Year=2019/Month=*")
display(orders_2019_df)

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a88ccb57-fee1-4d26-a105-5cb6f10dda29)

**SQL en Spark**: Guardar los datos del dataframe de ventas en una tabla denominada `salesorders`. Mediante `DESCRIBE` podemos obtener la estructura de la tabla.

In [14]:
# Guardar los datos del DataFrame de ventas en una tabla denominada salesorders
df.write.format("delta").saveAsTable("salesorders")

# Mostrar la estructura de la tabla salesorders
spark.sql("DESCRIBE salesorders").show()

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 16, Finished, Available, Finished)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|    SalesOrderNumber|   string|   NULL|
|SalesOrderLineNumber|      int|   NULL|
|           OrderDate|     date|   NULL|
|        CustomerName|   string|   NULL|
|               Email|   string|   NULL|
|                Item|   string|   NULL|
|            Quantity|      int|   NULL|
|           UnitPrice|    float|   NULL|
|                 Tax|    float|   NULL|
+--------------------+---------+-------+



**Consultas SQL en Spark**: Consulta sobre la tabla `salesorders` para calcular el total de ventas por año incluyendo impuestos y mostrar los resultados en una tabla ordenados por año. 

In [15]:
%%sql
SELECT YEAR(OrderDate) AS OrderYear,
       SUM((UnitPrice * Quantity) + Tax) AS GrossRevenue
FROM salesorders
GROUP BY YEAR(OrderDate)
ORDER BY OrderYear;

StatementMeta(, f5b6a23d-6f13-4530-bfc6-222179bbafa3, 17, Finished, Available, Finished)

<Spark SQL result set with 3 rows and 2 fields>