Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.client.common;


import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.util.Collections;
import java.util.EnumSet;
import static java.nio.file.attribute.PosixFilePermission.*;

import com.sun.javafx.PlatformUtil;

/*
* Utility class to get OS related information and perform OS-dependent actions
*/
public class OperatingSystemUtils {

public static boolean isPosixCompliant() {
return PlatformUtil.isMac() || PlatformUtil.isUnix();
}

public static boolean isWindows() {
return PlatformUtil.isWindows();
}

public static void doBasedOnOs(
Procedure doWhenPosixCompliant,
Procedure doWhenWindows,
String operationDescription) {
doBasedOnOsThrowsException(
() -> doWhenPosixCompliant.execute(),
() -> doWhenWindows.execute(),
operationDescription);
}

public static <ExceptionType extends Exception> void doBasedOnOsThrowsException(
ThrowingProcedure<ExceptionType> doWhenPosixCompliant,
ThrowingProcedure<ExceptionType> doWhenWindows,
String operationDescription) throws ExceptionType {
getBasedOnOS(doWhenPosixCompliant, doWhenWindows, operationDescription).execute();
}

public static <T> T getBasedOnOS(
T getWhenPosixCompliant,
T getWhenWindows,
String operationDescription) {
if (isPosixCompliant()) {
return getWhenPosixCompliant;
} else if (isWindows()) {
return getWhenWindows;
} else {
String seperator = operationDescription.isEmpty() ? "" : ": ";
throw new UnsupportedOperationException("Operation" + seperator + operationDescription +
" is not supported on this OS");
}
}

public static void setOSAgnosticFilePermissions(File file,
EnumSet<PosixFilePermission> permissions)
throws IOException {
OperatingSystemUtils.doBasedOnOsThrowsException(
() -> Files.setPosixFilePermissions(file.toPath(), permissions),
//whether or not a file is read-only is the only applicable permission on Windows
() -> file.setWritable(isWriteable(permissions), permissions.contains(OWNER_WRITE)),
"Setting file permissions"
);
}

private static boolean isWriteable(EnumSet<PosixFilePermission> permissions) {
return !Collections.disjoint(permissions, EnumSet.of(OWNER_WRITE,
GROUP_WRITE,
OTHERS_WRITE));
}

@FunctionalInterface
public interface Procedure {
void execute();
}

@FunctionalInterface
public interface ThrowingProcedure<ExceptionType extends Exception> {
void execute() throws ExceptionType;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.client.common;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import static java.nio.file.attribute.PosixFilePermission.*;

import com.sun.javafx.PlatformUtil;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;


public class TestOperatingSystemUtils {

@Before
public void setUp() throws UnsupportedOperationException {
if ( !(PlatformUtil.isMac() || PlatformUtil.isWindows() || PlatformUtil.isUnix())) {
throw new UnsupportedOperationException("Not running on supported OS");
}
}

@Test
public void testIsPosixCompliant() {
assertEquals(PlatformUtil.isUnix() || PlatformUtil.isMac(),
OperatingSystemUtils.isPosixCompliant());
}

@Test
public void testIsWindows() {
assertEquals(PlatformUtil.isWindows(),
OperatingSystemUtils.isWindows());
}

@Test
public void testDoBasedOnOs() {
if (OperatingSystemUtils.isPosixCompliant()) {
OperatingSystemUtils.doBasedOnOs(
() -> {},
() -> fail(),
""
);
} else if (OperatingSystemUtils.isWindows()) {
OperatingSystemUtils.doBasedOnOs(
() -> fail(),
() -> {},
""
);
}
}

@Test(expected = IllegalArgumentException.class)
public void testDoBasedOnOsThrowsException() {
if (OperatingSystemUtils.isPosixCompliant()) {
OperatingSystemUtils.doBasedOnOsThrowsException(
() -> {throw new IllegalArgumentException();},
() -> fail(),
""
);
} else if (OperatingSystemUtils.isWindows()) {
OperatingSystemUtils.doBasedOnOsThrowsException(
() -> fail(),
() -> {throw new IllegalArgumentException();},
""
);
}
}

@Test
public void testGetBasedOnOs() {
String value = "";
if (OperatingSystemUtils.isPosixCompliant()) {
value = OperatingSystemUtils.getBasedOnOS(
"pass",
"fail",
""
);
} else if (OperatingSystemUtils.isWindows()) {
value = OperatingSystemUtils.getBasedOnOS(
"fail",
"pass",
""
);
}
assertEquals(value, "pass");
}

@Test
public void testSetOSAgnosticFilePermissions() throws IOException {
File f = File.createTempFile("testFile", ".txt");
f.deleteOnExit();
OperatingSystemUtils.setOSAgnosticFilePermissions(f, EnumSet.of(OWNER_READ,
OWNER_WRITE,
OWNER_EXECUTE));
assertTrue(f.canExecute());
assertTrue(f.canWrite());
assertTrue(f.canRead());

OperatingSystemUtils.setOSAgnosticFilePermissions(f, EnumSet.of(OWNER_READ));
if (OperatingSystemUtils.isPosixCompliant()) {
assertFalse(f.canExecute());
}
assertFalse(f.canWrite());
assertTrue(f.canRead());


}
}
5 changes: 2 additions & 3 deletions rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.livy.client.common.OperatingSystemUtils;
import org.apache.livy.client.common.TestUtils;
import org.apache.livy.rsc.driver.RSCDriverBootstrapper;
import org.apache.livy.rsc.rpc.Rpc;
import org.apache.livy.rsc.rpc.RpcDispatcher;
import org.apache.livy.rsc.rpc.RpcServer;

import static org.apache.livy.rsc.RSCConf.Entry.*;

/**
Expand Down Expand Up @@ -299,9 +299,8 @@ private static File writeConfToFile(RSCConf conf) throws IOException {
}
}
}

File file = File.createTempFile("livyConf", ".properties");
Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE));
OperatingSystemUtils.setOSAgnosticFilePermissions(file, EnumSet.of(OWNER_READ, OWNER_WRITE));
//file.deleteOnExit();

Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
Expand Down
23 changes: 10 additions & 13 deletions rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@

package org.apache.livy.rsc.driver;


import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
Expand All @@ -39,6 +32,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.nio.file.attribute.PosixFilePermission.*;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -52,6 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.livy.client.common.OperatingSystemUtils;
import org.apache.livy.client.common.Serializer;
import org.apache.livy.rsc.BaseProtocol;
import org.apache.livy.rsc.BypassJobStatus;
Expand All @@ -61,7 +56,6 @@
import org.apache.livy.rsc.rpc.Rpc;
import org.apache.livy.rsc.rpc.RpcDispatcher;
import org.apache.livy.rsc.rpc.RpcServer;

import static org.apache.livy.rsc.RSCConf.Entry.*;

/**
Expand Down Expand Up @@ -96,9 +90,12 @@ public class RSCDriver extends BaseProtocol {
private final AtomicBoolean inShutdown;

public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwx------");
this.localTmpDir = Files.createTempDirectory("rsc-tmp",
PosixFilePermissions.asFileAttribute(perms)).toFile();

this.localTmpDir = Files.createTempDirectory("rsc-tmp").toFile();
OperatingSystemUtils.setOSAgnosticFilePermissions(this.localTmpDir, EnumSet.of(OWNER_READ,
OWNER_WRITE,
OWNER_EXECUTE));

this.executor = Executors.newCachedThreadPool();
this.jobQueue = new LinkedList<>();
this.clients = new ConcurrentLinkedDeque<>();
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration

import org.apache.livy.client.common.ClientConf
import org.apache.livy.client.common.{ClientConf, OperatingSystemUtils}
import org.apache.livy.client.common.ClientConf.ConfEntry
import org.apache.livy.client.common.ClientConf.DeprecatedConf

Expand Down Expand Up @@ -335,7 +335,12 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {

/** Return the path to the spark-submit executable. */
def sparkSubmit(): String = {
sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
val sparkSubmit = OperatingSystemUtils
.getBasedOnOS(
"spark-submit",
"spark-submit.cmd",
"Getting Spark Submit");
sparkHome().map { _ + File.separator + "bin" + File.separator + sparkSubmit }.get
}

private val configDir: Option[File] = {
Expand Down