Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,16 @@ public final class ManagerMessages {
public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR =
"Decrease reference count for snapshot {} error.";
public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions costs {}ms";
public static final String DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE =
"Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}";
public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT =
"Detected completion of pipe {}, static meta: {}, remove it.";
public static final String ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED =
"All DataNodes reported historical pipe {} completed. globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}";
public static final String DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
public static final String DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"Detect PipeRuntimeSinkCriticalException {} from agent, stop pipe {}.";
public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED =
"Enable separation of powers is not supported";
public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
Expand Down Expand Up @@ -113,7 +119,7 @@ public final class ManagerMessages {
public static final String FAILED_TO_CLOSE_CONSUMER_IN_CONSUMER_GROUP_RESULT_STATUS =
"Failed to close consumer {} in consumer group {}. Result status: {}.";
public static final String FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR =
"Failed to close extractor after failed to initialize extractor. ";
"Failed to close extractor after failed to initialize extractor. Ignore this exception.";
public static final String FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE =
"Failed to close sink after failed to initialize it. Ignore this exception.";
public static final String FAILED_TO_COLLECT_COMMITCREATETABLEPLAN =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,16 @@ public final class ManagerMessages {
public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR =
"Decrease reference count for snapshot {} error.";
public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions costs {}ms";
public static final String DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE =
"检测到来自 DataNode {} 的历史 pipe 完成上报,pipe {}。remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}";
public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT =
"Detected completion of pipe {}, static meta: {}, remove it.";
"检测到 pipe {} 已完成,static meta: {},将其移除。";
public static final String ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED =
"所有 DataNode 均已上报历史 pipe {} 完成。globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}";
public static final String DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
"检测到 agent 上报 PipeRuntimeCriticalException {},停止 pipe {}。";
public static final String DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"检测到 agent 上报 PipeRuntimeSinkCriticalException {},停止 pipe {}。";
public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED = "不支持启用权力分离";
public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
"[EndExecuteCQ] {}, time range is [{}, {}), current time is {}";
Expand Down Expand Up @@ -112,9 +118,9 @@ public final class ManagerMessages {
public static final String FAILED_TO_CLOSE_CONSUMER_IN_CONSUMER_GROUP_RESULT_STATUS =
"Failed to close consumer {} in consumer group {}. Result status: {}.";
public static final String FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR =
"Failed to close extractor after failed to initialize extractor. ";
"初始化 extractor 失败后关闭 extractor 失败,忽略此异常。";
public static final String FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE =
"Failed to close sink after failed to initialize it. Ignore this exception.";
"初始化 sink 失败后关闭 sink 失败,忽略此异常。";
public static final String FAILED_TO_COLLECT_COMMITCREATETABLEPLAN =
"Failed to collect CommitCreateTablePlan";
public static final String FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.conf.TrimProperties;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.pipe.config.PipeDescriptor;
import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
Expand Down Expand Up @@ -826,12 +828,18 @@ public boolean isSeedConfigNode() {
}
}

public void loadHotModifiedProps(TrimProperties properties) {
public void loadHotModifiedProps(TrimProperties properties) throws IOException {
ConfigurationFileUtils.updateAppliedProperties(properties, true);
Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
.ifPresent(conf::setClusterName);
Optional.ofNullable(properties.getProperty("enable_topology_probing"))
.ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
loadPipeHotModifiedProp(properties);
}

private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException {
PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties, true);
PipePeriodicalLogReducer.update();
}

public static ConfigNodeDescriptor getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
Expand Down Expand Up @@ -56,6 +58,7 @@ public class PipeConfigNodeRuntimeAgent implements IService {
@Override
public synchronized void start() {
PipeConfig.getInstance().printAllConfigs();
PipeLogger.setLogger(PipePeriodicalLogReducer::log);

// PipeTasks will not be started here and will be started by "HandleLeaderChange"
// procedure when the consensus layer notify leader ready
Expand Down Expand Up @@ -142,19 +145,21 @@ public void report(final EnrichedEvent event, final PipeRuntimeException pipeRun
if (event.getPipeTaskMeta() != null) {
report(event.getPipeTaskMeta(), pipeRuntimeException);
} else {
LOGGER.warn(
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA,
pipeRuntimeException);
PipeLogger.log(
LOGGER::warn,
pipeRuntimeException,
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA);
}
}

private void report(
final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
pipeRuntimeException,
ManagerMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE,
pipeTaskMeta,
pipeRuntimeException.getMessage(),
pipeRuntimeException);
pipeRuntimeException.getMessage());

pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.sink.PipeConfigRegionSinkMetrics;
Expand Down Expand Up @@ -105,10 +106,10 @@ private void initSource(final Map<String, String> sourceAttributes) throws Excep
try {
source.close();
} catch (Exception closeException) {
LOGGER.warn(
ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR
+ "Ignore this exception.",
closeException);
PipeLogger.log(
LOGGER::warn,
closeException,
ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR);
}
throw e;
}
Expand Down Expand Up @@ -154,9 +155,11 @@ private void initSink(final Map<String, String> sinkAttributes) throws Exception
try {
outputPipeSink.close();
} catch (final Exception closeException) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
closeException,
ManagerMessages.FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE,
closeException);
closeException.getMessage());
}
throw e;
}
Expand Down Expand Up @@ -208,19 +211,19 @@ public void close() {
try {
source.close();
} catch (final Exception e) {
LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR, e);
PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR);
}

try {
processor.close();
} catch (final Exception e) {
LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR, e);
PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR);
}

try {
outputPipeSink.close();
} catch (final Exception e) {
LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR, e);
PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR);
} finally {
// Should be after connector.close()
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
Expand Down Expand Up @@ -95,7 +96,8 @@ private synchronized void sync() {
isLastPipeSyncSuccessful = false;

if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF_2);
return;
}
Expand All @@ -109,7 +111,9 @@ private synchronized void sync() {
== PipeConfig.getInstance().getPipeMetaSyncerAutoRestartPipeCheckIntervalRound()) {
somePipesNeedRestarting = autoRestartWithLock();
if (somePipesNeedRestarting) {
LOGGER.info(ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
PipeLogger.log(
LOGGER::info,
ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
}
pipeAutoRestartRoundCounter.set(0);
}
Expand All @@ -130,19 +134,22 @@ private synchronized void sync() {
if (handleMetaChangeStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successfulSync = true;
} else {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
ManagerMessages.FAILED_TO_HANDLE_PIPE_META_CHANGE_RESULT_STATUS,
handleMetaChangeStatus);
}
}

if (successfulSync) {
LOGGER.info(
PipeLogger.log(
LOGGER::info,
ManagerMessages.AFTER_THIS_SUCCESSFUL_SYNC_IF_PIPETASKINFO_IS_EMPTY_DURING_THIS);
isLastPipeSyncSuccessful = true;
}
} else {
LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
PipeLogger.log(
LOGGER::warn, ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
}
}

Expand All @@ -158,7 +165,8 @@ private boolean autoRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
PipeLogger.log(
LOGGER::warn, ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
return false;
}
try {
Expand All @@ -176,15 +184,17 @@ private void checkAndRepairConsensusPipes() {
.getRegionMaintainHandler()
.checkAndRepairConsensusPipes();
} catch (Exception e) {
LOGGER.warn(ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES, e);
PipeLogger.log(LOGGER::warn, e, ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES);
}
}

private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
PipeLogger.log(
LOGGER::warn,
ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
return false;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
Expand Down Expand Up @@ -93,7 +94,8 @@ synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat pipeHeart
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
ManagerMessages.FAILED_TO_ACQUIRE_LOCK_WHEN_PARSEHEARTBEAT_FROM_NODE_ID,
nodeId);
return;
Expand Down Expand Up @@ -127,8 +129,10 @@ private int getExpectedHeartbeatNodeCount() {
configManager.getNodeManager().getRegisteredDataNodeCount()
+ (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1 : 0);
if (expectedNodeCount <= 0) {
LOGGER.warn(
ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1, expectedNodeCount);
PipeLogger.log(
LOGGER::warn,
ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1,
expectedNodeCount);
return 1;
}
return expectedNodeCount;
Expand All @@ -142,10 +146,6 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
final PipeStaticMeta staticMeta = pipeMetaFromCoordinator.getStaticMeta();
final PipeMeta pipeMetaFromAgent = pipeHeartbeat.getPipeMeta(staticMeta);
if (pipeMetaFromAgent == null) {
LOGGER.info(
ManagerMessages.PIPERUNTIMECOORDINATOR_MEETS_ERROR_IN_UPDATING_PIPEMETAKEEPER
+ "pipeMetaFromAgent is null, pipeMetaFromCoordinator: {}",
pipeMetaFromCoordinator);
continue;
}

Expand All @@ -157,8 +157,9 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {

temporaryMeta.markDataNodeCompleted(nodeId);
LOGGER.info(
"Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
PipeLogger.log(
LOGGER::info,
ManagerMessages.DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE,
nodeId,
staticMeta.getPipeName(),
pipeHeartbeat.getRemainingEventCount(staticMeta),
Expand All @@ -169,14 +170,16 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
LOGGER.info(
"All DataNodes reported historical pipe {} completed. globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
PipeLogger.log(
LOGGER::info,
ManagerMessages.ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED,
staticMeta.getPipeName(),
temporaryMeta.getGlobalRemainingEvents(),
temporaryMeta.getGlobalRemainingTime(),
staticMeta);
pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
LOGGER.info(
PipeLogger.log(
LOGGER::info,
ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT,
staticMeta.getPipeName(),
staticMeta);
Expand Down Expand Up @@ -267,7 +270,9 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
exception,
ManagerMessages.DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
exception,
pipeName);
Expand Down Expand Up @@ -296,11 +301,13 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
String.format(
"Detect PipeRuntimeConnectorCriticalException %s "
+ "from agent, stop pipe %s.",
exception, pipeName));
PipeLogger.log(
LOGGER::warn,
exception,
ManagerMessages
.DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
exception,
pipeName);
});
}
}
Expand Down
Loading
Loading