Spark Streaming

Config

This section explains all configuration options available for Spark Streaming. To configure your jobs, edit or create the streaming.yaml file in the root of your project. An example file with two jobs, producer and consumer, looks like this:

streamingApplications:
  - name: producer
    sparkSpec:
      numberOfExecutors: 1
      driverInstanceType: mx.micro
      executorInstanceType: mx.micro
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      applicationArgs:
        - --env
        - "{{ .Env }}"
      awsRole: "pyspark_streaming-{{ .Env }}"
  - name: consumer
    sparkSpec:
      numberOfExecutors: 1
      driverInstanceType: mx.micro
      executorInstanceType: mx.micro
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/counting.py"
      applicationArgs:
        - --env
        - "{{ .Env }}"
      awsRole: "pyspark_streaming-{{ .Env }}"

In this example, we can see that the streaming.yaml file consists of a list of streaming applications. Every streaming application needs a unique name and a Spark spec.

The following settings are available:

| Parameter | Type | Default | Explanation |
|---|---|---|---|
| application | str | | The application that is submitted as a job, either a .jar or .py file. |
| applicationArgs | list | [] | Arguments for the application being submitted. |
| sparkConfig | map | | Arbitrary Spark configuration properties, see the default settings below. |
| javaClass | str | | The main class of the Java application; not needed when using PySpark. |
| awsRole | str | | The AWS role to be used by both the driver and executors. For more info, see Operators AWS Role. |
| azure_application_client_id | str | | The Azure AD service principal used by the container. For more info, see Operators Azure Service Principal. |
| driverInstanceType | str | mx.small | The Conveyor instance type to use for the Spark driver. This specifies the CPU/memory the driver can use. |
| executorInstanceType | str | mx.small | The Conveyor instance type to use for the executors. This specifies the CPU/memory each executor can use. |
| numberOfExecutors | int | 2 | Number of executors to launch. |
| instanceLifeCycle | string | driver-on-demand-executors-spot | The lifecycle of the instances used to run this job. Options are on-demand, spot or driver-on-demand-executors-spot. |
| s3Committer | str | file | The S3 committer to be used by the Spark application. Supported committers are file and magic. The magic S3 committer requires a Spark installation built with hadoop-cloud support; our ...hadoop-3.3.1-...-v2 images and later support this. For more information on S3 committers, see here. |
| envVariables | map | | Extra environment variables or secrets you want to mount for your Spark application. |
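
As an illustration, a single application entry combining several of these options could look like the sketch below. It only uses the settings documented above; the application path, role name, Spark property and environment variable values are placeholders, not values from this project:

streamingApplications:
  - name: aggregator
    sparkSpec:
      numberOfExecutors: 2
      driverInstanceType: mx.small
      executorInstanceType: mx.large
      instanceLifeCycle: driver-on-demand-executors-spot
      s3Committer: magic
      application: "local:///opt/spark/work-dir/src/myproject/aggregate.py"
      applicationArgs:
        - --env
        - "{{ .Env }}"
      awsRole: "my-streaming-role-{{ .Env }}"
      sparkConfig:
        spark.sql.shuffle.partitions: "16"
      envVariables:
        LOG_LEVEL:
          value: INFO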

Templating

In the streaming specification, you can also apply templating. This is useful if you want to change certain settings according to the environment you are deploying to.

We support filling in the environment name by using {{ .Env }}. For example:

streamingApplications:
  - name: producer
    sparkSpec:
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      applicationArgs:
        - --env
        - "{{ .Env }}"

You can also do an if test by using the following pattern: {{ if eq .Env "ENVIRONMENT" }} VALUE1 {{ else }} VALUE2 {{ end }}. For example, if you want to use bigger instances in the production environment, you can do the following:

streamingApplications:
  - name: producer
    sparkSpec:
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      executorInstanceType: {{ if eq .Env "production" }} mx.xlarge {{ else }} mx.micro {{ end }}

You can also define new variables in the template that you can use in multiple places. This is useful if you have multiple applications using the same role. For example:

{{ $role := printf "spark-streaming-%s" .Env }}
streamingApplications:
  - name: producer
    sparkSpec:
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      awsRole: "{{ $role }}"

The example here will result in the role spark-streaming-dev when deployed to an environment called dev.

The underlying templating engine is the Go templating engine; its documentation can be found here.

Default Spark configuration

A number of configuration options are set by default, but they can be overridden using the sparkConfig map:

| Configuration | Default value |
|---|---|
| spark.kubernetes.container.image.pullPolicy | Always |
| spark.kubernetes.pyspark.pythonVersion | 3 |
| spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 2 |
| spark.hadoop.hive.imetastoreclient.factory.class | com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory |
| spark.hadoop.fs.s3.impl | org.apache.hadoop.fs.s3a.S3AFileSystem |
| spark.hadoop.fs.s3a.aws.credentials.provider | com.amazonaws.auth.DefaultAWSCredentialsProviderChain |
| spark.streaming.driver.writeAheadLog.closeFileAfterWrite | true |
| spark.streaming.receiver.writeAheadLog.closeFileAfterWrite | true |
| spark.hadoop.fs.s3a.downgrade.syncable.exceptions | true |
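
As a sketch of how an override might look, the snippet below sets one of these defaults (the file output committer algorithm version) back to 1 through sparkConfig; the application path is a placeholder:

streamingApplications:
  - name: producer
    sparkSpec:
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"
      sparkConfig:
        spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "1"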

Instances

When running Spark/PySpark applications, only part of the container's total memory is available to Spark itself. The details are described in the following tables:

AWS

| Instance type | CPU | Total Memory (AWS) | Spark memory (AWS) | PySpark memory (AWS) |
|---|---|---|---|---|
| mx.micro | 1* | 0.875 Gb | 0.8 Gb | 0.6 Gb |
| mx.small | 1* | 1.75 Gb | 1.6 Gb | 1.25 Gb |
| mx.medium | 1 | 3.5 Gb | 3.2 Gb | 2.5 Gb |
| mx.large | 2 | 7 Gb | 6.4 Gb | 5 Gb |
| mx.xlarge | 4 | 14 Gb | 12.7 Gb | 10 Gb |
| mx.2xlarge | 8 | 29 Gb | 26.7 Gb | 21 Gb |
| mx.4xlarge | 16 | 59 Gb | 54 Gb | 42.4 Gb |
| cx.medium | 1 | 1.75 Gb | 1.6 Gb | 1.25 Gb |
| cx.large | 2 | 3.5 Gb | 3.2 Gb | 2.5 Gb |
| cx.xlarge | 4 | 7 Gb | 6.4 Gb | 5 Gb |
| cx.2xlarge | 8 | 14 Gb | 12.7 Gb | 10 Gb |
| cx.4xlarge | 16 | 29 Gb | 26.7 Gb | 21 Gb |
| rx.xlarge | 8 | 28 Gb | 26 Gb | 21 Gb |
| rx.2xlarge | 16 | 59 Gb | 54 Gb | 43 Gb |
| rx.4xlarge | 16 | 120 Gb | 112 Gb | 88 Gb |
Info: (*) These instance types are not guaranteed a full CPU but only a slice of one; they are allowed to burst up to a full CPU if the cluster allows.

Azure

| Instance type | CPU | Total Memory (Azure) | Spark memory (Azure) | PySpark memory (Azure) |
|---|---|---|---|---|
| mx.micro | 1* | 0.75 Gb | 0.69 Gb | 0.55 Gb |
| mx.small | 1* | 1.5 Gb | 1.38 Gb | 1.1 Gb |
| mx.medium | 1 | 3 Gb | 2.75 Gb | 2.15 Gb |
| mx.large | 2 | 6 Gb | 5.5 Gb | 4.3 Gb |
| mx.xlarge | 4 | 12 Gb | 11 Gb | 8.6 Gb |
| mx.2xlarge | 8 | 26 Gb | 23.6 Gb | 18.6 Gb |
| mx.4xlarge | 16 | 55 Gb | 50 Gb | 35.7 Gb |
Info: (*) These instance types are not guaranteed a full CPU but only a slice of one; they are allowed to burst up to a full CPU if the cluster allows.

As you can see from the tables, the supported executor memory differs depending on whether you use regular (Scala) Spark or PySpark. The explanation lies in the spark.kubernetes.memoryOverheadFactor setting, which can be found in the Spark settings. It is configured to 0.1 for JVM jobs (Scala and Java Spark) and to 0.4 for non-JVM jobs (PySpark, SparkR). A portion of the memory is set aside for non-JVM needs such as off-heap memory allocations, system processes, Python, R... Without this, your job would commonly fail with the error "Memory Overhead Exceeded".
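
As a rough worked example based on the AWS table above: an mx.xlarge offers 14 Gb in total, of which roughly 14 / 1.1 ≈ 12.7 Gb is left for a JVM Spark job and roughly 14 / 1.4 ≈ 10 Gb for a PySpark job, which matches the Spark and PySpark memory columns.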

Instance life cycle

On the streaming application, you can set an instance life cycle. This determines whether your job runs on on-demand or on spot instances. Spot instances can give discounts of up to 90% compared to on-demand prices. However, your job can be canceled when AWS reclaims such a spot instance; luckily, this does not happen frequently.

For Spark, we support 3 options:

  • on-demand: All containers will run on on-demand instances. This is useful if your Spark streaming job needs to always run.
  • spot: All containers will run on spot instances. This is the cheapest option; however, if your driver is killed by a spot interruption, your whole job fails and is automatically restarted by Conveyor. Since starting a new Spark application takes time, this might be an issue for very time-critical applications.
  • driver-on-demand-executors-spot: Your driver will run on an on-demand instance, but your Spark executors will run on spot instances. This is a good compromise between robustness and cost efficiency and should probably be your default for production unless you have a very strict SLA. This is the default for Spark streaming.
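
As a sketch, pinning a latency-critical application to on-demand instances only would look roughly like this in streaming.yaml (the application path is a placeholder):

streamingApplications:
  - name: producer
    sparkSpec:
      numberOfExecutors: 1
      instanceLifeCycle: on-demand
      application: "local:///opt/spark/work-dir/src/pysparkstreaming/producer.py"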

Env variables

We support adding environment variables to your Spark application. These can be plain values, but also secrets coming from AWS SSM Parameter Store or Secrets Manager. The latter two make it possible to mount and expose secrets securely in your application.

Specifying environment variables for your Spark applications is done as follows:

streamingApplications:
  - name: producer
    sparkSpec:
      envVariables:
        foo:
          value: bar
        testSSM:
          awsSSMParameterStore:
            name: /conveyor-dp-samples
        testSecretManager:
          awsSecretsManager:
            name: conveyor-dp-samples

In order to mount secrets, the awsRole attached to the application should have permissions to access them. Adding a policy to an awsRole to read SSM parameters/secrets with Terraform is done as follows:

data "aws_iam_policy_document" "allow secrets" {
statement {
actions = [
"ssm:GetParametersByPath",
"ssm:GetParameters",
"ssm:GetParameter",
]
resources = [
"arn:aws:ssm:Region:AccountId:parameter/conveyor-dp-samples/*"
]
effect = "Allow"
}

statement {
actions = [
"secretsmanager:DescribeSecret",
"secretsmanager:List*",
"secretsmanager:GetSecretValue"
]
resources = [
"arn:aws:secretsmanager:Region:AccountId:secret:conveyor-dp-samples"
]
effect = "Allow"
}
}

Alerting

If you want to be alerted that a streaming job is failing, you can add an alerting config to the streaming.yaml:

alerting:
  enabled: true
  emails:
    - youremail@gmail.com
streamingApplications:
  ...

Alerting configuration

| Parameter | Type | Default | Explanation |
|---|---|---|---|
| enabled | bool | false | Enable alerting for this streaming application. |
| emails | list(string) | | The list of emails to send alerts to. |
| restartsThreshold | int32 | 1 | The number of failures needed within the restartsWindow before we send an alert. |
| restartsWindow | duration | 1h | The size of the window in which failures are taken into account to send an alert. The default is 1 hour, which is also the minimum. |
| restartsAlertCoolDown | duration | 24h | Instead of every restart possibly triggering an alert, we send only one per cooldown period; once the cooldown period is over, we resend the alert. The default is 24 hours, the minimum is 2 hours. |

By default, we will send you an email when your job has crashed more than once in an hour. We will only warn you about this every 24 hours, so you don't get overwhelmed with emails. Once your application is working properly again, the cooldown timer of 24 hours is reset.

You can configure these using the restartsThreshold, restartsWindow and restartsAlertCoolDown parameters. The durations are specified using the default string representation of the Go duration type. Here are some examples:

1h // This is 1 hour
2h // This is 2 hours
24h // This is 24 hours or one day

Here is a full example that modifies all parameters:

alerting:
  enabled: true
  emails:
    - youremail@gmail.com
  restartsThreshold: 2
  restartsWindow: 2h
  restartsAlertCoolDown: 4h
streamingApplications:
  ...

Using templating in alerting

As you saw in the example, you have to explicitly set the boolean enabled to true before it starts sending alerts. This is so you can use templating to enable alerting in certain environments only. For example:

alerting:
  enabled: {{ if eq .Env "production" }} true {{ else }} false {{ end }}
...

Using templating, we can specify which environments are important enough to have alerts sent to us. You can also supply multiple environments, as in the following example:

alerting:
  enabled: {{ if or (eq .Env "production") (eq .Env "development") (eq .Env "staging") }} true {{ else }} false {{ end }}
...

Through this pattern, you can define as many environments as you want.