This project is a collection of custom Airflow operators, hooks and sensors.
The example DAG shipped with this repo is scheduled daily at 3:00 PM IST and tries to fetch tweets from the @diprjk Twitter handle, the official Twitter handle of the Department of Information and Public Relations, Govt. of Jammu & Kashmir. The DAG uses a custom sensor to detect the tweet carrying the daily COVID update for J&K.
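Airflow 1.10 evaluates schedules in UTC unless a different default timezone is configured, so "3:00 PM IST" (a fixed UTC+05:30 offset) has to be expressed as a UTC time. The sketch below shows that conversion and the resulting daily cron expression. The helper name `ist_to_utc_cron` is hypothetical, and this is not necessarily how `dag_tweet_etl.py` defines its schedule.

```python
from datetime import datetime, timedelta, timezone

# IST is a fixed UTC+05:30 offset with no daylight saving time.
IST = timezone(timedelta(hours=5, minutes=30), "IST")


def ist_to_utc_cron(hour: int, minute: int = 0) -> str:
    """Return a daily cron expression, in UTC, for an IST wall-clock time."""
    # The date is arbitrary: with a fixed offset it does not affect the result.
    local = datetime(2020, 1, 1, hour, minute, tzinfo=IST)
    utc = local.astimezone(timezone.utc)
    # Cron field order: minute hour day-of-month month day-of-week.
    return f"{utc.minute} {utc.hour} * * *"


print(ist_to_utc_cron(15))  # 3:00 PM IST -> 30 9 * * *
```

Under these assumptions, a DAG with a UTC `start_date` could use `schedule_interval="30 9 * * *"`; check the example DAG for the value the repo actually uses.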
When the tweet is found, the custom operator downloads the update, which is posted as an image, and saves it locally for further analysis.
The operator then uses the ExtractTable library (https://extracttable.github.io/ExtractTable-py/) to convert the image into a dataframe.
Finally, the dataframe is processed and converted into a structured CSV file.
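The plugin works with a dataframe at this stage. As a dependency-free illustration of the final clean-up-and-write step, the sketch below uses Python's standard `csv` module in place of pandas and assumes the extracted table arrives as a list of rows whose first row is the header. The function `table_to_csv` and the sample column names are hypothetical.

```python
import csv
import tempfile
from pathlib import Path
from typing import List, Sequence


def table_to_csv(rows: Sequence[Sequence[str]], out_path: Path) -> int:
    """Write a table (first row = header) to a CSV file; return the data-row count."""
    # Trim stray whitespace around each cell.
    cleaned: List[List[str]] = [[str(cell).strip() for cell in row] for row in rows]
    # Drop rows in which every cell is empty.
    cleaned = [row for row in cleaned if any(cell for cell in row)]
    if not cleaned:
        raise ValueError("table has no rows")
    header, *data = cleaned
    with open(out_path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        writer.writerows(data)
    return len(data)


# Hypothetical rows standing in for an extracted bulletin table.
sample = [
    ["District ", " New cases"],
    ["District A", " 12 "],
    ["", ""],
    ["District B", "7"],
]
out = Path(tempfile.gettempdir()) / "covid_update_example.csv"
print(table_to_csv(sample, out))  # 2 data rows written
```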
Installing Airflow is simple; installation details are available on the official website, airflow.apache.org.
This plugin has been tested with Airflow v1.10.9.
If you already have Airflow installed, jump to Step 12.
- We will be using virtualenv, so make sure virtualenv is installed:
~$ pip install virtualenv
or
~$ pip3 install virtualenv
- Create a new directory for the project, such as airflow_tweet_test or any name you wish:
~$ mkdir airflow_tweet_test
- cd into the newly created directory:
~$ cd airflow_tweet_test
- Use virtualenv to create a virtual environment for the project:
~$ python -m virtualenv {name_of_virtual_env}
where {name_of_virtual_env} is the name you give your virtual environment.
- Airflow requires a special environment variable named AIRFLOW_HOME, which Airflow needs in order to find the project files. Let's set it up step by step.
- Now use the following command to open the virtual environment's activate script in an editor:
~$ nano ./{name_of_virtual_env}/bin/activate
This opens a bash script. Scroll to the end of the file and paste the following lines, where {full_project_path} is the full path of your project:
#This is for AIRFLOW usage
export AIRFLOW_HOME=/{full_project_path}
Press CTRL+X, then Y and Enter, to save the file and close the editor.
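Once the activate script exports AIRFLOW_HOME, you can confirm from Python where Airflow will look for its files. The sketch below assumes the defaults Airflow 1.10 uses when AIRFLOW_HOME is set (AIRFLOW_HOME/airflow.cfg, AIRFLOW_HOME/dags and AIRFLOW_HOME/plugins); `airflow_layout` is a hypothetical helper, not part of Airflow or twitter_plugin.

```python
import os
from pathlib import Path
from typing import Dict, Mapping, Optional


def airflow_layout(env: Optional[Mapping[str, str]] = None) -> Dict[str, Path]:
    """Return where AIRFLOW_HOME points and the default files/folders under it."""
    env = os.environ if env is None else env
    home = env.get("AIRFLOW_HOME")
    if not home:
        # The export line lives in the activate script, so an unset value
        # usually means the virtual environment has not been activated.
        raise RuntimeError("AIRFLOW_HOME is not set")
    root = Path(home).expanduser()
    return {
        "home": root,
        "config": root / "airflow.cfg",
        "dags": root / "dags",
        "plugins": root / "plugins",
    }


# Explicit mapping for illustration; call airflow_layout() with no argument
# inside the activated virtual environment to read the real variable.
print(airflow_layout({"AIRFLOW_HOME": "/home/user/airflow_tweet_test"})["plugins"])
```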
- You are now ready to activate your virtual environment. Enter the following command to activate it:
~$ source ./{name_of_virtual_env}/bin/activate
- Now install Airflow using pip inside the virtual environment:
~$ pip install apache-airflow==1.10.9
~$ pip install SQLAlchemy==1.3.15
- Check whether Airflow was installed successfully by typing the following command in the terminal:
~$ airflow version
- Create a new folder named plugins inside the project home:
~$ mkdir plugins
~$ cd plugins
- Now clone this repo inside the plugins directory:
~$ git clone https://github.com/kundroomajid/twitter_plugin.git
- To install the twitter_plugin dependencies, we prefer using the requirements.txt file. Enter the following command to install the other packages required by twitter_plugin:
~$ pip install -r ./twitter_plugin/requirements.txt
- Once all the dependencies are installed, it's time to initialize your Airflow metadata database:
~$ airflow initdb
- Once you have initialized the Airflow database, you can start the Airflow webserver and scheduler (each in its own terminal, with the virtual environment activated):
~$ airflow webserver
~$ airflow scheduler
- Now we need to add a few required Airflow variables and Airflow connections using the Airflow UI.
twitter_plugin requires a config variable (an Airflow Variable) which contains a few details:
* twitter_account_id: the Twitter account ID of @diprjk
* email: your email ID for notifications
* find_param: the string the sensor looks for to decide whether the tweet with the COVID update is available
* since_id: initially empty, but later updated automatically by the DAG
NOTE: Create an Airflow Variable named config using the Airflow UI, with a value like the example below.
Example:
{ "frequency": "daily", "twitter_account_id": 830669077022531584, "email": "yourmail@domain.com", "find_param": "Media Bulletin on Novel", "since_id": 1388848538663088135 }
twitter_plugin also requires a twitter_default connection (an Airflow Connection) which contains the Twitter API credentials (visit https://developer.twitter.com/en/apply-for-access for more info):
* consumer_key: obtained from your Twitter developer account
* consumer_secret: obtained from your Twitter developer account
* access_token: obtained from your Twitter developer account
* access_token_secret: obtained from your Twitter developer account
NOTE: Create an Airflow Connection named twitter_default using the Airflow UI. Leave all other fields blank and put your Twitter credentials inside the Extra field, as shown below.
Example:
{ "consumer_key" : "xxxxxx", "consumer_secret":"xxxxxxxxxx", "access_token":"xxxxxxxxx", "access_token_secret":"xxxxxx" }
twitter_plugin also requires an extract_table_default connection (an Airflow Connection) which contains the ExtractTable API credentials (visit https://extracttable.github.io/ExtractTable-py/):
* password: your ExtractTable API key
NOTE: Create an Airflow Connection named extract_table_default using the Airflow UI. Leave all other fields blank and put your ExtractTable API key inside the Password field.
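To make the roles of find_param and since_id concrete, here is a minimal sketch of the matching step, assuming tweets are plain dictionaries with `id` and `text` keys. The field names come from the config example above; the tweet shape and both function names are hypothetical, and the actual sensor in twitter_plugin may implement this differently.

```python
import json
from typing import Iterable, Mapping, Optional

REQUIRED_KEYS = ("twitter_account_id", "email", "find_param")


def load_config(raw: str) -> dict:
    """Parse the JSON value of the `config` Variable and check the required keys."""
    config = json.loads(raw)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"config is missing: {', '.join(missing)}")
    return config


def find_update_tweet(tweets: Iterable[Mapping], config: Mapping) -> Optional[Mapping]:
    """Return the newest tweet newer than since_id whose text contains find_param."""
    # since_id starts out empty, so treat "", None or a missing key as 0.
    since_id = int(config.get("since_id") or 0)
    matches = [
        tweet for tweet in tweets
        if int(tweet["id"]) > since_id and config["find_param"] in tweet["text"]
    ]
    return max(matches, key=lambda tweet: int(tweet["id"]), default=None)


config = load_config(
    '{"twitter_account_id": 830669077022531584, "email": "yourmail@domain.com",'
    ' "find_param": "Media Bulletin on Novel", "since_id": ""}'
)
tweets = [  # hypothetical timeline entries
    {"id": 101, "text": "Media Bulletin on Novel Corona Virus"},
    {"id": 102, "text": "Unrelated announcement"},
]
match = find_update_tweet(tweets, config)
print(match["id"] if match else None)  # 101
```

Advancing since_id to the matched tweet's id after a successful run keeps the same bulletin from matching again on the next run.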
- Now copy dag_tweet_etl.py from twitter_plugin/example_dags/ to {full_project_path}/dags.
If the dags directory does not exist, please create one inside the project root.
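The copy step can also be scripted. This sketch assumes the repo was cloned into plugins/ as in the earlier steps, so the example DAG sits at plugins/twitter_plugin/example_dags/dag_tweet_etl.py under the project root; `install_example_dag` is a hypothetical helper.

```python
import shutil
import tempfile
from pathlib import Path


def install_example_dag(project_root: Path, name: str = "dag_tweet_etl.py") -> Path:
    """Copy the example DAG from the cloned plugin into <project_root>/dags."""
    src = project_root / "plugins" / "twitter_plugin" / "example_dags" / name
    if not src.is_file():
        raise FileNotFoundError(f"example DAG not found: {src}")
    dags_dir = project_root / "dags"
    dags_dir.mkdir(parents=True, exist_ok=True)  # create dags/ if it is missing
    # copy2 copies the contents plus metadata such as the modification time.
    return Path(shutil.copy2(src, dags_dir / name))


# Demo on a throwaway directory tree; pass your real project root instead,
# e.g. install_example_dag(Path("/path/to/airflow_tweet_test")).
with tempfile.TemporaryDirectory() as tmp:
    demo_root = Path(tmp)
    example_dir = demo_root / "plugins" / "twitter_plugin" / "example_dags"
    example_dir.mkdir(parents=True)
    (example_dir / "dag_tweet_etl.py").write_text("# example dag\n")
    print(install_example_dag(demo_root).relative_to(demo_root))
```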
- We also need to add SMTP server details to the [smtp] section of the airflow.cfg file to receive email alerts, for example:
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = yoursmtpemailid
smtp_password = yourpassword
smtp_port = 587
smtp_mail_from = airflow@example.com
- And we are good to go.
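Because airflow.cfg is an INI-style file, Python's `configparser` can read it, which gives a quick sanity check of the SMTP settings before the first run. The sketch below is a hypothetical helper (not part of Airflow or twitter_plugin) that requires the keys an authenticated server like the Gmail example needs; point it at the text of AIRFLOW_HOME/airflow.cfg.

```python
import configparser

REQUIRED_SMTP_KEYS = ("smtp_host", "smtp_port", "smtp_user", "smtp_password", "smtp_mail_from")


def check_smtp_section(cfg_text: str) -> dict:
    """Parse airflow.cfg text and return the [smtp] settings, failing on missing keys."""
    # interpolation=None: this checker reads a '%' in a value (e.g. a password) literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("smtp"):
        raise KeyError("airflow.cfg has no [smtp] section")
    smtp = parser["smtp"]
    missing = [key for key in REQUIRED_SMTP_KEYS if not smtp.get(key)]
    if missing:
        raise KeyError(f"[smtp] is missing: {', '.join(missing)}")
    return {
        "host": smtp["smtp_host"],
        "port": smtp.getint("smtp_port"),
        "starttls": smtp.getboolean("smtp_starttls", fallback=False),
        "ssl": smtp.getboolean("smtp_ssl", fallback=False),
        "mail_from": smtp["smtp_mail_from"],
    }


SAMPLE = """
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = yoursmtpemailid
smtp_password = yourpassword
smtp_port = 587
smtp_mail_from = airflow@example.com
"""
print(check_smtp_section(SAMPLE))
```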