Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions cli/api/dbadapters/bigquery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,44 @@ export interface IBigQueryExecutionOptions {
reservation?: string;
}

export type BigQueryClientProvider = (projectId?: string) => BigQuery;

export function createBigQueryClientProvider(
credentials: dataform.IBigQuery
): BigQueryClientProvider {
const clients = new Map<string, BigQuery>();
return (projectId?: string) => {
projectId = projectId || credentials.projectId;
if (!clients.has(projectId)) {
clients.set(
projectId,
new BigQuery({
projectId,
scopes: EXTRA_GOOGLE_SCOPES,
location: credentials.location,
credentials: credentials.credentials && JSON.parse(credentials.credentials)
})
);
}
return clients.get(projectId);
};
}

export class BigQueryDbAdapter implements IDbAdapter {
private bigQueryCredentials: dataform.IBigQuery;
private pool: PromisePoolExecutor;

private readonly clients = new Map<string, BigQuery>();
private readonly bigqueryClient?: BigQuery;
private clientProvider: BigQueryClientProvider;

constructor(
credentials: dataform.IBigQuery,
options?: { concurrencyLimit?: number; bigqueryClient?: BigQuery }
options?: {
concurrencyLimit?: number;
clientProvider?: BigQueryClientProvider;
}
) {
this.bigQueryCredentials = credentials;
this.bigqueryClient = options?.bigqueryClient;
this.clientProvider = options?.clientProvider || createBigQueryClientProvider(credentials);

// Bigquery allows 50 concurrent queries, and a rate limit of 100/user/second by default.
// These limits should be safely low enough for most projects.
this.pool = new PromisePoolExecutor({
Expand Down Expand Up @@ -174,7 +199,7 @@ export class BigQueryDbAdapter implements IDbAdapter {
datasetIds.map(async datasetId => {
const [tables] = await this.getClient(database)
.dataset(datasetId)
.getTables();
.getTables({ autoPaginate: true, maxResults: 1000 });
await Promise.all(
tables.map(async table => {
const metadata = await this.table({
Expand Down Expand Up @@ -272,14 +297,14 @@ export class BigQueryDbAdapter implements IDbAdapter {
}

public async schemas(database: string): Promise<string[]> {
const data = await this.getClient(database).getDatasets();
const data = await this.getClient(database).getDatasets({ autoPaginate: true, maxResults: 1000 });
return data[0].map(dataset => dataset.id);
}

public async createSchema(database: string, schema: string): Promise<void> {
await this.execute(
`create schema if not exists \`${database || this.bigQueryCredentials.projectId}.${schema}\``,
{ bigquery: { location: this.bigQueryCredentials.location } }
{ bigquery: { location: this.bigQueryCredentials.location || "US" } }
);
}

Expand Down Expand Up @@ -320,23 +345,7 @@ export class BigQueryDbAdapter implements IDbAdapter {
}

private getClient(projectId?: string) {
if (this.bigqueryClient) {
return this.bigqueryClient;
}
projectId = projectId || this.bigQueryCredentials.projectId;
if (!this.clients.has(projectId)) {
this.clients.set(
projectId,
new BigQuery({
projectId,
scopes: EXTRA_GOOGLE_SCOPES,
location: this.bigQueryCredentials.location,
credentials:
this.bigQueryCredentials.credentials && JSON.parse(this.bigQueryCredentials.credentials)
})
);
}
return this.clients.get(projectId);
return this.clientProvider(projectId);
}

private async runQuery(
Expand Down Expand Up @@ -544,11 +553,14 @@ function convertFieldType(type: string) {
case "INT64":
return dataform.Field.Primitive.INTEGER;
case "NUMERIC":
case "BIGNUMERIC":
return dataform.Field.Primitive.NUMERIC;
case "BOOL":
case "BOOLEAN":
return dataform.Field.Primitive.BOOLEAN;
case "STRING":
case "JSON":
case "INTERVAL":
return dataform.Field.Primitive.STRING;
case "DATE":
return dataform.Field.Primitive.DATE;
Expand Down
77 changes: 72 additions & 5 deletions cli/api/dbadapters/bigquery_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ suite("BigQueryDbAdapter", () => {
const projectId = "project1";

const credentials = dataform.BigQuery.create({ projectId, location: "US" });
const adapter = new BigQueryDbAdapter(credentials, { bigqueryClient: instance(mockBigQuery) });
const adapter = new BigQueryDbAdapter(credentials, {
clientProvider: () => instance(mockBigQuery)
});

when(mockBigQuery.dataset(schemaName)).thenReturn(instance(mockDataset));
// getTables returns an array where the first element is an array of tables.
// Each table object needs an 'id' property.
when(mockDataset.getTables()).thenReturn(Promise.resolve([[{ id: tableName }]] as any));
when(mockDataset.getTables(anything())).thenReturn(Promise.resolve([[{ id: tableName }]] as any));
when(mockDataset.table(tableName)).thenReturn(instance(mockTable));
when(mockTable.getMetadata()).thenReturn(
Promise.resolve([
Expand Down Expand Up @@ -54,10 +56,12 @@ suite("BigQueryDbAdapter", () => {
const projectId = "project";

const credentials = dataform.BigQuery.create({ projectId, location: "US" });
const adapter = new BigQueryDbAdapter(credentials, { bigqueryClient: instance(mockBigQuery) });
const adapter = new BigQueryDbAdapter(credentials, {
clientProvider: () => instance(mockBigQuery)
});

when(mockBigQuery.dataset(schemaName)).thenReturn(instance(mockDataset));
when(mockDataset.getTables()).thenReturn(Promise.resolve([[{ id: tableName }]] as any));
when(mockDataset.getTables(anything())).thenReturn(Promise.resolve([[{ id: tableName }]] as any));
when(mockDataset.table(tableName)).thenReturn(instance(mockTable));
when(mockTable.getMetadata()).thenReturn(
Promise.resolve([
Expand All @@ -70,7 +74,7 @@ suite("BigQueryDbAdapter", () => {
] as any)
);

when(mockBigQuery.getDatasets()).thenReturn(Promise.resolve([[{ id: schemaName }]] as any));
when(mockBigQuery.getDatasets(anything())).thenReturn(Promise.resolve([[{ id: schemaName }]] as any));

const result = await adapter.tables(projectId);

Expand All @@ -79,4 +83,67 @@ suite("BigQueryDbAdapter", () => {
expect(result[0].target.schema).to.equal(schemaName);
expect(result[0].target.name).to.equal(tableName);
});

test("setMetadata handles action without columns", async () => {
// Partial mock for BigQuery client to avoid real network calls
const mockBigQuery: any = {
dataset: () => ({
table: () => ({
getMetadata: () => Promise.resolve([{ schema: { fields: [] } }]),
setMetadata: (metadata: any) => {
expect(metadata.description).to.equal("test");
return Promise.resolve([]);
}
})
})
};

const credentials = dataform.BigQuery.create({ projectId: "p", location: "US" });
const adapter = new BigQueryDbAdapter(credentials, {
concurrencyLimit: 1,
clientProvider: () => mockBigQuery
});

const action = dataform.ExecutionAction.create({
target: { database: "db", schema: "sch", name: "tab" },
actionDescriptor: { description: "test" }
// columns is missing/null in this action
});

// This should not throw "cannot read property 'find' of undefined"
await adapter.setMetadata(action);
});

test("setMetadata correctly maps column descriptions", async () => {
const mockBigQuery: any = {
dataset: () => ({
table: () => ({
getMetadata: () => Promise.resolve([{
schema: {
fields: [{ name: "id", type: "INTEGER" }]
}
}]),
setMetadata: (metadata: any) => {
expect(metadata.schema[0].description).to.equal("id desc");
return Promise.resolve([]);
}
})
})
};

const credentials = dataform.BigQuery.create({ projectId: "p", location: "US" });
const adapter = new BigQueryDbAdapter(credentials, {
concurrencyLimit: 1,
clientProvider: () => mockBigQuery
});

const action = dataform.ExecutionAction.create({
target: { database: "db", schema: "sch", name: "tab" },
actionDescriptor: {
columns: [{ path: ["id"], description: "id desc" }]
}
});

await adapter.setMetadata(action);
});
});
Loading