
6. Run against real data

Everything is now in place to update the code and run the project against real data. Open the project in your favourite IDE, for example IntelliJ IDEA or Scala IDE.

6.1 Update the code

In this section, we will update the code to:

  1. extract the data from the public openaq dataset
  2. filter it to keep only data from Belgium and calculate the average value per unit at a given location
  3. load the data into S3 and register it in the Glue catalog

Open the SampleJob.scala file and replace the SampleJob class with the content below, but leave the SampleJob singleton object untouched.

caution

Don't forget to replace GLUE_DATABASE with the name of the Glue database created in the previous step.

If you gave GLUE_DATABASE a name other than default, make sure the default database also exists; otherwise Spark will try to create it and fail, because the created IAM role does not have the necessary permissions.

class SampleJob(spark: SparkSession) extends LazyLogging {

  import spark.implicits._

  def run(environment: String, date: LocalDate): Unit = {
    // execute ETL pipeline
    val data = extract(environment, date)
    val transformed = transform(data, date)
    load(transformed, environment)
  }

  def extract(env: String, date: LocalDate): DataFrame = {
    // read the raw openaq measurements for the given day
    spark.read.json(s"s3://openaq-fetches/realtime-gzipped/$date")
  }

  def transform(data: DataFrame, date: LocalDate): DataFrame = {
    // keep only Belgian measurements and average the value per location and unit
    data
      .withColumn("ds", lit(Date.valueOf(date)))
      .filter($"country" === "BE")
      .groupBy($"location", $"unit", $"ds")
      .agg(avg($"value").as("average_value"))
  }

  def load(data: DataFrame, environment: String): Unit = {
    // write the result as a partitioned Parquet table registered in the Glue catalog
    spark.catalog.setCurrentDatabase("GLUE_DATABASE")
    data.coalesce(1)
      .write
      .partitionBy("ds")
      .mode("overwrite")
      .format("parquet")
      .saveAsTable("openaq_sample")
  }

}
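
Before deploying, you can optionally sanity-check the transform logic on a small, hand-written sample. The sketch below is not part of the project template: it assumes a local SparkSession is available (for example in a unit test or a scratch main) and only mimics the openaq columns that transform actually uses.

// Hypothetical local check of the transform step, assuming a local Spark setup.
import java.time.LocalDate
import org.apache.spark.sql.SparkSession

object TransformCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("transform-check")
      .getOrCreate()
    import spark.implicits._

    // Minimal sample with only the columns the transform uses.
    val sample = Seq(
      ("BE", "Antwerp", "µg/m³", 10.0),
      ("BE", "Antwerp", "µg/m³", 20.0),
      ("NL", "Amsterdam", "µg/m³", 30.0)
    ).toDF("country", "location", "unit", "value")

    val result = new SampleJob(spark).transform(sample, LocalDate.now())
    // Expect a single row for Antwerp with average_value = 15.0; the NL row is filtered out.
    result.show(truncate = false)

    spark.stop()
  }
}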

6.2 Build and deploy

After these changes, we can build and deploy the project to our environment.

./gradlew clean shadowJar
conveyor build
conveyor deploy --env $ENVIRONMENT_NAME --wait

6.3 Re-run the tasks

The initial deployment of your project ran using generated data. We will instruct Airflow to re-run with the updated code.

In the Conveyor UI, navigate to your environment and open Airflow. Go to your project's DAG, select the last task, and clear it, including past runs.

6.4 Explore the data

Once the tasks are successful, explore the data using AWS Athena.
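
As a quick alternative, you can also peek at the registered table from a Spark session that is configured against the Glue catalog (for example a spark-shell in the environment); the same SELECT statement works as-is in the Athena query editor. As before, replace GLUE_DATABASE with your database name.

// Hypothetical check from a spark-shell, where `spark` is predefined.
val rows = spark.sql(
  """SELECT ds, location, unit, average_value
    |FROM GLUE_DATABASE.openaq_sample
    |ORDER BY average_value DESC
    |LIMIT 20""".stripMargin)
rows.show(truncate = false)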