网创优客建站品牌官网
为成都网站建设公司企业提供高品质网站建设
热线:028-86922220
成都专业网站建设公司

定制建站费用3500元

符合中小企业对网站设计、功能常规化式的企业展示型网站建设

成都品牌网站建设

品牌网站建设费用6000元

本套餐主要针对企业品牌型网站、中高端设计、前端互动体验...

成都商城网站建设

商城网站建设费用8000元

商城网站建设因基本功能的需求不同费用上面也有很大的差别...

成都微信网站建设

手机微信网站建站3000元

手机微信网站开发、微信官网、微信商城网站...

建站知识

当前位置:首页 > 建站知识

flinksql如何链接kafka

这篇文章主要介绍“flinksql如何链接kafka”,在日常操作中,相信很多人在flinksql如何链接kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”flinksql如何链接kafka”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

目前创新互联已为成百上千的企业提供了网站建设、域名、虚拟主机网站托管运营、企业网站设计、碌曲网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。



    4.0.0

    org.example
    flinksqldemo
    1.0-SNAPSHOT


    
        
        UTF-8
        UTF-8

        2.11
        2.11.8
        0.10.2.1
        1.12.0
        2.7.3

        
        compile
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    8
                    8
                
            
        
    



    
        
        
            org.apache.flink
            flink-table-planner-blink_2.11
            1.12.0

        

        
            org.apache.flink
            flink-java
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-streaming-java_2.11
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-clients_2.11
            ${flink.version}
            ${setting.scope}
        


        
            org.apache.flink
            flink-connector-kafka_2.11
            1.12.0
        
        
            org.apache.flink
            flink-csv
            1.12.0
        

        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
            ${setting.scope}
        

        
        

        
        
            org.apache.kafka
            kafka_${scala.binary.version}
            ${kafka.version}
            ${setting.scope}
        
        

        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
            ${setting.scope}
        
        

        
            org.slf4j
            slf4j-api
            1.7.25
        
        
            com.alibaba
            fastjson
            1.2.72
        
        
            redis.clients
            jedis
            2.7.3
        
        
            com.google.guava
            guava
            29.0-jre
        

    

代码:

package com.jd.data;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class TableApiConnectKafka04 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


//        1、创建表执行环节
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

          tableEnv.connect(new Kafka()
                .version("0.11") // 定义版本
                .topic("xxx") // 定义主题
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
        ).withFormat(new Csv()).withSchema(new Schema().field("a", DataTypes.STRING())  // 定义表的结构
                  .field("b", DataTypes.STRING())
                  .field("c", DataTypes.STRING())

          )
                  .inAppendMode()
                  .createTemporaryTable("xxx");

        Table xxx = tableEnv.from("xxx");

        xxx.printSchema();

        tableEnv.toAppendStream(xxx,  Row.class ).print();

        env.execute("job");
    }
}

到此,关于“flinksql如何链接kafka”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


文章名称:flinksql如何链接kafka
浏览地址:http://bjjierui.cn/article/pcpgeh.html

其他资讯