原始json [![](file:///Users/old-tom/Documents/youduqt/74236399-103695-邹瑞/image/temp/feb6c40e-d820-454c-ad3a-361609534f4f.png)](/Users/old-tom/Documents/youduqt/74236399-103695-邹瑞/image/temp/feb6c40e-d820-454c-ad3a-361609534f4f.png) 示例配置: ```text env { # 任务名称 job.name = "zr-test-doris" # 任务并行度 parallelism = "1" # 任务模式 job.mode = "STREAMING" # 检查点时间 checkpoint.interval = "10000" } source { Kafka { # 结果输出表 plugin_output = "source_result" # kafka 连接地址 bootstrap.servers = "172.100.40.152:9092,172.100.40.153:9092,172.100.40.154:9092" # topic topic = "ZR_TEST_3" # 消费者配置 kafka.config = { client.id = client_1 auto.offset.reset = "latest" } # kafka字段映射 schema = { fields { data { c_string = "string" c_integer = "int" c_date = "string" } } } } } transform { Sql { plugin_input = "source_result" plugin_output = "transform_result" query = "select data.c_string as c_string, data.c_integer as c_integer, data.c_date as c_date from source_result" } } sink { Doris { # 数据来源表 plugin_input = "transform_result" # be连接地址 fenodes = "172.100.40.152:8030" # 用户名 username = "root" # 密码 password = "maxvision@123" # 数据库地址 database = "ods" # 表名 table = "ods_json_path_test" # 输出任务前缀 sink.label-prefix = "244a7e723ca84de1b25debc626abe64c" # 是否开启两端提交 sink.enable-2pc = "true" # 是否开启删除策略 sink.enable-delete = "true" doris.config { # 数据格式化类型 format = "json" # 读取每行JSON read_json_by_line = "true" } } } ``` 关键点: 需要按照json解析出这个格式 ```text # kafka字段映射 schema = { fields { data { c_string = "string" c_integer = "int" c_date = "string" } } } ``` transform中的sql字段名为schame.xxx ```sql select data.c_string as c_string, data.c_integer as c_integer, data.c_date as c_date from source_result ```