diff --git a/bytehousereader/pom.xml b/bytehousereader/pom.xml new file mode 100644 index 000000000..b77c126bb --- /dev/null +++ b/bytehousereader/pom.xml @@ -0,0 +1,97 @@ + + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + 4.0.0 + bytehousereader + bytehousereader + jar + + 8 + 8 + 1.1.58 + + + + + + com.bytedance.bytehouse + driver-java + ${bytehouse.version} + all + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + org.projectlombok + lombok + 1.18.20 + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/bytehousereader/src/main/assembly/package.xml b/bytehousereader/src/main/assembly/package.xml new file mode 100644 index 000000000..9f4a894c7 --- /dev/null +++ b/bytehousereader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/bytehousereader + + + target/ + + bytehousereader-0.0.1-SNAPSHOT.jar + + plugin/reader/bytehousereader + + + + + + false + plugin/reader/bytehousereader/libs + runtime + + + \ No newline at end of file diff --git a/bytehousereader/src/main/java/com/alibaba/datax/plugin/reader/bytehousereader/BytehouseReader.java b/bytehousereader/src/main/java/com/alibaba/datax/plugin/reader/bytehousereader/BytehouseReader.java new file mode 100644 index 000000000..8dd7a5c67 --- /dev/null +++ b/bytehousereader/src/main/java/com/alibaba/datax/plugin/reader/bytehousereader/BytehouseReader.java @@ -0,0 +1,78 @@ +package com.alibaba.datax.plugin.reader.bytehousereader; + +import 
com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; +import com.alibaba.datax.plugin.rdbms.reader.Constant; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class BytehouseReader { + + + private static final DataBaseType DATABASE_TYPE = DataBaseType.ByteHouse; + private static final Logger LOG = LoggerFactory.getLogger(BytehouseReader.class); + + public static class Job extends Reader.Job { + private Configuration jobConfig = null; + private CommonRdbmsReader.Job commonRdbmsReaderMaster; + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + int fetchSize = this.jobConfig.getInt(Constant.FETCH_SIZE, Integer.MIN_VALUE); + this.jobConfig.set(Constant.FETCH_SIZE, fetchSize); + this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE); + this.commonRdbmsReaderMaster.init(this.jobConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsReaderMaster.split(this.jobConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsReaderMaster.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderMaster.destroy(this.jobConfig); + } + } + + public static class Task extends Reader.Task { + + private Configuration jobConfig; + private CommonRdbmsReader.Task commonRdbmsReaderSlave; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId()); + this.commonRdbmsReaderSlave.init(this.jobConfig); + } + + @Override + public void startRead(RecordSender recordSender) { + int fetchSize = 
this.jobConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, 1000); + + this.commonRdbmsReaderSlave.startRead(this.jobConfig, recordSender, super.getTaskPluginCollector(), fetchSize); + } + + @Override + public void post() { + this.commonRdbmsReaderSlave.post(this.jobConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderSlave.destroy(this.jobConfig); + } + } + +} diff --git a/bytehousereader/src/main/resources/plugin.json b/bytehousereader/src/main/resources/plugin.json new file mode 100644 index 000000000..d6064c11f --- /dev/null +++ b/bytehousereader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "bytehousereader", + "class": "com.alibaba.datax.plugin.reader.bytehousereader.BytehouseReader", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql.", + "developer": "alibaba" +} \ No newline at end of file diff --git a/bytehousereader/src/main/resources/plugin_job_template.json b/bytehousereader/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..f03a4f97b --- /dev/null +++ b/bytehousereader/src/main/resources/plugin_job_template.json @@ -0,0 +1,16 @@ +{ + "name": "bytehousereader", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:bytehouse://:[/]", + "table": ["table1", "table2"] + } + ], + "preSql": [], + "postSql": [] + } +} \ No newline at end of file diff --git a/bytehousewriter/pom.xml b/bytehousewriter/pom.xml new file mode 100644 index 000000000..67b33bb12 --- /dev/null +++ b/bytehousewriter/pom.xml @@ -0,0 +1,96 @@ + + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + 4.0.0 + jar + bytehousewriter + bytehousewriter + + 8 + 8 + 1.1.58 + + + + + com.bytedance.bytehouse + driver-java + ${bytehouse.version} + all + + + org.projectlombok + lombok + 1.18.20 + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + 
com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/bytehousewriter/src/main/assembly/package.xml b/bytehousewriter/src/main/assembly/package.xml new file mode 100644 index 000000000..17d5c82e6 --- /dev/null +++ b/bytehousewriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/bytehousewriter + + + target/ + + bytehousewriter-0.0.1-SNAPSHOT.jar + + plugin/writer/bytehousewriter + + + + + + false + plugin/writer/bytehousewriter/libs + runtime + + + diff --git a/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java new file mode 100644 index 000000000..2ef8227cd --- /dev/null +++ b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriter.java @@ -0,0 +1,324 @@ +package com.alibaba.datax.plugin.writer.bytehousewriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import 
com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; + +import java.sql.*; +import java.util.List; +import java.util.regex.Pattern; + +public class BytehouseWriter extends Writer { + private static final DataBaseType DATABASE_TYPE = DataBaseType.ByteHouse; + + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + private CommonRdbmsWriter.Job commonRdbmsWriterMaster; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE); + this.commonRdbmsWriterMaster.init(this.originalConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterMaster.prepare(this.originalConfig); + } + + @Override + public List split(int mandatoryNumber) { + return this.commonRdbmsWriterMaster.split(this.originalConfig, mandatoryNumber); + } + + @Override + public void post() { + this.commonRdbmsWriterMaster.post(this.originalConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterMaster.destroy(this.originalConfig); + } + } + + public static class Task extends Writer.Task { + private Configuration writerSliceConfig; + + private CommonRdbmsWriter.Task commonRdbmsWriterSlave; + + @Override + public void init() { + this.writerSliceConfig = super.getPluginJobConf(); + + this.commonRdbmsWriterSlave = new CommonRdbmsWriter.Task(DATABASE_TYPE) { + @Override + protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException { + try { + if (column.getRawData() == null) { + preparedStatement.setNull(columnIndex + 1, columnSqltype); + return preparedStatement; + } + + java.util.Date utilDate; + switch (columnSqltype) { + case Types.CHAR: + case Types.NCHAR: + case Types.CLOB: + case Types.NCLOB: + case Types.VARCHAR: + case 
Types.LONGVARCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + preparedStatement.setString(columnIndex + 1, column + .asString()); + break; + + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + case Types.BIGINT: + case Types.DECIMAL: + case Types.FLOAT: + case Types.REAL: + case Types.DOUBLE: + String strValue = column.asString(); + if (emptyAsNull && "".equals(strValue)) { + preparedStatement.setNull(columnIndex + 1, columnSqltype); + } else { + switch (columnSqltype) { + case Types.TINYINT: + case Types.SMALLINT: + case Types.INTEGER: + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + break; + case Types.BIGINT: + preparedStatement.setLong(columnIndex + 1, column.asLong()); + break; + case Types.DECIMAL: + preparedStatement.setBigDecimal(columnIndex + 1, column.asBigDecimal()); + break; + case Types.REAL: + case Types.FLOAT: + preparedStatement.setFloat(columnIndex + 1, column.asDouble().floatValue()); + break; + case Types.DOUBLE: + preparedStatement.setDouble(columnIndex + 1, column.asDouble()); + break; + } + } + break; + + case Types.DATE: + if (this.resultSetMetaData.getRight().get(columnIndex) + .equalsIgnoreCase("year")) { + if (column.asBigInteger() == null) { + preparedStatement.setString(columnIndex + 1, null); + } else { + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + } + } else { + Date sqlDate = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlDate = new Date(utilDate.getTime()); + } + preparedStatement.setDate(columnIndex + 1, sqlDate); + } + break; + + case Types.TIME: + Time sqlTime = null; + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTime = new Time(utilDate.getTime()); + } + 
preparedStatement.setTime(columnIndex + 1, sqlTime); + break; + + case Types.TIMESTAMP: + Timestamp sqlTimestamp = null; + if (column instanceof StringColumn && column.asString() != null) { + String timeStampStr = column.asString(); + // JAVA TIMESTAMP 类型入参必须是 "2017-07-12 14:39:00.123566" 格式 + String pattern = "^\\d+-\\d+-\\d+ \\d+:\\d+:\\d+.\\d+"; + boolean isMatch = Pattern.matches(pattern, timeStampStr); + if (isMatch) { + sqlTimestamp = Timestamp.valueOf(timeStampStr); + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + } + } + try { + utilDate = column.asDate(); + } catch (DataXException e) { + throw new SQLException(String.format( + "Date 类型转换错误:[%s]", column)); + } + + if (null != utilDate) { + sqlTimestamp = new Timestamp( + utilDate.getTime()); + } + preparedStatement.setTimestamp(columnIndex + 1, sqlTimestamp); + break; + + case Types.BINARY: + case Types.VARBINARY: + case Types.BLOB: + case Types.LONGVARBINARY: + preparedStatement.setBytes(columnIndex + 1, column + .asBytes()); + break; + + case Types.BOOLEAN: + preparedStatement.setInt(columnIndex + 1, column.asBigInteger().intValue()); + break; + + // warn: bit(1) -> Types.BIT 可使用setBoolean + // warn: bit(>1) -> Types.VARBINARY 可使用setBytes + case Types.BIT: + if (this.dataBaseType == DataBaseType.MySql) { + Boolean asBoolean = column.asBoolean(); + if (asBoolean != null) { + preparedStatement.setBoolean(columnIndex + 1, asBoolean); + } else { + preparedStatement.setNull(columnIndex + 1, Types.BIT); + } + } else { + preparedStatement.setString(columnIndex + 1, column.asString()); + } + break; + + default: + boolean isHandled = fillPreparedStatementColumnType4CustomType(preparedStatement, + columnIndex, columnSqltype, column); + if (isHandled) { + break; + } + throw DataXException + .asDataXException( + DBUtilErrorCode.UNSUPPORTED_TYPE, + String.format( + "您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 
请修改表中该字段的类型或者不同步该字段.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } + return preparedStatement; + } catch (DataXException e) { + // fix类型转换或者溢出失败时,将具体哪一列打印出来 + if (e.getErrorCode() == CommonErrorCode.CONVERT_NOT_SUPPORT || + e.getErrorCode() == CommonErrorCode.CONVERT_OVER_FLOW) { + throw DataXException + .asDataXException( + e.getErrorCode(), + String.format( + "类型转化错误. 字段名:[%s], 字段类型:[%d], 字段Java类型:[%s]. 请修改表中该字段的类型或者不同步该字段.", + this.resultSetMetaData.getLeft() + .get(columnIndex), + this.resultSetMetaData.getMiddle() + .get(columnIndex), + this.resultSetMetaData.getRight() + .get(columnIndex))); + } else { + throw e; + } + } + } + + private Object toJavaArray(Object val) { + if (null == val) { + return null; + } else if (val instanceof JSONArray) { + Object[] valArray = ((JSONArray) val).toArray(); + for (int i = 0; i < valArray.length; i++) { + valArray[i] = this.toJavaArray(valArray[i]); + } + return valArray; + } else { + return val; + } + } + + boolean fillPreparedStatementColumnType4CustomType(PreparedStatement ps, + int columnIndex, int columnSqltype, + Column column) throws SQLException { + switch (columnSqltype) { + case Types.OTHER: + if (this.resultSetMetaData.getRight().get(columnIndex).startsWith("Tuple")) { + throw DataXException + .asDataXException(BytehouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR, BytehouseWriterErrorCode.TUPLE_NOT_SUPPORTED_ERROR.getDescription()); + } else { + ps.setString(columnIndex + 1, column.asString()); + } + return true; + + case Types.ARRAY: + Connection conn = ps.getConnection(); + List values = JSON.parseArray(column.asString(), Object.class); + for (int i = 0; i < values.size(); i++) { + values.set(i, this.toJavaArray(values.get(i))); + } + Array array = conn.createArrayOf("String", values.toArray()); + ps.setArray(columnIndex + 1, array); + return true; + + default: + break; + } + + 
return false; + } + }; + + this.commonRdbmsWriterSlave.init(this.writerSliceConfig); + } + + @Override + public void prepare() { + this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig); + } + + @Override + public void startWrite(RecordReceiver recordReceiver) { + this.commonRdbmsWriterSlave.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector()); + } + + @Override + public void post() { + this.commonRdbmsWriterSlave.post(this.writerSliceConfig); + } + + @Override + public void destroy() { + this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig); + } + } + +} \ No newline at end of file diff --git a/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriterErrorCode.java b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriterErrorCode.java new file mode 100644 index 000000000..fb27dc7a5 --- /dev/null +++ b/bytehousewriter/src/main/java/com/alibaba/datax/plugin/writer/bytehousewriter/BytehouseWriterErrorCode.java @@ -0,0 +1,31 @@ +package com.alibaba.datax.plugin.writer.bytehousewriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum BytehouseWriterErrorCode implements ErrorCode { + TUPLE_NOT_SUPPORTED_ERROR("bytehouseWriter-00", "不支持TUPLE类型导入."), + ; + + private final String code; + private final String description; + + private BytehouseWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/bytehousewriter/src/main/resources/plugin.json b/bytehousewriter/src/main/resources/plugin.json new file mode 100644 index 000000000..06d3f53f2 --- /dev/null +++ 
b/bytehousewriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "bytehousewriter", + "class": "com.alibaba.datax.plugin.writer.bytehousewriter.BytehouseWriter", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql.", + "developer": "penggan(1466927252@qq.com)" +} \ No newline at end of file diff --git a/bytehousewriter/src/main/resources/plugin_job_template.json b/bytehousewriter/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..a61953371 --- /dev/null +++ b/bytehousewriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,21 @@ +{ + "name": "bytehousewriter", + "parameter": { + "username": "username", + "password": "password", + "column": ["col1", "col2", "col3"], + "connection": [ + { + "jdbcUrl": "jdbc:bytehouse://:[/]", + "table": ["table1", "table2"] + } + ], + "preSql": [], + "postSql": [], + + "batchSize": 65536, + "batchByteSize": 134217728, + "dryRun": false, + "writeMode": "insert" + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 624109f79..e02a90f94 100644 --- a/package.xml +++ b/package.xml @@ -25,6 +25,13 @@ + + bytehousereader/target/datax/ + + **/*.* + + datax + mysqlreader/target/datax/ @@ -259,6 +266,13 @@ + + bytehousewriter/target/datax/ + + **/*.* + + datax + mysqlwriter/target/datax/ diff --git a/pom.xml b/pom.xml index c7f43f172..20d68c0c4 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,8 @@ gaussdbreader gaussdbwriter datax-example + bytehousewriter + bytehousereader @@ -248,6 +250,12 @@ true + + + bytedance + ByteDance Public Repository + https://artifact.bytedance.com/repository/releases +