Оптимізація запитів у Spark

Моделі та методи обробки великих даних

Ігор Мірошниченко

КНУ імені Тараса Шевченка, ФІТ

Партиціювання

План лекції

flowchart LR
    A["Партиціювання"] --> B["repartition<br/>vs coalesce"]
    B --> C["Проблема<br/>маленьких файлів"]
    C --> D["Data Skew"]
    D --> E["Spark UI"]
    E --> G["Методи<br/>оптимізації"]
    style A fill:#f9b928,stroke:#333,stroke-width:3px

Що таке партиція?

flowchart LR
    DF["DataFrame<br/>1 млн рядків"] --> P1["Partition 1<br/>250K рядків"]
    DF --> P2["Partition 2<br/>250K рядків"]
    DF --> P3["Partition 3<br/>250K рядків"]
    DF --> P4["Partition 4<br/>250K рядків"]
    P1 --> T1["Task 1<br/>Executor 1"]
    P2 --> T2["Task 2<br/>Executor 2"]
    P3 --> T3["Task 3<br/>Executor 1"]
    P4 --> T4["Task 4<br/>Executor 2"]
    style DF fill:#f9b928,stroke:#333,stroke-width:2px

Партиція — це логічна одиниця паралелізму в Spark:

  • Кожна партиція обробляється одним Task на одному ядрі
  • Більше партицій → більше паралелізму (до кількості ядер)
  • Менше партицій → менший overhead координації

Налаштування кількості партицій

# За замовчуванням після Shuffle — 200 партицій
print(f"spark.sql.shuffle.partitions = {spark.conf.get('spark.sql.shuffle.partitions')}")
spark.sql.shuffle.partitions = 4
# Зміна кількості shuffle-партицій
spark.conf.set("spark.sql.shuffle.partitions", 400)

# Зміна паралелізму для RDD-операцій
spark.conf.set("spark.default.parallelism", 400)

Формула для розрахунку:

\[\text{partitions} = \text{num_executors} \times \text{executor_cores} \times (2 \ldots 4)\]

Цільовий розмір партиції: ~128 MB

Порада

Spark 3.0+ з AQE (Adaptive Query Execution) автоматично оптимізує кількість партицій!

Перевірка кількості партицій

df = spark.range(0, 100000)
print(f"Початкових партицій: {df.rdd.getNumPartitions()}")

# Після groupBy — кількість = spark.sql.shuffle.partitions
df_grouped = df.groupBy(F.col("id") % 10).count()
print(f"Після groupBy: {df_grouped.rdd.getNumPartitions()}")
Початкових партицій: 1
Після groupBy: 1
# Зміна кількості партицій
df_repart = df.repartition(8)
print(f"Після repartition(8): {df_repart.rdd.getNumPartitions()}")

df_coal = df_repart.coalesce(2)
print(f"Після coalesce(2): {df_coal.rdd.getNumPartitions()}")
Після repartition(8): 8
Після coalesce(2): 2

repartition vs coalesce

Коли використовувати repartition

flowchart TD
    subgraph "repartition(4)"
        direction TB
        R1["Partition 1<br/>■ ■ ■ ■ ■ ■ ■ ■"] --> RS["Full Shuffle<br/>(Round Robin / Hash)"]
        R2["Partition 2<br/>■ ■"] --> RS
        RS --> RO1["Partition 1<br/>■ ■ ■"]
        RS --> RO2["Partition 2<br/>■ ■"]
        RS --> RO3["Partition 3<br/>■ ■ ■"]
        RS --> RO4["Partition 4<br/>■ ■"]
    end

# Round-robin repartition — рівномірний розподіл
df = spark.createDataFrame(
    [(i, f"item_{i}", i * 10.0) for i in range(1000)],
    schema=["id", "name", "value"]
)
df_rp = df.repartition(4)
print(f"Партицій: {df_rp.rdd.getNumPartitions()}")
Партицій: 4

Коли: потрібно збільшити кількість партицій або вирівняти їх розмір

repartition за стовпцем

# repartition за стовпцем — дані з однаковим ключем потрапляють в одну партицію
df_by_name = df.withColumn("category", F.when(F.col("id") % 3 == 0, "A")
                                         .when(F.col("id") % 3 == 1, "B")
                                         .otherwise("C"))

df_repart_col = df_by_name.repartition(3, "category")
print(f"Партицій: {df_repart_col.rdd.getNumPartitions()}")
Партицій: 3

flowchart LR
    subgraph "До"
        P1["Part 1: A,B,C,A,B"]
        P2["Part 2: C,A,B,C,A"]
    end
    subgraph "Після repartition('category')"
        PA["Part 1: A,A,A,A"]
        PB["Part 2: B,B,B"]
        PC["Part 3: C,C,C,C"]
    end
    P1 --> PA
    P1 --> PB
    P2 --> PB
    P2 --> PC

  • repartition(n, "col") — Hash-розподіл за значеннями стовпця
  • Рядки з однаковим ключем гарантовано в одній партиції

Коли використовувати coalesce

flowchart TD
    subgraph "coalesce(2)"
        direction TB
        C1["Partition 1<br/>■ ■ ■"] --> CO1["Partition 1<br/>■ ■ ■ ■ ■"]
        C2["Partition 2<br/>■ ■"] --> CO1
        C3["Partition 3<br/>■ ■ ■ ■"] --> CO2["Partition 2<br/>■ ■ ■ ■"]
    end

# coalesce — зменшення без Shuffle
df_many = df.repartition(8)
print(f"До: {df_many.rdd.getNumPartitions()}")

df_few = df_many.coalesce(2)
print(f"Після coalesce(2): {df_few.rdd.getNumPartitions()}")
До: 8
Після coalesce(2): 2

Коли: потрібно зменшити кількість партицій (перед записом, щоб менше файлів)

Попередження

coalesce не може збільшити кількість партицій! Для цього використовуйте repartition.

Порівняння: repartition vs coalesce

Характеристика repartition(n) coalesce(n)
Shuffle Так (повний) Ні
Збільшення партицій Так Ні
Зменшення партицій Так Так
Рівномірність Рівномірний Нерівномірний
Швидкість Повільніше Швидше
Типове використання Збільшити / вирівняти Зменшити перед записом
# Типовий патерн перед записом
df.coalesce(1).write.mode("overwrite").parquet("output/single_file/")

# Типовий патерн для вирівнювання
df.repartition(8).write.mode("overwrite").parquet("output/balanced/")

# Партиціювання за стовпцем
df.repartition("city").write.partitionBy("city").parquet("output/by_city/")

Проблема маленьких файлів

Small File Problem

flowchart TD
    subgraph "Проблема: 1000 маленьких файлів"
        F1["file_001.parquet<br/>1 MB"]
        F2["file_002.parquet<br/>1 MB"]
        F3["file_003.parquet<br/>1 MB"]
        F4["..."]
        F5["file_1000.parquet<br/>1 MB"]
    end
    subgraph "Рішення: 8 великих файлів"
        B1["part-0001.parquet<br/>125 MB"]
        B2["part-0002.parquet<br/>125 MB"]
        B3["..."]
        B4["part-0008.parquet<br/>125 MB"]
    end
    F1 --> |"coalesce(8)"| B1
    style F1 fill:#fde8e8,stroke:#c10000
    style F2 fill:#fde8e8,stroke:#c10000
    style F3 fill:#fde8e8,stroke:#c10000
    style F5 fill:#fde8e8,stroke:#c10000
    style B1 fill:#d9f6ec,stroke:#28a87d
    style B2 fill:#d9f6ec,stroke:#28a87d
    style B4 fill:#d9f6ec,stroke:#28a87d

Проблеми маленьких файлів:

  • Кожен файл = окрема партиція = окремий Task → великий overhead
  • Навантаження на NameNode (HDFS) або S3 API (rate limits)
  • Повільне читання: багато відкриттів/закриттів файлів
  • Неефективне стиснення (Parquet/ORC оптимізовані для великих блоків)

Як уникнути маленьких файлів

При записі:

# coalesce перед записом
df.coalesce(8).write.parquet("output/")

# repartition для рівномірності
df.repartition(8).write.parquet("output/")

# Контроль розміру партицій
spark.conf.set(
    "spark.sql.files.maxPartitionBytes",
    "134217728"  # 128 MB
)

При читанні:

# Об'єднання маленьких файлів
spark.conf.set(
    "spark.sql.files.maxPartitionBytes",
    "134217728"  # 128 MB
)

# AQE автоматично об'єднує
spark.conf.set(
    "spark.sql.adaptive.enabled",
    True
)
spark.conf.set(
    "spark.sql.adaptive.coalescePartitions.enabled",
    True
)

Порада

Правило: цільовий розмір файлу — 128 MB – 1 GB. Занадто маленькі файли — overhead, занадто великі — повільне читання.

Data Skew (перекіс даних)

Що таке Data Skew?

flowchart TD
    subgraph "Нормальний розподіл"
        N1["Part 1<br/>25% даних"]
        N2["Part 2<br/>25% даних"]
        N3["Part 3<br/>25% даних"]
        N4["Part 4<br/>25% даних"]
    end
    subgraph "Data Skew"
        S1["Part 1<br/>5% даних<br/>✓ 2 хв"]
        S2["Part 2<br/>5% даних<br/>✓ 2 хв"]
        S3["Part 3<br/>5% даних<br/>✓ 2 хв"]
        S4["Part 4<br/>85% даних<br/>⏱ 40 хв"]
    end
    style S4 fill:#fde8e8,stroke:#c10000,stroke-width:3px

Data Skew — нерівномірний розподіл даних між партиціями:

  • Одна партиція містить набагато більше даних, ніж інші
  • Весь Stage чекає на найповільніший Task
  • Часта причина: groupBy або join по ключу з нерівномірним розподілом

Приклад Data Skew

# Створимо DataFrame з перекосом: 80% даних — "Київ"
import random
random.seed(42)

cities = ["Київ"] * 8000 + ["Львів"] * 1000 + ["Одеса"] * 500 + ["Харків"] * 500
data = [(i, cities[i], random.randint(100, 10000)) for i in range(len(cities))]

df_skew = spark.createDataFrame(data, schema=["id", "city", "amount"])
df_skew.groupBy("city").count().show()
+------+-----+
|  city|count|
+------+-----+
|  Київ| 8000|
| Львів| 1000|
|Харків|  500|
| Одеса|  500|
+------+-----+

Рішення 1: Broadcast Hash Join

flowchart LR
    subgraph "Звичайний Join (Shuffle)"
        A1["DataFrame A<br/>100 GB"] --> SH["Shuffle<br/>обидва DF"]
        A2["DataFrame B<br/>50 GB"] --> SH
        SH --> J1["Join"]
    end
    
    subgraph "Broadcast Join"
        B1["DataFrame A<br/>100 GB"] --> BJ["Join<br/>(без Shuffle A)"]
        B2["DataFrame B<br/>100 MB"] --> |"broadcast<br/>на всі executor'и"| BJ
    end

    %% Невидимий лінк для горизонтального вирівнювання блоків
    J1 ~~~ B1

    style SH fill:#fde8e8,stroke:#c10000
    style BJ fill:#d9f6ec,stroke:#28a87d

# Маленька таблиця — довідник міст
cities_info = spark.createDataFrame([
    ("Київ", "Центр"), ("Львів", "Захід"),
    ("Одеса", "Південь"), ("Харків", "Схід"),
], schema=["city", "region"])

# Broadcast Join — маленька таблиця копіюється на всі executor'и
result = df_skew.join(F.broadcast(cities_info), "city")
result.show(5)
+----+---+------+------+
|city| id|amount|region|
+----+---+------+------+
|Київ|  0|  1924| Центр|
|Київ|  1|   509| Центр|
|Київ|  2|  4606| Центр|
|Київ|  3|  4112| Центр|
|Київ|  4|  3757| Центр|
+----+---+------+------+
only showing top 5 rows

Поріг Broadcast Join

# Поточний поріг автоматичного broadcast
print(f"Поріг broadcast: {spark.conf.get('spark.sql.autoBroadcastJoinThreshold')} байт")
Поріг broadcast: 10485760b байт
# Збільшити поріг до 100 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)

# Вимкнути автоматичний broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Явний broadcast через hint
df_big.join(df_small.hint("broadcast"), "key")

# Або через функцію
from pyspark.sql.functions import broadcast
df_big.join(broadcast(df_small), "key")

Рішення 2: Key Salting

flowchart TD
    subgraph "До salting"
        K1["Київ → 8000 записів<br/>(одна партиція)"]
    end
    subgraph "Після salting"
        KS1["Київ_1 → 1600"]
        KS2["Київ_2 → 1600"]
        KS3["Київ_3 → 1600"]
        KS4["Київ_4 → 1600"]
        KS5["Київ_5 → 1600"]
    end
    K1 --> |"salt = rand() % 5"| KS1
    style K1 fill:#fde8e8,stroke:#c10000
    style KS1 fill:#d9f6ec,stroke:#28a87d
    style KS2 fill:#d9f6ec,stroke:#28a87d
    style KS3 fill:#d9f6ec,stroke:#28a87d
    style KS4 fill:#d9f6ec,stroke:#28a87d
    style KS5 fill:#d9f6ec,stroke:#28a87d

# Key Salting: додаємо випадковий суфікс до ключа
NUM_SALTS = 5

df_salted = df_skew.withColumn(
    "city_salted",
    F.concat(F.col("city"), F.lit("_"), (F.floor(F.rand(seed=42) * NUM_SALTS) + 1).cast("string"))
)
df_salted.select("city", "city_salted", "amount").show(5, truncate=False)
+----+-----------+------+
|city|city_salted|amount|
+----+-----------+------+
|Київ|Київ_4     |1924  |
|Київ|Київ_3     |509   |
|Київ|Київ_5     |4606  |
|Київ|Київ_2     |4112  |
|Київ|Київ_4     |3757  |
+----+-----------+------+
only showing top 5 rows

Key Salting: агрегація

# Крок 1: агрегація по salted-ключу (розподілене)
partial = df_salted.groupBy("city_salted").agg(
    F.sum("amount").alias("partial_sum"),
    F.count("*").alias("partial_count"),
)
partial.show(truncate=False)
+-----------+-----------+-------------+
|city_salted|partial_sum|partial_count|
+-----------+-----------+-------------+
|Львів_3    |1154935    |224          |
|Одеса_5    |594503     |109          |
|Одеса_4    |382119     |85           |
|Київ_3     |8312474    |1641         |
|Львів_5    |1032776    |198          |
|Одеса_2    |477453     |97           |
|Харків_1   |661094     |128          |
|Харків_3   |490280     |105          |
|Київ_4     |8048091    |1572         |
|Київ_5     |7646051    |1527         |
|Київ_2     |8137478    |1597         |
|Київ_1     |8626148    |1663         |
|Львів_2    |852176     |174          |
|Одеса_1    |568040     |112          |
|Львів_1    |901333     |185          |
|Львів_4    |1106438    |219          |
|Одеса_3    |479984     |97           |
|Харків_2   |464676     |93           |
|Харків_5   |419369     |87           |
|Харків_4   |507188     |87           |
+-----------+-----------+-------------+
# Крок 2: витягуємо оригінальний ключ та фінальна агрегація
final = (
    partial
    .withColumn("city", F.regexp_extract("city_salted", r"^(.+)_\d+$", 1))
    .groupBy("city")
    .agg(
        F.sum("partial_sum").alias("total_amount"),
        F.sum("partial_count").alias("total_count"),
    )
)
final.show(truncate=False)
+------+------------+-----------+
|city  |total_amount|total_count|
+------+------------+-----------+
|Львів |5047658     |1000       |
|Київ  |40770242    |8000       |
|Харків|2542607     |500        |
|Одеса |2502099     |500        |
+------+------------+-----------+

Рішення 3: Adaptive Query Execution (AQE)

# Увімкнення AQE (за замовчуванням у Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", True)

# Автоматичне об'єднання маленьких партицій
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)

# Автоматична оптимізація Skew Join
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)

# Автоматичний перехід на Broadcast Join
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", True)

AQE автоматично:

  • Об’єднує маленькі партиції
  • Розбиває великі (skewed) партиції
  • Конвертує Sort Merge Join у Broadcast Join

flowchart LR
    Q["Запит"] --> P["План виконання"]
    P --> S1["Stage 1"]
    S1 --> |"збір статистики"| AQE["AQE<br/>оптимізація"]
    AQE --> S2["Stage 2<br/>(оптимізований)"]
    style AQE fill:#f9b928,stroke:#333,stroke-width:2px

Spark UI

Моніторинг через Spark UI

Spark UI доступний за адресою http://localhost:4040 під час роботи SparkSession.

flowchart TD
    UI["Spark UI<br/>:4040"]
    UI --> J["Jobs<br/>список Job'ів"]
    UI --> S["Stages<br/>деталі Stage'ів"]
    UI --> ST["Storage<br/>кешовані дані"]
    UI --> E["Executors<br/>ресурси"]
    UI --> SQL["SQL/DataFrame<br/>плани запитів"]
    UI --> ENV["Environment<br/>конфігурація"]
    style UI fill:#f9b928,stroke:#333,stroke-width:2px

Вкладка Що показує Коли дивитися
Jobs Список Job’ів, статус, тривалість Загальний огляд
Stages Shuffle Read/Write, Task distribution Data Skew, Shuffle overhead
SQL Logical/Physical план, метрики Оптимізація запитів
Executors Пам’ять, ядра, GC OOM, underutilization

explain: аналіз плану запиту

# Простий план
df_skew.filter(F.col("city") == "Київ").groupBy("city").agg(F.sum("amount")).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#19], functions=[sum(amount#20L)])
   +- Exchange hashpartitioning(city#19, 4), ENSURE_REQUIREMENTS, [plan_id=437]
      +- HashAggregate(keys=[city#19], functions=[partial_sum(amount#20L)])
         +- Project [city#19, amount#20L]
            +- Filter (isnotnull(city#19) AND (city#19 = Київ))
               +- Scan ExistingRDD[id#18L,city#19,amount#20L]

# Розширений план з усіма фазами
df_skew.filter(F.col("city") == "Київ").groupBy("city").agg(F.sum("amount")).explain(True)
== Parsed Logical Plan ==
'Aggregate ['city], ['city, unresolvedalias('sum('amount))]
+- Filter (city#19 = Київ)
   +- LogicalRDD [id#18L, city#19, amount#20L], false

== Analyzed Logical Plan ==
city: string, sum(amount): bigint
Aggregate [city#19], [city#19, sum(amount#20L) AS sum(amount)#127L]
+- Filter (city#19 = Київ)
   +- LogicalRDD [id#18L, city#19, amount#20L], false

== Optimized Logical Plan ==
Aggregate [city#19], [city#19, sum(amount#20L) AS sum(amount)#127L]
+- Project [city#19, amount#20L]
   +- Filter (isnotnull(city#19) AND (city#19 = Київ))
      +- LogicalRDD [id#18L, city#19, amount#20L], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#19], functions=[sum(amount#20L)], output=[city#19, sum(amount)#127L])
   +- Exchange hashpartitioning(city#19, 4), ENSURE_REQUIREMENTS, [plan_id=458]
      +- HashAggregate(keys=[city#19], functions=[partial_sum(amount#20L)], output=[city#19, sum#129L])
         +- Project [city#19, amount#20L]
            +- Filter (isnotnull(city#19) AND (city#19 = Київ))
               +- Scan ExistingRDD[id#18L,city#19,amount#20L]

Методи оптимізації

1. Раннє фільтрування

flowchart LR
    subgraph "Погано ❌"
        A1["10 GB даних"] --> J1["JOIN<br/>(Shuffle 10 GB)"]
        A2["5 GB"] --> J1
        J1 --> F1["filter<br/>(залишаємо 1%)"]
    end
    
    subgraph "Добре ✅"
        B1["10 GB даних"] --> F2["filter<br/>(залишаємо 1%)"]
        F2 --> J2["JOIN<br/>(Shuffle 100 MB)"]
        B2["5 GB"] --> J2
    end

    %% Невидимий зв'язок для вирівнювання підграфіків по горизонталі
    F1 ~~~ B1

    style F1 fill:#fde8e8,stroke:#c10000
    style F2 fill:#d9f6ec,stroke:#28a87d

# ❌ Погано: фільтр після join
result = df_big.join(df_small, "key").filter(col("date") > "2024-01-01")

# ✅ Добре: фільтр перед join
result = df_big.filter(col("date") > "2024-01-01").join(df_small, "key")

Порада

Правило: фільтруйте якомога раніше, щоб зменшити обсяг Shuffle.

2. Уникайте UDF

UDF (User-Defined Function):

from pyspark.sql.functions import udf

# ❌ UDF — повільно
@udf(returnType=DoubleType())
def calc_bonus(salary):
    if salary > 50000:
        return salary * 0.15
    elif salary > 40000:
        return salary * 0.10
    else:
        return salary * 0.05

df.withColumn("bonus", calc_bonus("salary"))
  • Серіалізація Python ↔︎ JVM
  • Немає оптимізації Catalyst
  • Працює по-рядково

Вбудовані функції:

# ✅ Вбудовані — швидко
df.withColumn("bonus",
    F.when(F.col("salary") > 50000,
           F.col("salary") * 0.15)
     .when(F.col("salary") > 40000,
           F.col("salary") * 0.10)
     .otherwise(F.col("salary") * 0.05)
)
  • Виконується в JVM
  • Оптимізується Catalyst
  • Працює з Tungsten (бінарний формат)

3. Кешування

# cache() = persist(StorageLevel.MEMORY_AND_DISK)
df_cached = df_skew.filter(F.col("amount") > 1000).cache()

# Перший виклик — обчислення + збереження в пам'ять
t1 = time.time()
df_cached.count()
time_first = time.time() - t1

# Другий виклик — читання з кешу
t2 = time.time()
df_cached.count()
time_cached = time.time() - t2

print(f"Перший виклик: {time_first:.3f} с")
print(f"З кешу: {time_cached:.3f} с")
Перший виклик: 0.710 с
З кешу: 0.043 с
# Звільнення кешу
df_cached.unpersist()
id city amount
0 Київ 1924
2 Київ 4606
3 Київ 4112
4 Київ 3757
5 Київ 2386
6 Київ 1779
7 Київ 9035
8 Київ 1524
9 Київ 9774
10 Київ 7012
only showing top 10 rows

Коли кешувати, а коли ні

Кешуйте ✅:

  • Дані використовуються кілька разів
  • Після дорогої операції (join, groupBy)
  • Для ітеративних алгоритмів (ML)
  • Дані для інтерактивного аналізу
# Типовий патерн
filtered = df.filter(...).cache()
report1 = filtered.groupBy("a").count()
report2 = filtered.groupBy("b").sum("c")
filtered.unpersist()

Не кешуйте ❌:

  • Дані використовуються один раз
  • Дані занадто великі для пам’яті
  • Дані змінюються часто
  • Прості операції без Shuffle
# persist з рівнями зберігання
from pyspark import StorageLevel

# Тільки пам'ять
df.persist(StorageLevel.MEMORY_ONLY)

# Пам'ять + диск (за замовчуванням)
df.persist(StorageLevel.MEMORY_AND_DISK)

# Серіалізований (менше пам'яті)
df.persist(StorageLevel.MEMORY_ONLY_SER)

4. Налаштування Executor’ів

flowchart TD
    C["Кластер<br/>48 ядер, 192 GB RAM"]
    C --> E1["Executor 1<br/>4 ядра, 24 GB"]
    C --> E2["Executor 2<br/>4 ядра, 24 GB"]
    C --> E3["Executor 3<br/>4 ядра, 24 GB"]
    C --> E4["..."]
    C --> E6["Executor 6<br/>4 ядра, 24 GB"]
    style C fill:#f9b928,stroke:#333,stroke-width:2px

# Типова конфігурація для кластера 48 ядер / 192 GB
spark = (
    SparkSession.builder
    .config("spark.executor.instances", "6")     # кількість executor'ів
    .config("spark.executor.cores", "4")          # ядер на executor
    .config("spark.executor.memory", "24g")       # RAM на executor
    .config("spark.driver.memory", "8g")          # RAM для Driver
    .config("spark.memory.fraction", "0.6")       # частка RAM для обчислень
    .config("spark.memory.storageFraction", "0.5") # частка для кешу
    .getOrCreate()
)

5. Формати даних

Порада

Використовуйте Parquet або ORC як основний формат зберігання. CSV/JSON — тільки для імпорту/експорту.

Зведення оптимізацій

Чеклист оптимізації

Метод Коли Ефект
Раннє фільтрування Завжди Зменшує Shuffle
Broadcast Join Маленька таблиця (< 100 MB) Усуває Shuffle join
Key Salting Data Skew при groupBy/join Рівномірний розподіл
AQE Spark 3.0+ Автоматична оптимізація
Parquet/ORC Завжди Стиснення + Column Pruning
Кешування Повторне використання Уникає повторних обчислень
Уникати UDF Завжди, якщо є аналог 10-100x швидше
coalesce Перед записом Менше маленьких файлів
Налаштування executor’ів Продакшн Ефективне використання ресурсів
partitionBy Великі таблиці Predicate Pushdown

Порядок дій при повільному запиті

flowchart LR
    A["Запит повільний"] --> B["df.explain()"]
    B --> C{"Є Shuffle?"}
    C --> |"Так"| D["Чи можна фільтрувати раніше?"]
    C --> |"Ні"| E["Перевірити формат даних"]
    D --> |"Так"| F["Додати filter до join/groupBy"]
    D --> |"Ні"| G{"Data Skew?"}
    G --> |"Так"| H["Broadcast / Salting / AQE"]
    G --> |"Ні"| I["Spark UI → Stages → Task time"]
    I --> J{"Нерівні Tasks?"}
    J --> |"Так"| K["repartition / coalesce"]
    J --> |"Ні"| L["Збільшити ресурси / кеш"]
    style A fill:#fde8e8,stroke:#c10000
    style F fill:#d9f6ec,stroke:#28a87d
    style H fill:#d9f6ec,stroke:#28a87d
    style K fill:#d9f6ec,stroke:#28a87d
    style L fill:#d9f6ec,stroke:#28a87d

Підсумок

Що ми вивчили

  1. Партиціювання та кількість партицій
  2. repartition vs coalesce
  3. Проблема маленьких файлів
  4. Data Skew та його діагностика
  5. Broadcast Hash Join
  1. Key Salting
  2. Adaptive Query Execution (AQE)
  3. Spark UI та explain()
  4. Раннє фільтрування
  5. Кешування, UDF, формати даних

Ресурси

Домашнє завдання

  1. Створіть DataFrame з >100K рядків та Data Skew (80% записів — один ключ)
  2. Виконайте groupBy та вимірте час виконання
  3. Реалізуйте Key Salting та порівняйте час
  4. Виконайте Broadcast Join з маленькою таблицею (<100 рядків)
  5. Використайте explain() для аналізу плану обох join’ів (звичайний vs broadcast)
  6. Запишіть дані у CSV, JSON та Parquet — порівняйте розміри файлів
  7. Продемонструйте coalesce vs repartition: кількість файлів при записі
  8. Увімкніть AQE та покажіть зміну плану (explain())

Дякую за увагу!



Матеріали курсу

ihor.miroshnychenko@knu.ua

Data Mirosh

@ihormiroshnychenko

@aranaur