import datetime
import os
import re
import shutil
import zipfile
from os.path import join
from urllib.parse import urlparse

import luigi
import luigi.contrib.postgres
import requests
import xlrd
13 """We mirror the public procurement Excel files of the "Función pública"
14 Ministry as a PostgreSQL table.
16 This Luigi pipeline may be used to initialize, or to upgrade a mirror. The
17 pipeline proposes a straightforward "download, extract and ingest" process.
19 The FuncionPublicaMirror class triggers the pipeline. This is a possible
21 PYTHONPATH='.' /path/to/luigi --local-scheduler --module FuncionPublicaMirror FuncionPublicaMirror
23 If bandwidth isn't too bad, the process should take no more than 45 minutes to
class PgConfiguration(luigi.Config):
    """Read configuration of Postgres connection.

    The four parameters have no defaults, so each must be supplied by
    Luigi's configuration mechanism (presumably a ``[PgConfiguration]``
    section in the Luigi configuration file -- confirm against the
    deployment).
    """
    # Host name of the Postgres server.
    host = luigi.Parameter()
    # Name of the database to connect to.
    db = luigi.Parameter()
    # Role used for the connection.
    user = luigi.Parameter()
    # Password of that role.
    password = luigi.Parameter()
class FuncionPublicaMirror(luigi.WrapperTask):
    """Trigger the pipeline. We use the wrapper task pattern.

    All tasks of the pipeline receive the same three parameters (``date``,
    ``urls`` and ``files_download``) so that they refer to the same run.
    """
    # Evaluated once, when the module is imported: every default below is
    # derived from this single timestamp.
    get_time = datetime.datetime.now()
    # Timestamp identifying the run; used in marker file names and update ids.
    date = luigi.DateSecondParameter(default=get_time)
    date_string = get_time.strftime('%Y-%m-%dT%H%M%S')
    # Directory where the zip files are downloaded and extracted.
    files_download = luigi.Parameter(
        default='/tmp/funcion-publica-files-' + date_string)
    # NOTE(review): confirm this list against the upstream file; an entry
    # after Contratos2017.zip may have been lost.
    urls = luigi.ListParameter(default=[
        'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2010_2012.zip',
        'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2013.zip',
        'https://compranetinfo.funcionpublica.gob.mx/descargas/cnet/Contratos2014.zip',
        'https://upcp.funcionpublica.gob.mx/descargas/Contratos2015.zip',
        'https://upcp.funcionpublica.gob.mx/descargas/Contratos2016.zip',
        'https://upcp.funcionpublica.gob.mx/descargas/Contratos2017.zip',
    ])

    def requires(self):
        """Yield every task of the pipeline with this run's parameters."""
        shared = {
            'date': self.date,
            'urls': self.urls,
            'files_download': self.files_download,
        }
        # Listed in pipeline order; the actual ordering is enforced by each
        # task's own requires().
        yield Download(**shared)
        yield Unzip(**shared)
        yield DropOlderTable(**shared)
        yield ExcelToPostgres(**shared)
        yield DeleteFiles(**shared)
class Download(luigi.Task):
    """Download zipped Excel files that "Función pública" Ministry releases.

    The output target is a plain text list of the download url's.

    This class is a straightforward use of the Requests module.
    """
    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    def run(self):
        """Fetch each URL into ``files_download`` and record it in the output.

        Raises:
            requests.HTTPError: if a server answers with an error status, so
                that an error page is never stored as a zip file.
        """
        os.makedirs(self.files_download, exist_ok=True)

        with self.output().open('w') as output:
            for url in self.urls:
                # NOTE(review): verify=False disables TLS certificate
                # validation. Kept as in the original; confirm it is still
                # required for these hosts.
                r = requests.get(url, verify=False, stream=True)
                r.raise_for_status()

                # The last path segment of the URL is the local file name.
                file_name = urlparse(url).path.split('/')[-1]
                local_file_path = join(self.files_download, file_name)
                with open(local_file_path, 'wb') as handle:
                    # An explicit chunk size avoids the 1-byte default.
                    for data in r.iter_content(chunk_size=64 * 1024):
                        handle.write(data)

                # XXX For now, the output target just contains some url's
                # NOTE(review): one URL per line is assumed -- confirm the
                # expected marker format.
                output.write(url + '\n')

    def output(self):
        """Return the marker file of this run, named after ``date``."""
        return luigi.LocalTarget(
            self.date.strftime(
                '/tmp/funcion-publica-download-%Y-%m-%dT%H%M%S'))
class Unzip(luigi.Task):
    """Extract Excel files.

    The output target is a plain text list of the zip files.

    This class is a straightforward use of the zipfile module.
    """
    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    def run(self):
        """Extract every zip archive found under ``files_download``.

        All archives are extracted into ``files_download`` itself, whatever
        sub-directory they were found in.
        """
        is_zip_file = re.compile(r'.*zip$')

        with self.output().open('w') as output:
            for root, _dirs, files in os.walk(self.files_download):
                for myfile in files:
                    if is_zip_file.match(myfile):
                        # The context manager closes the archive handle.
                        with zipfile.ZipFile(join(root, myfile)) as z:
                            z.extractall(path=self.files_download)
                        # XXX For now, the output target just contains some file names
                        # NOTE(review): one name per line is assumed --
                        # confirm the expected marker format.
                        output.write(myfile + '\n')

    def output(self):
        """Return the marker file of this run, named after ``date``."""
        return luigi.LocalTarget(
            self.date.strftime('/tmp/funcion-publica-unzip-%Y-%m-%dT%H%M%S'))

    def requires(self):
        """The archives must be downloaded before they are extracted."""
        return Download(
            date=self.date, urls=self.urls, files_download=self.files_download)
class DropOlderTable(luigi.contrib.postgres.PostgresQuery):
    """If need be, drop previous version of mirror before upgrade."""
    # Connection settings are read from PgConfiguration when this class body
    # is executed, i.e. at import time.
    host = PgConfiguration().host
    database = PgConfiguration().db
    user = PgConfiguration().user
    password = PgConfiguration().password

    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    table = "compranet.mirror"
    query = "drop table if exists compranet.mirror;"

    # NOTE(review): exposed as a property, as in current Luigi releases --
    # confirm against the Luigi version in use.
    @property
    def update_id(self):
        """Return the marker-table identifier of this run's drop step."""
        return self.date.strftime('remove-%Y-%m-%dT%H%M%S')

    def run(self):
        """Execute ``query``, update the marker table and commit."""
        connection = self.output().connect()
        try:
            cursor = connection.cursor()
            cursor.execute(self.query)

            # Update marker table
            self.output().touch(connection)

            # commit and close connection
            connection.commit()
        finally:
            # Release the connection even when the query or the marker
            # update fails.
            connection.close()

    def requires(self):
        """The table is dropped only once the new files are extracted."""
        return Unzip(
            date=self.date, urls=self.urls, files_download=self.files_download)
177 class ExcelToPostgres(luigi
.contrib
.postgres
.CopyToTable
):
178 """Write data from Excel files to a Postgres table.
180 The set of Excel files lends itself to be modelled by a single Postgres
181 table. Their columns are our columns.
183 The "rows" method override needs to take into account the fact that the
184 Excel file for the 2010 - 2012 period lacks the "identificador_cm" column.
185 The column is left empty for the period.
187 host
= PgConfiguration().host
188 database
= PgConfiguration().db
189 user
= PgConfiguration().user
190 password
= PgConfiguration().password
191 table
= "compranet.mirror"
193 date
= luigi
.DateSecondParameter()
194 urls
= luigi
.ListParameter()
195 files_download
= luigi
.Parameter()
197 # Documentations say: null_values = container of values that should be inserted as NULL values
201 ("gobierno", "text"),
203 ("dependencia", "text"),
205 ("nombre_de_la_uc", "text"),
206 ("responsable", "text"),
207 ("codigo_expediente", "text"),
208 ("titulo_expediente", "text"),
209 ("plantilla_expediente", "text"),
210 ("numero_procedimiento", "text"),
211 ("exp_f_fallo", "date"),
212 ("proc_f_publicacion", "date"),
213 ("fecha_apertura_proposiciones", "date"),
214 ("caracter", "text"),
215 ("tipo_contratacion", "text"),
216 ("tipo_procedimiento", "text"),
217 ("forma_procedimiento", "text"),
218 ("codigo_contrato", "text"),
219 ("titulo_contrato", "text"),
220 ("fecha_inicio", "date"),
221 ("fecha_fin", "date"),
222 ("importe_contrato", "numeric"),
224 ("estatus_contrato", "text"),
225 ("archivado", "text"),
226 ("convenio_modificatorio", "text"),
228 ("clave_programa", "text"),
229 ("aportacion_federal", "numeric"),
230 ("fecha_celebracion", "date"),
231 ("contrato_marco", "text"),
232 ("identificador_cm", "text"),
233 ("compra_consolidada", "text"),
234 ("plurianual", "text"),
235 ("clave_cartera_shcp", "text"),
236 ("estratificacion_muc", "text"),
237 ("folio_rupc", "text"),
238 ("proveedor_contratista", "text"),
239 ("estratificacion_mpc", "text"),
240 ("siglas_pais", "text"),
241 ("estatus_empresa", "text"),
242 ("cuenta_administrada_por", "text"),
243 ("c_externo", "text"),
244 ("organismo", "text"),
248 # https://luigi.readthedocs.io/en/stable/api/luigi.contrib.rdbms.html#luigi.contrib.rdbms.CopyToTable.update_id
251 return self
.date
.strftime('insert-%Y-%m-%dT%H%M%S')
254 """Yield lists corresponding to each row to be inserted.
256 For some reason, the 2010 - 2012 Excel file misses the 32nd column,
257 corresponding to the "identificador_cm" value. We insert empty strings
260 ten_twelve_exception
= re
.compile(
261 '.*2010_2012.*') # Match the name of 2010-2012 Excel file
262 is_excel_file
= re
.compile('.*xlsx$')
263 date_string
= self
.date
.strftime('%Y%m%d')
265 for root
, dirs
, files
in os
.walk(self
.files_download
):
269 if is_excel_file
.match(myfile
):
270 wb
= xlrd
.open_workbook(join(root
, myfile
))
271 sh
= wb
.sheet_by_index(0)
272 if ten_twelve_exception
.match(myfile
):
273 for rownum
in range(1, sh
.nrows
):
274 row
= sh
.row_values(rownum
)[:31] + [
276 ] + sh
.row_values(rownum
)[31:]
279 for rownum
in range(1, sh
.nrows
):
280 row
= sh
.row_values(rownum
)
284 """The previous mirror should be dropped before inserting new data."""
285 return DropOlderTable(
286 date
=self
.date
, urls
=self
.urls
, files_download
=self
.files_download
)
class DeleteFiles(luigi.Task):
    """Remove files that were downloaded and extracted."""
    date = luigi.DateSecondParameter()
    urls = luigi.ListParameter()
    files_download = luigi.Parameter()

    def run(self):
        """Delete the ``files_download`` directory and write the marker file."""
        # The directory may already be gone if a previous attempt failed
        # after the removal; a retry must still be able to complete.
        if os.path.isdir(self.files_download):
            shutil.rmtree(self.files_download)

        # XXX For now, the output target just contains a date
        with self.output().open('w') as output:
            output.write(self.date.strftime('%Y-%m-%dT%H%M%S'))

    def output(self):
        """Return the marker file of this run, named after ``date``."""
        return luigi.LocalTarget(
            self.date.strftime(
                '/tmp/funcion-publica-delete-files-%Y-%m-%dT%H%M%S'))

    def requires(self):
        """Files are removed only once their data is in Postgres."""
        return ExcelToPostgres(
            date=self.date, urls=self.urls, files_download=self.files_download)