Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章
本文主要给大家介绍:为了使用 2.3.4 版本的新特性,需要对 Apache SeaTunnel-Web 依赖的版本进行升级。而 SeaTunnel 2.3.4 版本部分 API 跟之前版本不兼容,所以需要对 SeaTunnel-Web 的源码进行修改适配。
源码修改编译
克隆SeaTunnel-Web源码到本地
git clone https://github.com/apache/seatunnel-web.git
在idea中打开项目
升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖
<seatunnel-framework.version>2.3.3</seatunnel-framework.version>
改为
<seatunnel-framework.version>2.3.4</seatunnel-framework.version>
因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。
社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。
org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType
// "Before" side of the upgrade diff: the 2.3.3-era convertor between the
// engine's type names and SeaTunnel data types.
public static class SeaTunnelDataTypeConvertor
implements DataTypeConvertor<SeaTunnelDataType<?>> {
// Resolves an engine type name (lower-cased with Locale.ROOT) to its raw
// SeaTunnel type via DATA_TYPE_MAP.
// NOTE(review): an unknown name makes the map lookup return null and this
// throws NullPointerException — confirm that matches upstream behavior.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
}
// Identity conversion: the engine type already is a SeaTunnel type, so the
// properties map is ignored.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
throws DataTypeConvertException {
return seaTunnelDataType;
}
// Identity conversion in the other direction; also ignores the map.
@Override
public SeaTunnelDataType<?> toConnectorType(
SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
throws DataTypeConvertException {
return seaTunnelDataType;
}
// Identifier under which this convertor is registered with the factory.
@Override
public String getIdentity() {
return "EngineDataTypeConvertor";
}
}
// 改为
// "After" side of the upgrade diff: in 2.3.4 the DataTypeConvertor interface
// added a leading field-name parameter to each conversion method and dropped
// the checked DataTypeConvertException from the signatures.
public static class SeaTunnelDataTypeConvertor
implements DataTypeConvertor<SeaTunnelDataType<?>> {
// s = field name (unused here), s1 = engine type name to resolve.
// NOTE(review): lookup of an unknown name still NPEs — confirm upstream.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {
return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();
}
// Identity conversion; field name and properties map are ignored.
@Override
public SeaTunnelDataType<?> toSeaTunnelType(
String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}
// Identity conversion in the other direction; also ignores its extras.
@Override
public SeaTunnelDataType<?> toConnectorType(
String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
return seaTunnelDataType;
}
// Identifier under which this convertor is registered with the factory.
@Override
public String getIdentity() {
return "EngineDataTypeConvertor";
}
}
org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl
// Builds the DataTypeConvertorFactory used for schema handling: discovers all
// SINK connector plugin jars and, when any exist, hands the factory a class
// loader that can see those jars plus everything under the plugin root dir.
// (The censored "***mon" in the scraped article is the Common utility class.)
public TableSchemaServiceImpl() throws IOException {
Common.setStarter(true);
Set<PluginIdentifier> pluginIdentifiers =
SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
pluginIdentifiersList.addAll(pluginIdentifiers);
List<URL> pluginJarPaths =
new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
// Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
if (!pluginJarPaths.isEmpty()) {
// List<URL> files = FileUtils.searchJarFiles(path);
pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
factory =
new DataTypeConvertorFactory(
new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
} else {
// No plugin jars found — fall back to the default class loader.
factory = new DataTypeConvertorFactory();
}
}
// 改为(注:此处"改为"前后的代码完全一致,请核对原文是否遗漏了实际改动)
// 2.3.4-adapted constructor: discovers SINK connector plugin jars and builds
// a DataTypeConvertorFactory whose class loader can load types from those
// jars and the plugin root directory; falls back to the default factory when
// no plugin jars are present.
public TableSchemaServiceImpl() throws IOException {
Common.setStarter(true);
Set<PluginIdentifier> pluginIdentifiers =
SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
pluginIdentifiersList.addAll(pluginIdentifiers);
List<URL> pluginJarPaths =
new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
// Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
if (!pluginJarPaths.isEmpty()) {
// List<URL> files = FileUtils.searchJarFiles(path);
pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
factory =
new DataTypeConvertorFactory(
new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
} else {
// No plugin jars found — fall back to the default class loader.
factory = new DataTypeConvertorFactory();
}
}
SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =
convertor.toSeaTunnelType(field.getName(), field.getType());
org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()
public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
***mon.setDeployMode(DeployMode.CLIENT);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
try {
SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
jobInstanceDao.update(jobInstance);
***pletableFuture.runAsync(
() -> {
waitJobFinish(
clientJobProxy,
userId,
jobInstanceId,
Long.toString(clientJobProxy.getJobId()),
seaTunnelClient);
});
} catch (ExecutionException | InterruptedException e) {
ExceptionUtils.getMessage(e);
throw new RuntimeException(e);
}
return jobInstanceId;
}
org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl
else if (statusList.contains("CANCELLING")) {
jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {
jobStatus = JobStatus.CANCELING.name();
org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl
TableFactoryContext context =
new TableFactoryContext(
Collections.singletonList(table),
ReadonlyConfig.fromMap(config),
Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =
new TableTransformFactoryContext(
Collections.singletonList(table),
ReadonlyConfig.fromMap(config),
Thread.currentThread().getContextClassLoader());
org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy
// "Before" side of the upgrade diff: 2.3.3-era job restore, where
// restoreExecutionContext did not yet take a SeaTunnelConfig argument.
public void restoreJob(
@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
JobConfig jobConfig = new JobConfig();
// Job name mirrors the naming used at submission time.
jobConfig.setName(jobInstanceId + "_job");
try {
seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// NOTE(review): interrupt status is not restored here before rethrowing.
throw new RuntimeException(e);
}
}
// 改为
// 2.3.4-adapted job restore: restoreExecutionContext now additionally takes a
// SeaTunnelConfig, built here from the default YAML configuration.
//
// @param filePath      path of the job config file to restore from
// @param jobInstanceId web-side job instance id (used in the job name)
// @param jobEngineId   engine-side id of the job being restored
public void restoreJob(
@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
try {
seaTunnelClient
.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
.execute();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Restore the interrupt status before converting to unchecked.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
PluginType pluginType) throws IOException {
***mon.setStarter(true);
if (!pluginType.equals(PluginType.SOURCE)) {
throw new UnsupportedOperationException("ONLY support plugin type source");
}
Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
List<Factory> factories;
if (path.toFile().exists()) {
List<URL> files = FileUtils.searchJarFiles(path);
factories =
FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
} else {
factories =
FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
}
Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
factories.forEach(
plugin -> {
if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
PluginIdentifier info =
PluginIdentifier.of(
"seatunnel",
PluginType.SOURCE.getType(),
plugin.factoryIdentifier());
featureMap.put(
info,
new ConnectorFeature(
SupportColumnProjection.class.isAssignableFrom(
tableSourceFactory.getSourceClass())));
}
});
return featureMap;
}
// 改为
public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
PluginType pluginType) {
***mon.setStarter(true);
if (!pluginType.equals(PluginType.SOURCE)) {
throw new UnsupportedOperationException("ONLY support plugin type source");
}
ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
pluginIdentifiers.addAll(
SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
List<URL> pluginJarPaths =
new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);
List<Factory> factories;
if (!pluginJarPaths.isEmpty()) {
factories =
FactoryUtil.discoverFactories(
new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
} else {
factories =
FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
}
Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
factories.forEach(
plugin -> {
if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
PluginIdentifier info =
PluginIdentifier.of(
"seatunnel",
PluginType.SOURCE.getType(),
plugin.factoryIdentifier());
featureMap.put(
info,
new ConnectorFeature(
SupportColumnProjection.class.isAssignableFrom(
tableSourceFactory.getSourceClass())));
}
});
return featureMap;
代码格式化
mvn spotless:apply
编译打包
mvn clean package -DskipTests
至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成
Linux部署测试
这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南
重要的配置项
1、seatunnel-web数据库相关配置(application.yml)
用于 Web 服务中的数据持久化
2、SEATUNNEL_HOME(环境变量)
seatunnel-web 通过调用 seatunnel 插件相关的 API,扫描 connector 相关的连接器
3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义
4、重要的配置文件:
connector-datasource-mapper.yaml
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息
感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!
本文由 白鲸开源科技 提供发布支持!