From bb31411afd0ac0da1557fc5a9f31e0c76a1dd0fb Mon Sep 17 00:00:00 2001
From: venn
Date: Thu, 4 May 2023 09:28:54 +0800
Subject: [PATCH 1/3] datagen_to_kafka: add commented-out exactly-once sink
 options

---
 src/main/resources/sql/dev/datagen_to_kafka.sql | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/main/resources/sql/dev/datagen_to_kafka.sql b/src/main/resources/sql/dev/datagen_to_kafka.sql
index ed0c2ef..21546e4 100644
--- a/src/main/resources/sql/dev/datagen_to_kafka.sql
+++ b/src/main/resources/sql/dev/datagen_to_kafka.sql
@@ -40,6 +40,8 @@ CREATE TABLE user_log_sink
   ,'properties.group.id' = 'user_log'
   ,'scan.startup.mode' = 'latest-offset'
   ,'format' = 'json'
+--   ,'sink.semantic' = 'exactly-once'
+--   ,'properties.transaction.timeout.ms' = '900000'
 );


From 36566ccb4306e0c28283294a9134393d9aef7235 Mon Sep 17 00:00:00 2001
From: venn
Date: Thu, 4 May 2023 11:19:24 +0800
Subject: [PATCH 2/3] update custom mysql catalog: copy AbstractJdbcCatalog to
 AbstractMyJdbcCatalog so the stock JdbcDynamicTableFactory no longer needs
 to be modified

---
 .../jdbc/catalog/AbstractMyJdbcCatalog.java   | 544 ++++++++++++++++++
 .../jdbc/catalog/MyMySqlCatalog.java          |   2 +-
 ...ry.java => MyJdbcDynamicTableFactory.java} |  69 +--
 .../org.apache.flink.table.factories.Factory  |   1 +
 4 files changed, 562 insertions(+), 54 deletions(-)
 create mode 100644 src/main/flink/org/apache/flink/connector/jdbc/catalog/AbstractMyJdbcCatalog.java
 rename src/main/flink/org/apache/flink/connector/jdbc/table/{JdbcDynamicTableFactory.java => MyJdbcDynamicTableFactory.java} (82%)

diff --git a/src/main/flink/org/apache/flink/connector/jdbc/catalog/AbstractMyJdbcCatalog.java b/src/main/flink/org/apache/flink/connector/jdbc/catalog/AbstractMyJdbcCatalog.java
new file mode 100644
index 0000000..a33d486
--- /dev/null
+++ b/src/main/flink/org/apache/flink/connector/jdbc/catalog/AbstractMyJdbcCatalog.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.connector.jdbc.catalog; + +import org.apache.flink.connector.jdbc.table.MyJdbcDynamicTableFactory; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TemporaryClassLoaderContext; + +import org.apache.commons.compress.utils.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Predicate; + +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL; +import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME; +import static org.apache.flink.connector.jdbc.table.MyJdbcDynamicTableFactory.IDENTIFIER; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Abstract catalog for any JDBC catalogs. 
+ */
+public abstract class AbstractMyJdbcCatalog extends AbstractCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractMyJdbcCatalog.class);
+
+    protected final ClassLoader userClassLoader;
+    protected final String username;
+    protected final String pwd;
+    protected final String baseUrl;
+    protected final String defaultUrl;
+
+    public AbstractMyJdbcCatalog(
+            ClassLoader userClassLoader,
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase);
+
+        checkNotNull(userClassLoader);
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
+
+        JdbcCatalogUtils.validateJdbcUrl(baseUrl);
+
+        this.userClassLoader = userClassLoader;
+        this.username = username;
+        this.pwd = pwd;
+        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+        this.defaultUrl = this.baseUrl + defaultDatabase;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        // load the Driver using userClassLoader explicitly, see FLINK-15635 for more detail
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassLoader)) {
+            // test connection, fail early if we cannot connect to database
+            try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            } catch (SQLException e) {
+                throw new ValidationException(
+                        String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+            }
+            LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        LOG.info("Catalog {} closing", getName());
+    }
+
+    // ----- getters ------
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return pwd;
+    }
+
+    public String getBaseUrl() {
+        return baseUrl;
+    }
+
+    // ------ retrieve PK constraint ------
+
+    protected Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String database, String schema, String table)
+            throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        // In the currently supported database dialects MySQL and Postgres,
+        // the database term is equivalent to catalog term.
+        // We need to pass the database name as catalog parameter for retrieving primary keys by
+        // full table identifier.
+        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            Preconditions.checkState(
+                    !keySeqColumnName.containsKey(keySeq - 1),
+                    "The field(s) of primary key must be from the same table.");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME may be null according to the javadoc, generate a unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    // ------ table factory ------
+
+    // update by venn: return the custom MyJdbcDynamicTableFactory instead of the stock factory
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new MyJdbcDynamicTableFactory());
+    }
+
+    // ------ databases ------
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        Preconditions.checkState(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    // ------ tables and views ------
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        String databaseName = tablePath.getDatabaseName();
+        String dbUrl = baseUrl + databaseName;
+
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey =
+                    getPrimaryKey(
+                            metaData,
+                            databaseName,
+                            getSchemaName(tablePath),
+                            getTableName(tablePath));
+
+            PreparedStatement ps =
+                    conn.prepareStatement(
+                            String.format("SELECT * FROM %s;", getSchemaTableName(tablePath)));
+
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), dbUrl);
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    // ------ partitions ------
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
+                    CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------ functions ------
+
+    @Override
+    public List<String> listFunctions(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    // ------ stats ------
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterTableColumnStatistics(
+            ObjectPath tablePath,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException, TablePartitionedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    protected List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            Predicate<String> filterFunc,
+            Object... params) {
+
+        List<String> columnValues = Lists.newArrayList();
+
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "The following SQL query could not be executed (%s): %s", connUrl, sql),
+                    e);
+        }
+    }
+
+    protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    protected String getTableName(ObjectPath tablePath) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected String getSchemaName(ObjectPath tablePath) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected String getSchemaTableName(ObjectPath tablePath) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/src/main/flink/org/apache/flink/connector/jdbc/catalog/MyMySqlCatalog.java b/src/main/flink/org/apache/flink/connector/jdbc/catalog/MyMySqlCatalog.java
index 3388e95..f0aa5ae 100644
--- a/src/main/flink/org/apache/flink/connector/jdbc/catalog/MyMySqlCatalog.java
+++ b/src/main/flink/org/apache/flink/connector/jdbc/catalog/MyMySqlCatalog.java
@@ -47,7 +47,7 @@
  * Catalog for MySQL.
  */
 @Internal
-public class MyMySqlCatalog extends AbstractJdbcCatalog {
+public class MyMySqlCatalog extends AbstractMyJdbcCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(MyMySqlCatalog.class);
 
diff --git a/src/main/flink/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/src/main/flink/org/apache/flink/connector/jdbc/table/MyJdbcDynamicTableFactory.java
similarity index 82%
rename from src/main/flink/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
rename to src/main/flink/org/apache/flink/connector/jdbc/table/MyJdbcDynamicTableFactory.java
index 5525e44..484f2f1 100644
--- a/src/main/flink/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ b/src/main/flink/org/apache/flink/connector/jdbc/table/MyJdbcDynamicTableFactory.java
@@ -23,14 +23,11 @@
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.catalog.MysqlCatalogUtils;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
-import org.apache.flink.table.catalog.hive.HiveCatalogLock;
-import org.apache.flink.table.connector.RequireCatalogLock;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.lookup.LookupOptions;
@@ -59,7 +56,7 @@
  * JdbcDynamicTableSink}.
  */
 @Internal
-public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+public class MyJdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
     public static final String IDENTIFIER = "jdbc";
 
@@ -70,35 +67,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         final ReadableConfig config = helper.getOptions();
 
         // add by venn, for user define mysql catalog
-        // if table identifier is jdbc, use JdbcDynamic Factory
-        // else use FactoryUtil to auto set
-        if (IDENTIFIER.equals(config.get(MysqlCatalogUtils.CONNECTOR))) {
-            helper.validate();
-            validateConfigOptions(config, context.getClassLoader());
-            validateDataTypeWithJdbcDialect(
-                    context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
-            JdbcConnectorOptions jdbcOptions = getJdbcOptions(config, context.getClassLoader());
-
-            return new JdbcDynamicTableSink(
-                    jdbcOptions,
-                    getJdbcExecutionOptions(config),
-                    getJdbcDmlOptions(
-                            jdbcOptions,
-                            context.getPhysicalRowDataType(),
-                            context.getPrimaryKeyIndexes()),
-                    context.getPhysicalRowDataType());
-        } else {
-            DynamicTableSink sink =
-                    FactoryUtil.createDynamicTableSink(
-                            null,
-                            context.getObjectIdentifier(),
-                            context.getCatalogTable(),
-                            context.getConfiguration(),
-                            context.getClassLoader(),
-                            context.isTemporary());
-
-            return sink;
-        }
+        DynamicTableSink sink =
+                FactoryUtil.createDynamicTableSink(
+                        null,
+                        context.getObjectIdentifier(),
+                        context.getCatalogTable(),
+                        context.getConfiguration(),
+                        context.getClassLoader(),
+                        context.isTemporary());
+
+        return sink;
     }
 
     @Override
@@ -107,26 +85,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 FactoryUtil.createTableFactoryHelper(this, context);
         final ReadableConfig config = helper.getOptions();
 
-        // todo
-        if (IDENTIFIER.equals(config.get(MysqlCatalogUtils.CONNECTOR))) {
-            helper.validate();
-            validateConfigOptions(config, context.getClassLoader());
-            validateDataTypeWithJdbcDialect(
-                    context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
-            return new JdbcDynamicTableSource(
-                    getJdbcOptions(helper.getOptions(), context.getClassLoader()),
-                    getJdbcReadOptions(helper.getOptions()),
-                    helper.getOptions().get(LookupOptions.MAX_RETRIES),
-                    getLookupCache(config),
-                    context.getPhysicalRowDataType());
-        } else {
-            DynamicTableSource source = FactoryUtil.createDynamicTableSource((DynamicTableSourceFactory) null, context.getObjectIdentifier(), context.getCatalogTable(), context.getConfiguration(), context.getClassLoader(), context.isTemporary());
-//            if (source instanceof RequireCatalogLock) {
-//                ((RequireCatalogLock)source).setLockFactory(HiveCatalogLock.createFactory(this.hiveConf));
-//            }
-
-            return source;
-        }
+        DynamicTableSource source =
+                FactoryUtil.createDynamicTableSource(
+                        (DynamicTableSourceFactory) null,
+                        context.getObjectIdentifier(),
+                        context.getCatalogTable(),
+                        context.getConfiguration(),
+                        context.getClassLoader(),
+                        context.isTemporary());
+
+        return source;
     }
 
diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 5c79ae5..0c28170 100644
--- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -5,3 +5,4 @@ com.rookie.submit.cust.connector.hbase.HbaseDynamicTableFactory
 com.rookie.submit.cust.connector.redis.RedisDynamicTableFactory
 com.rookie.submit.cust.format.changelog.csv.ChangelogCsvFormatFactory
 com.rookie.submit.cust.connector.starrocks.StarrocksDynamicTableFactory
+org.apache.flink.connector.jdbc.table.MyJdbcDynamicTableFactory
\ No newline at end of file

From 6243d9c5e292629dce4b1429d2ea48ee1bbe7abe Mon Sep 17 00:00:00 2001
From: venn
Date: Mon, 8 May 2023 14:52:43 +0800
Subject: [PATCH 3/3] update pom.xml and add CEP demo SQL
 cep_datagen_to_kafka.sql

---
 pom.xml                                         | 20 +++-----
 .../sql/cep/cep_datagen_to_kafka.sql            | 50 +++++++++++++++++++
 2 files changed, 56 insertions(+), 14 deletions(-)
 create mode 100644 src/main/resources/sql/cep/cep_datagen_to_kafka.sql

diff --git a/pom.xml b/pom.xml
index bf84cce..3c87df6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,19 +101,11 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
-
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
             <version>2.12.4</version>
-
-
+
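
Note on PATCH 1/3: the two commented-out sink options only take effect once checkpointing is enabled, because an exactly-once Kafka producer commits its transaction on checkpoint completion. 'properties.transaction.timeout.ms' must also stay at or below the broker's transaction.max.timeout.ms (900000 ms, i.e. 15 minutes, by default, which is why the patch uses exactly that value). A minimal runnable sketch in Java, with the schema reduced to one assumed column and the topic and bootstrap servers as placeholders:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class ExactlyOnceKafkaSinkSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // exactly-once only works with checkpointing on; the interval is an assumption
            env.enableCheckpointing(60_000);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // mirrors the WITH clause of user_log_sink in datagen_to_kafka.sql,
            // with the two options from the patch uncommented
            tEnv.executeSql(
                    "CREATE TABLE user_log_sink ("
                            + "  user_id STRING"      // assumed column; the real schema is elided in the patch
                            + ") WITH ("
                            + "  'connector' = 'kafka',"
                            + "  'topic' = 'user_log_sink',"                          // placeholder
                            + "  'properties.bootstrap.servers' = 'localhost:9092'," // placeholder
                            + "  'format' = 'json',"
                            + "  'sink.semantic' = 'exactly-once',"
                            + "  'properties.transaction.timeout.ms' = '900000'"
                            + ")");
        }
    }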
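
Note on PATCH 2/3: because AbstractMyJdbcCatalog#getFactory() returns MyJdbcDynamicTableFactory, every table resolved through this catalog is planned by the custom factory, which now simply delegates back to FactoryUtil's service discovery instead of special-casing the 'jdbc' identifier. A sketch of wiring the catalog up from the Table API, assuming MyMySqlCatalog keeps the constructor shape shown above for AbstractMyJdbcCatalog (all connection values are placeholders):

    import org.apache.flink.connector.jdbc.catalog.MyMySqlCatalog;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MyMySqlCatalogSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // assumed to mirror the AbstractMyJdbcCatalog constructor above;
            // catalog name, database, credentials and base URL are placeholders
            MyMySqlCatalog catalog =
                    new MyMySqlCatalog(
                            Thread.currentThread().getContextClassLoader(),
                            "my_mysql",
                            "test_db",
                            "root",
                            "password",
                            "jdbc:mysql://localhost:3306"); // base URL without a database suffix

            tEnv.registerCatalog("my_mysql", catalog);
            tEnv.useCatalog("my_mysql");

            // getTable() builds the schema from JDBC metadata and stamps the 'jdbc'
            // connector properties, so reads go through the JDBC connector stack
            tEnv.executeSql("SELECT * FROM test_db.user_log").print(); // placeholder table
        }
    }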