Coverage for policyd_rate_limit/utils.py: 54%
328 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 09:19 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 09:19 +0000
1# This program is distributed in the hope that it will be useful, but WITHOUT
2# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
3# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
4# more details.
5#
6# You should have received a copy of the GNU General Public License version 3
7# along with this program; if not, write to the Free Software Foundation, Inc., 51
8# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
9#
10# (c) 2015-2016 Valentin Samir
11# -*- mode: python; coding: utf-8 -*-
12import os
13import sys
14import threading
15import collections
16import ipaddress
17import time
18import imp
19import pwd
20import grp
21import warnings
22import smtplib
23import yaml
24from email.mime.text import MIMEText
25from email.mime.multipart import MIMEMultipart
27from policyd_rate_limit.const import SQLITE_DB, MYSQL_DB, PGSQL_DB
28from policyd_rate_limit import config as default_config
def ip_network(ip_str):
    """Parse ``ip_str`` into a network object, trying IPv4 first and IPv6 second."""
    try:
        network = ipaddress.IPv4Network(ip_str)
    except ipaddress.AddressValueError:
        # the address part is not valid IPv4, so interpret the string as IPv6
        network = ipaddress.IPv6Network(ip_str)
    return network
class Exit(Exception):
    """Exception raised to make the program exit gracefully."""
class Config(object):
    """Act as a config module; missing parameters fall back to ``default_config``.

    The configuration is read from the first readable file among the candidates.
    Files ending in ``.conf`` are loaded as python modules (deprecated), every
    other file is parsed as YAML. Parameters are then exposed as attributes.
    """
    # loaded configuration: a dict (YAML), a module (deprecated .conf) or None
    _config = None
    # path of the file the configuration was loaded from, None if no file was usable
    config_file = None
    database_is_initialized = False

    def __init__(self, config_file=None):
        """Load the configuration.

        :param config_file: path of the config file to use. When ``None`` the
            default per-user and system-wide locations are searched in order.
        :raises ValueError: if a YAML config file does not contain a mapping.
        """
        if config_file is None:
            # search for config files in the following locations
            config_files = [
                # will be deprecated in favor of .yaml file
                os.path.expanduser("~/.config/policyd-rate-limit.conf"),
                os.path.expanduser("~/.config/policyd-rate-limit.yaml"),
                # will be deprecated in favor of .yaml file
                "/etc/policyd-rate-limit.conf",
                "/etc/policyd-rate-limit.yaml",
            ]
        else:
            config_files = [config_file]
        for candidate in config_files:
            if os.path.isfile(candidate):
                try:
                    # compatibility with old config style in a python module
                    if candidate.endswith(".conf"):  # pragma: no cover (deprecated)
                        self._config = imp.load_source('config', candidate)
                        warnings.warn(
                            (
                                "New configuration use a .yaml file. "
                                "Please migrate to it and delete you .conf file"
                            ),
                            stacklevel=3
                        )
                        cache_file = imp.cache_from_source(candidate)
                        # remove the config pyc file
                        try:
                            os.remove(cache_file)
                        except OSError:
                            pass
                        # remove the __pycache__ dir of the config pyc file if empty
                        try:
                            os.rmdir(os.path.dirname(cache_file))
                        except OSError:
                            pass
                    # new config files use yaml
                    else:
                        with open(candidate) as f:
                            loaded = yaml.load(f, Loader=yaml.SafeLoader)
                        # an empty YAML document loads as None: use only the defaults
                        if loaded is None:
                            loaded = {}
                        if not isinstance(loaded, dict):
                            raise ValueError(
                                "Config file %s must contain a mapping of parameters" % (
                                    candidate,
                                )
                            )
                        self._config = loaded
                    self.config_file = candidate
                    break
                except PermissionError:
                    # unreadable file, try the next candidate
                    pass
        # the loop ended without ``break``: no config file could be loaded.
        # Only report it, every parameter then comes from default_config.
        else:
            sys.stderr.write(
                "No config file found or bad permissions, searched for %s\n" % (
                    ", ".join(config_files),
                )
            )

        # Convert the configured networks to network objects, still accepting the
        # deprecated (misspelled) ``limited_netword`` parameter name.
        try:  # pragma: no cover (deprecated)
            self.limited_networks = [ip_network(net) for net in self.limited_netword]
            warnings.warn(
                (
                    "The limited_netword config parameter is deprecated, please use "
                    "limited_networks instead."
                ),
                stacklevel=3
            )
        except AttributeError:
            self.limited_networks = [ip_network(net) for net in self.limited_networks]

    def __getattr__(self, name):
        """Return the config parameter ``name``, or its default when it is not set.

        Only called for attributes not found by the normal lookup.

        :raises AttributeError: if ``name`` is unknown to ``default_config`` too.
        """
        try:
            # Choose the lookup from what was actually loaded rather than from the
            # file extension, so YAML files not named ``*.yaml`` are honoured too.
            if isinstance(self._config, dict):
                ret = self._config[name]
            else:  # pragma: no cover (deprecated)
                # python module config, or None when no config file was loaded
                ret = getattr(self._config, name)
        # If a parameter is not defined in the config file, return its default value.
        except (AttributeError, KeyError):
            ret = getattr(default_config, name)

        # a SOCKET given as a list (YAML has no tuple type) is returned as a tuple
        if name == "SOCKET":
            if isinstance(ret, list):
                ret = tuple(ret)

        return ret
class LazyConfig(object):
    """Lazy stand-in for :class:`Config`, so ``config`` can be imported before it is set up"""
    # the wrapped Config instance, None until setup() is called
    _config = None
    # SQL parameter placeholder of the selected backend, set by setup()
    format_str = None

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped config once it exists"""
        wrapped = self._config
        if wrapped is not None:
            return getattr(wrapped, name)
        raise RuntimeError("config is not initialized")

    def setup(self, config_file=None):
        """Load the configuration and bind the module level ``cursor`` to its backend"""
        global cursor
        # initialize config
        self._config = Config(config_file)

        # (backend, cursor class name, backend settings parameter, SQL placeholder)
        known_backends = (
            (SQLITE_DB, "sqlite_cursor", "sqlite_config", "?"),
            (MYSQL_DB, "mysql_cursor", "mysql_config", "%s"),
            (PGSQL_DB, "pgsql_cursor", "pgsql_config", "%s"),
        )
        # make the cursor class function of the configured backend
        for backend, class_name, settings_name, placeholder in known_backends:
            if config.backend == backend:
                cursor = make_cursor(class_name, config.backend, getattr(config, settings_name))
                self.format_str = placeholder
                break
        else:
            raise ValueError("backend %s unknown" % config.backend)
def _prepare_directory(path, uid, gid, mode=None):
    """Create ``path`` if missing and, while it is still empty, set its mode and owner"""
    if not path:
        # os.path.dirname() returned '' (bare file name, or sqlite's ":memory:"):
        # there is no directory to create and os.mkdir('') would fail
        return
    if not os.path.exists(path):
        os.mkdir(path)
    # permissions and ownership are only touched as long as the directory is empty
    if not os.listdir(path):
        if mode is not None:
            os.chmod(path, mode)
        os.chown(path, uid, gid)


def make_directories():
    """Create directory for pid and socket and chown if needed

    The directories of ``config.pidfile``, of ``config.SOCKET`` (only when it is
    a string, i.e. a filesystem path) and of the sqlite database (sqlite backend
    only) are created when missing and handed over to ``config.user`` /
    ``config.group`` as long as they are empty.

    :raises ValueError: if the configured user or group does not exist.
    """
    try:
        uid = pwd.getpwnam(config.user).pw_uid
    except KeyError:
        raise ValueError("User %s in config do not exists" % config.user)
    try:
        gid = grp.getgrnam(config.group).gr_gid
    except KeyError:
        raise ValueError("Group %s in config do not exists" % config.group)

    _prepare_directory(os.path.dirname(config.pidfile), uid, gid, 0o755)

    if isinstance(config.SOCKET, str):
        # the mode of the socket directory is left untouched, only its owner is set
        _prepare_directory(os.path.dirname(config.SOCKET), uid, gid)

    if config.backend == SQLITE_DB:
        try:
            db_path = config.sqlite_config["database"]
        except KeyError:
            # no database path configured, nothing to prepare
            pass
        else:
            _prepare_directory(os.path.dirname(db_path), uid, gid, 0o700)
def drop_privileges():
    """When running as root, switch to the user and group set in the config"""
    if os.getuid() == 0:
        # resolve the target ids while we are still allowed to look them up
        target_uid = pwd.getpwnam(config.user).pw_uid
        target_gid = grp.getgrnam(config.group).gr_gid

        # forget every supplementary group
        os.setgroups([])

        # switch the group first, then the user
        os.setgid(target_gid)
        os.setuid(target_uid)

        # files created from now on are only accessible to their owner
        os.umask(0o077)
def make_cursor(name, backend, config):
    """Build a ``_cursor`` subclass called ``name`` bound to the selected database backend

    ``config`` holds the keyword arguments given to the backend module's
    ``connect`` function. A ValueError is raised when the module of the backend
    is not installed, a RuntimeError when ``backend`` is unknown.
    """
    if backend == MYSQL_DB:
        try:
            import MySQLdb as module
        except ImportError:
            raise ValueError(
                "You need to install the python3 module MySQLdb to use the mysql backend"
            )
        backend_id = MYSQL_DB
    elif backend == SQLITE_DB:
        import sqlite3 as module
        backend_id = SQLITE_DB
    elif backend == PGSQL_DB:
        try:
            import psycopg2 as module
        except ImportError:
            raise ValueError(
                "You need to install the python3 module psycopg2 to use the postgresql backend"
            )
        backend_id = PGSQL_DB
    else:
        raise RuntimeError("backend %s unknown" % backend)

    def connect():
        # called by the defaultdict the first time a thread asks for its connection
        return module.connect(**config)

    attributes = {
        '_db': collections.defaultdict(connect),
        'backend': backend_id,
        'backend_module': module,
    }
    return type(name, (_cursor,), attributes)
class _cursor(object):
    """Template for the backend cursor classes, usable as a context manager

    Subclasses provide ``_db``, a mapping from thread to database connection,
    along with ``backend`` and ``backend_module``.
    """
    backend = None
    backend_module = None

    @classmethod
    def get_db(cls):
        """Return the connection of the current thread"""
        return cls._db[threading.current_thread()]

    @classmethod
    def set_db(cls, value):
        """Use ``value`` as the connection of the current thread"""
        cls._db[threading.current_thread()] = value

    @classmethod
    def del_db(cls):
        """Close and forget the connection of the current thread, ignoring failures"""
        thread = threading.current_thread()
        try:
            cls._db[thread].close()
        except Exception:
            pass
        try:
            del cls._db[thread]
        except KeyError:
            pass

    def __enter__(self):
        self.cur = self.get_db().cursor()
        if self.backend not in [MYSQL_DB, PGSQL_DB]:
            return self.cur
        # check with a dummy query that the connection to the server is still alive
        is_mysql = self.backend == MYSQL_DB
        try:
            self.cur.execute("DO 0" if is_mysql else "SELECT 0")
            if not is_mysql:
                self.cur.fetchone()
        except self.backend_module.Error as error:
            # SQL server has gone away, probably a timeout, try to reconnect.
            # For any other error code the cursor is returned as is and the
            # caller's query will raise.
            gone_away = (2002, 2003, 2006, 2013, 8000, 8001, 8003, 8004, 8006)
            if error.args[0] in gone_away:
                self.del_db()
                self.cur.close()
                self.cur = self.get_db().cursor()
        return self.cur

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): the transaction is committed even when the block raised
        self.cur.close()
        self.get_db().commit()
def is_ip_limited(ip):
    """Tell whether ``ip`` belongs to one of the networks of ``config.limited_networks``"""
    try:
        address = ipaddress.IPv4Address(ip)
    except ipaddress.AddressValueError:
        # not an IPv4 address, parse it as IPv6
        address = ipaddress.IPv6Address(ip)
    return any(address in net for net in config.limited_networks)
def print_fw(msg, length, filler=' ', align_left=True):
    """Return ``msg`` as text padded with ``filler`` to a fixed width of ``length``

    The padding goes to the right of the text when ``align_left`` is true and to
    its left otherwise. A ValueError is raised if the text is longer than ``length``.
    """
    text = "%s" % msg
    missing = length - len(text)
    if missing < 0:
        raise ValueError("%r must not be longer than %s" % (text, length))
    padding = filler * missing
    parts = (text, padding) if align_left else (padding, text)
    return "%s%s" % parts
def clean():
    """Purge expired records, mail the report if enabled, then compact the database"""
    # the longest period of all the configured limits (0 when there is none)
    max_delta = max([0] + [delta for _nb, delta in config.limits])
    # records older than 2*max_delta are removed
    expired = int(time.time() - max_delta - max_delta)
    report_text = ""
    with cursor() as cur:
        cur.execute("DELETE FROM mail_count WHERE date <= %s" % config.format_str, (expired,))
        print("%d records deleted" % cur.rowcount)
        # if report is True, generate a mail report
        if config.report and config.report_to:
            report_text = gen_report(cur)
        if config.report:
            # NOTE(review): limit_report is flushed here, before send_report() is
            # called below, so the data is gone even if sending the mail fails
            cur.execute("DELETE FROM limit_report")
    # send report
    if report_text:
        send_report(report_text)

    try:
        if config.backend == PGSQL_DB:
            # setting autocommit to True disables the transactions. This is needed to run VACUUM
            cursor.get_db().autocommit = True
        with cursor() as cur:
            if config.backend == PGSQL_DB:
                maintenance = "VACUUM ANALYZE"
            elif config.backend == SQLITE_DB:
                maintenance = "VACUUM"
            elif config.backend == MYSQL_DB:
                if config.report:
                    maintenance = "OPTIMIZE TABLE mail_count, limit_report"
                else:
                    maintenance = "OPTIMIZE TABLE mail_count"
            else:
                maintenance = None
            if maintenance is not None:
                cur.execute(maintenance)
    finally:
        if config.backend == PGSQL_DB:
            cursor.get_db().autocommit = False
def gen_report(cur):
    """Build the text of the mail report from the ``limit_report`` table.

    :param cur: a database cursor, used to read the ``limit_report`` table.
    :return: the report as a list of lines. The list is empty when there is
        nothing to report and ``config.report_only_if_needed`` is set.
    """
    cur.execute("SELECT id, delta, hit FROM limit_report")
    # list to sort ids by hits
    report = list(cur.fetchall())
    text = []
    if not config.report_only_if_needed or report:
        if report:
            text = ["Below is the table of users who hit a limit since the last cleanup:", ""]
            # dict grouping the (delta, hit) couples by id
            report_d = collections.defaultdict(list)
            # column widths, at least as wide as the column titles
            max_d = {'id': 2, 'delta': 5, 'hit': 3}
            for (row_id, delta, hits) in report:
                report_d[row_id].append((delta, hits))
                max_d['id'] = max(max_d['id'], len(row_id))
                # + 1 for the "s" suffix appended to the delta
                max_d['delta'] = max(max_d['delta'], len(str(delta)) + 1)
                max_d['hit'] = max(max_d['hit'], len(str(hits)))
            # sort by hits
            report.sort(key=lambda row: row[2])
            # table header
            text.append(
                "|%s|%s|%s|" % (
                    print_fw("id", max_d['id']),
                    print_fw("delta", max_d['delta']),
                    print_fw("hit", max_d['hit'])
                )
            )
            # table header/data separation
            text.append(
                "|%s+%s+%s|" % (
                    print_fw("", max_d['id'], filler='-'),
                    print_fw("", max_d['delta'], filler='-'),
                    print_fw("", max_d['hit'], filler='-')
                )
            )

            # ``report`` holds one entry per (id, delta): remember the ids already
            # written so the rows of an id with several deltas appear only once
            written = set()
            for (row_id, _, _) in report:
                if row_id in written:
                    continue
                written.add(row_id)
                # rows of an id are sorted by delta
                for (delta, hits) in sorted(report_d[row_id]):
                    # add a table row
                    text.append(
                        "|%s|%s|%s|" % (
                            print_fw(row_id, max_d['id'], align_left=False),
                            print_fw("%ss" % delta, max_d['delta'], align_left=False),
                            print_fw(hits, max_d['hit'], align_left=False)
                        )
                    )
        else:
            text = ["No user hit a limit since the last cleanup"]
        # mail signature
        text.extend(["", "-- ", "policyd-rate-limit"])
    return text
def send_report(text):
    """Mail the report to every recipient of ``config.report_to``.

    :param text: the report, as a list of lines.
    :raises ValueError: if ``config.smtp_server`` is not a non-empty list or
        tuple, or if ``config.smtp_credentials`` is set but is not a list or
        tuple of at least two items.
    """
    # check that smtp_server is well formatted
    smtp_server = config.smtp_server
    if not isinstance(smtp_server, (list, tuple)) or len(smtp_server) < 1:
        raise ValueError("bad smtp_server should be a tuple (server_adress, port)")
    # the port defaults to 25 when only the server address is given
    port = smtp_server[1] if len(smtp_server) >= 2 else 25
    server = smtplib.SMTP(smtp_server[0], port)

    try:
        # should we use starttls ?
        if config.smtp_starttls:
            server.starttls()
        # should we use credentials ?
        if config.smtp_credentials:
            if (
                isinstance(config.smtp_credentials, (list, tuple)) and
                len(config.smtp_credentials) >= 2
            ):
                server.login(config.smtp_credentials[0], config.smtp_credentials[1])
            else:
                raise ValueError("bad smtp_credentials should be a tuple (login, password)")

        # a single recipient may be given directly instead of a list
        if isinstance(config.report_to, (list, tuple)):
            report_to = config.report_to
        else:
            report_to = [config.report_to]
        body = "\n".join(text)
        for rcpt in report_to:
            # build one mail per recipient
            msg = MIMEMultipart()
            msg['Subject'] = config.report_subject or ""
            msg['From'] = config.report_from or ""
            msg['To'] = rcpt
            msg.attach(MIMEText(body, 'plain'))
            server.sendmail(config.report_from or "", rcpt, msg.as_string())
        # only reached when every mail has been handed over to the server
        print('report is sent')
    finally:
        server.quit()
def database_init():
    """Initialize database (create the table and index)

    Drops a ``mail_count`` table lacking the ``recipient_count`` column, creates
    the ``mail_count`` table, plus ``limit_report`` when ``config.report`` is set
    and ``rate_limits`` when ``config.sql_limits_by_id`` is set on the mysql or
    postgresql backend, then creates the indexes and finally sets
    ``config.database_is_initialized``.
    """
    with cursor() as cur:
        query = """CREATE TABLE IF NOT EXISTS mail_count (
            id varchar(40) NOT NULL,
            date bigint NOT NULL,
            recipient_count int DEFAULT 1,
            instance varchar(40) NOT NULL,
            protocol_state varchar(10) NOT NULL
        );"""
        # only the mysql variant of rate_limits uses AUTO_INCREMENT
        if config.backend == MYSQL_DB:
            query_limits = """CREATE TABLE IF NOT EXISTS rate_limits (
                id int NOT NULL AUTO_INCREMENT,
                limits varchar(255) NOT NULL,
                PRIMARY KEY (id)
            );"""
        else:
            query_limits = """CREATE TABLE IF NOT EXISTS rate_limits (
                id int NOT NULL,
                limits varchar(255) NOT NULL,
                PRIMARY KEY (id)
            );"""

        # if report is enabled, also create the table for storing report data
        query_report = """CREATE TABLE IF NOT EXISTS limit_report (
            id varchar(40) NOT NULL,
            delta int NOT NULL,
            hit int NOT NULL DEFAULT 0
        );"""
        # Test the table version
        try:
            cur.execute("SELECT recipient_count FROM mail_count")
        except cursor.backend_module.Error as error:
            # If the table mail_count exists but the new column
            # recipient_count does not, drop the table (it only
            # contains temporary data). It will be recreated below.
            if (
                (cursor.backend == MYSQL_DB and error.args[0] == 1054) or
                (
                    cursor.backend == SQLITE_DB and
                    error.args[0] == 'no such column: recipient_count'
                ) or
                (
                    cursor.backend == PGSQL_DB and
                    isinstance(error, cursor.backend_module.errors.UndefinedColumn)
                )

            ):
                # commit to end the failed transaction before dropping the table
                cursor.get_db().commit()
                cur.execute("DROP TABLE mail_count")
            # Mysql table 'mail_count' doesn't exist
            elif cursor.backend == MYSQL_DB and error.args[0] == 1146:
                cursor.get_db().commit()
            # Sqlite table 'mail_count' doesn't exist
            elif cursor.backend == SQLITE_DB and error.args[0] == 'no such table: mail_count':
                cursor.get_db().commit()
            # Postgresql table 'mail_count' doesn't exist
            elif (
                cursor.backend == PGSQL_DB and
                isinstance(error, cursor.backend_module.errors.UndefinedTable)
            ):
                cursor.get_db().commit()
            # any other database error is unexpected: let it propagate
            else:
                raise
        # Create the table if needed
        try:
            if cursor.backend == MYSQL_DB:
                # ignore possible warnings about the table already existing
                warnings.filterwarnings('ignore', category=cursor.backend_module.Warning)
            cur.execute(query)
            if config.report:
                cur.execute(query_report)
            if config.sql_limits_by_id and config.backend in [MYSQL_DB, PGSQL_DB]:
                cur.execute(query_limits)

        finally:
            # NOTE(review): this runs for every backend and resets all the warning
            # filters of the process, not only the one added above
            warnings.resetwarnings()
        try:
            # NOTE(review): the literal 1 is presumably MYSQL_DB (the error handling
            # below is mysql specific) -- confirm against policyd_rate_limit.const
            cur.execute('CREATE INDEX %s mail_count_index ON mail_count(id, date)' % (
                "" if cursor.backend == 1 else "IF NOT EXISTS"
            ))
        except cursor.backend_module.Error as error:
            # Duplicate key name for the mysql backend
            if error.args[0] not in [1061]:
                raise
        # if report is enabled, create a unique index on (id, delta)
        if config.report:
            try:
                cur.execute(
                    'CREATE UNIQUE INDEX %s limit_report_index ON limit_report(id, delta)' % (
                        "" if cursor.backend == 1 else "IF NOT EXISTS"
                    )
                )
            except cursor.backend_module.Error as error:
                # Duplicate key name for the mysql backend
                if error.args[0] not in [1061]:
                    raise
    # only reached when the ``with`` block above ended without raising
    config.database_is_initialized = True
def hit(cur, delta, id):
    """Count one more hit of the limit ``delta`` by ``id`` in the ``limit_report`` table"""
    placeholder = config.format_str
    values = (id, delta)
    cur.execute(
        "UPDATE limit_report SET hit=hit+1 WHERE id = %s and delta = %s" % (
            placeholder, placeholder
        ),
        values
    )
    if cur.rowcount > 0:
        return
    # no row was updated: (id, delta) is not in the table yet, so insert it
    cur.execute(
        "INSERT INTO limit_report (id, delta, hit) VALUES (%s, %s, 1)" % (
            placeholder, placeholder
        ),
        values
    )
def write_pidfile():
    """Store the pid of the current process in the file ``config.pidfile``"""
    try:
        with open(config.pidfile, 'w') as pidfile:
            pidfile.write(str(os.getpid()))
    except PermissionError as error:
        # reported as a ValueError carrying the reason of the failure
        raise ValueError("Unable to write pid file, %s." % error)
def remove_pidfile():
    """Delete ``config.pidfile``, silently giving up if it cannot be removed"""
    try:
        os.unlink(config.pidfile)
    except OSError:
        # best effort only, e.g. the file may not exist
        pass
def get_config(dotted_string):
    """
        Look up the config parameter named by ``dotted_string``.
        The first component is a config attribute, each following component
        (separated by dots) is a key of the value reached so far.
    """
    attribute, *keys = dotted_string.split('.')
    value = getattr(config, attribute)
    for key in keys:
        value = value[key]
    return value
def exit_signal_handler(signal, frame):
    """Handler of the SIGUSR1 signal: make the program exit gracefully by
        raising ``Exit``. Used for coverage computation"""
    raise Exit
# Module level configuration proxy. Reading a parameter from it raises a
# RuntimeError until its setup() method has been called.
config = LazyConfig()