How to solve Spark's large number of small files problem

This is a technical issue I solved at work.

Problem Description

In our data pipeline, an hourly Spark job writes data to AWS S3.
The data and directory structure looks like this:

/job_name_start_time/event_timestamp_minute/data_1
/job_name_start_time/event_timestamp_minute/data_2
...

At time T, the job processes all the events generated between T - 1 hour and T.

For example, an event generated by job JobX at 2020-03-04 10:30AM will probably be stored in a file at /JobX_2020-03-04-11:00/2020-03-04-10:30/data_n.

Sounds good? But events never arrive on time.
A batch of data usually also contains paths like
/JobX_2020-03-04-11:00/2020-03-04-9:58/data_x
or even
/JobX_2020-03-04-11:00/2020-03-03-23:58/data_x.
Upstream data delay is inevitable.

Still, most of the events in batch JobX_2020-03-04-11:00 are generated between 10:00 and 11:00; late events are a small minority.

We also found that file sizes are skewed.
For batch JobX_2020-03-04-11:00 the distribution looks like this:

/2020-03-04-10:30/data_1 - 200MB
/2020-03-04-10:30/data_2 - 200MB
/2020-03-04-10:30/data_3 - 200MB
...
/2020-03-04-9:30/data_1 - 1MB
/2020-03-04-9:30/data_2 - 1MB
/2020-03-04-9:30/data_3 - 1MB
...

Ideally, each file would be roughly the same size:

/2020-03-04-10:30/data_1 - 201MB
/2020-03-04-10:30/data_2 - 201MB
/2020-03-04-10:30/data_3 - 201MB
...
/2020-03-04-9:30/data_1 - 201MB
...

This was the problem we were facing: the large number of small files in directories like 2020-03-04-9:30 was probably hurting read performance.
Although tuning repartition to solve this might increase the write time, we still wanted to give it a try.

Background information

We used to have this piece of code to handle repartitioning and writing:

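data.repartition(config.repartition_num).write.partitionBy("batch_id", "event_minute")
// batch_id = job_name_start_time, event_minute = minute(event_time)
// partitionBy belongs to the writer (data.write) and takes column names
...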

Let’s first look at how Spark's repartition and partitionBy work.

partitionBy

partitionBy lets you organize the physical directory structure of the output when writing.

For example, partitionBy(col1, col2) will make your structure look like

/col1=A/col2=X/data...
/col1=A/col2=Y/data...
/col1=B/col2=X/data...

More detail: https://sparkbyexamples.com/pyspark/pyspark-partitionby-example/
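
To make the layout concrete, here is a small sketch in plain Python (not Spark code) that derives the directory each row would land in. The function name partition_dir and the sample rows are made up for illustration; the col=value naming follows the example above.

```python
def partition_dir(row, partition_cols):
    """Return the col=value directory a row would be written under."""
    return "/" + "/".join(f"{c}={row[c]}" for c in partition_cols)


# Hypothetical rows; only col1 and col2 decide the directory.
rows = [
    {"col1": "A", "col2": "X", "value": 1},
    {"col1": "A", "col2": "Y", "value": 2},
    {"col1": "B", "col2": "X", "value": 3},
    {"col1": "A", "col2": "X", "value": 4},
]

# Rows 1 and 4 share a directory, so 4 rows produce 3 directories.
dirs = sorted({partition_dir(r, ["col1", "col2"]) for r in rows})
for d in dirs:
    print(d)
```

The point to take away is that the directory depends only on the values of the partitionBy columns, not on which in-memory partition a row sits in.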

repartition

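repartition lets you regroup data into partitions in memory. Rows in one partition are processed together by a single task, so a higher partition count gives you more parallelism if you have more machines.

data.repartition(n) <- spreads rows evenly (round-robin) over n partitions, regardless of their content
data.repartition(col) <- hash-partitions rows by col, so rows with the same col value land in the same partition; the number of partitions comes from spark.sql.shuffle.partitions (200 by default), and one partition can hold several col values

Normally, when you write without partitionBy, each non-empty partition produces one file, so the number of partitions is the number of files you get.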

More detail: https://sparkbyexamples.com/spark/spark-repartition-vs-coalesce/
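
The two flavours of repartition can be modelled in a few lines of plain Python. This is only a sketch of the idea: the function names are made up, and CRC32 stands in for Spark's own hash function, so the actual partition numbers will differ from what Spark would pick.

```python
import zlib


def round_robin_partition(row_index, num_partitions):
    # Model of repartition(n): rows are dealt out in turn, ignoring content.
    return row_index % num_partitions


def hash_partition(key, num_partitions):
    # Model of repartition(col): the partition depends only on the key.
    # Assumption: CRC32 is a stand-in, Spark uses a Murmur3-based hash.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


keys = ["10:30", "10:31", "10:32", "09:58", "23:58"]
num_partitions = 3

by_partition = {}
for key in keys:
    by_partition.setdefault(hash_partition(key, num_partitions), []).append(key)

# Five keys cannot fit into three partitions one-to-one,
# so at least one partition holds more than one key.
for partition, held in sorted(by_partition.items()):
    print(partition, held)
```

With hashing, all rows of one key stay together, but there is no guarantee of one partition per key.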

All of this seems fine, but if the data in one partition is split by partitionBy, that partition still generates multiple files, one in each directory it has rows for.
Since we used to repartition the data randomly, each partition contained a lot of T-1 ~ T data and a little bit of everything else, like T-6 ~ T-5 or T-31 ~ T-30. When many partitions like this are written to disk, they create lots of separate small files for the data outside T-1 ~ T (and lots of large files for T-1 ~ T).
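
A rough way to see the effect is to count files in a toy model: every (in-memory partition, output directory) pair that has at least one row becomes one file. The sketch below is plain Python with invented numbers (10 partitions, 60 on-time minutes with 2,000 events each, 30 late minutes with 10 events each), not our real workload.

```python
from collections import Counter

NUM_PARTITIONS = 10  # hypothetical value of config.repartition_num

# Hypothetical batch: each entry is the minute directory of one event.
events = []
for m in range(60):
    events += [f"on_time_{m:02d}"] * 2_000
for m in range(30):
    events += [f"late_{m:02d}"] * 10

# Model of repartition(n): row i goes to partition i % n.
# Each (partition, directory) pair with rows becomes one output file.
files = Counter()
for i, minute_dir in enumerate(events):
    files[(i % NUM_PARTITIONS, minute_dir)] += 1

on_time_files = [n for (_, d), n in files.items() if d.startswith("on_time")]
late_files = [n for (_, d), n in files.items() if d.startswith("late")]

print("on-time files:", len(on_time_files), "rows per file:", set(on_time_files))
print("late files:", len(late_files), "rows per file:", set(late_files))
```

In this model every late minute is scattered over all 10 partitions, so it ends up as 10 one-row files instead of a single file.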

So the goal was to repartition the data so that rows with the same batch_id and minute(event_time) end up in the same partition before writing to disk. If a partition is too large, it also needs to be split (ideally so that one file is about 1GB).

Solution

The solution is quite simple. Let partition_key = (batch_id, minute(event_time)). We need a way to put rows with the same partition_key in the same partition while keeping the size of each partition under control.

First we estimate the size of each row with
row_size = length(to_json(struct(col("*")))),
which is the length of the row serialized as JSON, so only an approximation of its size on disk.
Then we compute how many partitions each partition_key needs with
partition_key_total_size = groupBy(partition_key).agg(sum(col("row_size"))),
and
partition_number = ceil(col("partition_key_total_size") / max_size_per_partition_key).
We call the resulting per-key table split_info.
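
The arithmetic can be checked on a handful of rows in plain Python. This is a sketch, not the Spark job: json.dumps with compact separators stands in for to_json, and the rows, the payload column and the limit of 100 characters are made up.

```python
import json
import math
from collections import defaultdict


def estimate_row_size(row):
    # Length of the compact JSON form, mirroring length(to_json(...)).
    return len(json.dumps(row, separators=(",", ":")))


def partitions_needed(rows, key_col, max_size):
    # Sum the estimated sizes per key, then round up to whole partitions.
    totals = defaultdict(int)
    for row in rows:
        totals[row[key_col]] += estimate_row_size(row)
    return {key: math.ceil(total / max_size) for key, total in totals.items()}


# Hypothetical data: every row serializes to 35 characters.
rows = (
    [{"partition_key": "x", "payload": "a"}] * 2    # 70 chars in total
    + [{"partition_key": "y", "payload": "a"}] * 3  # 105 chars in total
    + [{"partition_key": "z", "payload": "a"}] * 7  # 245 chars in total
)

split_info = partitions_needed(rows, "partition_key", max_size=100)
print(split_info)
```

Rounding up with ceil means a key that is only slightly over the limit still gets a second partition, which keeps every partition at or below the limit on average.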

Now we can join this information back to the data. Since the split info is probably quite small, we use a broadcast join:
df.join(broadcast(split_info), partition_key)

If we set max_size_per_partition_key to 10, we now have a DataFrame like this:

+-------------+------------------------+----------------+
|partition_key|partition_key_total_size|partition_number|
+-------------+------------------------+----------------+
|            x|                      10|               1|
|            y|                      20|               2|
|            y|                      20|               2|
|            z|                      30|               3|
|            z|                      30|               3|
|            z|                      30|               3|
+-------------+------------------------+----------------+

For partition_key = z, we want 3 partitions (0 to 2).
At first I thought we could just distribute the rows evenly across them, for example by going through the rows and assigning partition = row_number % 3. But that requires all the rows of a key to be processed on the same executor (to label the row_number).
A better approach is to randomly assign each row to partition 0, 1 or 2. When the number of rows is large, the distribution should be quite even.
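
How even is "quite even"? A quick simulation in plain Python gives a feel for it. The function name, the row count of 30,000 and the fixed seed are arbitrary choices for this sketch.

```python
import random
from collections import Counter


def assign_salts(num_rows, partition_number, seed=0):
    # Each row independently draws an integer in [0, partition_number).
    rng = random.Random(seed)
    return Counter(int(rng.random() * partition_number) for _ in range(num_rows))


counts = assign_salts(num_rows=30_000, partition_number=3)
for salt in sorted(counts):
    print(salt, counts[salt])
```

With tens of thousands of rows, each of the three buckets stays within a few percent of one third. With only a handful of rows the split can be noticeably lopsided, but such keys are small anyway.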

Then we add another column to indicate which partition each row should go to:

.withColumn("finalDecision",
  concat(
    col(partition_key),
    lit("final_partition"),
    (rand() * col("partition_number")).cast("int")
  )
)

Finally we can do
df.repartition(col("finalDecision"))
to repartition the data with the size of each partition under control.
The output files should then be what we expected, without a large number of small files.
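
As a sanity check of the whole idea, the toy file-count model can be applied to the salted key. Again this is plain Python with made-up inputs: the split_info values, the row counts and the 200 shuffle partitions are assumptions, and CRC32 stands in for Spark's hash function.

```python
import random
import zlib
from collections import Counter


def final_decision(partition_key, partition_number, rng):
    # Same shape as the finalDecision column: key + marker + random salt.
    salt = int(rng.random() * partition_number)
    return f"{partition_key}final_partition{salt}"


def count_files(row_keys, split_info, num_shuffle_partitions, seed=0):
    """Return the number of files per directory (one directory per key)."""
    rng = random.Random(seed)
    files = set()
    for key in row_keys:
        salted = final_decision(key, split_info[key], rng)
        # Model of repartition(col("finalDecision")): hash of the salted key.
        partition = zlib.crc32(salted.encode("utf-8")) % num_shuffle_partitions
        files.add((partition, key))
    return Counter(key for _, key in files)


split_info = {"x": 1, "y": 2, "z": 3}  # hypothetical partition_number per key
row_keys = ["x"] * 100 + ["y"] * 200 + ["z"] * 300

files_per_dir = count_files(row_keys, split_info, num_shuffle_partitions=200)
print(dict(files_per_dir))
```

Because every salted key maps to exactly one partition, a directory can never receive more files than its partition_number, no matter how many partitions the shuffle uses.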

Once the implementation was complete, we found that compacting the files reduced data processing time by around 3% (writing time increased slightly, but reading time decreased).

Written by Marko Peng
Good man