diff --git a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py index e2c62957..57460476 100755 --- a/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py +++ b/pgsql_big_dedupe_example/pgsql_big_dedupe_example.py @@ -24,12 +24,15 @@ import itertools import io import csv +import multiprocessing +import math import dj_database_url import psycopg2 import psycopg2.extras import dedupe +from dedupe.backport import Pool import numpy @@ -44,8 +47,16 @@ class Readable(object): def __init__(self, iterator): - self.output = io.StringIO() - self.writer = csv.writer(self.output) + self.output = io.StringIO( + # necesary for csv. See: https://docs.python.org/3/library/csv.html#id3 + newline='', + ) + self.writer = csv.writer( + self.output, + # csv.unix_dialect seems the right one + # based on our tests and Postgres CSV defaults: + # https://www.postgresql.org/docs/12/sql-copy.html + dialect=csv.unix_dialect) self.iterator = iterator def read(self, size): @@ -80,6 +91,30 @@ def cluster_ids(clustered_dupes): yield donor_id, cluster_id, score +def parallel_fingerprinter(db_conf, deduper_fingerprinter, donor_partition_select, partition_offset, partition_end): + read_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST'], + cursor_factory=psycopg2.extras.RealDictCursor) + write_con = psycopg2.connect(database=db_conf['NAME'], + user=db_conf['USER'], + password=db_conf['PASSWORD'], + host=db_conf['HOST']) + + with read_con.cursor('donor_partition_select') as read_cur: + read_cur.execute(donor_partition_select, (partition_offset, partition_end)) + + partition_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper_fingerprinter(partition_data) + + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', + Readable(b_data), + size=10000) + + if __name__ == '__main__': # ## Logging @@ -100,8 +135,9 @@ def cluster_ids(clustered_dupes): logging.getLogger().setLevel(log_level) # ## Setup - settings_file = 'pgsql_big_dedupe_example_settings' + settings_file = 'pgsql_big_dedupe_example_settings.with-indexes' training_file = 'pgsql_big_dedupe_example_training.json' + num_cores = multiprocessing.cpu_count() start_time = time.time() @@ -135,14 +171,19 @@ def cluster_ids(clustered_dupes): # `pgsql_big_dedupe_example_init_db.py` DONOR_SELECT = "SELECT donor_id, city, name, zip, state, address " \ - "from processed_donors" + "FROM processed_donors" + DONOR_PARTITION_SELECT = "SELECT donor_id, city, name, zip, state, address " \ + "FROM processed_donors " \ + "WHERE donor_id >= %s " \ + "AND donor_id < %s" + COUNT_SELECT = "SELECT COUNT(*) FROM processed_donors" # ## Training if os.path.exists(settings_file): print('reading from ', settings_file) with open(settings_file, 'rb') as sf: - deduper = dedupe.StaticDedupe(sf, num_cores=4) + deduper = dedupe.StaticDedupe(sf, num_cores=num_cores) else: # Define the fields dedupe will pay attention to @@ -159,7 +200,7 @@ def cluster_ids(clustered_dupes): ] # Create a new deduper object and pass our data model to it. - deduper = dedupe.Dedupe(fields, num_cores=4) + deduper = dedupe.Dedupe(fields, num_cores=num_cores) # Named cursor runs server side with psycopg2 with read_con.cursor('donor_select') as cur: @@ -220,34 +261,80 @@ def cluster_ids(clustered_dupes): cur.execute("CREATE TABLE blocking_map " "(block_key text, donor_id INTEGER)") + # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in parallel, + # but only for the non-index predicates. Only those can run in parallel + # because they don't need global predicate indexes. + # We cannot share predicate indexes across Python processes because it would consume + # too much RAM + print('computing blocking map in parallel (non-index predicates)') + + predicates_without_index = [] + predicates_with_index = [] + for full_predicate in deduper.fingerprinter.predicates: + if any(predicate in deduper.fingerprinter.index_predicates + for predicate in full_predicate): + predicates_with_index.append(full_predicate) + else: + predicates_without_index.append(full_predicate) + + # Use only predicates WITHOUT indexes for parallel blocking + deduper.fingerprinter.predicates = predicates_without_index + + # processed_donors `id` starts at 1 and goes up to `COUNT(*)` + with read_con.cursor('donor_select') as cur: + cur.execute(COUNT_SELECT) + total_rows = cur.fetchone()['count'] + partition_size = math.ceil(total_rows / num_cores) + partition_offsets = range(1, total_rows + 1, partition_size) + + # Use a multiprocessing.Pool to run parallel_fingerprinter in num_cores. + with Pool(processes=num_cores) as pool: + block_file_path_list = pool.starmap( + parallel_fingerprinter, + [ + ( + db_conf, + deduper.fingerprinter, + DONOR_PARTITION_SELECT, + partition_offset, + (partition_offset + partition_size) + ) + for partition_offset in partition_offsets + ], + ) + # If dedupe learned a Index Predicate, we have to take a pass - # through the data and create indices. - print('creating inverted index') + # through the data and create indices + if predicates_with_index: + print('creating inverted indexes') - for field in deduper.fingerprinter.index_fields: - with read_con.cursor('field_values') as cur: - cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) - field_data = (row[field] for row in cur) - deduper.fingerprinter.index(field_data, field) + # Use only predicates WITH indexes for non-parallel blocking + deduper.fingerprinter.predicates = predicates_with_index - # Now we are ready to write our blocking map table by creating a - # generator that yields unique `(block_key, donor_id)` tuples. - print('writing blocking map') + for field in deduper.fingerprinter.index_fields: + with read_con.cursor('field_values') as cur: + cur.execute("SELECT DISTINCT %s FROM processed_donors" % field) + field_data = (row[field] for row in cur) + deduper.fingerprinter.index(field_data, field) - with read_con.cursor('donor_select') as read_cur: - read_cur.execute(DONOR_SELECT) + # Compute `(block_key, donor_id)` tuples and write on `blocking_map` in serial + # for the index predicates + print('computing and writing blocking map (index predicates)') - full_data = ((row['donor_id'], row) for row in read_cur) - b_data = deduper.fingerprinter(full_data) + with read_con.cursor('donor_select') as read_cur: + read_cur.execute(DONOR_SELECT) - with write_con: - with write_con.cursor() as write_cur: - write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', - Readable(b_data), - size=10000) + full_data = ((row['donor_id'], row) for row in read_cur) + b_data = deduper.fingerprinter(full_data) + + with write_con: + with write_con.cursor() as write_cur: + write_cur.copy_expert('COPY blocking_map FROM STDIN WITH CSV', + Readable(b_data), + size=10000) - # free up memory by removing indices - deduper.fingerprinter.reset_indices() + # free up memory by removing indices + deduper.fingerprinter.reset_indices() logging.info("indexing block_key") with write_con: