Newer
Older
import os
import magic
from pdf2image import convert_from_bytes
import logging
from ckan.common import config
def create_thumbnail(context, resource):
    '''
    main entry point into this module
    this function is called from pdf_to_thumbnail.plugin

    returns a tuple (is_PDF, filename). Only resources whose 'url_type'
    is 'upload' are processed; for every other resource the result is
    (False, None).
    '''
    # the current thumbnail name is looked up first, whatever the url_type is
    previous_filename = _get_filename_from_context(context)
    if resource.get('url_type') != 'upload':
        return False, None
    is_PDF, filename = _create_thumbnail_from_memory(resource, previous_filename)
    return is_PDF, filename
def _get_filename_from_context(context):
    '''
    returns the value stored under 'thumbnail' in the package that the
    context refers to (None if the package has no such key).
    '''
    package_id = context.get('package').id
    show_package = toolkit.get_action('package_show')
    package_dict = show_package(None, {'id': package_id})
    return package_dict.get('thumbnail')
def _create_thumbnail_from_memory(resource, old_filename):
    '''
    creates a thumbnail for an uploaded resource if the stored file is a PDF.

    resource: resource dict, used to locate the uploaded file on disk
    old_filename: name of the thumbnail currently stored in the package
                  (may be None or empty if there is none)

    returns a tuple (is_PDF, new_filename); new_filename is None when the
    file is not a PDF.
    '''
    filepath = get_resource_path(resource)
    is_PDF = _is_pdf(filepath)
    if not is_PDF:
        return is_PDF, None
    with open(filepath, 'rb') as file:
        new_filename = _create_thumbnail_from_file(file)
    if old_filename:
        # the thumbnail name kept in the package already carries its
        # extension, so it must not be parsed with from_filename (which
        # would append '.jpg' a second time and never find the old file)
        ThumbnailPath.from_filename_with_extension(old_filename).remove()
    return is_PDF, new_filename
def get_resource_path(resource):
    '''
    returns the path of the file uploaded for the given resource.

    The path is <ckan.storage_path>/resources/<id[0:3]>/<id[3:6]>/<id[6:]>.

    resource: resource dict; its 'id' entry is used to build the path
    raises KeyError if the resource has no 'id'
    '''
    # see https://stackoverflow.com/questions/46572402/where-does-ckan-store-the-files-pushed-to-datastore-filestore
    resource_id = resource['id']
    filepath = os.path.join(
        config.get('ckan.storage_path'),
        'resources',
        resource_id[0:3],
        resource_id[3:6],
        resource_id[6:]
    )
    return filepath
def _is_pdf(filepath):
    '''
    returns True if the MIME type detected for the file at filepath
    is 'application/pdf'.
    '''
    detected_mime = magic.from_file(filepath, mime=True)
    return detected_mime == 'application/pdf'
new_thumbnail = ThumbnailPath.from_unique_random_name()
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
convert_from_bytes(
file_read,
size=(width, None),
output_folder=new_thumbnail.folder,
output_file=new_thumbnail.filename,
single_file=True,
first_page=0,
last_page=0,
fmt='jpg'
)
return new_thumbnail.filename_with_extension
def thumbnail_folder():
    '''
    returns the folder 'thumbnail' below the configured 'ckan.storage_path'.
    '''
    storage_root = config.get('ckan.storage_path')
    return os.path.join(storage_root, 'thumbnail')
def rename_thumbnail_to_random_name(old_filename):
    '''
    used by pdf_to_thumbnail.action

    renames the thumbnail file old_filename (name including extension,
    located in the thumbnail folder) to a new, unused random name.

    returns the new filename with extension, or None if the rename
    raised an OSError (a warning is logged in that case).
    '''
    old_filepath = ThumbnailPath.from_filename_with_extension(old_filename)
    new_filepath = ThumbnailPath.from_unique_random_name()
    try:
        os.renames(old_filepath.full_filename, new_filepath.full_filename)
    except OSError:
        # log the concrete path string rather than the ThumbnailPath object
        log.warning('The file path "{}" of package was not found.'.format(old_filepath.full_filename))
        return None
    return new_filepath.filename_with_extension
'''
used by pdf_to_thumbnail.action
'''
old_filename = _get_filename_from_context(context)
ThumbnailPath.from_filename_with_extension(old_filename).remove()
def resources_of_containing_package(resource):
#todo: change arg order
'''
used by pdf_to_thumbnail.plugin
'''
package = toolkit.get_action('package_show')(None, {'id': package_id})
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def create_thumbnail_if_none_in_package(context, resources):
    '''
    used by pdf_to_thumbnail.plugin
    walks through the given resources in order and tries to create a
    thumbnail from each one, stopping at the first success.
    Nothing is done if the package already has a thumbnail.
    '''
    current_package = _get_package_dict_from_context(context)
    if _has_thumbnail(current_package):
        return
    for candidate in resources:
        if _try_create_thumbnail(context, candidate):
            # first successful creation wins, later resources are not tried
            break
def _get_package_dict_from_context(context):
    '''
    returns the result of the 'package_show' action for the package
    referenced by context['package'].
    '''
    lookup = {'id': context.get('package').id}
    return toolkit.get_action('package_show')(None, lookup)
def _has_thumbnail(package_dict):
thumbnail = package_dict.get('thumbnail')
return bool(thumbnail)
def _try_create_thumbnail(context, resource):
    '''
    tries to create a thumbnail from the resource and, if the resource
    turned out to be a PDF, stores the thumbnail name in the package.

    returns the is_PDF flag reported by create_thumbnail.
    '''
    created, thumbnail_name = create_thumbnail(context, resource)
    if not created:
        return created
    _write_thumbnail_into_package(context, thumbnail_name)
    return created
def _write_thumbnail_into_package(context, filename):
    '''
    stores filename under 'thumbnail' in the package referenced by the
    context and submits the package through the 'package_update' action.
    Nothing is written when filename is empty.
    '''
    current_package = _get_package_dict_from_context(context)
    if filename:
        current_package['thumbnail'] = filename
        update_package = toolkit.get_action('package_update')
        update_package(None, current_package)
class ThumbnailPath(object):
    '''
    utility class to manage the path of thumbnail pictures

    A path is kept in three parts: the folder, the filename without
    extension, and the extension (with its leading dot, or '' if none).
    '''

    # extension given to thumbnails built via from_filename
    _EXTENSION = '.jpg'

    def __init__(self, folder, filename, extension):
        self.folder = folder
        self.filename = filename
        self.extension = extension

    def __repr__(self):
        return '{}({!r}, {!r}, {!r})'.format(
            type(self).__name__, self.folder, self.filename, self.extension)

    @staticmethod
    def from_filename(filename):
        '''
        filename without extension (i.e. '.jpg')
        '''
        return ThumbnailPath(thumbnail_folder(), filename, ThumbnailPath._EXTENSION)

    @staticmethod
    def from_filename_with_extension(filename_with_extension):
        '''
        splits the name at its last dot: everything before it is the
        filename, the dot and what follows is the extension.
        A name without any dot gets an empty extension.
        '''
        stem, dot, suffix = filename_with_extension.rpartition('.')
        if not dot:
            filename = filename_with_extension
            extension = ''
        else:
            filename = stem
            extension = '.' + suffix
        return ThumbnailPath(thumbnail_folder(), filename, extension)

    @staticmethod
    def from_unique_random_name():
        '''
        returns a path with a random name that does not exist on disk yet
        '''
        # loop instead of recursing so repeated collisions cannot grow the stack
        while True:
            thumbnail_path = ThumbnailPath._from_random_name()
            if not thumbnail_path.exists():
                return thumbnail_path

    @staticmethod
    def _from_random_name():
        '''
        returns a path named 'thumbnail_picture_' followed by 30 random
        hexadecimal digits
        '''
        import binascii
        number = binascii.hexlify(os.urandom(15))
        if not isinstance(number, str):
            # hexlify returns bytes on Python 3; str(bytes) would leak "b'...'" into the name
            number = number.decode('ascii')
        filename = 'thumbnail_picture_' + number
        return ThumbnailPath.from_filename(filename)

    @property
    def filename_with_extension(self):
        '''filename followed by the extension'''
        return self.filename + self.extension

    @property
    def full_filename(self):
        '''folder joined with filename_with_extension'''
        return os.path.join(self.folder, self.filename_with_extension)

    def exists(self):
        '''returns True if full_filename exists on disk'''
        return os.path.exists(self.full_filename)

    def remove(self):
        '''deletes the file at full_filename; does nothing if it does not exist'''
        if os.path.exists(self.full_filename):
            os.remove(self.full_filename)