    # thumbnail.py
    import os
    import magic
    from pdf2image import convert_from_bytes
    import logging
    from ckan.common import config 
    
    Thorge Petersen's avatar
    Thorge Petersen committed
    import urllib.request, urllib.error, urllib.parse
    
    root's avatar
    root committed
    
    
    from binascii import b2a_hex
    
    root's avatar
    root committed
    import ckan.plugins.toolkit as toolkit
    
    root's avatar
    root committed
    
    log = logging.getLogger(__name__)
    
    
    
    def create_thumbnail(context, resource):
        '''
        Main entry point into this module; called from
        pdf_to_thumbnail.plugin.

        The thumbnail name currently stored on the package is looked up
        first. Only resources whose ``url_type`` is 'upload' are processed
        any further.

        Returns a tuple ``(is_PDF, filename)``; this is ``(False, None)``
        for every resource that is not an upload.
        '''
        previous_thumbnail = _get_filename_from_context(context)
        if resource.get('url_type') != 'upload':
            return False, None
        is_pdf, thumbnail_name = _create_thumbnail_from_memory(
            resource, previous_thumbnail)
        return is_pdf, thumbnail_name
    
    
    def _get_filename_from_context(context):
        '''
        Return the ``thumbnail`` value of the package referenced by
        ``context``, or ``None`` when the package dict has no such key.

        The package dict is obtained through
        ``_get_package_dict_from_context`` so the ``package_show`` lookup is
        written in one place only.
        '''
        return _get_package_dict_from_context(context).get('thumbnail')
    
    
    
    def _create_thumbnail_from_memory(resource, old_filename):
        '''
        Create a thumbnail for an uploaded resource if its file is a PDF.

        resource: resource dict; its stored file is located with
            ``get_resource_path``.
        old_filename: name (including extension) of the thumbnail currently
            stored for the package, or a falsy value when there is none.

        Returns ``(True, new_filename)`` when the file is a PDF and
        ``(False, None)`` otherwise.
        '''
        filepath = get_resource_path(resource)
        if not _is_pdf(filepath):
            return False, None

        with open(filepath, 'rb') as file:
            new_filename = _create_thumbnail_from_file(file)

        if old_filename:
            # The name stored on the package already carries its extension
            # (it is the value returned by _create_thumbnail_from_file), so
            # it has to be parsed with from_filename_with_extension.
            # from_filename would append '.jpg' a second time and the old
            # file would never be found and deleted.
            ThumbnailPath.from_filename_with_extension(old_filename).remove()
        return True, new_filename
    
    
    root's avatar
    root committed
    
    
    def get_resource_path(resource):
        '''
        Return the path built for the uploaded file of ``resource``:
        ``<ckan.storage_path>/resources/<id[0:3]>/<id[3:6]>/<id[6:]>``,
        where ``id`` is the resource's 'id' value.
        '''
        # see https://stackoverflow.com/questions/46572402/where-does-ckan-store-the-files-pushed-to-datastore-filestore
        resource_id = resource.get('id')
        return os.path.join(
            config.get('ckan.storage_path'),
            'resources',
            resource_id[0:3],
            resource_id[3:6],
            resource_id[6:],
        )
    
    
    def _is_pdf(filepath):
        '''
        Tell whether the MIME type that ``magic`` detects for the file at
        ``filepath`` equals 'application/pdf'.
        '''
        return magic.from_file(filepath, mime=True) == 'application/pdf'
    
    
    root's avatar
    root committed
    
    
    def _create_thumbnail_from_file(file):
        '''
        Render the first page of the PDF read from ``file`` as a JPEG stored
        in the thumbnail folder under a fresh random name.

        file: binary file object; it is rewound and read completely.

        Returns the file name (with extension) of the new thumbnail.
        '''
        # The default is an int, but a value set in the configuration may
        # arrive as a string; normalise so the size tuple holds an int.
        width = int(config.get('ckan.thumbnail.size.width', 410))

        new_thumbnail = ThumbnailPath.from_unique_random_name()

        file.seek(0)
        pdf_bytes = file.read()

        convert_from_bytes(
            pdf_bytes,
            size=(width, None),
            output_folder=new_thumbnail.folder,
            output_file=new_thumbnail.filename,
            single_file=True,
            # Page numbers are 1-based. Ask for exactly the first page
            # instead of relying on how the library normalises page 0.
            first_page=1,
            last_page=1,
            fmt='jpg'
        )
        return new_thumbnail.filename_with_extension
    
    
    def thumbnail_folder():
        '''
        Return the folder used for thumbnails: the 'thumbnail' directory
        below the configured ``ckan.storage_path``.
        '''
        storage_root = config.get('ckan.storage_path')
        return os.path.join(storage_root, 'thumbnail')
    
    
    def rename_thumbnail_to_random_name(old_filename):
        '''
        used by pdf_to_thumbnail.action

        Move the thumbnail ``old_filename`` (name with extension) to a fresh,
        unused random name inside the thumbnail folder.

        Returns the new file name with extension, or ``None`` when the rename
        raised ``OSError`` (a warning is logged in that case).
        '''
        old_filepath = ThumbnailPath.from_filename_with_extension(old_filename)
        new_filepath = ThumbnailPath.from_unique_random_name()
        try:
            os.renames(old_filepath.full_filename, new_filepath.full_filename)
        except OSError:
            # Log the actual path; formatting the ThumbnailPath object itself
            # would only print its default object representation.
            log.warning('The file path "{}"  of package was not found.'.format(
                old_filepath.full_filename))
            return None
        return new_filepath.filename_with_extension
         
    
    root's avatar
    root committed
    
    def remove_thumbnail(context):
        '''
        used by pdf_to_thumbnail.action

        Delete the thumbnail file named by the ``thumbnail`` value of the
        package in ``context``. Nothing happens when that value is empty
        or missing.
        '''
        old_filename = _get_filename_from_context(context)
        if old_filename:
            ThumbnailPath.from_filename_with_extension(old_filename).remove()
    
    def resources_of_containing_package(resource):
        # TODO: change arg order
        '''
        used by pdf_to_thumbnail.plugin

        Look up the package that ``resource`` belongs to (via its
        'package_id') with the ``package_show`` action and return that
        package's 'resources' value.
        '''
        package_id = resource.get('package_id')
        package = toolkit.get_action('package_show')(None, {'id': package_id})
        return package.get('resources')
    
    def create_thumbnail_if_none_in_package(context, resources):
        '''
        used by pdf_to_thumbnail.plugin

        Does nothing when the package already has a thumbnail. Otherwise
        walks through ``resources`` in the given order and tries to create
        a thumbnail from each one, stopping at the first success.
        '''
        if _has_thumbnail(_get_package_dict_from_context(context)):
            return
        for candidate in resources:
            if _try_create_thumbnail(context, candidate):
                break
    
    
    def _get_package_dict_from_context(context):
        '''
        Return the result of the ``package_show`` action for the package
        object stored under 'package' in ``context`` (looked up by its
        ``id`` attribute).
        '''
        lookup = {'id': context.get('package').id}
        return toolkit.get_action('package_show')(None, lookup)
    
    
    def _has_thumbnail(package_dict):
        thumbnail = package_dict.get('thumbnail')
        return bool(thumbnail)
    
    
    def _try_create_thumbnail(context, resource):
        '''
        Try to create a thumbnail from ``resource``; on success store its
        file name on the package.

        Returns the ``is_PDF`` flag reported by ``create_thumbnail``, which
        doubles as the success indicator.
        '''
        is_pdf, thumbnail_name = create_thumbnail(context, resource)
        if is_pdf:
            _write_thumbnail_into_package(context, thumbnail_name)
        return is_pdf
    
    
    def _write_thumbnail_into_package(context, filename):
        '''
        Fetch the package dict for ``context``, set its 'thumbnail' entry to
        ``filename`` when ``filename`` is truthy, and pass the dict to the
        ``package_update`` action (the update is issued in either case).
        '''
        package = _get_package_dict_from_context(context)
        if filename:
            package['thumbnail'] = filename
        update_package = toolkit.get_action('package_update')
        update_package(None, package)
        
    
    class ThumbnailPath(object):
        '''
        utility class to manage the path of thumbnail pictures

        A path is kept in three parts: the containing ``folder``, the
        ``filename`` without extension, and the ``extension`` (with its
        leading dot, or '' when there is none).
        '''

        # Extension given to thumbnails created from a bare file name.
        _EXTENSION = '.jpg'
        # Prefix of every randomly generated thumbnail file name.
        _RANDOM_NAME_PREFIX = 'thumbnail_picture_'

        def __init__(self, folder, filename, extension):
            self.folder = folder
            self.filename = filename
            self.extension = extension

        def __repr__(self):
            return '{}({!r}, {!r}, {!r})'.format(
                type(self).__name__, self.folder, self.filename, self.extension)

        @staticmethod
        def from_filename(filename):
            '''
            filename without extension (i.e. '.jpg'); the default extension
            is attached and the thumbnail folder is used as location.
            '''
            return ThumbnailPath(thumbnail_folder(), filename, ThumbnailPath._EXTENSION)

        @staticmethod
        def from_filename_with_extension(filename_with_extension):
            '''
            Split the name at its last dot: everything before it becomes the
            filename, the dot and the rest become the extension. A name
            without any dot gets an empty extension.
            '''
            stem, dot, suffix = filename_with_extension.rpartition('.')
            if not dot:
                return ThumbnailPath(thumbnail_folder(), filename_with_extension, '')
            return ThumbnailPath(thumbnail_folder(), stem, dot + suffix)

        @staticmethod
        def from_unique_random_name():
            '''
            Return a randomly named path that does not exist on disk yet.
            '''
            # Retry until an unused name turns up; with 15 random bytes a
            # collision is not expected in practice.
            while True:
                candidate = ThumbnailPath._from_random_name()
                if not candidate.exists():
                    return candidate

        @staticmethod
        def _random_filename():
            '''
            Return the prefix followed by 30 random hexadecimal characters.
            '''
            # b2a_hex returns bytes on Python 3; decode it, otherwise str()
            # would embed the literal "b'...'" wrapper in the file name.
            return ThumbnailPath._RANDOM_NAME_PREFIX + b2a_hex(os.urandom(15)).decode('ascii')

        @staticmethod
        def _from_random_name():
            '''
            Return a randomly named path without checking whether it exists.
            '''
            return ThumbnailPath.from_filename(ThumbnailPath._random_filename())

        @property
        def filename_with_extension(self):
            return self.filename + self.extension

        @property
        def full_filename(self):
            return os.path.join(self.folder, self.filename_with_extension)

        def exists(self):
            '''
            Tell whether the full path currently exists on disk.
            '''
            return os.path.exists(self.full_filename)

        def remove(self):
            '''
            Delete the file if it exists; do nothing otherwise.
            '''
            if self.exists():
                os.remove(self.full_filename)