From 009dd7b01b8e61d5285ed527127e2272307d1604 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 26 May 2026 00:27:43 -0700 Subject: [PATCH 1/2] honor Option contract on tryGetExistingExecution miss --- .../ExecutionsMetadataPersistService.scala | 2 +- ...ExecutionsMetadataPersistServiceSpec.scala | 310 ++++++++++++++++++ 2 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala index 833c4433328..b9b29dff72f 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala @@ -77,7 +77,7 @@ object ExecutionsMetadataPersistService extends LazyLogging { def tryGetExistingExecution(executionId: ExecutionIdentity): Option[WorkflowExecutions] = { try { - Some(workflowExecutionsDao.fetchOneByEid(executionId.id.toInt)) + Option(workflowExecutionsDao.fetchOneByEid(executionId.id.toInt)) } catch { case t: Throwable => logger.info("Unable to get execution. Error = " + t.getMessage) diff --git a/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala b/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala new file mode 100644 index 00000000000..5c7a687728f --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/web/service/ExecutionsMetadataPersistServiceSpec.scala @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.web.service + +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState +import org.apache.texera.amber.engine.common.Utils.maptoStatusCode +import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore +import org.apache.texera.dao.MockTexeraDB +import org.apache.texera.dao.jooq.generated.Tables._ +import org.apache.texera.dao.jooq.generated.tables.daos.{ + UserDao, + WorkflowComputingUnitDao, + WorkflowDao, + WorkflowExecutionsDao, + WorkflowVersionDao +} +import org.apache.texera.dao.jooq.generated.tables.pojos.{ + User, + Workflow, + WorkflowComputingUnit, + WorkflowExecutions, + WorkflowVersion +} +import org.apache.texera.web.storage.ExecutionStateStore +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import java.sql.Timestamp +import java.util.UUID + +class ExecutionsMetadataPersistServiceSpec + extends AnyFlatSpec + with Matchers + with BeforeAndAfterAll + with BeforeAndAfterEach + with MockTexeraDB { + + // Randomise the seeded wid/uid so a parallel run of unrelated specs that + // happen to seed the same ids wouldn't collide on the embedded postgres. + private val testWid = 7000 + scala.util.Random.nextInt(1000) + private val testUid = 7000 + scala.util.Random.nextInt(1000) + + private var workflowDao: WorkflowDao = _ + private var workflowVersionDao: WorkflowVersionDao = _ + private var workflowExecutionsDao: WorkflowExecutionsDao = _ + private var userDao: UserDao = _ + private var computingUnitDao: WorkflowComputingUnitDao = _ + private var seededVid: Integer = _ + private var seededCuid: Integer = _ + + override protected def beforeAll(): Unit = { + initializeDBAndReplaceDSLContext() + } + + override protected def afterAll(): Unit = { + shutdownDB() + } + + override protected def beforeEach(): Unit = { + val cfg = getDSLContext.configuration() + workflowDao = new WorkflowDao(cfg) + workflowVersionDao = new WorkflowVersionDao(cfg) + workflowExecutionsDao = new WorkflowExecutionsDao(cfg) + userDao = new UserDao(cfg) + computingUnitDao = new WorkflowComputingUnitDao(cfg) + cleanup() + + val user = new User + user.setUid(testUid) + user.setName("metadata_persist_spec_user") + user.setEmail(s"user_${UUID.randomUUID()}@example.com") + user.setPassword("password") + userDao.insert(user) + + val workflow = new Workflow + workflow.setWid(testWid) + workflow.setName(s"wf_${UUID.randomUUID().toString.substring(0, 8)}") + workflow.setContent("{}") + workflow.setDescription("") + workflow.setCreationTime(new Timestamp(System.currentTimeMillis())) + workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis())) + workflowDao.insert(workflow) + + // Seed one version explicitly so insertNewExecution's getLatestVersion + // takes the happy "max(existing)" branch instead of falling into the + // back-compat "insert one for you" branch (which would drag in + // WorkflowVersionResource.insertNewVersion's diff/aggregate logic). + val version = new WorkflowVersion + version.setWid(testWid) + version.setContent("{}") + version.setCreationTime(new Timestamp(System.currentTimeMillis())) + workflowVersionDao.insert(version) + seededVid = version.getVid + + // workflow_executions.cuid has a FK to workflow_computing_unit; seed one + // so insertNewExecution's setCuid(...) call doesn't trip the constraint. + val cu = new WorkflowComputingUnit + cu.setUid(testUid) + cu.setName("test-cu") + computingUnitDao.insert(cu) + seededCuid = cu.getCuid + } + + override protected def afterEach(): Unit = cleanup() + + private def cleanup(): Unit = { + getDSLContext + .deleteFrom(WORKFLOW_EXECUTIONS) + .where( + WORKFLOW_EXECUTIONS.VID.in( + getDSLContext + .select(WORKFLOW_VERSION.VID) + .from(WORKFLOW_VERSION) + .where(WORKFLOW_VERSION.WID.eq(testWid)) + ) + ) + .execute() + getDSLContext + .deleteFrom(WORKFLOW_VERSION) + .where(WORKFLOW_VERSION.WID.eq(testWid)) + .execute() + getDSLContext.deleteFrom(WORKFLOW).where(WORKFLOW.WID.eq(testWid)).execute() + getDSLContext + .deleteFrom(WORKFLOW_COMPUTING_UNIT) + .where(WORKFLOW_COMPUTING_UNIT.UID.eq(testUid)) + .execute() + getDSLContext.deleteFrom(USER).where(USER.UID.eq(testUid)).execute() + } + + // Helper: insert an execution row tied to the seeded version and return its eid. + private def seedExecution(status: Byte = 0): Integer = { + val exec = new WorkflowExecutions + exec.setVid(seededVid) + exec.setUid(testUid) + exec.setStatus(status) + exec.setResult("") + exec.setStartingTime(new Timestamp(System.currentTimeMillis())) + exec.setBookmarked(false) + exec.setName("seeded execution") + exec.setEnvironmentVersion("test-env") + workflowExecutionsDao.insert(exec) + exec.getEid + } + + // -- insertNewExecution ----------------------------------------------------- + + "insertNewExecution" should "insert a row tied to the latest workflow version" in { + val id = ExecutionsMetadataPersistService.insertNewExecution( + WorkflowIdentity(testWid.toLong), + Some(testUid), + executionName = "named-execution", + environmentVersion = "env-1", + computingUnitId = seededCuid + ) + id should not be ExecutionIdentity(0L) + + val stored = workflowExecutionsDao.fetchOneByEid(id.id.toInt) + stored should not be null + stored.getVid shouldBe seededVid + stored.getUid shouldBe testUid + stored.getName shouldBe "named-execution" + stored.getEnvironmentVersion shouldBe "env-1" + stored.getCuid shouldBe seededCuid + } + + it should "skip setName when executionName is the empty string" in { + val id = ExecutionsMetadataPersistService.insertNewExecution( + WorkflowIdentity(testWid.toLong), + Some(testUid), + executionName = "", + environmentVersion = "env-2", + computingUnitId = seededCuid + ) + val stored = workflowExecutionsDao.fetchOneByEid(id.id.toInt) + // The DDL default for workflow_executions.name is 'Untitled Execution' + // (sql/texera_ddl.sql). The production code path explicitly does not + // call setName for an empty string, so the row should fall back to the + // DDL default rather than persisting "". + stored.getName shouldBe "Untitled Execution" + } + + it should "throw a DB constraint violation when uid is None" in { + // The method signature accepts Option[Integer] for uid and calls + // `newExecution.setUid(uid.orNull)`, but workflow_executions.uid is + // NOT NULL per texera_ddl.sql, so passing None propagates a jOOQ + // DataAccessException. Pinning the current behavior so a future fix — + // either tightening the signature to a required Integer or making the + // column nullable — breaks the spec deliberately. See follow-up bug. + val ex = intercept[org.jooq.exception.DataAccessException] { + ExecutionsMetadataPersistService.insertNewExecution( + WorkflowIdentity(testWid.toLong), + None, + executionName = "anonymous", + environmentVersion = "env-3", + computingUnitId = seededCuid + ) + } + ex.getMessage should include("uid") + } + + // -- tryGetExistingExecution ------------------------------------------------ + + "tryGetExistingExecution" should "return Some(row) for a known eid" in { + val eid = seedExecution() + val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution( + ExecutionIdentity(eid.longValue()) + ) + fetched shouldBe defined + fetched.get.getEid shouldBe eid + } + + it should "return None for an unknown eid" in { + val fetched = ExecutionsMetadataPersistService.tryGetExistingExecution( + ExecutionIdentity(-1L) + ) + fetched shouldBe None + } + + // -- tryUpdateExistingExecution --------------------------------------------- + + "tryUpdateExistingExecution" should "apply the update function to the stored row" in { + val eid = seedExecution(status = 0) + ExecutionsMetadataPersistService.tryUpdateExistingExecution( + ExecutionIdentity(eid.longValue()) + ) { exec => + exec.setStatus(2.toByte) // PAUSED + exec.setName("renamed via update") + } + val after = workflowExecutionsDao.fetchOneByEid(eid) + after.getStatus shouldBe 2.toByte + after.getName shouldBe "renamed via update" + } + + it should "swallow the update error for an unknown eid and leave existing rows untouched" in { + // Pin the silent-failure contract: fetchOneByEid returns null, the + // update closure NPEs on it, the catch logs "Unable to update execution" + // and continues. The seeded row stays untouched. + val eid = seedExecution(status = 1) + noException should be thrownBy + ExecutionsMetadataPersistService.tryUpdateExistingExecution( + ExecutionIdentity(-1L) + ) { exec => exec.setStatus(9.toByte) } + val after = workflowExecutionsDao.fetchOneByEid(eid) + after.getStatus shouldBe 1.toByte + } + + // -- ExecutionStateStore.updateWorkflowState -------------------------------- + // + // updateWorkflowState wraps tryUpdateExistingExecution to also bump the + // in-memory ExecutionMetadataStore's `state`. Exercising it here keeps the + // DB-backed setup (workflow/version/execution rows) in one place; the + // pure-logic ExecutionStateStoreSpec sibling has no DB dependency. + + "ExecutionStateStore.updateWorkflowState" should "set status via maptoStatusCode and return the metadata store with new state" in { + val eid = seedExecution(status = 0) + val before = workflowExecutionsDao.fetchOneByEid(eid) + val beforeTs = before.getLastUpdateTime + val store = ExecutionMetadataStore( + state = WorkflowAggregatedState.UNINITIALIZED, + executionId = ExecutionIdentity(eid.longValue()) + ) + val updated = + ExecutionStateStore.updateWorkflowState(WorkflowAggregatedState.COMPLETED, store) + updated.state shouldBe WorkflowAggregatedState.COMPLETED + + val after = workflowExecutionsDao.fetchOneByEid(eid) + after.getStatus shouldBe maptoStatusCode(WorkflowAggregatedState.COMPLETED) + // lastUpdateTime is set unconditionally to System.currentTimeMillis(). + // Asserting it advanced past the seeded null/older value catches a + // regression that drops the setLastUpdateTime call. + Option(beforeTs) match { + case Some(t) => after.getLastUpdateTime.getTime should be >= t.getTime + case None => after.getLastUpdateTime should not be null + } + } + + it should "still return a metadataStore with the new state when the eid is unknown" in { + // updateWorkflowState first calls tryUpdateExistingExecution (which + // silently swallows the unknown-eid error) and then unconditionally + // returns metadataStore.withState(state). Document this so a future + // refactor that makes the failure surface (e.g. via Try / Either) has + // a spec to migrate. + val store = ExecutionMetadataStore( + state = WorkflowAggregatedState.UNINITIALIZED, + executionId = ExecutionIdentity(-1L) + ) + val updated = + ExecutionStateStore.updateWorkflowState(WorkflowAggregatedState.FAILED, store) + updated.state shouldBe WorkflowAggregatedState.FAILED + } +} From 4f385f9f816e70fc51364537fe9f7baafd1e73c3 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 26 May 2026 00:52:47 -0700 Subject: [PATCH 2/2] feat(ci): welcome first-time contributors with comment commands guide --- .../welcome-first-time-contributor.yml | 157 ++++++++++++++++++ CONTRIBUTING.md | 40 +++++ .../ExecutionsMetadataPersistService.scala | 2 +- 3 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/welcome-first-time-contributor.yml diff --git a/.github/workflows/welcome-first-time-contributor.yml b/.github/workflows/welcome-first-time-contributor.yml new file mode 100644 index 00000000000..df67bebce0c --- /dev/null +++ b/.github/workflows/welcome-first-time-contributor.yml @@ -0,0 +1,157 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Welcome first-time contributors when they open an issue or pull +# request, pointing them at the comment-driven commands defined in +# `comment-commands.yml` (/take, /request-review, /sub-issue, etc.). +# +# Detection uses the search API rather than `author_association`: +# `author_association` is FIRST_TIME_CONTRIBUTOR only on the first +# *commit/PR*, so it misses someone opening their first issue (they +# show up as NONE alongside any non-member who has commented before). +# Searching `repo: is:issue author:` with `total_count +# <= 1` cleanly covers both issues and PRs, tolerating the brief +# indexing delay where the just-opened item may not be in results yet. +# +# Uses `pull_request_target` so PRs from forks still get a welcome +# comment — `pull_request` from forks runs with a read-only token. +name: Welcome first-time contributor +on: + issues: + types: [opened] + pull_request_target: + types: [opened] + +permissions: + issues: write + pull-requests: write + +jobs: + welcome: + if: github.event.sender.type != 'Bot' + runs-on: ubuntu-latest + steps: + - uses: actions/github-script@v8 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + const isPR = context.eventName === 'pull_request_target'; + const subject = isPR + ? context.payload.pull_request + : context.payload.issue; + const author = subject.user.login; + const issue_number = subject.number; + const { owner, repo } = context.repo; + + // Hidden marker for idempotency: if a previous run already + // welcomed this issue/PR, the marker will be in an existing + // comment and we skip. Lets us survive workflow re-runs, + // reopen races, and future manual triggers. + const MARKER = ''; + try { + const existing = await github.paginate( + github.rest.issues.listComments, + { owner, repo, issue_number, per_page: 100 }, + ); + if (existing.some((c) => (c.body || '').includes(MARKER))) { + core.info(`Already welcomed on #${issue_number}; skipping.`); + return; + } + } catch (e) { + core.warning( + `listComments on #${issue_number} failed: ${e.message}`, + ); + // Fall through — better to risk a duplicate welcome than + // skip a genuine first-timer over a transient API error. + } + + // Count prior items of the same kind by this author. The + // just-opened item may or may not be indexed yet, so we + // treat <=1 as "first time" (covers both 0 — not yet + // indexed — and 1 — only the new item). + const q = `repo:${owner}/${repo} is:${isPR ? 'pr' : 'issue'} author:${author}`; + let total = 0; + try { + const { data } = await github.rest.search.issuesAndPullRequests({ + q, per_page: 1, + }); + total = data.total_count; + } catch (e) { + core.warning( + `Search for prior items by ${author} failed: ${e.message}`, + ); + return; + } + core.info( + `Author ${author} has ${total} ${isPR ? 'PR' : 'issue'}(s) ` + + `in ${owner}/${repo} (including this one if indexed).`, + ); + if (total > 1) { + core.info(`${author} is not a first-time contributor; skipping.`); + return; + } + + const body = [ + MARKER, + `👋 Thanks for your first contribution to Texera, @${author}!`, + ``, + `You can drive common housekeeping tasks just by leaving a comment. Type the command on its own line.`, + ``, + `### On issues`, + ``, + `| Command | What it does |`, + `|---|---|`, + `| \`/take\` | Assign the issue to yourself (self-claim it) |`, + `| \`/untake\` | Remove yourself as assignee |`, + ``, + `To find unclaimed work, search \`is:issue is:open no:assignee\` — there's no "triage" label; the search filter *is* the triage state.`, + ``, + `### Linking sub-issues`, + ``, + `| Command | Where to run it | What it does |`, + `|---|---|---|`, + `| \`/sub-issue #12 #13\` | On the **parent** | Links #12 and #13 as children of this issue |`, + `| \`/unsub-issue #12 #13\` | On the **parent** | Unlinks those children |`, + `| \`/parent-issue #5\` | On the **child** | Sets #5 as this issue's parent |`, + `| \`/unparent-issue\` | On the **child** | Removes this issue's parent (auto-detected) |`, + `| \`/unparent-issue #5\` | On the **child** | Removes parent #5 explicitly |`, + ``, + `You can write references as \`#12\` or bare \`12\`. Cross-repo references like \`owner/repo#12\` aren't supported and are ignored.`, + ``, + `### On pull requests (author only)`, + ``, + `| Command | What it does |`, + `|---|---|`, + `| \`/request-review @user [@user ...]\` | Request reviews from those users |`, + `| \`/unrequest-review @user [@user ...]\` | Cancel those review requests |`, + ``, + `You can mention teams as \`@org/team\`, and \`@copilot\` works too. Only the PR **author** can use these commands.`, + ``, + `> **Note:** Commands must match exactly — \`/take this\` won't work, only \`/take\`. Bots are ignored, and you can't self-link an issue or set an issue as its own parent.`, + ``, + `For the full contribution flow, see [CONTRIBUTING.md](https://github.com/${owner}/${repo}/blob/main/CONTRIBUTING.md).`, + ].join('\n'); + + try { + await github.rest.issues.createComment({ + owner, repo, issue_number, body, + }); + core.info(`Posted welcome comment on #${issue_number}`); + } catch (e) { + core.warning( + `Failed to post welcome on #${issue_number}: ${e.message}`, + ); + } diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f31b0052e09..8abbf4fc8f7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -96,6 +96,46 @@ yarn format:fix --- +## 👋 Comment commands + +You can drive common housekeeping tasks just by leaving a comment on an issue or pull request. Type the command on its own line. + +### On issues + +| Command | What it does | +|---|---| +| `/take` | Assign the issue to yourself (self-claim it) | +| `/untake` | Remove yourself as assignee | + +To find unclaimed work, search `is:issue is:open no:assignee` — there's no "triage" label; the search filter *is* the triage state. + +### Linking sub-issues + +You can link from either end of the parent/child relationship: + +| Command | Where to run it | What it does | +|---|---|---| +| `/sub-issue #12 #13` | On the **parent** | Links #12 and #13 as children of this issue | +| `/unsub-issue #12 #13` | On the **parent** | Unlinks those children | +| `/parent-issue #5` | On the **child** | Sets #5 as this issue's parent | +| `/unparent-issue` | On the **child** | Removes this issue's parent (auto-detected) | +| `/unparent-issue #5` | On the **child** | Removes parent #5 explicitly | + +You can write references as `#12` or bare `12`. Cross-repo references like `owner/repo#12` aren't supported and are ignored. + +### On pull requests (author only) + +| Command | What it does | +|---|---| +| `/request-review @user [@user ...]` | Request reviews from those users | +| `/unrequest-review @user [@user ...]` | Cancel those review requests | + +You can mention teams as `@org/team`, and `@copilot` works too. Only the PR **author** can use these commands. + +> **Note:** Commands must match exactly — `/take this` won't work, only `/take`. Bots are ignored, and you can't self-link an issue or set an issue as its own parent. + +--- + ## 📝 Apache License Header All new files must include the Apache License header. diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala index b9b29dff72f..833c4433328 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionsMetadataPersistService.scala @@ -77,7 +77,7 @@ object ExecutionsMetadataPersistService extends LazyLogging { def tryGetExistingExecution(executionId: ExecutionIdentity): Option[WorkflowExecutions] = { try { - Option(workflowExecutionsDao.fetchOneByEid(executionId.id.toInt)) + Some(workflowExecutionsDao.fetchOneByEid(executionId.id.toInt)) } catch { case t: Throwable => logger.info("Unable to get execution. Error = " + t.getMessage)