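Here is the situation: I am developing a Flink application locally and want it to connect to Doris running on a cloud host and write data into Doris. At first, though, the error showed that the remote Doris had returned one of its runtime intranet addresses.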
So I modified the following class in the source code, BackendV2, and added a convertToHostname() method to it that maps the intranet addresses to public addresses:
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.rest.models;

import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
 * Be response model
 **/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {

    @JsonProperty(value = "backends")
    private List<BackendRowV2> backends;

    public void setBackends(List<BackendRowV2> backends) {
        this.backends = backends;
    }

    public static class BackendRowV2 {

        @JsonProperty("ip")
        public String ip;

        @JsonProperty(value="http_port")
        public int httpPort;

        @JsonProperty("is_alive")
        public boolean isAlive;

        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getHttpPort() {
            return httpPort;
        }

        public void setHttpPort(int httpPort) {
            this.httpPort = httpPort;
        }

        public boolean isAlive() {
            return isAlive;
        }

        public void setAlive(boolean alive) {
            isAlive = alive;
        }

        public String convertToHostname(String ip) {
            System.out.println("================"+ip);
            this.ip = ip;
            if (Objects.equals(ip,"192.168.0.107")) {
                return ip = Constant.HADOOP102;
            } else if (Objects.equals(ip,"192.168.0.108")) {
                return ip = Constant.HADOOP103;
            } else if (Objects.equals(ip,"192.168.0.109")) {
                return ip = Constant.HADOOP104;
            } else {
                return null;
            }
        }

        public String toBackendString(){
            return convertToHostname(ip) + ":" + httpPort;
            //return ip + ":" + httpPort;
        }
    }
}
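For comparison, here is a minimal standalone sketch of the same intranet-to-public mapping, written as a table lookup instead of an if/else chain. It is only an illustration, not part of the Doris connector: the class name BackendAddressMapper is hypothetical, and the hostnames "hadoop102" to "hadoop104" are placeholders standing in for whatever Constant.HADOOP102 to Constant.HADOOP104 actually hold. Unlike the modified class above, an unmapped address falls back to the original IP instead of returning null, which would otherwise produce a backend string such as "null:7040".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Doris Flink connector.
final class BackendAddressMapper {

    // Intranet IP -> publicly reachable host. The host values are placeholders
    // (assumption): substitute the real values of Constant.HADOOP102..HADOOP104.
    private static final Map<String, String> PRIVATE_TO_PUBLIC = new HashMap<>();

    static {
        PRIVATE_TO_PUBLIC.put("192.168.0.107", "hadoop102");
        PRIVATE_TO_PUBLIC.put("192.168.0.108", "hadoop103");
        PRIVATE_TO_PUBLIC.put("192.168.0.109", "hadoop104");
    }

    private BackendAddressMapper() {
    }

    // Returns the mapped public host, or the input unchanged when no mapping exists.
    static String toPublicHost(String ip) {
        return PRIVATE_TO_PUBLIC.getOrDefault(ip, ip);
    }

    // Builds the "host:port" string in the same shape as toBackendString().
    static String toBackendString(String ip, int httpPort) {
        return toPublicHost(ip) + ":" + httpPort;
    }

    public static void main(String[] args) {
        System.out.println(toBackendString("192.168.0.109", 7040)); // mapped address
        System.out.println(toBackendString("10.0.0.5", 7040));      // unmapped, kept as-is
    }
}
```

Keeping the mapping in one table means a new backend node only needs one more entry, and the lookup has no side effects on the ip field.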
The original source looked like this:
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.rest.models;

import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
 * Be response model
 **/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {

    @JsonProperty(value = "backends")
    private List<BackendRowV2> backends;

    public void setBackends(List<BackendRowV2> backends) {
        this.backends = backends;
    }

    public static class BackendRowV2 {

        @JsonProperty("ip")
        public String ip;

        @JsonProperty(value="http_port")
        public int httpPort;

        @JsonProperty("is_alive")
        public boolean isAlive;

        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getHttpPort() {
            return httpPort;
        }

        public void setHttpPort(int httpPort) {
            this.httpPort = httpPort;
        }

        public boolean isAlive() {
            return isAlive;
        }

        public void setAlive(boolean alive) {
            isAlive = alive;
        }

        public String toBackendString(){
            return ip + ":" + httpPort;
        }
    }
}
But after replacing the class, the following error is reported:
Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "http_port" (class org.apache.doris.flink.rest.models.BackendV2$BackendRowV2), not marked as ignorable (4 known properties: "httpPort", "isAlive", "alive", "ip"])
 at [Source: (String)"{"backends":[{"ip":"192.168.0.109","http_port":7040,"is_alive":true}]}"; line: 1, column: 52] (through reference chain: org.apache.doris.flink.rest.models.BackendV2["backends"]->java.util.ArrayList[0]->org.apache.doris.flink.rest.models.BackendV2$BackendRowV2["http_port"])
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1127)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2023)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1700)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1678)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:319)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:355)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:313)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
    at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
    at org.apache.doris.flink.rest.RestService.parseBackendV2(RestService.java:380)
Before the replacement, the error was:
2023-07-25 20:54:43 WARN (org.apache.doris.flink.sink.writer.DorisWriter:tryHttpConnection) - Failed to connect to backend:http://192.168.0.109:7040
java.net.ConnectException: Connection timed out: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1228)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
    at org.apache.doris.flink.sink.writer.DorisWriter.tryHttpConnection(DorisWriter.java:259)
    at org.apache.doris.flink.sink.writer.DorisWriter.getAvailableBackend(DorisWriter.java:245)
    at org.apache.doris.flink.sink.writer.DorisWriter.initializeLoad(DorisWriter.java:108)
    at org.apache.doris.flink.sink.DorisSink.createWriter(DorisSink.java:64)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter.createWriter(SinkV1Adapter.java:77)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter$PlainSinkAdapter.createWriter(SinkV1Adapter.java:306)
    at org.apache.flink.streaming.api.transformations.SinkV1Adapter$StatefulSinkAdapter.createWriter(SinkV1Adapter.java:315)
    at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:750)
Has anyone run into this problem of a local Flink application connecting to a remote Doris and writing data to it? Thanks, everyone!
1 F281M6Dh8DXpD1g2 2023-07-25 22:13:30 +08:00 via iPhone
Why not just forward the ports to your local machine?