diff --git a/Makefile b/Makefile index 37d94f2b900..4033ff79960 100644 --- a/Makefile +++ b/Makefile @@ -69,13 +69,10 @@ distclean: reindex-solr: # Keep link in sync with ol-solr-updater-start and Jenkinsfile curl -C - -L "https://archive.org/download/2023_openlibrary_osp_counts/osp_totals.db" -o $(OSP_DUMP_LOCATION) - psql --host db openlibrary -t -c 'select key from thing' | sed 's/ *//' | grep '^/books/' | PYTHONPATH=$(PWD) xargs python openlibrary/solr/update.py --ol-url http://web:8080/ --osp-dump $(OSP_DUMP_LOCATION) --ol-config conf/openlibrary.yml --data-provider=legacy --solr-next - psql --host db openlibrary -t -c 'select key from thing' | sed 's/ *//' | grep '^/authors/' | PYTHONPATH=$(PWD) xargs python openlibrary/solr/update.py --ol-url http://web:8080/ --osp-dump $(OSP_DUMP_LOCATION) --ol-config conf/openlibrary.yml --data-provider=legacy --solr-next - psql --host db openlibrary -t -c 'select key from thing' | sed 's/ *//' | grep -E '/(lists|series)/' | PYTHONPATH=$(PWD) xargs python openlibrary/solr/update.py --ol-url http://web:8080/ --osp-dump $(OSP_DUMP_LOCATION) --ol-config conf/openlibrary.yml --data-provider=legacy --solr-next - PYTHONPATH=$(PWD) python ./scripts/solr_builder/solr_builder/index_subjects.py subject - PYTHONPATH=$(PWD) python ./scripts/solr_builder/solr_builder/index_subjects.py person - PYTHONPATH=$(PWD) python ./scripts/solr_builder/solr_builder/index_subjects.py place - PYTHONPATH=$(PWD) python ./scripts/solr_builder/solr_builder/index_subjects.py time + psql --host db openlibrary -t -c 'select key from thing' | sed 's/ *//' | grep '^/books/' | xargs python openlibrary/solr/update.py --ol-url http://web:8080/ --osp-dump $(OSP_DUMP_LOCATION) --ol-config conf/openlibrary.yml --solr-next + psql --host db openlibrary -t -c 'select key from thing' | sed 's/ *//' | grep '^/authors/' | xargs python openlibrary/solr/update.py --ol-url http://web:8080/ --osp-dump $(OSP_DUMP_LOCATION) --ol-config conf/openlibrary.yml --solr-next + psql --host db openlibrary -t -c 'select key from thing' | sed 's/ *//' | grep -E '/(lists|series)/' | xargs python openlibrary/solr/update.py --ol-url http://web:8080/ --osp-dump $(OSP_DUMP_LOCATION) --ol-config conf/openlibrary.yml --solr-next + parallel -j4 python ./scripts/solr_builder/solr_builder/index_subjects.py ::: subject person place time lint: # See the pyproject.toml file for ruff's settings diff --git a/compose.override.yaml b/compose.override.yaml index f369eda6bcf..6c2edf9a192 100644 --- a/compose.override.yaml +++ b/compose.override.yaml @@ -59,8 +59,8 @@ services: test: ["CMD", "curl", "-f", "http://localhost:8983/solr/openlibrary/admin/ping"] interval: 30s start_interval: 1s - start_period: 20s - timeout: 1s + start_period: 60s + timeout: 5s retries: 24 volumes: - ./docker/ol-local-solr-start.sh:/docker/ol-solr-start.sh:ro @@ -75,6 +75,7 @@ services: dockerfile: docker/Dockerfile.oldev environment: - OL_SOLR_NEXT=true + - LOCAL_DEV=true depends_on: solr: condition: service_healthy diff --git a/openlibrary/catalog/utils/query.py b/openlibrary/catalog/utils/query.py deleted file mode 100644 index c953f2affe8..00000000000 --- a/openlibrary/catalog/utils/query.py +++ /dev/null @@ -1,178 +0,0 @@ -import json -import sys -import urllib -from time import sleep - -import requests - -query_host = "openlibrary.org" - - -def urlopen(url, data=None): - version = "%s.%s.%s" % sys.version_info[:3] - user_agent = f"Mozilla/5.0 (openlibrary; {__name__}) Python/{version}" - headers = {"User-Agent": user_agent} - - return requests.get(url, data=data, headers=headers) - - -def jsonload(url): - return urlopen(url).json() - - -def urlread(url): - return urlopen(url).content - - -def set_query_host(host): - global query_host - query_host = host - - -def has_cover(key): - url = "https://covers.openlibrary.org/" + key[1] + "/query?olid=" + key[3:] - return urlread(url).strip() != "[]" - - -def has_cover_retry(key): - for attempt in range(5): - try: - return has_cover(key) - except KeyboardInterrupt: - raise - except: - pass - sleep(2) - - -def base_url(): - return "http://" + query_host - - -def query_url(): - return base_url() + "/query.json?query=" - - -def get_all_ia(): - print("c") - q = {"source_records~": "ia:*", "type": "/type/edition"} - limit = 10 - q["limit"] = limit - q["offset"] = 0 - - while True: - url = base_url() + "/api/things?query=" + urllib.parse.quote(json.dumps(q)) - ret = jsonload(url)["result"] - yield from ret - if not ret: - return - q["offset"] += limit - - -def query(q): - url = query_url() + urllib.parse.quote(json.dumps(q)) - ret = None - for i in range(20): - try: - ret = urlread(url) - while ret.startswith(b"canceling statement due to statement timeout"): - ret = urlread(url) - if not ret: - print("ret == None") - except OSError: - pass - if ret: - try: - data = json.loads(ret) - if isinstance(data, dict): - if "error" in data: - print("error:") - print(ret) - assert "error" not in data - return data - except: - print(ret) - print(url) - sleep(20) - - -def query_iter(q, limit=500, offset=0): - q["limit"] = limit - q["offset"] = offset - while True: - ret = query(q) - if not ret: - return - yield from ret - # We haven't got as many we have requested. No point making one more request - if len(ret) < limit: - break - q["offset"] += limit - - -def get_editions_with_covers_by_author(author, count): - q = { - "type": "/type/edition", - "title_prefix": None, - "subtitle": None, - "title": None, - "authors": author, - } - with_covers = [] - for e in query_iter(q, limit=count): - if not has_cover(e["key"]): - continue - with_covers.append(e) - if len(with_covers) == count: - return with_covers - return with_covers - - -def version_iter(q, limit=500, offset=0): - q["limit"] = limit - q["offset"] = offset - while True: - url = base_url() + "/version" - v = jsonload(url) - if not v: - return - yield from query(q) - q["offset"] += limit - - -def withKey(key): - # ?_raw=true needed for lists/series - url = base_url() + key + ".json?_raw=true" - for i in range(20): - try: - return jsonload(url) - except: - pass - print("retry:", i) - print(url) - - -def get_marc_src(e): - mc = get_mc(e["key"]) - if mc: - yield mc - if not e.get("source_records", []): - return - for src in e["source_records"]: - if src.startswith("marc:") and src != "marc:" + mc: - yield src[5:] - - -def get_mc(key): # get machine comment - v = jsonload(base_url() + key + ".json?m=history") - - comments = [i["machine_comment"] for i in v if i.get("machine_comment", None) and ":" in i["machine_comment"]] - if len(comments) == 0: - return None - if len(set(comments)) != 1: - print(key) - print(comments) - assert len(set(comments)) == 1 - if comments[0] == "initial import": - return None - return comments[0] diff --git a/openlibrary/core/env.py b/openlibrary/core/env.py index 3a87fb2585a..f3d7d8c9dc4 100644 --- a/openlibrary/core/env.py +++ b/openlibrary/core/env.py @@ -7,6 +7,10 @@ class OLEnv: def OL_EXPOSE_SOLR_INTERNALS_PARAMS(self) -> bool: return os.environ.get("OL_EXPOSE_SOLR_INTERNALS_PARAMS") == "true" + @cached_property + def LOCAL_DEV(self) -> bool: + return os.environ.get("LOCAL_DEV") == "true" + _ol_env = OLEnv() diff --git a/openlibrary/core/lending.py b/openlibrary/core/lending.py index c915b937118..4a5a1a08e21 100644 --- a/openlibrary/core/lending.py +++ b/openlibrary/core/lending.py @@ -19,6 +19,7 @@ from infogami.utils.view import public from openlibrary.accounts.model import OpenLibraryAccount from openlibrary.core import cache, stats +from openlibrary.core.env import get_ol_env from openlibrary.plugins.upstream.utils import urlencode from openlibrary.utils import dateutil, uniq from openlibrary.utils.async_utils import async_bridge @@ -113,7 +114,6 @@ def compose_ia_url( query=None, sorts=None, advanced: bool = True, - rate_limit_exempt: bool = True, safe_mode: bool = False, ) -> str | None: """This needs to be exposed by a generalized API endpoint within @@ -175,7 +175,8 @@ def compose_ia_url( ("page", page), ("output", "json"), ] - if rate_limit_exempt: + if not get_ol_env().LOCAL_DEV: + # This flag is only available on prod params.append(("service", "metadata__unlimited")) if not sorts or not isinstance(sorts, list): sorts = [""] diff --git a/openlibrary/core/ratings.py b/openlibrary/core/ratings.py index 7b41b7ef797..c995113fed4 100644 --- a/openlibrary/core/ratings.py +++ b/openlibrary/core/ratings.py @@ -81,18 +81,15 @@ def get_rating_stats(cls, work_id) -> dict: @classmethod def get_work_ratings_summary(cls, work_id: int) -> WorkRatingsSummary | None: oldb = db.get_db() - # NOTE: Using some old postgres syntax here :/ for modern postgres syntax, - # see the query in solr_builder.py query = """ SELECT - sum( CASE WHEN rating = 1 THEN 1 ELSE 0 END ) as ratings_count_1, - sum( CASE WHEN rating = 2 THEN 1 ELSE 0 END ) as ratings_count_2, - sum( CASE WHEN rating = 3 THEN 1 ELSE 0 END ) as ratings_count_3, - sum( CASE WHEN rating = 4 THEN 1 ELSE 0 END ) as ratings_count_4, - sum( CASE WHEN rating = 5 THEN 1 ELSE 0 END ) as ratings_count_5 + count(*) FILTER (WHERE rating = 1) AS ratings_count_1, + count(*) FILTER (WHERE rating = 2) AS ratings_count_2, + count(*) FILTER (WHERE rating = 3) AS ratings_count_3, + count(*) FILTER (WHERE rating = 4) AS ratings_count_4, + count(*) FILTER (WHERE rating = 5) AS ratings_count_5 FROM ratings WHERE work_id = $work_id - GROUP BY work_id """ result = oldb.query(query, vars={"work_id": work_id}) if not result: diff --git a/openlibrary/solr/data_provider.py b/openlibrary/solr/data_provider.py index 2d04577e2e9..f2d5b3b724c 100644 --- a/openlibrary/solr/data_provider.py +++ b/openlibrary/solr/data_provider.py @@ -22,6 +22,7 @@ from openlibrary.core import ia from openlibrary.core.bookshelves import Bookshelves +from openlibrary.core.env import get_ol_env from openlibrary.core.ratings import Ratings, WorkRatingsSummary from openlibrary.solr.utils import get_solr_base_url from openlibrary.utils import extract_numeric_id_from_olid @@ -38,9 +39,7 @@ def get_data_provider(type="default"): """Returns the data provider of given type.""" if type == "default": - return BetterDataProvider() - elif type == "legacy": - return LegacyDataProvider() + return DatabaseDataProvider() else: raise ValueError("unknown data provider type: %s" % type) @@ -132,7 +131,8 @@ async def _get_lite_metadata(ocaids: Sequence[str], _recur_depth=0, _max_recur_d "rows": len(ocaids), "fl": ",".join(IA_METADATA_FIELDS), "output": "json", - "service": "metadata__unlimited", + # This is only available from prod + **({} if get_ol_env().LOCAL_DEV else {"service": "metadata__unlimited"}), }, ) r.raise_for_status() @@ -260,71 +260,6 @@ def clear_cache(self): self.ia_cache.clear() -class LegacyDataProvider(DataProvider): - def __init__(self): - from openlibrary.catalog.utils.query import query_iter, withKey - - super().__init__() - self._query_iter = query_iter - self._withKey = withKey - - def find_redirects(self, key): - """Returns keys of all things which are redirected to this one.""" - logger.info("find_redirects %s", key) - q = {"type": "/type/redirect", "location": key} - return [r["key"] for r in self._query_iter(q)] - - def get_editions_of_work(self, work): - logger.info("find_editions_of_work %s", work["key"]) - q = {"type": "/type/edition", "works": work["key"], "*": None} - return list(self._query_iter(q)) - - async def get_document(self, key): - logger.info("get_document %s", key) - return self._withKey(key) - - def get_work_ratings(self, work_key: str) -> WorkRatingsSummary | None: - work_id = int(work_key[len("/works/OL") : -len("W")]) - return Ratings.get_work_ratings_summary(work_id) - - def get_work_reading_log(self, work_key: str) -> WorkReadingLogSolrSummary: - work_id = extract_numeric_id_from_olid(work_key) - counts = Bookshelves.get_work_summary(work_id) - return cast( - WorkReadingLogSolrSummary, - { - "readinglog_count": sum(counts.values()), - **{f"{shelf}_count": count for shelf, count in counts.items()}, - }, - ) - - def clear_cache(self): - # Nothing's cached, so nothing to clear! - return - - @typing.override - async def get_trending_data(self, work_key: str) -> dict: - async with httpx.AsyncClient() as client: - response = await client.get( - get_solr_base_url() + "/get", - params={ - "id": work_key, - "fl": ",".join( # noqa: FLY002 - ( - "trending_score_hourly_sum", - "trending_score_hourly_*", - "trending_score_daily_*", - "trending_z_score", - ) - ), - }, - ) - response.raise_for_status() - solr_doc = response.json()["doc"] or {} - - return {field: solr_doc.get(field, 0) for field in get_all_trending_fields()} - - def get_all_trending_fields(): for index in range(24): yield f"trending_score_hourly_{index}" @@ -359,7 +294,15 @@ async def get_document(self, key: str): return response.json() -class BetterDataProvider(LegacyDataProvider): +class DatabaseDataProvider(DataProvider): + """ + This data provider assumes we are running in a full prod or local OL environment + with database access. + + It uses the infogami site to fetch data directly from the database, as well as + makes queries to solr for some data (like trending data) that is not in the database. + """ + def __init__( self, site: Site | None = None, @@ -506,16 +449,17 @@ def preload_editions_of_works(self, work_keys: Iterable[str]): # Infobase doesn't has a way to do find editions of multiple works at once. # Using raw SQL to avoid making individual infobase queries, which is very # time consuming. - key_query = "select id from property where name='works' and type=(select id from thing where key='/type/edition')" - - q = ( - "SELECT edition.key as edition_key, work.key as work_key" - " FROM thing as edition, thing as work, edition_ref" - " WHERE edition_ref.thing_id=edition.id" - " AND edition_ref.value=work.id" - f" AND edition_ref.key_id=({key_query})" - " AND work.key in $keys" - ) + q = """ + SELECT edition.key as edition_key, work.key as work_key + FROM thing as edition, thing as work, edition_ref + WHERE edition_ref.thing_id=edition.id + AND edition_ref.value=work.id + AND edition_ref.key_id=( + SELECT id FROM property + WHERE name='works' AND type=(SELECT id FROM thing WHERE key='/type/edition') + ) + AND work.key in $keys + """ result = self.db.query(q, vars={"keys": work_keys}) for row in result: self.edition_keys_of_works_cache.setdefault(row.work_key, []).append(row.edition_key) @@ -524,6 +468,45 @@ def preload_editions_of_works(self, work_keys: Iterable[str]): self.preload_documents0(keys) return + @typing.override + def get_work_ratings(self, work_key: str) -> WorkRatingsSummary | None: + work_id = int(work_key[len("/works/OL") : -len("W")]) + return Ratings.get_work_ratings_summary(work_id) + + @typing.override + def get_work_reading_log(self, work_key: str) -> WorkReadingLogSolrSummary: + work_id = extract_numeric_id_from_olid(work_key) + counts = Bookshelves.get_work_summary(work_id) + return cast( + WorkReadingLogSolrSummary, + { + "readinglog_count": sum(counts.values()), + **{f"{shelf}_count": count for shelf, count in counts.items()}, + }, + ) + + @typing.override + async def get_trending_data(self, work_key: str) -> dict: + async with httpx.AsyncClient() as client: + response = await client.get( + get_solr_base_url() + "/get", + params={ + "id": work_key, + "fl": ",".join( # noqa: FLY002 + ( + "trending_score_hourly_sum", + "trending_score_hourly_*", + "trending_score_daily_*", + "trending_z_score", + ) + ), + }, + ) + response.raise_for_status() + solr_doc = response.json()["doc"] or {} + + return {field: solr_doc.get(field, 0) for field in get_all_trending_fields()} + def clear_cache(self): super().clear_cache() self.cache.clear() diff --git a/openlibrary/solr/update.py b/openlibrary/solr/update.py index af02107b065..096d2c76f57 100644 --- a/openlibrary/solr/update.py +++ b/openlibrary/solr/update.py @@ -7,7 +7,6 @@ import aiofiles -from openlibrary.catalog.utils.query import set_query_host from openlibrary.solr.data_provider import ( DataProvider, ExternalDataProvider, @@ -139,18 +138,12 @@ async def _solr_update(update_state: SolrUpdateRequest): return net_update -async def do_updates(keys): - logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") - await update_keys(keys, commit=False) - - def load_configs( c_host: str, c_config: str, - c_data_provider: (DataProvider | Literal["default", "legacy", "external"]) = "default", + c_data_provider: (DataProvider | Literal["default", "external"]) = "default", ) -> DataProvider: host = c_host.removeprefix("http://").strip("/") - set_query_host(host) load_config(c_config) @@ -172,7 +165,7 @@ async def main( ol_config="openlibrary.yml", output_file: str | None = None, commit=True, - data_provider: Literal["default", "legacy", "external"] = "default", + data_provider: Literal["default", "external"] = "default", solr_base: str | None = None, solr_next=False, update: Literal["update", "print", "pprint"] = "update", diff --git a/openlibrary/tests/solr/test_data_provider.py b/openlibrary/tests/solr/test_data_provider.py index 77fbd94dff9..08f60c78826 100644 --- a/openlibrary/tests/solr/test_data_provider.py +++ b/openlibrary/tests/solr/test_data_provider.py @@ -3,14 +3,14 @@ import pytest from infogami.infobase.client import Thing -from openlibrary.solr.data_provider import BetterDataProvider +from openlibrary.solr.data_provider import DatabaseDataProvider -class TestBetterDataProvider: +class TestDatabaseDataProvider: @pytest.mark.asyncio async def test_get_document(self): mock_site = MagicMock() - dp = BetterDataProvider( + dp = DatabaseDataProvider( site=mock_site, db=MagicMock(), ) @@ -33,7 +33,7 @@ async def test_get_document(self): @pytest.mark.asyncio async def test_clear_cache(self): mock_site = MagicMock() - dp = BetterDataProvider( + dp = DatabaseDataProvider( site=mock_site, db=MagicMock(), ) diff --git a/scripts/solr_updater/solr_updater.py b/scripts/solr_updater/solr_updater.py index bc404dfd5ea..965e7281c03 100644 --- a/scripts/solr_updater/solr_updater.py +++ b/scripts/solr_updater/solr_updater.py @@ -9,17 +9,18 @@ import asyncio import datetime +import itertools import json import logging import re import socket import sys -from collections.abc import Iterator +from collections.abc import Iterable, Iterator from pathlib import Path +from typing import TypedDict, cast import aiofiles import requests -import web import infogami from infogami import config @@ -47,6 +48,19 @@ def get_default_offset(): return datetime.date.today().isoformat() + ":0" +class InfobaseLogRecord(TypedDict): + action: str + timestamp: str + data: dict + site: str + + +class InfobaseLogResponse(TypedDict): + data: list[InfobaseLogRecord] + offset: str + """E.g. '2023-02-11:0'""" + + class InfobaseLog: def __init__(self, hostname: str, exclude: str | None = None): """ @@ -82,7 +96,7 @@ def read_records(self, max_fetches=10): raise try: - d = resp.json() + d: InfobaseLogResponse = resp.json() except: logger.error("Bad JSON: %s", resp.text) raise @@ -142,7 +156,7 @@ def find_keys(d: dict | list) -> Iterator[str]: return -def parse_log(records, load_ia_scans: bool): +def parse_log(records: Iterable[InfobaseLogRecord], load_ia_scans: bool): for rec in records: action = rec.get("action") @@ -151,7 +165,7 @@ def parse_log(records, load_ia_scans: bool): old_docs = changeset.get("old_docs", []) new_docs = changeset.get("docs", []) for before, after in zip(old_docs, new_docs): - yield after["key"] + yield cast(str, after["key"]) # before is None if the item is new if before: before_keys = set(find_keys(before)) @@ -174,11 +188,11 @@ def parse_log(records, load_ia_scans: bool): data = rec.get("data", {}).get("data", {}) key = data.get("_key", "") if data.get("type") == "ebook" and key.startswith("ebooks/books/"): - edition_key = data.get("book_key") + edition_key = cast(str | None, data.get("book_key")) if edition_key: yield edition_key elif load_ia_scans and data.get("type") == "ia-scan" and key.startswith("ia-scan/"): - identifier = data.get("identifier") + identifier = cast(str | None, data.get("identifier")) if identifier and is_allowed_itemid(identifier): yield "/books/ia:" + identifier @@ -187,11 +201,12 @@ def parse_log(records, load_ia_scans: bool): # 'solr-force-update' in the store and whatever keys are written to that # are picked by this script elif key == "solr-force-update": - keys = data.get("keys") + keys = cast(list[str], data.get("keys", [])) yield from keys elif action == "store.delete": - key = rec.get("data", {}).get("key") + data = rec.get("data", {}) + key = cast(str, data.get("key")) # An ia-scan key is deleted when that book is deleted/darked from IA. # Delete it from OL solr by updating that key if key.startswith("ia-scan/"): @@ -199,30 +214,28 @@ def parse_log(records, load_ia_scans: bool): yield ol_key -def is_allowed_itemid(identifier): +def is_allowed_itemid(identifier: str): if not re.match("^[a-zA-Z0-9_.-]*$", identifier): return False # items starts with these prefixes are not books. Ignore them. - ignore_prefixes = config.get("ia_ignore_prefixes", []) + ignore_prefixes = cast(list[str], config.get("ia_ignore_prefixes", [])) return all(not identifier.startswith(prefix) for prefix in ignore_prefixes) -async def update_keys(keys): - if not keys: - return 0 - +async def update_keys(keys: Iterable[str]): # FIXME: Some kind of hack introduced to work around DB connectivity issue logger.debug("Args: %s" % str(args)) update.load_configs(args["ol_url"], args["ol_config"], "default") keys = [k for k in keys if update.can_update_key(k)] + if not keys: + return 0 count = 0 - for chunk in web.group(keys, 100): - chunk = list(chunk) + for chunk in itertools.batched(keys, 100, strict=False): count += len(chunk) - await update.do_updates(chunk) + await update.update_keys(list(chunk), commit=False) # Caches should not persist between different calls to update_keys! update.data_provider.clear_cache() @@ -239,7 +252,7 @@ async def main( debugger: bool = False, state_file: str = "solr-update.state", exclude_edits_containing: str | None = None, - ol_url="http://openlibrary.org/", + ol_url="http://openlibrary.org/", # Has side effects, is read globally socket_timeout: int = 10, load_ia_scans: bool = False, initial_state: str | None = None, @@ -271,11 +284,6 @@ async def main( # Setting a timeout will make the request fail instead of waiting forever. socket.setdefaulttimeout(socket_timeout) - # set OL URL when running on a dev-instance - if ol_url: - host = ol_url.removeprefix("http://").strip("/") - update.set_query_host(host) - set_osp_dump_location(osp_dump) logger.info("loading config from %s", ol_config)