diff --git a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java index 824986d22c..1c30e434ab 100644 --- a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java +++ b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -209,6 +210,10 @@ protected BackendColumnIterator queryByIds(RocksDBSessions.Session session, return this.queryById(session, ids.iterator().next()); } + if (!session.hasChanges()) { + return this.getByIds(session, ids); + } + // NOTE: this will lead to lazy create rocksdb iterator return BackendColumnIterator.wrap(new FlatMapperIterator<>( ids.iterator(), id -> this.queryById(session, id) @@ -224,13 +229,15 @@ protected BackendColumnIterator getById(RocksDBSessions.Session session, Id id) return BackendColumnIterator.iterator(col); } - protected BackendColumnIterator getByIds(RocksDBSessions.Session session, Set ids) { + protected BackendColumnIterator getByIds(RocksDBSessions.Session session, + Collection ids) { if (ids.size() == 1) { return this.getById(session, ids.iterator().next()); } - List keys = new ArrayList<>(ids.size()); - for (Id id : ids) { + Collection uniqueIds = ids instanceof Set ? ids : new LinkedHashSet<>(ids); + List keys = new ArrayList<>(uniqueIds.size()); + for (Id id : uniqueIds) { keys.add(id.asBytes()); } return session.get(this.table(), keys); @@ -309,7 +316,7 @@ protected static BackendEntryIterator newEntryIterator(BackendColumnIterator col } protected static BackendEntryIterator newEntryIteratorOlap( - BackendColumnIterator cols, Query query, boolean isOlap) { + BackendColumnIterator cols, Query query, boolean isOlap) { return new BinaryEntryIterator<>(cols, query, (entry, col) -> { if (entry == null || !entry.belongToMe(col)) { HugeType type = query.resultType(); diff --git a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java index 37cc2f151c..01e908c4af 100644 --- a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java +++ b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTables.java @@ -19,7 +19,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.Collection; import java.util.List; import org.apache.hugegraph.backend.id.Id; @@ -178,13 +177,6 @@ public Vertex(String database) { protected BackendColumnIterator queryById(RocksDBSessions.Session session, Id id) { return this.getById(session, id); } - - @Override - protected BackendColumnIterator queryByIds(RocksDBSessions.Session session, - Collection ids) { - // TODO: use getByIds() after batch version multi-get is ready - return super.queryByIds(session, ids); - } } public static class Edge extends RocksDBTable { diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java index f9f20ab9e5..9ecca5b783 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java @@ -53,6 +53,7 @@ import org.apache.hugegraph.unit.rocksdb.RocksDBCountersTest; import org.apache.hugegraph.unit.rocksdb.RocksDBSessionTest; import org.apache.hugegraph.unit.rocksdb.RocksDBSessionsTest; +import org.apache.hugegraph.unit.rocksdb.RocksDBTableQueryByIdsTest; import org.apache.hugegraph.unit.serializer.BinaryBackendEntryTest; import org.apache.hugegraph.unit.serializer.BinaryScatterSerializerTest; import org.apache.hugegraph.unit.serializer.BinarySerializerTest; @@ -141,6 +142,7 @@ RocksDBSessionsTest.class, RocksDBSessionTest.class, RocksDBCountersTest.class, + RocksDBTableQueryByIdsTest.class, /* utils */ VersionTest.class, diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/rocksdb/RocksDBTableQueryByIdsTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/rocksdb/RocksDBTableQueryByIdsTest.java new file mode 100644 index 0000000000..020020e35c --- /dev/null +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/rocksdb/RocksDBTableQueryByIdsTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.unit.rocksdb; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn; +import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator; +import org.apache.hugegraph.backend.store.rocksdb.RocksDBSessions; +import org.apache.hugegraph.backend.store.rocksdb.RocksDBTables; +import org.apache.hugegraph.testutil.Assert; +import org.junit.Before; +import org.junit.Test; +import org.rocksdb.RocksDBException; + +public class RocksDBTableQueryByIdsTest extends BaseRocksDBUnitTest { + + private static final String DATABASE = "db"; + + private TestVertexTable vertexTable; + private TestEdgeTable edgeOutTable; + private TestEdgeTable edgeInTable; + + @Override + @Before + public void setup() throws RocksDBException { + super.setup(); + this.vertexTable = new TestVertexTable(DATABASE); + this.edgeOutTable = new TestEdgeTable(true, DATABASE); + this.edgeInTable = new TestEdgeTable(false, DATABASE); + this.rocks.createTable(this.vertexTable.table()); + this.rocks.createTable(this.edgeOutTable.table()); + this.rocks.createTable(this.edgeInTable.table()); + } + + @Test + public void testVertexQueryByIdsWithAllExistingIds() { + Id id1 = IdGenerator.of("v1"); + Id id2 = IdGenerator.of("v2"); + Id id3 = IdGenerator.of("v3"); + + this.rocks.session().put(this.vertexTable.table(), id1.asBytes(), getBytes("value1")); + this.rocks.session().put(this.vertexTable.table(), id2.asBytes(), getBytes("value2")); + this.rocks.session().put(this.vertexTable.table(), id3.asBytes(), getBytes("value3")); + this.commit(); + + List ids = Arrays.asList(id1, id2, id3); + BackendColumnIterator iter = this.vertexTable.queryByIds(this.rocks.session(), ids); + + Map results = toResultMap(iter); + + Assert.assertEquals(3, results.size()); + Assert.assertEquals("value1", results.get("v1")); + Assert.assertEquals("value2", results.get("v2")); + Assert.assertEquals("value3", results.get("v3")); + } + + @Test + public void testVertexQueryByIdsWithExistingAndMissingIdsMixed() { + Id id1 = IdGenerator.of("v1"); + Id id2 = IdGenerator.of("v2"); + Id id3 = IdGenerator.of("v3"); + + this.rocks.session().put(this.vertexTable.table(), id1.asBytes(), getBytes("value1")); + this.rocks.session().put(this.vertexTable.table(), id3.asBytes(), getBytes("value3")); + this.commit(); + + List ids = Arrays.asList(id1, id2, id3); + BackendColumnIterator iter = this.vertexTable.queryByIds(this.rocks.session(), ids); + + Map results = toResultMap(iter); + + Assert.assertEquals(2, results.size()); + Assert.assertEquals("value1", results.get("v1")); + Assert.assertEquals("value3", results.get("v3")); + Assert.assertFalse(results.containsKey("v2")); + } + + @Test + public void testVertexQueryByIdsDedupsDuplicateIds() { + Id id1 = IdGenerator.of("v1"); + Id id2 = IdGenerator.of("v2"); + + this.rocks.session().put(this.vertexTable.table(), id1.asBytes(), getBytes("value1")); + this.rocks.session().put(this.vertexTable.table(), id2.asBytes(), getBytes("value2")); + this.commit(); + + List ids = Arrays.asList(id1, id2, id1); + BackendColumnIterator iter = this.vertexTable.queryByIds(this.rocks.session(), ids); + + Map results = toResultMap(iter); + + Assert.assertEquals(2, results.size()); + Assert.assertEquals("value1", results.get("v1")); + Assert.assertEquals("value2", results.get("v2")); + } + + @Test + public void testEdgeOutQueryByIdsWithAllExistingIds() { + Id id1 = IdGenerator.of("e1"); + Id id2 = IdGenerator.of("e2"); + + this.rocks.session().put(this.edgeOutTable.table(), id1.asBytes(), getBytes("edge-value1")); + this.rocks.session().put(this.edgeOutTable.table(), id2.asBytes(), getBytes("edge-value2")); + this.commit(); + + List ids = Arrays.asList(id1, id2); + BackendColumnIterator iter = this.edgeOutTable.queryByIds(this.rocks.session(), ids); + + Map results = toResultMap(iter); + + Assert.assertEquals(2, results.size()); + Assert.assertEquals("edge-value1", results.get("e1")); + Assert.assertEquals("edge-value2", results.get("e2")); + } + + @Test + public void testEdgeInQueryByIdsWithAllExistingIds() { + Id id1 = IdGenerator.of("e1"); + Id id2 = IdGenerator.of("e2"); + + this.rocks.session().put(this.edgeInTable.table(), id1.asBytes(), getBytes("edge-value1")); + this.rocks.session().put(this.edgeInTable.table(), id2.asBytes(), getBytes("edge-value2")); + this.commit(); + + List ids = Arrays.asList(id1, id2); + BackendColumnIterator iter = this.edgeInTable.queryByIds(this.rocks.session(), ids); + + Map results = toResultMap(iter); + + Assert.assertEquals(2, results.size()); + Assert.assertEquals("edge-value1", results.get("e1")); + Assert.assertEquals("edge-value2", results.get("e2")); + } + + @Test + public void testVertexQueryByIdsFallbackWhenHasChanges() { + Id id1 = IdGenerator.of("v1"); + Id id2 = IdGenerator.of("v2"); + + this.rocks.session().put(this.vertexTable.table(), id1.asBytes(), getBytes("value1")); + this.rocks.session().put(this.vertexTable.table(), id2.asBytes(), getBytes("value2")); + this.commit(); + + List ids = Arrays.asList(id1, id2); + RocksDBSessions.Session mockSession = new DelegatingSession(this.rocks.session()) { + @Override + public boolean hasChanges() { + return true; + } + + @Override + public BackendColumnIterator get(String table, List keys) { + throw new AssertionError( + "multi-get should not be called when hasChanges"); + } + }; + + BackendColumnIterator iter = this.vertexTable.queryByIds(mockSession, ids); + + Map results = toResultMap(iter); + Assert.assertEquals(2, results.size()); + Assert.assertEquals("value1", results.get("v1")); + Assert.assertEquals("value2", results.get("v2")); + } + + private Map toResultMap(BackendColumnIterator iter) { + Map results = new HashMap<>(); + while (iter.hasNext()) { + BackendColumn col = iter.next(); + results.put(getString(col.name), getString(col.value)); + } + return results; + } + + /** + * A session wrapper that delegates all operations to an underlying session. + * Subclasses can override specific methods for mocking purposes. + */ + private static class DelegatingSession extends RocksDBSessions.Session { + + private final RocksDBSessions.Session delegate; + + DelegatingSession(RocksDBSessions.Session delegate) { + this.delegate = delegate; + } + + @Override + public String dataPath() { + return this.delegate.dataPath(); + } + + @Override + public String walPath() { + return this.delegate.walPath(); + } + + @Override + public String property(String table, String property) { + return this.delegate.property(table, property); + } + + @Override + public Pair keyRange(String table) { + return this.delegate.keyRange(table); + } + + @Override + public void compactRange(String table) { + this.delegate.compactRange(table); + } + + @Override + public void put(String table, byte[] key, byte[] value) { + this.delegate.put(table, key, value); + } + + @Override + public void merge(String table, byte[] key, byte[] value) { + this.delegate.merge(table, key, value); + } + + @Override + public void increase(String table, byte[] key, byte[] value) { + this.delegate.increase(table, key, value); + } + + @Override + public void delete(String table, byte[] key) { + this.delegate.delete(table, key); + } + + @Override + public void deleteSingle(String table, byte[] key) { + this.delegate.deleteSingle(table, key); + } + + @Override + public void deletePrefix(String table, byte[] key) { + this.delegate.deletePrefix(table, key); + } + + @Override + public void deleteRange(String table, byte[] keyFrom, byte[] keyTo) { + this.delegate.deleteRange(table, keyFrom, keyTo); + } + + @Override + public byte[] get(String table, byte[] key) { + return this.delegate.get(table, key); + } + + @Override + public BackendColumnIterator get(String table, List keys) { + return this.delegate.get(table, keys); + } + + @Override + public BackendColumnIterator scan(String table) { + return this.delegate.scan(table); + } + + @Override + public BackendColumnIterator scan(String table, byte[] prefix) { + return this.delegate.scan(table, prefix); + } + + @Override + public BackendColumnIterator scan(String table, byte[] keyFrom, + byte[] keyTo, int scanType) { + return this.delegate.scan(table, keyFrom, keyTo, scanType); + } + + @Override + public Object commit() { + return this.delegate.commit(); + } + + @Override + public void rollback() { + this.delegate.rollback(); + } + + @Override + public boolean hasChanges() { + return this.delegate.hasChanges(); + } + + @Override + public void open() { + this.delegate.open(); + } + + @Override + public void close() { + this.delegate.close(); + } + } + + /** + * Subclass that exposes the protected queryByIds for testing. + */ + private static class TestVertexTable extends RocksDBTables.Vertex { + + public TestVertexTable(String database) { + super(database); + } + + @Override + public BackendColumnIterator queryByIds(RocksDBSessions.Session session, + Collection ids) { + return super.queryByIds(session, ids); + } + } + + /** + * Subclass that exposes the protected queryByIds for testing. + */ + private static class TestEdgeTable extends RocksDBTables.Edge { + + public TestEdgeTable(boolean out, String database) { + super(out, database); + } + + @Override + public BackendColumnIterator queryByIds(RocksDBSessions.Session session, + Collection ids) { + return super.queryByIds(session, ids); + } + } +}