
Tutorial¶

This tutorial walks you through some of the fundamental Airflow concepts, objects, and their usage while writing your first pipeline.

Example Pipeline definition¶

Here is an example of a basic pipeline definition. Do not worry if this looks complicated, a line by line explanation follows below.

    from datetime import datetime, timedelta
    from textwrap import dedent

    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG

    # Operators; we need this to operate!
    from airflow.operators.bash import BashOperator

    with DAG(
        'tutorial',
        # These args will get passed on to each operator
        # You can override them on a per-task basis during operator initialization
        default_args={
            'depends_on_past': False,
            'email': ['airflow@example.com'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
            # 'queue': 'bash_queue',
            # 'pool': 'backfill',
            # 'priority_weight': 10,
            # 'end_date': datetime(2016, 1, 1),
            # 'wait_for_downstream': False,
            # 'sla': timedelta(hours=2),
            # 'execution_timeout': timedelta(seconds=300),
            # 'on_failure_callback': some_function,
            # 'on_success_callback': some_other_function,
            # 'on_retry_callback': another_function,
            # 'sla_miss_callback': yet_another_function,
            # 'trigger_rule': 'all_success'
        },
        description='A simple tutorial DAG',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['example'],
    ) as dag:

        # t1, t2 and t3 are examples of tasks created by instantiating operators
        t1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )

        t2 = BashOperator(
            task_id='sleep',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
        )

        t1.doc_md = dedent(
            """\
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
        )

        dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
        dag.doc_md = """
        This is a documentation placed anywhere
        """  # otherwise, type it like this

        templated_command = dedent(
            """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
        {% endfor %}
        """
        )

        t3 = BashOperator(
            task_id='templated',
            depends_on_past=False,
            bash_command=templated_command,
        )

        t1 >> [t2, t3]

It's a DAG definition file¶

One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG's structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks. Note that for this purpose we have a more advanced feature called XComs.
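
For instance, here is a minimal sketch of how two tasks could exchange a small value through XComs using the TaskFlow API; the task names are hypothetical and this is not part of the tutorial DAG:

    from airflow.decorators import task

    @task
    def produce():
        # the return value is pushed to XCom automatically
        return "some small piece of metadata"

    @task
    def consume(value):
        # the value is pulled from XCom when this task runs on its worker
        print(value)

    # inside a DAG context, this wiring creates the dependency and the XCom hand-off:
    # consume(produce())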

People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script's purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.

Importing Modules¶

An Airflow pipeline is just a Python script that happens to define an Airflow DAG object. Let's start by importing the libraries we will need.

    from datetime import datetime, timedelta
    from textwrap import dedent

    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG

    # Operators; we need this to operate!
    from airflow.operators.bash import BashOperator

See Modules Management for details on how Python and Airflow manage modules.

Default Arguments¶

We're about to create a DAG and some tasks, and we have the choice to explicitly pass a set of arguments to each task's constructor (which would become redundant), or (better!) we can define a dictionary of default parameters that we can use when creating tasks.

    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args = {
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    }

For more information about the BaseOperator's parameters and what they do, refer to the airflow.models.BaseOperator documentation.

Also, note that you could easily define different sets of arguments that would serve different purposes. An example of that would be to have different settings between a production and development environment.
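
As a rough sketch, one way to switch argument sets could look like the following; the environment variable name and the specific settings here are assumptions, not part of the tutorial:

    import os
    from datetime import timedelta

    # settings assumed purely for illustration
    prod_default_args = {'retries': 3, 'retry_delay': timedelta(minutes=10)}
    dev_default_args = {'retries': 0}

    # pick the set based on a hypothetical ENVIRONMENT variable
    default_args = prod_default_args if os.environ.get("ENVIRONMENT") == "prod" else dev_default_args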

Instantiate a DAG¶

We'll need a DAG object to nest our tasks into. Here we pass a string that defines the dag_id , which serves as a unique identifier for your DAG. We also pass the default argument dictionary that we just defined and define a schedule_interval of 1 day for the DAG.

    with DAG(
        'tutorial',
        # These args will get passed on to each operator
        # You can override them on a per-task basis during operator initialization
        default_args={
            'depends_on_past': False,
            'email': ['airflow@example.com'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
            # 'queue': 'bash_queue',
            # 'pool': 'backfill',
            # 'priority_weight': 10,
            # 'end_date': datetime(2016, 1, 1),
            # 'wait_for_downstream': False,
            # 'sla': timedelta(hours=2),
            # 'execution_timeout': timedelta(seconds=300),
            # 'on_failure_callback': some_function,
            # 'on_success_callback': some_other_function,
            # 'on_retry_callback': another_function,
            # 'sla_miss_callback': yet_another_function,
            # 'trigger_rule': 'all_success'
        },
        description='A simple tutorial DAG',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['example'],
    ) as dag:

Tasks¶

Tasks are generated when instantiating operator objects. An object instantiated from an operator is called a task. The first argument task_id acts as a unique identifier for the task.

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

Notice how we pass a mix of operator specific arguments ( bash_command ) and an argument common to all operators ( retries ) inherited from BaseOperator to the operator's constructor. This is simpler than passing every argument for every constructor call. Also, notice that in the second task we override the retries parameter with 3 .

The precedence rules for a task are as follows:

  1. Explicitly passed arguments

  2. Values that exist in the default_args dictionary

  3. The operator's default value, if one exists

A task must include or inherit the arguments task_id and owner , otherwise Airflow will raise an exception.
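
Applied to the tasks above, the precedence rules play out roughly like this; the assertions are only a sketch restating values already set in the tutorial DAG:

    # t1 does not pass 'retries', so it falls back to default_args
    assert t1.retries == 1
    # t2 passes retries=3 explicitly, which takes precedence over default_args
    assert t2.retries == 3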

Templating with Jinja¶

Airflow leverages the power of Jinja Templating and provides the pipeline writer with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.

This tutorial barely scratches the surface of what you can do with templating in Airflow, but the goal of this section is to let you know this feature exists, get you familiar with double curly brackets, and point to the most common template variable: {{ ds }} (today's "date stamp").

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

Notice that the templated_command contains code logic in {% %} blocks, references parameters like {{ ds }} , and calls a function as in {{ macros.ds_add(ds, 7) }} .

Files can also be passed to the bash_command argument, like bash_command='templated_command.sh' , where the file location is relative to the directory containing the pipeline file ( tutorial.py in this case). This may be desirable for many reasons, like separating your script's logic and pipeline code, allowing for proper code highlighting in files composed in different languages, and general flexibility in structuring pipelines. It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call.
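
A hedged sketch of both ideas together, where the dag_id, script name, and search path are assumptions: the DAG points template_searchpath at an extra folder, and the BashOperator references a .sh file that Airflow renders as a template before running it.

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        'tutorial_with_script',                              # hypothetical dag_id
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        template_searchpath=['/opt/airflow/dags/scripts'],   # assumed folder holding templated_command.sh
    ) as dag:
        run_script = BashOperator(
            task_id='run_templated_script',
            bash_command='templated_command.sh',             # resolved via the DAG folder or template_searchpath
        )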

Using that same DAG constructor call, it is possible to define user_defined_macros which allow you to specify your own variables. For example, passing dict(foo='bar') to this argument allows you to use {{ foo }} in your templates. Moreover, specifying user_defined_filters allows you to register your own filters. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to use {{ 'world' | hello }} in your templates. For more information regarding custom filters have a look at the Jinja Documentation.
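
Put together, a minimal sketch could look like this; the dag_id and task are hypothetical:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        'macro_filter_demo',                                 # hypothetical dag_id
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        user_defined_macros=dict(foo='bar'),
        user_defined_filters=dict(hello=lambda name: 'Hello %s' % name),
    ) as dag:
        show = BashOperator(
            task_id='show_macro_and_filter',
            # renders to: echo 'bar' && echo 'Hello world'
            bash_command="echo '{{ foo }}' && echo '{{ 'world' | hello }}'",
        )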

For more information on the variables and macros that can be referenced in templates, make sure to read through the Templates reference.

Adding DAG and Tasks documentation¶

We can add documentation for the DAG or for each single task. DAG documentation only supports markdown so far, while task documentation supports plain text, markdown, reStructuredText, json, and yaml. The DAG documentation can be written as a doc string at the beginning of the DAG file (recommended), or anywhere else in the file. Below you can find some examples on how to implement task and DAG docs, as well as screenshots:

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

[Screenshots: task documentation rendered in the Task Instance Details page, and DAG documentation rendered in the DAG view]

Setting up Dependencies¶

We have tasks t1 , t2 and t3 that do not depend on each other. Here are a few ways you can define dependencies between them:

    t1.set_downstream(t2)

    # This means that t2 will depend on t1
    # running successfully to run.
    # It is equivalent to:
    t2.set_upstream(t1)

    # The bit shift operator can also be
    # used to chain operations:
    t1 >> t2

    # And the upstream dependency with the
    # bit shift operator:
    t2 << t1

    # Chaining multiple dependencies becomes
    # concise with the bit shift operator:
    t1 >> t2 >> t3

    # A list of tasks can also be set as
    # dependencies. These operations
    # all have the same effect:
    t1.set_downstream([t2, t3])
    t1 >> [t2, t3]
    [t2, t3] << t1

Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once.

Using time zones¶

Creating a time zone aware DAG is quite simple. Just make sure to supply time zone aware dates using pendulum . Don't try to use the standard library timezone as it is known to have limitations and we deliberately disallow using it in DAGs.
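
A minimal time zone aware sketch, where the dag_id, timezone, and schedule are assumptions:

    import pendulum
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        'tz_aware_tutorial',                                          # hypothetical dag_id
        start_date=pendulum.datetime(2021, 1, 1, tz="Europe/Amsterdam"),
        schedule_interval="0 6 * * *",                                # 06:00 Amsterdam local time
        catchup=False,
    ) as dag:
        BashOperator(task_id='print_date', bash_command='date')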

Recap¶

Alright, so we have a pretty basic DAG. At this point your code should look something like this:

    from datetime import datetime, timedelta
    from textwrap import dedent

    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG

    # Operators; we need this to operate!
    from airflow.operators.bash import BashOperator

    with DAG(
        'tutorial',
        # These args will get passed on to each operator
        # You can override them on a per-task basis during operator initialization
        default_args={
            'depends_on_past': False,
            'email': ['airflow@example.com'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
            # 'queue': 'bash_queue',
            # 'pool': 'backfill',
            # 'priority_weight': 10,
            # 'end_date': datetime(2016, 1, 1),
            # 'wait_for_downstream': False,
            # 'sla': timedelta(hours=2),
            # 'execution_timeout': timedelta(seconds=300),
            # 'on_failure_callback': some_function,
            # 'on_success_callback': some_other_function,
            # 'on_retry_callback': another_function,
            # 'sla_miss_callback': yet_another_function,
            # 'trigger_rule': 'all_success'
        },
        description='A simple tutorial DAG',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['example'],
    ) as dag:

        # t1, t2 and t3 are examples of tasks created by instantiating operators
        t1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )

        t2 = BashOperator(
            task_id='sleep',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
        )

        t1.doc_md = dedent(
            """\
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
        )

        dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
        dag.doc_md = """
        This is a documentation placed anywhere
        """  # otherwise, type it like this

        templated_command = dedent(
            """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
        {% endfor %}
        """
        )

        t3 = BashOperator(
            task_id='templated',
            depends_on_past=False,
            bash_command=templated_command,
        )

        t1 >> [t2, t3]

Testing¶

Running the Script¶

Time to run some tests. First, let's make sure the pipeline is parsed successfully.

Let's assume we are saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg . The default location for your DAGs is ~/airflow/dags .

    python ~/airflow/dags/tutorial.py

If the script does not raise an exception it means that you have not done anything horribly wrong, and that your Airflow environment is somewhat sound.

Testing¶

Allow'south test by running the bodily chore instances for a specific date. The date specified in this context is called the logical date (also called execution date for historical reasons), which simulates the scheduler running your task or DAG for a specific date and time, even though it physically will run at present (or as soon as its dependencies are met).

We said the scheduler runs your task for a specific date and time, not at. This is because each run of a DAG conceptually represents not a specific date and time, but an interval between two times, called a data interval. A DAG run's logical date is the start of its data interval.
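
To make that relationship concrete, here is a hedged sketch of a task that could be added inside the tutorial DAG to print the logical date next to its data interval bounds, assuming a recent Airflow version where the data_interval_start and data_interval_end template variables are available:

    # hypothetical task, placed inside the tutorial DAG's "with DAG(...)" block
    t_interval = BashOperator(
        task_id='print_data_interval',
        bash_command=(
            'echo "logical date: {{ ds }}" && '
            'echo "data interval: {{ data_interval_start }} -> {{ data_interval_end }}"'
        ),
    )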

    # command layout: command subcommand dag_id task_id date

    # testing print_date
    airflow tasks test tutorial print_date 2015-06-01

    # testing sleep
    airflow tasks test tutorial sleep 2015-06-01

Now remember what we did with templating earlier? See how this template gets rendered and executed by running this command:

    # testing templated
    airflow tasks test tutorial templated 2015-06-01

This should result in displaying a verbose log of events and ultimately running your bash command and printing the result.

Note that the airflow tasks test command runs task instances locally, outputs their log to stdout (on screen), does not bother with dependencies, and does not communicate state (running, success, failed, ...) to the database. It simply allows testing a single task instance.

The same applies to airflow dags test [dag_id] [logical_date] , but on a DAG level. It performs a single DAG run of the given DAG id. While it does take task dependencies into account, no state is registered in the database. It is convenient for locally testing a full run of your DAG, given that e.g. if one of your tasks expects data at some location, it is available.
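
For the tutorial DAG, such a local run could look like this:

    # testing a full DAG run locally; no state is recorded in the database
    airflow dags test tutorial 2015-06-01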

Backfill¶

Everything looks like it's running fine so let's run a backfill. backfill will respect your dependencies, emit logs into files and talk to the database to record status. If you do have a webserver up, you will be able to track the progress. airflow webserver will start a web server if you are interested in tracking the progress visually as your backfill progresses.

Note that if you use depends_on_past=True , individual task instances will depend on the success of their previous task instance (that is, previous according to the logical date). Task instances with their logical dates equal to start_date will disregard this dependency because there would be no past task instances created for them.

You may also want to consider wait_for_downstream=True when using depends_on_past=True . While depends_on_past=True causes a task instance to depend on the success of its previous task_instance, wait_for_downstream=True will cause a task instance to also wait for all task instances immediately downstream of the previous task instance to succeed.
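
A small sketch of a task carrying both flags; the task_id and command are hypothetical:

    # waits for its own previous run *and* for that run's immediate downstream tasks
    t_careful = BashOperator(
        task_id='load_incremental',
        bash_command='echo "loading data for {{ ds }}"',
        depends_on_past=True,
        wait_for_downstream=True,
    )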

The date range in this context is a start_date and optionally an end_date , which are used to populate the run schedule with task instances from this dag.

    # optional, start a web server in debug mode in the background
    # airflow webserver --debug &

    # start your backfill on a date range
    airflow dags backfill tutorial \
        --start-date 2015-06-01 \
        --end-date 2015-06-07

Pipeline Example¶

Let's look at another example: we need to get some data from a file which is hosted online and insert it into our local database. We also need to look at removing duplicate rows while inserting.

Initial setup¶

We need to have Docker and Postgres installed. We will be using this docker file. Follow the instructions properly to set up Airflow.

You can use the postgres_default connection; see the CLI sketch after this list if it does not already exist in your environment:

  • Conn id: postgres_default

  • Conn Type: postgres

  • Host: postgres

  • Schema: airflow

  • Login: airflow

  • Password: airflow
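
If the postgres_default connection does not already exist in your environment, one way to create it could be the connections add CLI command; this is only a sketch, so adjust the host, schema, and credentials to your setup:

    ./airflow.sh airflow connections add 'postgres_default' \
        --conn-type 'postgres' \
        --conn-host 'postgres' \
        --conn-schema 'airflow' \
        --conn-login 'airflow' \
        --conn-password 'airflow'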

After that, you can test your connection and, if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection.

Open a postgres shell:

    ./airflow.sh airflow db shell

Create the Employees table with:

    CREATE TABLE EMPLOYEES
    (
        "Serial Number" NUMERIC PRIMARY KEY,
        "Company Name" TEXT,
        "Employee Markme" TEXT,
        "Description" TEXT,
        "Leave" INTEGER
    );

Afterwards, create the Employees_temp table:

    CREATE TABLE EMPLOYEES_TEMP
    (
        "Serial Number" NUMERIC PRIMARY KEY,
        "Company Name" TEXT,
        "Employee Markme" TEXT,
        "Description" TEXT,
        "Leave" INTEGER
    );

We are now ready to write the DAG.

Let's break this down into two steps: get data & merge data:

    import requests
    from airflow.decorators import task
    from airflow.providers.postgres.hooks.postgres import PostgresHook


    @task
    def get_data():
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"

        response = requests.request("GET", url)

        with open(data_path, "w") as file:
            file.write(response.text)

        postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                file,
            )
        conn.commit()

Here we are passing a GET request to get the data from the URL and save it in the employees.csv file on our Airflow instance, and we are dumping the file into a temporary table before merging the data into the final employees table.

    from airflow.decorators import task
    from airflow.providers.postgres.hooks.postgres import PostgresHook


    @task
    def merge_data():
        query = """
            DELETE FROM EMPLOYEES e
            USING EMPLOYEES_TEMP et
            WHERE e."Serial Number" = et."Serial Number";

            INSERT INTO EMPLOYEES
            SELECT *
            FROM EMPLOYEES_TEMP;
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        except Exception as e:
            return 1

Here we are first looking for duplicate values and removing them before we insert new values in our final table.

Let's look at our DAG:

    import datetime

    import pendulum
    import requests
    from airflow.decorators import dag, task
    from airflow.providers.postgres.hooks.postgres import PostgresHook


    @dag(
        schedule_interval="0 0 * * *",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        dagrun_timeout=datetime.timedelta(minutes=60),
    )
    def Etl():
        @task
        def get_data():
            # NOTE: configure this as appropriate for your airflow environment
            data_path = "/opt/airflow/dags/files/employees.csv"
            url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"

            response = requests.request("GET", url)

            with open(data_path, "w") as file:
                file.write(response.text)

            postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            with open(data_path, "r") as file:
                cur.copy_expert(
                    "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                    file,
                )
            conn.commit()

        @task
        def merge_data():
            query = """
                DELETE FROM EMPLOYEES e
                USING EMPLOYEES_TEMP et
                WHERE e."Serial Number" = et."Serial Number";

                INSERT INTO EMPLOYEES
                SELECT *
                FROM EMPLOYEES_TEMP;
            """
            try:
                postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
                conn = postgres_hook.get_conn()
                cur = conn.cursor()
                cur.execute(query)
                conn.commit()
                return 0
            except Exception as e:
                return 1

        get_data() >> merge_data()


    dag = Etl()

This DAG runs daily at 00:00. Add this Python file to the airflow/dags folder (e.g. dags/etl.py ), then go back to the main folder and run:

    docker-compose up airflow-init
    docker-compose up

Go to your browser, open http://localhost:8080/home , and trigger your DAG:
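
If you prefer the command line, you could also trigger it from there; this is only a sketch, where the DAG id Etl comes from the decorated function name above and the DAG must be unpaused first:

    ./airflow.sh airflow dags trigger Etl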

[Screenshots: the DAG in the Airflow UI, showing the successful (green) task runs]

The DAG ran successfully, as we can see from the green boxes. If there had been an error the boxes would be red. Before the DAG run my local table had 10 rows; after the DAG run it had approximately 100 rows.

What's Next?¶

That's it, you have written, tested and backfilled your very first Airflow pipeline. Merging your code into a code repository that has a master scheduler running against it should get it to get triggered and run every day.

Here are a few things you might want to do next:

See also

  • Read the Concepts section for a detailed explanation of Airflow concepts such as DAGs, Tasks, Operators, and more.

  • Take an in-depth tour of the UI - click all the things!

  • Keep reading the docs!

    • Review the how-to guides, which include a guide to writing your own operator

    • Review the Command Line Interface Reference

    • Review the List of operators

    • Review the Macros reference

  • Write your first pipeline!


Source: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
