ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿನ ಕ್ರಿಯಾತ್ಮಕ ಕಾರ್ಯ ಅವಲಂಬನೆಗಳ ಶಕ್ತಿಯನ್ನು ಅನ್ಲಾಕ್ ಮಾಡಲಾಗುತ್ತಿದೆ
ಅಪಾಚೆ ಗಾಳಿಯ ಹರಿವು ಪ್ರಬಲ ವರ್ಕ್ಫ್ಲೋ ಯಾಂತ್ರೀಕೃತಗೊಂಡ ಸಾಧನವಾಗಿದೆ, ಆದರೆ ಕ್ರಿಯಾತ್ಮಕ ಅವಲಂಬನೆಗಳನ್ನು ನಿರ್ವಹಿಸುವುದರಿಂದ ಕೆಲವೊಮ್ಮೆ ಒಂದು ಒಗಟು ಪರಿಹರಿಸುವಂತೆ ಅನಿಸುತ್ತದೆ. ನಿರ್ದೇಶಿತ ಅಸಿಕ್ಲಿಕ್ ಗ್ರಾಫ್ (ಡಿಎಜಿ) ಅನ್ನು ವಿನ್ಯಾಸಗೊಳಿಸುವಾಗ, ಹಾರ್ಡ್ಕೋಡಿಂಗ್ ಕಾರ್ಯ ಅನುಕ್ರಮಗಳು ಸರಳ ಬಳಕೆಯ ಸಂದರ್ಭಗಳಲ್ಲಿ ಕೆಲಸ ಮಾಡಬಹುದು, ಆದರೆ ರನ್ಟೈಮ್ನಲ್ಲಿ ರಚನೆಯನ್ನು ನಿರ್ಧರಿಸಬೇಕಾದರೆ ಏನು? 🤔 🤔 🤔
ಕಾರ್ಯಗತಗೊಳಿಸಬೇಕಾದ ಕಾರ್ಯಗಳು ಒಳಬರುವ ಡೇಟಾವನ್ನು ಅವಲಂಬಿಸಿರುವ ಡೇಟಾ ಪೈಪ್ಲೈನ್ನಲ್ಲಿ ನೀವು ಕೆಲಸ ಮಾಡುತ್ತಿದ್ದೀರಿ ಎಂದು g ಹಿಸಿ. ಉದಾಹರಣೆಗೆ, ದೈನಂದಿನ ಸಂರಚನೆಯ ಆಧಾರದ ಮೇಲೆ ವಿಭಿನ್ನ ಫೈಲ್ಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದು ಅಥವಾ ವ್ಯವಹಾರ ನಿಯಮದ ಆಧಾರದ ಮೇಲೆ ವೇರಿಯಬಲ್ ರೂಪಾಂತರಗಳನ್ನು ಕಾರ್ಯಗತಗೊಳಿಸುವುದು. ಅಂತಹ ಸಂದರ್ಭಗಳಲ್ಲಿ, ಸ್ಥಿರವಾದ ಡಿಎಜಿ ಅದನ್ನು ಕಡಿತಗೊಳಿಸುವುದಿಲ್ಲ the ಅವಲಂಬನೆಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ವ್ಯಾಖ್ಯಾನಿಸಲು ನಿಮಗೆ ಒಂದು ಮಾರ್ಗ ಬೇಕು.
ಇದು ನಿಖರವಾಗಿ ಗಾಳಿಯ ಹರಿವು dag_run.conf ಆಟ ಬದಲಾಯಿಸುವವರಾಗಿರಬಹುದು. DAG ಅನ್ನು ಪ್ರಚೋದಿಸುವಾಗ ಕಾನ್ಫಿಗರೇಶನ್ ನಿಘಂಟನ್ನು ಹಾದುಹೋಗುವ ಮೂಲಕ, ನೀವು ಕಾರ್ಯ ಅನುಕ್ರಮಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ರಚಿಸಬಹುದು. ಆದಾಗ್ಯೂ, ಇದನ್ನು ರಚನಾತ್ಮಕ ರೀತಿಯಲ್ಲಿ ಕಾರ್ಯಗತಗೊಳಿಸಲು ಗಾಳಿಯ ಹರಿವಿನ ಮರಣದಂಡನೆ ಮಾದರಿಯ ಬಗ್ಗೆ ಆಳವಾದ ತಿಳುವಳಿಕೆಯ ಅಗತ್ಯವಿದೆ.
ಈ ಲೇಖನದಲ್ಲಿ, ರನ್ಟೈಮ್ನಲ್ಲಿ ಕಾರ್ಯ ಅವಲಂಬನೆಗಳನ್ನು ನಿರ್ಧರಿಸುವ ಡೈನಾಮಿಕ್ ಡಿಎಜಿ ಅನ್ನು ಹೇಗೆ ನಿರ್ಮಿಸುವುದು ಎಂದು ನಾವು ಅನ್ವೇಷಿಸುತ್ತೇವೆ dag_run.conf. ಇದನ್ನು ಸಾಧಿಸಲು ನೀವು ಹೆಣಗಾಡುತ್ತಿದ್ದರೆ ಮತ್ತು ಸ್ಪಷ್ಟ ಪರಿಹಾರವನ್ನು ಕಂಡುಹಿಡಿಯದಿದ್ದರೆ, ಚಿಂತಿಸಬೇಡಿ - ನೀವು ಒಬ್ಬಂಟಿಯಾಗಿಲ್ಲ! ಪ್ರಾಯೋಗಿಕ ಉದಾಹರಣೆಗಳೊಂದಿಗೆ ಹಂತ ಹಂತವಾಗಿ ಅದನ್ನು ಒಡೆಯೋಣ. 🚀
ಸ ೦ ತಾನು | ಬಳಕೆಯ ಉದಾಹರಣೆ |
---|---|
dag_run.conf | ಡಿಎಜಿ ರನ್ ಅನ್ನು ಪ್ರಚೋದಿಸುವಾಗ ಡೈನಾಮಿಕ್ ಕಾನ್ಫಿಗರೇಶನ್ ಮೌಲ್ಯಗಳನ್ನು ಹಿಂಪಡೆಯಲು ಅನುಮತಿಸುತ್ತದೆ. ರನ್ಟೈಮ್ ನಿಯತಾಂಕಗಳನ್ನು ಹಾದುಹೋಗಲು ಅವಶ್ಯಕ. |
PythonOperator | ಪೈಥಾನ್ ಕಾರ್ಯವನ್ನು ಕಾರ್ಯಗತಗೊಳಿಸುವ ಗಾಳಿಯ ಹರಿವಿನ ಕಾರ್ಯವನ್ನು ವ್ಯಾಖ್ಯಾನಿಸುತ್ತದೆ, ಇದು ಡಿಎಜಿ ಒಳಗೆ ಹೊಂದಿಕೊಳ್ಳುವ ಮರಣದಂಡನೆ ತರ್ಕವನ್ನು ಅನುಮತಿಸುತ್ತದೆ. |
set_upstream() | ಕಾರ್ಯಗಳ ನಡುವಿನ ಅವಲಂಬನೆಯನ್ನು ಸ್ಪಷ್ಟವಾಗಿ ವ್ಯಾಖ್ಯಾನಿಸುತ್ತದೆ, ಒಂದು ಕಾರ್ಯವು ಇನ್ನೊಂದರ ನಂತರವೇ ಕಾರ್ಯಗತಗೊಳ್ಳುತ್ತದೆ ಎಂದು ಖಚಿತಪಡಿಸುತ್ತದೆ. |
@dag | ಹೆಚ್ಚು ಪೈಥೋನಿಕ್ ಮತ್ತು ರಚನಾತ್ಮಕ ರೀತಿಯಲ್ಲಿ ಡಿಎಜಿಗಳನ್ನು ವ್ಯಾಖ್ಯಾನಿಸಲು ಟಾಸ್ಕ್ ಫ್ಲೋ ಎಪಿಐ ಒದಗಿಸಿದ ಅಲಂಕಾರಿಕ. |
@task | ಟಾಸ್ಕ್ ಫ್ಲೋ ಎಪಿಐ ಬಳಸಿ ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿನ ಕಾರ್ಯಗಳನ್ನು ವ್ಯಾಖ್ಯಾನಿಸಲು, ಕಾರ್ಯ ರಚನೆ ಮತ್ತು ಡೇಟಾ ರವಾನೆಯನ್ನು ಸರಳೀಕರಿಸಲು ಅನುಮತಿಸುತ್ತದೆ. |
override(task_id=...) | ಒಂದೇ ಕಾರ್ಯದಿಂದ ಅನೇಕ ಕಾರ್ಯಗಳನ್ನು ತ್ವರಿತಗೊಳಿಸುವಾಗ ಕಾರ್ಯದ ID ಯನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಮಾರ್ಪಡಿಸಲು ಬಳಸಲಾಗುತ್ತದೆ. |
extract_elements(dag_run=None) | ಕಾರ್ಯ ಮರಣದಂಡನೆಯನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಕಾನ್ಫಿಗರ್ ಮಾಡಲು dag_run.conf ನಿಘಂಟಿನಿಂದ ಮೌಲ್ಯಗಳನ್ನು ಹೊರತೆಗೆಯುವ ಕಾರ್ಯ. |
schedule_interval=None | ಸ್ಥಿರ ವೇಳಾಪಟ್ಟಿಯಲ್ಲಿ ಚಲಿಸುವ ಬದಲು ಕೈಯಾರೆ ಪ್ರಚೋದಿಸಿದಾಗ ಮಾತ್ರ ಡಿಎಜಿ ಕಾರ್ಯಗತಗೊಳಿಸಲಾಗುತ್ತದೆ ಎಂದು ಖಚಿತಪಡಿಸುತ್ತದೆ. |
op_args=[element] | ಡೈನಾಮಿಕ್ ಆರ್ಗ್ಯುಮೆಂಟ್ಗಳನ್ನು ಪೈಥೋನೊಪೆರೇಟರ್ ಕಾರ್ಯಕ್ಕೆ ರವಾನಿಸುತ್ತದೆ, ಪ್ರತಿ ಕಾರ್ಯ ನಿದರ್ಶನಕ್ಕೆ ವಿಭಿನ್ನ ಮರಣದಂಡನೆಗಳನ್ನು ಸಕ್ರಿಯಗೊಳಿಸುತ್ತದೆ. |
catchup=False | ವಿರಾಮದ ನಂತರ ಪ್ರಾರಂಭಿಸಿದಾಗ ಎಲ್ಲಾ ತಪ್ಪಿದ ಡಿಎಜಿ ಮರಣದಂಡನೆಗಳನ್ನು ಚಲಾಯಿಸುವುದನ್ನು ತಡೆಯುತ್ತದೆ, ಇದು ನೈಜ-ಸಮಯದ ಸಂರಚನೆಗಳಿಗೆ ಉಪಯುಕ್ತವಾಗಿದೆ. |
ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ರನ್ಟೈಮ್ ಕಾನ್ಫಿಗರೇಶನ್ನೊಂದಿಗೆ ಡೈನಾಮಿಕ್ ಡಾಗ್ಗಳನ್ನು ನಿರ್ಮಿಸುವುದು
ಅಪಾಚೆ ಗಾಳಿಯ ಹರಿವು ಸಂಕೀರ್ಣವಾದ ಕೆಲಸದ ಹರಿವುಗಳನ್ನು ಏರ್ಪಡಿಸುವ ಪ್ರಬಲ ಸಾಧನವಾಗಿದೆ, ಆದರೆ ಅದರ ನಿಜವಾದ ಶಕ್ತಿ ಅದರ ನಮ್ಯತೆಯಲ್ಲಿದೆ. ಮೊದಲೇ ಪ್ರಸ್ತುತಪಡಿಸಿದ ಸ್ಕ್ರಿಪ್ಟ್ಗಳು ಎ ಅನ್ನು ಹೇಗೆ ರಚಿಸುವುದು ಎಂಬುದನ್ನು ತೋರಿಸುತ್ತದೆ ಕ್ರಿಯಾಶೀಲ ಡಾಗ್ ಚಾಲನೆಯಲ್ಲಿರುವ ಸಮಯದಲ್ಲಿ ಕಾರ್ಯ ಅವಲಂಬನೆಗಳನ್ನು ನಿರ್ಧರಿಸಲಾಗುತ್ತದೆ dag_run.conf. ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಬೇಕಾದ ಅಂಶಗಳ ಪಟ್ಟಿಯನ್ನು ಹಾರ್ಡ್ಕೋಡಿಂಗ್ ಮಾಡುವ ಬದಲು, ಪ್ರಚೋದಿಸಿದಾಗ ಡಿಎಜಿ ಅವುಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಹಿಂಪಡೆಯುತ್ತದೆ, ಇದು ಹೆಚ್ಚು ಹೊಂದಿಕೊಳ್ಳಬಲ್ಲ ಕೆಲಸದ ಹರಿವುಗಳಿಗೆ ಅನುವು ಮಾಡಿಕೊಡುತ್ತದೆ. ವೇರಿಯಬಲ್ ಡೇಟಾಸೆಟ್ಗಳನ್ನು ಸಂಸ್ಕರಿಸುವುದು ಅಥವಾ ಬಾಹ್ಯ ಪರಿಸ್ಥಿತಿಗಳ ಆಧಾರದ ಮೇಲೆ ನಿರ್ದಿಷ್ಟ ಕಾರ್ಯಗಳನ್ನು ಕಾರ್ಯಗತಗೊಳಿಸುವುದು ಮುಂತಾದ ನೈಜ-ಪ್ರಪಂಚದ ಸನ್ನಿವೇಶಗಳಲ್ಲಿ ಇದು ವಿಶೇಷವಾಗಿ ಉಪಯುಕ್ತವಾಗಿದೆ. ಇಟಿಎಲ್ ಪೈಪ್ಲೈನ್ ಅನ್ನು g ಹಿಸಿ, ಅಲ್ಲಿ ಫೈಲ್ಗಳು ಪ್ರತಿದಿನ ಬದಲಾಗುತ್ತವೆ - ಈ ವಿಧಾನವು ಯಾಂತ್ರೀಕೃತಗೊಳಿಸುವಿಕೆಯನ್ನು ಹೆಚ್ಚು ಸುಲಭಗೊಳಿಸುತ್ತದೆ. 🚀
ಮೊದಲ ಸ್ಕ್ರಿಪ್ಟ್ ಬಳಸುತ್ತದೆ ಹೆಬ್ಬಾವಿನರಮಾಪಕ ಕಾರ್ಯಗಳನ್ನು ಕಾರ್ಯಗತಗೊಳಿಸಲು ಮತ್ತು ಅವಲಂಬನೆಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಹೊಂದಿಸಲು. ಇದು ಅಂಶಗಳ ಪಟ್ಟಿಯನ್ನು ಹೊರತೆಗೆಯುತ್ತದೆ dag_run.conf, ಅಗತ್ಯವಿದ್ದಾಗ ಮಾತ್ರ ಕಾರ್ಯಗಳನ್ನು ರಚಿಸಲಾಗಿದೆಯೆ ಎಂದು ಖಚಿತಪಡಿಸಿಕೊಳ್ಳುವುದು. ಪಟ್ಟಿಯಲ್ಲಿನ ಪ್ರತಿಯೊಂದು ಅಂಶವು ಒಂದು ಅನನ್ಯ ಕಾರ್ಯವಾಗುತ್ತದೆ, ಮತ್ತು ಅವಲಂಬನೆಗಳನ್ನು ಅನುಕ್ರಮವಾಗಿ ಹೊಂದಿಸಲಾಗಿದೆ. ಎರಡನೆಯ ವಿಧಾನವು ನಿಯಂತ್ರಿಸುತ್ತದೆ ಟಾಸ್ಕ್ಫ್ಲೋ ಎಪಿಐ, ಇದು ಅಲಂಕಾರಿಕರೊಂದಿಗೆ ಡಿಎಜಿ ರಚನೆಯನ್ನು ಸರಳಗೊಳಿಸುತ್ತದೆ agdag ಮತ್ತು ask ಟಾಸ್ಕ್. ಈ ವಿಧಾನವು ಡಿಎಜಿಯನ್ನು ಹೆಚ್ಚು ಓದಬಲ್ಲದು ಮತ್ತು ಕ್ಲೀನರ್ ಎಕ್ಸಿಕ್ಯೂಶನ್ ತರ್ಕವನ್ನು ನಿರ್ವಹಿಸುತ್ತದೆ. ಈ ವಿಧಾನಗಳು ಕೋಡ್ ಬದಲಾವಣೆಗಳ ಅಗತ್ಯವಿಲ್ಲದೇ ಕೆಲಸದ ಹರಿವುಗಳು ವಿಭಿನ್ನ ಸಂರಚನೆಗಳಿಗೆ ಹೊಂದಿಕೊಳ್ಳಬಹುದು ಎಂದು ಖಚಿತಪಡಿಸುತ್ತದೆ.
ಉದಾಹರಣೆಗೆ, ಇ-ಕಾಮರ್ಸ್ ಕಂಪನಿಯು ಬ್ಯಾಚ್ಗಳಲ್ಲಿ ಆದೇಶಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವ ಸನ್ನಿವೇಶವನ್ನು ಪರಿಗಣಿಸಿ. ಕೆಲವು ದಿನಗಳು ಇತರರಿಗಿಂತ ಹೆಚ್ಚು ತುರ್ತು ಆದೇಶಗಳನ್ನು ಹೊಂದಿರಬಹುದು, ವಿಭಿನ್ನ ಕಾರ್ಯ ಅನುಕ್ರಮಗಳು ಬೇಕಾಗುತ್ತವೆ. ಸ್ಥಿರವಾದ ಡಿಎಜಿ ಬಳಸುವುದು ಆದ್ಯತೆಗಳು ಬದಲಾದಾಗ ಪ್ರತಿ ಬಾರಿ ಕೋಡ್ ಅನ್ನು ಮಾರ್ಪಡಿಸುವುದು ಎಂದರ್ಥ. ನಮ್ಮ ಡೈನಾಮಿಕ್ ಡಿಎಜಿ ವಿಧಾನದಿಂದ, ಬಾಹ್ಯ ವ್ಯವಸ್ಥೆಯು ನಿರ್ದಿಷ್ಟ ಕಾರ್ಯ ಅನುಕ್ರಮದೊಂದಿಗೆ ಡಿಎಜಿಯನ್ನು ಪ್ರಚೋದಿಸುತ್ತದೆ, ಇದು ಪ್ರಕ್ರಿಯೆಯನ್ನು ಹೆಚ್ಚು ಪರಿಣಾಮಕಾರಿಯಾಗಿ ಮಾಡುತ್ತದೆ. ಮತ್ತೊಂದು ಬಳಕೆಯ ಪ್ರಕರಣ ದತ್ತಾಂಶ ವಿಜ್ಞಾನದಲ್ಲಿದೆ, ಅಲ್ಲಿ ಒಳಬರುವ ಡೇಟಾ ವಿತರಣೆಗಳ ಆಧಾರದ ಮೇಲೆ ಮಾದರಿಗಳಿಗೆ ಮರುಪ್ರಯತ್ನಿಸುವ ಅಗತ್ಯವಿರುತ್ತದೆ. ಅಗತ್ಯವಾದ ಮಾದರಿ ಸಂರಚನೆಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಹಾದುಹೋಗುವ ಮೂಲಕ, ಅಗತ್ಯವಾದ ಗಣನೆಗಳನ್ನು ಮಾತ್ರ ಕಾರ್ಯಗತಗೊಳಿಸಲಾಗುತ್ತದೆ, ಸಮಯ ಮತ್ತು ಸಂಪನ್ಮೂಲಗಳನ್ನು ಉಳಿಸುತ್ತದೆ. 🎯
ಸಂಕ್ಷಿಪ್ತವಾಗಿ, ಈ ಸ್ಕ್ರಿಪ್ಟ್ಗಳು ರನ್ಟೈಮ್ ಇನ್ಪುಟ್ಗಳ ಆಧಾರದ ಮೇಲೆ ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಡಿಎಜಿಗಳನ್ನು ಉತ್ಪಾದಿಸಲು ಒಂದು ಅಡಿಪಾಯವನ್ನು ಒದಗಿಸುತ್ತದೆ. ನಿಯಂತ್ರಿಸುವ ಮೂಲಕ ಗಾಳಿಯ ಹರಿವಿನ ಟಾಸ್ಕ್ ಫ್ಲೋ API ಅಥವಾ ಸಾಂಪ್ರದಾಯಿಕ ಪೈಥೋನಾಪುರೇಟರ್ ವಿಧಾನ, ಡೆವಲಪರ್ಗಳು ಹೊಂದಿಕೊಳ್ಳುವ, ಮಾಡ್ಯುಲರ್ ಮತ್ತು ಪರಿಣಾಮಕಾರಿ ಕೆಲಸದ ಹರಿವುಗಳನ್ನು ರಚಿಸಬಹುದು. ಇದು ಹಸ್ತಚಾಲಿತ ಹಸ್ತಕ್ಷೇಪದ ಅಗತ್ಯವನ್ನು ನಿವಾರಿಸುತ್ತದೆ ಮತ್ತು ಇತರ ಯಾಂತ್ರೀಕೃತಗೊಂಡ ವ್ಯವಸ್ಥೆಗಳೊಂದಿಗೆ ತಡೆರಹಿತ ಏಕೀಕರಣವನ್ನು ಅನುಮತಿಸುತ್ತದೆ. ಗ್ರಾಹಕರ ಆದೇಶಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದು, ಡೇಟಾ ಪೈಪ್ಲೈನ್ಗಳನ್ನು ನಿರ್ವಹಿಸುವುದು ಅಥವಾ ಕ್ಲೌಡ್ ವರ್ಕ್ಫ್ಲೋಗಳನ್ನು ಏರ್ಪಡಿಸುವುದು, ಡೈನಾಮಿಕ್ ಡಿಎಜಿಗಳು ನಿರ್ದಿಷ್ಟ ವ್ಯವಹಾರ ಅಗತ್ಯಗಳಿಗೆ ಅನುಗುಣವಾಗಿ ಚುರುಕಾದ ಯಾಂತ್ರೀಕೃತಗೊಳಿಸುವಿಕೆಯನ್ನು ಸಕ್ರಿಯಗೊಳಿಸುತ್ತವೆ.
ರನ್ಟೈಮ್ ಕಾನ್ಫಿಗರೇಶನ್ನೊಂದಿಗೆ ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಡೈನಾಮಿಕ್ ಟಾಸ್ಕ್ ಸೀಕ್ವೆನ್ಸಿಂಗ್ ಅನ್ನು ಅನುಷ್ಠಾನಗೊಳಿಸಲಾಗುತ್ತಿದೆ
ಅಪಾಚೆ ಗಾಳಿಯ ಹರಿವನ್ನು ಬಳಸಿಕೊಂಡು ಪೈಥಾನ್ ಆಧಾರಿತ ಬ್ಯಾಕೆಂಡ್ ಆಟೊಮೇಷನ್
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import DagRun
import json
# Define default args
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
}
# Function to process each element
def process_element(element, kwargs):
print(f"Processing element: {element}")
# Define DAG
dag = DAG(
'dynamic_task_dag',
default_args=default_args,
schedule_interval=None,
)
# Extract elements from dag_run.conf
def generate_tasks(kwargs):
conf = kwargs.get('dag_run').conf or {}
elements = conf.get('elements', [])
task_list = []
for i, group in enumerate(elements):
for j, element in enumerate(group):
task_id = f"process_element_{i}_{j}"
task = PythonOperator(
task_id=task_id,
python_callable=process_element,
op_args=[element],
dag=dag,
)
task_list.append(task)
return task_list
# Generate dynamic tasks
tasks = generate_tasks()
# Define dependencies dynamically
for i in range(len(tasks) - 1):
tasks[i + 1].set_upstream(tasks[i])
ಪರ್ಯಾಯ ವಿಧಾನ: ಉತ್ತಮ ಓದುವಿಕೆಗಾಗಿ ಟಾಸ್ಕ್ ಫ್ಲೋ ಎಪಿಐ ಬಳಸುವುದು
ಗಾಳಿಯ ಹರಿವಿನ ಟಾಸ್ಕ್ ಫ್ಲೋ API ಅನ್ನು ಬಳಸುವ ಆಧುನಿಕ ಪೈಥಾನ್ ವಿಧಾನ
from airflow.decorators import dag, task
from datetime import datetime
# Define DAG
@dag(schedule_interval=None, start_date=datetime(2025, 1, 28), catchup=False)
def dynamic_taskflow_dag():
@task
def process_element(element: str):
print(f"Processing {element}")
@task
def extract_elements(dag_run=None):
conf = dag_run.conf or {}
return conf.get('elements', [])
elements = extract_elements()
task_groups = [[process_element(element) for element in group] for group in elements]
# Define dependencies dynamically
for i in range(len(task_groups) - 1):
for upstream_task in task_groups[i]:
for downstream_task in task_groups[i + 1]:
downstream_task.set_upstream(upstream_task)
dynamic_taskflow_dag()
ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಷರತ್ತುಬದ್ಧ ಮರಣದಂಡನೆಯೊಂದಿಗೆ ಡೈನಾಮಿಕ್ ಟಾಸ್ಕ್ ಸೀಕ್ವೆನ್ಸಿಂಗ್ ಅನ್ನು ಹೆಚ್ಚಿಸುವುದು
ಒಂದು ಶಕ್ತಿಶಾಲಿ ಮತ್ತು ಆಗಾಗ್ಗೆ ಕಡೆಗಣಿಸಲ್ಪಟ್ಟ ವೈಶಿಷ್ಟ್ಯ ಗಾಳಿಯ ಹರಿ ಷರತ್ತುಬದ್ಧ ಮರಣದಂಡನೆ, ಇದು ಕ್ರಿಯಾತ್ಮಕ ಕಾರ್ಯ ಅನುಕ್ರಮದ ನಮ್ಯತೆಯನ್ನು ಮತ್ತಷ್ಟು ಸುಧಾರಿಸುತ್ತದೆ. ಕಾರ್ಯ ಅವಲಂಬನೆಗಳನ್ನು ಹಿಂಪಡೆಯುವಾಗ dag_run.conf ಉಪಯುಕ್ತವಾಗಿದೆ, ನೈಜ-ಪ್ರಪಂಚದ ಸನ್ನಿವೇಶಗಳು ನಿರ್ದಿಷ್ಟ ಷರತ್ತುಗಳ ಆಧಾರದ ಮೇಲೆ ಕೆಲವು ಕಾರ್ಯಗಳನ್ನು ಮಾತ್ರ ಕಾರ್ಯಗತಗೊಳಿಸುವ ಅಗತ್ಯವಿರುತ್ತದೆ. ಉದಾಹರಣೆಗೆ, ಕೆಲವು ಡೇಟಾಸೆಟ್ಗಳಿಗೆ ವಿಶ್ಲೇಷಣೆಗೆ ಮುಂಚಿತವಾಗಿ ಪ್ರಿಪ್ರೊಸೆಸಿಂಗ್ ಅಗತ್ಯವಿರುತ್ತದೆ, ಆದರೆ ಇತರವುಗಳನ್ನು ನೇರವಾಗಿ ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಬಹುದು.
ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಷರತ್ತುಬದ್ಧ ಮರಣದಂಡನೆಯನ್ನು ಬಳಸಿಕೊಂಡು ಕಾರ್ಯಗತಗೊಳಿಸಬಹುದು BranchPythonOperator, ಇದು ಪೂರ್ವನಿರ್ಧರಿತ ತರ್ಕದ ಆಧಾರದ ಮೇಲೆ ಕಾರ್ಯಗತಗೊಳಿಸುವ ಮುಂದಿನ ಕಾರ್ಯವನ್ನು ನಿರ್ಧರಿಸುತ್ತದೆ. ಫೈಲ್ಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವ ಡೈನಾಮಿಕ್ ಡಿಎಜಿ ನಮ್ಮಲ್ಲಿದೆ ಎಂದು ಭಾವಿಸೋಣ, ಆದರೆ ನಿರ್ದಿಷ್ಟ ಗಾತ್ರದ ಮೇಲಿರುವ ಫೈಲ್ಗಳು ಮಾತ್ರ ation ರ್ಜಿತಗೊಳಿಸುವಿಕೆಯ ಅಗತ್ಯವಿರುತ್ತದೆ. ಎಲ್ಲಾ ಕಾರ್ಯಗಳನ್ನು ಅನುಕ್ರಮವಾಗಿ ಕಾರ್ಯಗತಗೊಳಿಸುವ ಬದಲು, ಯಾವ ಕಾರ್ಯಗಳನ್ನು ಚಲಾಯಿಸಬೇಕು ಎಂದು ನಾವು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ನಿರ್ಧರಿಸಬಹುದು, ಮರಣದಂಡನೆ ಸಮಯವನ್ನು ಉತ್ತಮಗೊಳಿಸುವುದು ಮತ್ತು ಸಂಪನ್ಮೂಲ ಬಳಕೆಯನ್ನು ಕಡಿಮೆ ಮಾಡುವುದು. ಈ ವಿಧಾನವು ಸಂಬಂಧಿತ ಕೆಲಸದ ಹರಿವುಗಳನ್ನು ಮಾತ್ರ ಪ್ರಚೋದಿಸುತ್ತದೆ ಎಂದು ಖಚಿತಪಡಿಸುತ್ತದೆ, ಇದು ಡೇಟಾ ಪೈಪ್ಲೈನ್ಗಳನ್ನು ಹೆಚ್ಚು ಪರಿಣಾಮಕಾರಿಯಾಗಿ ಮಾಡುತ್ತದೆ. 🚀
ಡೈನಾಮಿಕ್ ಡಿಎಜಿಗಳನ್ನು ಹೆಚ್ಚಿಸುವ ಇನ್ನೊಂದು ಮಾರ್ಗವೆಂದರೆ ಸಂಯೋಜಿಸುವುದು XComs (ಅಡ್ಡ-ಸಂವಹನ ಸಂದೇಶಗಳು). ಡೇಟಾವನ್ನು ವಿನಿಮಯ ಮಾಡಿಕೊಳ್ಳಲು XCOM ಗಳು ಕಾರ್ಯಗಳನ್ನು ಅನುಮತಿಸುತ್ತವೆ, ಅಂದರೆ ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ರಚಿಸಲಾದ ಕಾರ್ಯ ಅನುಕ್ರಮವು ಹಂತಗಳ ನಡುವೆ ಮಾಹಿತಿಯನ್ನು ರವಾನಿಸಬಹುದು. ಉದಾಹರಣೆಗೆ, ಇಟಿಎಲ್ ಪೈಪ್ಲೈನ್ನಲ್ಲಿ, ಪೂರ್ವ -ಸಂಸ್ಕರಣಾ ಕಾರ್ಯವು ಅಗತ್ಯವಾದ ರೂಪಾಂತರಗಳನ್ನು ನಿರ್ಧರಿಸಬಹುದು ಮತ್ತು ಆ ವಿವರಗಳನ್ನು ನಂತರದ ಕಾರ್ಯಗಳಿಗೆ ರವಾನಿಸಬಹುದು. ಈ ವಿಧಾನವು ನಿಜವಾದ ಡೇಟಾ-ಚಾಲಿತ ಕೆಲಸದ ಹರಿವುಗಳನ್ನು ಶಕ್ತಗೊಳಿಸುತ್ತದೆ, ಅಲ್ಲಿ ಮರಣದಂಡನೆ ಹರಿವು ನೈಜ-ಸಮಯದ ಒಳಹರಿವಿನ ಆಧಾರದ ಮೇಲೆ ಹೊಂದಿಕೊಳ್ಳುತ್ತದೆ, ಯಾಂತ್ರೀಕೃತಗೊಂಡ ಸಾಮರ್ಥ್ಯಗಳನ್ನು ಗಮನಾರ್ಹವಾಗಿ ಹೆಚ್ಚಿಸುತ್ತದೆ.
ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಡೈನಾಮಿಕ್ ಟಾಸ್ಕ್ ಸೀಕ್ವೆನ್ಸಿಂಗ್ ಬಗ್ಗೆ ಸಾಮಾನ್ಯ ಪ್ರಶ್ನೆಗಳು
- ಏನು dag_run.conf ಇದಕ್ಕಾಗಿ ಬಳಸಲಾಗಿದೆಯೇ?
- ಡಾಗ್ ಅನ್ನು ಪ್ರಚೋದಿಸುವಾಗ ರನ್ಟೈಮ್ನಲ್ಲಿ ಸಂರಚನಾ ನಿಯತಾಂಕಗಳನ್ನು ಹಾದುಹೋಗಲು ಇದು ಅನುಮತಿಸುತ್ತದೆ, ಇದು ಕೆಲಸದ ಹರಿವುಗಳನ್ನು ಹೆಚ್ಚು ಸುಲಭವಾಗಿ ಮಾಡುತ್ತದೆ.
- ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಕಾರ್ಯಗಳನ್ನು ನಾನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಹೇಗೆ ರಚಿಸಬಹುದು?
- ಅನೇಕ ನಿದರ್ಶನಗಳನ್ನು ತ್ವರಿತಗೊಳಿಸಲು ನೀವು ಲೂಪ್ ಅನ್ನು ಬಳಸಬಹುದು PythonOperator ಅಥವಾ ಬಳಸಿ @task ಟಾಸ್ಕ್ ಫ್ಲೋ ಎಪಿಐನಲ್ಲಿ ಅಲಂಕಾರಿಕ.
- ಬಳಸುವ ಪ್ರಯೋಜನವೇನು BranchPythonOperator?
- ಇದು ಷರತ್ತುಬದ್ಧ ಮರಣದಂಡನೆಯನ್ನು ಶಕ್ತಗೊಳಿಸುತ್ತದೆ, ಪೂರ್ವನಿರ್ಧರಿತ ತರ್ಕದ ಆಧಾರದ ಮೇಲೆ ಡಿಎಜಿಗಳಿಗೆ ವಿಭಿನ್ನ ಮಾರ್ಗಗಳನ್ನು ಅನುಸರಿಸಲು ಅನುವು ಮಾಡಿಕೊಡುತ್ತದೆ, ದಕ್ಷತೆಯನ್ನು ಸುಧಾರಿಸುತ್ತದೆ.
- ಹೇಗೆ ಮಾಡುತ್ತದೆ XComs ಡೈನಾಮಿಕ್ ಡಾಗ್ಗಳನ್ನು ಹೆಚ್ಚಿಸುವುದೇ?
- ಡೇಟಾವನ್ನು ಹಂಚಿಕೊಳ್ಳಲು XCOM ಗಳು ಕಾರ್ಯಗಳನ್ನು ಅನುಮತಿಸುತ್ತವೆ, ನಂತರದ ಕಾರ್ಯಗಳು ಹಿಂದಿನ ಹಂತಗಳಿಂದ ಸಂಬಂಧಿತ ಮಾಹಿತಿಯನ್ನು ಪಡೆಯುತ್ತವೆ ಎಂದು ಖಚಿತಪಡಿಸುತ್ತದೆ.
- ನಾನು ಅವಲಂಬನೆಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಹೊಂದಿಸಬಹುದೇ?
- ಹೌದು, ನೀವು ಬಳಸಬಹುದು set_upstream() ಮತ್ತು set_downstream() ಡಿಎಜಿ ಒಳಗೆ ಅವಲಂಬನೆಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ವ್ಯಾಖ್ಯಾನಿಸುವ ವಿಧಾನಗಳು.
ರನ್ಟೈಮ್ ಕಾನ್ಫಿಗರೇಶನ್ಗಳೊಂದಿಗೆ ಡೈನಾಮಿಕ್ ವರ್ಕ್ಫ್ಲೋಗಳನ್ನು ಉತ್ತಮಗೊಳಿಸುವುದು
ಕಾರ್ಯರೂಪಕ್ಕೆ ತರಲಾಗುವಿಕೆ ಡೈನಾಮಿಕ್ ಟಾಸ್ಕ್ ಸೀಕ್ವೆನ್ಸಿಂಗ್ ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ವರ್ಕ್ಫ್ಲೋ ಆಟೊಮೇಷನ್ ಅನ್ನು ಗಮನಾರ್ಹವಾಗಿ ಹೆಚ್ಚಿಸುತ್ತದೆ, ಇದು ಬದಲಾಗುತ್ತಿರುವ ಅವಶ್ಯಕತೆಗಳಿಗೆ ಹೊಂದಿಕೊಳ್ಳುತ್ತದೆ. ಚಾಲನಾಸಮಯ ಸಂರಚನೆಗಳನ್ನು ನಿಯಂತ್ರಿಸುವ ಮೂಲಕ, ಡೆವಲಪರ್ಗಳು ಸ್ಥಿರವಾದ ಡಿಎಜಿ ವ್ಯಾಖ್ಯಾನಗಳನ್ನು ತಪ್ಪಿಸಬಹುದು ಮತ್ತು ಬದಲಾಗಿ ಹೊಂದಿಕೊಳ್ಳುವ, ಡೇಟಾ-ಚಾಲಿತ ಪೈಪ್ಲೈನ್ಗಳನ್ನು ರಚಿಸಬಹುದು. ಹಣಕಾಸು ವರದಿ ಅಥವಾ ಯಂತ್ರ ಕಲಿಕೆ ಮಾದರಿ ತರಬೇತಿಯಂತಹ ನೈಜ-ಸಮಯದ ಇನ್ಪುಟ್ ಆಧರಿಸಿ ಕಾರ್ಯಗಳನ್ನು ವ್ಯಾಖ್ಯಾನಿಸಬೇಕಾದ ಪರಿಸರದಲ್ಲಿ ಈ ವಿಧಾನವು ವಿಶೇಷವಾಗಿ ಮೌಲ್ಯಯುತವಾಗಿದೆ. 🎯
ಸಂಯೋಜಿಸುವ ಮೂಲಕ dag_run.conf, ಷರತ್ತುಬದ್ಧ ಮರಣದಂಡನೆ ಮತ್ತು ಅವಲಂಬನೆ ನಿರ್ವಹಣೆ, ತಂಡಗಳು ಸ್ಕೇಲೆಬಲ್ ಮತ್ತು ಪರಿಣಾಮಕಾರಿ ಕೆಲಸದ ಹರಿವುಗಳನ್ನು ನಿರ್ಮಿಸಬಹುದು. ಇ-ಕಾಮರ್ಸ್ ವಹಿವಾಟುಗಳನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸುವುದು, ಕ್ಲೌಡ್-ಆಧಾರಿತ ದತ್ತಾಂಶ ರೂಪಾಂತರಗಳನ್ನು ನಿರ್ವಹಿಸುವುದು ಅಥವಾ ಸಂಕೀರ್ಣ ಬ್ಯಾಚ್ ಉದ್ಯೋಗಗಳನ್ನು ಆಯೋಜಿಸುವುದು, ಗಾಳಿಯ ಹರಿವಿನ ಕ್ರಿಯಾತ್ಮಕ ಡಿಎಜಿ ಸಾಮರ್ಥ್ಯಗಳು ಆಪ್ಟಿಮೈಸ್ಡ್ ಮತ್ತು ಸ್ವಯಂಚಾಲಿತ ಪರಿಹಾರವನ್ನು ಒದಗಿಸುತ್ತವೆ. ಈ ತಂತ್ರಗಳಲ್ಲಿ ಹೂಡಿಕೆ ಮಾಡುವುದರಿಂದ ಹಸ್ತಚಾಲಿತ ಹಸ್ತಕ್ಷೇಪವನ್ನು ಕಡಿಮೆ ಮಾಡುವಾಗ ವ್ಯವಹಾರಗಳಿಗೆ ಕಾರ್ಯಾಚರಣೆಗಳನ್ನು ಸುಗಮಗೊಳಿಸಲು ಅನುವು ಮಾಡಿಕೊಡುತ್ತದೆ.
ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಡೈನಾಮಿಕ್ ಟಾಸ್ಕ್ ಸೀಕ್ವೆನ್ಸಿಂಗ್ಗಾಗಿ ಮೂಲಗಳು ಮತ್ತು ಉಲ್ಲೇಖಗಳು
- ಅಪಾಚೆ ಗಾಳಿಯ ಹರಿವಿನ ದಸ್ತಾವೇಜನ್ನು - ಡಿಎಜಿ ಕಾನ್ಫಿಗರೇಶನ್ ಮತ್ತು ರನ್ಟೈಮ್ ನಿಯತಾಂಕಗಳ ಬಗ್ಗೆ ವಿವರವಾದ ಒಳನೋಟಗಳು: ಅಪಾಚೆ ಗಾಳಿಯ ಹರಿವಿನ ಅಧಿಕೃತ ಡಾಕ್ಸ್
- ಡೈನಾಮಿಕ್ ಡಿಎಜಿ ರಚನೆಯ ಮಧ್ಯಮ ಲೇಖನ - ಬಳಸುವ ಮಾರ್ಗದರ್ಶಿ dag_run.conf ಡೈನಾಮಿಕ್ ಟಾಸ್ಕ್ ಸೀಕ್ವೆನ್ಸಿಂಗ್ಗಾಗಿ: ಮಧ್ಯಮ: ಗಾಳಿಯ ಹರಿವಿನಲ್ಲಿ ಡೈನಾಮಿಕ್ ಡಾಗ್ಗಳು
- ಸ್ಟಾಕ್ ಓವರ್ಫ್ಲೋ ಚರ್ಚೆ - ಇನ್ಪುಟ್ ಕಾನ್ಫಿಗರೇಶನ್ ಆಧರಿಸಿ ಡಿಎಜಿಗಳನ್ನು ಕ್ರಿಯಾತ್ಮಕವಾಗಿ ಉತ್ಪಾದಿಸುವ ಸಮುದಾಯ ಪರಿಹಾರಗಳು: ಸ್ಟ್ಯಾಕ್ ಓವರ್ಫ್ಲೋ ಥ್ರೆಡ್
- ಡೇಟಾ ಎಂಜಿನಿಯರಿಂಗ್ ಬ್ಲಾಗ್ - ಸ್ಕೇಲೆಬಲ್ ಗಾಳಿಯ ಹರಿವಿನ ಕೆಲಸದ ಹರಿವುಗಳನ್ನು ವಿನ್ಯಾಸಗೊಳಿಸಲು ಉತ್ತಮ ಅಭ್ಯಾಸಗಳು: ಡೇಟಾ ಎಂಜಿನಿಯರಿಂಗ್ ಬ್ಲಾಗ್