Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions man/ltfs_ordered_copy.1
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ Configure verbosity of logger. VERBOSE shall be 0-6. (Default: 4)
.TP
\fB-q, --quiet\fR
No message output
.TP
\fB--store-hash\fR
Compute a content hash of each copied file and store it in the \fBltfs.hash.\fR\fIHASHTYPE\fR extended attribute on the destination, as defined by the LTFS Format Specification 2.4 (Annex F, Table F.1). This is intended for LTFS destinations, which persist the value into the index. The hash is stored as a UTF-8 hex string of the length the spec requires for the type; if the hash cannot be stored the file is treated as failed.
.TP
\fB--hash-algo\fR \fIHASHTYPE\fR
LTFS hash type to use with \fB--store-hash\fR (LTFS Format Spec 2.4, Table F.1). One of \fBcrc32sum\fR, \fBmd5sum\fR, \fBsha1sum\fR, \fBsha256sum\fR, \fBsha512sum\fR; the bare names \fBcrc32\fR, \fBmd5\fR, \fBsha1\fR, \fBsha256\fR, \fBsha512\fR are accepted as aliases. Defaults to \fBsha256sum\fR.
.TP
\fB--store-hash-all\fR
Like \fB--store-hash\fR, but compute and store every standardized LTFS hash type (\fBcrc32sum\fR, \fBmd5sum\fR, \fBsha1sum\fR, \fBsha256sum\fR, \fBsha512sum\fR) for each file. Each file is read once and the hash types are computed in parallel, one worker thread per type. Overrides \fB--hash-algo\fR.
.SH "COMMAND EXAMPLES"
.PP
This section shows various command examples.
Expand Down
18 changes: 18 additions & 0 deletions man/sgml/ltfs_ordered_copy.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@
<para>No message output</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--store-hash</option></term>
<listitem>
<para>Compute a content hash of each copied file and store it in the <literal>ltfs.hash.<replaceable>HASHTYPE</replaceable></literal> extended attribute on the destination, as defined by the LTFS Format Specification 2.4 (Annex F, Table F.1). This is intended for LTFS destinations, which persist the value into the index. The hash is stored as a UTF-8 hex string of the length the spec requires for the type; if the hash cannot be stored the file is treated as failed.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--hash-algo</option> <replaceable>HASHTYPE</replaceable></term>
<listitem>
<para>LTFS hash type to use with <option>--store-hash</option> (LTFS Format Spec 2.4, Table F.1). One of <literal>crc32sum</literal>, <literal>md5sum</literal>, <literal>sha1sum</literal>, <literal>sha256sum</literal>, <literal>sha512sum</literal>; the bare names crc32, md5, sha1, sha256, sha512 are accepted as aliases. Defaults to <literal>sha256sum</literal>.</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--store-hash-all</option></term>
<listitem>
<para>Like <option>--store-hash</option>, but compute and store every standardized LTFS hash type (<literal>crc32sum</literal>, <literal>md5sum</literal>, <literal>sha1sum</literal>, <literal>sha256sum</literal>, <literal>sha512sum</literal>) for each file. Each file is read once and the hash types are computed in parallel, one worker thread per type. Overrides <option>--hash-algo</option>.</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>

Expand Down
189 changes: 184 additions & 5 deletions src/utils/ltfs_ordered_copy
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,134 @@ import argparse
import xattr
import shutil
import threading
import hashlib
import zlib

from logging import getLogger, basicConfig, NOTSET, CRITICAL, ERROR, WARNING, INFO, DEBUG
from collections import deque

# File content hashes live in ltfs.hash.<hashtype> extended attributes (LTFS Format
# Specification 2.4, Annex F, Table F.1). The spec standardizes only the hashtypes
# listed here and reserves every other value. The table maps each spec hashtype to the
# name of the algorithm used to compute it (crc32 via zlib, the rest via hashlib).
HASH_TYPES = dict(
    crc32sum='crc32',
    md5sum='md5',
    sha1sum='sha1',
    sha256sum='sha256',
    sha512sum='sha512',
)
# Convenience aliases: the bare algorithm name resolves to its spec hashtype. This is
# simply the inverse of HASH_TYPES, so the two tables cannot drift apart.
HASH_ALIASES = dict((algo, hashtype) for hashtype, algo in HASH_TYPES.items())

def compute_file_hash(path, hashtype):
    """Return the hex digest of the file at `path` for one LTFS hashtype.

    `hashtype` is a key of HASH_TYPES (e.g. 'sha256sum'); an unknown key raises
    KeyError. The file is streamed in 1 MiB pieces, so memory use does not grow with
    file size. crc32sum is rendered as 8 lower-case hex digits; every other type is
    whatever hashlib's hexdigest() yields for the algorithm.
    """
    algo = HASH_TYPES[hashtype]
    block_size = 1024 * 1024

    if algo != 'crc32':
        # hashlib-backed types: create the hasher first, then stream the file into it.
        digest = hashlib.new(algo)
        with open(path, 'rb') as stream:
            while True:
                data = stream.read(block_size)
                if not data:
                    break
                digest.update(data)
        return digest.hexdigest()

    # CRC-32 is not offered by hashlib; keep a running zlib checksum instead.
    checksum = 0
    with open(path, 'rb') as stream:
        while True:
            data = stream.read(block_size)
            if not data:
                break
            checksum = zlib.crc32(data, checksum)
    # Mask to an unsigned 32-bit value so the output is always 8 hex digits.
    return '%08x' % (checksum & 0xffffffff)

def compute_file_hashes(path, hashtypes):
    """Compute one or more LTFS Table F.1 hashtypes for a file in a single read pass.

    Parameters:
        path      -- file to hash; opened in binary mode and streamed in 1 MiB chunks.
        hashtypes -- iterable of HASH_TYPES keys (e.g. ['md5sum', 'sha256sum']).
                     Duplicates are ignored.

    Returns {hashtype: hexdigest}.

    Raises KeyError for an unknown hashtype and whatever hashlib.new() raises for an
    algorithm this interpreter cannot provide; both are raised in the calling thread
    before any worker is started. I/O errors from reading `path` propagate as usual,
    and an error raised while hashing inside a worker is re-raised here after all
    workers have finished.

    With a single hashtype this delegates to compute_file_hash(). With several, one
    reader feeds a bounded queue per hashtype and each hashtype is updated in its own
    worker thread. The hashlib documentation states that the GIL is released while
    hashing large buffers, which is what lets the per-type work overlap.
    """
    # Two workers for the same hashtype would share one queue and only one of them
    # would ever receive the end-of-stream marker, so drop duplicates (keeping order).
    wanted = []
    for ht in hashtypes:
        if ht not in wanted:
            wanted.append(ht)

    if len(wanted) == 1:
        only = wanted[0]
        return {only: compute_file_hash(path, only)}

    try:
        from queue import Queue  # Python 3
    except ImportError:
        from Queue import Queue  # Python 2

    # Build every hasher up front, in the calling thread. If this were done inside
    # the workers, a failure would kill the worker silently and the reader would
    # then block forever on that worker's full queue.
    hashers = {}
    for ht in wanted:
        algo = HASH_TYPES[ht]
        hashers[ht] = None if algo == 'crc32' else hashlib.new(algo)  # None => CRC-32

    queues = dict((ht, Queue(maxsize=8)) for ht in wanted)
    results = {}
    errors = {}

    def worker(ht):
        q = queues[ht]
        hasher = hashers[ht]
        crc = 0
        while True:
            chunk = q.get()
            if chunk is None:  # end-of-stream marker
                break
            if ht in errors:
                # Already failed: keep draining so the reader never blocks on us.
                continue
            try:
                if hasher is None:
                    crc = zlib.crc32(chunk, crc)
                else:
                    hasher.update(chunk)
            except Exception as e:
                errors[ht] = e
        if ht in errors:
            return
        try:
            if hasher is None:
                results[ht] = '%08x' % (crc & 0xffffffff)
            else:
                results[ht] = hasher.hexdigest()
        except Exception as e:
            errors[ht] = e

    threads = [threading.Thread(target=worker, args=(ht,)) for ht in wanted]
    started = []
    try:
        for t in threads:
            t.start()
            started.append(t)
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b''):
                for q in queues.values():
                    q.put(chunk)
    finally:
        # Always release the workers, even if starting a thread or reading failed.
        for q in queues.values():
            q.put(None)
        for t in started:
            t.join()

    # Surface the first worker failure (in request order) instead of returning a
    # partial result that would later show up as a confusing missing-key error.
    for ht in wanted:
        if ht in errors:
            raise errors[ht]
    return results

def ensure_ltfs_hash_supported(probe_path, logger):
    """Abort early if an LTFS destination is too old to store ltfs.hash.* attributes.

    Parameters:
        probe_path -- a path on the destination file system. It does not have to
                      exist yet; the nearest existing ancestor is probed instead.
        logger     -- logger used for the error / trace messages.

    Behaviour:
        * If the probed path carries no LTFS signature attribute, or the signature
          does not start with 'LTFS', the function returns without doing anything:
          the destination is not treated as LTFS and nothing is gated.
        * Otherwise the ltfs.softwareFormatSpec attribute is read and parsed as
          MAJOR.MINOR. If it cannot be read or parsed, or is lower than 2.4, an error
          is logged and the process exits with status 2.

    Returns None. Exits the process (status 2) on an unsupported LTFS volume.
    """
    # The destination directory may not exist at this point. Probing a missing path
    # would fail and be mistaken for "not LTFS", silently skipping the check, so walk
    # up to the closest ancestor that does exist; it is on the same file system the
    # destination will be created on.
    probe = probe_path
    if not os.path.exists(probe):
        probe = os.path.abspath(probe)
        while not os.path.exists(probe):
            parent = os.path.dirname(probe)
            if parent == probe:  # reached the root; nothing further to try
                break
            probe = parent

    try:
        sig = xattr.get(probe, VEA_PREFIX + LTFS_SIG_VEA)
    except Exception:
        return  # No LTFS signature: not an LTFS destination, nothing to gate.
    if isinstance(sig, bytes):
        sig = sig.decode('ascii', 'replace')
    if not sig.startswith('LTFS'):
        return

    try:
        spec = xattr.get(probe, VEA_PREFIX + 'ltfs.softwareFormatSpec')
        if isinstance(spec, bytes):
            spec = spec.decode('ascii', 'replace')
        # Only MAJOR.MINOR matter; a bare "2" is read as 2.0.
        nums = [int(x) for x in spec.strip().split('.')[:2]]
        version = (nums[0], nums[1] if len(nums) > 1 else 0)
    except Exception as e:
        logger.error("--store-hash: cannot determine the LTFS format spec version of '{0}': {1}".format(probe, str(e)))
        exit(2)

    if version < (2, 4):
        logger.error("--store-hash: destination LTFS Format Spec {0} is older than 2.4, which is "
                     "required to store ltfs.hash.* attributes. Omit --store-hash or use a 2.4+ "
                     "LTFS volume.".format(spec))
        exit(2)
    logger.log(NOTSET + 1, "Destination LTFS Format Spec {0} supports ltfs.hash.* (>= 2.4)".format(spec))

class CopyItem:
""""""
def __init__(self, src, dst, vea_pre, cp_attr, cp_xattr, logger): #initialization
def __init__(self, src, dst, vea_pre, cp_attr, cp_xattr, logger, store_hash=None): #initialization
self.src = src
self.dst = dst
self.vea_pre = vea_pre
self.cp_attr = cp_attr
self.cp_xattr = cp_xattr
self.store_hash = store_hash
self.vuuid = ''
self.part = ''
self.start = -1
Expand Down Expand Up @@ -101,6 +217,22 @@ class CopyItem:
self.logger.error('Failed to copy "{0}" to "{1}": {2}'.format(self.src, self.dst, str(str(e))))
return False

if self.store_hash:
# Store one ltfs.hash.<hashtype> VEA per requested hashtype on the
# destination. The hash(es) are computed from the (on-disk) source, whose
# bytes are identical to what was just copied; on LTFS this is persisted
# into the index.
try:
target = self.dst
if os.path.isdir(target):
target = os.path.join(target, os.path.basename(self.src))
digests = compute_file_hashes(self.src, self.store_hash)
for ht in self.store_hash:
xattr.set(target, self.vea_pre + 'ltfs.hash.' + ht, digests[ht].encode('ascii'))
except Exception as e:
self.logger.error('Copied "{0}" to "{1}" but failed to store hash(es): {2}'.format(self.src, self.dst, str(e)))
return False

return True

def __repr__(self):
Expand Down Expand Up @@ -151,7 +283,7 @@ class CopyQueue:

self.items = self.items + 1

def walk_dir(self, source, dest, cp_attr, cp_xattr=False):
def walk_dir(self, source, dest, cp_attr, cp_xattr=False, store_hash=None):
(source_root, t) = os.path.split(source)
prefix_len = len(source_root)
dst = dest + "/" + t
Expand All @@ -171,7 +303,7 @@ class CopyQueue:
for f in sorted(files) if self.sort_files else files:
self.logger.log(NOTSET + 1, 'Creating a copy item for file {}'.format(f))
c = CopyItem(os.path.join(root, f), os.path.join(dst, f), VEA_PREFIX,
cp_attr, cp_xattr, logger)
cp_attr, cp_xattr, logger, store_hash)
self.add_copy_item(c)

for d in walk_dirs:
Expand Down Expand Up @@ -280,6 +412,23 @@ parser.add_argument('-v', help='Verbose output. Set VERBOSE level 5', action='st
parser.add_argument('--verbose', help='Configure verbosity of logger. VERBOSE shall be 0-6. default is 4', default = str(logger_info))
parser.add_argument('-q','--quiet', help='No message output', action='store_true')
parser.add_argument('--sort-files', help='Sort the file list before copying', action='store_true')
# Content-hash options. The help texts list the supported hashtypes straight from
# HASH_TYPES / HASH_ALIASES so they always match what the script accepts.
parser.add_argument(
    '--store-hash',
    help=('Compute a content hash of each copied file and store it in the '
          'ltfs.hash.<hashtype> extended attribute on the destination (intended '
          'for LTFS destinations, which persist it in the index per LTFS Format '
          'Spec 2.4, Table F.1). The hash type is selected with --hash-algo '
          '(default sha256sum).'),
    action='store_true')
parser.add_argument(
    '--hash-algo',
    metavar='HASHTYPE',
    default='sha256sum',
    help=('LTFS hash type to use with --store-hash, stored as ltfs.hash.<HASHTYPE> '
          '(LTFS Format Spec 2.4, Table F.1). One of: {0} (the bare names {1} '
          'are accepted as aliases). Default sha256sum.').format(
              ', '.join(sorted(HASH_TYPES)), ', '.join(sorted(HASH_ALIASES))))
parser.add_argument(
    '--store-hash-all',
    help=('Like --store-hash, but compute and store every standardized LTFS '
          'hash type ({0}) for each file. '
          'The file is read once and the hash types are computed in parallel '
          '(one worker thread each). Overrides --hash-algo.').format(
              ', '.join(sorted(HASH_TYPES))),
    action='store_true')

args=parser.parse_args()

Expand Down Expand Up @@ -318,6 +467,22 @@ else:

logger.info('Tape order aware copy for LTFS')

# Collapse the three hash options into a single value, args.store_hash:
#   --store-hash-all         -> every standardized hashtype, sorted by name
#   --store-hash             -> a one-element list holding the --hash-algo hashtype
#   neither option           -> None
# Everything after this point only needs to test args.store_hash for truthiness and
# iterate over it.
if args.store_hash_all:
    args.store_hash = sorted(HASH_TYPES)
elif not args.store_hash:
    args.store_hash = None
else:
    # Case-insensitive; a bare algorithm name is mapped to its spec hashtype.
    requested_hash = args.hash_algo.lower()
    requested_hash = HASH_ALIASES.get(requested_hash, requested_hash)
    if requested_hash in HASH_TYPES:
        args.store_hash = [requested_hash]
    else:
        logger.error("Unsupported hash type '{0}'. LTFS Format Spec 2.4 (Table F.1) defines: {1}.".format(
            args.hash_algo, ', '.join(sorted(HASH_TYPES))))
        exit(2)

if args.target_directory:
if args.DEST != None:
args.SOURCE.extend(args.DEST)
Expand All @@ -336,6 +501,13 @@ if args.DEST == None:
logger.error('No destination is specified')
exit(2)

if args.store_hash:
    # Check once, before any file is copied, that an LTFS destination is recent
    # enough (Format Spec 2.4+) to hold ltfs.hash.* attributes. This avoids
    # reporting the same failure for every single file. Probe the destination itself
    # when it is a directory, otherwise its parent directory (or the current
    # directory when the destination has no directory part).
    if os.path.isdir(args.DEST):
        hash_probe = args.DEST
    else:
        hash_probe = os.path.dirname(args.DEST) or '.'
    ensure_ltfs_hash_supported(hash_probe, logger)

# Special case:
# Copy source is only one file
if args.recursive == False and len(args.SOURCE) == 1:
Expand All @@ -349,6 +521,13 @@ if args.recursive == False and len(args.SOURCE) == 1:
if not os.path.exists(new_d):
os.makedirs(new_d)
shutil.copy(args.SOURCE[0], args.DEST)
if args.store_hash:
target = args.DEST
if os.path.isdir(target):
target = os.path.join(target, os.path.basename(args.SOURCE[0]))
digests = compute_file_hashes(args.SOURCE[0], args.store_hash)
for ht in args.store_hash:
xattr.set(target, VEA_PREFIX + 'ltfs.hash.' + ht, digests[ht].encode('ascii'))
except Exception as e:
logger.error(str(e))
exit(1)
Expand Down Expand Up @@ -402,7 +581,7 @@ for s in args.SOURCE:
(new_d, t) = os.path.split(dst)
if not os.path.exists(new_d):
os.makedirs(new_d)
c = CopyItem(s, dst, VEA_PREFIX, args.p, args.all, logger)
c = CopyItem(s, dst, VEA_PREFIX, args.p, args.all, logger, args.store_hash)
copyq.add_copy_item(c)
else:
logger.log(NOTSET + 1, 'Creating copy item for directory {}'.format(s))
Expand All @@ -414,7 +593,7 @@ for s in args.SOURCE:
if not os.path.exists(new_d):
os.makedirs(new_d)
dst = new_d
copyq.walk_dir(s, dst, args.p, args.all)
copyq.walk_dir(s, dst, args.p, args.all, args.store_hash)
else:
logger.warning("omitting directory '{0}'".format(s))

Expand Down