Using secrets in the Conveyor operators
Description
Often your job needs access to sensitive values (e.g. the password for a database connection), which you do not want to commit to git nor write in plain text in your Airflow DAGs. To solve this, Conveyor integrates with AWS Secrets Manager and AWS Parameter Store. This makes it possible to fetch sensitive values while running the Conveyor job without specifying the value explicitly.
In this example, we will use the AWS Parameter Store, but the same steps can be applied for AWS Secrets Manager. For more details on how to specify a Secrets Manager secret in Airflow and which IAM permissions to add to your role, please have a look at the AWS Secrets Manager reference.
If you are using Azure, the technical details on how to use secrets are described in the Azure secrets reference. A how-to guide for Azure will be added later, but the steps are the same as for AWS.
Usage
Create the secret in AWS
The first step is to create a sensitive value in the AWS Parameter Store. Since you typically do not want to include this value in source control, it is often added manually in the AWS console, as described here (a scripted alternative is shown after these steps):
- Go to Systems Manager in the AWS console
- Choose Parameter Store in the navigation panel
- Create parameter
  - Fill in the name (e.g. /Conveyor/HelloWorld)
  - Fill in the value (e.g. My first parameter) and set the type to SecureString, as if it were a sensitive value
  - (Optional) fill in a description
- Click on Create parameter
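If you prefer to create the parameter from code rather than in the console, you can do so with boto3. The snippet below is a minimal sketch, assuming you have local AWS credentials with permission to call ssm:PutParameter; the parameter name and value match the example above:
import boto3

# Assumes local AWS credentials that are allowed to call ssm:PutParameter.
ssm = boto3.client("ssm")

ssm.put_parameter(
    Name="/Conveyor/HelloWorld",
    Value="My first parameter",
    Type="SecureString",  # store it encrypted, as you would for a real secret
    Description="Example parameter for the Conveyor secrets tutorial",
    Overwrite=True,  # allow re-running this script if the parameter already exists
)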
Use the secret in Airflow
Now that your parameter is created, we can start using it in Airflow, but first we need a Conveyor project. Let's create a new project starting from the Python template as that is the simplest. Create the project as follows:
conveyor project create --name sample-secrets --template python
At the command prompts, specify the following values:
- cloud: aws (for Azure we currently do not support integration with Azure Key Vault)
- conveyor_managed_role: Yes
- dev_environment: local
Now that you have the project, let's mount the secret as an environment variable in the ConveyorContainerOperatorV2. Change the content of dags/sample-secrets.py as follows:
from airflow import DAG
from conveyor.operators import ConveyorContainerOperatorV2
from conveyor.secrets import AWSParameterStoreValue
from datetime import datetime, timedelta

default_args = {
    "owner": "Conveyor",
    "depends_on_past": False,
    "start_date": datetime(year=2023, month=1, day=24),
    "email": [],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "sample-secrets", default_args=default_args, schedule_interval="@daily", max_active_runs=1
)

ConveyorContainerOperatorV2(
    dag=dag,
    task_id="sample",
    cmds=["python"],
    arguments=["-m", "samplesecrets.sample", "--date", "{{ ds }}", "--env", "{{ macros.conveyor.env() }}"],
    instance_type="mx.micro",
    aws_role="sample-secrets-{{ macros.conveyor.env() }}",
    env_vars={"HELLO_WORLD": AWSParameterStoreValue(name="/Conveyor/HelloWorld")},
)
In the code we assumed that you created the parameter /Conveyor/HelloWorld. If you used another parameter, please change the code accordingly.
Next, change src/samplesecrets/sample.py to also print the HELLO_WORLD environment variable such that we can validate that everything works.
import argparse
import logging
import sys
import os
import requests
from typing import Optional


def main():
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    parser = argparse.ArgumentParser(description="sample-secrets")
    parser.add_argument(
        "-d", "--date", dest="date", help="date in format YYYY-mm-dd", required=True
    )
    parser.add_argument(
        "-e", "--env", dest="env", help="environment we are executing in", required=True
    )
    args = parser.parse_args()
    logging.info(f"Using args: {args}")
    logging.info(f"Environment variable: {os.environ['HELLO_WORLD']}")
    run(args.env, args.date)


def run(env: str, date: str):
    """Main ETL script definition.

    :return: None
    """
    # execute ETL pipeline
    data = extract_data()
    logging.info("Downloaded the weather info, now loading it")
    if data is None:
        logging.error("Received no weather data")
    load_data(data, env)


def extract_data() -> Optional[dict]:
    """
    Gets the data from the open weather map api and returns the result.

    :return: The weather data
    """
    return requests.get(
        "https://samples.openweathermap.org/data/2.5/weather?q=Leuven&appid=b6907d289e10d714a6e88b30761fae22"
    ).json()


def load_data(data: dict, env: str):
    """Writes the data to S3.

    :param data: The data to write
    :param env: The environment
    :return: None
    """
    print(data)
    # Uncomment the following block to write to S3
    # s3 = boto3.resource("s3")
    # s3_object = s3.Object("DEFAULT_S3", f"raw/weather/ds={config.date}/weather.json")
    # s3_object.put(Body=r.text)


if __name__ == "__main__":
    main()
The last step is to make sure that our container has permissions to fetch the parameter from the AWS Parameter Store. For this, we must add several permissions to the sample-secrets-{{ macros.conveyor.env() }} role. Since we specified that Conveyor manages our roles, you will have some Terraform code in the resources directory. Add the following code to the resources/sample-secrets.tf file:
resource "aws_iam_role_policy" "secret_access" {
name = "secret-access"
role = aws_iam_role.default.id
policy = data.aws_iam_policy_document.secret_access.json
}
data "aws_iam_policy_document" "secret_access" {
statement {
actions = [
"ssm:GetParameters",
"ssm:GetParametersByPath",
]
resources = [
"arn:aws:ssm:*:*:parameter/Conveyor/HelloWorld",
]
effect = "Allow"
}
}
In the code we assumed that you created the parameter /Conveyor/HelloWorld. If you used another parameter, please change the code accordingly.
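To check that these permissions work, you can fetch the parameter yourself from code that runs with this role. The following is a minimal sketch for illustration only, assuming boto3 is available in your container image; it uses the same ssm:GetParameters action that the policy above grants:
import boto3

# Assumes this runs with the sample-secrets role, which now has
# ssm:GetParameters on /Conveyor/HelloWorld.
ssm = boto3.client("ssm")

response = ssm.get_parameters(
    Names=["/Conveyor/HelloWorld"],
    WithDecryption=True,  # required to read the decrypted SecureString value
)
for parameter in response["Parameters"]:
    print(parameter["Name"], "->", parameter["Value"])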
Deploy your code
Now you can build and deploy your code:
conveyor build && conveyor deploy --env <environment_name>
Activate the workflow
In the Conveyor UI, navigate to your environment by clicking on environments on the left and selecting your environment in the list. This will automatically open Airflow.
By default, your workflow will be disabled. Enable it by selecting the toggle on the left next to your DAG. Airflow will now start scheduling your project, with one run for each day since the start date you specified when creating the project. For more details on Airflow, as well as how Conveyor integrates with Airflow, take a look here.
Check the logs for your job
Go to the Task executions
tab of your environment and click on the logs icon for your succeeded sample
task.
In the logs you should see a line INFO:root:Environment variable: My first parameter
if you used the default value.
If you used another value for your parameter, it should print that one instead.
Next steps
This tutorial focused on the simple use case of exposing a secret as an environment variable.
For more information on how to use complex secret values,
take a look at the technical reference.
As mentioned, the same mechanism is supported by both the ConveyorSparkSubmitOperatorV2
and the ConveyorDbtTaskFactory
.
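For example, mounting the same parameter in a Spark job could look roughly like the snippet below. Treat this as a sketch rather than a verified example: the application path, instance types, and executor count are placeholder assumptions, and only the env_vars mechanism is stated above to work the same way as in the container operator.
from conveyor.operators import ConveyorSparkSubmitOperatorV2
from conveyor.secrets import AWSParameterStoreValue

ConveyorSparkSubmitOperatorV2(
    dag=dag,
    task_id="sample-spark",
    application="local:///opt/spark/work-dir/src/app.py",  # placeholder path, adjust to your project
    num_executors=1,  # placeholder
    driver_instance_type="mx.micro",  # placeholder
    executor_instance_type="mx.micro",  # placeholder
    aws_role="sample-secrets-{{ macros.conveyor.env() }}",
    env_vars={"HELLO_WORLD": AWSParameterStoreValue(name="/Conveyor/HelloWorld")},
)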