diff --git a/af/fastpath/fastpath/core.py b/af/fastpath/fastpath/core.py index db7d0891..7d408dfa 100644 --- a/af/fastpath/fastpath/core.py +++ b/af/fastpath/fastpath/core.py @@ -34,7 +34,7 @@ no_journal_handler = True # Feeds measurements from S3 -import fastpath.s3feeder as s3feeder +import fastpath.oonidata.s3feeder as s3feeder # Feeds measurements from a local HTTP API from fastpath.localhttpfeeder import start_http_api @@ -42,11 +42,11 @@ # Push measurements into Postgres import fastpath.db as db -from fastpath.metrics import setup_metrics -from fastpath.mytypes import MsmtTup +from fastpath.oonidata.metrics import setup_metrics +from fastpath.oonidata.mytypes import MsmtTup import fastpath.portable_queue as queue -import fastpath.utils +import fastpath.oonidata.utils LOCALITY_VALS = ("general", "global", "country", "isp", "local") @@ -725,7 +725,7 @@ def get_http_header(resp, header_name, case_sensitive=False): # backward compatibility with older measurements that don't have # header_list - if "header_list" not in resp: + if "headers_list" not in resp: headers = resp.get("headers", {}) header_list = [[h,v] for h,v in headers.items()] else: @@ -1617,7 +1617,7 @@ def setup_fingerprints(): fingerprints = { "ZZ": {"body_match": [], "header_prefix": [], "header_full": [], "dns_full": []} } - for cc, fprints in fastpath.utils.fingerprints.items(): + for cc, fprints in fastpath.oonidata.utils.fingerprints.items(): d = fingerprints.setdefault(cc, {}) for fp in fprints: assert fp["locality"] in LOCALITY_VALS, fp["locality"] diff --git a/af/fastpath/fastpath/db.py b/af/fastpath/fastpath/db.py index de96fb10..5cd69abb 100644 --- a/af/fastpath/fastpath/db.py +++ b/af/fastpath/fastpath/db.py @@ -22,7 +22,7 @@ pass import ujson -from fastpath.metrics import setup_metrics +from fastpath.oonidata.metrics import setup_metrics log = logging.getLogger("fastpath.db") metrics = setup_metrics(name="fastpath.db") diff --git a/af/fastpath/fastpath/metrics.py b/af/fastpath/fastpath/metrics.py deleted file mode 100644 index 470ec975..00000000 --- a/af/fastpath/fastpath/metrics.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -Metric generation -""" - -from os.path import basename, splitext - -import statsd # debdeps: python3-statsd - - -def setup_metrics(host="localhost", name=None): - """Setup metric generation. Use dotted namespaces e.g. - "pipeline.centrifugation" - """ - if name is None: - import __main__ - - prefix = splitext(basename(__main__.__file__))[0] - else: - prefix = name - - prefix = prefix.strip(".") - return statsd.StatsClient(host, 8125, prefix=prefix) diff --git a/af/fastpath/fastpath/oonidata b/af/fastpath/fastpath/oonidata new file mode 120000 index 00000000..21aa3ae4 --- /dev/null +++ b/af/fastpath/fastpath/oonidata @@ -0,0 +1 @@ +../../../oonidata/oonidata \ No newline at end of file diff --git a/af/fastpath/fastpath/reprocessor.py b/af/fastpath/fastpath/reprocessor.py index 4e598b5d..bdb89499 100755 --- a/af/fastpath/fastpath/reprocessor.py +++ b/af/fastpath/fastpath/reprocessor.py @@ -45,9 +45,9 @@ import statsd # debdeps: python3-statsd import fastpath.db as db -import fastpath.s3feeder as s3f +import fastpath.oonidata.s3feeder as s3f from fastpath.core import score_measurement, setup_fingerprints, unwrap_msmt -from fastpath.utils import trivial_id +from fastpath.oonidata.utils import trivial_id metrics = statsd.StatsClient("127.0.0.1", 8125, prefix="reprocessor") log = logging.getLogger("reprocessor") diff --git a/af/fastpath/fastpath/s3feeder.py b/af/fastpath/fastpath/s3feeder.py deleted file mode 100644 index 179e908d..00000000 --- a/af/fastpath/fastpath/s3feeder.py +++ /dev/null @@ -1,348 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Feeds reports from cans on public S3 bucke or local disk - -Explore bucket from CLI: -AWS_PROFILE=ooni-data aws s3 ls s3://ooni-data/canned/2019-07-16/ - -""" - -from datetime import date, timedelta -from typing import Generator, Set -from pathlib import Path -import logging -import os -import time -import tarfile - -import lz4.frame as lz4frame # debdeps: python3-lz4 -import ujson - -# lz4frame appears faster than executing lz4cat: 2.4s vs 3.9s on a test file - -import boto3 # debdeps: python3-boto3 -from botocore import UNSIGNED as botoSigUNSIGNED -from botocore.config import Config as botoConfig - -from fastpath.metrics import setup_metrics -from fastpath.mytypes import MsmtTup # msmt bytes, msmt dict, uid -from fastpath.normalize import iter_yaml_msmt_normalized -from fastpath.utils import trivial_id - -CAN_BUCKET_NAME = "ooni-data" -MC_BUCKET_NAME = "ooni-data-eu-fra" - -log = logging.getLogger("fastpath") -metrics = setup_metrics(name="fastpath.s3feeder") - -# suppress debug logs -for x in ("urllib3", "botocore", "s3transfer"): - logging.getLogger(x).setLevel(logging.INFO) - - -def load_multiple(fn: str) -> Generator[MsmtTup, None, None]: - """Load contents of legacy cans and minicans. - Decompress tar archives if found. - Yields measurements one by one as: - (string of JSON, None, uid) or (None, msmt dict, uid) - The uid is either taken from the filename or generated by trivial_id for - legacy cans - """ - # TODO: split this and handle legacy cans and post/minicans independently - if fn.endswith(".tar.lz4"): - # Legacy lz4 cans - with lz4frame.open(fn) as f: - tf = tarfile.TarFile(fileobj=f) - while True: - m = tf.next() - if m is None: - # end of tarball - break - log.debug("Loading nested %s", m.name) - k = tf.extractfile(m) - assert k is not None - if m.name.endswith(".json"): - for line in k: - msm = ujson.loads(line) - msmt_uid = trivial_id(msm) - yield (None, msm, msmt_uid) - - elif m.name.endswith(".yaml"): - bucket_tstamp = fn.split("/")[-2] - rfn = f"{bucket_tstamp}/" + fn.split("/")[-1] - for msm in iter_yaml_msmt_normalized(k, bucket_tstamp, rfn): - metrics.incr("yaml_normalization") - msmt_uid = trivial_id(msm) - yield (None, msm, msmt_uid) - - elif fn.endswith(".json.lz4"): - # Legacy lz4 json files - with lz4frame.open(fn) as f: - for line in f: - msm = ujson.loads(line) - msmt_uid = trivial_id(msm) - yield (None, msm, msmt_uid) - - elif fn.endswith(".yaml.lz4"): - # Legacy lz4 yaml files - with lz4frame.open(fn) as f: - bucket_tstamp = fn.split("/")[-2] - rfn = f"{bucket_tstamp}/" + fn.split("/")[-1] - for msm in iter_yaml_msmt_normalized(f, bucket_tstamp, rfn): - metrics.incr("yaml_normalization") - msmt_uid = trivial_id(msm) - yield (None, msm, msmt_uid) - - elif fn.endswith(".tar.gz"): - # minican with missing gzipping :( - tf = tarfile.open(fn) - while True: - m = tf.next() - if m is None: - # end of tarball - tf.close() - break - log.debug("Loading %s", m.name) - k = tf.extractfile(m) - assert k is not None - if not m.name.endswith(".post"): - log.error("Unexpected filename") - continue - - try: - j = ujson.loads(k.read()) - except Exception: - log.error(repr(k[:100]), exc_info=1) - continue - - fmt = j.get("format", "") - if fmt == "json": - msm = j.get("content", {}) - # extract msmt_uid from filename e.g: - # ... /20210614004521.999962_JO_signal_68eb19b439326d60.post - msmt_uid = m.name.rsplit("/", 1)[1] - msmt_uid = msmt_uid[:-5] - yield (None, msm, msmt_uid) - - elif fmt == "yaml": - log.info("Skipping YAML") - - else: - log.info("Ignoring invalid post") - - elif fn.endswith("/index.json.gz"): - pass - - else: - raise RuntimeError(f"Unexpected [mini]can filename '{fn}'") - - -def create_s3_client(): - return boto3.client("s3", config=botoConfig(signature_version=botoSigUNSIGNED)) - - -def list_cans_on_s3_for_a_day(s3, day: date): - """List legacy cans.""" - prefix = f"{day}/" - r = s3.list_objects_v2(Bucket=CAN_BUCKET_NAME, Prefix="canned/" + prefix) - - if ("Contents" in r) ^ (day <= date(2020, 10, 21)): - # The last day with cans is 2020-10-21 - log.warn("%d can files found!", len(r.get("Contents", []))) - - fs = r.get("Contents", []) - files = [(f["Key"], f["Size"]) for f in fs] - return files - - -def list_minicans_on_s3_for_a_day( - s3, day: date, ccs: Set[str], testnames: Set[str] -) -> list: - """List minicans. Filter them by CCs and testnames - Testnames are without underscores. - """ - # s3cmd ls s3://ooni-data-eu-fra/raw/20210202 - tstamp = day.strftime("%Y%m%d") - prefix = f"raw/{tstamp}/" - cont_token = None - files = [] - # list_objects_v2 returns 1000 objects max and needs a token (!= None) - while True: - kw = {} if cont_token is None else dict(ContinuationToken=cont_token) - r = s3.list_objects_v2(Bucket=MC_BUCKET_NAME, Prefix=prefix, **kw) - - cont_token = r.get("NextContinuationToken", None) - if ("Contents" in r) ^ (day >= date(2020, 10, 20)): - # The first day with minicans is 2020-10-20 - log.warn("%d minican files found!", len(r.get("Contents", []))) - - for f in r.get("Contents", []): - if not f["Key"].endswith(".tar.gz"): - continue - - # Example: - # raw/20210910/02/CU/signal/2021091002_CU_signal.n0.0.tar.gz - fname = f["Key"] - try: - _raw, _date, _hour, cc, testname, _ = fname.split("/") - except Exception: - log.warn(f"Ignoring unexpected minican filename {fname}") - - if ccs and cc not in ccs: - continue - - if testnames and testname not in testnames: - continue - - if f["Size"] > 0: - files.append((fname, f["Size"])) - - if cont_token is None: - log.info(f"Found {len(files)} minican .tar.gz files") - return sorted(files) - - assert False - - -def log_download(s3fname, size) -> None: - s = size / 1024 / 1024 - d = "M" - if s < 1: - s = size / 1024 - d = "K" - log.info(f"Downloading can {s3fname} size {s:.1f} {d}B") - - -@metrics.timer("fetch_cans") -def fetch_cans(s3, conf, files) -> Generator[Path, None, None]: - """ - Download cans to a local directory - fnames = [("2013-09-12/20130912T150305Z-MD-AS1547-http_", size), ... ] - yield each can file Path - """ - # fn: can filename without path - # diskf: File in the s3cachedir directory - cans = set() # (s3fname, filename on disk, size, download required) - for s3fname, size in files: - diskf = conf.s3cachedir / s3fname.split("/", 1)[1] - if diskf.exists() and size == diskf.stat().st_size: - metrics.incr("cache_hit") - diskf.touch(exist_ok=True) - cans.add((s3fname, diskf, size, False)) - else: - metrics.incr("cache_miss") - cans.add((s3fname, diskf, size, True)) - - def _cb(bytes_count): - if _cb.start_time is None: - _cb.start_time = time.time() - _cb.count = bytes_count - return - _cb.count += bytes_count - _cb.total_count += bytes_count - metrics.gauge("s3_download_percentage", _cb.total_count / _cb.total_size * 100) - try: - speed = _cb.count / 131_072 / (time.time() - _cb.start_time) - metrics.gauge("s3_download_speed_avg_Mbps", speed) - except ZeroDivisionError: - pass - - cans = sorted(cans) - _cb.total_size = sum(t[2] for t in cans if t[3]) - _cb.total_count = 0 - - for s3fname, diskf, size, dload_required in cans: - if not dload_required: - yield diskf # already in local cache - continue - - # TODO: handle missing file - log_download(s3fname, size) - diskf.parent.mkdir(parents=True, exist_ok=True) - tmpf = diskf.with_suffix(".s3tmp") - metrics.gauge("fetching", 1) - _cb.start_time = None - with tmpf.open("wb") as f: - bucket_name = CAN_BUCKET_NAME if "canned/" in s3fname else MC_BUCKET_NAME - s3.download_fileobj(bucket_name, s3fname, f, Callback=_cb) - f.flush() - os.fsync(f.fileno()) - metrics.gauge("fetching", 0) - tmpf.rename(diskf) - assert size == diskf.stat().st_size - yield diskf - - metrics.gauge("s3_download_speed_avg_Mbps", 0) - - -# TODO: merge with stream_daily_cans, add caching to the latter to be used -# during functional tests -# @metrics.timer("fetch_cans_for_a_day_with_cache") -# def fetch_cans_for_a_day_with_cache(conf, day): -# s3 = create_s3_client() -# fns = list_cans_on_s3_for_a_day(s3, day) -# list(fetch_cans(s3, conf, fns)) - - -def _calculate_etr(t0, now, start_day, day, stop_day, can_num, can_tot_count) -> int: - """Estimate total runtime in seconds. - stop_day is not included, can_num starts from 0 - """ - tot_days_count = (stop_day - start_day).days - elapsed = now - t0 - days_done = (day - start_day).days - fraction_of_day_done = (can_num + 1) / float(can_tot_count) - etr = elapsed * tot_days_count / (days_done + fraction_of_day_done) - return etr - - -def _update_eta(t0, start_day, day, stop_day, can_num, can_tot_count): - """Generate metric process_s3_measurements_eta expressed as epoch""" - try: - now = time.time() - etr = _calculate_etr(t0, now, start_day, day, stop_day, can_num, can_tot_count) - eta = t0 + etr - metrics.gauge("process_s3_measurements_eta", eta) - except: - pass - - -def stream_cans(conf, start_day: date, end_day: date) -> Generator[MsmtTup, None, None]: - """Stream cans from S3""" - today = date.today() - if not start_day or start_day >= today: - return - - log.info("Fetching older cans from S3") - t0 = time.time() - day = start_day - s3 = create_s3_client() - # the last day is not included - stop_day = end_day if end_day < today else today - while day < stop_day: - log.info("Processing day %s", day) - cans_fns = list_cans_on_s3_for_a_day(s3, day) - minicans_fns = list_minicans_on_s3_for_a_day(s3, day, conf.ccs, conf.testnames) - cans_fns.extend(minicans_fns) - for cn, can_f in enumerate(fetch_cans(s3, conf, cans_fns)): - try: - _update_eta(t0, start_day, day, stop_day, cn, len(cans_fns)) - # log.info("can %s ready", can_f.name) - for msmt_tup in load_multiple(can_f.as_posix()): - yield msmt_tup - except Exception as e: - log.error(str(e), exc_info=True) - - if not conf.keep_s3_cache: - try: - can_f.unlink() - except FileNotFoundError: - pass - - day += timedelta(days=1) - - if end_day: - log.info(f"Reached {end_day}, streaming cans from S3 finished") - return diff --git a/af/fastpath/fastpath/sshfeeder.py b/af/fastpath/fastpath/sshfeeder.py index fcb484a4..c0cf8332 100755 --- a/af/fastpath/fastpath/sshfeeder.py +++ b/af/fastpath/fastpath/sshfeeder.py @@ -23,9 +23,9 @@ warnings.filterwarnings(action="ignore", module=".*paramiko.*") -import fastpath.normalize as normalize # noqa -from fastpath.metrics import setup_metrics # noqa -from fastpath.mytypes import MsmtTup +import fastpath.oonidata.normalize as normalize # noqa +from fastpath.oonidata.metrics import setup_metrics # noqa +from fastpath.oonidata.mytypes import MsmtTup log = logging.getLogger("fastpath") diff --git a/af/fastpath/fastpath/tests/test_functional.py b/af/fastpath/fastpath/tests/test_functional.py index 5f261f5d..6799a7ad 100644 --- a/af/fastpath/fastpath/tests/test_functional.py +++ b/af/fastpath/fastpath/tests/test_functional.py @@ -15,7 +15,7 @@ import pytest # debdeps: python3-pytest import fastpath.core as fp -import fastpath.s3feeder as s3feeder +import fastpath.oonidata.s3feeder as s3feeder log = logging.getLogger() @@ -143,7 +143,7 @@ def minicans(test_name, start_date: date, end_date: date, end=None): while day <= end_date: tn_filter = set([test_name.replace("_", "")]) log.info(day) - li = s3feeder.list_minicans_on_s3_for_a_day(s3, day, None, tn_filter) + li = s3feeder.list_minicans_on_s3_for_a_day(day, None, tn_filter) for s3fname, s3size in li: # s3fname: raw/20210426/23/YE/ndt/2021042623_YE_ndt.n0.0.tar.gz local_file = Path("testdata") / "mini" / s3fname @@ -167,8 +167,7 @@ def minicans(test_name, start_date: date, end_date: date, end=None): def list_cans_on_s3_for_a_day(day, filter=None, bysize=False): - s3 = s3feeder.create_s3_client() - fns = s3feeder.list_cans_on_s3_for_a_day(s3, day) + fns = s3feeder.list_cans_on_s3_for_a_day(day) if bysize: fns = sorted(fns, key=lambda i: i[1]) else: diff --git a/af/fastpath/fastpath/tests/test_functional_normalize.py b/af/fastpath/fastpath/tests/test_functional_normalize.py index c77f7bf0..6f489267 100644 --- a/af/fastpath/fastpath/tests/test_functional_normalize.py +++ b/af/fastpath/fastpath/tests/test_functional_normalize.py @@ -16,8 +16,8 @@ import tarfile import ujson -import fastpath.normalize as norm -from fastpath.s3feeder import create_s3_client +import fastpath.oonidata.normalize as norm +from fastpath.oonidata.s3feeder import create_s3_client log = logging.getLogger() diff --git a/af/fastpath/fastpath/tests/test_unit.py b/af/fastpath/fastpath/tests/test_unit.py index 5162cc73..47a7cd1a 100644 --- a/af/fastpath/fastpath/tests/test_unit.py +++ b/af/fastpath/fastpath/tests/test_unit.py @@ -8,9 +8,9 @@ import pytest import ujson -from fastpath.utils import trivial_id +from fastpath.oonidata.utils import trivial_id import fastpath.core as fp -import fastpath.s3feeder as s3feeder +import fastpath.oonidata.s3feeder as s3feeder def loadj(fn): diff --git a/af/fastpath/setup.py b/af/fastpath/setup.py index f34f9153..0150efed 100644 --- a/af/fastpath/setup.py +++ b/af/fastpath/setup.py @@ -11,7 +11,7 @@ setup( name=NAME, python_requires=">=3.7.0", - packages=["fastpath", "fastpath.tests"], + packages=["fastpath", "fastpath.oonidata", "fastpath.tests"], entry_points={"console_scripts": [ "fastpath=fastpath.core:main", "reprocessor=fastpath.reprocessor:main", diff --git a/oonidata/.gitignore b/oonidata/.gitignore new file mode 100644 index 00000000..00211148 --- /dev/null +++ b/oonidata/.gitignore @@ -0,0 +1,2 @@ +dist/ +oonidata.egg-info/ diff --git a/oonidata/LICENSE b/oonidata/LICENSE new file mode 100644 index 00000000..2b969e99 --- /dev/null +++ b/oonidata/LICENSE @@ -0,0 +1,11 @@ +Copyright 2022 Open Observatory of Network Interference (OONI) + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/oonidata/README.md b/oonidata/README.md new file mode 100644 index 00000000..b39adf01 --- /dev/null +++ b/oonidata/README.md @@ -0,0 +1,21 @@ +# OONI Data + +**Attention** +This tool is currently in alpha stage. The CLI API is subject to change and you +should be careful to rely on it for production usage. + +## What is this? + +OONI data is a tool for interacting with raw OONI measurements. It supports +downloading raw network measurement data in batch. + +For the specifications of the base data formats see: https://github.com/ooni/spec/tree/master/data-formats + +For the specifications of each of the tests see: https://github.com/ooni/spec/tree/master/nettests + +## Example usage + +To download raw Web Connectivity measurements for a given country and time range, use the following: +``` +oonidata sync --since 2022-02-23 --until 2022-03-17 --country-codes IT --test-names web_connectivity --output-dir ./oonidatastore/ +``` diff --git a/oonidata/oonidata/__init__.py b/oonidata/oonidata/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/oonidata/oonidata/main.py b/oonidata/oonidata/main.py new file mode 100644 index 00000000..dcc22c66 --- /dev/null +++ b/oonidata/oonidata/main.py @@ -0,0 +1,132 @@ +import argparse +import shutil +from collections import namedtuple +from functools import singledispatch, partial +import tempfile +import os +import gzip +import itertools +import logging +import datetime as dt +import pathlib +import sys +import time +from typing import List, Generator, Tuple, List + +import ujson + +from multiprocessing.pool import ThreadPool + +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm + +from .s3feeder import create_s3_client, FileEntry, download_measurement_container +from .s3feeder import jsonl_in_range + +log = logging.getLogger("oonidata") +logging.basicConfig(level=logging.INFO) + +# Taken from: +# https://github.com/Jigsaw-Code/net-analysis/blob/master/netanalysis/ooni/data/sync_measurements.py#L33 +@singledispatch +def trim_measurement(json_obj, max_string_size: int): + return json_obj + + +@trim_measurement.register(dict) +def _(json_dict: dict, max_string_size: int): + keys_to_delete: List[str] = [] + for key, value in json_dict.items(): + if type(value) == str and len(value) > max_string_size: + keys_to_delete.append(key) + else: + trim_measurement(value, max_string_size) + for key in keys_to_delete: + del json_dict[key] + return json_dict + + +@trim_measurement.register(list) +def _(json_list: list, max_string_size: int): + for item in json_list: + trim_measurement(item, max_string_size) + return json_list + + +def trim_container(s3cachedir: pathlib.Path, fe: FileEntry, max_string_size: int): + mc = fe.output_path(s3cachedir) + temp_path = diskf.with_suffix(".tmp") + try: + with gzip.open( + temp_path, mode="wt", encoding="utf-8", newline="\n" + ) as out_file: + for msmt in load_multiple(mc.as_posix()): + msmt = trim_measurement(msmt, args.max_string_size) + ujson.dump(msmt, out_file) + out_file.write("\n") + temp_path.replace(mc) + except: + temp_path.unlink() + raise + + +def download_and_trim(fe, output_dir, max_string_size): + mc = download_measurement_container(output_dir, fe) + if max_string_size: + trim_container(output_dir, fe, max_string_size) + + +def sync(args): + testnames = [] + if args.test_names: + # Replace _ with a - + testnames = list(map(lambda x: x.replace("_", ""), args.test_names)) + + log.info(f"Listing measurement in s3 for {args.since} - {args.until} probe_cc: {args.country_codes}") + log.info("This may take a while...") + + file_entries = list(jsonl_in_range(args.country_codes, testnames, args.since, args.until)) + with logging_redirect_tqdm(): + func = partial(download_and_trim, output_dir=args.output_dir, + max_string_size=args.max_string_size) + with ThreadPool() as pool: + list(tqdm(pool.imap_unordered(func, file_entries), total=len(file_entries))) + + +def _parse_date_flag(date_str: str) -> dt.date: + return dt.datetime.strptime(date_str, "%Y-%m-%d").date() + + +def main(): + parser = argparse.ArgumentParser("OONI Data tools") + parser.set_defaults(func=lambda r: parser.print_usage()) + + subparsers = parser.add_subparsers() + + parser_sync = subparsers.add_parser("sync", help="Sync OONI measurements") + parser_sync.add_argument( + "--country-codes", + type=str, + nargs="*", + help="List of probe_cc values to filter by", + ) + parser_sync.add_argument( + "--since", + type=_parse_date_flag, + default=dt.date.today() - dt.timedelta(days=14), + ) + parser_sync.add_argument("--until", type=_parse_date_flag, default=dt.date.today()) + parser_sync.add_argument( + "--test-names", nargs="*", help="List of test_name values to filter by" + ) + parser_sync.add_argument("--max-string-size", type=int) + parser_sync.add_argument("--output-dir", type=pathlib.Path, required=True) + parser_sync.add_argument("--debug", action="store_true") + parser_sync.set_defaults(func=sync) + + args = parser.parse_args() + sys.exit(args.func(args)) + + +if __name__ == "__main__": + main() diff --git a/oonidata/oonidata/metrics.py b/oonidata/oonidata/metrics.py new file mode 100644 index 00000000..8fbea630 --- /dev/null +++ b/oonidata/oonidata/metrics.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- + +""" +Metric generation +""" + +import logging +from os.path import basename, splitext +from functools import wraps + +log = logging.getLogger("fastpath") + +class MockTimer(object): + def __call__(self, f): + @wraps(f) + def _f(*a, **k): + return f(*a, **k) + return _f + +class MockStatsClient(object): + """ + API compatible with the statsd client, but does nothing. + """ + def __init__(self, host=None, port=None, prefix=None, sample_rate=None): + pass + def incr(self, stat, count=1, rate=1): + pass + def decr(self, stat, count=1, rate=1): + pass + def gauge(self, stat, value, rate=1, delta=False): + log.info(f"{stat}: {value}") + def set(self, stat, value, rate=1): + pass + def timer(self, stat, rate=1): + return MockTimer() + +try: + import statsd # debdeps: python3-statsd + statsdclient = statsd.StatsClient +except ImportError: + statsdclient = MockStatsClient + +def setup_metrics(host="localhost", name=None): + """Setup metric generation. Use dotted namespaces e.g. + "pipeline.centrifugation" + """ + if name is None: + import __main__ + + prefix = splitext(basename(__main__.__file__))[0] + else: + prefix = name + + prefix = prefix.strip(".") + return statsdclient(host, 8125, prefix=prefix) diff --git a/af/fastpath/fastpath/mytypes.py b/oonidata/oonidata/mytypes.py similarity index 100% rename from af/fastpath/fastpath/mytypes.py rename to oonidata/oonidata/mytypes.py diff --git a/af/fastpath/fastpath/normalize.py b/oonidata/oonidata/normalize.py similarity index 100% rename from af/fastpath/fastpath/normalize.py rename to oonidata/oonidata/normalize.py diff --git a/oonidata/oonidata/s3feeder.py b/oonidata/oonidata/s3feeder.py new file mode 100644 index 00000000..17c5438f --- /dev/null +++ b/oonidata/oonidata/s3feeder.py @@ -0,0 +1,510 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Feeds reports from cans on public S3 bucket or local disk + +Explore bucket from CLI: +AWS_PROFILE=ooni-data aws s3 ls s3://ooni-data/canned/2019-07-16/ + +""" + +from datetime import date, timedelta, datetime +from typing import Generator, Set, NamedTuple, Any, List +from collections import namedtuple +from pathlib import Path +import itertools +import logging +import os +import time +import gzip +import tarfile +from multiprocessing import Pool + +import lz4.frame as lz4frame # debdeps: python3-lz4 +import ujson + +# lz4frame appears faster than executing lz4cat: 2.4s vs 3.9s on a test file + +import boto3 # debdeps: python3-boto3 +from botocore import UNSIGNED as botoSigUNSIGNED +from botocore.config import Config as botoConfig + +from .metrics import setup_metrics +from .mytypes import MsmtTup # msmt bytes, msmt dict, uid +from .normalize import iter_yaml_msmt_normalized +from .utils import trivial_id + +CAN_BUCKET_NAME = "ooni-data" +MC_BUCKET_NAME = "ooni-data-eu-fra" +MAX_PROCESS_COUNT = 24 + +log = logging.getLogger("fastpath") +metrics = setup_metrics(name="fastpath.s3feeder") + + +def create_s3_client(): + return boto3.client("s3", config=botoConfig(signature_version=botoSigUNSIGNED)) + +s3 = create_s3_client() + +# suppress debug logs +for x in ("urllib3", "botocore", "s3transfer"): + logging.getLogger(x).setLevel(logging.INFO) + + +def load_multiple(fn: str) -> Generator[MsmtTup, None, None]: + """Load contents of legacy cans and minicans. + Decompress tar archives if found. + Yields measurements one by one as: + (string of JSON, None, uid) or (None, msmt dict, uid) + The uid is either taken from the filename or generated by trivial_id for + legacy cans + """ + # TODO: split this and handle legacy cans and post/minicans independently + if fn.endswith(".tar.lz4"): + # Legacy lz4 cans + with lz4frame.open(fn) as f: + tf = tarfile.TarFile(fileobj=f) + while True: + m = tf.next() + if m is None: + # end of tarball + break + log.debug("Loading nested %s", m.name) + k = tf.extractfile(m) + assert k is not None + if m.name.endswith(".json"): + for line in k: + msm = ujson.loads(line) + msmt_uid = trivial_id(msm) + yield (None, msm, msmt_uid) + + elif m.name.endswith(".yaml"): + bucket_tstamp = fn.split("/")[-2] + rfn = f"{bucket_tstamp}/" + fn.split("/")[-1] + for msm in iter_yaml_msmt_normalized(k, bucket_tstamp, rfn): + metrics.incr("yaml_normalization") + msmt_uid = trivial_id(msm) + yield (None, msm, msmt_uid) + + elif fn.endswith(".json.lz4"): + # Legacy lz4 json files + with lz4frame.open(fn) as f: + for line in f: + msm = ujson.loads(line) + msmt_uid = trivial_id(msm) + yield (None, msm, msmt_uid) + + elif fn.endswith(".jsonl.gz"): + # New JSONL files + with gzip.open(fn) as f: + for line in f: + msm = ujson.loads(line) + msmt_uid = trivial_id(msm) + yield (None, msm, msmt_uid) + + elif fn.endswith(".yaml.lz4"): + # Legacy lz4 yaml files + with lz4frame.open(fn) as f: + bucket_tstamp = fn.split("/")[-2] + rfn = f"{bucket_tstamp}/" + fn.split("/")[-1] + for msm in iter_yaml_msmt_normalized(f, bucket_tstamp, rfn): + metrics.incr("yaml_normalization") + msmt_uid = trivial_id(msm) + yield (None, msm, msmt_uid) + + elif fn.endswith(".tar.gz"): + # minican with missing gzipping :( + tf = tarfile.open(fn) + while True: + m = tf.next() + if m is None: + # end of tarball + tf.close() + break + log.debug("Loading %s", m.name) + k = tf.extractfile(m) + assert k is not None + if not m.name.endswith(".post"): + log.error("Unexpected filename") + continue + + try: + j = ujson.loads(k.read()) + except Exception: + log.error(repr(k[:100]), exc_info=1) + continue + + fmt = j.get("format", "") + if fmt == "json": + msm = j.get("content", {}) + # extract msmt_uid from filename e.g: + # ... /20210614004521.999962_JO_signal_68eb19b439326d60.post + msmt_uid = m.name.rsplit("/", 1)[1] + msmt_uid = msmt_uid[:-5] + yield (None, msm, msmt_uid) + + elif fmt == "yaml": + log.info("Skipping YAML") + + else: + log.info("Ignoring invalid post") + + elif fn.endswith("/index.json.gz"): + pass + + else: + raise RuntimeError(f"Unexpected [mini]can filename '{fn}'") + + + +def list_cans_on_s3_for_a_day(day: date) -> list: + return list( + map(lambda fe: (fe.s3path, fe.size), iter_cans_on_s3_for_a_day(day)) + ) + + +def iter_cans_on_s3_for_a_day(day: date): + """List legacy cans.""" + prefix = f"canned/{day}/" + paginator = s3.get_paginator("list_objects_v2") + files = [] + for r in paginator.paginate(Bucket=CAN_BUCKET_NAME, Prefix=prefix): + if ("Contents" in r) ^ (day <= date(2020, 10, 21)): + # The last day with cans is 2020-10-21 + log.warn("%d can files found!", len(r.get("Contents", []))) + + for f in r.get("Contents", []): + s3path = f["Key"] + filename = s3path.split("/")[-1] + country_code = None + ext = None + if filename.endswith(".tar.lz4"): + test_name = filename.split(".")[0].replace("_", "") + country_code = "XX" + ext = "tar.lz4" + elif filename.endswith(".json.lz4") or filename.endswith(".yaml.lz4"): + parts = filename.split("-") + country_code = parts[1] + test_name = parts[3].replace("_", "") + ext = ".".join(filename.split(".")[-2:]) + else: + if filename != "index.json.gz": + log.warn(f"found an unexpected filename {filename}") + continue + + file_entry = FileEntry( + day=day, + country_code=country_code, + test_name=test_name, + filename=filename, + size=f["Size"], + ext=ext, + s3path=s3path, + bucket_name=CAN_BUCKET_NAME, + ) + yield file_entry + + +class FileEntry(NamedTuple): + day: date + country_code: str + test_name: str + filename: str + size: int + ext: str + s3path: str + bucket_name: str + + def output_path(self, dst_dir: Path) -> Path: + return ( + dst_dir + / self.test_name + / self.country_code + / f"{self.day:%Y-%m-%d}" + / self.filename + ) + + def matches_filter(self, ccs: Set[str], testnames: Set[str]) -> bool: + if self.country_code and ccs and self.country_code not in ccs: + return False + + if self.test_name and testnames and self.test_name not in testnames: + return False + + return True + + def log_download(self) -> None: + s = self.size / 1024 / 1024 + d = "M" + if s < 1: + s = self.size / 1024 + d = "K" + log.info(f"Downloading can {self.s3path} size {s:.1f} {d}B") + + +def iter_file_entries(prefix: str) -> Generator[FileEntry, None, None]: + paginator = s3.get_paginator("list_objects_v2") + for r in paginator.paginate(Bucket=MC_BUCKET_NAME, Prefix=prefix): + for f in r.get("Contents", []): + s3path = f["Key"] + filename = s3path.split("/")[-1] + parts = filename.split("_") + test_name, _, _, ext = parts[2].split(".", 3) + file_entry = FileEntry( + # We need to truncate the first 8 chars, because of + # inconsitencies between the old and new filenames + day=datetime.strptime(parts[0][:8], "%Y%m%d").date(), + country_code=parts[1], + test_name=test_name, + filename=filename, + s3path=s3path, + size=f["Size"], + ext=ext, + bucket_name=MC_BUCKET_NAME, + ) + yield file_entry + + +def list_all_testnames() -> Set[str]: + testnames = set() + paginator = s3.get_paginator("list_objects_v2") + for r in paginator.paginate(Bucket=MC_BUCKET_NAME, Prefix="jsonl/", Delimiter="/"): + for f in r.get("CommonPrefixes", []): + testnames.add(f["Prefix"].split("/")[-2]) + return testnames + + +def get_search_prefixes(testnames: Set[str], ccs: Set[str]) -> List[str]: + """ + get_search_prefixes will return all the prefixes inside of the new jsonl + bucket that match the given testnames and ccs. + If the ccs list is empty we will return prefixes for all countries for + which that particular testname as measurements. + """ + prefixes = [] + paginator = s3.get_paginator("list_objects_v2") + for tn in testnames: + for r in paginator.paginate( + Bucket=MC_BUCKET_NAME, Prefix=f"jsonl/{tn}/", Delimiter="/" + ): + for f in r.get("CommonPrefixes", []): + prefix = f["Prefix"] + cc = prefix.split("/")[-2] + if ccs and cc not in ccs: + continue + prefixes.append(prefix) + return prefixes + + +def get_jsonl_prefixes( + ccs: Set[str], testnames: Set[str], start_day: date, end_day: date +) -> List[str]: + legacy_prefixes = [ + f"raw/{d:%Y%m%d}" + for d in date_interval(max(date(2020, 10, 20), start_day), end_day) + ] + if not testnames: + testnames = list_all_testnames() + prefixes = [] + if start_day < date(2020, 10, 21): + prefixes = get_search_prefixes(testnames, ccs) + combos = list(itertools.product(prefixes, date_interval(start_day, end_day))) + # This results in a faster listing in cases where we need only a small time + # window or few testnames. For larger windows of time, we are better off + # just listing everything. + if len(combos) > 1_000_000: # XXX we might want to tweak this parameter a bit + prefixes = [f"{p}{d:%Y%m%d}" for p, d in combos] + + return prefixes + legacy_prefixes + +def list_file_entries(prefix): + return [fe for fe in iter_file_entries(prefix)] + +def jsonl_in_range( + ccs: Set[str], testnames: Set[str], start_day: date, end_day: date +) -> Generator[FileEntry, None, None]: + + prefixes = get_jsonl_prefixes(ccs, testnames, start_day, end_day) + with Pool(processes=MAX_PROCESS_COUNT) as pool: + fe = pool.imap_unordered( + list_file_entries, + prefixes + ) + for fe_list in fe: + for file_entry in fe_list: + if file_entry.ext != "jsonl.gz": + continue + + if not file_entry.matches_filter(ccs, testnames): + continue + + if file_entry.day < start_day or file_entry.day >= end_day: + continue + + if file_entry.size > 0: + yield file_entry + + +def list_minicans_on_s3_for_a_day( + day: date, ccs: Set[str], testnames: Set[str] +) -> list: + return list( + map( + lambda fe: (fe.s3path, fe.size), + filter( + lambda fe: fe.matches_filter(ccs, testnames), + iter_minicans_on_s3_for_a_day(day), + ), + ) + ) + + +def iter_minicans_on_s3_for_a_day(day: date) -> Generator[FileEntry, None, None]: + """List minicans. Filter them by CCs and testnames + Testnames are without underscores. + """ + # s3cmd ls s3://ooni-data-eu-fra/raw/20210202 + tstamp = day.strftime("%Y%m%d") + prefix = f"raw/{tstamp}/" + for file_entry in iter_file_entries(prefix): + if file_entry.ext != "tar.gz": + continue + yield file_entry + + +def _calculate_etr(t0, now, start_day, day, stop_day, can_num, can_tot_count) -> int: + """Estimate total runtime in seconds. + stop_day is not included, can_num starts from 0 + """ + tot_days_count = (stop_day - start_day).days + elapsed = now - t0 + days_done = (day - start_day).days + fraction_of_day_done = (can_num + 1) / float(can_tot_count) + etr = elapsed * tot_days_count / (days_done + fraction_of_day_done) + return etr + + +def _update_eta(t0, start_day, day, stop_day, can_num, can_tot_count): + """Generate metric process_s3_measurements_eta expressed as epoch""" + try: + now = time.time() + etr = _calculate_etr(t0, now, start_day, day, stop_day, can_num, can_tot_count) + eta = t0 + etr + metrics.gauge("process_s3_measurements_eta", eta) + except: + pass + + +def date_interval(start_day: date, end_day: date): + today = date.today() + if not start_day or start_day >= today: + raise StopIteration + day = start_day + # the last day is not included + stop_day = end_day if end_day < today else today + while day < stop_day: + yield day + day += timedelta(days=1) + + +@metrics.timer("download_measurement_container") +def download_measurement_container(s3cachedir: Path, file_entry: FileEntry): + diskf = file_entry.output_path(s3cachedir) + if diskf.exists() and file_entry.size == diskf.stat().st_size: + metrics.incr("cache_hit") + diskf.touch(exist_ok=True) + return diskf + metrics.incr("cache_miss") + + file_entry.log_download() + + def _cb(bytes_count): + if _cb.start_time is None: + _cb.start_time = time.time() + _cb.count = bytes_count + return + _cb.count += bytes_count + _cb.total_count += bytes_count + metrics.gauge("s3_download_percentage", _cb.total_count / _cb.total_size * 100) + try: + speed = _cb.count / 131_072 / (time.time() - _cb.start_time) + metrics.gauge("s3_download_speed_avg_Mbps", speed) + except ZeroDivisionError: + pass + + _cb.total_size = file_entry.size + _cb.total_count = 0 + _cb.start_time = None + + diskf.parent.mkdir(parents=True, exist_ok=True) + tmpf = diskf.with_suffix(".s3tmp") + with tmpf.open("wb") as f: + s3.download_fileobj(file_entry.bucket_name, file_entry.s3path, f, Callback=_cb) + f.flush() + os.fsync(f.fileno()) + metrics.gauge("fetching", 0) + tmpf.rename(diskf) + assert file_entry.size == diskf.stat().st_size + metrics.gauge("s3_download_speed_avg_Mbps", 0) + return diskf + + +def stream_measurements( + file_entries: Generator[FileEntry, None, None], + s3cachedir: Path, keep_s3_cache: bool, +) -> Generator[MsmtTup, None, None]: + + t0 = time.time() + total_size = sum(map(lambda fe: fe.size, file_entries)) + processed_size = 0 + + for fe in file_entries: + mc = download_measurement_container(s3cachedir, fe) + try: + yield from load_multiple(mc.as_posix()) + except Exception as e: + log.error(str(e), exc_info=True) + processed_size += fe.size + mbps = processed_size / (time.time() - t0) / 1_000_000 + eta = timedelta(seconds=(total_size - processed_size)/(mbps * 1_000_000)) + log.info(f"Speed: {mbps} MB/s") + log.info(f"ETA: {eta}") + if not keep_s3_cache: + try: + mc.unlink() + except FileNotFoundError: + pass + + +def stream_cans(conf, start_day: date, end_day: date) -> Generator[MsmtTup, None, None]: + """Stream cans from S3""" + log.info("Fetching older cans from S3") + t0 = time.time() + for day in date_interval(start_day, end_day): + log.info("Processing day %s", day) + + can_file_entries = itertools.chain( + iter_cans_on_s3_for_a_day(day), iter_minicans_on_s3_for_a_day(day) + ) + yield from stream_measurements(can_file_entries, conf.s3cachedir, conf.keep_s3_cache) + + if end_day: + log.info(f"Reached {end_day}, streaming cans from S3 finished") + return + + +def stream_jsonl( + conf, start_day: date, end_day: date +) -> Generator[MsmtTup, None, None]: + """Stream jsonl from S3""" + log.info("Fetching jsonl from S3") + yield from stream_measurements( + jsonl_in_range(conf.ccs, conf.testnames, start_day, end_day), + conf.s3cachedir, conf.keep_s3_cache + ) + + if end_day: + log.info(f"Reached {end_day}, streaming cans from S3 finished") + return diff --git a/oonidata/oonidata/tests/test_s3feeder.py b/oonidata/oonidata/tests/test_s3feeder.py new file mode 100644 index 00000000..f60f9f43 --- /dev/null +++ b/oonidata/oonidata/tests/test_s3feeder.py @@ -0,0 +1,51 @@ +import pytest +import time + +from pathlib import Path +from datetime import date + +from oonidata.s3feeder import iter_file_entries, create_s3_client, get_jsonl_prefixes +from oonidata.s3feeder import iter_cans_on_s3_for_a_day, jsonl_in_range, list_file_entries +from oonidata.s3feeder import stream_measurements + + +def test_iter_file_entries_new_jsonl(): + fe_list = list(iter_file_entries("jsonl/webconnectivity/IT/20201020/00/")) + assert len(fe_list) == 19 + for fe in fe_list: + assert fe.test_name == "webconnectivity" + assert fe.country_code == "IT" + assert fe.size > 0 + assert fe.bucket_name == "ooni-data-eu-fra" + assert fe.day == date(2020, 10, 20) + assert fe.ext == "jsonl.gz" + +def test_iter_file_entries_old_format(): + fe_list = list(iter_file_entries("raw/20211020/00/IT/webconnectivity/")) + assert len(fe_list) == 6 + for fe in fe_list: + assert fe.test_name == "webconnectivity" + assert fe.country_code == "IT" + assert fe.size > 0 + assert fe.bucket_name == "ooni-data-eu-fra" + assert fe.day == date(2021, 10, 20) + +def test_iter_cans_on_s3_for_a_day(): + fe_list = list(iter_cans_on_s3_for_a_day(date(2020, 1, 1))) + assert len(fe_list) == 136 + assert all(map(lambda fe: fe.bucket_name == "ooni-data", fe_list)) + +def test_get_jsonl_prefixes(): + prefixes = list(get_jsonl_prefixes([], [], date(2020, 1, 1), date(2020, 1, 2))) + assert len(prefixes) == 2516 + +def test_jsonl_in_range(): + fe_list = list(jsonl_in_range([], [], date(2020, 1, 1), date(2020, 1, 2))) + assert len(fe_list) == 1125 + +def test_stream_jsonl_measurements(tmp_path): + fe_list = list_file_entries("jsonl/telegram/IT/20201009/00/") + assert len(fe_list) == 1 + for _, msmt, msmt_uid in stream_measurements(fe_list, tmp_path, False): + assert msmt["probe_cc"] == "IT" + assert msmt["test_name"] == "telegram" diff --git a/af/fastpath/fastpath/utils.py b/oonidata/oonidata/utils.py similarity index 100% rename from af/fastpath/fastpath/utils.py rename to oonidata/oonidata/utils.py diff --git a/oonidata/setup.py b/oonidata/setup.py new file mode 100644 index 00000000..7c03bdff --- /dev/null +++ b/oonidata/setup.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from setuptools import setup + +with open("README.md", "r", encoding="utf-8") as in_file: + long_description = in_file.read() + +setup( + name="oonidata", + version="0.0.1", + author="Open Observatory of Network Interference (OONI)", + author_email="contact@openobservatory.org", + description="Interact with OONI network measurement data", + long_description=long_description, + long_description_content_type="text/markdown", + python_requires=">=3.7.0", + packages=["oonidata"], + entry_points={"console_scripts": [ + "oonidata=oonidata.main:main", + ]}, + install_requires=[ + "boto3", + "pyyaml", + "ujson", + "tqdm", + "lz4" + ], + project_urls={ + "Bug Tracker": "https://github.com/ooni/backend/issues" + }, + classifiers=[ + "Development Status :: 2 - Pre-Alpha", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent" + ], + zip_safe=False, +)