Follow this link to set up Docker:
- Install Docker Community Edition (CE) on your workstation.
- Install Docker Compose on your workstation.
To check the versions installed on your system:
docker --version
docker-compose --version

To deploy Airflow on Docker Compose, fetch the docker-compose.yaml file (use the raw-file URL, not the GitHub blob page):
curl -LfO 'https://raw.githubusercontent.com/GeoscienceAustralia/dea-airflow/master/docker-compose.workflow.yaml'

Initializing the environment:
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env

Initializing the database:
docker-compose up airflow-init

Start all services:
docker-compose up

Log in to the Airflow web UI with:
ID: airflow
Password: airflow
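The environment-initialization steps above (create the dags/logs/plugins folders and write the `.env` file with your user ID) can be sketched in Python; `init_airflow_env` is an illustrative helper, not part of Airflow:

```python
import os
from pathlib import Path


def init_airflow_env(base_dir: str) -> str:
    """Create the folders and .env file the Airflow compose setup expects.

    Equivalent to:
        mkdir -p ./dags ./logs ./plugins
        echo -e "AIRFLOW_UID=$(id -u)" > .env
    Returns the line written to .env. Unix-only (uses os.getuid).
    """
    base = Path(base_dir)
    for sub in ("dags", "logs", "plugins"):
        (base / sub).mkdir(parents=True, exist_ok=True)
    env_line = f"AIRFLOW_UID={os.getuid()}\n"
    (base / ".env").write_text(env_line)
    return env_line
```

Setting `AIRFLOW_UID` to your host user ID keeps files created inside the containers owned by you rather than by root.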
Follow this link to set up Kafka:
- Add Kafka and Zookeeper images under services in the docker-compose.yaml file.
- Specify the ports correctly.
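The two services above might be declared as follows. This is a minimal sketch assuming the wurstmeister images; the image names, ports, and environment variables are assumptions to adapt to your own compose file:

```yaml
  # Added under the existing "services:" key of docker-compose.yaml
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
    depends_on:
      - zookeeper
```

The service names (`zookeeper`, `kafka`) become hostnames on the compose network, which is why the commands below address `zookeeper:2181` and `kafka:9092`.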
To check if Kafka is working fine:
docker exec -it <kafka_container_name> /bin/sh
cd /opt/kafka

To create Kafka topics:
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic covid-tweet

To list the Kafka topics:
bin/kafka-topics.sh --list --zookeeper zookeeper:2181

To send messages from the console producer:
bin/kafka-console-producer.sh --broker-list kafka:9092 --topic covid-tweet

To fetch the messages in the console consumer:
bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic covid-tweet --from-beginning

- Add the MongoDB image under services in the docker-compose.yaml file.
To check if MongoDB is working fine, connect to the server:
docker exec -it <MongoDB_container_name> /bin/sh
mongo --host localhost:27017 -u <user_name> -p <password>

To list all the databases present:
show dbs

- Install the required libraries in the Airflow image's Dockerfile:
RUN pip install pymongo
RUN pip install quandl

- Copy the required files into the Airflow image:
COPY /dags/resources/Query_8_GDP.csv /Query_8_GDP.csv
COPY /dags/resources/countries.csv /countries.csv

- Import all the Airflow utilities and query files.
- Create tasks to run the queries
- Schedule the DAG with schedule_interval
- To run the Scala file present in the JAR:
t = BashOperator(
    task_id="task_id",
    bash_command='cd ~/../../opt/airflow/dags/ && pwd && java -cp <JAR_filename> <Scala_filename>'
)

- To run Python scripts as tasks:
t = PythonOperator(
    task_id="task_id",
    python_callable=python_func
)
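The `python_callable` passed to `PythonOperator` is just a plain Python function. A minimal hypothetical sketch (the function name matches the snippet above, but its body is illustrative, not the original pipeline logic):

```python
def python_func(**context):
    """Hypothetical task body: count records and return the count.

    Airflow calls this function when the task runs; whatever it returns
    is pushed to XCom so downstream tasks can read it. The **context
    kwargs (execution date, task instance, etc.) are unused here.
    """
    records = ["row1", "row2", "row3"]  # stand-in for real query results
    count = len(records)
    print(f"processed {count} records")
    return count
```

Because the function is ordinary Python, it can be unit-tested outside Airflow by calling it directly.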