
Commit c042a15

fix: b/219021576 (GoogleCloudPlatform#7473)
## Description

Fixes b/219021576, corresponding cl/429403697.

Also adopts Jinja-templated variables as a best practice, as part of addressing GoogleCloudPlatform#6555.

[Airflow 1 successful DAG Run](https://screenshot.googleplex.com/AwmdtnCmh94dKXE)
[Airflow 2 successful DAG Run](https://screenshot.googleplex.com/7sNuCXtQ6NvXbyp)

Note: It's a good idea to open an issue first for discussion.

## Checklist
- [x] I have followed [Sample Guidelines from AUTHORING_GUIDE.MD](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md)
- [ ] README is updated to include [all relevant information](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#readme-file)
- [ ] **Tests** pass: `nox -s py-3.6` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup))
- [ ] **Lint** pass: `nox -s lint` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup))
- [ ] These samples need a new **API enabled** in testing projects to pass (let us know which ones)
- [ ] These samples need new/updated **env vars** in testing projects set to pass (let us know which ones)
- [x] Please **merge** this PR for me once it is approved.
- [ ] This sample adds a new sample directory, and I updated the [CODEOWNERS file](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/.github/CODEOWNERS) with the codeowners for this sample
1 parent 0367e6d commit c042a15
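
The core pattern in this change: reading a variable with `models.Variable.get()` at the top level of a DAG file queries the Airflow metadata database on every scheduler parse (and breaks parsing if the variable does not exist yet), while a Jinja template string such as `'{{ var.value.gcs_bucket }}'` is only rendered when a task actually runs. A minimal sketch of the preferred pattern, not taken from this commit (the variable name and the trivial BashOperator task are assumptions for illustration):

```python
import datetime

from airflow import models
from airflow.operators import bash_operator  # Airflow 1 import style, matching these samples

# Preferred: keep the value as a Jinja template string; it is rendered at task run time.
bucket_template = '{{ var.value.gcs_bucket }}'
# Discouraged at module level: models.Variable.get('gcs_bucket') runs on every DAG parse.

with models.DAG(
        'variable_template_sketch',
        schedule_interval=datetime.timedelta(days=1),
        start_date=datetime.datetime(2022, 1, 1)) as dag:
    # bash_command is a templated field, so the {{ var.value.* }} expression
    # is substituted with the variable's value when the task executes.
    show_bucket = bash_operator.BashOperator(
        task_id='show_bucket',
        bash_command='echo ' + bucket_template)
```

Template strings are only rendered in fields that an operator declares as templated, which is why the diffs below pass the templated region and project directly to Dataproc operator arguments instead of resolving them eagerly.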

File tree: 4 files changed (+17, -13 lines)
Lines changed: 2 additions & 1 deletion

@@ -1,3 +1,4 @@
 pandas-gbq==0.14.1 # must be under 0.15.0 until https://github.com/apache/airflow/issues/15113 is addressed
 SQLAlchemy==1.3.23 # must be under 1.4 until at least Airflow 2.0 (check airflow setup.py for restrictions)
-WTForms==2.3.3 # Must stay under 3.0 because of https://github.com/dpgaspar/Flask-AppBuilder/issues/1732
+WTForms==2.3.3 # Must stay under 3.0 because of https://github.com/dpgaspar/Flask-AppBuilder/issues/1732
+MarkupSafe==1.1.1 # as found in Airflow 1.10.15 constraints

composer/airflow_1_samples/hadoop_tutorial.py

Lines changed: 7 additions & 4 deletions

@@ -19,7 +19,7 @@
 This DAG relies on three Airflow variables
 https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
 * gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
-* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
+* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
   created.
 * gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
 See https://cloud.google.com/storage/docs/creating-buckets for creating a
@@ -38,7 +38,7 @@
 # see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
 # for best practices
 output_file = os.path.join(
-    models.Variable.get('gcs_bucket'), 'wordcount',
+    '{{ var.value.gcs_bucket }}', 'wordcount',
     datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
 # Path to Hadoop wordcount example available on every Dataproc cluster.
 WORDCOUNT_JAR = (
@@ -63,7 +63,7 @@
     # If a task fails, retry it once after waiting at least 5 minutes
     'retries': 1,
     'retry_delay': datetime.timedelta(minutes=5),
-    'project_id': models.Variable.get('gcp_project')
+    'project_id': '{{ var.value.gcp_project }}'
 }

 # [START composer_hadoop_schedule_airflow_1]
@@ -81,7 +81,7 @@
         # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
         cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
         num_workers=2,
-        zone=models.Variable.get('gce_zone'),
+        region='{{ var.value.gce_region }}',
         master_machine_type='n1-standard-2',
         worker_machine_type='n1-standard-2')

@@ -90,13 +90,16 @@
     run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
         task_id='run_dataproc_hadoop',
         main_jar=WORDCOUNT_JAR,
+        region='{{ var.value.gce_region }}',
         cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
         arguments=wordcount_args)

     # Delete Cloud Dataproc cluster.
     delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
         task_id='delete_dataproc_cluster',
         cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
+        region='{{ var.value.gce_region }}',
+
         # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
         # even if the Dataproc job fails.
         trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
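
For the DAG above to run, the three variables listed in its docstring (`gcp_project`, `gce_region`, `gcs_bucket`) have to exist in the Airflow environment. A minimal sketch of seeding them from Python with placeholder values, which are assumptions for illustration; in Cloud Composer they are more commonly set through the Airflow UI or CLI:

```python
# Seed the Airflow variables that hadoop_tutorial.py reads via {{ var.value.* }}.
# The values below are placeholders; substitute your own project, region, and bucket.
from airflow import models

models.Variable.set('gcp_project', 'my-project-id')
models.Variable.set('gce_region', 'us-central1')
models.Variable.set('gcs_bucket', 'gs://my-wordcount-output-bucket')
```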

composer/airflow_1_samples/requirements.txt

Lines changed: 0 additions & 1 deletion

@@ -8,4 +8,3 @@ scipy==1.4.1; python_version > '3.0'
 scipy==1.2.3; python_version < '3.0'
 numpy==1.19.5; python_version > '3.0'
 numpy==1.16.6; python_version < '3.0'
-markupsafe==2.0.1

composer/workflows/hadoop_tutorial.py

Lines changed: 8 additions & 7 deletions

@@ -19,7 +19,7 @@
 This DAG relies on three Airflow variables
 https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
 * gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
-* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
+* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
   created.
 * gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
 See https://cloud.google.com/storage/docs/creating-buckets for creating a
@@ -38,7 +38,7 @@
 # see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
 # for best practices
 output_file = os.path.join(
-    models.Variable.get('gcs_bucket'), 'wordcount',
+    '{{ var.value.gcs_bucket }}', 'wordcount',
     datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
 # Path to Hadoop wordcount example available on every Dataproc cluster.
 WORDCOUNT_JAR = (
@@ -49,7 +49,7 @@
 wordcount_args = ['wordcount', input_file, output_file]

 HADOOP_JOB = {
-    "reference": {"project_id": models.Variable.get('gcp_project')},
+    "reference": {"project_id": '{{ var.value.gcp_project }}'},
     "placement": {"cluster_name": 'composer-hadoop-tutorial-cluster-{{ ds_nodash }}'},
     "hadoop_job": {
         "main_jar_file_uri": WORDCOUNT_JAR,
@@ -83,8 +83,8 @@
     # If a task fails, retry it once after waiting at least 5 minutes
     'retries': 1,
     'retry_delay': datetime.timedelta(minutes=5),
-    'project_id': models.Variable.get('gcp_project'),
-    'location': models.Variable.get('gce_region'),
+    'project_id': '{{ var.value.gcp_project }}',
+    'region': '{{ var.value.gce_region }}',

 }

@@ -104,7 +104,8 @@
         # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
         cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
         cluster_config=CLUSTER_CONFIG,
-        region=models.Variable.get('gce_region'))
+        region='{{ var.value.gce_region }}'
+    )

     # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
     # master node.
@@ -116,7 +117,7 @@
     delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
         task_id='delete_dataproc_cluster',
         cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
-        region=models.Variable.get('gce_region'),
+        region='{{ var.value.gce_region }}',
         # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
         # even if the Dataproc job fails.
         trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
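
One caveat worth noting: the `'{{ var.value.gce_region }}'` strings above only resolve because the Dataproc operators treat the corresponding arguments as Jinja-templated fields; a non-templated argument would receive the literal `{{ ... }}` text. A small sketch of how one might double-check this for the installed provider version (the operator names are assumed to match the sample's imports; this snippet is not part of the commit):

```python
# Print the constructor arguments each Dataproc operator renders with Jinja.
# If 'region' were missing from a list for your provider version, a templated
# region string would be passed through to the API unrendered.
from airflow.providers.google.cloud.operators import dataproc

for op in (dataproc.DataprocCreateClusterOperator,
           dataproc.DataprocSubmitJobOperator,
           dataproc.DataprocDeleteClusterOperator):
    print(op.__name__, sorted(op.template_fields))
```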
