Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/information_schema/schema_cluster_snapshots_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaClusterSnapshotsScanner::_s_tbls_co
{"LABEL", TYPE_STRING, sizeof(StringRef), true},
{"MSG", TYPE_STRING, sizeof(StringRef), true},
{"COUNT", TYPE_INT, sizeof(int32_t), true},
{"VAULT_ID", TYPE_STRING, sizeof(StringRef), true},
};

SchemaClusterSnapshotsScanner::SchemaClusterSnapshotsScanner()
Expand Down Expand Up @@ -246,6 +247,19 @@ Status SchemaClusterSnapshotsScanner::_fill_block_impl(Block* block) {
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
}
// resource_id
{
for (int i = 0; i < row_num; ++i) {
auto& snapshot = _snapshots[i];
if (snapshot.has_resource_id()) {
strs[i] = StringRef(snapshot.resource_id().c_str(), snapshot.resource_id().size());
datas[i] = strs.data() + i;
} else {
datas[i] = nullptr;
}
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
}
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ TEST_F(SchemaClusterSnapshotsScannerTest, test_get_next_block_internal) {
snapshot.set_ttl_seconds(3600);
snapshot.set_snapshot_label("label");
snapshot.set_reason("reason");
snapshot.set_resource_id("vault_1");
snapshots.push_back(snapshot);
}

Expand All @@ -62,6 +63,10 @@ TEST_F(SchemaClusterSnapshotsScannerTest, test_get_next_block_internal) {
auto col = data_block->safe_get_by_position(0);
auto v = (*col.column)[1].get<TYPE_STRING>();
EXPECT_EQ(v, "232ds");

auto vault_col = data_block->safe_get_by_position(12);
auto vault_id = (*vault_col.column)[1].get<TYPE_STRING>();
EXPECT_EQ(vault_id, "vault_1");
}

} // namespace doris
24 changes: 21 additions & 3 deletions docker/runtime/doris-compose/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@
LOG = utils.get_logger()


def is_true(value):
return str(value).strip().lower() in ("1", "true", "yes", "y", "on")


def get_env_value(envs, name):
for env in envs or []:
pos = env.find('=')
if pos == -1:
continue
if env[:pos] == name:
return env[pos + 1:]
return None


def get_cluster_path(cluster_name):
return os.path.join(LOCAL_DORIS_PATH, cluster_name)

Expand Down Expand Up @@ -397,6 +411,7 @@ def docker_env(self):
"STOP_GRACE": 1 if enable_coverage else 0,
"IS_CLOUD": 1 if self.cluster.is_cloud else 0,
"SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0,
"ENABLE_STORAGE_VAULT": 1 if getattr(self.cluster, "enable_storage_vault", False) else 0,
"TDE_AK": self.get_tde_ak(),
"TDE_SK": self.get_tde_sk(),
}
Expand Down Expand Up @@ -909,7 +924,8 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
local_network_ip, fe_follower, be_disks, be_cluster, reg_be,
extra_hosts, env, coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk,
external_ms_cluster, instance_id, cluster_snapshot=""):
external_ms_cluster, instance_id, cluster_snapshot="",
enable_storage_vault=False):
self.name = name
self.subnet = subnet
self.image = image
Expand All @@ -935,6 +951,7 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config,
self.instance_id = f"instance_{name}" if self.external_ms_cluster else "default_instance_id"
# cluster_snapshot is not persisted to meta, only used during cluster creation
self.cluster_snapshot = cluster_snapshot
self.enable_storage_vault = is_true(enable_storage_vault)
self.is_rollback = False
self.groups = {
node_type: Group(node_type)
Expand All @@ -955,7 +972,8 @@ def new(name, image, is_cloud, is_root_user, fe_config, be_config,
fe_follower, be_disks, be_cluster, reg_be, extra_hosts, env,
coverage_dir, cloud_store_config, sql_mode_node_mgr,
be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk,
external_ms_cluster, instance_id, cluster_snapshot=""):
external_ms_cluster, instance_id, cluster_snapshot="",
enable_storage_vault=False):
if not os.path.exists(LOCAL_DORIS_PATH):
os.makedirs(LOCAL_DORIS_PATH, exist_ok=True)
os.chmod(LOCAL_DORIS_PATH, 0o777)
Expand All @@ -971,7 +989,7 @@ def new(name, image, is_cloud, is_root_user, fe_config, be_config,
coverage_dir, cloud_store_config,
sql_mode_node_mgr, be_metaservice_endpoint,
be_cluster_id, tde_ak, tde_sk, external_ms_cluster,
instance_id, cluster_snapshot)
instance_id, cluster_snapshot, enable_storage_vault)
os.makedirs(cluster.get_path(), exist_ok=True)
os.makedirs(get_status_path(name), exist_ok=True)
cluster._save_meta()
Expand Down
46 changes: 32 additions & 14 deletions docker/runtime/doris-compose/resource/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,38 @@ create_doris_instance() {

lock_cluster

output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \
-d '{"instance_id":"'"${INSTANCE_ID}"'",
"name": "'"${INSTANCE_ID}"'",
"user_id": "'"${DORIS_CLOUD_USER}"'",
"obj_info": {
"ak": "'"${DORIS_CLOUD_AK}"'",
"sk": "'"${DORIS_CLOUD_SK}"'",
"bucket": "'"${DORIS_CLOUD_BUCKET}"'",
"endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'",
"external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'",
"prefix": "'"${DORIS_CLOUD_PREFIX}"'",
"region": "'"${DORIS_CLOUD_REGION}"'",
"provider": "'"${DORIS_CLOUD_PROVIDER}"'"
}}')
if [[ "${ENABLE_STORAGE_VAULT}" =~ ^([Tt][Rr][Uu][Ee]|[Yy][Ee][Ss]|[Yy]|[Oo][Nn]|1)$ ]]; then
Comment thread
wyxxxcat marked this conversation as resolved.
output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \
-d '{"instance_id":"'"${INSTANCE_ID}"'",
"name": "'"${INSTANCE_ID}"'",
"user_id": "'"${DORIS_CLOUD_USER}"'",
"vault": {
"obj_info": {
"ak": "'"${DORIS_CLOUD_AK}"'",
"sk": "'"${DORIS_CLOUD_SK}"'",
"bucket": "'"${DORIS_CLOUD_BUCKET}"'",
"endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'",
"external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'",
"prefix": "'"${DORIS_CLOUD_PREFIX}"'",
"region": "'"${DORIS_CLOUD_REGION}"'",
"provider": "'"${DORIS_CLOUD_PROVIDER}"'"
}}}')
else
output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \
-d '{"instance_id":"'"${INSTANCE_ID}"'",
"name": "'"${INSTANCE_ID}"'",
"user_id": "'"${DORIS_CLOUD_USER}"'",
"obj_info": {
"ak": "'"${DORIS_CLOUD_AK}"'",
"sk": "'"${DORIS_CLOUD_SK}"'",
"bucket": "'"${DORIS_CLOUD_BUCKET}"'",
"endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'",
"external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'",
"prefix": "'"${DORIS_CLOUD_PREFIX}"'",
"region": "'"${DORIS_CLOUD_REGION}"'",
"provider": "'"${DORIS_CLOUD_PROVIDER}"'"
}}')
fi

unlock_cluster

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ public class SchemaTable extends Table {
.column("LABEL", ScalarType.createStringType())
.column("MSG", ScalarType.createStringType())
.column("COUNT", ScalarType.createType(PrimitiveType.INT))
.column("VAULT_ID", ScalarType.createStringType())
Comment thread
wyxxxcat marked this conversation as resolved.
.build()))
.put("cluster_snapshot_properties",
new SchemaTable(SystemIdGenerator.getNextId(), "cluster_snapshot_properties", TableType.SCHEMA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected void runAfterCatalogReady() {
// do nothing
}

public void submitJob(long ttl, String label) throws Exception {
public void submitJob(long ttl, String label, String vaultName) throws Exception {
Comment thread
wyxxxcat marked this conversation as resolved.
Comment thread
wyxxxcat marked this conversation as resolved.
throw new NotImplementedException("submitJob is not implemented");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ public class AdminCreateClusterSnapshotCommand extends Command implements Forwar

public static final String PROP_TTL = "ttl";
public static final String PROP_LABEL = "label";
public static final String PROP_VAULT_NAME = "vault_name";
private static final Logger LOG = LogManager.getLogger(AdminCreateClusterSnapshotCommand.class);

private Map<String, String> properties;
private long ttl;
private String label = null;
private String vaultName = null;

/**
* AdminCreateClusterSnapshotCommand
Expand All @@ -64,7 +66,7 @@ public AdminCreateClusterSnapshotCommand(Map<String, String> properties) {
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate(ctx);
CloudSnapshotHandler cloudSnapshotHandler = ((CloudEnv) ctx.getEnv()).getCloudSnapshotHandler();
cloudSnapshotHandler.submitJob(ttl, label);
cloudSnapshotHandler.submitJob(ttl, label, vaultName);
}

/**
Expand Down Expand Up @@ -106,6 +108,11 @@ public void validate(ConnectContext ctx) throws AnalysisException {
if (label == null || label.isEmpty()) {
throw new AnalysisException("Property 'label' cannot be empty");
}
} else if (entry.getKey().equalsIgnoreCase(PROP_VAULT_NAME)) {
vaultName = entry.getValue();
if (vaultName == null || vaultName.isEmpty()) {
throw new AnalysisException("Property 'vault_name' cannot be empty");
}
} else {
throw new AnalysisException("Unknown property: " + entry.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,13 @@ public void testValidateNormal() throws Exception {
properties.add(Pair.of(ImmutableMap.of("ttl", "a", "label", "a"), "Invalid value"));
properties.add(Pair.of(ImmutableMap.of("ttl", "0", "label", "a"), "Property 'ttl' must be positive"));
properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", ""), "Property 'label' cannot be empty"));
properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "a", "vault_name", ""),
"Property 'vault_name' cannot be empty"));
// unknown property
properties.add(Pair.of(ImmutableMap.of("ttl", "0", "a", "b"), "Unknown property"));
// normal case
properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "abc"), ""));
properties.add(Pair.of(ImmutableMap.of("ttl", "3600", "label", "abc", "vault_name", "vault_1"), ""));

for (Pair<Map<String, String>, String> entry : properties) {
AdminCreateClusterSnapshotCommand command0 = new AdminCreateClusterSnapshotCommand(entry.first);
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,7 @@ message BeginSnapshotRequest {
optional int64 timeout_seconds = 4;
optional int64 ttl_seconds = 5;
optional string request_ip = 6;
optional string vault_name = 7;
}

message BeginSnapshotResponse {
Expand Down Expand Up @@ -2163,6 +2164,7 @@ message SnapshotInfoPB {
optional int64 snapshot_logical_data_size = 17;
optional int64 snapshot_retained_data_size = 18;
optional int64 snapshot_billable_data_size = 19;
optional string resource_id = 20;
}

message ListSnapshotRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class ClusterOptions {
// Example: clusterSnapshot = '{"cloud_unique_id":"1:instance_id:xxx"}'
String clusterSnapshot = null;

// Create cloud instance in storage-vault mode instead of legacy obj_info mode.
// Docker framework will also create a default storage vault automatically for new clusters.
Boolean enableStorageVault = false;

void enableDebugPoints() {
feConfigs.add('enable_debug_points=true')
beConfigs.add('enable_debug_points=true')
Expand Down Expand Up @@ -372,9 +376,13 @@ class SuiteCluster {
cmd += ['--extra-hosts']
cmd += options.extraHosts
}
if (!options.environments.isEmpty()) {
def envs = new ArrayList<String>(options.environments)
if (options.enableStorageVault) {
Comment thread
wyxxxcat marked this conversation as resolved.
envs.add('ENABLE_STORAGE_VAULT=1')
}
if (!envs.isEmpty()) {
cmd += ['--env']
cmd += options.environments
cmd += envs
}
if (config.dockerCoverageOutputDir != null && config.dockerCoverageOutputDir != '') {
cmd += ['--coverage-dir', config.dockerCoverageOutputDir]
Expand Down
Loading