Improve Spark performance
This section describes a number of improvements you can apply to your Spark jobs.
Decrease startup time for small jobs
When processing small amounts of data with Spark, you can speed up your job by setting `mode=local` instead of launching a driver and a number of executors.
With this setting your job runs on a single machine, which reduces the time needed to start the job.
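The snippet below is a minimal sketch of what this could look like with the Conveyor operator used later in this section; the exact name of the `mode` option and where it is set are assumptions, so check the Conveyor reference for the precise syntax.

```python
from conveyor.operators import ConveyorSparkSubmitOperatorV2

# Minimal sketch: run a small job in local mode on a single machine instead
# of starting a driver plus executors. The "mode" parameter name is an
# assumption based on the description above; consult the Conveyor reference
# for the exact option.
small_job = ConveyorSparkSubmitOperatorV2(
    dag=dag,
    task_id="small-local-job",
    mode="local",
    aws_role=role,
    application="local:///opt/spark/work-dir/src/greeter/main.py",
    application_args=["--date", "{{ ds }}"],
)
```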
Increase write performance to S3
To increase write performance to S3, you can turn on `s3_committer=magic` in your configuration.
If you are writing a lot of data to S3, this can speed up your job significantly.
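As a hedged sketch, the setting could be passed on the operator as shown below; whether `s3_committer` belongs on the operator or in the project configuration is an assumption, so check the Conveyor reference for the exact location.

```python
from conveyor.operators import ConveyorSparkSubmitOperatorV2

# Minimal sketch: enable the magic committer for a job writing a lot of data
# to S3. Passing s3_committer directly on the operator is an assumption;
# see the Conveyor reference for where this option is configured.
s3_write_task = ConveyorSparkSubmitOperatorV2(
    dag=dag,
    task_id="write-large-dataset-to-s3",
    executor_instance_type="mx_2xlarge",
    aws_role=role,
    s3_committer="magic",
    application="local:///opt/spark/work-dir/src/greeter/main.py",
    application_args=["--date", "{{ ds }}"],
)
```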
Use multiple disks
When shuffling a lot of data, you may benefit from setting the `executor_disk_size` option on larger instances (`mx.2xlarge` and `mx.4xlarge`).
With this option, multiple volumes are mounted on the executor. When you are shuffling terabytes of data,
mounting multiple disks can increase throughput by up to 2x, depending on the workload.
More information on this feature can be found in the technical reference.
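The sketch below shows how this could look on the operator; the `executor_disk_size` value and its unit are illustrative assumptions, so check the technical reference for the supported sizes per instance type.

```python
from conveyor.operators import ConveyorSparkSubmitOperatorV2

# Minimal sketch for a shuffle-heavy job: request extra executor disk so that
# multiple volumes are mounted. The disk size value and its unit (GB) are
# assumptions; see the technical reference for the supported values.
shuffle_heavy_task = ConveyorSparkSubmitOperatorV2(
    dag=dag,
    task_id="shuffle-heavy-job",
    executor_instance_type="mx_4xlarge",
    executor_disk_size=500,
    aws_role=role,
    application="local:///opt/spark/work-dir/src/greeter/main.py",
    application_args=["--date", "{{ ds }}"],
)
```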
Decide on the number of cores per executor
Within Conveyor, you can choose the number of executors as well as the `instance_type` for these executors.
This allows you to use T-shirt sizes for your executors. Unfortunately, the ratio between the number of cores
and the available memory is fixed within the m, c, and r instance families. This can be an issue if your Spark tasks require a lot
of memory, because each task runs on a single core and the memory available to it equals `executor memory / executor cores`.
If you want to increase the available memory per task, you should therefore reduce
the number of cores per executor. This way your tasks may be able to finish instead of going OOM.
As an illustration, an executor with 8 cores and 16 GB of memory gives each task roughly 2 GB; limiting it to 4 cores roughly doubles that to 4 GB per task.
You can find the available memory and cores for each instance type in the technical reference.
You can reduce the number of tasks that run in parallel on an executor by specifying the `spark.executor.cores`
option in your Spark configuration:
```python
from conveyor.operators import ConveyorSparkSubmitOperatorV2

pi_big_node_task = ConveyorSparkSubmitOperatorV2(
    dag=dag,
    task_id="executor-reduced-exec-cores",
    driver_instance_type="mx_large",
    executor_instance_type="cx_2xlarge",
    aws_role=role,
    conf={
        # Limit each executor to 2 concurrent tasks, so every task
        # gets a larger share of the executor memory.
        'spark.executor.cores': '2',
    },
    application="local:///opt/spark/work-dir/src/greeter/main.py",
    application_args=["--date", "{{ ds }}"],
)
```
The result will be:
- each task will have more memory available and is thus less likely to go OOM
- because you have reduced the parallelism, the job might take longer to finish if it has many tasks