Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
74d9da4
First draft of function to resolve concepts
awildturtok Nov 12, 2025
e028841
remove qualification from CTConditionContext to make integration with…
awildturtok Nov 12, 2025
4cccd65
Collect Auxiliary columns to then generate proper signature
awildturtok Nov 13, 2025
209d881
properly insert function (only postgres atm)
awildturtok Nov 17, 2025
8620c65
first draft towards applying matching stats
awildturtok Nov 18, 2025
51f1aea
Reworks registration of MatchingStats.
awildturtok Jan 29, 2025
98b69c4
fixes usage in MatchingStatsTests.java
awildturtok Jan 29, 2025
f09cdaa
wip
awildturtok Jan 30, 2025
107407d
implements conversion and extraction of matching stats for Hana and P…
awildturtok Dec 4, 2025
1b4949f
Merge remote-tracking branch 'origin/develop' into feature/matching-s…
awildturtok Jan 7, 2026
3f0b1bb
disable daterange and money type compat check
awildturtok Jan 7, 2026
ff18d93
adds missing coalesce for primaryColumn
awildturtok Jan 7, 2026
b86af08
don't remove connector column
awildturtok Jan 7, 2026
7735740
removes very verbose logging
awildturtok Jan 8, 2026
ed98bd2
use cursor to iterate results
awildturtok Jan 8, 2026
368b4f5
adds timing to SQL matching stats fetching
awildturtok Jan 8, 2026
5ffdade
use transaction to disable autocommit
awildturtok Jan 8, 2026
0cb22e8
log select statement for debugging
awildturtok Jan 8, 2026
8e64d47
add PARALLEL SAFE marker to created functions
awildturtok Jan 8, 2026
8d5cd0a
reworks SqlMatchingStats function as flattened table
awildturtok Jan 14, 2026
44899ee
adds some logging
awildturtok Jan 14, 2026
e2df161
adds missing concept Id
awildturtok Jan 14, 2026
750a03b
adds missing error handling in outer loop
awildturtok Jan 14, 2026
152fc72
adds grouping by params to map into most specific child
awildturtok Jan 14, 2026
37009b0
fixes wrong usage of immutable datasctructure
awildturtok Jan 14, 2026
67a0547
try to insert the join-tables
awildturtok Jan 15, 2026
34b8bb5
adds typing to fields
awildturtok Jan 15, 2026
4ff4776
remove primary key (nullability issue) and add index
awildturtok Jan 15, 2026
11bd235
fix index creation
awildturtok Jan 15, 2026
8e56eda
fix index creation #2
awildturtok Jan 15, 2026
0c355b3
fix index creation #3
awildturtok Jan 15, 2026
811afeb
outcomment index creation
awildturtok Jan 15, 2026
ced3e38
delete prior table
awildturtok Jan 15, 2026
bf1f37e
first draft of using join tables
awildturtok Jan 19, 2026
f04d739
fix dupe join
awildturtok Jan 19, 2026
6aef402
cleanup of SqlMatchingStats
awildturtok Jan 20, 2026
f5fe46e
fix naming
awildturtok Jan 20, 2026
8ef440f
adds exception handling
awildturtok Jan 20, 2026
556f5ee
hopefully fixes reference on ColumnValue
awildturtok Jan 20, 2026
4faabfb
hopefully fixes reference on ColumnValue
awildturtok Jan 20, 2026
46f46a1
hopefully fixes reference on ColumnValue
awildturtok Jan 21, 2026
c53ce5f
hopefully fixes reference on ColumnValue
awildturtok Jan 22, 2026
3093a9a
minor fixes for extraction of matching stats
awildturtok Jan 22, 2026
53ba076
more cleanup
awildturtok Jan 22, 2026
11c304f
more cleanup
awildturtok Jan 22, 2026
3d02375
fix union to intersection
awildturtok Jan 22, 2026
4366d85
cleanup
awildturtok Feb 2, 2026
1ef85b9
Cleanup of failing tests
awildturtok Feb 11, 2026
e301226
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Feb 17, 2026
14b72b5
fix hana insertion
awildturtok Feb 17, 2026
7b9d34d
Some more Hana fixes
awildturtok Feb 18, 2026
6e7d423
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Mar 5, 2026
04c1469
Merge remote-tracking branch 'origin/develop' into feature/matching-s…
awildturtok Apr 7, 2026
9871265
Fixes computation of VARCHAR columns
awildturtok Apr 8, 2026
6aea1c9
Fix quoting for hana
awildturtok Apr 9, 2026
bfaaba1
Cleanup
awildturtok Apr 9, 2026
ec0a4cb
restructures CTCondition.Expression
awildturtok Apr 20, 2026
b4dbe1a
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Apr 20, 2026
ed0c95d
fixes Hana isNull checks
awildturtok Apr 21, 2026
8561957
Merge remote-tracking branch 'origin/feature/matching-stats-as-join-t…
awildturtok Apr 21, 2026
898a7f2
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Jun 2, 2026
5f7fdee
fixes test importer and some broken tests
awildturtok Jun 4, 2026
c38aff0
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Jun 4, 2026
bd6dcb1
cleanup
awildturtok Jun 4, 2026
5524680
Merge remote-tracking branch 'origin/feature/matching-stats-as-join-t…
awildturtok Jun 4, 2026
43c95e8
cleanup of unused files
awildturtok Jun 8, 2026
dae7276
Merge branch 'develop' into feature/matching-stats-as-join-table
awildturtok Jun 9, 2026
a53830a
Cleanup
awildturtok Jun 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,43 @@
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.WorkerHandler;
import lombok.AllArgsConstructor;

/**
* Propagates changes of stored entities to relevant ConQuery shards in the cluster.
*/
@AllArgsConstructor
public
class ClusterStorageListener implements StorageListener {

private final JobManager jobManager;
private final DatasetRegistry<DistributedNamespace> datasetRegistry;
public record ClusterStorageListener(JobManager jobManager, DatasetRegistry<DistributedNamespace> datasetRegistry) implements StorageListener {

@Override
public void onAddSecondaryId(SecondaryIdDescription secondaryId) {
datasetRegistry.get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new UpdateSecondaryId(secondaryId));
datasetRegistry().get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new UpdateSecondaryId(secondaryId));
}

@Override
public void onDeleteSecondaryId(SecondaryIdDescriptionId secondaryId) {
datasetRegistry.get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new RemoveSecondaryId(secondaryId));
datasetRegistry().get(secondaryId.getDataset()).getWorkerHandler().sendToAll(new RemoveSecondaryId(secondaryId));
}

@Override
public void onAddTable(Table table) {
datasetRegistry.get(table.getDataset()).getWorkerHandler().sendToAll(new UpdateTable(table));
datasetRegistry().get(table.getDataset()).getWorkerHandler().sendToAll(new UpdateTable(table));
}

@Override
public void onRemoveTable(TableId table) {
datasetRegistry.get(table.getDataset()).getWorkerHandler().sendToAll(new RemoveTable(table));
datasetRegistry().get(table.getDataset()).getWorkerHandler().sendToAll(new RemoveTable(table));
}

@Override
public void onAddConcept(Concept<?> concept) {
WorkerHandler handler = datasetRegistry.get(concept.getDataset()).getWorkerHandler();
WorkerHandler handler = datasetRegistry().get(concept.getDataset()).getWorkerHandler();
SimpleJob simpleJob = new SimpleJob(String.format("sendToAll : Add %s ", concept.getId()), () -> handler.sendToAll(new UpdateConcept(concept)));
jobManager.addSlowJob(simpleJob);
jobManager().addSlowJob(simpleJob);
}

@Override
public void onDeleteConcept(ConceptId concept) {
WorkerHandler handler = datasetRegistry.get(concept.getDataset()).getWorkerHandler();
WorkerHandler handler = datasetRegistry().get(concept.getDataset()).getWorkerHandler();
SimpleJob simpleJob = new SimpleJob("sendToAll: remove " + concept, () -> handler.sendToAll(new RemoveConcept(concept)));
jobManager.addSlowJob(simpleJob);
jobManager().addSlowJob(simpleJob);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.ShardNodeInformation;
Expand All @@ -29,6 +30,8 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env

final ConnectionManager connectionManager = config.getSqlConnectorConfig().toConnectionManager(environment);

final JobManager jobManager = ManagerProvider.newJobManager(config);

final MetaStorage storage = new MetaStorage(config.getStorage());
final InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator());
final NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, internalMapperFactory, connectionManager, clock);
Expand All @@ -40,7 +43,7 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env
datasetRegistry,
storage,
new FailingImportHandler(),
new LocalStorageListener(),
new LocalStorageListener(jobManager, datasetRegistry),
EMPTY_NODE_PROVIDER,
List.of(),
internalMapperFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.sql.conquery.SqlExecutionManager;
import com.bakdata.conquery.sql.conquery.SqlMatchingStats;
import com.bakdata.conquery.sql.conversion.NodeConversions;
import com.bakdata.conquery.sql.conversion.SqlConverter;
import com.bakdata.conquery.sql.conversion.dialect.DialectBundle;
Expand Down Expand Up @@ -62,7 +63,8 @@ public LocalNamespace createNamespace(
dslContext, sqlStorageHandler,
namespaceData.jobManager(),
namespaceData.filterSearch(),
sqlEntityResolver
sqlEntityResolver,
connection.getConnection()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import com.bakdata.conquery.models.datasets.SecondaryIdDescription;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import lombok.Data;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;

@Data
public class LocalStorageListener implements StorageListener {
public record LocalStorageListener(JobManager jobManager, DatasetRegistry<LocalNamespace> datasetRegistry) implements StorageListener {

@Override
public void onAddSecondaryId(SecondaryIdDescription secondaryId) {
Expand All @@ -31,9 +33,13 @@ public void onRemoveTable(TableId table) {

@Override
public void onAddConcept(Concept<?> concept) {
LocalNamespace namespace = datasetRegistry().get(concept.getDataset());
namespace.getMatchingStats().createConceptIdJoinTable((TreeConcept) concept);
}

@Override
public void onDeleteConcept(ConceptId concept) {
LocalNamespace namespace = datasetRegistry().get(concept.getDataset());
namespace.getMatchingStats().deleteConceptIdJoinTable(concept);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.bakdata.conquery.mode.local;

import java.sql.SQLException;

import javax.annotation.CheckForNull;

import com.bakdata.conquery.models.config.DatabaseConnectionConfig;
Expand Down
Comment thread
thoniTUB marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.bakdata.conquery.mode.local;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.sql.conquery.SqlMatchingStats;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ToStringExclude;
import org.checkerframework.checker.nullness.qual.Nullable;

@Slf4j
@Data
public class UpdateMatchingStatsSqlJob extends Job {

@ToString.Exclude
private final List<Concept<?>> concepts;
private final Dataset dataset;

@ToString.Exclude
private final SqlMatchingStats matchingStats;


@Override
public void execute() throws Exception {

log.info("BEGIN collecting SQL matching stats for {}", dataset);

Stopwatch stopwatch = Stopwatch.createStarted();

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newVirtualThreadPerTaskExecutor());

List<ListenableFuture<?>> jobs = new ArrayList<>();

for (Concept<?> concept : concepts) {
if (!(concept instanceof TreeConcept)) {
continue;
}
jobs.add(matchingStats.collectMatchingStatsForConcept((TreeConcept) concept, executorService));
}

ListenableFuture<List<@Nullable Object>> all = Futures.allAsList(jobs);

while (!all.isDone()) {
if (isCancelled()) {
all.cancel(true);
log.debug("CANCELLED update matching stats for {}", getDataset(), all.exceptionNow());
return;
}

all.get(5, TimeUnit.SECONDS);
log.trace("WAITING for matching stats to finish {}", getDataset());

if (all.state().equals(Future.State.FAILED)) {
log.error("FAILED update matching stats for {}", getDataset(), all.exceptionNow());
return;
}
}

log.debug("DONE collecting SQL matching stats for {} within {}", dataset, stopwatch);
}

@Override
public String getLabel() {
return "Collect matching stats for %s (%s concepts)".formatted(dataset.getName(), concepts.size());
}
}
Loading
Loading