Source code for

# -*- coding: utf-8 -*-
# This file is part of Invenio.
# Copyright (C) 2016-2019 CERN.
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Storage related module."""

from flask import current_app
from fs.opener import open_fs as opendir
from fs.path import basename, dirname, split

from ..helpers import make_path
from .base import FileStorage

[docs]class PyFSFileStorage(FileStorage): """File system storage using PyFilesystem for access the file. This storage class will store files according to the following pattern: ``<base_uri>/<file instance uuid>/data``. .. warning:: File operations are not atomic. E.g. if errors happens during e.g. updating part of a file it will leave the file in an inconsistent state. The storage class tries as best as possible to handle errors and leave the system in a consistent state. """ def __init__(self, fileurl, size=None, modified=None, clean_dir=True): """Storage initialization.""" self.fileurl = fileurl self.clean_dir = clean_dir super(PyFSFileStorage, self).__init__(size=size, modified=modified) def _get_fs(self, create_dir=True): """Return tuple with filesystem and filename.""" filedir = dirname(self.fileurl) filename = basename(self.fileurl) return ( opendir(filedir, writeable=True, create=create_dir), filename, )
[docs] def open(self, mode="rb"): """Open file. The caller is responsible for closing the file. """ fs, path = self._get_fs() return, mode=mode)
[docs] def delete(self): """Delete a file. The base directory is also removed, as it is assumed that only one file exists in the directory. """ fs, path = self._get_fs(create_dir=False) if fs.exists(path): fs.remove(path) # PyFilesystem2 really doesn't want to remove the root directory, # so we need to be a bit creative root_path, dir_name = split(fs.root_path) if self.clean_dir and dir_name: parent_fs = opendir(root_path, writeable=True, create=False) if parent_fs.exists(dir_name): parent_fs.removedir(dir_name) return True
[docs] def initialize(self, size=0): """Initialize file on storage and truncate to given size.""" fs, path = self._get_fs() # Required for reliably opening the file on certain file systems: if fs.exists(path): fp =, mode="r+b") else: fp =, mode="wb") try: fp.truncate(size) except Exception: fp.close() self.delete() raise finally: fp.close() self._size = size return self.fileurl, size, None
[docs] def save( self, incoming_stream, size_limit=None, size=None, chunk_size=None, progress_callback=None, ): """Save file in the file system.""" fp ="wb") try: bytes_written, checksum = self._write_stream( incoming_stream, fp, chunk_size=chunk_size, progress_callback=progress_callback, size_limit=size_limit, size=size, ) except Exception: fp.close() self.delete() raise finally: fp.close() self._size = bytes_written return self.fileurl, bytes_written, checksum
[docs] def update( self, incoming_stream, seek=0, size=None, chunk_size=None, progress_callback=None, ): """Update a file in the file system.""" fp ="r+b") try: bytes_written, checksum = self._write_stream( incoming_stream, fp, chunk_size=chunk_size, size=size, progress_callback=progress_callback, ) finally: fp.close() return bytes_written, checksum
[docs]def pyfs_storage_factory( fileinstance=None, default_location=None, default_storage_class=None, filestorage_class=PyFSFileStorage, fileurl=None, size=None, modified=None, clean_dir=True, ): """Get factory function for creating a PyFS file storage instance.""" # Either the FileInstance needs to be specified or all filestorage # class parameters need to be specified assert fileinstance or (fileurl and size) if fileinstance: # FIXME: Code here should be refactored since it assumes a lot on the # directory structure where the file instances are written fileurl = None size = fileinstance.size modified = fileinstance.updated if fileinstance.uri: # Use already existing URL. fileurl = fileinstance.uri else: assert default_location # Generate a new URL. fileurl = make_path( default_location, str(, "data", current_app.config["FILES_REST_STORAGE_PATH_DIMENSIONS"], current_app.config["FILES_REST_STORAGE_PATH_SPLIT_LENGTH"], ) return filestorage_class(fileurl, size=size, modified=modified, clean_dir=clean_dir)