From 0d0de0f5b7d8b10bfd07a3746c466c32150d4c39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fl=C3=A1vio=20Juvenal?= Date: Thu, 17 Sep 2020 16:18:24 -0300 Subject: [PATCH 1/6] First version of parallel pgsql_big_dedupe_example.py --- .../pgsql_big_dedupe_example.py | 152 +++++++++++++++--- 1 file changed, 126 insertions(+), 26 deletions(-) diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py index e2c62957..0ed4ad8c 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py @@ -24,6 +24,9 @@ import itertools import io import csv +import multiprocessing +import math +import tempfile import dj_database_url import psycopg2 @@ -44,8 +47,16 @@ class Readable(object): def __init__(self, iterator): - self.output = io.StringIO() - self.writer = csv.writer(self.output) + self.output = io.StringIO( + # necessary for csv. See: https://docs.python.org/3/library/csv.html#id3 + newline='', + ) + self.writer = csv.writer( + self.output, + # csv.unix_dialect seems the right one + # based on our tests and Postgres CSV defaults: + # https://www.postgresql.org/docs/12/sql-copy.html + dialect=csv.unix_dialect) self.iterator = iterator def read(self, size): @@ -80,6 +91,30 @@ def cluster_ids(clustered_dupes): yield donor_id, cluster_id, score +def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_select, partition_offset, partition_end): + read_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST'], + cursor_factory=psycopg2.extras.RealDictCursor) + + with read_con.cursor('donor_partition_select') as read_cur: + read_cur.execute(donor_partition_select, (partition_offset, partition_end)) + + partition_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper_fingerprinter(partition_data) + + with tempfile.NamedTemporaryFile( + prefix="pid_%d_blocking_map_" 
% os.getpid(), + delete=False, + mode="w", + newline="", + ) as block_file: + csv_writer = csv.writer(block_file, dialect=csv.unix_dialect) + csv_writer.writerows(b_data) + return block_file.name + + if __name__ == '__main__': # ## Logging @@ -102,6 +137,7 @@ def cluster_ids(clustered_dupes): # ## Setup settings_file = 'pgsql_big_dedupe_example_settings' training_file = 'pgsql_big_dedupe_example_training.json' + num_cores = multiprocessing.cpu_count() start_time = time.time() @@ -135,14 +171,19 @@ def cluster_ids(clustered_dupes): # `pgsql_big_dedupe_example_init_db.py` DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ - "from processed_donors" + "FROM processed_donors" + DONOR_PARTITION_SELECT = "SELECT donor_id, city, name, zip, state, address " \ + "FROM processed_donors " \ + "WHERE donor_id >= %s " \ + "AND donor_id < %s" + COUNT_SELECT = "SELECT COUNT(*) FROM processed_donors" # ## Training if os.path.exists(settings_file): print('reading from ', settings_file) with open(settings_file, 'rb') as sf: - deduper = dedupe.StaticDedupe(sf, num_cores=4) + deduper = dedupe.StaticDedupe(sf, num_cores=num_cores) else: # Define the fields dedupe will pay attention to @@ -159,7 +200,7 @@ def cluster_ids(clustered_dupes): ] # Create a new deduper object and pass our data model to it. - deduper = dedupe.Dedupe(fields, num_cores=4) + deduper = dedupe.Dedupe(fields, num_cores=num_cores) # Named cursor runs server side with psycopg2 with read_con.cursor('donor_select') as cur: @@ -220,34 +261,93 @@ def cluster_ids(clustered_dupes): cur.execute("CREATE TABLE blocking_map " "(block_key text, donor_id INTEGER)") + # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in parallel, + # but only for the non-index predicates. Only those can run in parallel + # because they don't need global predicate indexes. 
+ # We cannot share predicate indexes across Python processes because it would consume + # too much RAM + print('computing blocking map in parallel (non-index predicates)') + + predicates_without_index = [] + predicates_with_index = [] + for full_predicate in deduper.fingerprinter.predicates: + if any(predicate in deduper.fingerprinter.index_predicates + for predicate in full_predicate): + predicates_with_index.append(full_predicate) + else: + predicates_without_index.append(full_predicate) + + # Use only predicates WITHOUT indexes for parallel blocking + deduper.fingerprinter.predicates = predicates_without_index + + # processed_donors `id` starts at 1 and goes up to `COUNT(*)` + with read_con.cursor('donor_select') as cur: + cur.execute(COUNT_SELECT) + total_rows = cur.fetchone()['count'] + partition_size = math.ceil(total_rows / num_cores) + partition_offsets = range(1, total_rows + 1, partition_size) + + # Use a multiprocessing.Pool to run parallel_fingerprinter in num_cores. + # Set context to 'spawn' to start new clean processes. 
+ # 'spawn' is the default on Windows and macOS, but not on Unix + multiprocessing.set_start_method('spawn') + with multiprocessing.Pool(processes=num_cores) as pool: + block_file_path_list = pool.starmap( + parallel_fingerprinter, + [ + ( + db_conf, + deduper.fingerprinter, + DONOR_PARTITION_SELECT, + partition_offset, + (partition_offset + partition_size) + ) + for partition_offset in partition_offsets + ], + ) + + # parallel_fingerprinter output are CSV files that we'll now copy into `blocking_map` + print('writing blocking map (non-index predicates)') + + for blocking_file_path in block_file_path_list: + with write_con.cursor() as write_cur: + with open(blocking_file_path, "r", newline="") as f: + logging.info("Appending to blocking_map from %s" % blocking_file_path) + write_cur.copy_expert("COPY blocking_map FROM STDIN CSV", f) + os.remove(blocking_file_path) + # If dedupe learned a Index Predicate, we have to take a pass - # through the data and create indices. - print('creating inverted index') + # through the data and create indices + if predicates_with_index: + print('creating inverted indexes') - for field in deduper.fingerprinter.index_fields: - with read_con.cursor('field_values') as cur: - cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) - field_data = (row[field] for row in cur) - deduper.fingerprinter.index(field_data, field) + # Use only predicates WITH indexes for non-parallel blocking + deduper.fingerprinter.predicates = predicates_with_index - # Now we are ready to write our blocking map table by creating a - # generator that yields unique `(block_key, donor_id)` tuples. 
- print('writing blocking map') + for field in deduper.fingerprinter.index_fields: + with read_con.cursor('field_values') as cur: + cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) + field_data = (row[field] for row in cur) + deduper.fingerprinter.index(field_data, field) - with read_con.cursor('donor_select') as read_cur: - read_cur.execute(DONOR_SELECT) + # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in serial + # for the index predicates + print('computing and writing blocking map (index predicates)') - full_data = ((row['donor_id'], row) for row in read_cur) - b_data = deduper.fingerprinter(full_data) + with read_con.cursor('donor_select') as read_cur: + read_cur.execute(DONOR_SELECT) - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', - Readable(b_data), - size=10000) + full_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper.fingerprinter(full_data) + + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', + Readable(b_data), + size=10000) - # free up memory by removing indices - deduper.fingerprinter.reset_indices() + # free up memory by removing indices + deduper.fingerprinter.reset_indices() logging.info("indexing block_key") with write_con: From b30c6d27bba36a1d6f49022a36a509730e851c8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fl=C3=A1vio=20Juvenal?= Date: Fri, 18 Sep 2020 17:34:32 -0300 Subject: [PATCH 2/6] pgsql_big_dedupe_example/test_parallel_vs_serial.sh --- .../pgsql_big_dedupe_example.py | 233 ++++----- .../pgsql_big_dedupe_example_old.py | 461 ++++++++++++++++++ .../pgsql_big_dedupe_example_serial.py | 385 +++++++++++++++ .../pgsql_big_dedupe_example_serial_old.py | 373 ++++++++++++++ .../test_parallel_vs_serial.sh | 11 + 5 files changed, 1347 insertions(+), 116 deletions(-) create mode 100755 pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py create 
mode 100755 pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py create mode 100755 pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py create mode 100755 pgsql_big_dedupe_example/test_parallel_vs_serial.sh diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py index 0ed4ad8c..59f759dd 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py @@ -313,7 +313,7 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec with write_con.cursor() as write_cur: with open(blocking_file_path, "r", newline="") as f: logging.info("Appending to blocking_map from %s" % blocking_file_path) - write_cur.copy_expert("COPY blocking_map FROM STDIN CSV", f) + write_cur.copy_expert("COPY blocking_map FROM STDIN CSV", f, size=10000) os.remove(blocking_file_path) # If dedupe learned a Index Predicate, we have to take a pass @@ -349,123 +349,124 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec # free up memory by removing indices deduper.fingerprinter.reset_indices() - logging.info("indexing block_key") - with write_con: - with write_con.cursor() as cur: - cur.execute("CREATE UNIQUE INDEX ON blocking_map " - "(block_key text_pattern_ops, donor_id)") - - # ## Clustering - - with write_con: - with write_con.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS entity_map") - - print('creating entity_map database') - cur.execute("CREATE TABLE entity_map " - "(donor_id INTEGER, canon_id INTEGER, " - " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - read_cur.execute(""" - select a.donor_id, - row_to_json((select d from (select a.city, - a.name, - a.zip, - a.state, - a.address) d)), - b.donor_id, - row_to_json((select d from (select b.city, - b.name, - b.zip, - b.state, - b.address) d)) - from 
(select DISTINCT l.donor_id as east, r.donor_id as west - from blocking_map as l - INNER JOIN blocking_map as r - using (block_key) - where l.donor_id < r.donor_id) ids - INNER JOIN processed_donors a on ids.east=a.donor_id - INNER JOIN processed_donors b on ids.west=b.donor_id""") - - print('clustering...') - clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), - threshold=0.5) - - # ## Writing out results - - # We now have a sequence of tuples of donor ids that dedupe believes - # all refer to the same entity. We write this out onto an entity map - # table - - print('writing results') - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - Readable(cluster_ids(clustered_dupes)), - size=10000) - - with write_con: - with write_con.cursor() as cur: - cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # Print out the number of duplicates found - - # ## Payoff - - # With all this done, we can now begin to ask interesting questions - # of the data - # - # For example, let's see who the top 10 donors are. 
- - locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # Create a temporary table so each group and unmatched record has - # a unique id - - with read_con.cursor() as cur: - cur.execute("CREATE TEMPORARY TABLE e_map " - "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - "FROM entity_map " - "RIGHT JOIN donors USING(donor_id)") - - cur.execute( - "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - "donation_totals.totals AS totals " - "FROM donors INNER JOIN " - "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - " FROM contributions INNER JOIN e_map " - " USING (donor_id) " - " GROUP BY (canon_id) " - " ORDER BY totals " - " DESC LIMIT 10) " - "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - "WHERE donors.donor_id = donation_totals.canon_id" - ) - - print("Top Donors (deduped)") - for row in cur: - row['totals'] = locale.currency(row['totals'], grouping=True) - print('%(totals)20s: %(name)s' % row) - - # Compare this to what we would have gotten if we hadn't done any - # deduplication - cur.execute( - "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - "SUM(CAST(contributions.amount AS FLOAT)) AS totals " - "FROM donors INNER JOIN contributions " - "USING (donor_id) " - "GROUP BY (donor_id) " - "ORDER BY totals DESC " - "LIMIT 10" - ) - - print("Top Donors (raw)") - for row in cur: - row['totals'] = locale.currency(row['totals'], grouping=True) - print('%(totals)20s: %(name)s' % row) + # logging.info("indexing block_key") + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE UNIQUE INDEX ON blocking_map " + # "(block_key text_pattern_ops, donor_id)") + + # # ## Clustering + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("DROP TABLE IF EXISTS entity_map") + + # print('creating entity_map database') + # cur.execute("CREATE TABLE entity_map " + # "(donor_id INTEGER, canon_id INTEGER, " + # " cluster_score FLOAT, 
PRIMARY KEY(donor_id))") + + # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + # read_cur.execute(""" + # select a.donor_id, + # row_to_json((select d from (select a.city, + # a.name, + # a.zip, + # a.state, + # a.address) d)), + # b.donor_id, + # row_to_json((select d from (select b.city, + # b.name, + # b.zip, + # b.state, + # b.address) d)) + # from (select DISTINCT l.donor_id as east, r.donor_id as west + # from blocking_map as l + # INNER JOIN blocking_map as r + # using (block_key) + # where l.donor_id < r.donor_id) ids + # INNER JOIN processed_donors a on ids.east=a.donor_id + # INNER JOIN processed_donors b on ids.west=b.donor_id""") + + # print('clustering...') + # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + # threshold=0.5) + + # # ## Writing out results + + # # We now have a sequence of tuples of donor ids that dedupe believes + # # all refer to the same entity. We write this out onto an entity map + # # table + + # print('writing results') + # with write_con: + # with write_con.cursor() as write_cur: + # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + # Readable(cluster_ids(clustered_dupes)), + # size=10000) + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # # Print out the number of duplicates found + + # # ## Payoff + + # # With all this done, we can now begin to ask interesting questions + # # of the data + # # + # # For example, let's see who the top 10 donors are. 
+ + # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # # Create a temporary table so each group and unmatched record has + # # a unique id + + # with read_con.cursor() as cur: + # cur.execute("CREATE TEMPORARY TABLE e_map " + # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + # "FROM entity_map " + # "RIGHT JOIN donors USING(donor_id)") + + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + # "donation_totals.totals AS totals " + # "FROM donors INNER JOIN " + # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + # " FROM contributions INNER JOIN e_map " + # " USING (donor_id) " + # " GROUP BY (canon_id) " + # " ORDER BY totals " + # " DESC LIMIT 10) " + # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + # "WHERE donors.donor_id = donation_totals.canon_id" + # ) + + # print("Top Donors (deduped)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + # # Compare this to what we would have gotten if we hadn't done any + # # deduplication + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + # "FROM donors INNER JOIN contributions " + # "USING (donor_id) " + # "GROUP BY (donor_id) " + # "ORDER BY totals DESC " + # "LIMIT 10" + # ) + + # print("Top Donors (raw)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) read_con.close() + write_con.commit() write_con.close() print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py new file mode 100755 index 00000000..3235573a --- /dev/null +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python +# -*- 
coding: utf-8 -*- + +""" +This is an example of working with very large data. There are about +700,000 unduplicated donors in this database of Illinois political +campaign contributions. + +With such a large set of input data, we cannot store all the comparisons +we need to make in memory. Instead, we will read the pairs on demand +from the PostgresSQL database. + +__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` +before running this script. + +For smaller datasets (<10,000), see our +[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) +""" +import os +import time +import logging +import optparse +import locale +import itertools +import io +import csv +import multiprocessing +import math + +import dj_database_url +import psycopg2 +import psycopg2.extras + +import dedupe +import numpy + + +from psycopg2.extensions import register_adapter, AsIs +register_adapter(numpy.int32, AsIs) +register_adapter(numpy.int64, AsIs) +register_adapter(numpy.float32, AsIs) +register_adapter(numpy.float64, AsIs) + + +class Readable(object): + + def __init__(self, iterator): + + self.output = io.StringIO( + # necessary for csv. 
See: https://docs.python.org/3/library/csv.html#id3 + newline='', + ) + self.writer = csv.writer( + self.output, + # csv.unix_dialect seems the right one + # based on our tests and Postgres CSV defaults: + # https://www.postgresql.org/docs/12/sql-copy.html + dialect=csv.unix_dialect) + self.iterator = iterator + + def read(self, size): + + self.writer.writerows(itertools.islice(self.iterator, size)) + + chunk = self.output.getvalue() + self.output.seek(0) + self.output.truncate(0) + + return chunk + + +def record_pairs(result_set): + + for i, row in enumerate(result_set): + a_record_id, a_record, b_record_id, b_record = row + record_a = (a_record_id, a_record) + record_b = (b_record_id, b_record) + + yield record_a, record_b + + if i % 10000 == 0: + print(i) + + +def cluster_ids(clustered_dupes): + + for cluster, scores in clustered_dupes: + cluster_id = cluster[0] + for donor_id, score in zip(cluster, scores): + yield donor_id, cluster_id, score + + +def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_select, partition_offset, partition_end): + read_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST'], + cursor_factory=psycopg2.extras.RealDictCursor) + write_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST']) + + with read_con.cursor('donor_partition_select') as read_cur: + read_cur.execute(donor_partition_select, (partition_offset, partition_end)) + + partition_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper_fingerprinter(partition_data) + + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', + Readable(b_data), + size=10000) + + +if __name__ == '__main__': + # ## Logging + + # Dedupe uses Python logging to show or suppress verbose output. Added + # for convenience. 
To enable verbose output, run `python + # pgsql_big_dedupe_example.py -v` + optp = optparse.OptionParser() + optp.add_option('-v', '--verbose', dest='verbose', action='count', + help='Increase verbosity (specify multiple times for more)' + ) + (opts, args) = optp.parse_args() + log_level = logging.WARNING + if opts.verbose: + if opts.verbose == 1: + log_level = logging.INFO + elif opts.verbose >= 2: + log_level = logging.DEBUG + logging.getLogger().setLevel(log_level) + + # ## Setup + settings_file = 'pgsql_big_dedupe_example_settings' + training_file = 'pgsql_big_dedupe_example_training.json' + num_cores = multiprocessing.cpu_count() + + start_time = time.time() + + # Set the database connection from environment variable using + # [dj_database_url](https://github.com/kennethreitz/dj-database-url) + # For example: + # export DATABASE_URL=postgres://user:password@host/mydatabase + db_conf = dj_database_url.config() + + if not db_conf: + raise Exception( + 'set DATABASE_URL environment variable with your connection, e.g. ' + 'export DATABASE_URL=postgres://user:password@host/mydatabase' + ) + + read_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST'], + cursor_factory=psycopg2.extras.RealDictCursor) + + write_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST']) + + # We'll be using variations on this following select statement to pull + # in campaign donor info. 
+ # + # We did a fair amount of preprocessing of the fields in + # `pgsql_big_dedupe_example_init_db.py` + + DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ + "FROM processed_donors" + DONOR_PARTITION_SELECT = "SELECT donor_id, city, name, zip, state, address " \ + "FROM processed_donors " \ + "WHERE donor_id >= %s " \ + "AND donor_id < %s" + COUNT_SELECT = "SELECT COUNT(*) FROM processed_donors" + + # ## Training + + if os.path.exists(settings_file): + print('reading from ', settings_file) + with open(settings_file, 'rb') as sf: + deduper = dedupe.StaticDedupe(sf, num_cores=num_cores) + else: + + # Define the fields dedupe will pay attention to + # + # The address, city, and zip fields are often missing, so we'll + # tell dedupe that, and we'll learn a model that take that into + # account + fields = [{'field': 'name', 'type': 'String'}, + {'field': 'address', 'type': 'String', + 'has missing': True}, + {'field': 'city', 'type': 'ShortString', 'has missing': True}, + {'field': 'state', 'type': 'ShortString', 'has missing': True}, + {'field': 'zip', 'type': 'ShortString', 'has missing': True}, + ] + + # Create a new deduper object and pass our data model to it. + deduper = dedupe.Dedupe(fields, num_cores=num_cores) + + # Named cursor runs server side with psycopg2 + with read_con.cursor('donor_select') as cur: + cur.execute(DONOR_SELECT) + temp_d = {i: row for i, row in enumerate(cur)} + + # If we have training data saved from a previous run of dedupe, + # look for it an load it in. + # + # __Note:__ if you want to train from + # scratch, delete the training_file + if os.path.exists(training_file): + print('reading labeled examples from ', training_file) + with open(training_file) as tf: + deduper.prepare_training(temp_d, tf) + else: + deduper.prepare_training(temp_d) + + del temp_d + + # ## Active learning + + print('starting active labeling...') + # Starts the training loop. 
Dedupe will find the next pair of records + # it is least certain about and ask you to label them as duplicates + # or not. + + # use 'y', 'n' and 'u' keys to flag duplicates + # press 'f' when you are finished + dedupe.console_label(deduper) + # When finished, save our labeled, training pairs to disk + with open(training_file, 'w') as tf: + deduper.write_training(tf) + + # Notice our argument here + # + # `recall` is the proportion of true dupes pairs that the learned + # rules must cover. You may want to reduce this if your are making + # too many blocks and too many comparisons. + deduper.train(recall=0.90) + + with open(settings_file, 'wb') as sf: + deduper.write_settings(sf) + + # We can now remove some of the memory hogging objects we used + # for training + deduper.cleanup_training() + + # ## Blocking + print('blocking...') + + # To run blocking on such a large set of data, we create a separate table + # that contains blocking keys and record ids + print('creating blocking_map database') + with write_con: + with write_con.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS blocking_map") + cur.execute("CREATE TABLE blocking_map " + "(block_key text, donor_id INTEGER)") + + # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in parallel, + # but only for the non-index predicates. Only those can run in parallel + # because they don't need global predicate indexes. 
+ # We cannot share predicate indexes across Python processes because it would consume + # too much RAM + print('computing blocking map in parallel (non-index predicates)') + + predicates_without_index = [] + predicates_with_index = [] + for full_predicate in deduper.fingerprinter.predicates: + if any(predicate in deduper.fingerprinter.index_predicates + for predicate in full_predicate): + predicates_with_index.append(full_predicate) + else: + predicates_without_index.append(full_predicate) + + # Use only predicates WITHOUT indexes for parallel blocking + deduper.fingerprinter.predicates = predicates_without_index + + # processed_donors `id` starts at 1 and goes up to `COUNT(*)` + with read_con.cursor('donor_select') as cur: + cur.execute(COUNT_SELECT) + total_rows = cur.fetchone()['count'] + partition_size = math.ceil(total_rows / num_cores) + partition_offsets = range(1, total_rows + 1, partition_size) + + # Use a multiprocessing.Pool to run parallel_fingerprinter in num_cores. + # Set context to 'spawn' to start new clean processes. 
+ # 'spawn' is the default on Windows and macOS, but not on Unix + multiprocessing.set_start_method('spawn') + with multiprocessing.Pool(processes=num_cores) as pool: + block_file_path_list = pool.starmap( + parallel_fingerprinter, + [ + ( + db_conf, + deduper.fingerprinter, + DONOR_PARTITION_SELECT, + partition_offset, + (partition_offset + partition_size) + ) + for partition_offset in partition_offsets + ], + ) + + # If dedupe learned a Index Predicate, we have to take a pass + # through the data and create indices + if predicates_with_index: + print('creating inverted indexes') + + # Use only predicates WITH indexes for non-parallel blocking + deduper.fingerprinter.predicates = predicates_with_index + + for field in deduper.fingerprinter.index_fields: + with read_con.cursor('field_values') as cur: + cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) + field_data = (row[field] for row in cur) + deduper.fingerprinter.index(field_data, field) + + # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in serial + # for the index predicates + print('computing and writing blocking map (index predicates)') + + with read_con.cursor('donor_select') as read_cur: + read_cur.execute(DONOR_SELECT) + + full_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper.fingerprinter(full_data) + + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', + Readable(b_data), + size=10000) + + # free up memory by removing indices + deduper.fingerprinter.reset_indices() + + # logging.info("indexing block_key") + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE UNIQUE INDEX ON blocking_map " + # "(block_key text_pattern_ops, donor_id)") + + # # ## Clustering + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("DROP TABLE IF EXISTS entity_map") + + # print('creating entity_map database') + # cur.execute("CREATE TABLE entity_map " + # 
"(donor_id INTEGER, canon_id INTEGER, " + # " cluster_score FLOAT, PRIMARY KEY(donor_id))") + + # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + # read_cur.execute(""" + # select a.donor_id, + # row_to_json((select d from (select a.city, + # a.name, + # a.zip, + # a.state, + # a.address) d)), + # b.donor_id, + # row_to_json((select d from (select b.city, + # b.name, + # b.zip, + # b.state, + # b.address) d)) + # from (select DISTINCT l.donor_id as east, r.donor_id as west + # from blocking_map as l + # INNER JOIN blocking_map as r + # using (block_key) + # where l.donor_id < r.donor_id) ids + # INNER JOIN processed_donors a on ids.east=a.donor_id + # INNER JOIN processed_donors b on ids.west=b.donor_id""") + + # print('clustering...') + # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + # threshold=0.5) + + # # ## Writing out results + + # # We now have a sequence of tuples of donor ids that dedupe believes + # # all refer to the same entity. We write this out onto an entity map + # # table + + # print('writing results') + # with write_con: + # with write_con.cursor() as write_cur: + # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + # Readable(cluster_ids(clustered_dupes)), + # size=10000) + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # # Print out the number of duplicates found + + # # ## Payoff + + # # With all this done, we can now begin to ask interesting questions + # # of the data + # # + # # For example, let's see who the top 10 donors are. 
+ + # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # # Create a temporary table so each group and unmatched record has + # # a unique id + + # with read_con.cursor() as cur: + # cur.execute("CREATE TEMPORARY TABLE e_map " + # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + # "FROM entity_map " + # "RIGHT JOIN donors USING(donor_id)") + + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + # "donation_totals.totals AS totals " + # "FROM donors INNER JOIN " + # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + # " FROM contributions INNER JOIN e_map " + # " USING (donor_id) " + # " GROUP BY (canon_id) " + # " ORDER BY totals " + # " DESC LIMIT 10) " + # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + # "WHERE donors.donor_id = donation_totals.canon_id" + # ) + + # print("Top Donors (deduped)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + # # Compare this to what we would have gotten if we hadn't done any + # # deduplication + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + # "FROM donors INNER JOIN contributions " + # "USING (donor_id) " + # "GROUP BY (donor_id) " + # "ORDER BY totals DESC " + # "LIMIT 10" + # ) + + # print("Top Donors (raw)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + read_con.close() + write_con.commit() + write_con.close() + + print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py new file mode 100755 index 00000000..b27b409d --- /dev/null +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py @@ -0,0 +1,385 @@ 
+#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +This is an example of working with very large data. There are about +700,000 unduplicated donors in this database of Illinois political +campaign contributions. + +With such a large set of input data, we cannot store all the comparisons +we need to make in memory. Instead, we will read the pairs on demand +from the PostgresSQL database. + +__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` +before running this script. + +For smaller datasets (<10,000), see our +[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) +""" +import os +import time +import logging +import optparse +import locale +import itertools +import io +import csv +import tempfile + +import dj_database_url +import psycopg2 +import psycopg2.extras + +import dedupe +import numpy + + +from psycopg2.extensions import register_adapter, AsIs +register_adapter(numpy.int32, AsIs) +register_adapter(numpy.int64, AsIs) +register_adapter(numpy.float32, AsIs) +register_adapter(numpy.float64, AsIs) + + +class Readable(object): + + def __init__(self, iterator): + + self.output = io.StringIO() + self.writer = csv.writer(self.output) + self.iterator = iterator + + def read(self, size): + + self.writer.writerows(itertools.islice(self.iterator, size)) + + chunk = self.output.getvalue() + self.output.seek(0) + self.output.truncate(0) + + return chunk + + +def record_pairs(result_set): + + for i, row in enumerate(result_set): + a_record_id, a_record, b_record_id, b_record = row + record_a = (a_record_id, a_record) + record_b = (b_record_id, b_record) + + yield record_a, record_b + + if i % 10000 == 0: + print(i) + + +def cluster_ids(clustered_dupes): + + for cluster, scores in clustered_dupes: + cluster_id = cluster[0] + for donor_id, score in zip(cluster, scores): + yield donor_id, cluster_id, score + + +if __name__ == '__main__': + # ## Logging + + # Dedupe uses Python logging to show or suppress verbose output. 
Added + # for convenience. To enable verbose output, run `python + # pgsql_big_dedupe_example.py -v` + optp = optparse.OptionParser() + optp.add_option('-v', '--verbose', dest='verbose', action='count', + help='Increase verbosity (specify multiple times for more)' + ) + (opts, args) = optp.parse_args() + log_level = logging.WARNING + if opts.verbose: + if opts.verbose == 1: + log_level = logging.INFO + elif opts.verbose >= 2: + log_level = logging.DEBUG + logging.getLogger().setLevel(log_level) + + # ## Setup + settings_file = 'pgsql_big_dedupe_example_settings' + training_file = 'pgsql_big_dedupe_example_training.json' + + start_time = time.time() + + # Set the database connection from environment variable using + # [dj_database_url](https://github.com/kennethreitz/dj-database-url) + # For example: + # export DATABASE_URL=postgres://user:password@host/mydatabase + db_conf = dj_database_url.config() + + if not db_conf: + raise Exception( + 'set DATABASE_URL environment variable with your connection, e.g. ' + 'export DATABASE_URL=postgres://user:password@host/mydatabase' + ) + + read_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST'], + cursor_factory=psycopg2.extras.RealDictCursor) + + write_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST']) + + # We'll be using variations on this following select statement to pull + # in campaign donor info. 
+ # + # We did a fair amount of preprocessing of the fields in + # `pgsql_big_dedupe_example_init_db.py` + + DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ + "from processed_donors" + + # ## Training + + if os.path.exists(settings_file): + print('reading from ', settings_file) + with open(settings_file, 'rb') as sf: + deduper = dedupe.StaticDedupe(sf, num_cores=4) + else: + + # Define the fields dedupe will pay attention to + # + # The address, city, and zip fields are often missing, so we'll + # tell dedupe that, and we'll learn a model that take that into + # account + fields = [{'field': 'name', 'type': 'String'}, + {'field': 'address', 'type': 'String', + 'has missing': True}, + {'field': 'city', 'type': 'ShortString', 'has missing': True}, + {'field': 'state', 'type': 'ShortString', 'has missing': True}, + {'field': 'zip', 'type': 'ShortString', 'has missing': True}, + ] + + # Create a new deduper object and pass our data model to it. + deduper = dedupe.Dedupe(fields, num_cores=4) + + # Named cursor runs server side with psycopg2 + with read_con.cursor('donor_select') as cur: + cur.execute(DONOR_SELECT) + temp_d = {i: row for i, row in enumerate(cur)} + + # If we have training data saved from a previous run of dedupe, + # look for it an load it in. + # + # __Note:__ if you want to train from + # scratch, delete the training_file + if os.path.exists(training_file): + print('reading labeled examples from ', training_file) + with open(training_file) as tf: + deduper.prepare_training(temp_d, tf) + else: + deduper.prepare_training(temp_d) + + del temp_d + + # ## Active learning + + print('starting active labeling...') + # Starts the training loop. Dedupe will find the next pair of records + # it is least certain about and ask you to label them as duplicates + # or not. 
+ + # use 'y', 'n' and 'u' keys to flag duplicates + # press 'f' when you are finished + dedupe.console_label(deduper) + # When finished, save our labeled, training pairs to disk + with open(training_file, 'w') as tf: + deduper.write_training(tf) + + # Notice our argument here + # + # `recall` is the proportion of true dupes pairs that the learned + # rules must cover. You may want to reduce this if your are making + # too many blocks and too many comparisons. + deduper.train(recall=0.90) + + with open(settings_file, 'wb') as sf: + deduper.write_settings(sf) + + # We can now remove some of the memory hogging objects we used + # for training + deduper.cleanup_training() + + # ## Blocking + print('blocking...') + + # To run blocking on such a large set of data, we create a separate table + # that contains blocking keys and record ids + print('creating blocking_map_serial database') + with write_con: + with write_con.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS blocking_map_serial") + cur.execute("CREATE TABLE blocking_map_serial " + "(block_key text, donor_id INTEGER)") + + # If dedupe learned a Index Predicate, we have to take a pass + # through the data and create indices. + print('creating inverted index') + + for field in deduper.fingerprinter.index_fields: + with read_con.cursor('field_values') as cur: + cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) + field_data = (row[field] for row in cur) + deduper.fingerprinter.index(field_data, field) + + # Now we are ready to write our blocking map table by creating a + # generator that yields unique `(block_key, donor_id)` tuples. 
+ print('writing blocking map') + + with read_con.cursor('donor_select') as read_cur: + read_cur.execute(DONOR_SELECT) + + full_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper.fingerprinter(full_data) + + with write_con: + with write_con.cursor() as write_cur: + with tempfile.NamedTemporaryFile( + prefix="blocking_map_", + mode="w+", + newline="", + ) as block_file: + csv_writer = csv.writer(block_file, dialect=csv.unix_dialect) + csv_writer.writerows(b_data) + block_file.seek(0) + write_cur.copy_expert("COPY blocking_map FROM STDIN CSV", block_file, size=10000) + + # with write_con: + # with write_con.cursor() as write_cur: + # write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', + # Readable(b_data), + # size=10000) + + # free up memory by removing indices + deduper.fingerprinter.reset_indices() + + # logging.info("indexing block_key") + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE UNIQUE INDEX ON blocking_map_serial " + # "(block_key text_pattern_ops, donor_id)") + + # # ## Clustering + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("DROP TABLE IF EXISTS entity_map") + + # print('creating entity_map database') + # cur.execute("CREATE TABLE entity_map " + # "(donor_id INTEGER, canon_id INTEGER, " + # " cluster_score FLOAT, PRIMARY KEY(donor_id))") + + # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + # read_cur.execute(""" + # select a.donor_id, + # row_to_json((select d from (select a.city, + # a.name, + # a.zip, + # a.state, + # a.address) d)), + # b.donor_id, + # row_to_json((select d from (select b.city, + # b.name, + # b.zip, + # b.state, + # b.address) d)) + # from (select DISTINCT l.donor_id as east, r.donor_id as west + # from blocking_map_serial as l + # INNER JOIN blocking_map_serial as r + # using (block_key) + # where l.donor_id < r.donor_id) ids + # INNER JOIN processed_donors a on ids.east=a.donor_id + # INNER 
JOIN processed_donors b on ids.west=b.donor_id""") + + # print('clustering...') + # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + # threshold=0.5) + + # # ## Writing out results + + # # We now have a sequence of tuples of donor ids that dedupe believes + # # all refer to the same entity. We write this out onto an entity map + # # table + + # print('writing results') + # with write_con: + # with write_con.cursor() as write_cur: + # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + # Readable(cluster_ids(clustered_dupes)), + # size=10000) + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # # Print out the number of duplicates found + + # # ## Payoff + + # # With all this done, we can now begin to ask interesting questions + # # of the data + # # + # # For example, let's see who the top 10 donors are. + + # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # # Create a temporary table so each group and unmatched record has + # # a unique id + + # with read_con.cursor() as cur: + # cur.execute("CREATE TEMPORARY TABLE e_map " + # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + # "FROM entity_map " + # "RIGHT JOIN donors USING(donor_id)") + + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + # "donation_totals.totals AS totals " + # "FROM donors INNER JOIN " + # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + # " FROM contributions INNER JOIN e_map " + # " USING (donor_id) " + # " GROUP BY (canon_id) " + # " ORDER BY totals " + # " DESC LIMIT 10) " + # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + # "WHERE donors.donor_id = donation_totals.canon_id" + # ) + + # print("Top Donors (deduped)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + # # Compare this to 
what we would have gotten if we hadn't done any + # # deduplication + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + # "FROM donors INNER JOIN contributions " + # "USING (donor_id) " + # "GROUP BY (donor_id) " + # "ORDER BY totals DESC " + # "LIMIT 10" + # ) + + # print("Top Donors (raw)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + read_con.close() + write_con.commit() + write_con.close() + + print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py new file mode 100755 index 00000000..1629540f --- /dev/null +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py @@ -0,0 +1,373 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +This is an example of working with very large data. There are about +700,000 unduplicated donors in this database of Illinois political +campaign contributions. + +With such a large set of input data, we cannot store all the comparisons +we need to make in memory. Instead, we will read the pairs on demand +from the PostgresSQL database. + +__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` +before running this script. 
+ +For smaller datasets (<10,000), see our +[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) +""" +import os +import time +import logging +import optparse +import locale +import itertools +import io +import csv +import tempfile + +import dj_database_url +import psycopg2 +import psycopg2.extras + +import dedupe +import numpy + + +from psycopg2.extensions import register_adapter, AsIs +register_adapter(numpy.int32, AsIs) +register_adapter(numpy.int64, AsIs) +register_adapter(numpy.float32, AsIs) +register_adapter(numpy.float64, AsIs) + + +class Readable(object): + + def __init__(self, iterator): + + self.output = io.StringIO() + self.writer = csv.writer(self.output) + self.iterator = iterator + + def read(self, size): + + self.writer.writerows(itertools.islice(self.iterator, size)) + + chunk = self.output.getvalue() + self.output.seek(0) + self.output.truncate(0) + + return chunk + + +def record_pairs(result_set): + + for i, row in enumerate(result_set): + a_record_id, a_record, b_record_id, b_record = row + record_a = (a_record_id, a_record) + record_b = (b_record_id, b_record) + + yield record_a, record_b + + if i % 10000 == 0: + print(i) + + +def cluster_ids(clustered_dupes): + + for cluster, scores in clustered_dupes: + cluster_id = cluster[0] + for donor_id, score in zip(cluster, scores): + yield donor_id, cluster_id, score + + +if __name__ == '__main__': + # ## Logging + + # Dedupe uses Python logging to show or suppress verbose output. Added + # for convenience. 
To enable verbose output, run `python + # pgsql_big_dedupe_example.py -v` + optp = optparse.OptionParser() + optp.add_option('-v', '--verbose', dest='verbose', action='count', + help='Increase verbosity (specify multiple times for more)' + ) + (opts, args) = optp.parse_args() + log_level = logging.WARNING + if opts.verbose: + if opts.verbose == 1: + log_level = logging.INFO + elif opts.verbose >= 2: + log_level = logging.DEBUG + logging.getLogger().setLevel(log_level) + + # ## Setup + settings_file = 'pgsql_big_dedupe_example_settings' + training_file = 'pgsql_big_dedupe_example_training.json' + + start_time = time.time() + + # Set the database connection from environment variable using + # [dj_database_url](https://github.com/kennethreitz/dj-database-url) + # For example: + # export DATABASE_URL=postgres://user:password@host/mydatabase + db_conf = dj_database_url.config() + + if not db_conf: + raise Exception( + 'set DATABASE_URL environment variable with your connection, e.g. ' + 'export DATABASE_URL=postgres://user:password@host/mydatabase' + ) + + read_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST'], + cursor_factory=psycopg2.extras.RealDictCursor) + + write_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST']) + + # We'll be using variations on this following select statement to pull + # in campaign donor info. 
+ # + # We did a fair amount of preprocessing of the fields in + # `pgsql_big_dedupe_example_init_db.py` + + DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ + "from processed_donors" + + # ## Training + + if os.path.exists(settings_file): + print('reading from ', settings_file) + with open(settings_file, 'rb') as sf: + deduper = dedupe.StaticDedupe(sf, num_cores=4) + else: + + # Define the fields dedupe will pay attention to + # + # The address, city, and zip fields are often missing, so we'll + # tell dedupe that, and we'll learn a model that take that into + # account + fields = [{'field': 'name', 'type': 'String'}, + {'field': 'address', 'type': 'String', + 'has missing': True}, + {'field': 'city', 'type': 'ShortString', 'has missing': True}, + {'field': 'state', 'type': 'ShortString', 'has missing': True}, + {'field': 'zip', 'type': 'ShortString', 'has missing': True}, + ] + + # Create a new deduper object and pass our data model to it. + deduper = dedupe.Dedupe(fields, num_cores=4) + + # Named cursor runs server side with psycopg2 + with read_con.cursor('donor_select') as cur: + cur.execute(DONOR_SELECT) + temp_d = {i: row for i, row in enumerate(cur)} + + # If we have training data saved from a previous run of dedupe, + # look for it an load it in. + # + # __Note:__ if you want to train from + # scratch, delete the training_file + if os.path.exists(training_file): + print('reading labeled examples from ', training_file) + with open(training_file) as tf: + deduper.prepare_training(temp_d, tf) + else: + deduper.prepare_training(temp_d) + + del temp_d + + # ## Active learning + + print('starting active labeling...') + # Starts the training loop. Dedupe will find the next pair of records + # it is least certain about and ask you to label them as duplicates + # or not. 
+ + # use 'y', 'n' and 'u' keys to flag duplicates + # press 'f' when you are finished + dedupe.console_label(deduper) + # When finished, save our labeled, training pairs to disk + with open(training_file, 'w') as tf: + deduper.write_training(tf) + + # Notice our argument here + # + # `recall` is the proportion of true dupes pairs that the learned + # rules must cover. You may want to reduce this if your are making + # too many blocks and too many comparisons. + deduper.train(recall=0.90) + + with open(settings_file, 'wb') as sf: + deduper.write_settings(sf) + + # We can now remove some of the memory hogging objects we used + # for training + deduper.cleanup_training() + + # ## Blocking + print('blocking...') + + # To run blocking on such a large set of data, we create a separate table + # that contains blocking keys and record ids + print('creating blocking_map_serial database') + with write_con: + with write_con.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS blocking_map_serial") + cur.execute("CREATE TABLE blocking_map_serial " + "(block_key text, donor_id INTEGER)") + + # If dedupe learned a Index Predicate, we have to take a pass + # through the data and create indices. + print('creating inverted index') + + for field in deduper.fingerprinter.index_fields: + with read_con.cursor('field_values') as cur: + cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) + field_data = (row[field] for row in cur) + deduper.fingerprinter.index(field_data, field) + + # Now we are ready to write our blocking map table by creating a + # generator that yields unique `(block_key, donor_id)` tuples. 
+ print('writing blocking map') + + with read_con.cursor('donor_select') as read_cur: + read_cur.execute(DONOR_SELECT) + + full_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper.fingerprinter(full_data) + + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', + Readable(b_data), + size=10000) + + # free up memory by removing indices + deduper.fingerprinter.reset_indices() + + # logging.info("indexing block_key") + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE UNIQUE INDEX ON blocking_map_serial " + # "(block_key text_pattern_ops, donor_id)") + + # # ## Clustering + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("DROP TABLE IF EXISTS entity_map") + + # print('creating entity_map database') + # cur.execute("CREATE TABLE entity_map " + # "(donor_id INTEGER, canon_id INTEGER, " + # " cluster_score FLOAT, PRIMARY KEY(donor_id))") + + # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + # read_cur.execute(""" + # select a.donor_id, + # row_to_json((select d from (select a.city, + # a.name, + # a.zip, + # a.state, + # a.address) d)), + # b.donor_id, + # row_to_json((select d from (select b.city, + # b.name, + # b.zip, + # b.state, + # b.address) d)) + # from (select DISTINCT l.donor_id as east, r.donor_id as west + # from blocking_map_serial as l + # INNER JOIN blocking_map_serial as r + # using (block_key) + # where l.donor_id < r.donor_id) ids + # INNER JOIN processed_donors a on ids.east=a.donor_id + # INNER JOIN processed_donors b on ids.west=b.donor_id""") + + # print('clustering...') + # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + # threshold=0.5) + + # # ## Writing out results + + # # We now have a sequence of tuples of donor ids that dedupe believes + # # all refer to the same entity. 
We write this out onto an entity map + # # table + + # print('writing results') + # with write_con: + # with write_con.cursor() as write_cur: + # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + # Readable(cluster_ids(clustered_dupes)), + # size=10000) + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # # Print out the number of duplicates found + + # # ## Payoff + + # # With all this done, we can now begin to ask interesting questions + # # of the data + # # + # # For example, let's see who the top 10 donors are. + + # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # # Create a temporary table so each group and unmatched record has + # # a unique id + + # with read_con.cursor() as cur: + # cur.execute("CREATE TEMPORARY TABLE e_map " + # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + # "FROM entity_map " + # "RIGHT JOIN donors USING(donor_id)") + + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + # "donation_totals.totals AS totals " + # "FROM donors INNER JOIN " + # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + # " FROM contributions INNER JOIN e_map " + # " USING (donor_id) " + # " GROUP BY (canon_id) " + # " ORDER BY totals " + # " DESC LIMIT 10) " + # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + # "WHERE donors.donor_id = donation_totals.canon_id" + # ) + + # print("Top Donors (deduped)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + # # Compare this to what we would have gotten if we hadn't done any + # # deduplication + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + # "FROM donors INNER JOIN contributions " + # "USING (donor_id) " + # "GROUP BY (donor_id) " + # 
"ORDER BY totals DESC " + # "LIMIT 10" + # ) + + # print("Top Donors (raw)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + read_con.close() + write_con.commit() + write_con.close() + + print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh new file mode 100755 index 00000000..f0ee4367 --- /dev/null +++ b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e # exit immediately + +echo "Parallel:" +time (python pgsql_big_dedupe_example.py > /dev/null 2>&1) +echo "Parallel old:" +time (python pgsql_big_dedupe_example_old.py > /dev/null 2>&1) +echo "Serial:" +time (python pgsql_big_dedupe_example_serial.py > /dev/null 2>&1) +echo "Serial old:" +time (python pgsql_big_dedupe_example_serial_old.py > /dev/null 2>&1) From d9e5365f13a001054fda922fde0d8dbb81980671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fl=C3=A1vio=20Juvenal?= Date: Fri, 18 Sep 2020 17:46:13 -0300 Subject: [PATCH 3/6] Faster parallel pgsql_big_dedupe_example.py --- .../pgsql_big_dedupe_example.py | 29 +- .../pgsql_big_dedupe_example_serial.py | 18 +- .../pgsql_big_dedupe_example_serial_old.py | 373 ------------------ .../test_parallel_vs_serial.sh | 4 - 4 files changed, 12 insertions(+), 412 deletions(-) delete mode 100755 pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py index 59f759dd..3235573a 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py @@ -26,7 +26,6 @@ import csv import multiprocessing import math -import tempfile import dj_database_url import psycopg2 @@ -97,6 +96,10 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec 
password=db_conf['PASSWORD'], host=db_conf['HOST'], cursor_factory=psycopg2.extras.RealDictCursor) + write_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST']) with read_con.cursor('donor_partition_select') as read_cur: read_cur.execute(donor_partition_select, (partition_offset, partition_end)) @@ -104,15 +107,11 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec partition_data = ((row['donor_id'], row) for row in read_cur) b_data = deduper_fingerprinter(partition_data) - with tempfile.NamedTemporaryFile( - prefix="pid_%d_blocking_map_" % os.getpid(), - delete=False, - mode="w", - newline="", - ) as block_file: - csv_writer = csv.writer(block_file, dialect=csv.unix_dialect) - csv_writer.writerows(b_data) - return block_file.name + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', + Readable(b_data), + size=10000) if __name__ == '__main__': @@ -306,16 +305,6 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec ], ) - # parallel_fingerprinter output are CSV files that we'll now copy into `blocking_map` - print('writing blocking map (non-index predicates)') - - for blocking_file_path in block_file_path_list: - with write_con.cursor() as write_cur: - with open(blocking_file_path, "r", newline="") as f: - logging.info("Appending to blocking_map from %s" % blocking_file_path) - write_cur.copy_expert("COPY blocking_map FROM STDIN CSV", f, size=10000) - os.remove(blocking_file_path) - # If dedupe learned a Index Predicate, we have to take a pass # through the data and create indices if predicates_with_index: diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py index b27b409d..1629540f 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py +++ 
b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py @@ -243,21 +243,9 @@ def cluster_ids(clustered_dupes): with write_con: with write_con.cursor() as write_cur: - with tempfile.NamedTemporaryFile( - prefix="blocking_map_", - mode="w+", - newline="", - ) as block_file: - csv_writer = csv.writer(block_file, dialect=csv.unix_dialect) - csv_writer.writerows(b_data) - block_file.seek(0) - write_cur.copy_expert("COPY blocking_map FROM STDIN CSV", block_file, size=10000) - - # with write_con: - # with write_con.cursor() as write_cur: - # write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', - # Readable(b_data), - # size=10000) + write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', + Readable(b_data), + size=10000) # free up memory by removing indices deduper.fingerprinter.reset_indices() diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py deleted file mode 100755 index 1629540f..00000000 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial_old.py +++ /dev/null @@ -1,373 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -This is an example of working with very large data. There are about -700,000 unduplicated donors in this database of Illinois political -campaign contributions. - -With such a large set of input data, we cannot store all the comparisons -we need to make in memory. Instead, we will read the pairs on demand -from the PostgresSQL database. - -__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` -before running this script. 
- -For smaller datasets (<10,000), see our -[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) -""" -import os -import time -import logging -import optparse -import locale -import itertools -import io -import csv -import tempfile - -import dj_database_url -import psycopg2 -import psycopg2.extras - -import dedupe -import numpy - - -from psycopg2.extensions import register_adapter, AsIs -register_adapter(numpy.int32, AsIs) -register_adapter(numpy.int64, AsIs) -register_adapter(numpy.float32, AsIs) -register_adapter(numpy.float64, AsIs) - - -class Readable(object): - - def __init__(self, iterator): - - self.output = io.StringIO() - self.writer = csv.writer(self.output) - self.iterator = iterator - - def read(self, size): - - self.writer.writerows(itertools.islice(self.iterator, size)) - - chunk = self.output.getvalue() - self.output.seek(0) - self.output.truncate(0) - - return chunk - - -def record_pairs(result_set): - - for i, row in enumerate(result_set): - a_record_id, a_record, b_record_id, b_record = row - record_a = (a_record_id, a_record) - record_b = (b_record_id, b_record) - - yield record_a, record_b - - if i % 10000 == 0: - print(i) - - -def cluster_ids(clustered_dupes): - - for cluster, scores in clustered_dupes: - cluster_id = cluster[0] - for donor_id, score in zip(cluster, scores): - yield donor_id, cluster_id, score - - -if __name__ == '__main__': - # ## Logging - - # Dedupe uses Python logging to show or suppress verbose output. Added - # for convenience. 
To enable verbose output, run `python - # pgsql_big_dedupe_example.py -v` - optp = optparse.OptionParser() - optp.add_option('-v', '--verbose', dest='verbose', action='count', - help='Increase verbosity (specify multiple times for more)' - ) - (opts, args) = optp.parse_args() - log_level = logging.WARNING - if opts.verbose: - if opts.verbose == 1: - log_level = logging.INFO - elif opts.verbose >= 2: - log_level = logging.DEBUG - logging.getLogger().setLevel(log_level) - - # ## Setup - settings_file = 'pgsql_big_dedupe_example_settings' - training_file = 'pgsql_big_dedupe_example_training.json' - - start_time = time.time() - - # Set the database connection from environment variable using - # [dj_database_url](https://github.com/kennethreitz/dj-database-url) - # For example: - # export DATABASE_URL=postgres://user:password@host/mydatabase - db_conf = dj_database_url.config() - - if not db_conf: - raise Exception( - 'set DATABASE_URL environment variable with your connection, e.g. ' - 'export DATABASE_URL=postgres://user:password@host/mydatabase' - ) - - read_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST'], - cursor_factory=psycopg2.extras.RealDictCursor) - - write_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST']) - - # We'll be using variations on this following select statement to pull - # in campaign donor info. 
- # - # We did a fair amount of preprocessing of the fields in - # `pgsql_big_dedupe_example_init_db.py` - - DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ - "from processed_donors" - - # ## Training - - if os.path.exists(settings_file): - print('reading from ', settings_file) - with open(settings_file, 'rb') as sf: - deduper = dedupe.StaticDedupe(sf, num_cores=4) - else: - - # Define the fields dedupe will pay attention to - # - # The address, city, and zip fields are often missing, so we'll - # tell dedupe that, and we'll learn a model that take that into - # account - fields = [{'field': 'name', 'type': 'String'}, - {'field': 'address', 'type': 'String', - 'has missing': True}, - {'field': 'city', 'type': 'ShortString', 'has missing': True}, - {'field': 'state', 'type': 'ShortString', 'has missing': True}, - {'field': 'zip', 'type': 'ShortString', 'has missing': True}, - ] - - # Create a new deduper object and pass our data model to it. - deduper = dedupe.Dedupe(fields, num_cores=4) - - # Named cursor runs server side with psycopg2 - with read_con.cursor('donor_select') as cur: - cur.execute(DONOR_SELECT) - temp_d = {i: row for i, row in enumerate(cur)} - - # If we have training data saved from a previous run of dedupe, - # look for it an load it in. - # - # __Note:__ if you want to train from - # scratch, delete the training_file - if os.path.exists(training_file): - print('reading labeled examples from ', training_file) - with open(training_file) as tf: - deduper.prepare_training(temp_d, tf) - else: - deduper.prepare_training(temp_d) - - del temp_d - - # ## Active learning - - print('starting active labeling...') - # Starts the training loop. Dedupe will find the next pair of records - # it is least certain about and ask you to label them as duplicates - # or not. 
- - # use 'y', 'n' and 'u' keys to flag duplicates - # press 'f' when you are finished - dedupe.console_label(deduper) - # When finished, save our labeled, training pairs to disk - with open(training_file, 'w') as tf: - deduper.write_training(tf) - - # Notice our argument here - # - # `recall` is the proportion of true dupes pairs that the learned - # rules must cover. You may want to reduce this if your are making - # too many blocks and too many comparisons. - deduper.train(recall=0.90) - - with open(settings_file, 'wb') as sf: - deduper.write_settings(sf) - - # We can now remove some of the memory hogging objects we used - # for training - deduper.cleanup_training() - - # ## Blocking - print('blocking...') - - # To run blocking on such a large set of data, we create a separate table - # that contains blocking keys and record ids - print('creating blocking_map_serial database') - with write_con: - with write_con.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS blocking_map_serial") - cur.execute("CREATE TABLE blocking_map_serial " - "(block_key text, donor_id INTEGER)") - - # If dedupe learned a Index Predicate, we have to take a pass - # through the data and create indices. - print('creating inverted index') - - for field in deduper.fingerprinter.index_fields: - with read_con.cursor('field_values') as cur: - cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) - field_data = (row[field] for row in cur) - deduper.fingerprinter.index(field_data, field) - - # Now we are ready to write our blocking map table by creating a - # generator that yields unique `(block_key, donor_id)` tuples. 
- print('writing blocking map') - - with read_con.cursor('donor_select') as read_cur: - read_cur.execute(DONOR_SELECT) - - full_data = ((row['donor_id'], row) for row in read_cur) - b_data = deduper.fingerprinter(full_data) - - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', - Readable(b_data), - size=10000) - - # free up memory by removing indices - deduper.fingerprinter.reset_indices() - - # logging.info("indexing block_key") - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE UNIQUE INDEX ON blocking_map_serial " - # "(block_key text_pattern_ops, donor_id)") - - # # ## Clustering - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("DROP TABLE IF EXISTS entity_map") - - # print('creating entity_map database') - # cur.execute("CREATE TABLE entity_map " - # "(donor_id INTEGER, canon_id INTEGER, " - # " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - # read_cur.execute(""" - # select a.donor_id, - # row_to_json((select d from (select a.city, - # a.name, - # a.zip, - # a.state, - # a.address) d)), - # b.donor_id, - # row_to_json((select d from (select b.city, - # b.name, - # b.zip, - # b.state, - # b.address) d)) - # from (select DISTINCT l.donor_id as east, r.donor_id as west - # from blocking_map_serial as l - # INNER JOIN blocking_map_serial as r - # using (block_key) - # where l.donor_id < r.donor_id) ids - # INNER JOIN processed_donors a on ids.east=a.donor_id - # INNER JOIN processed_donors b on ids.west=b.donor_id""") - - # print('clustering...') - # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), - # threshold=0.5) - - # # ## Writing out results - - # # We now have a sequence of tuples of donor ids that dedupe believes - # # all refer to the same entity. 
We write this out onto an entity map - # # table - - # print('writing results') - # with write_con: - # with write_con.cursor() as write_cur: - # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - # Readable(cluster_ids(clustered_dupes)), - # size=10000) - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # # Print out the number of duplicates found - - # # ## Payoff - - # # With all this done, we can now begin to ask interesting questions - # # of the data - # # - # # For example, let's see who the top 10 donors are. - - # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # # Create a temporary table so each group and unmatched record has - # # a unique id - - # with read_con.cursor() as cur: - # cur.execute("CREATE TEMPORARY TABLE e_map " - # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - # "FROM entity_map " - # "RIGHT JOIN donors USING(donor_id)") - - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - # "donation_totals.totals AS totals " - # "FROM donors INNER JOIN " - # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - # " FROM contributions INNER JOIN e_map " - # " USING (donor_id) " - # " GROUP BY (canon_id) " - # " ORDER BY totals " - # " DESC LIMIT 10) " - # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - # "WHERE donors.donor_id = donation_totals.canon_id" - # ) - - # print("Top Donors (deduped)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - # # Compare this to what we would have gotten if we hadn't done any - # # deduplication - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " - # "FROM donors INNER JOIN contributions " - # "USING (donor_id) " - # "GROUP BY (donor_id) " - # 
"ORDER BY totals DESC " - # "LIMIT 10" - # ) - - # print("Top Donors (raw)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - read_con.close() - write_con.commit() - write_con.close() - - print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh index f0ee4367..68eef4b9 100755 --- a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh +++ b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh @@ -3,9 +3,5 @@ set -e # exit immediately echo "Parallel:" time (python pgsql_big_dedupe_example.py > /dev/null 2>&1) -echo "Parallel old:" -time (python pgsql_big_dedupe_example_old.py > /dev/null 2>&1) echo "Serial:" time (python pgsql_big_dedupe_example_serial.py > /dev/null 2>&1) -echo "Serial old:" -time (python pgsql_big_dedupe_example_serial_old.py > /dev/null 2>&1) From 75c7038c475a73ef35466939f86fc56bb1040526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fl=C3=A1vio=20Juvenal?= Date: Fri, 18 Sep 2020 17:48:26 -0300 Subject: [PATCH 4/6] Keep only the new parallel pgsql_big_dedupe_example.py and uncomment lines --- .../pgsql_big_dedupe_example.py | 231 +++++---- .../pgsql_big_dedupe_example_old.py | 461 ------------------ .../pgsql_big_dedupe_example_serial.py | 373 -------------- .../test_parallel_vs_serial.sh | 7 - 4 files changed, 115 insertions(+), 957 deletions(-) delete mode 100755 pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py delete mode 100755 pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py delete mode 100755 pgsql_big_dedupe_example/test_parallel_vs_serial.sh diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py index 3235573a..4eae22e8 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py @@ -338,124 +338,123 @@ def 
parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec # free up memory by removing indices deduper.fingerprinter.reset_indices() - # logging.info("indexing block_key") - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE UNIQUE INDEX ON blocking_map " - # "(block_key text_pattern_ops, donor_id)") - - # # ## Clustering - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("DROP TABLE IF EXISTS entity_map") - - # print('creating entity_map database') - # cur.execute("CREATE TABLE entity_map " - # "(donor_id INTEGER, canon_id INTEGER, " - # " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - # read_cur.execute(""" - # select a.donor_id, - # row_to_json((select d from (select a.city, - # a.name, - # a.zip, - # a.state, - # a.address) d)), - # b.donor_id, - # row_to_json((select d from (select b.city, - # b.name, - # b.zip, - # b.state, - # b.address) d)) - # from (select DISTINCT l.donor_id as east, r.donor_id as west - # from blocking_map as l - # INNER JOIN blocking_map as r - # using (block_key) - # where l.donor_id < r.donor_id) ids - # INNER JOIN processed_donors a on ids.east=a.donor_id - # INNER JOIN processed_donors b on ids.west=b.donor_id""") - - # print('clustering...') - # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), - # threshold=0.5) - - # # ## Writing out results - - # # We now have a sequence of tuples of donor ids that dedupe believes - # # all refer to the same entity. 
We write this out onto an entity map - # # table - - # print('writing results') - # with write_con: - # with write_con.cursor() as write_cur: - # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - # Readable(cluster_ids(clustered_dupes)), - # size=10000) - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # # Print out the number of duplicates found - - # # ## Payoff - - # # With all this done, we can now begin to ask interesting questions - # # of the data - # # - # # For example, let's see who the top 10 donors are. - - # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # # Create a temporary table so each group and unmatched record has - # # a unique id - - # with read_con.cursor() as cur: - # cur.execute("CREATE TEMPORARY TABLE e_map " - # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - # "FROM entity_map " - # "RIGHT JOIN donors USING(donor_id)") - - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - # "donation_totals.totals AS totals " - # "FROM donors INNER JOIN " - # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - # " FROM contributions INNER JOIN e_map " - # " USING (donor_id) " - # " GROUP BY (canon_id) " - # " ORDER BY totals " - # " DESC LIMIT 10) " - # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - # "WHERE donors.donor_id = donation_totals.canon_id" - # ) - - # print("Top Donors (deduped)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - # # Compare this to what we would have gotten if we hadn't done any - # # deduplication - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " - # "FROM donors INNER JOIN contributions " - # "USING (donor_id) " - # "GROUP BY (donor_id) " - # 
"ORDER BY totals DESC " - # "LIMIT 10" - # ) - - # print("Top Donors (raw)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) + logging.info("indexing block_key") + with write_con: + with write_con.cursor() as cur: + cur.execute("CREATE UNIQUE INDEX ON blocking_map " + "(block_key text_pattern_ops, donor_id)") + + # ## Clustering + + with write_con: + with write_con.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS entity_map") + + print('creating entity_map database') + cur.execute("CREATE TABLE entity_map " + "(donor_id INTEGER, canon_id INTEGER, " + " cluster_score FLOAT, PRIMARY KEY(donor_id))") + + with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + read_cur.execute(""" + select a.donor_id, + row_to_json((select d from (select a.city, + a.name, + a.zip, + a.state, + a.address) d)), + b.donor_id, + row_to_json((select d from (select b.city, + b.name, + b.zip, + b.state, + b.address) d)) + from (select DISTINCT l.donor_id as east, r.donor_id as west + from blocking_map as l + INNER JOIN blocking_map as r + using (block_key) + where l.donor_id < r.donor_id) ids + INNER JOIN processed_donors a on ids.east=a.donor_id + INNER JOIN processed_donors b on ids.west=b.donor_id""") + + print('clustering...') + clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + threshold=0.5) + + # ## Writing out results + + # We now have a sequence of tuples of donor ids that dedupe believes + # all refer to the same entity. 
We write this out onto an entity map + # table + + print('writing results') + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + Readable(cluster_ids(clustered_dupes)), + size=10000) + + with write_con: + with write_con.cursor() as cur: + cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # Print out the number of duplicates found + + # ## Payoff + + # With all this done, we can now begin to ask interesting questions + # of the data + # + # For example, let's see who the top 10 donors are. + + locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # Create a temporary table so each group and unmatched record has + # a unique id + + with read_con.cursor() as cur: + cur.execute("CREATE TEMPORARY TABLE e_map " + "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + "FROM entity_map " + "RIGHT JOIN donors USING(donor_id)") + + cur.execute( + "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + "donation_totals.totals AS totals " + "FROM donors INNER JOIN " + "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + " FROM contributions INNER JOIN e_map " + " USING (donor_id) " + " GROUP BY (canon_id) " + " ORDER BY totals " + " DESC LIMIT 10) " + "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + "WHERE donors.donor_id = donation_totals.canon_id" + ) + + print("Top Donors (deduped)") + for row in cur: + row['totals'] = locale.currency(row['totals'], grouping=True) + print('%(totals)20s: %(name)s' % row) + + # Compare this to what we would have gotten if we hadn't done any + # deduplication + cur.execute( + "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + "FROM donors INNER JOIN contributions " + "USING (donor_id) " + "GROUP BY (donor_id) " + "ORDER BY totals DESC " + "LIMIT 10" + ) + + print("Top Donors (raw)") + for row in cur: + 
row['totals'] = locale.currency(row['totals'], grouping=True) + print('%(totals)20s: %(name)s' % row) read_con.close() - write_con.commit() write_con.close() print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py deleted file mode 100755 index 3235573a..00000000 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_old.py +++ /dev/null @@ -1,461 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -This is an example of working with very large data. There are about -700,000 unduplicated donors in this database of Illinois political -campaign contributions. - -With such a large set of input data, we cannot store all the comparisons -we need to make in memory. Instead, we will read the pairs on demand -from the PostgresSQL database. - -__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` -before running this script. - -For smaller datasets (<10,000), see our -[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) -""" -import os -import time -import logging -import optparse -import locale -import itertools -import io -import csv -import multiprocessing -import math - -import dj_database_url -import psycopg2 -import psycopg2.extras - -import dedupe -import numpy - - -from psycopg2.extensions import register_adapter, AsIs -register_adapter(numpy.int32, AsIs) -register_adapter(numpy.int64, AsIs) -register_adapter(numpy.float32, AsIs) -register_adapter(numpy.float64, AsIs) - - -class Readable(object): - - def __init__(self, iterator): - - self.output = io.StringIO( - # necesary for csv. 
See: https://docs.python.org/3/library/csv.html#id3 - newline='', - ) - self.writer = csv.writer( - self.output, - # csv.unix_dialect seems the right one - # based on our tests and Postgres CSV defaults: - # https://www.postgresql.org/docs/12/sql-copy.html - dialect=csv.unix_dialect) - self.iterator = iterator - - def read(self, size): - - self.writer.writerows(itertools.islice(self.iterator, size)) - - chunk = self.output.getvalue() - self.output.seek(0) - self.output.truncate(0) - - return chunk - - -def record_pairs(result_set): - - for i, row in enumerate(result_set): - a_record_id, a_record, b_record_id, b_record = row - record_a = (a_record_id, a_record) - record_b = (b_record_id, b_record) - - yield record_a, record_b - - if i % 10000 == 0: - print(i) - - -def cluster_ids(clustered_dupes): - - for cluster, scores in clustered_dupes: - cluster_id = cluster[0] - for donor_id, score in zip(cluster, scores): - yield donor_id, cluster_id, score - - -def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_select, partition_offset, partition_end): - read_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST'], - cursor_factory=psycopg2.extras.RealDictCursor) - write_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST']) - - with read_con.cursor('donor_partition_select') as read_cur: - read_cur.execute(donor_partition_select, (partition_offset, partition_end)) - - partition_data = ((row['donor_id'], row) for row in read_cur) - b_data = deduper_fingerprinter(partition_data) - - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', - Readable(b_data), - size=10000) - - -if __name__ == '__main__': - # ## Logging - - # Dedupe uses Python logging to show or suppress verbose output. Added - # for convenience. 
To enable verbose output, run `python - # pgsql_big_dedupe_example.py -v` - optp = optparse.OptionParser() - optp.add_option('-v', '--verbose', dest='verbose', action='count', - help='Increase verbosity (specify multiple times for more)' - ) - (opts, args) = optp.parse_args() - log_level = logging.WARNING - if opts.verbose: - if opts.verbose == 1: - log_level = logging.INFO - elif opts.verbose >= 2: - log_level = logging.DEBUG - logging.getLogger().setLevel(log_level) - - # ## Setup - settings_file = 'pgsql_big_dedupe_example_settings' - training_file = 'pgsql_big_dedupe_example_training.json' - num_cores = multiprocessing.cpu_count() - - start_time = time.time() - - # Set the database connection from environment variable using - # [dj_database_url](https://github.com/kennethreitz/dj-database-url) - # For example: - # export DATABASE_URL=postgres://user:password@host/mydatabase - db_conf = dj_database_url.config() - - if not db_conf: - raise Exception( - 'set DATABASE_URL environment variable with your connection, e.g. ' - 'export DATABASE_URL=postgres://user:password@host/mydatabase' - ) - - read_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST'], - cursor_factory=psycopg2.extras.RealDictCursor) - - write_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST']) - - # We'll be using variations on this following select statement to pull - # in campaign donor info. 
- # - # We did a fair amount of preprocessing of the fields in - # `pgsql_big_dedupe_example_init_db.py` - - DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ - "FROM processed_donors" - DONOR_PARTITION_SELECT = "SELECT donor_id, city, name, zip, state, address " \ - "FROM processed_donors " \ - "WHERE donor_id >= %s " \ - "AND donor_id < %s" - COUNT_SELECT = "SELECT COUNT(*) FROM processed_donors" - - # ## Training - - if os.path.exists(settings_file): - print('reading from ', settings_file) - with open(settings_file, 'rb') as sf: - deduper = dedupe.StaticDedupe(sf, num_cores=num_cores) - else: - - # Define the fields dedupe will pay attention to - # - # The address, city, and zip fields are often missing, so we'll - # tell dedupe that, and we'll learn a model that take that into - # account - fields = [{'field': 'name', 'type': 'String'}, - {'field': 'address', 'type': 'String', - 'has missing': True}, - {'field': 'city', 'type': 'ShortString', 'has missing': True}, - {'field': 'state', 'type': 'ShortString', 'has missing': True}, - {'field': 'zip', 'type': 'ShortString', 'has missing': True}, - ] - - # Create a new deduper object and pass our data model to it. - deduper = dedupe.Dedupe(fields, num_cores=num_cores) - - # Named cursor runs server side with psycopg2 - with read_con.cursor('donor_select') as cur: - cur.execute(DONOR_SELECT) - temp_d = {i: row for i, row in enumerate(cur)} - - # If we have training data saved from a previous run of dedupe, - # look for it an load it in. - # - # __Note:__ if you want to train from - # scratch, delete the training_file - if os.path.exists(training_file): - print('reading labeled examples from ', training_file) - with open(training_file) as tf: - deduper.prepare_training(temp_d, tf) - else: - deduper.prepare_training(temp_d) - - del temp_d - - # ## Active learning - - print('starting active labeling...') - # Starts the training loop. 
Dedupe will find the next pair of records - # it is least certain about and ask you to label them as duplicates - # or not. - - # use 'y', 'n' and 'u' keys to flag duplicates - # press 'f' when you are finished - dedupe.console_label(deduper) - # When finished, save our labeled, training pairs to disk - with open(training_file, 'w') as tf: - deduper.write_training(tf) - - # Notice our argument here - # - # `recall` is the proportion of true dupes pairs that the learned - # rules must cover. You may want to reduce this if your are making - # too many blocks and too many comparisons. - deduper.train(recall=0.90) - - with open(settings_file, 'wb') as sf: - deduper.write_settings(sf) - - # We can now remove some of the memory hogging objects we used - # for training - deduper.cleanup_training() - - # ## Blocking - print('blocking...') - - # To run blocking on such a large set of data, we create a separate table - # that contains blocking keys and record ids - print('creating blocking_map database') - with write_con: - with write_con.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS blocking_map") - cur.execute("CREATE TABLE blocking_map " - "(block_key text, donor_id INTEGER)") - - # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in parallel, - # but only for the non-index predicates. Only those can run in parallel - # because they don't need global predicate indexes. 
- # We cannot share predicate indexes across Python processes because it would consume - # too much RAM - print('computing blocking map in parallel (non-index predicates)') - - predicates_without_index = [] - predicates_with_index = [] - for full_predicate in deduper.fingerprinter.predicates: - if any(predicate in deduper.fingerprinter.index_predicates - for predicate in full_predicate): - predicates_with_index.append(full_predicate) - else: - predicates_without_index.append(full_predicate) - - # Use only predicates WITHOUT indexes for parallel blocking - deduper.fingerprinter.predicates = predicates_without_index - - # processed_donors `id` starts at 1 and goes up to `COUNT(*)` - with read_con.cursor('donor_select') as cur: - cur.execute(COUNT_SELECT) - total_rows = cur.fetchone()['count'] - partition_size = math.ceil(total_rows / num_cores) - partition_offsets = range(1, total_rows + 1, partition_size) - - # Use a multiprocessing.Pool to run parallel_fingerprinter in num_cores. - # Set context to 'spawn' to start new clean processes. 
- # 'spawn' is the defualt on Windows and macOS, but not on Unix - multiprocessing.set_start_method('spawn') - with multiprocessing.Pool(processes=num_cores) as pool: - block_file_path_list = pool.starmap( - parallel_fingerprinter, - [ - ( - db_conf, - deduper.fingerprinter, - DONOR_PARTITION_SELECT, - partition_offset, - (partition_offset + partition_size) - ) - for partition_offset in partition_offsets - ], - ) - - # If dedupe learned a Index Predicate, we have to take a pass - # through the data and create indices - if predicates_with_index: - print('creating inverted indexes') - - # Use only predicates WITH indexes for non-parallel blocking - deduper.fingerprinter.predicates = predicates_with_index - - for field in deduper.fingerprinter.index_fields: - with read_con.cursor('field_values') as cur: - cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) - field_data = (row[field] for row in cur) - deduper.fingerprinter.index(field_data, field) - - # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in serial - # for the index predicates - print('computing and writing blocking map (index predicates)') - - with read_con.cursor('donor_select') as read_cur: - read_cur.execute(DONOR_SELECT) - - full_data = ((row['donor_id'], row) for row in read_cur) - b_data = deduper.fingerprinter(full_data) - - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', - Readable(b_data), - size=10000) - - # free up memory by removing indices - deduper.fingerprinter.reset_indices() - - # logging.info("indexing block_key") - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE UNIQUE INDEX ON blocking_map " - # "(block_key text_pattern_ops, donor_id)") - - # # ## Clustering - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("DROP TABLE IF EXISTS entity_map") - - # print('creating entity_map database') - # cur.execute("CREATE TABLE entity_map " - # 
"(donor_id INTEGER, canon_id INTEGER, " - # " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - # read_cur.execute(""" - # select a.donor_id, - # row_to_json((select d from (select a.city, - # a.name, - # a.zip, - # a.state, - # a.address) d)), - # b.donor_id, - # row_to_json((select d from (select b.city, - # b.name, - # b.zip, - # b.state, - # b.address) d)) - # from (select DISTINCT l.donor_id as east, r.donor_id as west - # from blocking_map as l - # INNER JOIN blocking_map as r - # using (block_key) - # where l.donor_id < r.donor_id) ids - # INNER JOIN processed_donors a on ids.east=a.donor_id - # INNER JOIN processed_donors b on ids.west=b.donor_id""") - - # print('clustering...') - # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), - # threshold=0.5) - - # # ## Writing out results - - # # We now have a sequence of tuples of donor ids that dedupe believes - # # all refer to the same entity. We write this out onto an entity map - # # table - - # print('writing results') - # with write_con: - # with write_con.cursor() as write_cur: - # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - # Readable(cluster_ids(clustered_dupes)), - # size=10000) - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # # Print out the number of duplicates found - - # # ## Payoff - - # # With all this done, we can now begin to ask interesting questions - # # of the data - # # - # # For example, let's see who the top 10 donors are. 
- - # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # # Create a temporary table so each group and unmatched record has - # # a unique id - - # with read_con.cursor() as cur: - # cur.execute("CREATE TEMPORARY TABLE e_map " - # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - # "FROM entity_map " - # "RIGHT JOIN donors USING(donor_id)") - - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - # "donation_totals.totals AS totals " - # "FROM donors INNER JOIN " - # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - # " FROM contributions INNER JOIN e_map " - # " USING (donor_id) " - # " GROUP BY (canon_id) " - # " ORDER BY totals " - # " DESC LIMIT 10) " - # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - # "WHERE donors.donor_id = donation_totals.canon_id" - # ) - - # print("Top Donors (deduped)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - # # Compare this to what we would have gotten if we hadn't done any - # # deduplication - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " - # "FROM donors INNER JOIN contributions " - # "USING (donor_id) " - # "GROUP BY (donor_id) " - # "ORDER BY totals DESC " - # "LIMIT 10" - # ) - - # print("Top Donors (raw)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - read_con.close() - write_con.commit() - write_con.close() - - print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py deleted file mode 100755 index 1629540f..00000000 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py +++ /dev/null @@ -1,373 +0,0 @@ 
-#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -This is an example of working with very large data. There are about -700,000 unduplicated donors in this database of Illinois political -campaign contributions. - -With such a large set of input data, we cannot store all the comparisons -we need to make in memory. Instead, we will read the pairs on demand -from the PostgresSQL database. - -__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` -before running this script. - -For smaller datasets (<10,000), see our -[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) -""" -import os -import time -import logging -import optparse -import locale -import itertools -import io -import csv -import tempfile - -import dj_database_url -import psycopg2 -import psycopg2.extras - -import dedupe -import numpy - - -from psycopg2.extensions import register_adapter, AsIs -register_adapter(numpy.int32, AsIs) -register_adapter(numpy.int64, AsIs) -register_adapter(numpy.float32, AsIs) -register_adapter(numpy.float64, AsIs) - - -class Readable(object): - - def __init__(self, iterator): - - self.output = io.StringIO() - self.writer = csv.writer(self.output) - self.iterator = iterator - - def read(self, size): - - self.writer.writerows(itertools.islice(self.iterator, size)) - - chunk = self.output.getvalue() - self.output.seek(0) - self.output.truncate(0) - - return chunk - - -def record_pairs(result_set): - - for i, row in enumerate(result_set): - a_record_id, a_record, b_record_id, b_record = row - record_a = (a_record_id, a_record) - record_b = (b_record_id, b_record) - - yield record_a, record_b - - if i % 10000 == 0: - print(i) - - -def cluster_ids(clustered_dupes): - - for cluster, scores in clustered_dupes: - cluster_id = cluster[0] - for donor_id, score in zip(cluster, scores): - yield donor_id, cluster_id, score - - -if __name__ == '__main__': - # ## Logging - - # Dedupe uses Python logging to show or suppress verbose output. 
Added - # for convenience. To enable verbose output, run `python - # pgsql_big_dedupe_example.py -v` - optp = optparse.OptionParser() - optp.add_option('-v', '--verbose', dest='verbose', action='count', - help='Increase verbosity (specify multiple times for more)' - ) - (opts, args) = optp.parse_args() - log_level = logging.WARNING - if opts.verbose: - if opts.verbose == 1: - log_level = logging.INFO - elif opts.verbose >= 2: - log_level = logging.DEBUG - logging.getLogger().setLevel(log_level) - - # ## Setup - settings_file = 'pgsql_big_dedupe_example_settings' - training_file = 'pgsql_big_dedupe_example_training.json' - - start_time = time.time() - - # Set the database connection from environment variable using - # [dj_database_url](https://github.com/kennethreitz/dj-database-url) - # For example: - # export DATABASE_URL=postgres://user:password@host/mydatabase - db_conf = dj_database_url.config() - - if not db_conf: - raise Exception( - 'set DATABASE_URL environment variable with your connection, e.g. ' - 'export DATABASE_URL=postgres://user:password@host/mydatabase' - ) - - read_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST'], - cursor_factory=psycopg2.extras.RealDictCursor) - - write_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST']) - - # We'll be using variations on this following select statement to pull - # in campaign donor info. 
- # - # We did a fair amount of preprocessing of the fields in - # `pgsql_big_dedupe_example_init_db.py` - - DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ - "from processed_donors" - - # ## Training - - if os.path.exists(settings_file): - print('reading from ', settings_file) - with open(settings_file, 'rb') as sf: - deduper = dedupe.StaticDedupe(sf, num_cores=4) - else: - - # Define the fields dedupe will pay attention to - # - # The address, city, and zip fields are often missing, so we'll - # tell dedupe that, and we'll learn a model that take that into - # account - fields = [{'field': 'name', 'type': 'String'}, - {'field': 'address', 'type': 'String', - 'has missing': True}, - {'field': 'city', 'type': 'ShortString', 'has missing': True}, - {'field': 'state', 'type': 'ShortString', 'has missing': True}, - {'field': 'zip', 'type': 'ShortString', 'has missing': True}, - ] - - # Create a new deduper object and pass our data model to it. - deduper = dedupe.Dedupe(fields, num_cores=4) - - # Named cursor runs server side with psycopg2 - with read_con.cursor('donor_select') as cur: - cur.execute(DONOR_SELECT) - temp_d = {i: row for i, row in enumerate(cur)} - - # If we have training data saved from a previous run of dedupe, - # look for it an load it in. - # - # __Note:__ if you want to train from - # scratch, delete the training_file - if os.path.exists(training_file): - print('reading labeled examples from ', training_file) - with open(training_file) as tf: - deduper.prepare_training(temp_d, tf) - else: - deduper.prepare_training(temp_d) - - del temp_d - - # ## Active learning - - print('starting active labeling...') - # Starts the training loop. Dedupe will find the next pair of records - # it is least certain about and ask you to label them as duplicates - # or not. 
- - # use 'y', 'n' and 'u' keys to flag duplicates - # press 'f' when you are finished - dedupe.console_label(deduper) - # When finished, save our labeled, training pairs to disk - with open(training_file, 'w') as tf: - deduper.write_training(tf) - - # Notice our argument here - # - # `recall` is the proportion of true dupes pairs that the learned - # rules must cover. You may want to reduce this if your are making - # too many blocks and too many comparisons. - deduper.train(recall=0.90) - - with open(settings_file, 'wb') as sf: - deduper.write_settings(sf) - - # We can now remove some of the memory hogging objects we used - # for training - deduper.cleanup_training() - - # ## Blocking - print('blocking...') - - # To run blocking on such a large set of data, we create a separate table - # that contains blocking keys and record ids - print('creating blocking_map_serial database') - with write_con: - with write_con.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS blocking_map_serial") - cur.execute("CREATE TABLE blocking_map_serial " - "(block_key text, donor_id INTEGER)") - - # If dedupe learned a Index Predicate, we have to take a pass - # through the data and create indices. - print('creating inverted index') - - for field in deduper.fingerprinter.index_fields: - with read_con.cursor('field_values') as cur: - cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) - field_data = (row[field] for row in cur) - deduper.fingerprinter.index(field_data, field) - - # Now we are ready to write our blocking map table by creating a - # generator that yields unique `(block_key, donor_id)` tuples. 
- print('writing blocking map') - - with read_con.cursor('donor_select') as read_cur: - read_cur.execute(DONOR_SELECT) - - full_data = ((row['donor_id'], row) for row in read_cur) - b_data = deduper.fingerprinter(full_data) - - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', - Readable(b_data), - size=10000) - - # free up memory by removing indices - deduper.fingerprinter.reset_indices() - - # logging.info("indexing block_key") - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE UNIQUE INDEX ON blocking_map_serial " - # "(block_key text_pattern_ops, donor_id)") - - # # ## Clustering - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("DROP TABLE IF EXISTS entity_map") - - # print('creating entity_map database') - # cur.execute("CREATE TABLE entity_map " - # "(donor_id INTEGER, canon_id INTEGER, " - # " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - # read_cur.execute(""" - # select a.donor_id, - # row_to_json((select d from (select a.city, - # a.name, - # a.zip, - # a.state, - # a.address) d)), - # b.donor_id, - # row_to_json((select d from (select b.city, - # b.name, - # b.zip, - # b.state, - # b.address) d)) - # from (select DISTINCT l.donor_id as east, r.donor_id as west - # from blocking_map_serial as l - # INNER JOIN blocking_map_serial as r - # using (block_key) - # where l.donor_id < r.donor_id) ids - # INNER JOIN processed_donors a on ids.east=a.donor_id - # INNER JOIN processed_donors b on ids.west=b.donor_id""") - - # print('clustering...') - # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), - # threshold=0.5) - - # # ## Writing out results - - # # We now have a sequence of tuples of donor ids that dedupe believes - # # all refer to the same entity. 
We write this out onto an entity map - # # table - - # print('writing results') - # with write_con: - # with write_con.cursor() as write_cur: - # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - # Readable(cluster_ids(clustered_dupes)), - # size=10000) - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # # Print out the number of duplicates found - - # # ## Payoff - - # # With all this done, we can now begin to ask interesting questions - # # of the data - # # - # # For example, let's see who the top 10 donors are. - - # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # # Create a temporary table so each group and unmatched record has - # # a unique id - - # with read_con.cursor() as cur: - # cur.execute("CREATE TEMPORARY TABLE e_map " - # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - # "FROM entity_map " - # "RIGHT JOIN donors USING(donor_id)") - - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - # "donation_totals.totals AS totals " - # "FROM donors INNER JOIN " - # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - # " FROM contributions INNER JOIN e_map " - # " USING (donor_id) " - # " GROUP BY (canon_id) " - # " ORDER BY totals " - # " DESC LIMIT 10) " - # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - # "WHERE donors.donor_id = donation_totals.canon_id" - # ) - - # print("Top Donors (deduped)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - # # Compare this to what we would have gotten if we hadn't done any - # # deduplication - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " - # "FROM donors INNER JOIN contributions " - # "USING (donor_id) " - # "GROUP BY (donor_id) " - # 
"ORDER BY totals DESC " - # "LIMIT 10" - # ) - - # print("Top Donors (raw)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - read_con.close() - write_con.commit() - write_con.close() - - print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh deleted file mode 100755 index 68eef4b9..00000000 --- a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash -set -e # exit immediately - -echo "Parallel:" -time (python pgsql_big_dedupe_example.py > /dev/null 2>&1) -echo "Serial:" -time (python pgsql_big_dedupe_example_serial.py > /dev/null 2>&1) From c1c838485f6fee1958e4b4a35be58f5f035a3db5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fl=C3=A1vio=20Juvenal?= Date: Sat, 19 Sep 2020 14:12:01 -0300 Subject: [PATCH 5/6] test_parallel_vs_serial.sh again --- .../pgsql_big_dedupe_example.py | 238 ++++++----- .../pgsql_big_dedupe_example_serial.py | 371 ++++++++++++++++++ .../test_parallel_vs_serial.sh | 7 + 3 files changed, 496 insertions(+), 120 deletions(-) create mode 100644 pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py create mode 100755 pgsql_big_dedupe_example/test_parallel_vs_serial.sh diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py index 4eae22e8..df8b8320 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py @@ -32,6 +32,7 @@ import psycopg2.extras import dedupe +from dedupe.backport import Pool import numpy @@ -134,7 +135,7 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec logging.getLogger().setLevel(log_level) # ## Setup - settings_file = 'pgsql_big_dedupe_example_settings' + settings_file = 'pgsql_big_dedupe_example_settings.with-indexes' 
training_file = 'pgsql_big_dedupe_example_training.json' num_cores = multiprocessing.cpu_count() @@ -287,10 +288,7 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec partition_offsets = range(1, total_rows + 1, partition_size) # Use a multiprocessing.Pool to run parallel_fingerprinter in num_cores. - # Set context to 'spawn' to start new clean processes. - # 'spawn' is the defualt on Windows and macOS, but not on Unix - multiprocessing.set_start_method('spawn') - with multiprocessing.Pool(processes=num_cores) as pool: + with Pool(processes=num_cores) as pool: block_file_path_list = pool.starmap( parallel_fingerprinter, [ @@ -338,121 +336,121 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec # free up memory by removing indices deduper.fingerprinter.reset_indices() - logging.info("indexing block_key") - with write_con: - with write_con.cursor() as cur: - cur.execute("CREATE UNIQUE INDEX ON blocking_map " - "(block_key text_pattern_ops, donor_id)") - - # ## Clustering - - with write_con: - with write_con.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS entity_map") - - print('creating entity_map database') - cur.execute("CREATE TABLE entity_map " - "(donor_id INTEGER, canon_id INTEGER, " - " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - read_cur.execute(""" - select a.donor_id, - row_to_json((select d from (select a.city, - a.name, - a.zip, - a.state, - a.address) d)), - b.donor_id, - row_to_json((select d from (select b.city, - b.name, - b.zip, - b.state, - b.address) d)) - from (select DISTINCT l.donor_id as east, r.donor_id as west - from blocking_map as l - INNER JOIN blocking_map as r - using (block_key) - where l.donor_id < r.donor_id) ids - INNER JOIN processed_donors a on ids.east=a.donor_id - INNER JOIN processed_donors b on ids.west=b.donor_id""") - - print('clustering...') - clustered_dupes = 
deduper.cluster(deduper.score(record_pairs(read_cur)), - threshold=0.5) - - # ## Writing out results - - # We now have a sequence of tuples of donor ids that dedupe believes - # all refer to the same entity. We write this out onto an entity map - # table - - print('writing results') - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - Readable(cluster_ids(clustered_dupes)), - size=10000) - - with write_con: - with write_con.cursor() as cur: - cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # Print out the number of duplicates found - - # ## Payoff - - # With all this done, we can now begin to ask interesting questions - # of the data - # - # For example, let's see who the top 10 donors are. - - locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # Create a temporary table so each group and unmatched record has - # a unique id - - with read_con.cursor() as cur: - cur.execute("CREATE TEMPORARY TABLE e_map " - "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - "FROM entity_map " - "RIGHT JOIN donors USING(donor_id)") - - cur.execute( - "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - "donation_totals.totals AS totals " - "FROM donors INNER JOIN " - "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - " FROM contributions INNER JOIN e_map " - " USING (donor_id) " - " GROUP BY (canon_id) " - " ORDER BY totals " - " DESC LIMIT 10) " - "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - "WHERE donors.donor_id = donation_totals.canon_id" - ) - - print("Top Donors (deduped)") - for row in cur: - row['totals'] = locale.currency(row['totals'], grouping=True) - print('%(totals)20s: %(name)s' % row) - - # Compare this to what we would have gotten if we hadn't done any - # deduplication - cur.execute( - "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - "SUM(CAST(contributions.amount AS 
FLOAT)) AS totals " - "FROM donors INNER JOIN contributions " - "USING (donor_id) " - "GROUP BY (donor_id) " - "ORDER BY totals DESC " - "LIMIT 10" - ) - - print("Top Donors (raw)") - for row in cur: - row['totals'] = locale.currency(row['totals'], grouping=True) - print('%(totals)20s: %(name)s' % row) + # logging.info("indexing block_key") + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE UNIQUE INDEX ON blocking_map " + # "(block_key text_pattern_ops, donor_id)") + + # # ## Clustering + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("DROP TABLE IF EXISTS entity_map") + + # print('creating entity_map database') + # cur.execute("CREATE TABLE entity_map " + # "(donor_id INTEGER, canon_id INTEGER, " + # " cluster_score FLOAT, PRIMARY KEY(donor_id))") + + # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + # read_cur.execute(""" + # select a.donor_id, + # row_to_json((select d from (select a.city, + # a.name, + # a.zip, + # a.state, + # a.address) d)), + # b.donor_id, + # row_to_json((select d from (select b.city, + # b.name, + # b.zip, + # b.state, + # b.address) d)) + # from (select DISTINCT l.donor_id as east, r.donor_id as west + # from blocking_map as l + # INNER JOIN blocking_map as r + # using (block_key) + # where l.donor_id < r.donor_id) ids + # INNER JOIN processed_donors a on ids.east=a.donor_id + # INNER JOIN processed_donors b on ids.west=b.donor_id""") + + # print('clustering...') + # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + # threshold=0.5) + + # # ## Writing out results + + # # We now have a sequence of tuples of donor ids that dedupe believes + # # all refer to the same entity. 
We write this out onto an entity map + # # table + + # print('writing results') + # with write_con: + # with write_con.cursor() as write_cur: + # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + # Readable(cluster_ids(clustered_dupes)), + # size=10000) + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # # Print out the number of duplicates found + + # # ## Payoff + + # # With all this done, we can now begin to ask interesting questions + # # of the data + # # + # # For example, let's see who the top 10 donors are. + + # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # # Create a temporary table so each group and unmatched record has + # # a unique id + + # with read_con.cursor() as cur: + # cur.execute("CREATE TEMPORARY TABLE e_map " + # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + # "FROM entity_map " + # "RIGHT JOIN donors USING(donor_id)") + + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + # "donation_totals.totals AS totals " + # "FROM donors INNER JOIN " + # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + # " FROM contributions INNER JOIN e_map " + # " USING (donor_id) " + # " GROUP BY (canon_id) " + # " ORDER BY totals " + # " DESC LIMIT 10) " + # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + # "WHERE donors.donor_id = donation_totals.canon_id" + # ) + + # print("Top Donors (deduped)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + # # Compare this to what we would have gotten if we hadn't done any + # # deduplication + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + # "FROM donors INNER JOIN contributions " + # "USING (donor_id) " + # "GROUP BY (donor_id) " + # 
"ORDER BY totals DESC " + # "LIMIT 10" + # ) + + # print("Top Donors (raw)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) read_con.close() write_con.close() diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py new file mode 100644 index 00000000..8ba235cb --- /dev/null +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py @@ -0,0 +1,371 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +This is an example of working with very large data. There are about +700,000 unduplicated donors in this database of Illinois political +campaign contributions. + +With such a large set of input data, we cannot store all the comparisons +we need to make in memory. Instead, we will read the pairs on demand +from the PostgresSQL database. + +__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` +before running this script. 
+ +For smaller datasets (<10,000), see our +[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) +""" +import os +import time +import logging +import optparse +import locale +import itertools +import io +import csv + +import dj_database_url +import psycopg2 +import psycopg2.extras + +import dedupe +import numpy + + +from psycopg2.extensions import register_adapter, AsIs +register_adapter(numpy.int32, AsIs) +register_adapter(numpy.int64, AsIs) +register_adapter(numpy.float32, AsIs) +register_adapter(numpy.float64, AsIs) + + +class Readable(object): + + def __init__(self, iterator): + + self.output = io.StringIO() + self.writer = csv.writer(self.output) + self.iterator = iterator + + def read(self, size): + + self.writer.writerows(itertools.islice(self.iterator, size)) + + chunk = self.output.getvalue() + self.output.seek(0) + self.output.truncate(0) + + return chunk + + +def record_pairs(result_set): + + for i, row in enumerate(result_set): + a_record_id, a_record, b_record_id, b_record = row + record_a = (a_record_id, a_record) + record_b = (b_record_id, b_record) + + yield record_a, record_b + + if i % 10000 == 0: + print(i) + + +def cluster_ids(clustered_dupes): + + for cluster, scores in clustered_dupes: + cluster_id = cluster[0] + for donor_id, score in zip(cluster, scores): + yield donor_id, cluster_id, score + + +if __name__ == '__main__': + # ## Logging + + # Dedupe uses Python logging to show or suppress verbose output. Added + # for convenience. 
To enable verbose output, run `python + # pgsql_big_dedupe_example.py -v` + optp = optparse.OptionParser() + optp.add_option('-v', '--verbose', dest='verbose', action='count', + help='Increase verbosity (specify multiple times for more)' + ) + (opts, args) = optp.parse_args() + log_level = logging.WARNING + if opts.verbose: + if opts.verbose == 1: + log_level = logging.INFO + elif opts.verbose >= 2: + log_level = logging.DEBUG + logging.getLogger().setLevel(log_level) + + # ## Setup + settings_file = 'pgsql_big_dedupe_example_settings.with-indexes' + training_file = 'pgsql_big_dedupe_example_training.json' + + start_time = time.time() + + # Set the database connection from environment variable using + # [dj_database_url](https://github.com/kennethreitz/dj-database-url) + # For example: + # export DATABASE_URL=postgres://user:password@host/mydatabase + db_conf = dj_database_url.config() + + if not db_conf: + raise Exception( + 'set DATABASE_URL environment variable with your connection, e.g. ' + 'export DATABASE_URL=postgres://user:password@host/mydatabase' + ) + + read_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST'], + cursor_factory=psycopg2.extras.RealDictCursor) + + write_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST']) + + # We'll be using variations on this following select statement to pull + # in campaign donor info. 
+ # + # We did a fair amount of preprocessing of the fields in + # `pgsql_big_dedupe_example_init_db.py` + + DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ + "from processed_donors" + + # ## Training + + if os.path.exists(settings_file): + print('reading from ', settings_file) + with open(settings_file, 'rb') as sf: + deduper = dedupe.StaticDedupe(sf, num_cores=4) + else: + + # Define the fields dedupe will pay attention to + # + # The address, city, and zip fields are often missing, so we'll + # tell dedupe that, and we'll learn a model that take that into + # account + fields = [{'field': 'name', 'type': 'String'}, + {'field': 'address', 'type': 'String', + 'has missing': True}, + {'field': 'city', 'type': 'ShortString', 'has missing': True}, + {'field': 'state', 'type': 'ShortString', 'has missing': True}, + {'field': 'zip', 'type': 'ShortString', 'has missing': True}, + ] + + # Create a new deduper object and pass our data model to it. + deduper = dedupe.Dedupe(fields, num_cores=4) + + # Named cursor runs server side with psycopg2 + with read_con.cursor('donor_select') as cur: + cur.execute(DONOR_SELECT) + temp_d = {i: row for i, row in enumerate(cur)} + + # If we have training data saved from a previous run of dedupe, + # look for it an load it in. + # + # __Note:__ if you want to train from + # scratch, delete the training_file + if os.path.exists(training_file): + print('reading labeled examples from ', training_file) + with open(training_file) as tf: + deduper.prepare_training(temp_d, tf) + else: + deduper.prepare_training(temp_d) + + del temp_d + + # ## Active learning + + print('starting active labeling...') + # Starts the training loop. Dedupe will find the next pair of records + # it is least certain about and ask you to label them as duplicates + # or not. 
+ + # use 'y', 'n' and 'u' keys to flag duplicates + # press 'f' when you are finished + dedupe.console_label(deduper) + # When finished, save our labeled, training pairs to disk + with open(training_file, 'w') as tf: + deduper.write_training(tf) + + # Notice our argument here + # + # `recall` is the proportion of true dupes pairs that the learned + # rules must cover. You may want to reduce this if your are making + # too many blocks and too many comparisons. + deduper.train(recall=0.90) + + with open(settings_file, 'wb') as sf: + deduper.write_settings(sf) + + # We can now remove some of the memory hogging objects we used + # for training + deduper.cleanup_training() + + # ## Blocking + print('blocking...') + + # To run blocking on such a large set of data, we create a separate table + # that contains blocking keys and record ids + print('creating blocking_map_serial database') + with write_con: + with write_con.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS blocking_map_serial") + cur.execute("CREATE TABLE blocking_map_serial " + "(block_key text, donor_id INTEGER)") + + # If dedupe learned a Index Predicate, we have to take a pass + # through the data and create indices. + print('creating inverted index') + + for field in deduper.fingerprinter.index_fields: + with read_con.cursor('field_values') as cur: + cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) + field_data = (row[field] for row in cur) + deduper.fingerprinter.index(field_data, field) + + # Now we are ready to write our blocking map table by creating a + # generator that yields unique `(block_key, donor_id)` tuples. 
+ print('writing blocking map') + + with read_con.cursor('donor_select') as read_cur: + read_cur.execute(DONOR_SELECT) + + full_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper.fingerprinter(full_data) + + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', + Readable(b_data), + size=10000) + + # # free up memory by removing indices + # deduper.fingerprinter.reset_indices() + + # logging.info("indexing block_key") + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE UNIQUE INDEX ON blocking_map_serial " + # "(block_key text_pattern_ops, donor_id)") + + # # ## Clustering + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("DROP TABLE IF EXISTS entity_map") + + # print('creating entity_map database') + # cur.execute("CREATE TABLE entity_map " + # "(donor_id INTEGER, canon_id INTEGER, " + # " cluster_score FLOAT, PRIMARY KEY(donor_id))") + + # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + # read_cur.execute(""" + # select a.donor_id, + # row_to_json((select d from (select a.city, + # a.name, + # a.zip, + # a.state, + # a.address) d)), + # b.donor_id, + # row_to_json((select d from (select b.city, + # b.name, + # b.zip, + # b.state, + # b.address) d)) + # from (select DISTINCT l.donor_id as east, r.donor_id as west + # from blocking_map_serial as l + # INNER JOIN blocking_map_serial as r + # using (block_key) + # where l.donor_id < r.donor_id) ids + # INNER JOIN processed_donors a on ids.east=a.donor_id + # INNER JOIN processed_donors b on ids.west=b.donor_id""") + + # print('clustering...') + # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + # threshold=0.5) + + # # ## Writing out results + + # # We now have a sequence of tuples of donor ids that dedupe believes + # # all refer to the same entity. 
We write this out onto an entity map + # # table + + # print('writing results') + # with write_con: + # with write_con.cursor() as write_cur: + # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + # Readable(cluster_ids(clustered_dupes)), + # size=10000) + + # with write_con: + # with write_con.cursor() as cur: + # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # # Print out the number of duplicates found + + # # ## Payoff + + # # With all this done, we can now begin to ask interesting questions + # # of the data + # # + # # For example, let's see who the top 10 donors are. + + # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # # Create a temporary table so each group and unmatched record has + # # a unique id + + # with read_con.cursor() as cur: + # cur.execute("CREATE TEMPORARY TABLE e_map " + # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + # "FROM entity_map " + # "RIGHT JOIN donors USING(donor_id)") + + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + # "donation_totals.totals AS totals " + # "FROM donors INNER JOIN " + # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + # " FROM contributions INNER JOIN e_map " + # " USING (donor_id) " + # " GROUP BY (canon_id) " + # " ORDER BY totals " + # " DESC LIMIT 10) " + # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + # "WHERE donors.donor_id = donation_totals.canon_id" + # ) + + # print("Top Donors (deduped)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + # # Compare this to what we would have gotten if we hadn't done any + # # deduplication + # cur.execute( + # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + # "FROM donors INNER JOIN contributions " + # "USING (donor_id) " + # "GROUP BY (donor_id) " + # 
"ORDER BY totals DESC " + # "LIMIT 10" + # ) + + # print("Top Donors (raw)") + # for row in cur: + # row['totals'] = locale.currency(row['totals'], grouping=True) + # print('%(totals)20s: %(name)s' % row) + + read_con.close() + write_con.close() + + print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh new file mode 100755 index 00000000..68eef4b9 --- /dev/null +++ b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh @@ -0,0 +1,7 @@ +#!/bin/bash +set -e # exit immediately + +echo "Parallel:" +time (python pgsql_big_dedupe_example.py > /dev/null 2>&1) +echo "Serial:" +time (python pgsql_big_dedupe_example_serial.py > /dev/null 2>&1) From 3121218be216606bd9bd3a429206abf274f26a17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fl=C3=A1vio=20Juvenal?= Date: Sat, 19 Sep 2020 14:14:08 -0300 Subject: [PATCH 6/6] Delete performance test files --- .../pgsql_big_dedupe_example.py | 230 +++++------ .../pgsql_big_dedupe_example_serial.py | 371 ------------------ .../test_parallel_vs_serial.sh | 7 - 3 files changed, 115 insertions(+), 493 deletions(-) delete mode 100644 pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py delete mode 100755 pgsql_big_dedupe_example/test_parallel_vs_serial.sh diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py index df8b8320..57460476 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py @@ -336,121 +336,121 @@ def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_selec # free up memory by removing indices deduper.fingerprinter.reset_indices() - # logging.info("indexing block_key") - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE UNIQUE INDEX ON blocking_map " - # "(block_key text_pattern_ops, donor_id)") - - # # ## Clustering - - # with 
write_con: - # with write_con.cursor() as cur: - # cur.execute("DROP TABLE IF EXISTS entity_map") - - # print('creating entity_map database') - # cur.execute("CREATE TABLE entity_map " - # "(donor_id INTEGER, canon_id INTEGER, " - # " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - # read_cur.execute(""" - # select a.donor_id, - # row_to_json((select d from (select a.city, - # a.name, - # a.zip, - # a.state, - # a.address) d)), - # b.donor_id, - # row_to_json((select d from (select b.city, - # b.name, - # b.zip, - # b.state, - # b.address) d)) - # from (select DISTINCT l.donor_id as east, r.donor_id as west - # from blocking_map as l - # INNER JOIN blocking_map as r - # using (block_key) - # where l.donor_id < r.donor_id) ids - # INNER JOIN processed_donors a on ids.east=a.donor_id - # INNER JOIN processed_donors b on ids.west=b.donor_id""") - - # print('clustering...') - # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), - # threshold=0.5) - - # # ## Writing out results - - # # We now have a sequence of tuples of donor ids that dedupe believes - # # all refer to the same entity. We write this out onto an entity map - # # table - - # print('writing results') - # with write_con: - # with write_con.cursor() as write_cur: - # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - # Readable(cluster_ids(clustered_dupes)), - # size=10000) - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # # Print out the number of duplicates found - - # # ## Payoff - - # # With all this done, we can now begin to ask interesting questions - # # of the data - # # - # # For example, let's see who the top 10 donors are. 
- - # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # # Create a temporary table so each group and unmatched record has - # # a unique id - - # with read_con.cursor() as cur: - # cur.execute("CREATE TEMPORARY TABLE e_map " - # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - # "FROM entity_map " - # "RIGHT JOIN donors USING(donor_id)") - - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - # "donation_totals.totals AS totals " - # "FROM donors INNER JOIN " - # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - # " FROM contributions INNER JOIN e_map " - # " USING (donor_id) " - # " GROUP BY (canon_id) " - # " ORDER BY totals " - # " DESC LIMIT 10) " - # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - # "WHERE donors.donor_id = donation_totals.canon_id" - # ) - - # print("Top Donors (deduped)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - # # Compare this to what we would have gotten if we hadn't done any - # # deduplication - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " - # "FROM donors INNER JOIN contributions " - # "USING (donor_id) " - # "GROUP BY (donor_id) " - # "ORDER BY totals DESC " - # "LIMIT 10" - # ) - - # print("Top Donors (raw)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) + logging.info("indexing block_key") + with write_con: + with write_con.cursor() as cur: + cur.execute("CREATE UNIQUE INDEX ON blocking_map " + "(block_key text_pattern_ops, donor_id)") + + # ## Clustering + + with write_con: + with write_con.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS entity_map") + + print('creating entity_map database') + cur.execute("CREATE TABLE entity_map " + "(donor_id 
INTEGER, canon_id INTEGER, " + " cluster_score FLOAT, PRIMARY KEY(donor_id))") + + with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: + read_cur.execute(""" + select a.donor_id, + row_to_json((select d from (select a.city, + a.name, + a.zip, + a.state, + a.address) d)), + b.donor_id, + row_to_json((select d from (select b.city, + b.name, + b.zip, + b.state, + b.address) d)) + from (select DISTINCT l.donor_id as east, r.donor_id as west + from blocking_map as l + INNER JOIN blocking_map as r + using (block_key) + where l.donor_id < r.donor_id) ids + INNER JOIN processed_donors a on ids.east=a.donor_id + INNER JOIN processed_donors b on ids.west=b.donor_id""") + + print('clustering...') + clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), + threshold=0.5) + + # ## Writing out results + + # We now have a sequence of tuples of donor ids that dedupe believes + # all refer to the same entity. We write this out onto an entity map + # table + + print('writing results') + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', + Readable(cluster_ids(clustered_dupes)), + size=10000) + + with write_con: + with write_con.cursor() as cur: + cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") + + # Print out the number of duplicates found + + # ## Payoff + + # With all this done, we can now begin to ask interesting questions + # of the data + # + # For example, let's see who the top 10 donors are. 
+ + locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers + + # Create a temporary table so each group and unmatched record has + # a unique id + + with read_con.cursor() as cur: + cur.execute("CREATE TEMPORARY TABLE e_map " + "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " + "FROM entity_map " + "RIGHT JOIN donors USING(donor_id)") + + cur.execute( + "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " + "donation_totals.totals AS totals " + "FROM donors INNER JOIN " + "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " + " FROM contributions INNER JOIN e_map " + " USING (donor_id) " + " GROUP BY (canon_id) " + " ORDER BY totals " + " DESC LIMIT 10) " + "AS donation_totals ON donors.donor_id=donation_totals.canon_id " + "WHERE donors.donor_id = donation_totals.canon_id" + ) + + print("Top Donors (deduped)") + for row in cur: + row['totals'] = locale.currency(row['totals'], grouping=True) + print('%(totals)20s: %(name)s' % row) + + # Compare this to what we would have gotten if we hadn't done any + # deduplication + cur.execute( + "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " + "SUM(CAST(contributions.amount AS FLOAT)) AS totals " + "FROM donors INNER JOIN contributions " + "USING (donor_id) " + "GROUP BY (donor_id) " + "ORDER BY totals DESC " + "LIMIT 10" + ) + + print("Top Donors (raw)") + for row in cur: + row['totals'] = locale.currency(row['totals'], grouping=True) + print('%(totals)20s: %(name)s' % row) read_con.close() write_con.close() diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py deleted file mode 100644 index 8ba235cb..00000000 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example_serial.py +++ /dev/null @@ -1,371 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -This is an example of working with very large data. 
There are about -700,000 unduplicated donors in this database of Illinois political -campaign contributions. - -With such a large set of input data, we cannot store all the comparisons -we need to make in memory. Instead, we will read the pairs on demand -from the PostgresSQL database. - -__Note:__ You will need to run `python pgsql_big_dedupe_example_init_db.py` -before running this script. - -For smaller datasets (<10,000), see our -[csv_example](http://datamade.github.io/dedupe-examples/docs/csv_example.html) -""" -import os -import time -import logging -import optparse -import locale -import itertools -import io -import csv - -import dj_database_url -import psycopg2 -import psycopg2.extras - -import dedupe -import numpy - - -from psycopg2.extensions import register_adapter, AsIs -register_adapter(numpy.int32, AsIs) -register_adapter(numpy.int64, AsIs) -register_adapter(numpy.float32, AsIs) -register_adapter(numpy.float64, AsIs) - - -class Readable(object): - - def __init__(self, iterator): - - self.output = io.StringIO() - self.writer = csv.writer(self.output) - self.iterator = iterator - - def read(self, size): - - self.writer.writerows(itertools.islice(self.iterator, size)) - - chunk = self.output.getvalue() - self.output.seek(0) - self.output.truncate(0) - - return chunk - - -def record_pairs(result_set): - - for i, row in enumerate(result_set): - a_record_id, a_record, b_record_id, b_record = row - record_a = (a_record_id, a_record) - record_b = (b_record_id, b_record) - - yield record_a, record_b - - if i % 10000 == 0: - print(i) - - -def cluster_ids(clustered_dupes): - - for cluster, scores in clustered_dupes: - cluster_id = cluster[0] - for donor_id, score in zip(cluster, scores): - yield donor_id, cluster_id, score - - -if __name__ == '__main__': - # ## Logging - - # Dedupe uses Python logging to show or suppress verbose output. Added - # for convenience. 
To enable verbose output, run `python - # pgsql_big_dedupe_example.py -v` - optp = optparse.OptionParser() - optp.add_option('-v', '--verbose', dest='verbose', action='count', - help='Increase verbosity (specify multiple times for more)' - ) - (opts, args) = optp.parse_args() - log_level = logging.WARNING - if opts.verbose: - if opts.verbose == 1: - log_level = logging.INFO - elif opts.verbose >= 2: - log_level = logging.DEBUG - logging.getLogger().setLevel(log_level) - - # ## Setup - settings_file = 'pgsql_big_dedupe_example_settings.with-indexes' - training_file = 'pgsql_big_dedupe_example_training.json' - - start_time = time.time() - - # Set the database connection from environment variable using - # [dj_database_url](https://github.com/kennethreitz/dj-database-url) - # For example: - # export DATABASE_URL=postgres://user:password@host/mydatabase - db_conf = dj_database_url.config() - - if not db_conf: - raise Exception( - 'set DATABASE_URL environment variable with your connection, e.g. ' - 'export DATABASE_URL=postgres://user:password@host/mydatabase' - ) - - read_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST'], - cursor_factory=psycopg2.extras.RealDictCursor) - - write_con = psycopg2.connect(database=db_conf['NAME'], - user=db_conf['USER'], - password=db_conf['PASSWORD'], - host=db_conf['HOST']) - - # We'll be using variations on this following select statement to pull - # in campaign donor info. 
- # - # We did a fair amount of preprocessing of the fields in - # `pgsql_big_dedupe_example_init_db.py` - - DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ - "from processed_donors" - - # ## Training - - if os.path.exists(settings_file): - print('reading from ', settings_file) - with open(settings_file, 'rb') as sf: - deduper = dedupe.StaticDedupe(sf, num_cores=4) - else: - - # Define the fields dedupe will pay attention to - # - # The address, city, and zip fields are often missing, so we'll - # tell dedupe that, and we'll learn a model that take that into - # account - fields = [{'field': 'name', 'type': 'String'}, - {'field': 'address', 'type': 'String', - 'has missing': True}, - {'field': 'city', 'type': 'ShortString', 'has missing': True}, - {'field': 'state', 'type': 'ShortString', 'has missing': True}, - {'field': 'zip', 'type': 'ShortString', 'has missing': True}, - ] - - # Create a new deduper object and pass our data model to it. - deduper = dedupe.Dedupe(fields, num_cores=4) - - # Named cursor runs server side with psycopg2 - with read_con.cursor('donor_select') as cur: - cur.execute(DONOR_SELECT) - temp_d = {i: row for i, row in enumerate(cur)} - - # If we have training data saved from a previous run of dedupe, - # look for it an load it in. - # - # __Note:__ if you want to train from - # scratch, delete the training_file - if os.path.exists(training_file): - print('reading labeled examples from ', training_file) - with open(training_file) as tf: - deduper.prepare_training(temp_d, tf) - else: - deduper.prepare_training(temp_d) - - del temp_d - - # ## Active learning - - print('starting active labeling...') - # Starts the training loop. Dedupe will find the next pair of records - # it is least certain about and ask you to label them as duplicates - # or not. 
- - # use 'y', 'n' and 'u' keys to flag duplicates - # press 'f' when you are finished - dedupe.console_label(deduper) - # When finished, save our labeled, training pairs to disk - with open(training_file, 'w') as tf: - deduper.write_training(tf) - - # Notice our argument here - # - # `recall` is the proportion of true dupes pairs that the learned - # rules must cover. You may want to reduce this if your are making - # too many blocks and too many comparisons. - deduper.train(recall=0.90) - - with open(settings_file, 'wb') as sf: - deduper.write_settings(sf) - - # We can now remove some of the memory hogging objects we used - # for training - deduper.cleanup_training() - - # ## Blocking - print('blocking...') - - # To run blocking on such a large set of data, we create a separate table - # that contains blocking keys and record ids - print('creating blocking_map_serial database') - with write_con: - with write_con.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS blocking_map_serial") - cur.execute("CREATE TABLE blocking_map_serial " - "(block_key text, donor_id INTEGER)") - - # If dedupe learned a Index Predicate, we have to take a pass - # through the data and create indices. - print('creating inverted index') - - for field in deduper.fingerprinter.index_fields: - with read_con.cursor('field_values') as cur: - cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) - field_data = (row[field] for row in cur) - deduper.fingerprinter.index(field_data, field) - - # Now we are ready to write our blocking map table by creating a - # generator that yields unique `(block_key, donor_id)` tuples. 
- print('writing blocking map') - - with read_con.cursor('donor_select') as read_cur: - read_cur.execute(DONOR_SELECT) - - full_data = ((row['donor_id'], row) for row in read_cur) - b_data = deduper.fingerprinter(full_data) - - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY blocking_map_serial FROM STDIN WITH CSV', - Readable(b_data), - size=10000) - - # # free up memory by removing indices - # deduper.fingerprinter.reset_indices() - - # logging.info("indexing block_key") - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE UNIQUE INDEX ON blocking_map_serial " - # "(block_key text_pattern_ops, donor_id)") - - # # ## Clustering - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("DROP TABLE IF EXISTS entity_map") - - # print('creating entity_map database') - # cur.execute("CREATE TABLE entity_map " - # "(donor_id INTEGER, canon_id INTEGER, " - # " cluster_score FLOAT, PRIMARY KEY(donor_id))") - - # with read_con.cursor('pairs', cursor_factory=psycopg2.extensions.cursor) as read_cur: - # read_cur.execute(""" - # select a.donor_id, - # row_to_json((select d from (select a.city, - # a.name, - # a.zip, - # a.state, - # a.address) d)), - # b.donor_id, - # row_to_json((select d from (select b.city, - # b.name, - # b.zip, - # b.state, - # b.address) d)) - # from (select DISTINCT l.donor_id as east, r.donor_id as west - # from blocking_map_serial as l - # INNER JOIN blocking_map_serial as r - # using (block_key) - # where l.donor_id < r.donor_id) ids - # INNER JOIN processed_donors a on ids.east=a.donor_id - # INNER JOIN processed_donors b on ids.west=b.donor_id""") - - # print('clustering...') - # clustered_dupes = deduper.cluster(deduper.score(record_pairs(read_cur)), - # threshold=0.5) - - # # ## Writing out results - - # # We now have a sequence of tuples of donor ids that dedupe believes - # # all refer to the same entity. 
We write this out onto an entity map - # # table - - # print('writing results') - # with write_con: - # with write_con.cursor() as write_cur: - # write_cur.copy_expert('COPY entity_map FROM STDIN WITH CSV', - # Readable(cluster_ids(clustered_dupes)), - # size=10000) - - # with write_con: - # with write_con.cursor() as cur: - # cur.execute("CREATE INDEX head_index ON entity_map (canon_id)") - - # # Print out the number of duplicates found - - # # ## Payoff - - # # With all this done, we can now begin to ask interesting questions - # # of the data - # # - # # For example, let's see who the top 10 donors are. - - # locale.setlocale(locale.LC_ALL, '') # for pretty printing numbers - - # # Create a temporary table so each group and unmatched record has - # # a unique id - - # with read_con.cursor() as cur: - # cur.execute("CREATE TEMPORARY TABLE e_map " - # "AS SELECT COALESCE(canon_id, donor_id) AS canon_id, donor_id " - # "FROM entity_map " - # "RIGHT JOIN donors USING(donor_id)") - - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) AS name, " - # "donation_totals.totals AS totals " - # "FROM donors INNER JOIN " - # "(SELECT canon_id, SUM(CAST(amount AS FLOAT)) AS totals " - # " FROM contributions INNER JOIN e_map " - # " USING (donor_id) " - # " GROUP BY (canon_id) " - # " ORDER BY totals " - # " DESC LIMIT 10) " - # "AS donation_totals ON donors.donor_id=donation_totals.canon_id " - # "WHERE donors.donor_id = donation_totals.canon_id" - # ) - - # print("Top Donors (deduped)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - # # Compare this to what we would have gotten if we hadn't done any - # # deduplication - # cur.execute( - # "SELECT CONCAT_WS(' ', donors.first_name, donors.last_name) as name, " - # "SUM(CAST(contributions.amount AS FLOAT)) AS totals " - # "FROM donors INNER JOIN contributions " - # "USING (donor_id) " - # "GROUP BY (donor_id) " - # 
"ORDER BY totals DESC " - # "LIMIT 10" - # ) - - # print("Top Donors (raw)") - # for row in cur: - # row['totals'] = locale.currency(row['totals'], grouping=True) - # print('%(totals)20s: %(name)s' % row) - - read_con.close() - write_con.close() - - print('ran in', time.time() - start_time, 'seconds') diff --git a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh b/pgsql_big_dedupe_example/test_parallel_vs_serial.sh deleted file mode 100755 index 68eef4b9..00000000 --- a/pgsql_big_dedupe_example/test_parallel_vs_serial.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash -set -e # exit immediately - -echo "Parallel:" -time (python pgsql_big_dedupe_example.py > /dev/null 2>&1) -echo "Serial:" -time (python pgsql_big_dedupe_example_serial.py > /dev/null 2>&1)