diff --git a/README.md b/README.md index bb1fe2c8..073b0ede 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,12 @@ See `pgsql_big_dedupe_example/README.md` for details This is the same example as the MySQL IL campaign contributions dataset above, but ported to run on PostgreSQL. +### Athena example - IL campaign contributions +See `athena_example/README.md` for details + +This is the same example as the MySQL IL campaign contributions dataset above, but ported to run on Athena. + + ## Training diff --git a/athena_example/README.md b/athena_example/README.md new file mode 100644 index 00000000..7b481322 --- /dev/null +++ b/athena_example/README.md @@ -0,0 +1,19 @@ +# Athena Example + +Takes a database of IL campaign contribution data, loads it into an +Athena database, and identifies the unique donors. + +To follow this example you need to + +* Update `athena_example/config.py` with your Athena credentials, database name and the path to store the data +* Install dependencies, `pip install -r athena_example/requirements.txt` + +Once that's all done you can run the example: + +```bash +cd athena_example +python athena_init.py +python athena_example.py +``` + + (use 'y', 'n' and 'u' keys to flag duplicates for active learning, 'f' when you are finished) diff --git a/athena_example/athena_example.py b/athena_example/athena_example.py new file mode 100644 index 00000000..d275af51 --- /dev/null +++ b/athena_example/athena_example.py @@ -0,0 +1,397 @@ +""" +This is an example of working with very large data. There are about +700,000 unduplicated donors in this database of Illinois political +campaign contributions. + +With such a large set of input data, we cannot store all the comparisons +we need to make in memory. Instead, we will read the pairs on demand +from the Athena database. + +__Note:__ You will need to run `python athena_init.py` +before running this script. 
See the annotated source for +[athena_init.py](athena_init.html) + +For smaller datasets (<10,000), see our +[csv_example](csv_example.html) +""" + +# There is a little bit difference between the result +# of this module and the athena one. The reason is due to +# Some special (and mostly erroneous) characters, such as \a .. +# Which are dealt with differently by athena and athena/panda + +import athenautils +import config +import sys +import os +import time +import logging +import optparse +import locale +import json + +import dedupe +import dedupe.backport + +sys.path.insert(0, "../athena_example/") + + +def cursor_execute(query, database): + """ + Run `query` on Athena and yield each result row as a plain tuple, + like a MySQL cursor. + + Only empty strings are read back as missing values. + """ + return athenautils.cursor_execute( + query, + database=database, + cursortype="tuple", + buffersize=config.BUFFERSIZE, + escapechar=None, + keep_default_na=False, + na_values=[""], + ) + + +def dict_cursor_execute(query, database): + """ + Run `query` on Athena and yield each result row as a dict keyed by + column name, like a MySQL DictCursor. + + Only empty strings are read back as missing values. + """ + return athenautils.cursor_execute( + query, + database=database, + cursortype="dict", + buffersize=config.BUFFERSIZE, + escapechar=None, + keep_default_na=False, + na_values=[""], + ) + + +def record_pairs(result_set): + """ + Turn rows of (id_a, json_a, id_b, json_b) into pairs of + (record_id, record) tuples with the JSON records decoded. + + Prints the running row index every 10,000 rows as a progress marker. + """ + for i, row in enumerate(result_set): + a_record_id, a_record, b_record_id, b_record = row + record_a = (a_record_id, json.loads(a_record)) + record_b = (b_record_id, json.loads(b_record)) + + yield record_a, record_b + + if i % 10000 == 0: + print(i) + + +def cluster_ids(clustered_dupes): + """ + Flatten (cluster, scores) pairs into (donor_id, cluster_id, score) + rows, using the first member of each cluster as its cluster_id. + """ + for cluster, scores in clustered_dupes: + cluster_id = cluster[0] + for donor_id, score in zip(cluster, scores): + yield donor_id, cluster_id, score + + +if __name__ == "__main__": + + # Logging + + # Dedupe uses Python logging to show or suppress verbose output. Added + # for convenience. 
To enable verbose output, run `python + # examples/athena_example/athena_example.py -v` + + optp = optparse.OptionParser() + optp.add_option( + "-v", + "--verbose", + dest="verbose", + action="count", + help="Increase verbosity (specify multiple times for more)", + ) + (opts, args) = optp.parse_args() + # No flag -> WARNING, -v -> INFO, -vv or more -> DEBUG + log_level = logging.WARNING + if opts.verbose: + if opts.verbose == 1: + log_level = logging.INFO + elif opts.verbose >= 2: + log_level = logging.DEBUG + + logging.getLogger().setLevel(log_level) + + settings_file = "athena_example_settings" + training_file = "athena_example_training.json" + + start_time = time.time() + + # We'll be using variations on the following select statement to pull + # in campaign donor info. + # + # We did a fair amount of preprocessing of the fields in + # `athena_init.py` + DONOR_SELECT = """SELECT donor_id, city, name, zip, state, address + from processed_donors""" + + # ## Training + + if os.path.exists(settings_file): + print("reading from ", settings_file) + with open(settings_file, "rb") as sf: + deduper = dedupe.StaticDedupe(sf, num_cores=4) + else: + # Define the fields dedupe will pay attention to + # + # The address, city, and zip fields are often missing, so we'll + # tell dedupe that, and we'll learn a model that takes that into + # account + fields = [ + {"field": "name", "type": "String"}, + {"field": "address", "type": "String", "has missing": True}, + {"field": "city", "type": "ShortString", "has missing": True}, + {"field": "state", "type": "ShortString", "has missing": True}, + {"field": "zip", "type": "ShortString", "has missing": True}, + ] + + # Create a new deduper object and pass our data model to it. 
+ deduper = dedupe.Dedupe(fields, num_cores=4) + + # We will sample pairs from the entire donor table for training + # NOTE: this loads every processed_donors row into memory, keyed by + # its position in the result set + cur = dict_cursor_execute(DONOR_SELECT, database=config.DATABASE) + temp_d = {i: row for i, row in enumerate(cur)} + + # If we have training data saved from a previous run of dedupe, + # look for it and load it in. + # + # __Note:__ if you want to train from + # scratch, delete the training_file + if os.path.exists(training_file): + print("reading labeled examples from ", training_file) + with open(training_file) as tf: + deduper.prepare_training(temp_d, training_file=tf) + else: + deduper.prepare_training(temp_d) + + # Release the in-memory copy of the donor table + del temp_d + + # ## Active learning + + print("starting active labeling...") + # Starts the training loop. Dedupe will find the next pair of records + # it is least certain about and ask you to label them as duplicates + # or not. + + # use 'y', 'n' and 'u' keys to flag duplicates + # press 'f' when you are finished + dedupe.convenience.console_label(deduper) + # When finished, save our labeled training pairs to disk + with open(training_file, "w") as tf: + deduper.write_training(tf) + + # Notice the `recall` argument here + # + # `recall` is the proportion of true dupe pairs that the learned + # rules must cover. You may want to reduce this if you are making + # too many blocks and too many comparisons. 
+ deduper.train(recall=0.90) + + with open(settings_file, "wb") as sf: + deduper.write_settings(sf) + + # We can now remove some of the memory hogging objects we used + # for training + deduper.cleanup_training() + + # ## Blocking + + print("blocking...") + + # To run blocking on such a large set of data, we create a separate table + # that contains blocking keys and record ids + print("creating blocking_map database") + athenautils.drop_external_table( + "blocking_map", + location="s3://{}/{}".format( + config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY + "blocking_map" + ), + database=config.DATABASE, + ) + + q = """ + CREATE EXTERNAL TABLE blocking_map + (block_key VARCHAR(200), donor_id INTEGER) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '\t' + LINES TERMINATED BY '\n' + LOCATION + 's3://{}/{}' + TBLPROPERTIES ( + 'classification'='csv', + --'skip.header.line.count'='1', + 'serialization.null.format'='') + """.format( + config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY + "blocking_map" + ) + athenautils.athena_start_query(q, database=config.DATABASE) + + # If dedupe learned an Index Predicate, we have to take a pass + # through the data and create indices. + print("creating inverted index") + + # Armin: + # This never runs, index_fields is empty, possible bug? + for field in deduper.fingerprinter.index_fields: + q = """ + SELECT DISTINCT {field} FROM processed_donors + WHERE {field} IS NOT NULL + """.format( + field=field + ) + cur = dict_cursor_execute(q, database=config.DATABASE) + field_data = (row[field] for row in cur) + deduper.fingerprinter.index(field_data, field) + + # Now we are ready to write our blocking map table by creating a + # generator that yields unique `(block_key, donor_id)` tuples. 
+ print("writing blocking map") + + read_cur = dict_cursor_execute(DONOR_SELECT, database=config.DATABASE) + full_data = ((row["donor_id"], row) for row in read_cur) + + # Upload the fingerprinter output as tab-separated chunk files under + # the blocking_map table's S3 location + b_data = deduper.fingerprinter(full_data) + athenautils.write_many( + b_data, + filename="s3://{}/{}".format( + config.DATABASE_BUCKET, + config.DATABASE_ROOT_KEY + "blocking_map/blocking.csv", + ), + ) + + # Select each unordered pair of donors that share at least one block + # key: `l.donor_id < r.donor_id` drops self-pairs and mirrored pairs, + # DISTINCT drops pairs that share several keys. Both records are + # returned as JSON strings for record_pairs() to decode. + q = """ + SELECT a.donor_id, + json_format(CAST (MAP(ARRAY['city', 'name', 'zip', 'state', 'address'], + ARRAY[ a.city, a.name, a.zip, a.state, a.address]) + AS JSON)), + b.donor_id, + json_format(CAST (MAP(ARRAY['city', 'name', 'zip', 'state', 'address'], + ARRAY[ b.city, b.name, b.zip, b.state, b.address]) + AS JSON)) + FROM (SELECT DISTINCT l.donor_id as east, r.donor_id as west + from blocking_map as l + INNER JOIN blocking_map as r + using (block_key) + where l.donor_id < r.donor_id) ids + INNER JOIN processed_donors a on ids.east=a.donor_id + INNER JOIN processed_donors b on ids.west=b.donor_id + """ + read_cur = cursor_execute(q, database=config.DATABASE) + + # ## Clustering + + print("clustering...") + clustered_dupes = deduper.cluster( + deduper.score(record_pairs(read_cur)), threshold=0.5 + ) + + # entity_map is an external table: drop it through the helper that also + # deletes the files under its S3 location + athenautils.drop_external_table( + "entity_map", + location="s3://{}/{}".format( + config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY + "entity_map/" + ), + database=config.DATABASE, + ) + + print("creating entity_map database") + q = """ + CREATE EXTERNAL TABLE entity_map + (donor_id INTEGER, canon_id INTEGER, + cluster_score FLOAT) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '\t' + LINES TERMINATED BY '\n' + LOCATION + 's3://{}/{}' + TBLPROPERTIES ( + 'classification'='csv', + --'skip.header.line.count'='1', + 'serialization.null.format'='') + """.format( + config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY + "entity_map" + ) + athenautils.athena_start_query(q, 
database=config.DATABASE) + + athenautils.write_many( + cluster_ids(clustered_dupes), + filename="s3://{}/{}".format( + config.DATABASE_BUCKET, + config.DATABASE_ROOT_KEY + "entity_map/entity_map.csv", + ), + ) + + # NOTE(review): only this label is printed; no count of duplicate + # sets is computed here + print("# duplicate sets") + + # ## Payoff + + # With all this done, we can now begin to ask interesting questions + # of the data + # + # For example, let's see who the top 10 donors are. + + # for pretty printing numbers + locale.setlocale(locale.LC_ALL, "en_CA.UTF-8") + + athenautils.athena_start_query( + "DROP TABLE IF EXISTS e_map", database=config.DATABASE + ) + + # Map every donor to its canonical id. The donor_id must come from + # `donors` (the preserved side of the RIGHT JOIN): for a donor without + # an entity_map row, entity_map.donor_id is NULL and the donor would + # be lost from the totals below. + q = """ + CREATE TABLE e_map as + SELECT COALESCE(canon_id, donors.donor_id) AS canon_id, donors.donor_id AS donor_id + FROM entity_map + RIGHT JOIN donors USING(donor_id) + """ + athenautils.athena_start_query(q, database=config.DATABASE) + + q = """ + SELECT array_join(filter(array[donors.first_name, donors.last_name], x-> x IS NOT NULL), ' ') AS name, + donation_totals.totals AS totals + FROM donors INNER JOIN + (SELECT canon_id, SUM(cast (amount as double)) AS totals + FROM contributions INNER JOIN e_map + USING (donor_id) + GROUP BY (canon_id) + ORDER BY totals + DESC LIMIT 10) + AS donation_totals + ON donors.donor_id = donation_totals.canon_id + ORDER BY totals DESC + """ + cur = dict_cursor_execute(q, database=config.DATABASE) + + print("Top Donors (deduped)") + for row in cur: + row["totals"] = locale.currency(row["totals"], grouping=True) + print("%(totals)20s: %(name)s" % row) + + # Compare this to what we would have gotten if we hadn't done any + # deduplication + q = """ + with donorscontributions as( + + SELECT donors.donor_id, + array_join(filter(array[donors.first_name, donors.last_name], x-> x IS NOT NULL), ' ') AS name, + cast(contributions.amount as double) as amount + FROM donors INNER JOIN contributions + USING (donor_id) + ) + SELECT name, sum(amount) AS totals + FROM donorscontributions + GROUP BY donor_id, name + ORDER BY 
totals DESC + LIMIT 10 + """ + cur = dict_cursor_execute(q, database=config.DATABASE) + + print("Top Donors (raw)") + for row in cur: + row["totals"] = locale.currency(row["totals"], grouping=True) + print("%(totals)20s: %(name)s" % row) + + print("ran in", time.time() - start_time, "seconds") diff --git a/athena_example/athena_init.py b/athena_example/athena_init.py new file mode 100644 index 00000000..42108846 --- /dev/null +++ b/athena_example/athena_init.py @@ -0,0 +1,239 @@ +#!/usr/bin/python +""" +This is a setup script for athena_example. It downloads a zip file of +Illinois campaign contributions and loads them into the Athena database +named by `config.DATABASE`. + +__Note:__ You will need to run this script first before executing +[athena_example.py](athena_example.py). + +Tables created: +* raw_table - raw import of entire CSV file +* donors - all distinct donors based on name and address +* recipients - all distinct campaign contribution recipients +* contributions - contribution amounts tied to donor and recipients tables +* processed_donors - lower-cased donor fields read by athena_example.py +""" + +import athenautils +import os +import zipfile +import pandas as pd +import numpy as np +from urllib.request import urlopen +import config +import csv +import sys + +sys.path.insert(0, "../athena_example/") + + +contributions_zip_file = "Illinois-campaign-contributions.txt.zip" +contributions_txt_file = "Illinois-campaign-contributions.txt" + +# Download the archive once; `with` closes both the HTTP response and the +# local file even if the transfer fails part-way +if not os.path.exists(contributions_zip_file): + print("downloading", contributions_zip_file, "(~60mb) ...") + with urlopen( + "https://s3.amazonaws.com/dedupe-data/Illinois-campaign-contributions.txt.zip" + ) as u, open(contributions_zip_file, "wb") as localFile: + localFile.write(u.read()) + +# Extract every archive member whose name contains ".txt" +if not os.path.exists(contributions_txt_file): + with zipfile.ZipFile(contributions_zip_file, "r") as zip_file: + print("extracting %s" % contributions_zip_file) + for f in zip_file.namelist(): + if ".txt" in f: + zip_file.extract(f) + + 
+print("importing raw data from csv...") +# Start from a clean slate: raw_table is external, so its S3 files are +# deleted along with the table definition +athenautils.drop_external_table( + "raw_table", + location="s3://{}/{}".format( + config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY + "raw_table" + ), + database=config.DATABASE, +) +athenautils.athena_start_query("DROP TABLE IF EXISTS donors", database=config.DATABASE) +athenautils.athena_start_query( + "DROP TABLE IF EXISTS recipients", database=config.DATABASE +) +athenautils.athena_start_query( + "DROP TABLE IF EXISTS contributions", database=config.DATABASE +) +athenautils.athena_start_query( + "DROP TABLE IF EXISTS processed_donors", database=config.DATABASE +) + + +q = r""" +CREATE EXTERNAL TABLE raw_table + (reciept_id INT, last_name VARCHAR(70), first_name VARCHAR(35), + address_1 VARCHAR(35), address_2 VARCHAR(36), city VARCHAR(20), + state VARCHAR(15), zip VARCHAR(11), report_type VARCHAR(24), + date_recieved VARCHAR(10), loan_amount VARCHAR(12), + amount VARCHAR(23), receipt_type VARCHAR(23), + employer VARCHAR(70), occupation VARCHAR(40), + vendor_last_name VARCHAR(70), vendor_first_name VARCHAR(20), + vendor_address_1 VARCHAR(35), vendor_address_2 VARCHAR(31), + vendor_city VARCHAR(20), vendor_state VARCHAR(10), + vendor_zip VARCHAR(10), description VARCHAR(90), + election_type VARCHAR(10), election_year VARCHAR(10), + report_period_begin VARCHAR(10), report_period_end VARCHAR(33), + committee_name VARCHAR(70), committee_id VARCHAR(37)) +ROW FORMAT DELIMITED + FIELDS TERMINATED BY '\t' + ESCAPED BY '\\' + LINES TERMINATED BY '\n' +LOCATION + 's3://{}/{}' +TBLPROPERTIES ( + 'classification'='csv', + 'skip.header.line.count'='1', + 'serialization.null.format'='') +""".format( + config.DATABASE_BUCKET, config.DATABASE_ROOT_KEY + "raw_table" +) +athenautils.athena_start_query(q, database=config.DATABASE) + + +# Read the download in chunks of config.BUFFERSIZE rows, every column as a +# string. Malformed lines are skipped with a warning; `on_bad_lines` +# replaces error_bad_lines/warn_bad_lines, which pandas 2.0 removed. +df_cursor = pd.read_csv( + contributions_txt_file, + sep="\t", + escapechar="\\", + quoting=csv.QUOTE_NONE, + on_bad_lines="warn", + dtype=str, + keep_default_na=False, + 
na_values=[""], + chunksize=config.BUFFERSIZE, +) +chunkcount = 0 +# Build the upload path the same way as the raw_table LOCATION (root key +# concatenated with the table name) so the files always land where the +# table reads from, on any OS +filename = "s3://{}/{}raw_table/{}.csv".format( + config.DATABASE_BUCKET, + config.DATABASE_ROOT_KEY, + os.path.splitext(contributions_txt_file)[0], +) +for df in df_cursor: + # Remove the very few records that mess up the demo + # (demo purposes only! Don't do something like this in production) + df = df[df["RcvDate"].str.len() >= 10] + + # Null out report-period dates shorter than 10 characters + df.loc[df["RptPdBegDate"].str.len() < 10, "RptPdBegDate"] = np.nan + + df.loc[df["RptPdEndDate"].str.len() < 10, "RptPdEndDate"] = np.nan + + # committee ID is required. Remove the 2 rows that don't have it. + df = df[df["ID"] != ""] + + # There's a record with a date stuck in the committee_id column, + # which causes problems when inserting into the contributions table below. + # Get rid of it this way. + + df = df[df["ID"].str.len() <= 9] + + # Drop the trailing unnamed column + df = df.drop(columns="Unnamed: 29") + + # Lower-case every string column and strip non-ASCII characters + # (NFKD-normalize, then drop whatever cannot be encoded as ASCII) + df_lower = df.apply( + lambda x: x.str.lower() + .str.normalize("NFKD") + .str.encode("ascii", errors="ignore") + .str.decode("utf-8") + if x.dtype == "object" + else x, + result_type="expand", + ) + + buffer = df_lower.to_csv( + quoting=csv.QUOTE_NONE, sep="\t", escapechar="\\", index=None + ) + + # One S3 object per chunk: <name>_0.csv, <name>_1.csv, ... + chunk_fname = athenautils.file_name_append( + filename, "_{}".format(chunkcount), ommitext=False + ) + athenautils.write(body=buffer, filename=chunk_fname) + chunkcount += 1 + +print("creating donors table...") +q = """ +CREATE TABLE donors as + with tmp as + (SELECT DISTINCT + NULLIF(TRIM(last_name), '') as last_name, + NULLIF(TRIM(first_name), '') as first_name, + NULLIF(TRIM(address_1), '') as address_1, + NULLIF(TRIM(address_2), '') as address_2, + NULLIF(TRIM(city), '') city, + NULLIF(TRIM(state), '') as state, + NULLIF(TRIM(zip), '') as zip, + NULLIF(TRIM(employer), '') as employer, + NULLIF(TRIM(occupation), '') as occupation + FROM raw_table) + SELECT row_number() over () as donor_id, * 
from tmp""" +athenautils.athena_start_query(q, database=config.DATABASE) + + +# One row per distinct (committee_id, committee_name) pair in the raw data +q = """ +CREATE TABLE recipients as + SELECT DISTINCT committee_id as recipient_id, committee_name as name FROM raw_table +""" +athenautils.athena_start_query(q, database=config.DATABASE) + +print("creating contributions table") + +# Attach a donor_id to every raw row by matching all nine trimmed donor +# columns; COALESCE(..., '') lets NULL values on both sides compare equal +q = """ +CREATE TABLE contributions as + SELECT reciept_id as contribution_id, + donors.donor_id as donor_id , + committee_id as recipient_id, + report_type, date_parse(date_recieved, '%m/%d/%Y') as date_recieved, + loan_amount, amount, + receipt_type, vendor_last_name , + vendor_first_name, vendor_address_1, vendor_address_2, + vendor_city, vendor_state, vendor_zip, description, + election_type, election_year, + date_parse(report_period_begin, '%m/%d/%Y') as report_period_begin, + date_parse(report_period_end, '%m/%d/%Y') as report_period_end + FROM raw_table JOIN donors donors ON + coalesce(donors.first_name, '') = coalesce(TRIM(raw_table.first_name), '') AND + coalesce(donors.last_name, '') = coalesce(TRIM(raw_table.last_name), '') AND + coalesce(donors.address_1, '') = coalesce(TRIM(raw_table.address_1), '') AND + coalesce(donors.address_2, '') = coalesce(TRIM(raw_table.address_2), '') AND + coalesce(donors.city, '') = coalesce(TRIM(raw_table.city), '') AND + coalesce(donors.state, '') = coalesce(TRIM(raw_table.state), '') AND + coalesce(donors.employer, '') = coalesce(TRIM(raw_table.employer), '') AND + coalesce(donors.occupation , '')= coalesce(TRIM(raw_table.occupation), '') AND + coalesce(donors.zip, '') = coalesce(TRIM(raw_table.zip), '')""" + +athenautils.athena_start_query(q, database=config.DATABASE) + +# Lower-cased donor fields read by athena_example.py: first/last name and +# address_1/address_2 are each joined into one field, skipping NULL parts +q = """ +CREATE TABLE processed_donors AS + SELECT donor_id, + LOWER(city) AS city, + CASE WHEN (first_name IS NULL AND last_name IS NULL) + THEN NULL + ELSE LOWER(array_join(filter(array[first_name, last_name], x-> x IS NOT NULL), ' ')) + END AS name, + LOWER(zip) AS zip, + LOWER(state) AS state, + CASE WHEN (address_1 IS NULL AND address_2 IS NULL) 
+ THEN NULL + ELSE LOWER(array_join(filter(array[address_1, address_2], x-> x IS NOT NULL), ' ')) + END AS address, + LOWER(occupation) AS occupation, + LOWER(employer) AS employer, + first_name is null AS person + FROM donors""" +athenautils.athena_start_query(q, database=config.DATABASE) + + +print("done") diff --git a/athena_example/athenautils.py b/athena_example/athenautils.py new file mode 100644 index 00000000..9a68f367 --- /dev/null +++ b/athena_example/athenautils.py @@ -0,0 +1,248 @@ +from __future__ import print_function +import config +import re +import boto3 +import botocore +import sys +import os +import time +from os import listdir +import shutil +import pandas as pd + + +pyver = sys.version_info[0] + +if pyver < 3: + from StringIO import StringIO as SomethingIO + from urlparse import urlparse +else: + from io import BytesIO as SomethingIO + from urllib.parse import urlparse + +sys.path.insert(0, "../athena_example/") + +s3 = boto3.client( + "s3", + region_name=config.REGION, + aws_access_key_id=config.ACCESS_KEY_ID, + aws_secret_access_key=config.SECRET_ACCESS_KEY, +) + +athena = boto3.client( + "athena", + region_name=config.REGION, + aws_access_key_id=config.ACCESS_KEY_ID, + aws_secret_access_key=config.SECRET_ACCESS_KEY, +) + + +def cursor_execute( + query, + database=None, + cursortype="tuple", + buffersize=1000000, + output_location=config.ATHENA_GARBAGE_PATH, + region=config.REGION, + workgroup=config.WORKGROUP, + **kwargs +): + """ + Run `query` on Athena and yield its result rows one at a time. + + The result CSV is read in chunks of `buffersize` rows. Each row is + yielded as a dict (cursortype="dict") or a plain tuple + (cursortype="tuple"), with missing values converted to None. Extra + keyword arguments are passed on to `pandas.read_csv`. + + Raises ValueError for any other `cursortype`. + """ + # Reject a bad cursortype before any query is sent to Athena + if cursortype not in ("dict", "tuple"): + raise ValueError("unknown cursortype: %r" % (cursortype,)) + + kwargs["chunksize"] = buffersize + df_cur = athena_to_panda( + query, + database=database, + output_location=output_location, + region=region, + workgroup=workgroup, + **kwargs + ) + for df in df_cur: + # Missing cells become None so rows look like database cursor rows + clean = df.where(pd.notnull(df), None) + if cursortype == "dict": + all_rows = clean.to_dict("records") + else: + all_rows = clean.itertuples(index=False, name=None) + for row in all_rows: + yield row + + +def athena_to_panda( + query, + database=None, + 
output_location=config.ATHENA_GARBAGE_PATH, + region=config.REGION, + workgroup=config.WORKGROUP, + **kwargs +): + query_execution_id = athena_start_query( + query, + database=database, + output_location=output_location, + region=region, + workgroup=workgroup, + wait_until_finished=True, + ) + # The result set is read back from <output_location>/<query_execution_id>.csv + df = pandas_read_csv( + os.path.join(output_location, query_execution_id + ".csv"), **kwargs + ) + return df + + +def athena_start_query( + query, + database=None, + output_location=config.ATHENA_GARBAGE_PATH, + region=config.REGION, + workgroup=config.WORKGROUP, + wait_until_finished=True, +): + """ + Submit `query` to Athena and return its query execution id. + + With wait_until_finished=True (the default) the query state is polled + until it is neither QUEUED nor RUNNING, and an Exception carrying the + failure reason and the execution id is raised unless the final state + is SUCCEEDED. With wait_until_finished=False the id is returned right + after submission and the outcome is not checked. + + `region` is accepted but not used here; the module-level `athena` + client is created with config.REGION. + """ + # boto3 rejects Database=None, so only name a database when one is given + context = {"Database": database} if database else {} + query_execution_id = athena.start_query_execution( + QueryString=query, + QueryExecutionContext=context, + WorkGroup=workgroup, + ResultConfiguration={"OutputLocation": output_location}, + )["QueryExecutionId"] + + if not wait_until_finished: + return query_execution_id + + # Poll with a linearly growing delay: 1s, 2s, 3s, ... + seconds_to_wait = 1 + while True: + time.sleep(seconds_to_wait) + seconds_to_wait += 1 + + execution = athena.get_query_execution(QueryExecutionId=query_execution_id) + status = execution["QueryExecution"]["Status"] + + if status["State"] not in ["QUEUED", "RUNNING"]: + break + + if status["State"] != "SUCCEEDED": + # Fall back to the bare state when Athena gives no reason text + raise Exception( + "Athena query failed: %s" + % (status.get("StateChangeReason", status["State"]),), + query_execution_id, + ) + + return query_execution_id + + +# Copied from +# https://github.com/pandas-dev/pandas/blob/master/pandas/io/common.py +# Import it instead, when it's updated. 
+ + +def is_s3_url(url): + """Check for an s3, s3n, or s3a url""" + try: + return urlparse(url).scheme in ["s3", "s3n", "s3a"] + except Exception: + return False + + +def seperate_bucket_key(url): + """ + Split an S3 url into a (bucket, key) tuple. + + Accepts the same s3, s3n and s3a schemes as is_s3_url and raises + ValueError for anything else. + """ + m = re.match("s3[an]?://([^/]+)/(.*)", url) + if m is None: + raise ValueError("not an S3 url: %r" % (url,)) + return m.group(1), m.group(2) + + +def list_all(path): + """ + List what lives under `path`. + + For an S3 url, return the keys of every object whose key starts with + the url's key (a plain prefix match), following pagination so more + than 1000 objects are handled. For a local path, return the directory + entries, or [] if the path does not exist. + """ + if is_s3_url(path): + bucket, key = seperate_bucket_key(path) + # list_objects_v2 returns at most 1000 keys per call; page through all + paginator = s3.get_paginator("list_objects_v2") + keys = [] + for page in paginator.paginate(Bucket=bucket, Prefix=key): + keys.extend(obj["Key"] for obj in page.get("Contents", [])) + return keys + if not os.path.exists(path): + return [] + return listdir(path) + + +def del_all_files(path): + """ + Delete everything list_all(path) reports: S3 objects one by one, or + local files and directory trees. + """ + filelist = list_all(path) + if is_s3_url(path): + bucket, key = seperate_bucket_key(path) + for f in filelist: + s3.delete_object(Bucket=bucket, Key=f) + return + filelist = [os.path.join(path, f) for f in filelist] + for f in filelist: + if os.path.isfile(f): + os.remove(f) + else: + shutil.rmtree(f) + + +def drop_external_table( + tablename, + location, + database=None, + output_location=config.ATHENA_GARBAGE_PATH, + region=config.REGION, + workgroup=config.WORKGROUP, +): + """ + Drop `tablename` if it exists, then delete the data files under + `location`. + """ + athena_start_query( + "drop table if exists {}".format(tablename), + database=database, + output_location=output_location, + region=region, + workgroup=workgroup, + ) + del_all_files(location) + + +def pandas_read_csv(filepath_or_buffer, **kwargs): + """ + Download the S3 object at `filepath_or_buffer` into memory and parse + it with pandas.read_csv, forwarding `kwargs`. + """ + bucket, key = seperate_bucket_key(filepath_or_buffer) + obj = s3.get_object(Bucket=bucket, Key=key) + return pd.read_csv(SomethingIO(obj["Body"].read()), **kwargs) + + +def read(filename): + """ + Return the contents of an S3 object or of a local file. + + NOTE(review): the S3 branch returns the raw body while the local + branch opens the file in text mode, so the two return types differ. + """ + if is_s3_url(filename): + bucket, key = seperate_bucket_key(filename) + obj = s3.get_object(Bucket=bucket, Key=key) + return obj["Body"].read() + with open(filename) as f: + return f.read() + + +def write(body, filename): + """Upload `body` to the S3 url `filename`.""" + bucket, key = seperate_bucket_key(filename) + s3.put_object(Bucket=bucket, Key=key, Body=body) + return + + +def file_name_append(filename, append, ommitext): + filename_base, ext = os.path.splitext(filename) + if ommitext: + return "%s%s" 
% (filename_base, append) + return "%s%s%s" % (filename_base, append, ext) + + +def write_many(read_cursor, filename, buffersize=config.BUFFERSIZE): + """ + Upload the records from `read_cursor` to S3 as tab-separated chunk + files of up to `buffersize` rows, named <base>_0<ext>, <base>_1<ext>, ... + after `filename`. No header or index column is written. + + NOTE(review): the loop only ends once a chunk comes back empty, so + `read_cursor` is presumably an iterator that is consumed as it is read. + """ + chunkcount = 0 + while True: + buffer_df = pd.DataFrame.from_records(read_cursor, nrows=buffersize) + if buffer_df.empty: + break + buffer = buffer_df.to_csv(index=False, header=False, sep="\t") + chunk_fname = file_name_append( + filename, "_{}".format(chunkcount), ommitext=False + ) + write(buffer, chunk_fname) + chunkcount += 1 + + +def file_exists(filename): + """ + Return True if the S3 object at `filename` can be fetched and False if + S3 reports NoSuchKey; any other client error is re-raised. + """ + bucket, key = seperate_bucket_key(filename) + try: + s3.get_object(Bucket=bucket, Key=key) + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "NoSuchKey": + return False + else: + # Something else has gone wrong. + raise + else: + return True diff --git a/athena_example/config.py b/athena_example/config.py new file mode 100644 index 00000000..b37da68c --- /dev/null +++ b/athena_example/config.py @@ -0,0 +1,14 @@ +LOG_FILE = 'log.txt' # NOTE(review): not referenced by the example scripts in this change + +# Connection parameters +ACCESS_KEY_ID = None # AWS credentials handed to the boto3 clients +SECRET_ACCESS_KEY = None +ATHENA_GARBAGE_PATH = '' # s3:// location where Athena writes query results +WORKGROUP = '' # Athena workgroup that runs the queries +REGION = '' # AWS region of the S3 and Athena clients +DATABASE = '' # Athena database that holds the example tables + +# Database Parameters +DATABASE_BUCKET = '' # S3 bucket that stores the table data +DATABASE_ROOT_KEY = 'dedupe/' # key prefix; table names are appended directly, keep the trailing '/' +BUFFERSIZE = 100000 # rows per chunk when reading and writing CSV data diff --git a/athena_example/requirements.txt b/athena_example/requirements.txt new file mode 100644 index 00000000..5dcfecc9 --- /dev/null +++ b/athena_example/requirements.txt @@ -0,0 +1,3 @@ +pandas +boto3 +dedupe