Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.calc.service.AbstractTemporaryQueryDataFileService;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.IOUtils;

import org.apache.tsfile.utils.PublicBAOS;

Expand Down Expand Up @@ -60,7 +61,7 @@ default void deserialize() throws IOException {
}
init();
ByteBuffer byteBuffer = ByteBuffer.allocate(recorder.getSerializedByteLength());
recorder.getFileChannel().read(byteBuffer);
IOUtils.readFully(recorder.getFileChannel(), byteBuffer);
byteBuffer.flip();
deserialize(byteBuffer);
recorder.closeFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.calc.utils.datastructure.MergeSortKey;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.common.conf.TSFileDescriptor;
Expand Down Expand Up @@ -93,10 +94,11 @@ private long read() throws IoTDBException {
if (readLen == -1) {
return -1;
}
IOUtils.readFully(fileChannel, bytes);
bytes.flip();
int capacity = bytes.getInt();
ByteBuffer tsBlockBytes = ByteBuffer.allocate(capacity);
fileChannel.read(tsBlockBytes);
IOUtils.readFully(fileChannel, tsBlockBytes);
tsBlockBytes.flip();
TsBlock cachedTsBlock = serde.deserialize(tsBlockBytes);
cacheBlocks.add(cachedTsBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -239,7 +240,7 @@ private ServiceInfo deserializeServiceInfoConsiderCRC(InputStream inputStream)
throws IOException {
int length = ReadWriteIOUtils.readInt(inputStream);
byte[] bytes = new byte[length];
inputStream.read(bytes);
new DataInputStream(inputStream).readFully(bytes);

crc32.reset();
crc32.update(bytes, 0, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
Expand Down Expand Up @@ -165,8 +166,16 @@ private static Optional<Procedure> loadProcedure(Path procedureFilePath) {
try (FileInputStream fis = new FileInputStream(procedureFilePath.toFile())) {
Procedure procedure = null;
try (FileChannel channel = fis.getChannel()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
if (channel.read(byteBuffer) > 0) {
final long fileSize = channel.size();
if (fileSize > PROCEDURE_LOAD_BUFFER_SIZE) {
throw new IOException(
String.format(
"Procedure file %s exceeds the load buffer limit %s, actual size %s",
procedureFilePath, PROCEDURE_LOAD_BUFFER_SIZE, fileSize));
}
ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileSize);
if (fileSize > 0) {
IOUtils.readFully(channel, byteBuffer);
byteBuffer.flip();
procedure = ProcedureFactory.getInstance().create(byteBuffer);
byteBuffer.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public boolean hasNext() {
}
buffer = new byte[logSize];

int readLen = logStream.read(buffer, 0, logSize);
if (readLen < logSize) {
throw new IOException(ConfigNodeMessages.REACH_EOF);
}
logStream.readFully(buffer, 0, logSize);

final long checkSum = logStream.readLong();
checkSummer.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.consensus.deletion.recover;

import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
Expand Down Expand Up @@ -60,7 +61,7 @@ public List<DeletionResource> readAllDeletions() throws IOException {
try {
// Read magic string
ByteBuffer magicStringBuffer = ByteBuffer.allocate(MAGIC_STRING_BYTES_SIZE);
fileChannel.read(magicStringBuffer);
IOUtils.readFully(fileChannel, magicStringBuffer);
magicStringBuffer.flip();
String magicVersion = new String(magicStringBuffer.array(), StandardCharsets.UTF_8);
if (LOGGER.isDebugEnabled()) {
Expand All @@ -70,7 +71,7 @@ public List<DeletionResource> readAllDeletions() throws IOException {
// Read deletions
long remainingBytes = fileChannel.size() - fileChannel.position();
ByteBuffer byteBuffer = ByteBuffer.allocate((int) remainingBytes);
fileChannel.read(byteBuffer);
IOUtils.readFully(fileChannel, byteBuffer);
byteBuffer.flip();

List<DeletionResource> deletions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public static ObjectNode deserializeFromWAL(ByteBuffer buffer) {
if (objectFile.isPresent()) {
try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(), "r")) {
raf.seek(offset);
raf.read(contents);
raf.readFully(contents);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -308,7 +308,7 @@ public ByteBuffer serialize() {
private void readContentFromFile(File file, byte[] contents) throws IOException {
try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
raf.seek(offset);
raf.read(contents);
raf.readFully(contents);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -69,9 +68,7 @@ public T deserialize(InputStream inputStream) throws IOException {
}

byte[] logBuffer = new byte[logLength];
if (logLength < inputStream.read(logBuffer, 0, logLength)) {
throw new EOFException();
}
dataInputStream.readFully(logBuffer, 0, logLength);

T result = deserializer.deserialize(ByteBuffer.wrap(logBuffer));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
Expand Down Expand Up @@ -360,7 +361,7 @@ private void initFileHeader() throws IOException, MetadataException {
lastSGAddr = 0L;
pageManager = new BTreePageManager(channel, pmtFile, -1, logPath);
} else {
channel.read(headerContent);
IOUtils.readFully(channel, headerContent);
headerContent.clear();
lastPageIndex = ReadWriteIOUtils.readInt(headerContent);
dataTTL = ReadWriteIOUtils.readLong(headerContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -91,8 +93,9 @@ public List<byte[]> collectUpdatedEntries() throws IOException, SchemaFileLogCor
}
}

// corrupted within one entry
if (inputStream.read(tempBytes, 1, tempBytes.length - 1) < tempBytes.length - 2) {
try {
new DataInputStream(inputStream).readFully(tempBytes, 1, tempBytes.length - 1);
} catch (EOFException e) {
throw new SchemaFileLogCorruptedException(
logFile.getAbsolutePath(), DataNodeSchemaMessages.SCHEMA_FILE_LOG_INCOMPLETE_ENTRY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.pagemgr;

import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.ISchemaPage;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.SchemaFileConfig;
import org.apache.iotdb.db.schemaengine.schemaregion.mtree.impl.pbtree.schemafile.log.SchemaFileLogReader;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void loadFromFileToBuffer(ByteBuffer dst, int pageIndex) throws IOExcepti
if (!readChannel.isOpen()) {
readChannel = FileChannel.open(pmtFile.toPath(), StandardOpenOption.READ);
}
readChannel.read(dst, getPageAddress(pageIndex));
IOUtils.readFully(readChannel, dst, getPageAddress(pageIndex));
}

// region Flush Strategy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.IOUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodeSchemaMessages;

Expand Down Expand Up @@ -116,7 +117,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position)
throws IOException {
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
fileChannel.read(byteBuffer, position);
IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
Expand All @@ -131,7 +132,7 @@ public static ByteBuffer parseByteBuffer(FileChannel fileChannel, long position)
// read one offset, then use filechannel's read to read it
byteBuffers.position(MAX_LENGTH * i);
byteBuffers.limit(MAX_LENGTH * (i + 1));
fileChannel.read(byteBuffers, nextPosition);
IOUtils.readFully(fileChannel, byteBuffers, nextPosition);
byteBuffers.position(4 + i * Long.BYTES);
}
byteBuffers.limit(byteBuffers.capacity());
Expand All @@ -146,7 +147,10 @@ private List<Long> parseOffsetList(long position) throws IOException {
blockOffset.add(position);
// Read the first block
ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_LENGTH);
fileChannel.read(byteBuffer, position);
if (position == fileChannel.size()) {
return blockOffset;
}
IOUtils.readFully(fileChannel, byteBuffer, position);
byteBuffer.flip();
if (byteBuffer.limit() > 0) { // This indicates that there is data at this position
int firstInt = ReadWriteIOUtils.readInt(byteBuffer); // first int
Expand All @@ -169,7 +173,7 @@ private List<Long> parseOffsetList(long position) throws IOException {
// read
blockBuffer.position(MAX_LENGTH * i);
blockBuffer.limit(MAX_LENGTH * (i + 1));
fileChannel.read(blockBuffer, blockOffset.get(i));
IOUtils.readFully(fileChannel, blockBuffer, blockOffset.get(i));
blockBuffer.position(4 + i * Long.BYTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -64,7 +65,11 @@ public long serialize(ByteBuffer buffer) {
}

public static IDPredicateType deserialize(InputStream stream) throws IOException {
return values()[stream.read()];
int typeNum = stream.read();
if (typeNum == -1) {
throw new EOFException();
}
return values()[typeNum];
}

public static IDPredicateType deserialize(ByteBuffer buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.io;

import org.apache.iotdb.commons.utils.IOUtils;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -63,7 +65,7 @@ public static WALFileVersion getVersion(FileChannel channel) throws IOException
continue;
}
ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length);
channel.read(buffer);
IOUtils.readFully(channel, buffer);
buffer.flip();
String versionString = new String(buffer.array(), StandardCharsets.UTF_8);
if (version.versionString.equals(versionString)) {
Expand Down
Loading
Loading