ConveyorSparkSubmitOperatorV2
The ConveyorSparkSubmitOperatorV2 provides an easy way to run a Spark job containing your transformations.
The following is an example usage of the ConveyorSparkSubmitOperatorV2:
from conveyor.operators import ConveyorSparkSubmitOperatorV2

role = "job-role-{{ macros.conveyor.env() }}"

ConveyorSparkSubmitOperatorV2(
    task_id="the-task-id",
    num_executors=1,
    driver_instance_type="mx.small",
    executor_instance_type="mx.small",
    aws_role=role,
    java_class="name-of-the-class",
    application="path-to-jar-in-container",
    application_args=[
        "--environment", "{{ macros.conveyor.env() }}",
        "--snapshotDates", "2019-11-06"
    ],
)
The ConveyorSparkSubmitOperatorV2 extends the Airflow BaseOperator, so any parameter supported by the BaseOperator can also be set on the Conveyor operator.
More detail on the supported parameters can be found here.
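For instance, standard BaseOperator arguments such as retries, retry_delay, and execution_timeout can be combined with the Conveyor-specific arguments; the following sketch uses illustrative values:

from datetime import timedelta

from conveyor.operators import ConveyorSparkSubmitOperatorV2

# Sketch combining standard Airflow BaseOperator arguments (retries, retry_delay,
# execution_timeout) with Conveyor-specific arguments; the values are illustrative.
ConveyorSparkSubmitOperatorV2(
    task_id="spark-task-with-retries",
    num_executors=1,
    java_class="name-of-the-class",
    application="path-to-jar-in-container",
    retries=2,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),
)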
The Conveyor specific parameters supported by the ConveyorSparkSubmitOperatorV2 are:
Parameter | Type | Default | Explanation |
---|---|---|---|
image | str | Project image | Docker image you wish to launch. The default is the image and version of your project. If you want to use the image of another project you can use macros.conveyor.image('PROJECT_NAME') (templated). |
application | str | | The application that is submitted as a job, either a .jar or .py file (templated). |
application_args | list | [] | Arguments for the application being submitted (templated). |
conf | dict | | Arbitrary Spark configuration properties (templated), see the default settings below. |
java_class | str | | The main class of the Java application. This is not needed when using PySpark. |
num_executors | int | 2 | Number of executors to launch (templated). |
driver_instance_type | str | mx.small | The Conveyor instance type to use for the Spark driver. This specifies the CPU/Memory the driver can use. |
executor_instance_type | str | mx.small | The Conveyor instance type to use for the executors. This specifies the CPU/Memory the executors can use. |
executor_disk_size | int | | The total disk size attached to the executor nodes. Depending on the instance type, the size will be divided over 1, 2 or 4 disks. |
aws_role | str | | The AWS role to be used by both the driver and executors. |
aws_availability_zone | str | | The availability zone where the Spark application should run (e.g. eu-west-1a). |
azure_application_client_id | str | | The Azure service principal used by the container. |
env_vars | dict | | Environment variables for the Spark driver (templated). |
instance_life_cycle | str | spot | The life cycle of the instance used to run this job. Options are on-demand, spot or driver-on-demand-executors-spot. |
s3_committer | str | file | The S3 committer to be used by the Spark applications. Supported committers are file and magic. The magic S3 committer requires a Spark installation with hadoop-cloud support. This is provided by our ...hadoop-3.3.1-...-v2 images. |
abfs_committer | str | file | The ABFS committer to be used by the Spark applications. Supported committers are file and manifest. The manifest ABFS committer requires a Spark installation with hadoop-cloud support. This is provided by our ...hadoop-3.3.5-...-v1 images. |
mode | str | cluster | This setting allows you to change the Spark mode between cluster, cluster-v2 and local. For more info see here. |
verbose | bool | False | This setting will configure spark-submit to run with the --verbose option, printing out fine-grained debugging information. |
In case the aws_role or azure_application_client_id is not supplied to the operator, the default identity configured on the project will be used instead.
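As an illustration, the following sketch combines several of the parameters above; the project name passed to macros.conveyor.image, the instance types, disk size and committer are placeholder choices:

from conveyor.operators import ConveyorSparkSubmitOperatorV2

# Illustrative combination of the documented parameters; 'other-project',
# the instance types, disk size and committer are placeholder values.
ConveyorSparkSubmitOperatorV2(
    task_id="spark-task-full-example",
    image="{{ macros.conveyor.image('other-project') }}",
    num_executors=4,
    driver_instance_type="mx.medium",
    executor_instance_type="mx.large",
    executor_disk_size=100,
    instance_life_cycle="driver-on-demand-executors-spot",
    s3_committer="magic",
    java_class="name-of-the-class",
    application="path-to-jar-in-container",
    application_args=["--environment", "{{ macros.conveyor.env() }}"],
)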
Setting CPU/Memory
You can use the driver_instance_type and executor_instance_type parameters to set the CPU/Memory of your Spark driver and executors.
We support instance types to make it easier to select CPU/Memory settings; for example, you can set both to the mx.micro instance:
from conveyor.operators import ConveyorSparkSubmitOperatorV2
ConveyorSparkSubmitOperatorV2(
    ...,
    driver_instance_type='mx.micro',
    executor_instance_type='mx.micro',
)
When running Spark/PySpark applications, only a part of the total memory for the container is available for Spark itself. The details are described in the following tables:
AWS
Instance type | CPU | Total memory | Spark memory | PySpark memory |
---|---|---|---|---|
mx.micro | 1* | 0.875 Gb | 0.8 Gb | 0.6 Gb |
mx.small | 1* | 1.75 Gb | 1.6 Gb | 1.25 Gb |
mx.medium | 1 | 3.5 Gb | 3.2 Gb | 2.5 Gb |
mx.large | 2 | 7 Gb | 6.4 Gb | 5 Gb |
mx.xlarge | 4 | 14 Gb | 12.7 Gb | 10 Gb |
mx.2xlarge | 8 | 29 Gb | 26.7 Gb | 21 Gb |
mx.4xlarge | 16 | 59 Gb | 54 Gb | 42.4 Gb |
cx.medium | 1 | 1.75 Gb | 1.6 Gb | 1.25 Gb |
cx.large | 2 | 3.5 Gb | 3.2 Gb | 2.5 Gb |
cx.xlarge | 4 | 7 Gb | 6.4 Gb | 5 Gb |
cx.2xlarge | 8 | 14 Gb | 12.7 Gb | 10 Gb |
cx.4xlarge | 16 | 29 Gb | 26.7 Gb | 21 Gb |
rx.xlarge | 8 | 28 Gb | 26 Gb | 21 Gb |
rx.2xlarge | 16 | 59 Gb | 54 Gb | 43 Gb |
rx.4xlarge | 16 | 120 Gb | 112 Gb | 88 Gb |
Azure
Instance type | CPU | Total memory | Spark memory | PySpark memory |
---|---|---|---|---|
mx.micro | 1* | 0.868 Gb | 0.78 Gb | 0.60 Gb |
mx.small | 1* | 1.73 Gb | 1.56 Gb | 1.21 Gb |
mx.medium | 1 | 3.47 Gb | 3.12 Gb | 2.43 Gb |
mx.large | 2 | 6.94 Gb | 6.25 Gb | 4.86 Gb |
mx.xlarge | 4 | 13.89 Gb | 12.50 Gb | 9.72 Gb |
mx.2xlarge | 8 | 30.65 Gb | 27.58 Gb | 21.45 Gb |
mx.4xlarge | 16 | 64.16 Gb | 57.74 Gb | 44.91 Gb |
(*) These instance types don't get a guaranteed full CPU but only a slice of a full CPU. If the cluster has space for it, they are allowed to burst up to a full CPU.
As you can see from the tables, the available Spark memory differs depending on whether you use regular (Scala) Spark or PySpark.
The explanation for this is the spark.kubernetes.memoryOverheadFactor setting, described in the Spark settings.
This setting is configured to 0.1 for JVM jobs (Scala and Java Spark), and to 0.4 for non-JVM jobs (PySpark, SparkR).
A portion of the memory is set aside for non-JVM needs such as off-heap memory allocations, system processes, Python, R...
Otherwise, your job would commonly fail with the error "Memory Overhead Exceeded".
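The relation between container memory and usable Spark memory can be approximated as container memory divided by (1 + overhead factor); a rough Python sketch, not the exact rounding Conveyor applies:

# Rough illustration of how the table values relate to spark.kubernetes.memoryOverheadFactor;
# the exact rounding Conveyor applies may differ slightly.
def usable_spark_memory(container_memory_gb: float, pyspark: bool) -> float:
    overhead_factor = 0.4 if pyspark else 0.1
    return container_memory_gb / (1 + overhead_factor)

# mx.small on AWS has 1.75 Gb of container memory:
print(round(usable_spark_memory(1.75, pyspark=False), 2))  # ~1.59 Gb, close to the 1.6 Gb in the table
print(round(usable_spark_memory(1.75, pyspark=True), 2))   # 1.25 Gb, matching the table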
Default Spark configuration
A number of configuration options are set by default, but they can be overwritten by passing the conf dictionary:
Configuration | Default value |
---|---|
spark.kubernetes.container.image.pullPolicy | Always |
spark.kubernetes.pyspark.pythonVersion | 3 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 2 |
spark.hadoop.hive.metastore.client.factory.class (spark 2.4) | com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory |
spark.hadoop.hive.imetastoreclient.factory.class (spark 3.0) | com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory |
spark.hadoop.fs.s3.impl | org.apache.hadoop.fs.s3a.S3AFileSystem |
spark.hadoop.fs.s3a.aws.credentials.provider | com.amazonaws.auth.DefaultAWSCredentialsProviderChain |
spark.executor.extraJavaOptions | -Dlog4j.configuration=file:///opt/spark/log4j/log4j.properties -Dlog4j2.configurationFile=file:///opt/spark/log4j/log4j2.properties |
spark.driver.extraJavaOptions | -Dlog4j.configuration=file:///opt/spark/log4j/log4j-executor.properties -Dlog4j2.configurationFile=file:///opt/spark/log4j/log4j2-executor.properties |
spark.decommission.enabled | True (only if spark version >= 3.2.0 ) |
spark.decommission.killInterval | 120 (only if spark version >= 3.2.0 ) |
spark.decommission.forceKillTimeout | 180 (only if spark version >= 3.2.0 ) |
spark.storage.decommission.enabled | True (only if spark version >= 3.2.0 ) |
spark.storage.decommission.shuffleBlocks.enabled | True (only if spark version >= 3.2.0 ) |
spark.storage.decommission.rddBlocks.enabled | True (only if spark version >= 3.2.0 ) |
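For example, to override one of these defaults or add extra Spark properties, pass them through conf (the values below are purely illustrative):

from conveyor.operators import ConveyorSparkSubmitOperatorV2

# Overriding a default and adding an extra Spark property via conf;
# the chosen values are illustrative only.
ConveyorSparkSubmitOperatorV2(
    ...,
    conf={
        "spark.kubernetes.container.image.pullPolicy": "IfNotPresent",
        "spark.sql.shuffle.partitions": "200",
    },
)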
Spark 3 Support
Spark 3 introduced some changes to how Spark on Kubernetes works, including one breaking change:
when supplying the path to the jar in the container, it should be prefixed with local:// to inform Spark that the jar is inside the Docker container.
Spark 3 also introduced changes in the Hive/Glue support.
The ConveyorSparkSubmitOperatorV2 handles this correctly, which means you do not need to pass the spark.hadoop.hive.metastore.client.factory.class configuration yourself.
An example can be found here:
from conveyor.operators import ConveyorSparkSubmitOperatorV2
role = "job-role-{{ macros.conveyor.env() }}"
ConveyorSparkSubmitOperatorV2(
task_id="the-task-id",
num_executors=1,
driver_instance_type="mx.small",
executor_instance_type="mx.small",
aws_role=role,
java_class="name-of-the-class",
application="local://path-to-jar-in-container",
application_args=[
"--environment", "{{ macros.conveyor.env() }}",
"--snapshotDates", "2019-11-06"
]
)
Instance life cycle
An instance life cycle can be set on the operator. This setting controls whether your job will run on on-demand or on spot instances. Spot instances can result in discounts of up to 90% compared to on-demand prices. The downside is that your job can be canceled when AWS reclaims such a spot instance, which is what we call a spot interrupt. Fortunately, this does not happen frequently.
For Spark applications, we provide 3 options:
- on-demand: All containers will run on on-demand instances. This is useful if you need your Spark job to always succeed within a certain time limit or you will miss an SLA, as losing an executor on a spot instance might make your job take longer.
- spot: All containers will run on spot instances. This is the cheapest option, but if your driver is killed by a spot interruption, your whole job fails.
- driver-on-demand-executors-spot: Your driver will run on an on-demand instance, but your Spark executors will run on spot instances. This is a good compromise between robustness and cost efficiency, and should probably be your default for production unless you have a very strict SLA.
The Airflow executor that is running your job follows the instance life cycle of the driver. This means that if you choose the spot instance life cycle, the Airflow executor will run on spot instances. If you use on-demand or driver-on-demand-executors-spot, the executor will run on on-demand instances.
Example configuration using instance life cycle:
from conveyor.operators import ConveyorSparkSubmitOperatorV2
ConveyorSparkSubmitOperatorV2(
    ...,
    instance_life_cycle='driver-on-demand-executors-spot',
)
Environment variables
Similar to the other Conveyor Airflow operators, the ConveyorSparkSubmitOperatorV2 allows you to set environment variables for your applications. This is a convenient approach to pass slowly-changing or environment-dependent configuration to your applications. A simple example of a static environment variable looks as follows:
from conveyor.operators import ConveyorSparkSubmitOperatorV2

ConveyorSparkSubmitOperatorV2(
    ...,
    env_vars={
        "STATIC_KEY": "hello world",
    },
)
Sometimes these configurations contain sensitive information and should not be part of the task runner configuration. To support these use cases, you can load environment variables dynamically from a secrets store. We support the following stores:
AWS secrets
To access the secrets, the AWS IAM role configured in the task runner needs to have the permissions to access the secret. The following snippet shows how to use an AWS secret in your task runner:
from conveyor.secrets import AWSParameterStoreValue, AWSSecretsManagerValue
ConveyorSparkSubmitOperatorV2(
    env_vars={
        "USERNAME": AWSParameterStoreValue(name="/example/username"),
        "PASSWORD": AWSSecretsManagerValue(name="example-password"),
    },
    iam_identity="role-with-access-to-secrets",
)
Both stores also support selecting properties from JSON secrets using JMESPath syntax. For example, when storing a secret as JSON:
{
"username": "ADMIN",
"password": "MYSECRETPASSWORD"
}
You can access the secret as follows:
from conveyor.secrets import AWSSecretsManagerValue
ConveyorSparkSubmitOperatorV2(
    env_vars={
        "USERNAME": AWSSecretsManagerValue(name="example", path="username"),
        "PASSWORD": AWSSecretsManagerValue(name="example", path="password"),
    },
    iam_identity="role-with-access-to-secrets",
)
AWS Secrets Manager IAM access
To be able to access a secret from AWS Secrets Manager, you need to add the following actions to your IAM role:
secretsmanager:GetSecretValue
secretsmanager:DescribeSecret
You should scope these actions to the specific secret, for example:
{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
        "Resource": ["arn:*:secretsmanager:*:*:secret:MySecret-??????"]
    }]
}
The reason for the 6 ?'s is that AWS always adds 6 random characters at the end of the ARN of a secret. For more info, look at the AWS docs.
AWS Parameter Store IAM access
To be able to get secrets from the AWS SSM Parameter Store, you need to add the following actions to your IAM role:
ssm:GetParameters
ssm:GetParametersByPath
You should scope these actions to the correct parameter, for example:
{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["ssm:GetParameters", "ssm:GetParametersByPath"],
        "Resource": ["arn:*:ssm:*:*:parameter/my-parameter"]
    }]
}
For more info, you can refer to the AWS docs.
Azure secrets
In order to access the secrets, you must give the Azure Client ID access to the Key Vault that contains your secrets. The following snippet shows how to use an Azure secret in your task runner:
from conveyor.secrets import AzureKeyVaultValue
ConveyorSparkSubmitOperatorV2(
    env_vars={
        "PASSWORD": AzureKeyVaultValue(name="mySecretKey", vault="myKeyVault", vault_type="secret"),
    },
    iam_identity="azure-client-id-with-access-to-secrets",
)
The vault_type indicates which type of resource the value is in your Key Vault.
The following resource types exist: secret, key, certificate.
The default value is secret and can thus be omitted.
For more details, have a look at the Azure documentation.
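Since secret is the default vault_type, it can simply be left out; a minimal sketch with placeholder names:

from conveyor.secrets import AzureKeyVaultValue

# vault_type defaults to "secret" and is omitted here; the secret and vault names are placeholders.
ConveyorSparkSubmitOperatorV2(
    env_vars={
        "PASSWORD": AzureKeyVaultValue(name="mySecretKey", vault="myKeyVault"),
    },
    iam_identity="azure-client-id-with-access-to-secrets",
)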
Azure Key Vault RBAC permissions
To be able to access a secret from Azure Key Vault, you need to provide your Azure application client ID access to your Key Vault. The recommended practice is to use Azure role based access control with Key Vaults. You can then give your application access by assigning both of the following Azure roles:
- Key Vault Reader: allows metadata operations on the Key Vault but not reading sensitive values
- Key Vault Secrets User: allows reading sensitive values in the Key Vault
For more details, take a look at the Azure documentation.
An example of how to do this in terraform can look as follows:
resource "azuread_application" "azure_application" {
display_name = "azure-application"
}
resource "azuread_service_principal" "azure_application" {
client_id = azuread_application.azure_application.client_id
app_role_assignment_required = false
}
resource "azurerm_role_assignment" "keyvault_read_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Reader"
principal_id = azuread_service_principal.azure_application.id
}
resource "azurerm_role_assignment" "keyvault_read_secret_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Secrets User"
principal_id = azuread_service_principal.azure_application.id
}
Executor disk size
The executor_disk_size variable makes sure you have a certain disk size available on your executors.
By default, the mounted disk space accessible by your Spark application is potentially shared with other containers, but also with Kubernetes.
The machines your applications run on come with a 100Gb disk attached by default, and your job can use a percentage of this volume.
By supplying the executor_disk_size setting, an extra volume exclusive to your container is attached to the machine.
This means you can be sure about both the size of the volume and its performance.
For larger instances, more disks are mounted automatically in order to increase performance, as shown in the tables below.
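A minimal sketch of attaching a dedicated volume to the executors (the instance type and size value are illustrative):

from conveyor.operators import ConveyorSparkSubmitOperatorV2

# Attach a dedicated volume to each executor; the instance type and size are illustrative.
ConveyorSparkSubmitOperatorV2(
    ...,
    executor_instance_type="mx.2xlarge",
    executor_disk_size=200,
)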
AWS
On AWS, GP3 volumes are used. These volumes have 125MB/s throughput and 3000 IOPS by default, resulting in the following numbers.
Container instance | number of disks | expected throughput | expected IOPS |
---|---|---|---|
mx.xlarge (or smaller) | 1 | 125 MB/s | 3000 |
mx.2xlarge | 2 | 250 MB/s | 6000 |
mx.4xlarge | 4 | 500 MB/s | 12000 |
Azure
On Azure, standard SSDs are used. We recommend referring to the table provided in the Azure docs for info on the IOPS and throughput.
For most smaller volumes we see a max burst throughput of 150MB/s and 600 IOPS.
Container instance | number of disks | max burst throughput | max burst IOPS |
---|---|---|---|
mx.xlarge (or smaller) | 1 | 150 MB/s | 600 |
mx.2xlarge | 2 | 300 MB/s | 1200 |
mx.4xlarge | 4 | 600 MB/s | 2400 |
Mode
from conveyor.operators import ConveyorSparkSubmitOperatorV2
ConveyorSparkSubmitOperatorV2(
    ...,
    mode='cluster',
)
There are currently 3 modes available for Spark:
- cluster: the default mode for Spark jobs. It launches a Spark submitter pod, which launches the driver, which in turn launches the executors.
- cluster-v2: this mode is used by default when running with conveyor run, and can also be configured in the operator (see the example after this list). It launches the driver pod directly, cutting down on startup time, and starts placeholder executor pods that boot up nodes immediately for your Spark job; once the driver launches the real Spark executors, these placeholders are replaced. This can be up to 3 minutes faster when new nodes need to be launched on AWS.
- local: this mode launches a single driver pod without executors. It is the fastest way to launch a Spark job, but only useful for small Spark jobs.
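For example, to opt into the faster cluster-v2 mode from the operator:

from conveyor.operators import ConveyorSparkSubmitOperatorV2

# cluster-v2 launches the driver pod directly and pre-warms executor nodes.
ConveyorSparkSubmitOperatorV2(
    ...,
    mode='cluster-v2',
)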