Spark Streaming
Config
This section explains all configuration options available for Spark streaming. To configure your jobs, create or edit the streaming.yaml file in the root of your project. An example file with two jobs, producer and consumer, looks like this:
streamingApplications:
  - name: producer
    sparkSpec:
      numberOfExecutors: 1
      driverInstanceType: mx.micro
      executorInstanceType: mx.micro
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      applicationArgs:
        - --env
        - "{{ .Env }}"
      awsRole: "pyspark_streaming-{{ .Env }}"
  - name: consumer
    sparkSpec:
      numberOfExecutors: 1
      driverInstanceType: mx.micro
      executorInstanceType: mx.micro
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/counting.py"
      applicationArgs:
        - --env
        - "{{ .Env }}"
      awsRole: "pyspark_streaming-{{ .Env }}"
In this example, the streaming.yaml file consists of a list of streaming applications. Every streaming application needs a unique name and a Spark spec.
The following settings are available:
Parameter | Type | Default | Explanation |
---|---|---|---|
application | str | | The application that is submitted as a job, either a .jar or .py file. |
applicationArgs | list | [] | Arguments for the application being submitted. |
sparkConfig | map | | Arbitrary Spark configuration properties, see default settings below. |
javaClass | str | | The main class of the Java application. This is not needed when using PySpark. |
awsRole | str | | The AWS role to be used by both the driver and executors. For more info see Operators AWS Role. |
azure_application_client_id | str | | The Azure AD service principal used by the container. For more info see Operators Azure Service Principal. |
driverInstanceType | str | mx.small | The Conveyor instance type to use for the Spark driver. This specifies the CPU/Memory the driver can use. |
executorInstanceType | str | mx.small | The Conveyor instance type to use for the executors. This specifies the CPU/Memory the executors can use. |
numberOfExecutors | int | 2 | Number of executors to launch. |
instanceLifeCycle | str | driver-on-demand-executors-spot | The lifecycle of the instance used to run this job. Options are on-demand, spot or driver-on-demand-executors-spot. |
s3Committer | str | file | The S3 committer to be used by the Spark applications. Supported committers are file and magic. The magic S3 committer requires a Spark installation built with hadoop-cloud support. Our ...hadoop-3.3.1-...-v2 images and later support this. For more information on S3 committers, see here. |
envVariables | map | | Extra environment variables or secrets you want to mount for your Spark application. |
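To make the defaults in the table concrete, here is a minimal Python sketch of how a parsed streaming.yaml could be checked and completed. It is not Conveyor's implementation: the function name `resolve_applications` and the dictionary layout are hypothetical, and only the default values themselves come from the table above.

```python
import copy

# Defaults copied from the parameter table; parameters without a
# documented default are deliberately left out.
SPARK_SPEC_DEFAULTS = {
    "applicationArgs": [],
    "driverInstanceType": "mx.small",
    "executorInstanceType": "mx.small",
    "numberOfExecutors": 2,
    "instanceLifeCycle": "driver-on-demand-executors-spot",
    "s3Committer": "file",
}


def resolve_applications(config):
    """Hypothetical helper: enforce unique names and fill in defaults."""
    seen = set()
    resolved = []
    for app in config["streamingApplications"]:
        name = app["name"]
        if name in seen:
            raise ValueError(f"duplicate streaming application name: {name}")
        seen.add(name)
        if "sparkSpec" not in app:
            raise ValueError(f"streaming application {name} has no sparkSpec")
        # Deep copy so applications never share the default list object.
        spec = copy.deepcopy(SPARK_SPEC_DEFAULTS)
        spec.update(app["sparkSpec"])
        resolved.append({"name": name, "sparkSpec": spec})
    return resolved


example = {
    "streamingApplications": [
        {
            "name": "producer",
            "sparkSpec": {
                "numberOfExecutors": 1,
                "application": "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py",
            },
        },
        {
            "name": "consumer",
            "sparkSpec": {
                "application": "local:///opt/spark/work-dir/src/pysparkstreaming/counting.py",
            },
        },
    ]
}

for app in resolve_applications(example):
    print(app["name"], app["sparkSpec"]["numberOfExecutors"])
```

The producer keeps its explicit single executor, while the consumer falls back to the documented default of two executors.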
Templating
In the streaming specification, you can also apply templating. This is useful if you want to change certain settings according to the environment you are deploying to.
We support filling in the environment name by using {{ .Env }}. For example:
streamingApplications:
  - name: producer
    sparkSpec:
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      applicationArgs:
        - --env
        - "{{ .Env }}"
You can also use an if test with the following pattern: {{ if eq .Env "ENVIRONMENT" }} VALUE1 {{ else }} VALUE2 {{ end }}.
For example, if you want to use bigger instances in the production environment, you can do the following:
streamingApplications:
  - name: producer
    sparkSpec:
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      executorInstanceType: {{ if eq .Env "production" }} mx.xlarge {{ else }} mx.micro {{ end }}
You can also define new variables in the template that you can use in multiple places. This is useful if you have multiple applications using the same role. For example:
{{ $role := printf "spark-streaming-%s" .Env }}
streamingApplications:
  - name: producer
    sparkSpec:
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      awsRole: "{{ $role }}"
This example results in the role spark-streaming-dev when deployed to an environment called dev.
The underlying templating engine is the Go templating engine; its documentation can be found here.
Default Spark configuration
A number of configuration options are set by default, but can be overridden using the sparkConfig map:
Configuration | Default value |
---|---|
spark.kubernetes.container.image.pullPolicy | Always |
spark.kubernetes.pyspark.pythonVersion | 3 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 2 |
spark.hadoop.hive.imetastoreclient.factory.class | com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory |
spark.hadoop.fs.s3.impl | org.apache.hadoop.fs.s3a.S3AFileSystem |
spark.hadoop.fs.s3a.aws.credentials.provider | com.amazonaws.auth.DefaultAWSCredentialsProviderChain |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite | true |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite | true |
spark.hadoop.fs.s3a.downgrade.syncable.exceptions | true |
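The override behavior can be pictured as a plain dictionary merge in which user-supplied properties win over the defaults. The Python sketch below illustrates that idea only; the function name `effective_spark_config` is hypothetical, and only a subset of the defaults from the table is listed.

```python
# A subset of the defaults from the table above, for illustration.
DEFAULT_SPARK_CONFIG = {
    "spark.kubernetes.container.image.pullPolicy": "Always",
    "spark.kubernetes.pyspark.pythonVersion": "3",
    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
    "spark.hadoop.fs.s3a.downgrade.syncable.exceptions": "true",
}


def effective_spark_config(user_config):
    """Hypothetical helper: user-supplied properties take precedence."""
    merged = dict(DEFAULT_SPARK_CONFIG)
    merged.update(user_config)
    return merged


config = effective_spark_config({
    # Overrides a default.
    "spark.kubernetes.container.image.pullPolicy": "IfNotPresent",
    # Adds a property that has no default in the table.
    "spark.sql.shuffle.partitions": "50",
})

for key in sorted(config):
    print(key, "=", config[key])
```

Defaults that are not mentioned in the user-supplied map stay in effect.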
Instances
When running Spark/PySpark applications, only a part of the total memory for the container is available for Spark itself. The details are described in the following tables, the first for AWS and the second for Azure:
Instance type | CPU | Total memory | Spark memory | PySpark memory |
---|---|---|---|---|
mx.micro | 1* | 0.875 Gb | 0.8 Gb | 0.6 Gb |
mx.small | 1* | 1.75 Gb | 1.6 Gb | 1.25 Gb |
mx.medium | 1 | 3.5 Gb | 3.2 Gb | 2.5 Gb |
mx.large | 2 | 7 Gb | 6.4 Gb | 5 Gb |
mx.xlarge | 4 | 14 Gb | 12.7 Gb | 10 Gb |
mx.2xlarge | 8 | 29 Gb | 26.7 Gb | 21 Gb |
mx.4xlarge | 16 | 59 Gb | 54 Gb | 42.4 Gb |
cx.medium | 1 | 1.75 Gb | 1.6 Gb | 1.25 Gb |
cx.large | 2 | 3.5 Gb | 3.2 Gb | 2.5 Gb |
cx.xlarge | 4 | 7 Gb | 6.4 Gb | 5 Gb |
cx.2xlarge | 8 | 14 Gb | 12.7 Gb | 10 Gb |
cx.4xlarge | 16 | 29 Gb | 26.7 Gb | 21 Gb |
rx.xlarge | 8 | 28 Gb | 26 Gb | 21 Gb |
rx.2xlarge | 16 | 59 Gb | 54 Gb | 43 Gb |
rx.4xlarge | 16 | 120 Gb | 112 Gb | 88 Gb |
Instance type | CPU | Total memory | Spark memory | PySpark memory |
---|---|---|---|---|
mx.micro | 1* | 0.868 Gb | 0.78 Gb | 0.60 Gb |
mx.small | 1* | 1.73 Gb | 1.56 Gb | 1.21 Gb |
mx.medium | 1 | 3.47 Gb | 3.12 Gb | 2.43 Gb |
mx.large | 2 | 6.94 Gb | 6.25 Gb | 4.86 Gb |
mx.xlarge | 4 | 13.89 Gb | 12.50 Gb | 9.72 Gb |
mx.2xlarge | 8 | 30.65 Gb | 27.58 Gb | 21.45 Gb |
mx.4xlarge | 16 | 64.16 Gb | 57.74 Gb | 44.91 Gb |
(*) These instance types don't get a guaranteed full CPU but only a slice of a full CPU. If the cluster has space for it, they are allowed to burst up to a full CPU.
As you can see from the tables, the supported executor memory differs between regular (Scala) Spark and PySpark.
The reason is the spark.kubernetes.memoryOverheadFactor setting, which is described in the Spark settings.
This setting is configured to 0.1 for JVM jobs (Scala and Java Spark), and to 0.4 for non-JVM jobs (PySpark, SparkR).
A portion of the memory is set aside for non-JVM things such as off-heap memory allocations, system processes, Python, and R.
Without this reserved portion, your job would commonly fail with the error "Memory Overhead Exceeded".
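The effect of the overhead factor can be approximated with a short calculation. The Python sketch below assumes the container holds the Spark memory plus that memory multiplied by the overhead factor, and it ignores any minimum overhead Spark may apply. The function name is hypothetical, and the tables above remain the reference; some rows differ slightly from this approximation.

```python
JVM_OVERHEAD_FACTOR = 0.1      # Scala and Java Spark
NON_JVM_OVERHEAD_FACTOR = 0.4  # PySpark, SparkR


def usable_spark_memory(total_gb, overhead_factor):
    """Assumed model: total = memory + memory * overhead_factor."""
    return total_gb / (1 + overhead_factor)


# Total memory values taken from the AWS table.
for name, total in [("mx.medium", 3.5), ("mx.large", 7), ("mx.xlarge", 14)]:
    jvm = usable_spark_memory(total, JVM_OVERHEAD_FACTOR)
    pyspark = usable_spark_memory(total, NON_JVM_OVERHEAD_FACTOR)
    print(f"{name}: Spark ~{jvm:.1f} Gb, PySpark ~{pyspark:.1f} Gb")
```

For these three instance types the result rounds to the values listed in the AWS table, which suggests the model is a reasonable mental picture even if it is not the exact formula used.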
Instance life cycle
On a streaming application, you can set an instance life cycle, which determines whether your job runs on on-demand or spot instances. Spot instances can be discounted by up to 90% compared to on-demand prices. However, your job can be interrupted when AWS reclaims a spot instance; fortunately, this does not happen frequently.
For Spark, we support three options:
- on-demand: All containers run on on-demand instances. This is useful if your Spark streaming job must always be running.
- spot: All containers run on spot instances. This is the cheapest option, but a spot interruption that kills your driver fails the whole job. Conveyor restarts the job automatically, but starting a new Spark application takes time, which might be an issue for very time-critical applications.
- driver-on-demand-executors-spot: The driver runs on an on-demand instance and the executors run on spot instances. This is a good compromise between robustness and cost efficiency, and should probably be your default for production unless you have a very strict SLA. This is the default for Spark streaming.
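The three options can be summarized as a mapping from life cycle to the kind of instance used for the driver and the executors. The Python sketch below is only a way to reason about the trade-off; the names `LIFE_CYCLES` and `driver_survives_spot_interruption` are hypothetical and not part of Conveyor.

```python
# Where the driver and the executors run for each documented option.
LIFE_CYCLES = {
    "on-demand": {"driver": "on-demand", "executors": "on-demand"},
    "spot": {"driver": "spot", "executors": "spot"},
    "driver-on-demand-executors-spot": {"driver": "on-demand", "executors": "spot"},
}


def driver_survives_spot_interruption(life_cycle):
    """True when a spot interruption cannot take down the driver."""
    if life_cycle not in LIFE_CYCLES:
        raise ValueError(f"unknown instanceLifeCycle: {life_cycle}")
    return LIFE_CYCLES[life_cycle]["driver"] == "on-demand"


for option in LIFE_CYCLES:
    print(option, driver_survives_spot_interruption(option))
```

Only the spot option exposes the driver to interruptions, which is why it is the one that can fail the whole job.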
Env variables
We support adding environment variables to your Spark application. These can be plain values, but also secrets coming from SSM or Secrets Manager. The latter two make it possible to mount and expose secrets securely in your application.
Specifying environment variables for your Spark applications is done as follows:
streamingApplications:
  - name: producer
    sparkSpec:
      envVariables:
        foo:
          value: bar
        testSSM:
          awsSSMParameterStore:
            name: /conveyor-dp-samples
        testSecretManager:
          awsSecretsManager:
            name: conveyor-dp-samples
To mount secrets, the awsRole attached to the application must have permission to access them. Adding a policy to an awsRole to read SSM parameters and secrets with Terraform is done as follows:
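data "aws_iam_policy_document" "allow_secrets" {
  # Allow reading SSM parameters under the given path.
  statement {
    actions = [
      "ssm:GetParametersByPath",
      "ssm:GetParameters",
      "ssm:GetParameter",
    ]
    resources = [
      "arn:aws:ssm:Region:AccountId:parameter/conveyor-dp-samples/*"
    ]
    effect = "Allow"
  }
  # Allow reading the secret from Secrets Manager.
  # Note: Secrets Manager ARNs end in a random suffix, so a trailing
  # wildcard on the resource may be needed for the ARN to match.
  statement {
    actions = [
      "secretsmanager:DescribeSecret",
      "secretsmanager:List*",
      "secretsmanager:GetSecretValue"
    ]
    resources = [
      "arn:aws:secretsmanager:Region:AccountId:secret:conveyor-dp-samples"
    ]
    effect = "Allow"
  }
}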
Alerting
If you want to be alerted when a streaming job is failing, you can add an alerting config to the streaming.yaml:
alerting:
  enabled: true
  emails:
    - youremail@gmail.com
streamingApplications:
  ...
Alerting configuration
Parameter | Type | Default | Explanation |
---|---|---|---|
enabled | bool | false | Enable alerting for this streaming application. |
emails | list(string) | | The list of email addresses to send alerts to. |
restartsThreshold | int32 | 1 | The number of failures needed in the restartsWindow before we send an alert. |
restartsWindow | duration | 1h | The size of the window in which failures are taken into account to send an alert. The default is 1 hour, which is also the minimum. |
restartsAlertCoolDown | duration | 24h | Instead of every restart possibly triggering an alert, we send only one alert per cooldown period. Once the cooldown period is over, we resend the alert. The default is 24 hours, the minimum is 2 hours. |
By default, we send you an email when your job has crashed more than once in an hour. We send this warning at most once every 24 hours, so you don't get overwhelmed with emails. Once your application is working properly again, the 24-hour cooldown timer is reset.
You can configure this behavior using the restartsThreshold, restartsWindow and restartsAlertCoolDown parameters.
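The interplay between the three parameters can be sketched as a small simulation. The Python code below is an illustration under stated assumptions, not Conveyor's alerting logic: it assumes an alert fires when the number of restarts inside the window exceeds restartsThreshold (following the "more than once in an hour" wording), it does not model the cooldown reset that happens once the application is healthy again, and the function name `alerts_sent` is hypothetical.

```python
from datetime import datetime, timedelta


def alerts_sent(restarts, threshold=1, window=timedelta(hours=1),
                cool_down=timedelta(hours=24)):
    """Return the restart times at which an alert would be sent.

    `restarts` must be sorted in ascending order. Assumption: an alert
    needs more than `threshold` restarts inside `window`.
    """
    alerts = []
    last_alert = None
    for index, now in enumerate(restarts):
        # Restarts up to and including this one that fall inside the window.
        recent = [t for t in restarts[: index + 1] if now - t <= window]
        over_threshold = len(recent) > threshold
        cooled_down = last_alert is None or now - last_alert >= cool_down
        if over_threshold and cooled_down:
            alerts.append(now)
            last_alert = now
    return alerts


start = datetime(2024, 1, 1, 12, 0)
restarts = [
    start,
    start + timedelta(minutes=10),          # second crash within the hour
    start + timedelta(minutes=20),          # suppressed by the cooldown
    start + timedelta(hours=25),            # first crash in a new window
    start + timedelta(hours=25, minutes=5), # second crash, cooldown is over
]

for moment in alerts_sent(restarts):
    print("alert at", moment.isoformat())
```

With the defaults, the five restarts above lead to two alerts: one at the second crash and one more than 24 hours later, when the job crashes twice again within an hour.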
The durations are specified using the default string version of the golang duration type. Here are some examples:
1h // This is 1 hour
2h // This is 2 hours
24h // This is 24 hours or one day
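To get a feel for the format, the Python sketch below parses a subset of these duration strings (only the h, m, s and ms units, without signs) and checks the documented minimums. The helper names are hypothetical, and Conveyor itself relies on Go's duration parsing, which accepts more than this subset.

```python
import re
from datetime import timedelta

# Seconds per unit for the subset of units handled here.
_UNIT_SECONDS = {"h": 3600.0, "m": 60.0, "s": 1.0, "ms": 0.001}
# "ms" is listed before "m" so that "10ms" is not read as "10m" + "s".
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|h|m|s)")


def parse_duration(text):
    """Parse strings such as "1h", "24h" or "1h30m" into a timedelta."""
    if not text:
        raise ValueError("empty duration")
    position = 0
    seconds = 0.0
    while position < len(text):
        match = _PART.match(text, position)
        if match is None:
            raise ValueError(f"invalid duration: {text!r}")
        seconds += float(match.group(1)) * _UNIT_SECONDS[match.group(2)]
        position = match.end()
    return timedelta(seconds=seconds)


def check_alerting_durations(restarts_window, restarts_alert_cool_down):
    """Enforce the documented minimums: 1 hour and 2 hours."""
    if parse_duration(restarts_window) < timedelta(hours=1):
        raise ValueError("restartsWindow must be at least 1h")
    if parse_duration(restarts_alert_cool_down) < timedelta(hours=2):
        raise ValueError("restartsAlertCoolDown must be at least 2h")


print(parse_duration("1h30m"))
check_alerting_durations("2h", "4h")
```

A value such as 30m for restartsWindow would be rejected by this check because it is below the documented minimum of one hour.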
Here is a full example that modifies all parameters:
alerting:
  enabled: true
  emails:
    - youremail@gmail.com
  restartsThreshold: 2
  restartsWindow: 2h
  restartsAlertCoolDown: 4h
streamingApplications:
  ...
Using templating in alerting
As shown in the example, you have to explicitly set enabled to true before alerts are sent.
This lets you use templating to enable alerting in certain environments only.
For example:
alerting:
  enabled: {{ if eq .Env "production" }} true {{ else }} false {{ end }}
  ...
Using templating, you can decide which environments are important enough to send alerts for. You can also list multiple environments, as in the following example:
alerting:
  enabled: {{ if or (eq .Env "production") (eq .Env "development") (eq .Env "staging") }} true {{ else }} false {{ end }}
  ...
Through this pattern, you can define as many environments as you want.