Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
.idea/
*.iml
target/
*.project
.settings/
*.classpath
test-output/
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
adbcj
=====

Asynchronous Database Connectivity in Java ,like jdbc

enhancement:

done:
1. prepared statment has been supported
2. add mysql5.5 protocol support
3. make mysql data type adjuestment to the mysql manaul : http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-type-conversions.html
4. exception handling


TODO:

70 changes: 35 additions & 35 deletions api/src/main/java/org/adbcj/ConnectionManagerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,38 @@

public class ConnectionManagerProvider {

public static final String ADBCJ_PROTOCOL = "adbcj";

private ConnectionManagerProvider () {}
public static ConnectionManager createConnectionManager(String url, String username, String password) throws DbException {
return createConnectionManager(url, username, password, null);
}

public static ConnectionManager createConnectionManager(String url, String username, String password, Properties properties) throws DbException {
if (url == null) {
throw new IllegalArgumentException("Connection url can not be null");
}
try {
URI uri = new URI(url);
String adbcjProtocol = uri.getScheme();
if (!ADBCJ_PROTOCOL.equals(adbcjProtocol)) {
throw new DbException("Invalid connection URL: " + url);
}
URI driverUri = new URI(uri.getSchemeSpecificPart());
String protocol = driverUri.getScheme();

ServiceLoader<ConnectionManagerFactory> serviceLoader = ServiceLoader.load(ConnectionManagerFactory.class);
for (ConnectionManagerFactory factory : serviceLoader) {
if (factory.canHandle(protocol)) {
return factory.createConnectionManager(url, username, password, properties);
}
}
throw new DbException("Could not find ConnectionManagerFactory for protocol '" + protocol + "'");
} catch (URISyntaxException e) {
throw new DbException("Invalid connection URL: " + url);
}
}
}
public static final String ADBCJ_PROTOCOL = "adbcj";

private ConnectionManagerProvider () {}

public static ConnectionManager createConnectionManager(String url, String username, String password) throws DbException {
return createConnectionManager(url, username, password, null);
}

public static ConnectionManager createConnectionManager(String url, String username, String password, Properties properties) throws DbException {
if (url == null) {
throw new IllegalArgumentException("Connection url can not be null");
}

try {
URI uri = new URI(url);
String adbcjProtocol = uri.getScheme();
if (!ADBCJ_PROTOCOL.equals(adbcjProtocol)) {
throw new DbException("Invalid connection URL: " + url);
}
URI driverUri = new URI(uri.getSchemeSpecificPart());
String protocol = driverUri.getScheme();

ServiceLoader<ConnectionManagerFactory> serviceLoader = ServiceLoader.load(ConnectionManagerFactory.class);
for (ConnectionManagerFactory factory : serviceLoader) {
if (factory.canHandle(protocol)) {
return factory.createConnectionManager(url, username, password, properties);
}
}
throw new DbException("Could not find ConnectionManagerFactory for protocol '" + protocol + "'");
} catch (URISyntaxException e) {
throw new DbException("Invalid connection URL: " + url);
}
}

}
186 changes: 93 additions & 93 deletions api/src/main/java/org/adbcj/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,106 +4,106 @@
//*********************************************************************
package org.adbcj;

import org.adbcj.support.DefaultDbFuture;
import org.adbcj.support.DefaultDbSessionFuture;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.adbcj.support.DefaultDbFuture;
import org.adbcj.support.DefaultDbSessionFuture;

/**
* @author Mike Heath <heathma@ldschurch.org>
*/
public class ConnectionPool implements DbSessionProvider {

private final Queue<DbSession> sessions = new ConcurrentLinkedQueue<DbSession>();
private final ConnectionManager connectionManager;
private final AtomicInteger count = new AtomicInteger(0);

public ConnectionPool(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

public void addConnection() {
sessions.add(connectionManager.connect().getUninterruptably());
count.incrementAndGet();
System.out.println("Pool size at: " + count.get());
}

public void setPoolSize(int size) {
if (size < count.get()) {
throw new IllegalArgumentException("Can't decrease pool size.");
}
while (count.get() < size) {
addConnection();
}
}

@Override
public DbFuture<DbSession> connect() {
final DbSession session = sessions.poll();
if (session == null) {
throw new IllegalStateException("No connections available in pool");
}
DefaultDbFuture<DbSession> future = new DefaultDbFuture<DbSession>();
future.setResult(new DbSession() {

@Override
public void beginTransaction() {
session.beginTransaction();
}

@Override
public DbSessionFuture<Void> commit() {
return session.commit();
}

@Override
public DbSessionFuture<Void> rollback() {
return session.rollback();
}

@Override
public boolean isInTransaction() {
return session.isInTransaction();
}

@Override
public DbSessionFuture<ResultSet> executeQuery(String sql) {
return session.executeQuery(sql);
}

@Override
public <T> DbSessionFuture<T> executeQuery(String sql, ResultEventHandler<T> eventHandler, T accumulator) {
return session.executeQuery(sql, eventHandler, accumulator);
}

@Override
public DbSessionFuture<Result> executeUpdate(String sql) {
return session.executeUpdate(sql);
}

@Override
public DbSessionFuture<PreparedStatement> prepareStatement(String sql) {
return session.prepareStatement(sql);
}

@Override
public DbSessionFuture<PreparedStatement> prepareStatement(Object key, String sql) {
return session.prepareStatement(key, sql);
}

@Override
public DbSessionFuture<Void> close(boolean immediate) throws DbException {
sessions.add(session);
return DefaultDbSessionFuture.createCompletedFuture(this, null);
}

@Override
public boolean isClosed() throws DbException {
return false;
}
});
return future;
}
private final Queue<DbSession> sessions = new ConcurrentLinkedQueue<DbSession>();
private final ConnectionManager connectionManager;
private final AtomicInteger count = new AtomicInteger(0);

public ConnectionPool(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

public void addConnection() {
sessions.add(connectionManager.connect().getUninterruptably());
count.incrementAndGet();
System.out.println("Pool size at: " + count.get());
}

public void setPoolSize(int size) {
if (size < count.get()) {
throw new IllegalArgumentException("Can't decrease pool size.");
}
while (count.get() < size) {
addConnection();
}
}

@Override
public DbFuture<DbSession> connect() {
final DbSession session = sessions.poll();
if (session == null) {
throw new IllegalStateException("No connections available in pool");
}
DefaultDbFuture<DbSession> future = new DefaultDbFuture<DbSession>();
future.setResult(new DbSession() {

@Override
public void beginTransaction() {
session.beginTransaction();
}

@Override
public DbSessionFuture<Void> commit() {
return session.commit();
}

@Override
public DbSessionFuture<Void> rollback() {
return session.rollback();
}

@Override
public boolean isInTransaction() {
return session.isInTransaction();
}

@Override
public DbSessionFuture<ResultSet> executeQuery(String sql) {
return session.executeQuery(sql);
}

@Override
public <T> DbSessionFuture<T> executeQuery(String sql, ResultEventHandler<T> eventHandler, T accumulator) {
return session.executeQuery(sql, eventHandler, accumulator);
}

@Override
public DbSessionFuture<Result> executeUpdate(String sql) {
return session.executeUpdate(sql);
}

@Override
public DbSessionFuture<PreparedStatement> prepareStatement(String sql) {
return session.prepareStatement(sql);
}

@Override
public DbSessionFuture<PreparedStatement> prepareStatement(Object key, String sql) {
return session.prepareStatement(key, sql);
}

@Override
public DbSessionFuture<Void> close(boolean immediate) throws DbException {
sessions.add(session);
return DefaultDbSessionFuture.createCompletedFuture(this, null);
}

@Override
public boolean isClosed() throws DbException {
return false;
}
});
return future;
}
}
4 changes: 3 additions & 1 deletion api/src/main/java/org/adbcj/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@
public interface Row extends Map<Object, Value> {

ResultSet getResultSet();


Value[] getValues();

}
1 change: 1 addition & 0 deletions api/src/main/java/org/adbcj/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public enum Type {
ARRAY(Types.ARRAY),
BIGINT(Types.BIGINT),
LONG(Types.BIGINT),
BINARY(Types.BINARY),
BIT(Types.BIT),
BLOB(Types.BLOB),
Expand Down
Loading