Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.table.timeline.dto.v2;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantGenerator;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* The data transfer object of instant.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InstantDTO {

@JsonProperty("action")
public String action;
@JsonProperty("requestTs")
public String requestedTime;
@JsonProperty("completionTs")
public String completionTime;
@JsonProperty("state")
public String state;

public static InstantDTO fromInstant(HoodieInstant instant) {
if (null == instant) {
return null;
}

InstantDTO dto = new InstantDTO();
dto.action = instant.getAction();
dto.requestedTime = instant.requestedTime();
dto.completionTime = instant.getCompletionTime();
dto.state = instant.getState().toString();
return dto;
}

public static HoodieInstant toInstant(InstantDTO dto, InstantGenerator factory) {
if (null == dto) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you rename factory to instantGenerator here? The same type is referred to as instantGenerator in TimelineDTOV2.toTimeline and in TimelineHandler, so calling it factory in this one spot breaks the naming convention and makes readers wonder whether it's a different object.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

return null;
}

return factory.createNewInstant(HoodieInstant.State.valueOf(dto.state), dto.action,
dto.requestedTime, dto.completionTime);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.table.timeline.dto.v2;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.stream.Collectors;

/**
* The data transfer object of timeline.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class TimelineDTOV2 {

@JsonProperty("instants")
public List<InstantDTO> instants;

public static TimelineDTOV2 fromTimeline(HoodieTimeline timeline) {
TimelineDTOV2 dto = new TimelineDTOV2();
dto.instants = timeline.getInstantsAsStream().map(InstantDTO::fromInstant).collect(Collectors.toList());
return dto;
}

public static HoodieTimeline toTimeline(TimelineDTOV2 dto, HoodieTableMetaClient metaClient) {
InstantGenerator instantGenerator = metaClient.getInstantGenerator();
TimelineFactory factory = metaClient.getTimelineLayout().getTimelineFactory();
// TODO: For Now, we will assume, only active-timeline will be transferred.
return factory.createDefaultTimeline(dto.instants.stream().map(d -> InstantDTO.toInstant(d, instantGenerator)),
metaClient.getActiveTimeline());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,

private static final String SCHEME = "http";
private static final String BASE_URL = "/v1/hoodie/view";
private static final String BASE_URL_V2 = "/v2/hoodie/view";
public static final String LATEST_PARTITION_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/");
public static final String LATEST_PARTITION_SLICES_INFLIGHT_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/inflight/");
public static final String LATEST_PARTITION_SLICES_STATELESS_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/");
Expand Down Expand Up @@ -105,7 +106,13 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String LAST_INSTANT_URL = String.format("%s/%s", BASE_URL, "timeline/instant/last");
public static final String LAST_INSTANTS_URL = String.format("%s/%s", BASE_URL, "timeline/instants/last");

public static final String INSTANT_DETAILS_URL = String.format("%s/%s", BASE_URL_V2, "timeline/instant");

public static final String TIMELINE_URL = String.format("%s/%s", BASE_URL, "timeline/instants/all");
public static final String TIMELINE_V2_URL = String.format("%s/%s", BASE_URL_V2, "timeline/instants/all");

public static final String TABLE_CONFIG_V2_URL = String.format("%s/%s", BASE_URL_V2, "table/config");
public static final String SCHEMA_HISTORY_V2_URL = String.format("%s/%s", BASE_URL_V2, "table/schema/history");

// POST Requests
public static final String REFRESH_TABLE_URL = String.format("%s/%s", BASE_URL, "refresh/");
Expand All @@ -116,6 +123,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String PARTITIONS_PARAM = "partitions";
public static final String BASEPATH_PARAM = "basepath";
public static final String INSTANT_PARAM = "instant";
public static final String INSTANT_ACTION_PARAM = "instantaction";
public static final String INSTANT_STATE_PARAM = "instantstate";
public static final String MAX_INSTANT_PARAM = "maxinstant";
public static final String CURRENT_INSTANT_PARAM = "currentinstant";
public static final String MIN_INSTANT_PARAM = "mininstant";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.table.timeline.dto.v2.TimelineDTOV2;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
Expand Down Expand Up @@ -171,6 +172,18 @@ private static String getMinInstantParam(Context ctx) {
return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MIN_INSTANT_PARAM, String.class).getOrDefault("");
}

private static String getInstantParam(Context ctx) {
return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANT_PARAM, String.class).getOrThrow(e -> new BadRequestResponse("INSTANT_PARAM is required"));
}

private static String getInstantActionParam(Context ctx) {
return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANT_ACTION_PARAM, String.class).getOrThrow(e -> new BadRequestResponse("INSTANT_ACTION_PARAM is required"));
}

private static String getInstantStateParam(Context ctx) {
return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANT_STATE_PARAM, String.class).getOrThrow(e -> new BadRequestResponse("INSTANT_STATE_PARAM is required"));
}
Comment thread
voonhous marked this conversation as resolved.

private static String getMarkerDirParam(Context ctx) {
return ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("");
}
Expand All @@ -185,6 +198,9 @@ public void register() {
registerDataFilesAPI();
registerFileSlicesAPI();
registerTimelineAPI();
if (timelineServiceConfig.enableUi) {
registerTimelineV2API();
}
if (markerHandler != null) {
registerMarkerAPI();
}
Expand Down Expand Up @@ -242,6 +258,43 @@ private void registerTimelineAPI() {
}, false));
}

/**
* Register v2 Timeline API calls used by the Timeline UI. Gated behind --enable-ui.
*/
private void registerTimelineV2API() {
app.get(RemoteHoodieTableFileSystemView.TIMELINE_V2_URL, new ViewHandler(ctx -> {
Comment thread
voonhous marked this conversation as resolved.
metricsRegistry.add("TIMELINE_V2", 1);
TimelineDTOV2 dto = instantHandler.getTimelineV2(getBasePathParam(ctx));
writeValueAsString(ctx, dto);
}, false));

app.get(RemoteHoodieTableFileSystemView.INSTANT_DETAILS_URL, new ViewHandler(ctx -> {
metricsRegistry.add("INSTANT_DETAILS", 1);
Object instantDetails = instantHandler.getInstantDetails(getBasePathParam(ctx),
getInstantParam(ctx), getInstantActionParam(ctx), getInstantStateParam(ctx));
writeValueAsString(ctx, instantDetails);
}, false));

app.get(RemoteHoodieTableFileSystemView.TABLE_CONFIG_V2_URL, new ViewHandler(ctx -> {
metricsRegistry.add("TABLE_CONFIG", 1);
writeValueAsString(ctx, instantHandler.getTableConfig(getBasePathParam(ctx)));
}, false));

app.get(RemoteHoodieTableFileSystemView.SCHEMA_HISTORY_V2_URL, new ViewHandler(ctx -> {
metricsRegistry.add("SCHEMA_HISTORY", 1);
Comment thread
voonhous marked this conversation as resolved.
int limit;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the limit parsing and validation is done inline here, while every other query param in this class is extracted to a private helper (getInstantParam, getBasePathParam, etc.). It might be worth pulling this out to something like getLimitParam(Context ctx) to keep the pattern consistent and make the lambda body easier to scan.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

try {
limit = Integer.parseInt(ctx.queryParamAsClass("limit", String.class).getOrDefault("200"));
} catch (NumberFormatException e) {
throw new BadRequestResponse("limit must be an integer");
}
if (limit <= 0 || limit > 1000) {
throw new BadRequestResponse("limit must be between 1 and 1000");
}
writeValueAsString(ctx, instantHandler.getSchemaHistory(getBasePathParam(ctx), limit));
Comment thread
voonhous marked this conversation as resolved.
}, false));
}

/**
* Register Data-Files API calls.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.ui.UiHandler;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import io.javalin.Javalin;
import io.javalin.core.compression.CompressionStrategy;
import io.javalin.core.util.JavalinBindException;
import io.javalin.http.staticfiles.Location;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -60,6 +63,7 @@ public class TimelineService {
private transient Javalin app = null;
private transient FileSystemViewManager fsViewsManager;
private transient RequestHandler requestHandler;
private transient UiHandler uiHandler;

public TimelineService(StorageConfiguration<?> storageConf, Config timelineServerConf,
FileSystemViewManager globalFileSystemViewManager) {
Expand Down Expand Up @@ -122,6 +126,10 @@ public static class Config implements Serializable {
@Parameter(names = {"--enable-marker-requests", "-em"}, description = "Enable handling of marker-related requests")
public boolean enableMarkerRequests = false;

@Builder.Default
@Parameter(names = {"--enable-ui"}, description = "Enable the Timeline UI (/ui) and its /v2/hoodie/view/ UI API endpoints")
public boolean enableUi = false;

@Builder.Default
@Parameter(names = {"--enable-remote-partitioner"}, description = "Enable remote partitioner")
public boolean enableRemotePartitioner = false;
Expand Down Expand Up @@ -232,15 +240,22 @@ private void createApp() {

app = Javalin.create(c -> {
if (!timelineServerConf.compress) {
c.compressionStrategy(io.javalin.core.compression.CompressionStrategy.NONE);
c.compressionStrategy(CompressionStrategy.NONE);
}
c.server(() -> server);
if (timelineServerConf.enableUi) {
c.addStaticFiles("/public", Location.CLASSPATH);
}
});

requestHandler = new RequestHandler(
app, storageConf, timelineServerConf, fsViewsManager);
app.get("/", ctx -> ctx.result("Hello Hudi"));
requestHandler.register();
if (timelineServerConf.enableUi) {
uiHandler = new UiHandler(app);
uiHandler.register();
}
}

public void run() throws IOException {
Expand All @@ -249,8 +264,10 @@ public void run() throws IOException {

public static FileSystemViewManager buildFileSystemViewManager(Config config, StorageConfiguration<?> conf) {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(conf);
// Just use defaults for now
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
// Disable the metadata table for the standalone timeline server (which backs the Timeline UI):
// it serves read-only timeline data for arbitrary basepaths, so it should not require each table
// to have a metadata table present/bootstrapped. File listings fall back to direct storage listing.
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
Comment thread
voonhous marked this conversation as resolved.
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();

switch (config.viewStorageType) {
Expand Down Expand Up @@ -310,5 +327,8 @@ public static void main(String[] args) throws Exception {
cfg,
viewManager);
service.run();
// Block the main thread to keep the server alive (Javalin uses daemon threads).
Runtime.getRuntime().addShutdownHook(new Thread(service::close));
Thread.currentThread().join();
}
}
Loading
Loading