Fix #5398 and #5395 - Fix tests failing due to problem creating connection for alembic
[larjonas-mediagoblin.git] / mediagoblin / media_types / ascii / processing.py
blobed6600eee4522c588c16df5173693827d0729eb0
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 import argparse
17 import chardet
18 import os
19 try:
20 from PIL import Image
21 except ImportError:
22 import Image
23 import logging
25 import six
27 from mediagoblin import mg_globals as mgg
28 from mediagoblin.processing import (
29 create_pub_filepath, FilenameBuilder,
30 MediaProcessor, ProcessingManager,
31 get_process_filename, copy_original,
32 store_public, request_from_args)
33 from mediagoblin.media_types.ascii import asciitoimage
35 _log = logging.getLogger(__name__)
37 SUPPORTED_EXTENSIONS = ['txt', 'asc', 'nfo']
38 MEDIA_TYPE = 'mediagoblin.media_types.ascii'
41 def sniff_handler(media_file, filename):
42 _log.info('Sniffing {0}'.format(MEDIA_TYPE))
44 name, ext = os.path.splitext(filename)
45 clean_ext = ext[1:].lower()
47 if clean_ext in SUPPORTED_EXTENSIONS:
48 return MEDIA_TYPE
50 return None
53 class CommonAsciiProcessor(MediaProcessor):
54 """
55 Provides a base for various ascii processing steps
56 """
57 acceptable_files = ['original', 'unicode']
59 def common_setup(self):
60 self.ascii_config = mgg.global_config['plugins'][
61 'mediagoblin.media_types.ascii']
63 # Conversions subdirectory to avoid collisions
64 self.conversions_subdir = os.path.join(
65 self.workbench.dir, 'conversions')
66 os.mkdir(self.conversions_subdir)
68 # Pull down and set up the processing file
69 self.process_filename = get_process_filename(
70 self.entry, self.workbench, self.acceptable_files)
71 self.name_builder = FilenameBuilder(self.process_filename)
73 self.charset = None
75 def copy_original(self):
76 copy_original(
77 self.entry, self.process_filename,
78 self.name_builder.fill('{basename}{ext}'))
80 def _detect_charset(self, orig_file):
81 d_charset = chardet.detect(orig_file.read())
83 # Only select a non-utf-8 charset if chardet is *really* sure
84 # Tested with "Feli\x0109an superjaron", which was detected
85 if d_charset['confidence'] < 0.9:
86 self.charset = 'utf-8'
87 else:
88 self.charset = d_charset['encoding']
90 _log.info('Charset detected: {0}\nWill interpret as: {1}'.format(
91 d_charset,
92 self.charset))
94 # Rewind the file
95 orig_file.seek(0)
97 def store_unicode_file(self):
98 with open(self.process_filename, 'rb') as orig_file:
99 self._detect_charset(orig_file)
100 unicode_filepath = create_pub_filepath(self.entry,
101 'ascii-portable.txt')
103 with mgg.public_store.get_file(unicode_filepath, 'wb') \
104 as unicode_file:
105 # Decode the original file from its detected charset (or UTF8)
106 # Encode the unicode instance to ASCII and replace any
107 # non-ASCII with an HTML entity (&#
108 unicode_file.write(
109 six.text_type(orig_file.read().decode(
110 self.charset)).encode(
111 'ascii',
112 'xmlcharrefreplace'))
114 self.entry.media_files['unicode'] = unicode_filepath
116 def generate_thumb(self, font=None, thumb_size=None):
117 with open(self.process_filename, 'rb') as orig_file:
118 # If no font kwarg, check config
119 if not font:
120 font = self.ascii_config.get('thumbnail_font', None)
121 if not thumb_size:
122 thumb_size = (mgg.global_config['media:thumb']['max_width'],
123 mgg.global_config['media:thumb']['max_height'])
125 if self._skip_resizing(font, thumb_size):
126 return
128 tmp_thumb = os.path.join(
129 self.conversions_subdir,
130 self.name_builder.fill('{basename}.thumbnail.png'))
132 ascii_converter_args = {}
134 # If there is a font from either the config or kwarg, update
135 # ascii_converter_args
136 if font:
137 ascii_converter_args.update(
138 {'font': self.ascii_config['thumbnail_font']})
140 converter = asciitoimage.AsciiToImage(
141 **ascii_converter_args)
143 thumb = converter._create_image(
144 orig_file.read())
146 with open(tmp_thumb, 'w') as thumb_file:
147 thumb.thumbnail(
148 thumb_size,
149 Image.ANTIALIAS)
150 thumb.save(thumb_file)
152 thumb_info = {'font': font,
153 'width': thumb_size[0],
154 'height': thumb_size[1]}
156 _log.debug('Copying local file to public storage')
157 store_public(self.entry, 'thumb', tmp_thumb,
158 self.name_builder.fill('{basename}.thumbnail.jpg'))
160 self.entry.set_file_metadata('thumb', **thumb_info)
163 def _skip_resizing(self, font, thumb_size):
164 thumb_info = self.entry.get_file_metadata('thumb')
166 if not thumb_info:
167 return False
169 skip = True
171 if thumb_info.get('font') != font:
172 skip = False
173 elif thumb_info.get('width') != thumb_size[0]:
174 skip = False
175 elif thumb_info.get('height') != thumb_size[1]:
176 skip = False
178 return skip
181 class InitialProcessor(CommonAsciiProcessor):
183 Initial processing step for new ascii media
185 name = "initial"
186 description = "Initial processing"
188 @classmethod
189 def media_is_eligible(cls, entry=None, state=None):
190 if not state:
191 state = entry.state
192 return state in (
193 "unprocessed", "failed")
195 @classmethod
196 def generate_parser(cls):
197 parser = argparse.ArgumentParser(
198 description=cls.description,
199 prog=cls.name)
201 parser.add_argument(
202 '--thumb_size',
203 nargs=2,
204 metavar=('max_width', 'max_width'),
205 type=int)
207 parser.add_argument(
208 '--font',
209 help='the thumbnail font')
211 return parser
213 @classmethod
214 def args_to_request(cls, args):
215 return request_from_args(
216 args, ['thumb_size', 'font'])
218 def process(self, thumb_size=None, font=None):
219 self.common_setup()
220 self.store_unicode_file()
221 self.generate_thumb(thumb_size=thumb_size, font=font)
222 self.copy_original()
223 self.delete_queue_file()
226 class Resizer(CommonAsciiProcessor):
228 Resizing process steps for processed media
230 name = 'resize'
231 description = 'Resize thumbnail'
232 thumb_size = 'thumb_size'
234 @classmethod
235 def media_is_eligible(cls, entry=None, state=None):
237 Determine if this media type is eligible for processing
239 if not state:
240 state = entry.state
241 return state in 'processed'
243 @classmethod
244 def generate_parser(cls):
245 parser = argparse.ArgumentParser(
246 description=cls.description,
247 prog=cls.name)
249 parser.add_argument(
250 '--thumb_size',
251 nargs=2,
252 metavar=('max_width', 'max_height'),
253 type=int)
255 # Needed for gmg reprocess thumbs to work
256 parser.add_argument(
257 'file',
258 nargs='?',
259 default='thumb',
260 choices=['thumb'])
262 return parser
264 @classmethod
265 def args_to_request(cls, args):
266 return request_from_args(
267 args, ['thumb_size', 'file'])
269 def process(self, thumb_size=None, file=None):
270 self.common_setup()
271 self.generate_thumb(thumb_size=thumb_size)
274 class AsciiProcessingManager(ProcessingManager):
275 def __init__(self):
276 super(AsciiProcessingManager, self).__init__()
277 self.add_processor(InitialProcessor)
278 self.add_processor(Resizer)