Intro
Airflow offers several ways to build automated workflows, and one of them is accessing external APIs with HTTP operators to extract the data you need.
Hands-on
In this tutorial, we will create a DAG that accesses an external API and writes the extracted data directly to a local file.
If this is your first time using Airflow, I recommend following this link to learn more about Airflow and how to set up an environment.
Creating the DAG
For this tutorial, we will create a DAG that triggers every hour (schedule_interval="0 * * * *"), accesses an external API, and writes the extracted data directly to a local JSON file.
In this scenario, we will use SimpleHttpOperator, an operator from the HTTP provider package (apache-airflow-providers-http) that sends requests to external APIs.
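The cron expression 0 * * * * means "at minute 0 of every hour". The helper below is hypothetical and handles only this one expression; it is not Airflow's scheduling code. It shows that the next trigger after any moment is the top of the following hour:

```python
from datetime import datetime, timedelta


def next_hourly_run(after: datetime) -> datetime:
    """Return the first time strictly after `after` matching "0 * * * *".

    Hypothetical helper for illustration: it only understands this single
    cron expression (minute 0 of every hour).
    """
    # Drop minutes, seconds and microseconds to get the start of the current hour...
    top_of_hour = after.replace(minute=0, second=0, microsecond=0)
    # ...the next match is one hour later, even when `after` is exactly on the hour.
    return top_of_hour + timedelta(hours=1)


print(next_hourly_run(datetime(2023, 1, 1, 10, 15)))  # 2023-01-01 11:00:00
print(next_hourly_run(datetime(2023, 1, 1, 23, 0)))   # 2023-01-02 00:00:00
```

Keep in mind that Airflow starts a run once its data interval has ended. The run covering 10:00 to 11:00 is therefore triggered at about 11:00.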
from datetime import datetime
import json

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator


def _write_response(task_instance):
    # Pull the value returned by the extract_data task (pushed to XCom automatically).
    response = task_instance.xcom_pull(task_ids="extract_data")
    # Append one JSON document per line so that hourly runs do not overwrite each other.
    with open("extract_data.json", "a") as data_file:
        data_file.write(json.dumps(response) + "\n")


with DAG(dag_id="api_external",
         start_date=datetime(2023, 1, 1),
         schedule_interval="0 * * * *",  # minute 0 of every hour
         catchup=False) as dag:

    extract_data = SimpleHttpOperator(
        task_id="extract_data",
        http_conn_id="api_product",  # connection defined in Admin > Connections
        endpoint="products",
        method="GET",
        # Parse the body into Python objects; json.dumps(response.text) would double-encode it.
        response_filter=lambda response: response.json(),
    )

    write_response = PythonOperator(
        task_id="write_response",
        python_callable=_write_response,
    )

    # The extraction must finish before the file is written.
    extract_data >> write_response
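A note on response_filter: response.text is already a JSON-formatted string. Passing it through json.dumps encodes it a second time, so the stored value becomes one quoted, escaped string instead of the product records. response.json() instead parses the body into Python lists and dicts, which XCom can store and json.dumps can write back as normal JSON. The following stdlib-only check uses a made-up response body (an assumption, not the real API's output):

```python
import json

# Hypothetical body, standing in for what response.text might hold for GET /products.
body = '[{"id": 1, "name": "coffee"}]'

double_encoded = json.dumps(body)  # what json.dumps(response.text) produces
parsed = json.loads(body)          # equivalent to what response.json() produces

print(double_encoded)                             # "[{\"id\": 1, \"name\": \"coffee\"}]"
print(type(json.loads(double_encoded)).__name__)  # str: decoding once gives back a string
print(parsed[0]["name"])                          # coffee
```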
Note that we use two operators within the same DAG. SimpleHttpOperator handles access to the external API through four parameters:
- method sets the HTTP method (GET, POST, PUT, DELETE, and so on).
- endpoint sets the API path to call, which in this case is products.
- response_filter transforms the response before its value is returned and stored in XCom.
- http_conn_id identifies the Airflow connection that holds the API's base URL. We will define that connection next through the Airflow UI.
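In the Airflow UI, open the Admin > Connections menu, as shown below.
Fill in the fields as shown in the image below and save. The Connection Id must match the http_conn_id used in the DAG (api_product), the connection type should be HTTP, and the host is the API's base URL.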
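With the connection in place, the operator builds the request URL from the connection's host and the endpoint and sends the request with the chosen method. It then passes the response through response_filter, and the result becomes the task's return value, which is pushed to XCom. The sketch below approximates that cycle using only the standard library. Everything in it is hypothetical:
- FakeProductsAPI is a local stand-in for the real API, and its product data is made up.
- run_http_task is not part of Airflow.
- The real operator hands response_filter a requests Response object. This sketch passes the decoded body text instead.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class FakeProductsAPI(BaseHTTPRequestHandler):
    """Hypothetical local stand-in for the external API; serves GET /products."""

    def do_GET(self):
        if self.path != "/products":
            self.send_error(404)
            return
        body = json.dumps([{"id": 1, "name": "coffee"}]).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # silence the default per-request logging


def run_http_task(host, endpoint, method="GET", response_filter=None):
    """Hypothetical model of an HTTP task: call host/endpoint, then filter the body."""
    url = host.rstrip("/") + "/" + endpoint.lstrip("/")
    request = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(request) as response:
        text = response.read().decode("utf-8")
    # The value returned here plays the role of what Airflow would push to XCom.
    return response_filter(text) if response_filter else text


# Start the fake API on a free local port in a background thread.
server = HTTPServer(("127.0.0.1", 0), FakeProductsAPI)
threading.Thread(target=server.serve_forever, daemon=True).start()
host = f"http://127.0.0.1:{server.server_port}"  # plays the role of the connection's host

products = run_http_task(host, "products", method="GET", response_filter=json.loads)
print(products)  # [{'id': 1, 'name': 'coffee'}]

server.shutdown()
server.server_close()
```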
As for PythonOperator, we only use it to run the Python function _write_response. Inside that function, task_instance.xcom_pull(task_ids="extract_data") uses the task_id of the extract_data task to retrieve the value that task returned, which is the filtered API response. In this scenario, we write that result to the file.
XCom (short for "cross-communication") is Airflow's mechanism for passing small pieces of data between tasks, and it is a large part of what makes Airflow flexible. Tasks may run on different machines, so they cannot share memory. XComs are stored in Airflow's metadata database instead, which lets one task push a value and another task pull it later.
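As a rough mental model, XComs behave like a shared key-value table indexed by DAG, run, task, and key. A task's return value is stored under the key return_value, which is the default key that xcom_pull reads. The class below is a hypothetical in-memory toy, not Airflow's implementation. The run identifier and the product record are made-up sample data.

```python
class ToyXComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom storage.

    Real XComs are rows in Airflow's metadata database (or a custom XCom
    backend), which is what lets tasks on different workers share values.
    """

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, run_id, task_id, value, key="return_value"):
        # A task's return value is stored under the key "return_value".
        self._rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id, run_id, task_id, key="return_value"):
        # Returns None when nothing was pushed for that task and key.
        return self._rows.get((dag_id, run_id, task_id, key))


store = ToyXComStore()
run_id = "example_run"  # hypothetical DAG run identifier

# extract_data finishes: its filtered response is pushed automatically.
store.push("api_external", run_id, "extract_data", [{"id": 1, "name": "coffee"}])

# write_response runs later, possibly on another worker, and pulls by task_id.
print(store.pull("api_external", run_id, "extract_data"))  # [{'id': 1, 'name': 'coffee'}]
```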
Finally, we define the task dependencies with the >> operator, which sets the order of execution: the task on the left must finish before the task on the right starts. In our case, the API access and extraction must complete before the file is written, hence extract_data >> write_response.
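Under the hood, >> is ordinary Python operator overloading. Airflow's operators implement __rshift__ so that a >> b registers b as downstream of a and returns b, which is why chains like a >> b >> c work. The toy class below is hypothetical (it is not Airflow's BaseOperator). It illustrates that idea and shows how the resulting dependencies yield an execution order, using the standard library's graphlib (Python 3.9+).

```python
from graphlib import TopologicalSorter


class ToyTask:
    """Hypothetical task node that mimics how >> wires dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # a >> b: register b as downstream of a, and return b to allow chaining.
        self.downstream.add(other)
        return other


extract_data = ToyTask("extract_data")
write_response = ToyTask("write_response")
extract_data >> write_response

# Map each task to the set of tasks it depends on, then derive a valid order.
tasks = (extract_data, write_response)
graph = {task.task_id: set() for task in tasks}
for task in tasks:
    for child in task.downstream:
        graph[child.task_id].add(task.task_id)

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['extract_data', 'write_response']
```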
After the DAG runs, you can inspect the file generated with the extraction results by accessing a worker from the terminal (in this setup there is only one). Run the following command to list the containers:
docker ps
A listing similar to the one below will be displayed. One of the entries in the NAMES column is the worker, in this case coffee_and_tips_airflow-worker_1.
Still in the terminal, run the following command to open a shell inside the worker container. The shell starts in the Airflow directory, where the extract_data.json file is located.
docker exec -it coffee_and_tips_airflow-worker_1 /bin/bash
That's it: now just open the file and check its content.
Conclusion
Once again, we saw how Airflow lets automated processes access and integrate external APIs with just a few lines of code. In this example, we also explored XComs, which let tasks exchange messages even when they run on different machines in a distributed environment.
Hope you enjoyed it!