From 29e73242cba9797ed24127b24bb0380c69a608d3 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 28 Mar 2018 18:38:57 +0100 Subject: [PATCH 1/9] SPARK-23807 Add Hadoop 3 profile with relevant POM fix ups, cloud-storage artifacts and binding Change-Id: Ia4526f184ced9eef5b67aee9e91eced0dd38d723 --- core/pom.xml | 6 + hadoop-cloud/pom.xml | 355 ++++++++++++++---- .../cloud/BindingParquetOutputCommitter.scala | 122 ++++++ .../io/cloud/PathOutputCommitProtocol.scala | 260 +++++++++++++ .../spark/internal/io/cloud/package.scala | 105 ++++++ .../io/cloud/CommitterBindingSuite.scala | 86 +++++ .../io/cloud/StubPathOutputCommitter.scala | 110 ++++++ .../io/cloud/PathCommitterConstants.scala | 87 +++++ hadoop-cloud/src/test/scala/.keep | 0 pom.xml | 9 + 10 files changed, 1072 insertions(+), 68 deletions(-) create mode 100644 hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala create mode 100644 hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala create mode 100644 hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/package.scala create mode 100644 hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala create mode 100644 hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala create mode 100644 hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathCommitterConstants.scala create mode 100644 hadoop-cloud/src/test/scala/.keep diff --git a/core/pom.xml b/core/pom.xml index 9258a856028a0..093a9869b6dd7 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -95,6 +95,12 @@ org.apache.curator curator-recipes + + + org.apache.zookeeper + zookeeper + diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index 8e424b1c50236..cc520aed6e17c 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -38,81 +38,33 @@ hadoop-cloud + + 
target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + - + - org.apache.hadoop - hadoop-aws - ${hadoop.version} - ${hadoop.deps.scope} - - - org.apache.hadoop - hadoop-common - - - commons-logging - commons-logging - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test org.apache.hadoop - hadoop-openstack + hadoop-client ${hadoop.version} - ${hadoop.deps.scope} - - - org.apache.hadoop - hadoop-common - - - commons-logging - commons-logging - - - junit - junit - - - org.mockito - mockito-all - - + provided - - - joda-time - joda-time - ${hadoop.deps.scope} - com.fasterxml.jackson.core @@ -141,13 +93,98 @@ httpcore ${hadoop.deps.scope} + + + + hadoop-2.6 + + true + + + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + ${hadoop.deps.scope} + + + org.apache.hadoop + hadoop-common + + + commons-logging + commons-logging + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + + + + org.apache.hadoop + hadoop-openstack + ${hadoop.version} + ${hadoop.deps.scope} + + + org.apache.hadoop + hadoop-common + + + commons-logging + commons-logging + + + junit + junit + + + org.mockito + mockito-all + + + + + + + joda-time + joda-time + ${hadoop.deps.scope} + + + + hadoop-2.7 - + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + ${hadoop.deps.scope} + + + org.apache.hadoop + hadoop-common + + + commons-logging + commons-logging + + + 
org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + + + + org.apache.hadoop + hadoop-openstack + ${hadoop.version} + ${hadoop.deps.scope} + + + org.apache.hadoop + hadoop-common + + + commons-logging + commons-logging + + + junit + junit + + + org.mockito + mockito-all + + + + + + + joda-time + joda-time + ${hadoop.deps.scope} + + + + com.fasterxml.jackson.core + jackson-databind + ${hadoop.deps.scope} + + + com.fasterxml.jackson.core + jackson-annotations + ${hadoop.deps.scope} + + + com.fasterxml.jackson.dataformat + jackson-dataformat-cbor + ${fasterxml.jackson.version} + + + + org.apache.httpcomponents + httpclient + ${hadoop.deps.scope} + + + + org.apache.httpcomponents + httpcore + ${hadoop.deps.scope} + + + + + + + hadoop-3 + + src/hadoop-3/main/scala + src/hadoop-3/test/scala + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + ${extra.source.dir} + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + ${extra.testsource.dir} + + + + + + + + + + + + + org.apache.hadoop + hadoop-cloud-storage + ${hadoop.version} + ${hadoop.deps.scope} + + + org.apache.hadoop + hadoop-common + + + org.codehaus.jackson + jackson-mapper-asl + + + com.fasterxml.jackson.core + jackson-core + + + com.google.guava + guava + + + diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala new file mode 100644 index 0000000000000..f2a2d208291fc --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} +import org.apache.parquet.hadoop.ParquetOutputCommitter + +import org.apache.spark.internal.Logging + + +/** + * This dynamically binds to the factory-configured + * output committer, and is intended to allow callers to use any [[PathOutputCommitter]], + * even if not a subclass of [[ParquetOutputCommitter]]. + * + * The Parquet "parquet.enable.summary-metadata" option will only be supported + * if the instantiated committer itself supports it. + */ + +class BindingParquetOutputCommitter( + path: Path, + context: TaskAttemptContext) + extends ParquetOutputCommitter(path, context) with Logging { + + logInfo(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path") + + val committer = new BindingPathOutputCommitter(path, context) + + /** + * This is the committer ultimately bound to. + * @return the committer instantiated by the factory. 
+ */ + def boundCommitter(): PathOutputCommitter = { + committer.getCommitter() + } + + override def getWorkPath: Path = { + committer.getWorkPath() + } + + override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.setupTask(taskAttemptContext) + } + + override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.commitTask(taskAttemptContext) + } + + override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.abortTask(taskAttemptContext) + } + + override def setupJob(jobContext: JobContext): Unit = { + committer.setupJob(jobContext) + } + + override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = { + committer.needsTaskCommit(taskAttemptContext) + } + + override def cleanupJob(jobContext: JobContext): Unit = { + committer.cleanupJob(jobContext) + } + + override def isCommitJobRepeatable(jobContext: JobContext): Boolean = { + committer.isCommitJobRepeatable(jobContext) + } + + override def commitJob(jobContext: JobContext): Unit = { + committer.commitJob(jobContext) + } + + override def recoverTask(taskAttemptContext: TaskAttemptContext): Unit = { + committer.recoverTask(taskAttemptContext) + } + + /** + * Abort the job; log and ignore any IO exception thrown. 
+ * + * @param jobContext job context + * @param state final state of the job + */ + override def abortJob( + jobContext: JobContext, + state: JobStatus.State): Unit = { + try { + committer.abortJob(jobContext, state) + } catch { + case e: IOException => + logWarning("Abort job failed", e) + } + } + + override def isRecoverySupported: Boolean = { + committer.isRecoverySupported() + } + + override def isRecoverySupported(jobContext: JobContext): Boolean = { + committer.isRecoverySupported(jobContext) + } + + override def toString: String = s"BindingParquetOutputCommitter($committer)" +} diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala new file mode 100644 index 0000000000000..5645ad53c1bb2 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. + * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding, whose + * subclasses do not do this, this class overrides that binding to use the + * factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bound to and the committer for + * the destination path is chosen. + * + * @constructor Instantiate. Dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied.
+ */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( + jobId, + destination, + false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) + + logInfo(s"Instantiated committer with job ID=$jobId;" + + s" destination=$destPath;" + + s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") + + if (dynamicPartitionOverwrite) { + // until there's explicit extensions to the PathOutputCommitProtocols + // to support the spark mechanism, it's left to the individual committer + // choice to handle partitioning. + throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite") + } + + import PathOutputCommitProtocol._ + + /** + * Set up the committer. + * This creates it by talking directly to the Hadoop factories, instead + * of the V1 `mapred.FileOutputFormat` methods. + * @param context task attempt + * @return the committer to use. This will always be a subclass of + * [[PathOutputCommitter]]. + */ + override protected def setupCommitter( + context: TaskAttemptContext): PathOutputCommitter = { + + logInfo(s"Setting up committer for path $destination") + committer = PathOutputCommitterFactory.createCommitter(destPath, context) + + // Special feature to force out the FileOutputCommitter, so as to guarantee + // that the binding is working properly. + val rejectFileOutput = context.getConfiguration + .getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL) + if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) { + // the output format returned a file output format committer, which + // is exactly what we do not want. So switch back to the factory. 
+ val factory = PathOutputCommitterFactory.getCommitterFactory( + destPath, + context.getConfiguration) + logInfo(s"Using committer factory $factory") + committer = factory.createOutputCommitter(destPath, context) + } + + logInfo(s"Using committer ${committer.getClass}") + logInfo(s"Committer details: $committer") + if (committer.isInstanceOf[FileOutputCommitter]) { + require(!rejectFileOutput, + s"Committer created is the FileOutputCommitter $committer") + + if (committer.isCommitJobRepeatable(context)) { + // If FileOutputCommitter says its job commit is repeatable, it means + // it is using the v2 algorithm, which is not safe for task commit + // failures. Warn + logWarning(s"Committer $committer may not be tolerant of task commit failures") + } + } + committer + } + + /** + * Create a temporary file for a task. + * + * @param taskContext task context + * @param dir optional subdirectory + * @param ext file extension + * @return a path as a string + */ + override def newTaskTempFile( + taskContext: TaskAttemptContext, + dir: Option[String], + ext: String): String = { + + val workDir = committer.getWorkPath + val parent = dir.map(d => new Path(workDir, d)).getOrElse(workDir) + val file = new Path(parent, buildFilename(taskContext, ext)) + logInfo(s"Creating task file $file for dir $dir and ext $ext") + file.toString + } + + /** + * Absolute files are still renamed into place with a warning. + * + * @param taskContext task + * @param absoluteDir destination dir + * @param ext extension + * @return an absolute path + */ + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, + absoluteDir: String, + ext: String): String = { + + val file = super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext) + logWarning( + s"Creating temporary file $file for absolute path for dir $absoluteDir") + file + } + + /** + * Build a filename which is unique across all task events. 
+ * It does not have to be consistent across multiple attempts of the same + * task or job. + * + * @param taskContext task context + * @param ext extension + * @return a name for a file which must be unique across all task attempts + */ + protected def buildFilename( + taskContext: TaskAttemptContext, + ext: String): String = { + + // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + f"part-$split%05d-$jobId$ext" + } + + override def setupJob(jobContext: JobContext): Unit = { + logInfo("setup job") + super.setupJob(jobContext) + } + + override def commitJob( + jobContext: JobContext, + taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = { + logInfo(s"commit job with ${taskCommits.length} task commit message(s)") + super.commitJob(jobContext, taskCommits) + } + + /** + * Abort the job; log and ignore any IO exception thrown. + * + * @param jobContext job context + */ + override def abortJob(jobContext: JobContext): Unit = { + try { + super.abortJob(jobContext) + } catch { + case e: IOException => + logWarning("Abort job failed", e) + } + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + super.setupTask(taskContext) + } + + override def commitTask( + taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = { + logInfo("Commit task") + super.commitTask(taskContext) + } + + /** + * Abort the task; log and ignore any failure thrown. 
+ * + * @param taskContext context + */ + override def abortTask(taskContext: TaskAttemptContext): Unit = { + logInfo("Abort task") + try { + super.abortTask(taskContext) + } catch { + case e: IOException => + logWarning("Abort task failed", e) + } + } + + override def onTaskCommit(msg: TaskCommitMessage): Unit = { + logInfo(s"onTaskCommit($msg)") + } +} + +object PathOutputCommitProtocol { + + /** + * Hadoop configuration option. + * Fail fast if the committer created is a `FileOutputCommitter`. + * This option can be used to catch configuration issues early. + * + * It's mostly relevant for testing/diagnostics, as it can be used to + * enforce that scheme-specific options are triggering a switch + * to a new committer. + */ + val REJECT_FILE_OUTPUT = "pathoutputcommit.reject.fileoutput" + + /** + * Default behavior: accept the file output. + */ + val REJECT_FILE_OUTPUT_DEFVAL = false +} diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/package.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/package.scala new file mode 100644 index 0000000000000..d2a0cd28f95c1 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/package.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf + +/** + * Package object to assist in switching to the Hadoop 3 + * [[org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory]] factory + * mechanism for dynamically loading committers for the destination stores. + * + * = Using Alternative Committers with Spark and Hadoop 3 = + * + * Hadoop 3.1 adds a means to select a different output committer when writing + * data to object stores. This can provide higher performance as well as + * addressing the consistency and atomicity problems encountered on some filesystems. + * + * Every object store can implement its own committer factory: the factory + * itself will then instantiate the committer of its choice. + * + * == Prerequisites == + * + * Apache Hadoop 3.0.2 or later for the factory APIs; for the S3A connectors, Hadoop 3.1+. + * + * The Hadoop cluster needs to be configured for the binding from filesystem scheme + * to factory. In Hadoop 3.1 this is done automatically for s3a in the file + * `mapred-default.xml`. + * Other stores' committers may need to be explicitly declared. + * + * {{{ + * <property> + * <name>mapreduce.outputcommitter.factory.scheme.s3a</name> + * <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value> + * <description> + * The committer factory to use when writing data to S3A filesystems. + * If mapreduce.outputcommitter.factory.class is set, it will + * override this property. + * </description> + * </property> + * }}} + * + * == Binding a Spark Context to use the new committers for a store == + * + * Spark uses the Hadoop committers in + * [[org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand]] + * by instantiating and then invoking an instance of + * [[org.apache.spark.internal.io.HadoopMapReduceCommitProtocol]].
+ * `InsertIntoHadoopFsRelationCommand` needs to be configured to use + * [[org.apache.spark.internal.io.cloud.PathOutputCommitProtocol]] as + * the commit protocol to use. This instantiates the committer through + * the factory mechanism, and relays operations to it. + * + * When working with Parquet data, you need to explicitly switch + * the Parquet committers to use the same mechanism. + * + * In `spark-defaults.conf`, everything can be set up with the following settings: + * {{{ + * spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter + * spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol + * }}} + * + * It can be done programmatically by calling [[cloud.bind()]] on the + * spark configuration. + */ +package object cloud { + + /** + * Options for committer setup. + * When applied to a spark configuration, this will set the + * Dataframe output to use the factory mechanism for writing data for + * all file formats. + */ + val COMMITTER_BINDING_OPTIONS: Map[String, String] = Map( + SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> + classOf[BindingParquetOutputCommitter].getName, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[PathOutputCommitProtocol].getName) + + /** + * Set the options defined in [[cloud.COMMITTER_BINDING_OPTIONS]] on the + * spark configuration. + * + * @param sparkConf spark configuration to bind.
+ */ + def bind(sparkConf: SparkConf): Unit = { + sparkConf.setAll(COMMITTER_BINDING_OPTIONS) + } + +} diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala new file mode 100644 index 0000000000000..8654e9df3f06c --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.io.cloud +import org.apache.spark.internal.io.cloud.PathCommitterConstants._ + +/** + * Test committer binding logic. 
+ */ +class CommitterBindingSuite extends SparkFunSuite { + + + private val jobId = "2007071202143_0101" + private val attempt0 = "attempt_" + jobId + "_m_000000_0" + private val taskAttempt0 = TaskAttemptID.forName(attempt0) + + /** + * Does the + * [[BindingParquetOutputCommitter]] committer bind to the scheme-specific + * committer declared for the destination path? + */ + test("BindingParquetOutputCommitter will bind") { + val path = new Path("http://example/data") + val job = newJob(path) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0) + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1) + + StubPathOutputCommitterFactory.bind(conf, "http") + val tContext = new TaskAttemptContextImpl(conf, taskAttempt0) + val parquet = new BindingParquetOutputCommitter(path, tContext) + val inner = parquet.boundCommitter().asInstanceOf[StubPathOutputCommitter] + parquet.setupJob(tContext) + assert(inner.setup, s"$inner not setup") + parquet.commitJob(tContext) + assert(inner.committed, s"$inner not committed") + parquet.abortJob(tContext, JobStatus.State.RUNNING) + assert(inner.aborted, s"$inner not aborted") + } + + test("cloud binding") { + val sc = new SparkConf() + cloud.bind(sc) + } + + /** + * Create a new job. Sets the task attempt ID.
+ * + * @return the new job + * @throws IOException failure + */ + @throws[IOException] + def newJob(outDir: Path): Job = { + val job = Job.getInstance(new Configuration()) + val conf = job.getConfiguration + conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0) + conf.setBoolean(CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true) + FileOutputFormat.setOutputPath(job, outDir) + job + } +} diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala new file mode 100644 index 0000000000000..0a67c71b58c6c --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory} +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} + +/** + * A local path output committer which tracks its state, for use in + * tests. 
+ * @param outputPath final destination. + * @param workPath work path + * @param context task/job attempt. + */ +class StubPathOutputCommitter( + outputPath: Path, + workPath: Path, + context: TaskAttemptContext) extends PathOutputCommitter(workPath, context) { + + var setup: Boolean = false + var committed: Boolean = false + var aborted: Boolean = false + + override def getOutputPath: Path = outputPath + + override def getWorkPath: Path = { + workPath + } + + override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = { + setup = true + } + + override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = { + aborted = true + } + + override def setupJob(jobContext: JobContext): Unit = { + setup = true + } + + override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = { + committed = true + } + + override def commitJob(jobContext: JobContext): Unit = { + committed = true + } + + override def abortJob( + jobContext: JobContext, + state: JobStatus.State): Unit = { + aborted = true + } + + override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = { + true + } + + override def toString(): String = s"StubPathOutputCommitter(setup=$setup," + + s" committed=$committed, aborted=$aborted)" +} + +/** + * Factory. + */ +class StubPathOutputCommitterFactory extends PathOutputCommitterFactory { + + override def createOutputCommitter( + outputPath: Path, + context: TaskAttemptContext): PathOutputCommitter = { + new StubPathOutputCommitter(outputPath, workPath(outputPath), context) + } + + + private def workPath(out: Path): Path = new Path(out, PathCommitterConstants.TEMP_DIR_NAME) +} + +object StubPathOutputCommitterFactory { + val Name: String = "org.apache.spark.internal.io.cloud.StubPathOutputCommitterFactory" + + /** + * Given a hadoop configuration, set up the factory binding for the scheme. + * @param conf config to patch + * @param scheme filesystem scheme. 
+ */ + def bind(conf: Configuration, scheme: String): Unit = { + val key = String.format( + PathCommitterConstants.OUTPUTCOMMITTER_FACTORY_SCHEME_PATTERN, scheme) + conf.set(key, Name) + } + +} diff --git a/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathCommitterConstants.scala b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathCommitterConstants.scala new file mode 100644 index 0000000000000..bbf86e8a5fc00 --- /dev/null +++ b/hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathCommitterConstants.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +/** + * Constants related to Hadoop committer setup and configuration. + * Most of these are scattered around the hadoop-mapreduce classes. + */ +object PathCommitterConstants { + + /** + * Scheme prefix for per-filesystem scheme committers. + */ + val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" + + /** + * String format pattern for per-filesystem scheme committers. 
+ */ + val OUTPUTCOMMITTER_FACTORY_SCHEME_PATTERN: String = + OUTPUTCOMMITTER_FACTORY_SCHEME + ".%s" + + /** + * Name of the configuration option used to configure the + * output committer factory to use unless there is a specific + * one for a scheme. + */ + val OUTPUTCOMMITTER_FACTORY_CLASS = "mapreduce.pathoutputcommitter.factory.class" + + /** Default committer factory. */ + val DEFAULT_COMMITTER_FACTORY = + "org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory" + + /** + * The committer which can be directly instantiated and which then delegates + * all operations to the factory-created committer it creates itself. + */ + val BINDING_PATH_OUTPUT_COMMITTER_CLASS = + "org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter" + + /** + * Classname of a parquet committer which just hands off to the + * `BindingPathOutputCommitter` in hadoop-mapreduce, which takes on the + * task of binding to the current factory. + */ + val BINDING_PARQUET_OUTPUT_COMMITTER_CLASS = + "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter" + + /** hadoop-mapreduce option to choose the algorithm. */ + val FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version" + + /** The default committer is not actually safe during task commit failures. */ + val FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 2 + + /** Skip cleanup _temporary folders under job's output directory? */ + val FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED = "mapreduce.fileoutputcommitter.cleanup.skipped" + + /** + * This is the "Pending" directory of the FileOutputCommitter; + * data written here is, in that algorithm, renamed into place. + */ + val TEMP_DIR_NAME = "_temporary" + + /** + * Name of the marker file created on success. + * This is a 0-byte file with the FileOutputCommitter; object store committers + * often add a (non-standard) manifest here. + */ + val SUCCESS_FILE_NAME = "_SUCCESS" + + /** hadoop-mapreduce option to enable the _SUCCESS marker.
*/ + val CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs" +} diff --git a/hadoop-cloud/src/test/scala/.keep b/hadoop-cloud/src/test/scala/.keep new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/pom.xml b/pom.xml index 0a711f287a53f..9a07c8d87a6c1 100644 --- a/pom.xml +++ b/pom.xml @@ -2671,6 +2671,15 @@ + + hadoop-3 + + 3.1.0-SNAPSHOT + 2.12.0 + 3.4.9 + + + yarn From 016d69090691631343d37f9704d0f37a84ddf297 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 29 Mar 2018 16:04:02 +0100 Subject: [PATCH 2/9] SPARK-23807 review set 1: * hadoop branch-2 dependencies always declared * minor nits in POM addressed * added log4j.properties for tests Change-Id: Ibb64b20a0be8624d1709e592b9fe85bdc4dd1af7 --- hadoop-cloud/pom.xml | 265 +++++------------- .../src/test/resources/log4j.properties | 36 +++ 2 files changed, 111 insertions(+), 190 deletions(-) create mode 100644 hadoop-cloud/src/test/resources/log4j.properties diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index cc520aed6e17c..880759eb57c11 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -64,7 +64,80 @@ ${hadoop.version} provided + + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + ${hadoop.deps.scope} + + + org.apache.hadoop + hadoop-common + + + commons-logging + commons-logging + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + + + + org.apache.hadoop + hadoop-openstack + ${hadoop.version} + ${hadoop.deps.scope} + + + org.apache.hadoop + hadoop-common + + + commons-logging + commons-logging + + + junit + junit + + + org.mockito + mockito-all + + + + + + joda-time + joda-time + ${hadoop.deps.scope} + com.fasterxml.jackson.core @@ -93,95 +166,10 @@ httpcore ${hadoop.deps.scope} - - - - hadoop-2.6 - - true - - - - - 
org.apache.hadoop - hadoop-aws - ${hadoop.version} - ${hadoop.deps.scope} - - - org.apache.hadoop - hadoop-common - - - commons-logging - commons-logging - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - - - - org.apache.hadoop - hadoop-openstack - ${hadoop.version} - ${hadoop.deps.scope} - - - org.apache.hadoop - hadoop-common - - - commons-logging - commons-logging - - - junit - junit - - - org.mockito - mockito-all - - - - - - - joda-time - joda-time - ${hadoop.deps.scope} - - - - hadoop-2.7 @@ -214,108 +202,6 @@ - - - org.apache.hadoop - hadoop-aws - ${hadoop.version} - ${hadoop.deps.scope} - - - org.apache.hadoop - hadoop-common - - - commons-logging - commons-logging - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - - - - org.apache.hadoop - hadoop-openstack - ${hadoop.version} - ${hadoop.deps.scope} - - - org.apache.hadoop - hadoop-common - - - commons-logging - commons-logging - - - junit - junit - - - org.mockito - mockito-all - - - - - - - joda-time - joda-time - ${hadoop.deps.scope} - - - - com.fasterxml.jackson.core - jackson-databind - ${hadoop.deps.scope} - - - com.fasterxml.jackson.core - jackson-annotations - ${hadoop.deps.scope} - - - com.fasterxml.jackson.dataformat - jackson-dataformat-cbor - ${fasterxml.jackson.version} - - - - org.apache.httpcomponents - httpclient - ${hadoop.deps.scope} - - - - org.apache.httpcomponents - httpcore - ${hadoop.deps.scope} - @@ -332,7 +218,8 @@ - + org.codehaus.mojo build-helper-maven-plugin @@ -364,10 +251,8 @@ - - hadoop-3 - - src/hadoop-3/main/scala - src/hadoop-3/test/scala - - - - - - - org.codehaus.mojo - 
build-helper-maven-plugin - - - add-scala-sources - generate-sources - - add-source - - - - ${extra.source.dir} - - - - - add-scala-test-sources - generate-test-sources - - add-test-source - - - - ${extra.testsource.dir} - - - - - - - - hadoop-3 + hadoop-3.1 + + org.eclipse.jetty + jetty-util + ${hadoop.deps.scope} + diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml index 5dee31fdea4bb..2c39a7df0146e 100644 --- a/hadoop-cloud/pom.xml +++ b/hadoop-cloud/pom.xml @@ -240,6 +240,24 @@ + + + org.eclipse.jetty + jetty-util + ${hadoop.deps.scope} + + + org.eclipse.jetty + jetty-util-ajax + ${jetty.version} + ${hadoop.deps.scope} +