当前位置: 代码迷 >> 综合 >> FlinkTable SocketConnector范例
  详细解决方案

FlinkTable SocketConnector范例

热度:57   发布时间:2023-09-18 17:23:03.0

其实这个范例在 Flink 的源码里就有,下面依次给出各个类的代码:

package connector.socket.table;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;import java.math.BigInteger;
import java.util.List;
import java.util.regex.Pattern;/*** gsw*/
public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {private final List<LogicalType> parsingTypes;private final DataStructureConverter converter;private final TypeInformation<RowData> producedTypeInfo;private final String columnDelimiter;public ChangelogCsvDeserializer(List<LogicalType> parsingTypes,DataStructureConverter converter,TypeInformation<RowData> producedTypeInfo,String columnDelimiter) {this.parsingTypes = parsingTypes;this.converter = converter;this.producedTypeInfo = producedTypeInfo;this.columnDelimiter = columnDelimiter;}@Overridepublic TypeInformation<RowData> getProducedType() {// return the type information required by Flink's core interfacesreturn producedTypeInfo;}@Overridepublic void open(InitializationContext context) {// converters must be openconverter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));}@Overridepublic RowData deserialize(byte[] message) {// parse the columns including a changelog flagfinal String[] columns = new String(message).split(Pattern.quote(columnDelimiter));final RowKind kind = RowKind.valueOf(columns[0]);final Row row = new Row(kind, parsingTypes.size());for (int i = 0; i < parsingTypes.size(); i++) {row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));}// convert to internal data structurereturn (RowData) converter.toInternal(row);}//在这加类型支持private static Object parse(LogicalTypeRoot root, String value) {switch (root) {case INTEGER:return Integer.parseInt(value);case VARCHAR:return value;case DOUBLE:return Double.parseDouble(value);case BOOLEAN:return Boolean.parseBoolean(value);case FLOAT:return Float.parseFloat(value);case BIGINT:return Long.parseLong(value);default:throw new IllegalArgumentException();}}@Overridepublic boolean isEndOfStream(RowData nextElement) {return false;}
}
package connector.socket.table;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;import java.util.List;/*** gsw*/
public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {private final String columnDelimiter;public ChangelogCsvFormat(String columnDelimiter) {this.columnDelimiter = columnDelimiter;}@Override@SuppressWarnings("unchecked")public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context,DataType producedDataType) {// create type information for the DeserializationSchemafinal TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType);// most of the code in DeserializationSchema will not work on internal data structures// create a converter for conversion at the endfinal DataStructureConverter converter = context.createDataStructureConverter(producedDataType);// use logical types during runtime for parsingfinal List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();// create runtime classreturn new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);}@Overridepublic ChangelogMode getChangelogMode() {// define that this format can produce INSERT and DELETE rowsreturn ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.DELETE).build();}
}

package connector.socket.table;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;import java.util.Collections;
import java.util.HashSet;
import java.util.Set;/*** gsw*/
public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {// define all options staticallypublic static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter").stringType().defaultValue("|");@Overridepublic String factoryIdentifier() {return "changelog-csv";}@Overridepublic Set<ConfigOption<?>> requiredOptions() {return Collections.emptySet();}@Overridepublic Set<ConfigOption<?>> optionalOptions() {final Set<ConfigOption<?>> options = new HashSet<>();options.add(COLUMN_DELIMITER);return options;}@Overridepublic DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context,ReadableConfig formatOptions) {// either implement your custom validation logic here ...// or use the provided helper methodFactoryUtil.validateFactoryOptions(this, formatOptions);// get the validated optionsfinal String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);// create and return the formatreturn new ChangelogCsvFormat(columnDelimiter);}
}

package connector.socket.table;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;import java.util.HashSet;
import java.util.Set;/*** gsw*/
public class SocketDynamicTableFactory implements DynamicTableSourceFactory {// define all options staticallypublic static final ConfigOption<String> HOSTNAME = ConfigOptions.key("url").stringType().noDefaultValue();public static final ConfigOption<Integer> PORT = ConfigOptions.key("port").intType().noDefaultValue();public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter").intType().defaultValue(10); // corresponds to '\n'@Overridepublic String factoryIdentifier() {return "socket"; // used for matching to `connector = '...'`}@Overridepublic Set<ConfigOption<?>> requiredOptions() {final Set<ConfigOption<?>> options = new HashSet<>();options.add(HOSTNAME);options.add(PORT);options.add(FactoryUtil.FORMAT); // use pre-defined option for formatreturn options;}@Overridepublic Set<ConfigOption<?>> optionalOptions() {final Set<ConfigOption<?>> options = new HashSet<>();options.add(BYTE_DELIMITER);return options;}@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {// either implement your custom validation logic here ...// or use the provided helper utilityfinal FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);// discover a suitable decoding formatfinal DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class,FactoryUtil.FORMAT);// validate all optionshelper.validate();// get the validated optionsfinal ReadableConfig options = helper.getOptions();final String hostname = options.get(HOSTNAME);final int port = options.get(PORT);final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);// derive the produced data type (excluding computed columns) from the catalog tablefinal DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();// create and return dynamic table sourcereturn new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, 
producedDataType);}
}

package connector.socket.table;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;/*** gsw*/
public class SocketDynamicTableSource implements ScanTableSource {private final String hostname;private final int port;private final byte byteDelimiter;private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;private final DataType producedDataType;public SocketDynamicTableSource(String hostname,int port,byte byteDelimiter,DecodingFormat<DeserializationSchema<RowData>> decodingFormat,DataType producedDataType) {this.hostname = hostname;this.port = port;this.byteDelimiter = byteDelimiter;this.decodingFormat = decodingFormat;this.producedDataType = producedDataType;}@Overridepublic ChangelogMode getChangelogMode() {// in our example the format decides about the changelog mode// but it could also be the source itselfreturn decodingFormat.getChangelogMode();}@Overridepublic ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {// create runtime classes that are shipped to the clusterfinal DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(runtimeProviderContext,producedDataType);final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(hostname,port,byteDelimiter,deserializer);return SourceFunctionProvider.of(sourceFunction, false);}@Overridepublic DynamicTableSource copy() {return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);}@Overridepublic String asSummaryString() {return "Socket Table Source";}
}

package connector.socket.table;import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;/*** gsw*/
public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {private final String hostname;private final int port;private final byte byteDelimiter;private final DeserializationSchema<RowData> deserializer;private volatile boolean isRunning = true;private Socket currentSocket;public SocketSourceFunction(String hostname, int port, byte byteDelimiter,DeserializationSchema<RowData> deserializer) {this.hostname = hostname;this.port = port;this.byteDelimiter = byteDelimiter;this.deserializer = deserializer;}@Overridepublic TypeInformation<RowData> getProducedType() {return deserializer.getProducedType();}@Overridepublic void open(Configuration parameters) throws Exception {deserializer.open(null);this.currentSocket = new Socket();this.currentSocket.connect(new InetSocketAddress(hostname, port), 0);}@Overridepublic void run(SourceContext<RowData> ctx) throws Exception {InputStream stream = this.currentSocket.getInputStream();while (isRunning) {// open and consume from socketbyte[] b = new byte[46];stream.read(b, 0, 46);RowData rowData = deserializer.deserialize(b);ctx.collect(rowData);
//            Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;try {currentSocket.close();} catch (Throwable t) {// ignore}}
}
  相关解决方案