作者 : Chandan Rupakheti 和 Hernan Garcia
日期 : 2024年10月23日
来源 : [Amazon Managed Workflows for Apache Airflow (AmazonMWAA)](https://aws.amazon.com/blogs/big-data/category/application-
integration/amazon-managed-workflows-for-apache-airflow-amazon-mwaa/)
InvokeRestApi 功能大大简化了与Airflow REST API的交互,无需管理访问令牌和会话cookie。今天,我们很高兴地宣布,Amazon MWAA与 的集成得到了增强。这一改进简化了访问和管理您的Airflow环境的能力,同时也便于程序化地与工作流交互。Airflow RESTAPI支持多种用例,包括集中和自动化管理任务,以及构建基于事件的、数据感知的数据管道。
接下来的部分,我们将讨论这一增强功能并展示若干用例,展示其在您的Amazon MWAA环境中的应用。
Airflow RESTAPI是一个程序接口,允许您与Airflow的核心功能进行交互。它是一组HTTP端点,可以执行诸如调用(DAGs)、检查任务状态、检索工作流元数据、管理连接和变量,甚至启动与数据集相关的事件,而无需直接访问Airflow的网页界面或命令行工具。
在此之前,Amazon MWAA为与Airflow REST API的交互提供了基础,但管理访问令牌和会话cookie的过程增加了复杂性。现在,AmazonMWAA支持通过AWS凭证与Airflow REST API交互,简化了操作并提高了可用性。
新的 功能允许您使用有效的SigV4签名和您的现有AWS凭证发起Airflow REST API请求。这个特性现在适用于所有AmazonMWAA环境(2.4.3+)中的。通过作为中介,该RESTAPI代表用户处理请求,只需环境名称和API请求有效负载作为输入。
通过增强的Amazon MWAA API与Airflow REST API的集成带来了几个关键好处:
| 好处 | 描述 |
|---|---|
| 简化集成 | 新的 InvokeRestApi 功能消除了管理访问令牌和会话cookies的复杂性。 |
| 提升可用性 | 增强后的API直接向客户端提供Airflow REST API执行结果。 |
| 自动管理 | 支持自动化各种行政和管理任务,比如管理Airflow变量、连接等。 |
| 事件驱动架构 | 支持根据外部事件触发Airflow DAG。 |
| 数据感知调度 | 通过数据集调度特性,增强了对于工作负载的管理和资源的动态扩展。 |
在接下来的部分,我们将展示如何在各种用例中使用增强后的API。
以下代码片段展示了增强REST API的一般请求格式:
{ Name: String, Method: String, Path: String, QueryParameters: Json, Body:
Json } ```
所需的参数包括Amazon MWAA环境的`Name`、要调用的Airflow REST API端点的`Path`以及所使用的HTTP
`Method`,而`QueryParameters`和`Body`为可选参数。
以下代码片段展示了一般的响应格式:
`json { RestApiStatusCode: Number, RestApiResponse: Json }`
`RestApiStatusCode`表示由Airflow RESTAPI调用返回的HTTP状态代码,而`RestApiResponse`包含来自Airflow REST API的响应有效负载。
以下示例代码片段展示了如何使用增强集成更新Airflow变量的描述字段。该调用使用来调用。
```python import boto3
# 创建一个boto3客户端
mwaa_client = boto3.client("mwaa")
# 使用boto3客户端调用增强的REST API
response = mwaa_client.invoke_rest_api( Name="", Method="PATCH",
Path=f"/variables/", Body={ "key": "", "value": "", "description": "" },
QueryParameters={ "update_mask": ,您需要对Airflow变量执行创建、读取、更新和删除等API操作。以下是一个简单的Python客户端示例(`mwaa_variables_client.py`):
```python import boto3
# MWAA环境变量管理客户端
class MWAAVariablesClient: # 初始化客户端,包含环境名称和可选的MWAA boto3客户端 def **init**(self,
env_name, mwaa_client=None): self.env_name = env_name self.client =
mwaa_client or boto3.client("mwaa")
# 列出MWAA环境中的所有变量
def list(self):
response = self.client.invoke_rest_api(
Name=self.env_name,
Method="GET",
Path="/variables"
)
output = response['RestApiResponse']['variables']
return output
# 通过键获取特定变量
def get(self, key):
response = self.client.invoke_rest_api(
Name=self.env_name,
Method="GET",
Path=f"/variables/{key}"
)
return response['RestApiResponse']
# 创建一个新变量,包含键、值和可选描述
def create(self, key, value, description=None):
response = self.client.invoke_rest_api(
Name=self.env_name,
Method="POST",
Path="/variables",
Body={
"key": key,
"value": value,
"description": description
}
)
return response['RestApiResponse']
# 更新现有变量的值和描述
def update(self, key, value, description, query_parameters=None):
response = self.client.invoke_rest_api(
Name=self.env_name,
Method="PATCH",
Path=f"/variables/{key}",
Body={
"key": key,
"value": value,
"description": description
},
QueryParameters=query_parameters
)
return response['RestApiResponse']
# 根据键删除变量
def delete(self, key):
response = self.client.invoke_rest_api(
Name=self.env_name,
Method="DELETE",
Path=f"/variables/{key}"
)
return response['RestApiStatusCode']
if **name** == "**main** ": client = MWAAVariablesClient("")
print("\n正在创建测试变量 ...")
response = client.create(
key="test",
value="Test value",
description="Test description"
)
print(response)
print("\n正在列出所有变量 ...")
variables = client.list()
print(variables)
print("\n正在获取测试变量 ...")
response = client.get("test")
print(response)
print("\n正在更新测试变量的值和描述 ...")
response = client.update(
key="test",
value="Updated Value",
description="Updated description"
)
print(response)
print("\n仅更新测试变量的描述 ...")
response = client.update(
key="test",
value="Updated Value",
description="Yet another updated description",
query_parameters={"update_mask": ["description"]}
)
print(response)
print("\n正在删除测试变量 ...")
response_code = client.delete("test")
print(f"响应码: {response_code}")
print("\n最终获取已删除的测试变量 ...")
try:
response = client.get("test")
print(response)
except Exception as e:
print(e.response配置您的终端,您可以运行上面的Python脚本来实现以下结果:
```plaintext $ python mwaa_variables_client.py
正在创建测试变量 ... {'description': 'Test description', 'key': 'test', 'value': 'Testvalue'}
正在列出所有变量 ... 时,我们引入了使管道能对数据集变化做反应的新功能。这种与AirflowREST API的简化集成使得实现数据驱动的管道变得更为直接。
考虑一个用例,您需要运行一个使用外部事件输入的管道。以下示例DAG执行一个作为参数提供的bash命令(`any_bash_command.py`):
```python """ 此DAG允许您执行作为参数提供的bash命令。 命令作为参数传递,称为'command'。 """
from airflow import DAG from airflow.operators.bash_operator importBashOperator from airflow.models.param import Param from datetime importdatetime
with DAG( dag_id="any_bash_command", schedule=None, start_date=datetime(2022,
1, 1), catchup=False, params={ "command": Param("env", type="string") }, ) asdag: cli_command = BashOperator( task_id="triggered_bash_command",
bash_command="{{ dag_run.conf['command'] }}" ) ```
借助增强的REST API,您可以创建一个客户端来调用这个DAG,并提供您选择的bash命令,如下所示(`mwaa_dag_run_client.py`):
```python import boto3
# 触发DAG运行的客户端
class MWAADagRunClient: # 初始化客户端,包含MWAA环境名称和可选的MWAA boto3客户端 def
**init**(self, env_name, mwaa_client=None): self.env_name = env_nameself.client = mwaa_client or boto3.client("mwaa")
# 触发DAG运行,指定参数
def trigger_run(self,
dag_id,
dag_run_id=None,
logical_date=None,
data_interval_start=None,
data_interval_end=None,
note=None,
conf=None,
):
body = {}
if dag_run_id:
body["dag_run_id"] = dag_run_id
if logical_date:
body["logical_date"] = logical_date
if data_interval_start:
body["data_interval_start"] = data_interval_start
if data_interval_end:
body["data_interval_end"] = data_interval_end
if note:
body["note"] = note
body["conf"] = conf or {}
response = self.client.invoke_rest_api(
Name=self.env_name,
Method="POST",
Path=f"/dags/{dag_id}/dagRuns",
Body=body
)
return response['RestApiResponse']
if **name** == "**main** ": client = MWAADagRunClient("")
print("\n正在触发DAG运行 ...")
result = client.trigger_run(
dag_id="any_bash_command",
conf={
"command": "echo 'Hello from external trigger!'"
}
)
print(result)
以下代码片段展示了该脚本的样本运行:
plaintext $ python mwaa_dag_run_client.py 正在触发DAG运行 ... {'conf': {'command': "echo 'Hello from external trigger!'"}, 'dag_id': 'any_bash_command', 'dag_run_id': 'manual__2024-10-21T16:56:09.852908+00:00', 'data_interval_end': '2024-10-21T16:56:09.852908+00:00', 'data_interval_start': '2024-10-21T16:56:09.852908+00:00', 'execution_date': '2024-10-21T16:56:09.852908+00:00', 'external_trigger': True, 'logical_date': '2024-10-21T16:56:09.852908+00:00', 'run_type': 'manual', 'state': 'queued'}
在Airflow UI中,trigger_bash_command任务显示了以下执行日志:
`plaintext 数据湖时运行数据管道并执行提取、转换和加载(ETL)作业。以下图表展示了一种架构方法。
删除)
在通过外部输入调用DAG的上下文中,函数对AmazonMWAA网络服务器的繁忙程度一无所知,这可能导致函数在短时间内处理大量文件,从而压倒Amazon MWAA网络服务器。
一种调节文件处理吞吐量的方法是在S3桶和Lambda函数之间引入一个(AmazonSQS)队列,这可以帮助限制对网络服务器的API请求速率。您可以通过来实现Lambda的SQS事件源。然而,Lambda函数仍然不知道Amazon MWAA环境中可用的处理能力,以
Leave a Reply