最近,使用flink读取postgresql的表数据时,代码一直报错,提示表不存在。通过debug发现,flink-jdbc的JDBCDialect类存在一点问题,现记录如下。
flink版本:1.9.1
scala版本:2.12
postgresql数据库表结构:
ipark_datacenter:
-- ods (schema)
--   t_test(id, name)
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
/**
* @Author: maozl
* @Date: 2019/10/17 17:14
* @Description:
*/
public class FlinkJdbcDialectBugShow {

    /**
     * Demonstrates the Flink 1.9 JDBCDialect quoting problem: reading a
     * PostgreSQL table that lives in a non-default schema ("ods.t_test")
     * through the JDBC table source.
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Connection settings; note the schema-qualified table name.
        final JDBCOptions options = JDBCOptions.builder()
                .setDriverName("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://127.0.0.1:65432/ipark_datacenter")
                .setUsername("root")
                .setPassword("admin")
                .setTableName("ods.t_test")
                .build();

        // Physical schema of ods.t_test: (id INT, name VARCHAR).
        final TableSchema schema = TableSchema.builder()
                .fields(
                        new String[]{"id", "name"},
                        new DataType[]{
                                TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.INT_TYPE_INFO),
                                TypeConversions.fromLegacyInfoToDataType(BasicTypeInfo.STRING_TYPE_INFO)})
                .build();

        final JDBCTableSource source = JDBCTableSource.builder()
                .setOptions(options)
                .setSchema(schema)
                .build();

        tableEnv.registerTableSource("userInfo", source);
        tableEnv.scan("userInfo").printSchema();

        final Table query = tableEnv.sqlQuery("select * from userInfo");
        final DataStream<Row> rows = tableEnv.toAppendStream(query, Row.class);
        rows.print();

        env.execute();
    }
}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
at FlinkJdbcDialectBugShow.main(FlinkJdbcDialectBugShow.java:53)
Caused by: java.lang.Exception: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
位置:26
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: open() failed.ERROR: relation "ods.t_test" does not exist
位置:26
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:250)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: org.postgresql.util.PSQLException: ERROR: relation "ods.t_test" does not exist
位置:26
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:143)
at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:106)
at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:247)
... 4 more
报错信息提示 relation “ods.t_test” does not exist,但实际上我已经在 ods 这个 schema 下建好了 t_test 这个表。通过 debug 发现,原因是 JDBCDialect 的 quoteIdentifier 把整个表名 "ods.t_test" 当作单个标识符加了一对双引号,生成 "ods.t_test";PostgreSQL 会把它当成一个名字里带点的表,自然找不到。正确做法是按 "." 拆分后对 schema 和表名分别加引号,得到 "ods"."t_test"。下面是修复后的 quoteTablename 方法:
// Fixed quoting for possibly schema-qualified table names: a name such as
// "ods.t_test" is split on '.' and each part is quoted on its own, yielding
// "ods"."t_test" instead of the single (nonexistent) identifier "ods.t_test".
// NOTE(review): quoteIdentifier is the default double-quote helper declared on
// the same JDBCDialect interface (see the full interface below).
default String quoteTablename(String tableName){
if(tableName.contains(".")){
// Quote each dot-separated segment separately, then rejoin with '.'.
String[] strs = tableName.split("\\.");
return Arrays.stream(strs).map(s -> quoteIdentifier(s)).collect(Collectors.joining("."));
}else {
// Unqualified name: quote it as a single identifier.
return quoteIdentifier(tableName);
}
}
package org.apache.flink.api.java.io.jdbc.dialect;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* @Author: maozl01
* @Date: 2020/7/17 17:14
* @Description: JDBCDialect bug fix
*/
public interface JDBCDialect extends Serializable {

    /**
     * Check if this dialect instance can handle a certain jdbc url.
     *
     * @param url the jdbc url.
     * @return True if the dialect can be applied on the given jdbc url.
     */
    boolean canHandle(String url);

    /**
     * @return the default driver class name; used when the user has not
     * configured a driver class explicitly.
     */
    default Optional<String> defaultDriverName() {
        return Optional.empty();
    }

    /**
     * Puts quotes around a single identifier so that reserved keywords or
     * characters such as spaces are accepted by the database. Double quotes
     * {@code "} are used by default.
     */
    default String quoteIdentifier(String identifier) {
        return '"' + identifier + '"';
    }

    /**
     * Get dialect upsert statement; each database has its own upsert syntax,
     * e.g. MySQL uses DUPLICATE KEY UPDATE and PostgreSQL uses
     * ON CONFLICT ... DO UPDATE SET ...
     *
     * @return None if the dialect does not support upsert; the writer then
     * degrades to select + update/insert, which performs poorly.
     */
    default Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.empty();
    }

    /**
     * Quotes a possibly schema-qualified table name. Each dot-separated part
     * is quoted on its own, so {@code ods.t_test} becomes
     * {@code "ods"."t_test"} rather than the single identifier
     * {@code "ods.t_test"}.
     */
    default String quoteTablename(String tableName) {
        if (!tableName.contains(".")) {
            return quoteIdentifier(tableName);
        }
        return Arrays.stream(tableName.split("\\."))
                .map(this::quoteIdentifier)
                .collect(Collectors.joining("."));
    }

    /**
     * Get row exists statement by condition fields. Default use SELECT.
     */
    default String getRowExistsStatement(String tableName, String[] conditionFields) {
        String predicate = Arrays.stream(conditionFields)
                .map(field -> quoteIdentifier(field) + "=?")
                .collect(Collectors.joining(" AND "));
        return "SELECT 1 FROM " + quoteTablename(tableName) + " WHERE " + predicate;
    }

    /**
     * Get insert into statement.
     */
    default String getInsertIntoStatement(String tableName, String[] fieldNames) {
        String columnList = Arrays.stream(fieldNames)
                .map(name -> quoteIdentifier(name))
                .collect(Collectors.joining(", "));
        // One '?' placeholder per column.
        String questionMarks = Arrays.stream(fieldNames)
                .map(ignored -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteTablename(tableName)
                + "(" + columnList + ") VALUES (" + questionMarks + ")";
    }

    /**
     * Get update one row statement by condition fields; no LIMIT 1 by default,
     * because LIMIT 1 is a sql dialect.
     */
    default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
        String assignments = Arrays.stream(fieldNames)
                .map(field -> quoteIdentifier(field) + "=?")
                .collect(Collectors.joining(", "));
        String predicate = Arrays.stream(conditionFields)
                .map(field -> quoteIdentifier(field) + "=?")
                .collect(Collectors.joining(" AND "));
        return "UPDATE " + quoteTablename(tableName)
                + " SET " + assignments
                + " WHERE " + predicate;
    }

    /**
     * Get delete one row statement by condition fields; no LIMIT 1 by default,
     * because LIMIT 1 is a sql dialect.
     */
    default String getDeleteStatement(String tableName, String[] conditionFields) {
        String predicate = Arrays.stream(conditionFields)
                .map(field -> quoteIdentifier(field) + "=?")
                .collect(Collectors.joining(" AND "));
        return "DELETE FROM " + quoteTablename(tableName) + " WHERE " + predicate;
    }

    /**
     * Get select fields statement by condition fields. Default use SELECT.
     * The WHERE clause is omitted when there are no condition fields.
     */
    default String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        String projection = Arrays.stream(selectFields)
                .map(this::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String sql = "SELECT " + projection + " FROM " + quoteTablename(tableName);
        if (conditionFields.length > 0) {
            sql += " WHERE " + Arrays.stream(conditionFields)
                    .map(field -> quoteIdentifier(field) + "=?")
                    .collect(Collectors.joining(" AND "));
        }
        return sql;
    }
}
本文地址:https://blog.csdn.net/u014730001/article/details/107356393
如对本文有疑问, 点击进行留言回复!!
金蝶KIS商贸版 专业版 账套管理查看账套没有用户名 登录账套用户不存在 解决方法.
QueryWrapper的使用方法,MyBatis Plus的查询方法处理
sqlServer数据库表无法查询,无法删除。“表发生死锁”。“已超过了锁请求超时时段。”
网友评论