【Flink Extras】15. Flink dimension tables in practice: 6 approaches - complete edition (2)

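Flink series articles

I. Flink column

The Flink column covers one topic at a time systematically, illustrated with concrete examples.

  • 1. Flink deployment series
    Covers the basics of deploying and configuring Flink.

  • 2. Flink basics series
    Covers Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the "four cornerstones" (checkpoint, state, time, window).

  • 3. Flink Table API and SQL basics series
    Covers basic Table API and SQL usage, such as creating databases and tables, queries, window functions, and catalogs.

  • 4. Flink Table API and SQL advanced and applications series
    The application side of the Table API and SQL: content more closely tied to real production use, and topics with some development difficulty.

  • 5. Flink monitoring series
    Related to real-world operations and monitoring work.

II. Flink examples column

The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column has no sub-directories; each article's title indicates what it covers.

Entry point to all articles in both columns: Flink series article index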



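This article focuses on joining dimension-table data using Flink temporal tables, implemented in three ways: processing time, event time, and event time with a Kafka source.

For more in-depth and systematic coverage, see my Flink column.

Besides the Maven dependencies, the Kafka example also requires a running Kafka environment.

This topic is split into the following articles:
【Flink Extras】15. Flink dimension tables in practice: 6 approaches - static data initialized at startup
【Flink Extras】15. Flink dimension tables in practice: 6 approaches - dimension table from a third-party data source
【Flink Extras】15. Flink dimension tables in practice: 6 approaches - passing dimension data downstream via broadcast
【Flink Extras】15. Flink dimension tables in practice: 6 approaches - joining dimension data via a Temporal table
【Flink Extras】15. Flink dimension tables in practice: 6 approaches - complete edition (1)
【Flink Extras】15. Flink dimension tables in practice: 6 approaches - complete edition (2)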

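V. Joining dimension data via a Temporal table

1. Description

A temporal table is a view of a continuously changing table as of a specific point in time. A temporal table function is a table function that takes a time argument and returns the view of the temporal table as of that time. The dimension data stream can be mapped to a temporal table; joining the fact stream with it associates each fact row with the dimension data of one specific version.

With this approach the dimension data can be large (the versions are held in Flink's keyed state), the dimension table is updated in real time, no third-party storage is required, and different versions of the dimension data are available, which handles updates to dimension records. As of Flink 1.17, this approach is only available in the Flink Table API & SQL, not in the DataStream API.

Regarding the time argument, Flink has three notions of time: event time, processing time, and ingestion time. Event time and processing time are the commonly used ones, and this article shows an implementation for each. For event time, a Kafka source behaves differently from other sources, so the Kafka implementation is covered separately.

2. Example: joining the fact stream with the dimension table - ProcessingTime implementation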
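To make the "view as of a given time" idea concrete, here is a minimal, Flink-independent sketch in plain Java. It assumes each dimension key keeps its full version history ordered by version time; the class `TemporalLookupSketch` and its methods are hypothetical illustrations, not Flink API. Flink's own temporal join operators keep this history in state and also expire old versions, which the sketch ignores. A lookup returns the latest version whose time is not after the requested time (`TreeMap.floorEntry`); the sample city values are borrowed from the Kafka example later in this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch of what a temporal table function computes:
 * for each primary key, every version of the dimension row ordered by
 * version time, queried as "which version was valid at time t?".
 */
final class TemporalLookupSketch {
    // key -> (version time -> row value)
    private final Map<Integer, NavigableMap<Long, String>> versions = new HashMap<>();

    /** Records a new version of the dimension row for the given key. */
    void addVersion(int key, long versionTime, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(versionTime, value);
    }

    /**
     * Returns the version valid at time t (the latest version whose time is <= t),
     * or null if the key had no version yet at that time.
     */
    String lookup(int key, long t) {
        NavigableMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(t);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TemporalLookupSketch city = new TemporalLookupSketch();
        city.addVersion(1, 1L, "beijing");
        city.addVersion(1, 5L, "shanghai");
        city.addVersion(1, 10L, "wuhan");
        System.out.println(city.lookup(1, 0L));  // null: no version existed yet
        System.out.println(city.lookup(1, 4L));  // beijing
        System.out.println(city.lookup(1, 9L));  // shanghai
        System.out.println(city.lookup(1, 12L)); // wuhan
    }
}
```

A sorted map per key keeps each lookup logarithmic in the number of versions, which is the same "latest version not after t" rule the event-time examples below rely on.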


package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: temporal table join based on processing time
 */
public class TestJoinDimByProcessingTimeDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order stream (fact table); each socket line is "id,uId,total"
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user stream (dimension table); each socket line is "id,name,balance,age,email"
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]),
                            Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // convert the streams to Tables, appending a processing-time attribute to each
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_ps").proctime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"),
                $("user_ps").proctime());

        // define a TemporalTableFunction: versioned by user_ps, keyed by id
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_ps"), $("id"));

        // register the table function (registerFunction is the deprecated alternative)
        tenv.createTemporarySystemFunction("alan_userDim", userDim);

        // join query
        Table result = tenv
                .sqlQuery("select o.* , u.name from " + orderTable + " as o  , Lateral table (alan_userDim(o.order_ps)) u " +
                        "where o.uId = u.id");

        // print the result
        DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);

        resultDs.print();
        // user stream input (dimension table), sent to port 8888
        // 1001,alan,18,20,alan.chan.chn@163.***
        // 1002,alanchan,19,25,alan.chan.chn@163.***
        // 1003,alanchanchn,20,30,alan.chan.chn@163.***
        // 1004,alan_chan,27,20,alan.chan.chn@163.***
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.***

        // order stream input, sent to port 9999
        // 26,1002,311
        // 27,1004,334
        // 28,1005,475

        // console output
        // 15> +I[26, 1002, 311.0, 2023-12-20T05:21:12.977Z, alanchan]
        // 11> +I[27, 1004, 334.0, 2023-12-20T05:21:50.898Z, alan_chan]
        // 5> +I[28, 1005, 475.0, 2023-12-20T05:21:57.559Z, alan_chan_chn]

        env.execute();

    }
}
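In processing time the join above does not consult historical versions: each order is joined with whatever user row is current when the order arrives, and rows already emitted are not revised when the user row changes later. A minimal plain-Java sketch of that behavior follows, assuming a single parallel instance and in-order arrival. `ProcTimeJoinSketch`, the order ids 25 and 29, and the name `alanchan_v2` are hypothetical and not part of the example input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of processing-time temporal join semantics:
 * only the latest version of each user is kept, and an order is joined with
 * whatever version is present when the order arrives.
 */
final class ProcTimeJoinSketch {
    private final Map<Integer, String> latestUserName = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** A user (dimension) record arrives: it overwrites the previous version. */
    void onUser(int id, String name) {
        latestUserName.put(id, name);
    }

    /** An order (fact) record arrives: join it with the current version, if any. */
    void onOrder(int orderId, int userId) {
        String name = latestUserName.get(userId);
        if (name != null) { // inner join: orders without a matching user are dropped
            output.add(orderId + "," + userId + "," + name);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcTimeJoinSketch join = new ProcTimeJoinSketch();
        join.onOrder(25, 1002);           // no user 1002 yet, so nothing is emitted
        join.onUser(1002, "alanchan");
        join.onOrder(26, 1002);           // joined with "alanchan"
        join.onUser(1002, "alanchan_v2"); // later update does not change order 26
        join.onOrder(29, 1002);           // joined with "alanchan_v2"
        System.out.println(join.output()); // [26,1002,alanchan, 29,1002,alanchan_v2]
    }
}
```

This is why the console output of the processing-time example depends on when the user lines were typed relative to the order lines: send an order before its user and it simply produces no result.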

3. Example: joining the fact stream with the dimension table - EventTime implementation

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: temporal table join based on event time
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestjoinDimByEventTimeDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
        private Long eventTime;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
        private Long eventTime;
    }

    final static List<User> userList = Arrays.asList(
            new User(1001, "alan", 20d, 18, "alan.chan.chn@163.***", 1L),
            new User(1002, "alan", 30d, 19, "alan.chan.chn@163.***", 10L),
            new User(1003, "alan", 29d, 25, "alan.chan.chn@163.***", 1L),
            new User(1004, "alanchan", 22d, 28, "alan.chan.chn@163.***", 5L),
            new User(1005, "alanchan", 50d, 29, "alan.chan.chn@163.***", 1698742362424L));

    final static List<Order> orderList = Arrays.asList(
            new Order(11, 1002, 1084d, 1L),
            new Order(12, 1001, 84d, 10L),
            new Order(13, 1005, 369d, 2L));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order stream (fact table)
        // Socket variant, one "id,uId,total,eventTime" record per line:
        // DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]), Long.valueOf(lines[3]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));

        DataStream<Order> orderDs = env.fromCollection(orderList)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));

        // user stream (dimension table)
        // Socket variant, one "id,name,balance,age,email,eventTime" record per line
        // (eventTime is the sixth field, lines[5]):
        // DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4], Long.valueOf(lines[5]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));

        DataStream<User> userDs = env.fromCollection(userList)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));

        // convert to Tables; rowtime() exposes the timestamp assigned above as an event-time attribute
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_eventTime").rowtime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"), $("user_eventTime").rowtime());

        tenv.createTemporaryView("alan_orderTable", orderTable);
        tenv.createTemporaryView("alan_userTable", userTable);

        // define a TemporalTableFunction: versioned by user_eventTime, keyed by id
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_eventTime"), $("id"));
        // register the table function (registerFunction is the deprecated alternative)
        tenv.createTemporarySystemFunction("alan_userDim", userDim);

        // String sql = "select o.* from alan_orderTable as o ";
        // String sql = "select u.* from alan_userTable as u ";
        // String sql = "select o.*,u.name from alan_orderTable as o , alan_userTable as u where o.uId = u.id";
        String sql = "select o.*,u.name from alan_orderTable as o,Lateral table (alan_userDim(o.order_eventTime)) u where o.uId = u.id";
        // join query
        Table result = tenv.sqlQuery(sql);

        // print the result
        DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);

        resultDs.print();
        // user stream input (dimension table): userList
        // order stream input: orderList

        // console output
        // 3> +I[12, 1001, 84.0, 1970-01-01T00:00:00.010, alan]

        env.execute();
    }
}
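Why does only order 12 appear in the output? The sketch below replays `userList` and `orderList` in plain Java, picking for each order the user version with the latest `eventTime` not after the order's `eventTime`. It assumes the bounded `fromCollection` sources end with a final watermark, so every order is eventually evaluated; `EventTimeExampleReplay` is a hypothetical helper, not Flink code. Orders 11 and 13 find no version because users 1002 and 1005 only become valid at times 10 and 1698742362424, after those orders' timestamps of 1 and 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical replay of the event-time example's data without Flink. */
final class EventTimeExampleReplay {

    /** user id -> (eventTime -> name), copied from userList in the example. */
    static Map<Integer, NavigableMap<Long, String>> userVersions() {
        Map<Integer, NavigableMap<Long, String>> m = new HashMap<>();
        put(m, 1001, 1L, "alan");
        put(m, 1002, 10L, "alan");
        put(m, 1003, 1L, "alan");
        put(m, 1004, 5L, "alanchan");
        put(m, 1005, 1698742362424L, "alanchan");
        return m;
    }

    private static void put(Map<Integer, NavigableMap<Long, String>> m, int id, long t, String name) {
        m.computeIfAbsent(id, k -> new TreeMap<>()).put(t, name);
    }

    /** Joins each order {orderId, userId, eventTime} with the user version valid at eventTime. */
    static List<String> joinOrders(long[][] orders) {
        Map<Integer, NavigableMap<Long, String>> users = userVersions();
        List<String> result = new ArrayList<>();
        for (long[] o : orders) {
            // cast to int so the lookup key is an Integer, matching the map's key type
            NavigableMap<Long, String> history = users.get((int) o[1]);
            Map.Entry<Long, String> version = history == null ? null : history.floorEntry(o[2]);
            if (version != null) { // inner join: no valid version means no output row
                result.add(o[0] + "," + o[1] + "," + version.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // orderList from the example: {id, uId, eventTime}
        long[][] orders = { {11, 1002, 1}, {12, 1001, 10}, {13, 1005, 2} };
        System.out.println(joinOrders(orders)); // [12,1001,alan]
    }
}
```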

4. Example: joining the fact stream with the dimension table - EventTime implementation with a Kafka source

1) Bean definitions


package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: city dimension record (CityInfo.java)
 */
@Data
public class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}

package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: user fact record (UserInfo.java)
 */
@Data
public class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}

2) Deserialization schemas

package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: deserializes a UTF-8 JSON Kafka record into CityInfo (CityInfoSchema.java)
 */
public class CityInfoSchema implements DeserializationSchema<CityInfo> {

    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        // the Kafka stream is unbounded
        return false;
    }

    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {
        });
    }

}

package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: deserializes a UTF-8 JSON Kafka record into UserInfo (UserInfoSchema.java)
 */
public class UserInfoSchema implements DeserializationSchema<UserInfo> {

    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        // the Kafka stream is unbounded
        return false;
    }

    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {
        });
    }

}

3) Implementation

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: event-time temporal table join with Kafka sources
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;

public class TestJoinDimByKafkaEventTimeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka settings: broker addresses and consumer group id (the topic is passed to each consumer)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
        props.setProperty("group.id", "kafkatest");

        // consume user records from the "user" topic
        // (FlinkKafkaConsumer is deprecated in newer Flink versions in favor of KafkaSource)
        FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
        userConsumer.setStartFromEarliest();

        // assigning the strategy on the consumer generates watermarks per Kafka partition
        userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                        .<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((user, rTimeStamp) -> user.getTs()) // if omitted, the Kafka record timestamp is used as event time
        );

        // consume city dimension records from the "city" topic
        FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();

        cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                        .<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((city, rTimeStamp) -> city.getTs()) // if omitted, the Kafka record timestamp is used as event time
        );

        
        // convert to Tables; $("ts").rowtime() turns ts into an event-time attribute holding the timestamp assigned above
        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"), $("ts").rowtime());

        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // define a TemporalTableFunction: versioned by ts, keyed by cityId
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));
        // register the table function (registerFunction is the deprecated alternative)
        // tableEnv.registerFunction("dimCity", dimCity);
        tableEnv.createTemporarySystemFunction("dimCity", dimCity);

        Table u = tableEnv.sqlQuery("select * from userTable");
        // u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user stream received:");

        Table c = tableEnv.sqlQuery("select * from cityTable");
        // c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city stream received:");

        // join query
        Table result = tableEnv
                .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
                        "from userTable as u " +
                        ", Lateral table  (dimCity(u.ts)) d " +
                        "where u.cityId=d.cityId");

        // print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\tjoined:");
        // user records (topic "user"), one JSON object per line:
        // {"userName":"user1","cityId":1,"ts":0}
        // {"userName":"user1","cityId":1,"ts":1}
        // {"userName":"user1","cityId":1,"ts":4}
        // {"userName":"user1","cityId":1,"ts":5}
        // {"userName":"user1","cityId":1,"ts":7}
        // {"userName":"user1","cityId":1,"ts":9}
        // {"userName":"user1","cityId":1,"ts":11}
        // kafka-console-producer.sh --broker-list server1:9092 --topic user
        // city dimension records (topic "city"), one JSON object per line:
        // {"cityId":1,"cityName":"nanjing","ts":15}
        // {"cityId":1,"cityName":"beijing","ts":1}
        // {"cityId":1,"cityName":"shanghai","ts":5}
        // {"cityId":1,"cityName":"shanghai","ts":7}
        // {"cityId":1,"cityName":"wuhan","ts":10}
        // kafka-console-producer.sh --broker-list server1:9092 --topic city

        // output
        // city stream received::6> +I[1, beijing, 1970-01-01T00:00:00.001]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.004]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.005]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.007]
        // city stream received::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.009]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.011]
        //         joined::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
        //         joined::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
        //         joined::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
        //         joined::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
        //         joined::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]
        
        env.execute("joinDemo");
    }

}
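The recorded output stops at `ts=9` even though a user record with `ts=11` was sent. A plausible explanation, sketched below in plain Java: with `forBoundedOutOfOrderness`, a watermark is the largest timestamp seen minus the allowed out-of-orderness minus 1 ms, the join operator works with the minimum watermark of its two inputs, and a user row is only joined once that combined watermark has reached its `ts`. The sketch assumes each topic has a single partition, records of each topic arrive in `ts` order (so none is late), and a periodic watermark has been emitted after the last record; `KafkaWatermarkSketch` is hypothetical and ignores state cleanup and late-data handling. With the sample data the user watermark is 10, so the `ts=11` row stays buffered until a later user record advances it. Leaving out the `nanjing` record (which does not appear among the received city records in the recorded output) lowers the combined watermark from 10 to 9 but emits the same five rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of why the Kafka example has not yet joined the ts=11 user row. */
final class KafkaWatermarkSketch {

    /** Bounded-out-of-orderness watermark: max seen timestamp - out-of-orderness - 1. */
    static long watermark(long[] timestamps, long outOfOrdernessMillis) {
        if (timestamps.length == 0) {
            return Long.MIN_VALUE; // nothing seen yet, so no progress
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max - outOfOrdernessMillis - 1;
    }

    /**
     * Emits "ts,cityName" for every user row whose ts is <= the combined watermark
     * (the minimum of both inputs) and that has a city version valid at ts.
     */
    static List<String> emitted(long[] userTs, NavigableMap<Long, String> cityVersions, long outOfOrdernessMillis) {
        long[] cityTs = cityVersions.keySet().stream().mapToLong(Long::longValue).toArray();
        long combined = Math.min(watermark(userTs, outOfOrdernessMillis), watermark(cityTs, outOfOrdernessMillis));
        List<String> out = new ArrayList<>();
        for (long ts : userTs) {
            if (ts > combined) {
                continue; // still buffered, waiting for a later watermark
            }
            Map.Entry<Long, String> version = cityVersions.floorEntry(ts);
            if (version != null) { // ts=0 has no city version yet and is dropped
                out.add(ts + "," + version.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] userTs = {0, 1, 4, 5, 7, 9, 11};
        NavigableMap<Long, String> city = new TreeMap<>();
        city.put(1L, "beijing");
        city.put(5L, "shanghai");
        city.put(7L, "shanghai");
        city.put(10L, "wuhan");
        city.put(15L, "nanjing");
        System.out.println(emitted(userTs, city, 0));
        // [1,beijing, 4,beijing, 5,shanghai, 7,shanghai, 9,shanghai]
    }
}
```

The five emitted pairs line up with the five joined rows in the recorded output. If a topic has several partitions, the per-partition watermarks are also combined by taking the minimum, so an empty or idle partition can hold the join back in the same way.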

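In summary, this article focused on joining dimension-table data with Flink temporal tables, implemented in three ways: processing time, event time, and event time with a Kafka source.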
