You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2.2 KiB

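原始 JSON（以下为根据下方 schema 推断的示例消息，实际内容以 Kafka 中的数据为准）：

{"data": {"c_string": "abc", "c_integer": 1, "c_date": "2024-01-01"}}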

示例配置（Kafka → Doris）：

# Job-level settings.
env {
  # Job name
  job.name = "zr-test-doris"
  # Job parallelism
  parallelism = "1"
  # Job mode: STREAMING
  job.mode = "STREAMING"
  # Checkpoint interval (milliseconds per the SeaTunnel env docs — confirm)
  checkpoint.interval = "10000"
}
# Read JSON messages from Kafka.
source {
  Kafka {
    # Name of the table this source registers; the Sql transform reads it via plugin_input
    plugin_output = "source_result"
    # Kafka bootstrap servers (comma-separated host:port list)
    bootstrap.servers = "172.100.40.152:9092,172.100.40.153:9092,172.100.40.154:9092"
    # Topic to consume
    topic = "ZR_TEST_3"
    # Extra Kafka consumer properties
    kafka.config = {
      # Kafka consumer client.id
      client.id = client_1
      # Kafka consumer property: where to start when the group has no committed offset
      auto.offset.reset = "latest"
    }
    # Kafka field mapping (schema of each JSON message).
    # `data` is declared as a nested object, so its sub-fields are addressed as
    # data.<field> in the transform SQL. c_date is read as a string, not a date type.
    schema = {
      fields {
        data {
          c_string = "string"
          c_integer = "int"
          c_date = "string"
        }
      }
    }
  }
}
# Flatten the nested `data` row into top-level columns.
transform {
    Sql {
      # Input table (registered by the Kafka source)
      plugin_input = "source_result"
      # Output table (read by the Doris sink)
      plugin_output = "transform_result"
      # Nested fields are addressed as data.<field> and aliased to flat column names
      query = "select data.c_string as c_string, data.c_integer as c_integer, data.c_date as c_date from source_result"
  }
}
# Write the flattened rows into Doris.
sink {
  Doris {
    # Input table (produced by the Sql transform)
    plugin_input = "transform_result"
    # Doris FE address as host:port. The original comment said "BE", but this option
    # is `fenodes`; per the SeaTunnel Doris connector docs it is the FE HTTP endpoint.
    fenodes = "172.100.40.152:8030"
    # Doris user name
    username = "root"
    # Doris password
    # NOTE(review): plaintext credential stored in the job config; consider SeaTunnel
    # variable substitution (${...}) or a secrets mechanism instead.
    password = "maxvision@123"
    # Target database name (a name, not an address)
    database = "ods"
    # Target table name
    table = "ods_json_path_test"
    # Stream Load label prefix. Per the connector docs it must be globally unique
    # when 2PC is enabled — confirm it is not reused by other jobs.
    sink.label-prefix = "244a7e723ca84de1b25debc626abe64c"
    # Enable two-phase commit (2PC)
    sink.enable-2pc = "true"
    # Enable delete handling. Per the connector docs this requires a Unique-key
    # Doris table with batch delete enabled — TODO confirm the table model.
    sink.enable-delete = "true"
    # Extra Stream Load properties
    doris.config {
      # Load payload format
      format = "json"
      # Treat each line of the payload as one JSON record
      read_json_by_line = "true"
    }
  }
}

关键点：Kafka source 的 schema 需要按照原始 JSON 的嵌套结构来定义，即把 `data` 声明为包含子字段的嵌套结构：

    # Kafka field mapping: `data` is a nested object; its sub-fields are
    # addressed as data.<field> in the transform SQL
    schema = {
      fields {
        data {
          c_string = "string"
          c_integer = "int"
          c_date = "string"
        }
      }
    }

transform 中的 SQL 引用嵌套字段时需写成 `data.xxx`（即 schema 中的外层字段名.子字段名），并用 `as` 起别名展开为顶层列：

-- Sub-fields of the nested `data` column are selected as data.<field> and aliased to flat column names.
select data.c_string as c_string, data.c_integer as c_integer, data.c_date as c_date from source_result