# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015-2019 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Models for Invenio-Files-REST.
The entities of this module consists of:
* **Buckets** - Identified by UUIDs, and contains objects.
* **Buckets tags** - Identified uniquely with a bucket by a key. Used to store
extra metadata for a bucket.
* **Objects** - Identified uniquely within a bucket by string keys. Each
object can have multiple object versions (note: Objects do not have their
own database table).
* **Object versions** - Identified by UUIDs and belongs to one specific object
in one bucket. Each object version has zero or one file instance. If the
object version has no file instance, it is considered a *delete marker*.
* **File instance** - Identified by UUIDs. Represents a physical file on disk.
The location of the file is specified via a URI. A file instance can have
many object versions.
* **Locations** - A bucket belongs to a specific location. Locations can be
used to represent e.g. different storage systems.
* **Multipart Objects** - Identified by UUIDs and belongs to a specific bucket
and key.
* **Part object** - Identified by their multipart object and a part number.
The actual file access is handled by a storage interface. Also, objects do not
have their own model, but are represented via the :py:data:`ObjectVersion`
model.
"""
import re
import sys
import uuid
from datetime import datetime
from functools import wraps
from os.path import basename
from flask import current_app
from invenio_db import db
from sqlalchemy.dialects import mysql
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import validates
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy_utils.types import UUIDType
from .errors import (
BucketLockedError,
FileInstanceAlreadySetError,
FileInstanceUnreadableError,
FileSizeError,
InvalidKeyError,
InvalidOperationError,
MultipartAlreadyCompleted,
MultipartInvalidChunkSize,
MultipartInvalidPartNumber,
MultipartInvalidSize,
MultipartMissingParts,
MultipartNotCompleted,
)
from .proxies import current_files_rest
from .utils import guess_mimetype
slug_pattern = re.compile("^[a-z][a-z0-9-]+$")
#
# Helpers
#
def validate_key(key):
"""Validate key.
:param key: The key to validate.
:raises invenio_files_rest.errors.InvalidKeyError: If the key is longer
than the maximum length defined in
:data:`invenio_files_rest.config.FILES_REST_FILE_URI_MAX_LEN`.
:returns: The key.
"""
if len(key) > current_app.config["FILES_REST_OBJECT_KEY_MAX_LEN"]:
raise InvalidKeyError()
return key
def as_bucket(value):
"""Get a bucket object from a bucket ID or a bucket object.
:param value: A :class:`invenio_files_rest.models.Bucket` or a Bucket ID.
:returns: A :class:`invenio_files_rest.models.Bucket` instance.
"""
return value if isinstance(value, Bucket) else Bucket.get(value)
def as_bucket_id(value):
"""Get a bucket ID from a bucket ID or a bucket object.
:param value: A :class:`invenio_files_rest.models.Bucket` instance of a
bucket ID.
:returns: The :class:`invenio_files_rest.models.Bucket` ID.
"""
return value.id if isinstance(value, Bucket) else value
def as_object_version(value):
"""Get an object version object from an object version ID or an object version.
:param value: A :class:`invenio_files_rest.models.ObjectVersion` or an
object version ID.
:returns: A :class:`invenio_files_rest.models.ObjectVersion` instance.
"""
return (
value
if isinstance(value, ObjectVersion)
else ObjectVersion.query.filter_by(version_id=value).one_or_none()
)
def as_object_version_id(value):
"""Get an object version ID from an object version ID or an object version.
:param value: A :class:`invenio_files_rest.models.ObjectVersion` instance
of a object version ID.
:returns: The :class:`invenio_files_rest.models.ObjectVersion` version_id.
"""
return value.version_id if isinstance(value, ObjectVersion) else value
#
# Decorators to validate state.
#
def update_bucket_size(f):
"""Decorate to update bucket size after operation."""
@wraps(f)
def inner(self, *args, **kwargs):
res = f(self, *args, **kwargs)
self.bucket.size += self.file.size
return res
return inner
def ensure_state(default_getter, exc_class, default_msg=None):
"""Create a decorator factory function."""
def decorator(getter=default_getter, msg=default_msg):
def ensure_decorator(f):
@wraps(f)
def inner(self, *args, **kwargs):
if not getter(self):
raise exc_class(msg) if msg else exc_class()
return f(self, *args, **kwargs)
return inner
return ensure_decorator
return decorator
class BucketError(object):
"""Represents a bucket level error.
.. note:: This is not an actual exception.
"""
def __init__(self, message):
self.res = dict(message=message)
def to_dict(self):
return self.res
class ObjectVersionError(object):
"""Represents an object version level error.
.. note:: This is not an actual exception.
"""
def __init__(self, message):
self.res = dict(message=message)
def to_dict(self):
return self.res
ensure_readable = ensure_state(lambda o: o.readable, FileInstanceUnreadableError)
"""Ensure file is readable."""
ensure_writable = ensure_state(
lambda o: o.writable, ValueError, "File is not writable."
)
"""Ensure file is writeable."""
ensure_completed = ensure_state(lambda o: o.completed, MultipartNotCompleted)
"""Ensure file is completed."""
ensure_uncompleted = ensure_state(lambda o: not o.completed, MultipartAlreadyCompleted)
"""Ensure file is not completed."""
ensure_not_deleted = ensure_state(
lambda o: not o.deleted,
InvalidOperationError,
[BucketError("Cannot make snapshot of a deleted bucket.")],
)
"""Ensure file is not deleted."""
ensure_unlocked = ensure_state(lambda o: not o.locked, BucketLockedError)
"""Ensure bucket is locked."""
ensure_no_file = ensure_state(lambda o: o.file_id is None, FileInstanceAlreadySetError)
"""Ensure file is not already set."""
ensure_is_previous_version = ensure_state(
lambda o: not o.is_head,
InvalidOperationError,
[ObjectVersionError("Cannot restore latest version.")],
)
"""Ensure file is the previous version."""
#
# Model definitions
#
class Timestamp(object):
"""Timestamp model mix-in with fractional seconds support.
SQLAlchemy-Utils timestamp model, does not have support for fractional
seconds.
"""
created = db.Column(
db.DateTime().with_variant(mysql.DATETIME(fsp=6), "mysql"),
default=datetime.utcnow,
nullable=False,
)
"""Creation timestamp."""
updated = db.Column(
db.DateTime().with_variant(mysql.DATETIME(fsp=6), "mysql"),
default=datetime.utcnow,
nullable=False,
)
"""Modification timestamp."""
@db.event.listens_for(Timestamp, "before_update", propagate=True)
def timestamp_before_update(mapper, connection, target):
"""Listen for updating updated field."""
target.updated = datetime.utcnow()
[docs]class Location(db.Model, Timestamp):
"""Model defining base locations."""
__tablename__ = "files_location"
id = db.Column(db.Integer, primary_key=True)
"""Internal identifier for locations.
The internal identifier is used only used as foreign key for buckets in
order to decrease storage requirements per row for buckets.
"""
name = db.Column(db.String(20), unique=True, nullable=False)
"""External identifier of the location."""
uri = db.Column(db.String(255), nullable=False)
"""URI of the location."""
default = db.Column(db.Boolean(name="default"), nullable=False, default=False)
"""True if the location is the default location.
At least one location should be the default location.
"""
[docs] @validates("name")
def validate_name(self, key, name):
"""Validate name."""
if not slug_pattern.match(name) or len(name) > 20:
raise ValueError(
"Invalid location name (lower-case alphanumeric + dashes)."
)
return name
[docs] @classmethod
def get_by_name(cls, name):
"""Fetch a specific location object."""
return cls.query.filter_by(
name=name,
).one_or_none()
[docs] @classmethod
def get_default(cls):
"""Fetch the default location object."""
try:
return cls.query.filter_by(default=True).one_or_none()
except MultipleResultsFound:
return None
[docs] @classmethod
def all(cls):
"""Return query that fetches all locations."""
return Location.query.all()
def __repr__(self):
"""Return representation of location."""
return self.name
[docs]class Bucket(db.Model, Timestamp):
"""Model for storing buckets.
A bucket is a container of objects. Buckets have a default location and
storage class. Individual objects in the bucket can however have different
locations and storage classes.
A bucket can be marked as deleted. A bucket can also be marked as locked
to prevent operations on the bucket.
Each bucket can also define a quota. The size of a bucket is the size
of all objects in the bucket (including all versions).
"""
__tablename__ = "files_bucket"
id = db.Column(
UUIDType,
primary_key=True,
default=uuid.uuid4,
)
"""Bucket identifier."""
default_location = db.Column(
db.Integer, db.ForeignKey(Location.id, ondelete="RESTRICT"), nullable=False
)
"""Default location."""
default_storage_class = db.Column(
db.String(1),
nullable=False,
default=lambda: current_app.config["FILES_REST_DEFAULT_STORAGE_CLASS"],
)
"""Default storage class."""
size = db.Column(db.BigInteger, default=0, nullable=False)
"""Size of bucket.
This is a computed property which can rebuilt any time from the objects
inside the bucket.
"""
quota_size = db.Column(
db.BigInteger,
nullable=True,
default=lambda: current_app.config["FILES_REST_DEFAULT_QUOTA_SIZE"],
)
"""Quota size of bucket.
Usage of this property depends on which file size limiters are installed.
"""
max_file_size = db.Column(
db.BigInteger,
nullable=True,
default=lambda: current_app.config["FILES_REST_DEFAULT_MAX_FILE_SIZE"],
)
"""Maximum size of a single file in the bucket.
Usage of this property depends on which file size limiters are installed.
"""
locked = db.Column(db.Boolean(name="locked"), default=False, nullable=False)
"""Lock state of bucket.
Modifications are not allowed on a locked bucket.
"""
deleted = db.Column(db.Boolean(name="deleted"), default=False, nullable=False)
"""Delete state of bucket."""
location = db.relationship(Location, backref="buckets")
"""Location associated with this bucket."""
def __repr__(self):
"""Return representation of location."""
return str(self.id)
@property
def quota_left(self):
"""Get how much space is left in the bucket."""
if self.quota_size:
return max(self.quota_size - self.size, 0)
@property
def size_limit(self):
"""Get size limit for this bucket.
The limit is based on the minimum output of the file size limiters.
"""
limits = [
lim
for lim in current_files_rest.file_size_limiters(self)
if lim.limit is not None
]
return min(limits) if limits else None
[docs] @validates("default_storage_class")
def validate_storage_class(self, key, default_storage_class):
"""Validate storage class."""
if (
default_storage_class
not in current_app.config["FILES_REST_STORAGE_CLASS_LIST"]
):
raise ValueError("Invalid storage class.")
return default_storage_class
[docs] @ensure_not_deleted()
def snapshot(self, lock=False):
"""Create a snapshot of latest objects in bucket.
:param lock: Create the new bucket in a locked state.
:returns: Newly created bucket containing copied ObjectVersion.
"""
with db.session.begin_nested():
bucket = Bucket(
default_location=self.default_location,
default_storage_class=self.default_storage_class,
quota_size=self.quota_size,
)
db.session.add(bucket)
for o in ObjectVersion.get_by_bucket(self):
o.copy(bucket=bucket)
bucket.locked = True if lock else self.locked
return bucket
[docs] @ensure_not_deleted(msg=[BucketError("Cannot sync a deleted bucket.")])
def sync(self, bucket, delete_extras=False):
"""Sync self bucket ObjectVersions to the destination bucket.
The bucket is fully mirrored with the destination bucket following the
logic:
* same ObjectVersions are not touched
* new ObjectVersions are added to destination
* deleted ObjectVersions are deleted in destination
* extra ObjectVersions in dest are deleted if `delete_extras` param is
True
:param bucket: The destination bucket.
:param delete_extras: Delete extra ObjectVersions in destination if
True.
:returns: The bucket with an exact copy of ObjectVersions in self.
"""
assert not bucket.locked
src_ovs = ObjectVersion.get_by_bucket(bucket=self, with_deleted=True)
dest_ovs = ObjectVersion.get_by_bucket(bucket=bucket, with_deleted=True)
# transform into a dict { key: object version }
src_dict_ovs = {}
for ov in src_ovs:
# keep only the latest version of an ObjectVersion and don't
# override with previous
if ov.key not in src_dict_ovs:
src_dict_ovs[ov.key] = ov
dest_dict_ovs = {}
for ov in dest_ovs:
# keep only the latest version of an ObjectVersion and don't
# override with previous
if ov.key not in dest_dict_ovs:
dest_dict_ovs[ov.key] = ov
for key, ov in src_dict_ovs.items():
key_in_dest = key in dest_dict_ovs
if ov.deleted:
if key_in_dest and not dest_dict_ovs[key].deleted:
ObjectVersion.delete(bucket, key)
else:
if not key_in_dest:
ov.copy(bucket=bucket)
else:
dest_ov = dest_dict_ovs[key]
file_in_dest_differs = ov.file_id != dest_ov.file_id
ov_deleted_in_dest = dest_ov.deleted
if file_in_dest_differs or ov_deleted_in_dest:
ov.copy(bucket=bucket)
if delete_extras:
for key, ov in dest_dict_ovs.items():
if key not in src_dict_ovs:
ObjectVersion.delete(bucket, key)
return bucket
[docs] @classmethod
def create(cls, location=None, storage_class=None, **kwargs):
r"""Create a bucket.
:param location: Location of a bucket (instance or name).
Default: Default location.
:param storage_class: Storage class of a bucket.
Default: Default storage class.
:param \**kwargs: Keyword arguments are forwarded to the class
:param \**kwargs: Keyword arguments are forwarded to the class
constructor.
:returns: Created bucket.
"""
with db.session.begin_nested():
if location is None:
location = Location.get_default()
elif isinstance(location, str):
location = Location.get_by_name(location)
obj = cls(
default_location=location.id,
default_storage_class=storage_class
or current_app.config["FILES_REST_DEFAULT_STORAGE_CLASS"],
**kwargs
)
db.session.add(obj)
return obj
[docs] @classmethod
def get(cls, bucket_id):
"""Get a bucket object (excluding deleted).
:param bucket_id: Bucket identifier.
:returns: Bucket instance.
"""
return cls.query.filter_by(id=bucket_id, deleted=False).one_or_none()
[docs] @classmethod
def all(cls):
"""Return query of all buckets (excluding deleted)."""
return cls.query.filter_by(deleted=False)
[docs] @classmethod
def delete(cls, bucket_id):
"""Delete a bucket.
Does not actually delete the Bucket, just marks it as deleted.
"""
bucket = cls.get(bucket_id)
if not bucket or bucket.deleted:
return False
bucket.deleted = True
return True
[docs] @ensure_unlocked()
def remove(self):
"""Permanently remove a bucket and all objects (including versions).
.. warning::
This by-passes the normal versioning and should only be used when
you want to permanently delete a bucket and its objects. Otherwise
use :py:data:`Bucket.delete()`.
Note the method does not remove the associated file instances which
must be garbage collected.
:returns: ``self``.
"""
with db.session.begin_nested():
ObjectVersion.query.filter_by(bucket_id=self.id).delete()
self.query.filter_by(id=self.id).delete()
return self
[docs]class BucketTag(db.Model):
"""Model for storing tags associated to buckets.
This is useful to store extra information for a bucket.
"""
__tablename__ = "files_buckettags"
bucket_id = db.Column(
UUIDType,
db.ForeignKey(Bucket.id, ondelete="CASCADE"),
default=uuid.uuid4,
primary_key=True,
)
key = db.Column(db.String(255), primary_key=True)
"""Tag key."""
value = db.Column(db.Text, nullable=False)
"""Tag value."""
bucket = db.relationship(Bucket, backref="tags")
"""Relationship to buckets."""
[docs] @classmethod
def get(cls, bucket, key):
"""Get tag object."""
return cls.query.filter_by(
bucket_id=as_bucket_id(bucket),
key=key,
).one_or_none()
[docs] @classmethod
def create(cls, bucket, key, value):
"""Create a new tag for bucket."""
with db.session.begin_nested():
obj = cls(bucket_id=as_bucket_id(bucket), key=key, value=value)
db.session.add(obj)
return obj
[docs] @classmethod
def create_or_update(cls, bucket, key, value):
"""Create or update a new tag for bucket."""
obj = cls.get(bucket, key)
if obj:
obj.value = value
db.session.merge(obj)
else:
obj = cls.create(bucket, key, value)
return obj
[docs] @classmethod
def get_value(cls, bucket, key):
"""Get tag value."""
obj = cls.get(bucket, key)
return obj.value if obj else None
[docs] @classmethod
def delete(cls, bucket, key):
"""Delete a tag."""
with db.session.begin_nested():
cls.query.filter_by(
bucket_id=as_bucket_id(bucket),
key=key,
).delete()
[docs]class FileInstance(db.Model, Timestamp):
"""Model for storing files.
A file instance represents a file on disk. A file instance may be linked
from many objects, while an object can have one and only one file instance.
A file instance also records the storage class, size and checksum of the
file on disk.
Additionally, a file instance can be read only in case the storage layer
is not capable of writing to the file (e.g. can typically be used to
link to files on externally controlled storage).
"""
__tablename__ = "files_files"
id = db.Column(
UUIDType,
primary_key=True,
default=uuid.uuid4,
)
"""Identifier of file."""
uri = db.Column(
db.Text().with_variant(mysql.VARCHAR(255), "mysql"), unique=True, nullable=True
)
"""Location of file."""
storage_class = db.Column(db.String(1), nullable=True)
"""Storage class of file."""
size = db.Column(db.BigInteger, default=0, nullable=True)
"""Size of file."""
checksum = db.Column(db.String(255), nullable=True)
"""String representing the checksum of the object."""
readable = db.Column(db.Boolean(name="readable"), default=True, nullable=False)
"""Defines if the file is read only."""
writable = db.Column(db.Boolean(name="writable"), default=True, nullable=False)
"""Defines if file is writable.
This property is used to create a file instance prior to having the actual
file at the given URI. This is useful when e.g. copying a file instance.
"""
last_check_at = db.Column(db.DateTime, nullable=True)
"""Timestamp of last fixity check."""
last_check = db.Column(db.Boolean(name="last_check"), default=True)
"""Result of last fixity check."""
[docs] @validates("uri")
def validate_uri(self, key, uri):
"""Validate uri."""
if len(uri) > current_app.config["FILES_REST_FILE_URI_MAX_LEN"]:
raise ValueError("FileInstance URI too long ({0}).".format(len(uri)))
return uri
[docs] @classmethod
def get(cls, file_id):
"""Get a file instance."""
return cls.query.filter_by(id=file_id).one_or_none()
[docs] @classmethod
def get_by_uri(cls, uri):
"""Get a file instance by URI."""
assert uri is not None
return cls.query.filter_by(uri=uri).one_or_none()
[docs] @classmethod
def create(cls):
"""Create a file instance.
Note, object is only added to the database session.
"""
obj = cls(
id=uuid.uuid4(),
writable=True,
readable=False,
size=0,
)
db.session.add(obj)
return obj
[docs] def delete(self):
"""Delete a file instance.
The file instance can be deleted if it has no references from other
objects. The caller is responsible to test if the file instance is
writable and that the disk file can actually be removed.
.. note::
Normally you should use the Celery task to delete a file instance,
as this method will not remove the file on disk.
"""
self.query.filter_by(id=self.id).delete()
return self
[docs] def storage(self, **kwargs):
"""Get storage interface for object.
Uses the applications storage factory to create a storage interface
that can be used for this particular file instance.
:returns: Storage interface.
"""
return current_files_rest.storage_factory(fileinstance=self, **kwargs)
[docs] @ensure_readable()
def update_checksum(
self, progress_callback=None, chunk_size=None, checksum_kwargs=None, **kwargs
):
"""Update checksum based on file."""
self.checksum = self.storage(**kwargs).checksum(
progress_callback=progress_callback,
chunk_size=chunk_size,
**(checksum_kwargs or {})
)
[docs] def clear_last_check(self):
"""Clear the checksum of the file."""
with db.session.begin_nested():
self.last_check = None
self.last_check_at = datetime.utcnow()
return self
[docs] def verify_checksum(
self,
progress_callback=None,
chunk_size=None,
throws=True,
checksum_kwargs=None,
**kwargs
):
"""Verify checksum of file instance.
:param bool throws: If `True`, exceptions raised during checksum
calculation will be re-raised after logging. If set to `False`, and
an exception occurs, the `last_check` field is set to `None`
(`last_check_at` of course is updated), since no check actually was
performed.
:param dict checksum_kwargs: Passed as `**kwargs`` to
``storage().checksum``.
"""
try:
real_checksum = self.storage(**kwargs).checksum(
progress_callback=progress_callback,
chunk_size=chunk_size,
**(checksum_kwargs or {})
)
except Exception as exc:
current_app.logger.exception(str(exc))
if throws:
raise
real_checksum = None
with db.session.begin_nested():
self.last_check = (
None if real_checksum is None else (self.checksum == real_checksum)
)
self.last_check_at = datetime.utcnow()
return self.last_check
[docs] @ensure_writable()
def init_contents(self, size=0, **kwargs):
"""Initialize file."""
self.set_uri(
*self.storage(**kwargs).initialize(size=size), readable=False, writable=True
)
[docs] @ensure_writable()
def update_contents(
self,
stream,
seek=0,
size=None,
chunk_size=None,
progress_callback=None,
**kwargs
):
"""Save contents of stream to this file.
:param obj: ObjectVersion instance from where this file is accessed
from.
:param stream: File-like stream.
"""
self.checksum = None
return self.storage(**kwargs).update(
stream,
seek=seek,
size=size,
chunk_size=chunk_size,
progress_callback=progress_callback,
)
[docs] @ensure_writable()
def set_contents(
self,
stream,
chunk_size=None,
size=None,
size_limit=None,
progress_callback=None,
**kwargs
):
"""Save contents of stream to this file.
:param obj: ObjectVersion instance from where this file is accessed
from.
:param stream: File-like stream.
"""
self.set_uri(
*self.storage(**kwargs).save(
stream,
chunk_size=chunk_size,
size=size,
size_limit=size_limit,
progress_callback=progress_callback,
)
)
[docs] @ensure_writable()
def copy_contents(
self, fileinstance, progress_callback=None, chunk_size=None, **kwargs
):
"""Copy this file instance into another file instance."""
if not fileinstance.readable:
raise ValueError("Source file instance is not readable.")
if not self.size == 0:
raise ValueError("File instance has data.")
self.set_uri(
*self.storage(**kwargs).copy(
fileinstance.storage(**kwargs),
chunk_size=chunk_size,
progress_callback=progress_callback,
)
)
[docs] @ensure_readable()
def send_file(
self,
filename,
restricted=True,
mimetype=None,
trusted=False,
chunk_size=None,
as_attachment=False,
**kwargs
):
"""Send file to client."""
return self.storage(**kwargs).send_file(
filename,
mimetype=mimetype,
restricted=restricted,
checksum=self.checksum,
trusted=trusted,
chunk_size=chunk_size,
as_attachment=as_attachment,
)
[docs] def set_uri(
self, uri, size, checksum, readable=True, writable=False, storage_class=None
):
"""Set a location of a file."""
self.uri = uri
self.size = size
self.checksum = checksum
self.writable = writable
self.readable = readable
self.storage_class = (
current_app.config["FILES_REST_DEFAULT_STORAGE_CLASS"]
if storage_class is None
else storage_class
)
return self
[docs]class ObjectVersion(db.Model, Timestamp):
"""Model for storing versions of objects.
A bucket stores one or more objects identified by a key. Each object is
versioned where each version is represented by an ``ObjectVersion``.
An object version can either be 1) a *normal version* which is linked to
a file instance, or 2) a *delete marker*, which is *not* linked to a file
instance.
An normal object version is linked to a physical file on disk via a file
instance. This allows for multiple object versions to point to the same
file on disk, to optimize storage efficiency (e.g. useful for snapshotting
an entire bucket without duplicating the files).
A delete marker object version represents that the object at hand was
deleted.
The latest version of an object is marked using the ``is_head`` property.
If the latest object version is a delete marker the object will not be
shown in the bucket.
"""
__tablename__ = "files_object"
version_id = db.Column(UUIDType, primary_key=True, default=uuid.uuid4)
"""Identifier for the specific version of an object."""
key = db.Column(db.Text().with_variant(mysql.VARCHAR(255), "mysql"), nullable=False)
"""Key identifying the object."""
bucket_id = db.Column(
UUIDType,
db.ForeignKey(Bucket.id, ondelete="RESTRICT"),
default=uuid.uuid4,
nullable=False,
)
"""Bucket identifier."""
file_id = db.Column(
UUIDType, db.ForeignKey(FileInstance.id, ondelete="RESTRICT"), nullable=True
)
"""File instance for this object version.
A null value in this column defines that the object has been deleted.
"""
_mimetype = db.Column(
db.String(255),
index=True,
nullable=True,
)
"""MIME type of the object."""
is_head = db.Column(db.Boolean(name="is_head"), nullable=False, default=True)
"""Defines if object is the latest version."""
# Relationships definitions
bucket = db.relationship(Bucket, backref="objects")
"""Relationship to buckets."""
file = db.relationship(FileInstance, backref="objects")
"""Relationship to file instance."""
__table_args__ = (db.UniqueConstraint("bucket_id", "version_id", "key"),)
[docs] @validates("key")
def validate_key(self, key, key_):
"""Validate key."""
return validate_key(key_)
def __unicode__(self):
"""Return unicoded object."""
return "{0}:{1}:{2}".format(self.bucket_id, self.version_id, self.key)
# https://docs.python.org/3.3/howto/pyporting.html#str-unicode
if sys.version_info[0] >= 3: # Python 3
def __repr__(self):
"""Return representation of location."""
return self.__unicode__()
else: # Python 2
def __repr__(self):
"""Return representation of location."""
return self.__unicode__().encode("utf8")
@hybrid_property
def mimetype(self):
"""Get MIME type of object."""
return self._mimetype if self._mimetype else guess_mimetype(self.key)
@mimetype.setter
def mimetype(self, value):
"""Setter for MIME type."""
self._mimetype = value
@property
def basename(self):
"""Return filename of the object."""
return basename(self.key)
@property
def deleted(self):
"""Determine if object version is a delete marker."""
return self.file_id is None
[docs] @ensure_no_file()
@update_bucket_size
def set_contents(
self,
stream,
chunk_size=None,
size=None,
size_limit=None,
progress_callback=None,
):
"""Save contents of stream to file instance.
If a file instance has already been set, this methods raises an
``FileInstanceAlreadySetError`` exception.
:param stream: File-like stream.
:param size: Size of stream if known.
:param chunk_size: Desired chunk size to read stream in. It is up to
the storage interface if it respects this value.
"""
if size_limit is None:
size_limit = self.bucket.size_limit
self.file = FileInstance.create()
self.file.set_contents(
stream,
size_limit=size_limit,
size=size,
chunk_size=chunk_size,
progress_callback=progress_callback,
default_location=self.bucket.location.uri,
default_storage_class=self.bucket.default_storage_class,
)
return self
[docs] @ensure_no_file()
@update_bucket_size
def set_location(self, uri, size, checksum, storage_class=None):
"""Set only URI location of for object.
Useful to link files on externally controlled storage. If a file
instance has already been set, this methods raises an
``FileInstanceAlreadySetError`` exception.
:param uri: Full URI to object (which can be interpreted by the storage
interface).
:param size: Size of file.
:param checksum: Checksum of file.
:param storage_class: Storage class where file is stored ()
"""
self.file = FileInstance()
self.file.set_uri(uri, size, checksum, storage_class=storage_class)
db.session.add(self.file)
return self
[docs] @ensure_no_file()
@update_bucket_size
def set_file(self, fileinstance):
"""Set a file instance."""
self.file = fileinstance
return self
[docs] def send_file(self, restricted=True, trusted=False, **kwargs):
"""Wrap around FileInstance's send file."""
return self.file.send_file(
self.basename,
restricted=restricted,
mimetype=self.mimetype,
trusted=trusted,
**kwargs
)
[docs] @ensure_is_previous_version()
def restore(self):
"""Restore this object version to become the latest version.
Raises an exception if the object is the latest version.
"""
# Note, copy calls create which will fail if bucket is locked.
return self.copy()
[docs] @ensure_not_deleted(msg=[ObjectVersionError("Cannot copy a delete marker.")])
def copy(self, bucket=None, key=None):
"""Copy an object version to a given bucket + object key.
The copy operation is handled completely at the metadata level. The
actual data on disk is not copied. Instead, the two object versions
will point to the same physical file (via the same FileInstance).
All the tags associated with the current object version are copied over
to the new instance.
.. warning::
If the destination object exists, it will be replaced by the new
object version which will become the latest version.
:param bucket: The bucket (instance or id) to copy the object to.
Default: current bucket.
:param key: Key name of destination object.
Default: current object key.
:returns: The copied object version.
"""
new_ob = ObjectVersion.create(
self.bucket if bucket is None else as_bucket(bucket),
key or self.key,
_file_id=self.file_id,
)
for tag in self.tags:
ObjectVersionTag.create_or_update(
object_version=new_ob, key=tag.key, value=tag.value
)
return new_ob
[docs] @ensure_unlocked(getter=lambda o: not o.bucket.locked)
def remove(self):
"""Permanently remove a specific object version from the database.
.. warning::
This by-passes the normal versioning and should only be used when
you want to permanently delete a specific object version. Otherwise
use :py:data:`ObjectVersion.delete()`.
Note the method does not remove the associated file instance which
must be garbage collected.
:returns: ``self``.
"""
with db.session.begin_nested():
if self.file_id:
self.bucket.size -= self.file.size
self.query.filter_by(
bucket_id=self.bucket_id,
key=self.key,
version_id=self.version_id,
).delete()
return self
[docs] @classmethod
def create(
cls,
bucket,
key,
_file_id=None,
stream=None,
mimetype=None,
version_id=None,
**kwargs
):
"""Create a new object in a bucket.
The created object is by default created as a delete marker. You must
use ``set_contents()`` or ``set_location()`` in order to change this.
:param bucket: The bucket (instance or id) to create the object in.
:param key: Key of object.
:param _file_id: For internal use.
:param stream: File-like stream object. Used to set content of object
immediately after being created.
:param mimetype: MIME type of the file object if it is known.
:param kwargs: Keyword arguments passed to ``Object.set_contents()``.
"""
bucket = as_bucket(bucket)
if bucket.locked:
raise BucketLockedError()
with db.session.begin_nested():
latest_obj = cls.query.filter(
cls.bucket == bucket, cls.key == key, cls.is_head.is_(True)
).one_or_none()
if latest_obj is not None:
latest_obj.is_head = False
db.session.add(latest_obj)
# By default objects are created in a deleted state (i.e.
# file_id is null).
obj = cls(
bucket=bucket,
key=key,
version_id=version_id or uuid.uuid4(),
is_head=True,
mimetype=mimetype,
)
if _file_id:
file_ = (
_file_id
if isinstance(_file_id, FileInstance)
else FileInstance.get(_file_id)
)
obj.set_file(file_)
db.session.add(obj)
if stream:
obj.set_contents(stream, **kwargs)
return obj
[docs] @classmethod
def get(cls, bucket, key, version_id=None):
"""Fetch a specific object.
By default the latest object version is returned, if
``version_id`` is not set.
:param bucket: The bucket (instance or id) to get the object from.
:param key: Key of object.
:param version_id: Specific version of an object.
"""
filters = [
cls.bucket_id == as_bucket_id(bucket),
cls.key == key,
]
if version_id:
filters.append(cls.version_id == version_id)
else:
filters.append(cls.is_head.is_(True))
filters.append(cls.file_id.isnot(None))
return cls.query.filter(*filters).one_or_none()
[docs] @classmethod
def get_versions(cls, bucket, key, desc=True):
"""Fetch all versions of a specific object.
:param bucket: The bucket (instance or id) to get the object from.
:param key: Key of object.
:param desc: Sort results desc if True, asc otherwise.
:returns: The query to execute to fetch all versions.
"""
filters = [
cls.bucket_id == as_bucket_id(bucket),
cls.key == key,
]
order = cls.created.desc() if desc else cls.created.asc()
return cls.query.filter(*filters).order_by(cls.key, order)
[docs] @classmethod
def delete(cls, bucket, key):
"""Delete an object.
Technically works by creating a new version which works as a delete
marker.
:param bucket: The bucket (instance or id) to delete the object from.
:param key: Key of object.
:returns: Created delete marker object if key exists else ``None``.
"""
bucket_id = as_bucket_id(bucket)
obj = cls.get(bucket_id, key)
if obj:
return cls.create(as_bucket(bucket), key)
return None
[docs] @classmethod
def get_by_bucket(cls, bucket, versions=False, with_deleted=False):
"""Return query that fetches all the objects in a bucket.
:param bucket: The bucket (instance or id) to query.
:param versions: Select all versions if True, only heads otherwise.
:param with_deleted: Select also deleted objects if True.
:returns: The query to retrieve filtered objects in the given bucket.
"""
bucket_id = bucket.id if isinstance(bucket, Bucket) else bucket
filters = [
cls.bucket_id == bucket_id,
]
if not versions:
filters.append(cls.is_head.is_(True))
if not with_deleted:
filters.append(cls.file_id.isnot(None))
return cls.query.filter(*filters).order_by(cls.key, cls.created.desc())
[docs] @classmethod
def relink_all(cls, old_file, new_file):
"""Relink all object versions (for a given file) to a new file.
.. warning::
Use this method with great care.
"""
assert old_file.checksum == new_file.checksum
assert old_file.id
assert new_file.id
with db.session.begin_nested():
ObjectVersion.query.filter_by(file_id=str(old_file.id)).update(
{ObjectVersion.file_id: str(new_file.id)}
)
def __eq__(self, other):
"""Check if the two object are equals."""
return (
other
and isinstance(other, self.__class__)
and self.key == other.key
and self.file_id == other.file_id
)
def __ne__(self, other):
"""Check if are not equal."""
return not self.__eq__(other=other)
# DDL string used to avoid automap in mysql until sqlalchemy 2.0
# https://github.com/sqlalchemy/sqlalchemy/discussions/7597
[docs] @classmethod
def ix_uq_partial_files_object_is_head_dll(cls):
"""Return DDL instruction for ix_uq_partial_files_object_is_head."""
return db.DDL(
"CREATE UNIQUE INDEX ix_uq_partial_files_object_is_head "
"ON %(table)s (bucket_id, key) WHERE is_head"
)
db.event.listen(
ObjectVersion.__table__,
"after_create",
ObjectVersion.ix_uq_partial_files_object_is_head_dll().execute_if(
dialect="postgresql"
),
)
"""Create ix_uq_partial_files_object_is_head only on postgresql backend."""
[docs]class ObjectVersionTag(db.Model):
"""Model for storing tags associated to object versions.
Used for storing extra technical information for an object version.
"""
__tablename__ = "files_objecttags"
version_id = db.Column(
UUIDType,
db.ForeignKey(ObjectVersion.version_id, ondelete="CASCADE"),
default=uuid.uuid4,
primary_key=True,
)
"""Object version id."""
key = db.Column(db.String(255), primary_key=True)
"""Tag key."""
value = db.Column(db.Text, nullable=False)
"""Tag value."""
object_version = db.relationship(ObjectVersion, backref="tags")
"""Relationship to object versions."""
[docs] def copy(self, object_version=None, key=None):
"""Copy a tag to a given object version.
:param object_version: The object version instance to copy the tag to.
Default: current object version.
:param key: Key of destination tag.
Default: current tag key.
:return: The copied object version tag.
"""
return ObjectVersionTag.create(
self.object_version if object_version is None else object_version,
key or self.key,
self.value,
)
[docs] @classmethod
def get(cls, object_version, key):
"""Get the tag object."""
return cls.query.filter_by(
version_id=as_object_version_id(object_version),
key=key,
).one_or_none()
[docs] @classmethod
def create(cls, object_version, key, value):
"""Create a new tag for a given object version."""
assert len(key) < 256
assert len(value) < 256
with db.session.begin_nested():
obj = cls(
version_id=as_object_version_id(object_version), key=key, value=value
)
db.session.add(obj)
return obj
[docs] @classmethod
def create_or_update(cls, object_version, key, value):
"""Create or update a new tag for a given object version."""
assert len(key) < 256
assert len(value) < 256
obj = cls.get(object_version, key)
if obj:
obj.value = value
db.session.merge(obj)
else:
obj = cls.create(object_version, key, value)
return obj
[docs] @classmethod
def get_value(cls, object_version, key):
"""Get the tag value."""
obj = cls.get(object_version, key)
return obj.value if obj else None
[docs] @classmethod
def delete(cls, object_version, key=None):
"""Delete tags.
:param object_version: The object version instance or id.
:param key: Key of the tag to delete.
Default: delete all tags.
"""
with db.session.begin_nested():
q = cls.query.filter_by(version_id=as_object_version_id(object_version))
if key:
q = q.filter_by(key=key)
q.delete()
[docs]class MultipartObject(db.Model, Timestamp):
"""Model for storing files in chunks.
A multipart object belongs to a specific bucket and key and is identified
by an upload id. You can have multiple multipart uploads for the same
bucket and key. Once all parts of a multipart object is uploaded, the state
is changed to ``completed``. Afterwards it is not possible to upload
new parts. Once completed, the multipart object is merged, and added as
a new version in the current object/bucket.
All parts for a multipart upload must be of the same size, except for the
last part.
"""
__tablename__ = "files_multipartobject"
__table_args__ = (
db.UniqueConstraint("upload_id", "bucket_id", "key", name="uix_item"),
)
upload_id = db.Column(
UUIDType,
default=uuid.uuid4,
primary_key=True,
)
"""Identifier for the specific version of an object."""
bucket_id = db.Column(
UUIDType,
db.ForeignKey(Bucket.id, ondelete="RESTRICT"),
)
"""Bucket identifier."""
key = db.Column(
db.Text().with_variant(mysql.VARCHAR(255), "mysql"),
)
"""Key identifying the object."""
file_id = db.Column(
UUIDType, db.ForeignKey(FileInstance.id, ondelete="RESTRICT"), nullable=False
)
"""File instance for this multipart object."""
chunk_size = db.Column(db.Integer, nullable=True)
"""Size of chunks for file."""
size = db.Column(db.BigInteger, nullable=True)
"""Size of file."""
completed = db.Column(db.Boolean(name="completed"), nullable=False, default=False)
"""Defines if object is the completed."""
# Relationships definitions
bucket = db.relationship(Bucket, backref="multipart_objects")
"""Relationship to buckets."""
file = db.relationship(FileInstance, backref="multipart_objects")
"""Relationship to buckets."""
def __repr__(self):
"""Return representation of the multipart object."""
return "{0}:{2}:{1}".format(self.bucket_id, self.key, self.upload_id)
@property
def last_part_number(self):
"""Get last part number."""
return (
int(self.size / self.chunk_size)
if self.size % self.chunk_size
else int(self.size / self.chunk_size) - 1
)
@property
def last_part_size(self):
"""Get size of last part."""
return self.size % self.chunk_size
[docs] @validates("key")
def validate_key(self, key, key_):
"""Validate key."""
return validate_key(key_)
[docs] @staticmethod
def is_valid_chunksize(chunk_size):
"""Check if size is valid."""
min_csize = current_app.config["FILES_REST_MULTIPART_CHUNKSIZE_MIN"]
max_csize = current_app.config["FILES_REST_MULTIPART_CHUNKSIZE_MAX"]
return chunk_size >= min_csize and chunk_size <= max_csize
[docs] @staticmethod
def is_valid_size(size, chunk_size):
"""Validate max theoretical size."""
min_csize = current_app.config["FILES_REST_MULTIPART_CHUNKSIZE_MIN"]
max_size = chunk_size * current_app.config["FILES_REST_MULTIPART_MAX_PARTS"]
return size > min_csize and size <= max_size
[docs] def expected_part_size(self, part_number):
"""Get expected part size for a particular part number."""
last_part = self.multipart.last_part_number
if part_number == last_part:
return self.multipart.last_part_size
elif part_number >= 0 and part_number < last_part:
return self.multipart.chunk_size
else:
raise MultipartInvalidPartNumber()
[docs] @ensure_uncompleted()
def complete(self):
"""Mark a multipart object as complete."""
if Part.count(self) != self.last_part_number + 1:
raise MultipartMissingParts()
with db.session.begin_nested():
self.completed = True
self.file.readable = True
self.file.writable = False
return self
[docs] @ensure_completed()
def merge_parts(self, version_id=None, **kwargs):
"""Merge parts into object version."""
self.file.update_checksum(**kwargs)
with db.session.begin_nested():
obj = ObjectVersion.create(
self.bucket, self.key, _file_id=self.file_id, version_id=version_id
)
self.delete()
return obj
[docs] def delete(self):
"""Delete a multipart object."""
# Update bucket size.
self.bucket.size -= self.size
# Remove parts
Part.query_by_multipart(self).delete()
# Remove self
self.query.filter_by(upload_id=self.upload_id).delete()
[docs] @classmethod
def create(cls, bucket, key, size, chunk_size):
"""Create a new object in a bucket."""
bucket = as_bucket(bucket)
if bucket.locked:
raise BucketLockedError()
# Validate chunk size.
if not cls.is_valid_chunksize(chunk_size):
raise MultipartInvalidChunkSize()
# Validate max theoretical size.
if not cls.is_valid_size(size, chunk_size):
raise MultipartInvalidSize()
# Validate max bucket size.
bucket_limit = bucket.size_limit
if bucket_limit and size > bucket_limit:
desc = (
"File size limit exceeded."
if isinstance(bucket_limit, int)
else bucket_limit.reason
)
raise FileSizeError(description=desc)
with db.session.begin_nested():
file_ = FileInstance.create()
file_.size = size
obj = cls(
upload_id=uuid.uuid4(),
bucket=bucket,
key=key,
chunk_size=chunk_size,
size=size,
completed=False,
file=file_,
)
bucket.size += size
db.session.add(obj)
file_.init_contents(
size=size,
default_location=bucket.location.uri,
default_storage_class=bucket.default_storage_class,
)
return obj
[docs] @classmethod
def get(cls, bucket, key, upload_id, with_completed=False):
"""Fetch a specific multipart object."""
q = cls.query.filter_by(
upload_id=upload_id,
bucket_id=as_bucket_id(bucket),
key=key,
)
if not with_completed:
q = q.filter(cls.completed.is_(False))
return q.one_or_none()
[docs] @classmethod
def query_expired(cls, dt, bucket=None):
"""Query all uncompleted multipart uploads."""
q = cls.query.filter(cls.created < dt).filter_by(completed=True)
if bucket:
q = q.filter(cls.bucket_id == as_bucket_id(bucket))
return q
[docs] @classmethod
def query_by_bucket(cls, bucket):
"""Query all uncompleted multipart uploads."""
return cls.query.filter(cls.bucket_id == as_bucket_id(bucket))
[docs]class Part(db.Model, Timestamp):
"""Part object."""
__tablename__ = "files_multipartobject_part"
upload_id = db.Column(
UUIDType,
db.ForeignKey(MultipartObject.upload_id, ondelete="RESTRICT"),
primary_key=True,
)
"""Multipart object identifier."""
part_number = db.Column(db.Integer, primary_key=True, autoincrement=False)
"""Part number."""
checksum = db.Column(db.String(255), nullable=True)
"""String representing the checksum of the part."""
# Relationships definitions
multipart = db.relationship(MultipartObject, backref="parts")
"""Relationship to multipart objects."""
@property
def start_byte(self):
"""Get start byte in file of this part."""
return self.part_number * self.multipart.chunk_size
@property
def end_byte(self):
"""Get end byte in file for this part."""
return min(
(self.part_number + 1) * self.multipart.chunk_size, self.multipart.size
)
@property
def part_size(self):
"""Get size of this part."""
return self.end_byte - self.start_byte
[docs] @classmethod
def create(cls, mp, part_number, stream=None, **kwargs):
"""Create a new part object in a multipart object."""
if part_number < 0 or part_number > mp.last_part_number:
raise MultipartInvalidPartNumber()
with db.session.begin_nested():
obj = cls(
multipart=mp,
part_number=part_number,
)
db.session.add(obj)
if stream:
obj.set_contents(stream, **kwargs)
return obj
[docs] @classmethod
def get_or_none(cls, mp, part_number):
"""Get part number."""
return cls.query.filter_by(
upload_id=mp.upload_id, part_number=part_number
).one_or_none()
[docs] @classmethod
def get_or_create(cls, mp, part_number):
"""Get or create a part."""
obj = cls.get_or_none(mp, part_number)
if obj:
return obj
return cls.create(mp, part_number)
[docs] @classmethod
def delete(cls, mp, part_number):
"""Get part number."""
return cls.query.filter_by(
upload_id=mp.upload_id, part_number=part_number
).delete()
[docs] @classmethod
def query_by_multipart(cls, multipart):
"""Get all parts for a specific multipart upload.
:param multipart: A :class:`invenio_files_rest.models.MultipartObject`
instance.
:returns: A :class:`invenio_files_rest.models.Part` instance.
"""
upload_id = (
multipart.upload_id if isinstance(multipart, MultipartObject) else multipart
)
return cls.query.filter_by(upload_id=upload_id)
[docs] @classmethod
def count(cls, mp):
"""Count number of parts for a given multipart object."""
return cls.query_by_multipart(mp).count()
[docs] @ensure_uncompleted(getter=lambda o: not o.multipart.completed)
def set_contents(self, stream, progress_callback=None):
"""Save contents of stream to part of file instance.
If a the MultipartObject is completed this methods raises an
``MultipartAlreadyCompleted`` exception.
:param stream: File-like stream.
:param size: Size of stream if known.
:param chunk_size: Desired chunk size to read stream in. It is up to
the storage interface if it respects this value.
"""
size, checksum = self.multipart.file.update_contents(
stream,
seek=self.start_byte,
size=self.part_size,
progress_callback=progress_callback,
)
self.checksum = checksum
return self
__all__ = (
"Bucket",
"BucketTag",
"FileInstance",
"Location",
"MultipartObject",
"ObjectVersion",
"ObjectVersionTag",
"Part",
)