Fix #5398 and #5395 - Fix tests failing due to problem creating connection for alembic
[larjonas-mediagoblin.git] / mediagoblin / plugins / piwigo / tools.py
blob7b9b7af3283203d4aa69b79e0cb6e612bf3ea72f
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2013 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from collections import namedtuple
18 import logging
20 import six
21 import lxml.etree as ET
22 from werkzeug.exceptions import MethodNotAllowed, BadRequest
24 from mediagoblin.tools.request import setup_user_in_request
25 from mediagoblin.tools.response import Response
# Module-level logger used by the helpers in this file.
_log = logging.getLogger(__name__)

# Error value with a numeric ``code`` and a human-readable ``msg``.
PwgError = namedtuple("PwgError", "code msg")
class PwgNamedArray(list):
    """A list whose items are each written out as one XML sub-element.

    ``item_name`` is the tag used for every sub-element.  For items that are
    dicts, the keys listed in ``as_attrib`` are written as XML attributes of
    the sub-element rather than as nested elements.
    """

    def __init__(self, l, item_name, as_attrib=()):
        list.__init__(self, l)
        self.as_attrib = as_attrib
        self.item_name = item_name

    def fill_element_xml(self, el):
        """Append one ``item_name`` sub-element to ``el`` per list item."""
        tag = self.item_name
        for item in self:
            child = ET.SubElement(el, tag)
            if not isinstance(item, dict):
                _fill_element(child, item)
                continue
            _fill_element_dict(child, item, self.as_attrib)
def _fill_element_dict(el, data, as_attr=()):
    """Write the entries of the dict ``data`` into XML element ``el``.

    Keys contained in ``as_attr`` become attributes of ``el`` (non-string
    values are converted with ``str``).  Every other key becomes a
    sub-element named after the key, filled via ``_fill_element``.
    """
    for key, value in six.iteritems(data):
        if key not in as_attr:
            _fill_element(ET.SubElement(el, key), value)
            continue
        if isinstance(value, six.string_types):
            el.set(key, value)
        else:
            el.set(key, str(value))
60 def _fill_element(el, data):
61 if isinstance(data, bool):
62 if data:
63 el.text = "1"
64 else:
65 el.text = "0"
66 elif isinstance(data, six.string_types):
67 el.text = data
68 elif isinstance(data, int):
69 el.text = str(data)
70 elif isinstance(data, dict):
71 _fill_element_dict(el, data)
72 elif isinstance(data, PwgNamedArray):
73 data.fill_element_xml(el)
74 else:
75 _log.warn("Can't convert to xml: %r", data)
def response_xml(result):
    """Build the XML ``Response`` for a command result.

    The document root is ``<rsp>``.  A ``PwgError`` result gives
    ``stat="fail"`` plus an ``<err>`` child carrying the error's ``code``
    and ``msg`` as attributes; a code in the range 100-599 is also used as
    the HTTP status.  Any other result gives ``stat="ok"`` and is
    serialized into the root via ``_fill_element``.
    """
    root = ET.Element("rsp")
    http_status = None
    if not isinstance(result, PwgError):
        root.set("stat", "ok")
        _fill_element(root, result)
    else:
        root.set("stat", "fail")
        err_node = ET.SubElement(root, "err")
        err_node.set("code", str(result.code))
        err_node.set("msg", result.msg)
        if 100 <= result.code < 600:
            http_status = result.code
    body = ET.tostring(root, encoding="utf-8", xml_declaration=True)
    return Response(body, mimetype='text/xml', status=http_status)
class CmdTable(object):
    """Decorator that registers a function under a command name.

    Use as ``@CmdTable("name")`` or ``@CmdTable("name", only_post=True)``.
    The registry is a class-level dict shared by all instances, and a name
    may only be registered once.
    """

    # Maps cmd_name -> (function, only_post).
    _cmd_table = {}

    def __init__(self, cmd_name, only_post=False):
        assert cmd_name not in self._cmd_table
        self.cmd_name = cmd_name
        self.only_post = only_post

    def __call__(self, to_be_wrapped):
        """Register ``to_be_wrapped`` and return it unchanged."""
        # Checked again here: another decorator instance may have claimed
        # the name between construction and application.
        assert self.cmd_name not in self._cmd_table
        self._cmd_table[self.cmd_name] = (to_be_wrapped, self.only_post)
        return to_be_wrapped

    @classmethod
    def find_func(cls, request):
        """Return the function registered for the request's ``method``.

        The command name is read from ``request.args`` for GET requests and
        from ``request.form`` for every other HTTP method.

        Returns ``None`` when no command is registered under that name.
        Raises ``MethodNotAllowed`` when the command was registered with
        ``only_post=True`` and the request is not a POST.
        """
        if request.method == "GET":
            params = request.args
        else:
            params = request.form
        cmd_name = params.get("method")

        entry = cls._cmd_table.get(cmd_name)
        if entry is None:
            return None

        _log.debug("Found method %s", cmd_name)
        func, only_post = entry
        if only_post and request.method != "POST":
            # Logger.warn() is a deprecated alias; use Logger.warning().
            _log.warning("Method %s only allowed for POST", cmd_name)
            raise MethodNotAllowed()
        return func
def check_form(form):
    """Validate ``form``; log its contents on success, fail otherwise.

    When validation succeeds, every field's name and data are written to
    the debug log.  When it fails, the errors of each field that has any
    are logged and ``BadRequest`` is raised.
    """
    if form.validate():
        pairs = ["%s=%r" % (field.name, field.data) for field in form]
        _log.debug("form: %s", " ".join(pairs))
        return

    _log.error("form validation failed for form %r", form)
    for field in form:
        if field.errors:
            _log.error("Errors for %s: %r", field.name, field.errors)
    raise BadRequest()
class PWGSession(object):
    """Context manager that temporarily runs a request under its piwigo session.

    On entry the request's current ``session`` and ``user`` are saved, the
    session is replaced by the one ``session_manager`` loads from the
    request's cookie, and ``setup_user_in_request`` is called on the
    request.  On exit the saved ``session`` and ``user`` are put back.
    """

    # None by default; has to be assigned before the class is used.  It must
    # provide load_session_from_cookie() and save_session_to_cookie().
    session_manager = None

    def __init__(self, request):
        self.request = request
        self.in_pwg_session = False

    def __enter__(self):
        # Backup old state
        self.old_session = self.request.session
        self.old_user = self.request.user
        try:
            # Load piwigo session into state
            self.request.session = (
                self.session_manager.load_session_from_cookie(self.request))
            setup_user_in_request(self.request)
        except Exception:
            # __exit__ is not called when __enter__ raises, so undo the
            # partial switch here instead of leaving the request with the
            # piwigo session attached.
            self._restore_request()
            raise
        self.in_pwg_session = True
        return self

    def __exit__(self, *args):
        # Restore state
        self._restore_request()

    def _restore_request(self):
        """Put the saved session and user back on the request."""
        self.request.session = self.old_session
        self.request.user = self.old_user
        self.in_pwg_session = False

    def save_to_cookie(self, response):
        """Save the active piwigo session into ``response``'s cookie.

        Only valid inside the ``with`` block, while the piwigo session is
        the one attached to the request.
        """
        assert self.in_pwg_session
        self.session_manager.save_session_to_cookie(self.request.session,
                                                    self.request, response)