6. Run against real data
Everything is now in place to update the code and run the project against real data. Open the project in your favourite IDE, for example IntelliJ IDEA or Scala IDE.
6.1 Update the code
In this section, we will update the code to:
- extract the data from the public OpenAQ dataset
- filter it to keep only data from Belgium and calculate the average value per unit at a given location
- load the data into S3 and register it in the Glue catalog
Open the SampleJob.scala file and replace the SampleJob class with the content below, but leave the SampleJob singleton object untouched.
Don't forget to replace GLUE_DATABASE with the name of the Glue database created in the previous step.
If you gave your GLUE_DATABASE a name other than default, make sure the default database also exists as well; otherwise Spark will try to create it, but will not have the necessary permissions based on the created IAM role. (An optional way to verify this is shown after the class below.)
// Make sure the following imports are present at the top of SampleJob.scala
// (the generated project should already contain them):
import java.sql.Date
import java.time.LocalDate

import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, lit}

class SampleJob(spark: SparkSession) extends LazyLogging {

  import spark.implicits._

  def run(environment: String, date: LocalDate): Unit = {
    // execute ETL pipeline
    val data = extract(environment, date)
    val transformed = transform(data, date)
    load(transformed, environment)
  }

  // Read the raw OpenAQ measurements for the given date from the public S3 bucket.
  def extract(env: String, date: LocalDate): DataFrame = {
    spark.read.json(s"s3://openaq-fetches/realtime-gzipped/$date")
  }

  // Keep only Belgian measurements and compute the average value per location and unit.
  def transform(data: DataFrame, date: LocalDate): DataFrame = {
    data
      .withColumn("ds", lit(Date.valueOf(date)))
      .filter($"country" === "BE")
      .groupBy($"location", $"unit", $"ds")
      .agg(avg($"value").as("average_value"))
  }

  // Write the result as Parquet, partitioned by ds, and register it in the Glue catalog.
  def load(data: DataFrame, environment: String): Unit = {
    // Replace GLUE_DATABASE with the name of the Glue database created earlier.
    spark.catalog.setCurrentDatabase("GLUE_DATABASE")
    data.coalesce(1)
      .write
      .partitionBy("ds")
      .mode("overwrite")
      .format("parquet")
      .saveAsTable("openaq_sample")
  }
}
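As an optional sanity check for the note above, you can verify from any Spark session configured against the Glue catalog that both databases exist. This is just an illustrative snippet, not part of the job itself:
// Optional: verify that the required databases exist in the Glue catalog.
// Replace GLUE_DATABASE with the name of your Glue database.
println(spark.catalog.databaseExists("default"))        // should print true
println(spark.catalog.databaseExists("GLUE_DATABASE"))  // should print true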
6.2 Build and deploy
After these changes, we can build and deploy the project to our environment.
./gradlew clean shadowJar
conveyor build
conveyor deploy --env $ENVIRONMENT_NAME --wait
6.3 Re-run the tasks
The initial deployment of your project ran using generated data. We will instruct Airflow to re-run with the updated code.
Navigate to your environment in the Conveyor UI and open Airflow. Open your project DAG, select the last task, and clear it, including past runs.
6.4 Explore the data
Once the tasks are successful, explore the data using AWS Athena.
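If you prefer a quick check from a Spark shell instead of the Athena console, something along these lines should also work; it assumes the GLUE_DATABASE placeholder and the openaq_sample table created by the job above:
// Read the table registered by the job back from the Glue catalog and inspect it.
import org.apache.spark.sql.functions.col

spark.catalog.setCurrentDatabase("GLUE_DATABASE")  // your Glue database name
spark.table("openaq_sample")
  .orderBy(col("average_value").desc)
  .show(20, truncate = false)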