Flink 程序打包与分布式执行从 IDE 到集群的一条龙指南

一、用对执行环境

Flink 支持将程序打包为 JAR 再提交到集群执行。要让相同代码在本地/集群都“识大体”,务必使用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 当你从 命令行或 Web UI 提交 JAR 时,它会自动作为集群环境运行。
  • 当你在 IDE 里直接运行 main() 时,它会自动退化为本地环境(Local)。

这意味着:同一套代码可以在不同环境下复用,无需条件编译或分支判断。

二、目录结构与最小可运行示例

flink-demo/
├─ src/main/java/***/example/WordCountJob.java
├─ pom.xml
└─ ...
package ***.example;

import org.apache.flink.api.***mon.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountJob {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> lines = env.fromElements("hello flink", "hello world");

    lines
      .flatMap((String s, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) -> {
        for (String w : s.split("\\s+")) out.collect(Tuple2.of(w, 1));
      })
      .returns(Types.TUPLE(Types.STRING, Types.INT))
      .keyBy(t -> t.f0)
      .sum(1)
      .print();

    env.execute("word-count-demo");
  }
}

三、设置入口类 & 正确依赖范围

1)、pom.xml 关键点

  • 入口类写进 Manifest:Main-Class
  • Flink 依赖一般用 provided(由集群提供),避免把整套 Flink 打进你的 JAR
  • 使用 maven-shade-plugin 生成 fat-jar(只打进你的业务依赖;如需 relocate 冲突包请配置)
<project>
  <properties>
    <maven.***piler.source>11</maven.***piler.source>
    <maven.***piler.target>11</maven.***piler.target>
    <flink.version>YOUR_FLINK_VERSION</flink.version>
  </properties>

  <dependencies>
    <!-- 由集群提供,避免重复打包 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- 若需要 Table/Connector/其他,请按需添加,通常也为 provided -->
  </dependencies>

  <build>
    <plugins>
      <!-- 统一打包并写入 Manifest 的 Main-Class -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>***.example.WordCountJob</mainClass>
                </transformer>
              </transformers>
              <!-- 如有 Guava/Jackson/***ty 冲突,使用 relocate:
              <relocations>
                <relocation>
                  <pattern>***.google.***mon</pattern>
                  <shadedPattern>shadow.***.google.***mon</shadedPattern>
                </relocation>
              </relocations>
              -->
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

2) 构建命令

mvn -DskipTests clean package
# 产物:target/your-artifact-*-shaded.jar (名称依配置而定)

四、JAR 的入口类识别:program-class vs main-class

Flink 在加载你的 JAR 时,按以下顺序寻找入口:

  1. Manifest 中的 program-classmain-class

    • 两者都存在时program-class 优先
  2. 如果 Manifest 里都没有,命令行和 Web UI 都支持手动指定入口类

1) 手动覆盖入口(命令行)

bin/flink run \
  -c ***.example.WordCountJob \   # 覆盖入口类
  target/app-1.0-SNAPSHOT-shaded.jar \
  --yourArg1 foo --yourArg2 bar

五、命令行 & Web UI

1) 命令行(最通用)

# 基本提交流程
bin/flink run target/app-1.0-SNAPSHOT-shaded.jar --key value

# 覆盖入口类
bin/flink run -c ***.example.WordCountJob target/app-1.0-SNAPSHOT-shaded.jar
  • 适用于 Standalone / YARN / K8s 等多种部署模式(集群准备好后,flink run 即可)。
  • 参数传递建议配合 ParameterTool.fromArgs(args) 读取。

2) Web UI(可视化上传)

  • 打开 Flink Web 前端 → “Submit New Job”
  • 上传 JAR
  • 如果 Manifest 无入口,在页面上选择/填写 Entry Class
  • 填写程序参数,点击提交

六、远程执行与本地验证

  • 本地 IDEA 运行 main():用 getExecutionEnvironment() 自动起本地运行时,调试快、反馈直观。
  • 集群执行:打包 JAR 提交即可,代码不变。
  • 这套机制也支持 CI:构建产物后,通过自动化脚本 bin/flink run 提交。

七、常见问题速查

1) ClassNotFoundException / NoSuchMethodError

  • 检查是否错误地把 Flink 依赖 打进了 JAR(不要),或版本与集群不一致
  • 业务依赖冲突(Guava/Jackson/***ty)→ relocate 或与集群对齐

2) “入口类找不到”

  • Manifest 未写 main-class / program-class 且命令行/Web UI 未指定 -c
  • 入口类的包名拼写、可见性、方法签名是否正确(public static void main(String[] args)

3) 本地能跑,集群报错

  • 本地 JDK/依赖版本与集群不同
  • 读取外部文件路径、权限、HDFS/S3 凭据在集群环境不可用
  • 在本地加载的配置/字典未打包或未挂载到集群

4) 依赖体积过大

  • 只打业务依赖;Flink 及其 Connector(多数情况)由集群侧提供(provided
  • 若必须打进(如独立运行环境),请确认与运行时 Flink 版本完全兼容

八、交付级清单(Checklist)

  • 代码中统一使用 StreamExecutionEnvironment.getExecutionEnvironment()
  • Maven:Flink 依赖 provided,Manifest 写好 Main-Class
  • 若有多入口,清楚 program-class 优先级,并能用 -c 覆盖
  • 使用 ParameterTool 接收参数,避免硬编码
  • 如有依赖冲突,使用 shade + relocate
  • 本地/集群分别验证:路径、权限、凭证、JDK/依赖版本
  • CI/CD:构建产物 → 归档 → flink run 提交脚本自动化

九、结语

打包不是“把代码 zip 一下”这么简单:入口类识别、依赖范围、环境一致性都决定了上线是否顺滑。
遵循本文的“环境统一 + 正确打包 + 清晰入口 + 参数化配置”,你的 Flink 程序就能在 IDE → 集群 的迁移中“无感切换”,稳定交付。祝你 All Green!

转载请说明出处内容投诉
CSS教程网 » Flink 程序打包与分布式执行从 IDE 到集群的一条龙指南

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买