Using secrets in ConveyorContainerOperatorV2

Description

Often your job needs access to sensitive values (e.g. the password for a database connection) that you do not want to commit to git or write in plain text in your Airflow DAGs. To solve this, Conveyor integrates with AWS Secrets Manager and AWS Parameter Store. This makes it possible to fetch sensitive values while the Conveyor job is running, without having to specify the value explicitly.

note

In this example, we will use AWS Parameter Store, but the same steps can be applied for AWS Secrets Manager (a short sketch of the Secrets Manager variant follows the DAG example below). For more details on how to specify a Secrets Manager secret in Airflow and which IAM permissions to add to your role, take a look here.

info

If you are using Azure, the technical details on how to use secrets are described here. A how-to guide for Azure will be added later, but the steps are the same as for AWS.

Usage

Create the secret in AWS

The first step is to create a sensitive value in AWS Parameter Store. Since you do not want to check this value into source control, it is often added manually in the AWS console, as described in the following steps (a scripted alternative is sketched after the list):

  • Go to Systems Manager in the AWS console
  • Choose Parameter Store in the navigation panel
  • Create parameter
    • Fill in the name (e.g. /Conveyor/HelloWorld)
    • Fill in the value (e.g. My first parameter) and set the type to SecureString since it is a sensitive value
    • (Optional) Fill in a description
  • Click on Create parameter
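
If you prefer to script this step instead of using the console, the parameter can also be created programmatically. The snippet below is a minimal sketch using boto3; the parameter name and value match the console example above, so adjust them to your own setup and make sure your AWS credentials and region are configured.

import boto3

# Create the SecureString parameter programmatically instead of through the console.
ssm = boto3.client("ssm")
ssm.put_parameter(
    Name="/Conveyor/HelloWorld",
    Value="My first parameter",
    Type="SecureString",
)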

Use the secret in Airflow

Now that your parameter is created, we can start using it in Airflow, but first we need a Conveyor project. Let's create a new project starting from the Python template as that is the simplest. Create the project as follows:

conveyor project create --name sample-secrets --template python

When prompted, specify the following values:

  • cloud: aws (for Azure, we currently do not support integration with Azure Key Vault)
  • conveyor_managed_role: Yes
  • dev_environment: local

Now that you have the project, let's mount the secret as an environment variable in the ConveyorContainerOperatorV2. Change the content of dags/sample-secrets.py as follows:

from airflow import DAG
from conveyor.operators import ConveyorContainerOperatorV2
from conveyor.secrets import AWSParameterStoreValue
from datetime import datetime, timedelta


default_args = {
    "owner": "Conveyor",
    "depends_on_past": False,
    "start_date": datetime(year=2023, month=1, day=24),
    "email": [],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "sample-secrets", default_args=default_args, schedule_interval="@daily", max_active_runs=1
)

ConveyorContainerOperatorV2(
    dag=dag,
    task_id="sample",
    cmds=["python"],
    arguments=["-m", "samplesecrets.sample", "--date", "{{ ds }}", "--env", "{{ macros.conveyor.env() }}"],
    instance_type="mx.micro",
    aws_role="sample-secrets-{{ macros.conveyor.env() }}",
    env_vars={"HELLO_WORLD": AWSParameterStoreValue(name="/Conveyor/HelloWorld")},
)

important

In the code, we assumed that you created the parameter /Conveyor/HelloWorld. If you used a different parameter name, change the code accordingly.
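
As mentioned in the note above, the same approach works for AWS Secrets Manager. The sketch below illustrates what the env_vars entry could look like in that case; the class name AWSSecretsManagerValue and its arguments are assumptions based on the Parameter Store example, so check the technical reference for the exact import.

# Hypothetical sketch: exposing a Secrets Manager secret instead of a parameter.
# The import and class name below are assumptions; verify them in the technical reference.
from conveyor.secrets import AWSSecretsManagerValue

env_vars = {"DB_PASSWORD": AWSSecretsManagerValue(name="sample-secrets/db-password")}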

Next, change src/samplesecrets/sample.py to also print the HELLO_WORLD environment variable so that we can validate that everything works.

import argparse
import logging
import sys
import os

import requests
from typing import Optional


def main():
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    parser = argparse.ArgumentParser(description="sample-secrets")
    parser.add_argument(
        "-d", "--date", dest="date", help="date in format YYYY-mm-dd", required=True
    )
    parser.add_argument(
        "-e", "--env", dest="env", help="environment we are executing in", required=True
    )
    args = parser.parse_args()
    logging.info(f"Using args: {args}")
    logging.info(f"Environment variable: {os.environ['HELLO_WORLD']}")
    run(args.env, args.date)


def run(env: str, date: str):
    """Main ETL script definition.

    :return: None
    """
    # execute ETL pipeline
    data = extract_data()
    logging.info("Downloaded the weather info, now loading it")
    if data is None:
        logging.error("Received no weather data")
    load_data(data, env)


def extract_data() -> Optional[str]:
    """
    Gets the data from the open weather map api and returns the result.
    :return: The weather data
    """
    return requests.get(
        "https://samples.openweathermap.org/data/2.5/weather?q=Leuven&appid=b6907d289e10d714a6e88b30761fae22"
    ).json()


def load_data(data: str, env: str):
    """Writes the data on s3

    :param data: The string to write
    :param env: The environment
    :return: None
    """
    print(data)
    # Uncomment the following block to write to S3
    # s3 = boto3.resource("s3")
    # s3_object = s3.Object("DEFAULT_S3", f"raw/weather/ds={config.date}/weather.json")
    # s3_object.put(Body=r.text)


if __name__ == "__main__":
    main()

The last step is to make sure that our container has permission to fetch the parameter from AWS Parameter Store. For this, we must add the necessary permissions to the sample-secrets-{{ macros.conveyor.env() }} role. Since we specified that Conveyor manages our roles, you will have some Terraform code in the resources directory. Add the following code to the resources/sample-secrets.tf file:

resource "aws_iam_role_policy" "secret_access" {
name = "secret-access"
role = aws_iam_role.default.id
policy = data.aws_iam_policy_document.secret_access.json
}

data "aws_iam_policy_document" "secret_access" {
statement {
actions = [
"ssm:GetParameters",
"ssm:GetParametersByPath",
]
resources = [
"arn:aws:ssm:*:*:parameter/Conveyor/HelloWorld",
]
effect = "Allow"
}
}
important

In the code, we assumed that you created the parameter /Conveyor/HelloWorld. If you used a different parameter name, change the code accordingly.

Deploy your code

Now you can build and deploy your code:

conveyor build && conveyor deploy --env <environment_name>

Activate the workflow

In the Conveyor UI, navigate to your environment by clicking environments on the left and then clicking on your environment in the list. This will automatically open Airflow.

By default, your workflow will be disabled. Enable it by selecting the toggle on the left next to your DAG. Airflow will now start scheduling your project, with one run for each day since the start date specified in the DAG's default_args. For more details on Airflow, as well as how Conveyor integrates with Airflow, take a look here.

Check the logs for your job

Go to the Task executions tab of your environment and click on the logs icon for the successful sample task. In the logs, you should see the line INFO:root:Environment variable: My first parameter if you used the default value. If you used another value for your parameter, it should print that one instead.

Next steps

This tutorial focused on the simple use case of exposing a secret as an environment variable. For more information on how to use complex secret values, take a look at the technical reference. As mentioned, the same mechanism is supported by the ConveyorSparkSubmitOperatorV2 and the ConveyorDbtTaskFactory; a brief sketch for the Spark operator is shown below.
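
For illustration, here is a minimal sketch of how the same secret could be exposed on a ConveyorSparkSubmitOperatorV2 task. The env_vars argument and the task arguments shown are assumptions carried over from the container operator example, so consult the operator reference for the exact signature.

# Hypothetical sketch: the same Parameter Store value on a Spark task.
# Argument names are assumptions based on the container operator; check the operator reference.
# Other required arguments (such as the Spark application to run) are omitted here.
from conveyor.operators import ConveyorSparkSubmitOperatorV2
from conveyor.secrets import AWSParameterStoreValue

ConveyorSparkSubmitOperatorV2(
    dag=dag,
    task_id="sample-spark",
    env_vars={"HELLO_WORLD": AWSParameterStoreValue(name="/Conveyor/HelloWorld")},
)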