Coverage for policyd_rate_limit/policyd.py: 83%
228 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 09:19 +0000
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 09:19 +0000
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
# more details.
#
# You should have received a copy of the GNU General Public License version 3
# along with this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# (c) 2015-2016 Valentin Samir
11import os
12import sys
13import socket
14import time
15import select
16import traceback
17import ast
19from policyd_rate_limit import utils
20from policyd_rate_limit.utils import config
class Pass(Exception):
    """Control-flow signal used to stop evaluating a request early.

    It is raised and caught inside ``Policyd.action`` to skip the remaining
    rate-limit checks; it does not denote an error.
    """
class PolicydError(Exception):
    """Base class for the errors raised by the policy server."""
class PolicydConnectionClosed(PolicydError):
    """Raised when a read on a client connection returns no data (peer closed)."""
class Policyd(object):
    """The policy server class.

    The server listens on the socket described by ``config.SOCKET``, reads
    requests made of ``key=value`` lines terminated by an empty line, and
    answers each one with ``action=<value>`` followed by an empty line.
    """

    # NOTE: the three dicts below are class attributes, hence shared by every
    # instance of the class.
    # socket -> list of decoded text chunks received so far (the listening
    # socket is registered here too, so that select() watches it).
    socket_data_read = {}
    # connection -> bytes that still have to be sent to the client.
    socket_data_write = {}
    # connection -> time.time() of the last activity seen on it.
    last_used = {}
    # time.time() of the last "count_mode 0 is deprecated" warning.
    last_deprecation_warning = 0

    def socket(self):
        """Create the listening socket from the config parameters.

        The socket is stored in ``self.sock``; it is neither bound nor
        listening yet (see :meth:`run`).

        :raises ValueError: if ``config.SOCKET`` is neither a path nor looks
            like an IPv4/IPv6 ``(bind_ip, bind_port)`` tuple.
        """
        # if socket is a string assume it is the path to an unix socket
        if isinstance(config.SOCKET, str):
            # remove the socket file if it already exists
            try:
                os.remove(config.SOCKET)
            except OSError:
                if os.path.exists(config.SOCKET):  # pragma: no cover (should not happen)
                    raise
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # else assume it is a tuple (bind_ip, bind_port)
        elif ':' in config.SOCKET[0]:  # assume an IPv6 bind address
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        elif '.' in config.SOCKET[0]:  # assume an IPv4 bind address
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            raise ValueError("bad socket %s" % (config.SOCKET,))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock = sock

    def close_socket(self):
        """Close the listening socket and, for a unix socket, delete its file."""
        self.sock.close()
        # if socket was an unix socket, delete it after closing.
        if isinstance(config.SOCKET, str):
            try:
                os.remove(config.SOCKET)
            except OSError as error:  # pragma: no cover (should not happen)
                sys.stderr.write("%s\n" % error)
                sys.stderr.flush()

    def close_connection(self, connection):
        """Close a connection and drop its entries from the read/write dicts."""
        self.socket_data_read.pop(connection, None)
        self.socket_data_write.pop(connection, None)
        connection.close()

    def close_write_conn(self, connection):
        """Remove a connection from the write dict (nothing left to send)."""
        try:
            del self.socket_data_write[connection]
        except KeyError:
            if config.debug:
                sys.stderr.write(
                    (
                        "Hmmm, a socket actually used to write a little "
                        "time ago wasn\'t in socket_data_write. Weird.\n"
                    )
                )
                sys.stderr.flush()

    def _close_client_connections(self):
        """Close every registered connection except the listening socket."""
        # iterate over a copy: close_connection() mutates socket_data_read
        for conn in list(self.socket_data_read):
            if conn != self.sock:
                self.close_connection(conn)

    def run(self):
        """The main server loop.

        Binds ``self.sock`` and then serves forever: accepts new connections,
        reads requests, sends pending answers and closes connections that
        stayed unused for more than ``config.delay_to_close`` seconds.

        :raises KeyboardInterrupt, utils.Exit: re-raised after every client
            connection has been closed.
        """
        try:
            sock = self.sock
            sock.bind(config.SOCKET)
            if isinstance(config.SOCKET, str):
                os.chmod(config.SOCKET, config.socket_permission)
            sock.listen(5)
            self.socket_data_read[sock] = []
            if config.debug:
                sys.stderr.write('waiting for connections\n')
                sys.stderr.flush()
            while True:
                # wait for a socket to read from or to write to
                (rlist, wlist, _) = select.select(
                    list(self.socket_data_read), list(self.socket_data_write), []
                )
                for conn in rlist:
                    # if the socket is the main socket, there is a new connection to accept
                    if conn == sock:
                        connection, client_address = sock.accept()
                        if config.debug:
                            sys.stderr.write('connection from %s\n' % (client_address,))
                            sys.stderr.flush()
                        self.socket_data_read[connection] = []
                        # Updates the last_used time for the socket.
                        self.last_used[connection] = time.time()
                    # else there is data to read on a client socket
                    else:
                        self.read(conn)
                for conn in wlist:
                    try:
                        data = self.socket_data_write[conn]
                    except KeyError:
                        # the socket has been closed during read
                        continue
                    sent = conn.send(data)
                    data_not_sent = data[sent:]
                    if data_not_sent:
                        self.socket_data_write[conn] = data_not_sent
                    else:
                        self.close_write_conn(conn)
                    # Socket has been used, let's update its last_used time.
                    self.last_used[conn] = time.time()
                # Close the sockets that have been unused for a long time.
                idle = [
                    conn for conn, last_used in self.last_used.items()
                    if conn != sock and time.time() - last_used > config.delay_to_close
                ]
                for conn in idle:
                    self.close_connection(conn)
                    self.last_used.pop(conn)
        except (KeyboardInterrupt, utils.Exit):
            # close the client connections, not the listening socket
            self._close_client_connections()
            raise

    @staticmethod
    def _parse_request(data):
        """Parse ``key=value`` lines into a dict, skipping empty values.

        Lines are stripped first; lines without ``=`` are ignored and only the
        first ``=`` of a line separates the key from the value.
        """
        request = {}
        for line in data.split("\n"):
            key, sep, value = line.strip().partition(u"=")
            # if there is no '=' or the value is empty, ignore the line
            if sep and value:
                request[key] = value
        return request

    def read(self, connection):
        """Called when a connection is ready for reads.

        Reads up to 1024 bytes, accumulates them in the connection buffer and,
        once the data ends with an empty line, hands the parsed request to
        :meth:`action`. The connection is closed if the peer closed it or if
        any exception occurs.

        :raises KeyboardInterrupt, utils.Exit: re-raised after closing the
            connection.
        """
        try:
            # get the current buffer of the connection
            buffer = self.socket_data_read[connection]
            # read data
            data = connection.recv(1024).decode('UTF-8')
            if not data:
                raise PolicydConnectionClosed()
            if config.debug:
                sys.stderr.write(data)
                sys.stderr.flush()
            # accumulate it in buffer
            buffer.append(data)
            # if data is too short to determine whether we are on an empty
            # line, concatenate everything received so far
            if len(data) < 2:
                data = u"".join(buffer)
                buffer = [data]
            # We reached an empty line: the client has finished sending and
            # waits for a response
            if data[-2:] == "\n\n":
                request = self._parse_request(u"".join(buffer))
                # process the collected data in the action method
                self.action(connection, request)
            else:
                self.socket_data_read[connection] = buffer
            # Socket has been used, let's update its last_used time.
            self.last_used[connection] = time.time()
        except (KeyboardInterrupt, utils.Exit):
            self.close_connection(connection)
            raise
        except PolicydConnectionClosed:
            if config.debug:
                sys.stderr.write("Connection closed\n")
                sys.stderr.flush()
            self.close_connection(connection)
        except Exception:
            traceback.print_exc()
            sys.stderr.flush()
            self.close_connection(connection)

    def action(self, connection, request):
        """Called when the client has sent an empty line.

        Decides which action to answer for ``request`` (a dict of the
        attributes sent by the client), records accepted mails in the
        ``mail_count`` table and queues the answer for ``connection``.

        :param connection: the client connection the request came from.
        :param dict request: the parsed ``key=value`` attributes.
        """
        ident = None
        recipient_count = 1
        # By default, we do not block emails
        verdict = config.success_action
        try:
            if not config.database_is_initialized:
                utils.database_init()
            with utils.cursor() as cur:
                try:
                    # only care if the protocol state is RCPT or DATA.
                    # If the policy delegation in postfix
                    # configuration is in smtpd_recipient_restrictions as said in the doc,
                    # possible states are RCPT and VRFY.
                    # If in smtpd_data_restrictions only DATA is possible.
                    if 'protocol_state' not in request:
                        sys.stderr.write("Attribute 'protocol_state' not defined\n")
                        sys.stderr.flush()
                        raise Pass()
                    if config.count_mode not in {0, 1, 2}:
                        sys.stderr.write(
                            "Settings 'count_mode' bad value %r\n" % (config.count_mode,)
                        )
                        sys.stderr.flush()
                        raise Pass()
                    state = request['protocol_state'].upper()
                    # count_mode 0 counts at RCPT, count_mode 1 and 2 at DATA
                    expected_state = "RCPT" if config.count_mode == 0 else "DATA"
                    if state != expected_state:
                        if config.debug:
                            sys.stderr.write("Ignoring 'protocol_state' %r\n" % (state,))
                            sys.stderr.flush()
                        raise Pass()
                    if config.count_mode == 0:
                        # outside debug mode, warn at most once per minute
                        if config.debug or time.time() - self.last_deprecation_warning > 60:
                            sys.stderr.write(
                                "WARNING: the 'count_mode' parameter is set to 0. "
                                "This is DEPRECATED. 'count_mode' should be set to 1 and postfix"
                                " config edited as stated in the README or "
                                "policyd-rate-limit.yaml(5)\n"
                            )
                            sys.stderr.flush()
                            self.last_deprecation_warning = time.time()

                    # if user is authenticated, we filter by sasl username
                    if config.limit_by_sasl and u'sasl_username' in request:
                        ident = request[u'sasl_username']
                    # else, if activated, we filter by sender
                    elif config.limit_by_sender and u'sender' in request:
                        ident = request[u'sender']
                    # else, if activated, we filter by ip source address
                    elif (
                        config.limit_by_ip and
                        u'client_address' in request and
                        utils.is_ip_limited(request[u'client_address'])
                    ):
                        ident = request[u'client_address']
                    # if the client sent neither a client ip address nor a sasl
                    # username, jump to the next section
                    else:
                        raise Pass()

                    # only DATA in count_mode 1 counts every recipient of the mail
                    if state == "DATA" and config.count_mode == 1:
                        recipient_count = max(int(request["recipient_count"]), 1)

                    # Custom limits per ID via SQL
                    custom_limits = config.limits_by_id
                    if config.sql_limits_by_id != "":
                        try:
                            cur.execute(config.sql_limits_by_id, [ident])
                            sql_limits = ast.literal_eval(cur.fetchone()[0])
                        except TypeError:
                            # fetchone() returned None: no row for this id
                            if config.debug:
                                sys.stderr.write(
                                    u"There is no limit rate in SQL for: %s\n" % (ident,)
                                )
                                sys.stderr.flush()
                        else:
                            # overlay on a copy: writing into config.limits_by_id
                            # would keep the limit alive after its SQL row is gone
                            custom_limits = dict(config.limits_by_id)
                            custom_limits[ident] = sql_limits
                        if config.debug:
                            sys.stderr.write(u"Custom limit(s): %s\n" % (custom_limits,))
                            sys.stderr.flush()

                    # Here we are limiting against sasl username, sender or source ip
                    # addresses. For each limit period, we count the number of mails
                    # already sent. If a limit is reached, we change the action to
                    # fail (deny the mail).
                    for mail_nb, delta in custom_limits.get(ident, config.limits):
                        # config.format_str is the placeholder of the DB driver,
                        # the values themselves are passed as query parameters
                        cur.execute(
                            (
                                "SELECT SUM(recipient_count) FROM mail_count "
                                "WHERE id = %s AND date >= %s"
                            ) % ((config.format_str,)*2),
                            (ident, int(time.time() - delta))
                        )
                        nb = cur.fetchone()[0] or 0
                        if config.debug:
                            sys.stderr.write(
                                "%03d/%03d hit since %ss\n" % (
                                    nb + recipient_count, mail_nb, delta
                                )
                            )
                            sys.stderr.flush()
                        if nb + recipient_count > mail_nb:
                            verdict = config.fail_action
                            if config.report and delta in config.report_limits:
                                utils.hit(cur, delta, ident)
                            raise Pass()
                except Pass:
                    pass
                # If action is a success, record in the database that a new mail
                # has just been sent
                if verdict == config.success_action and ident is not None:
                    if config.debug:
                        sys.stderr.write(u"insert id %s\n" % (ident,))
                        sys.stderr.flush()
                    cur.execute(
                        "INSERT INTO mail_count VALUES (%s, %s, %s, %s, %s)" % (
                            (config.format_str,)*5
                        ),
                        (
                            ident, int(time.time()), recipient_count,
                            request.get("instance", "empty"), request['protocol_state']
                        )
                    )
                # If action is a failure and using legacy mode, remove previous
                # recorded event for this mail in the
                # database. The mail has not been sent, we should not count any recipient
                if (
                    config.count_mode == 0 and
                    verdict == config.fail_action and
                    request['protocol_state'].upper() == "RCPT" and
                    request.get("instance")
                ):
                    cur.execute(
                        "DELETE FROM mail_count WHERE instance = %s AND protocol_state = %s" % (
                            (config.format_str,)*2
                        ),
                        (request["instance"], request['protocol_state'])
                    )
        except utils.cursor.backend_module.Error as error:
            utils.cursor.del_db()
            verdict = config.db_error_action
            sys.stderr.write("Database error: %r\n" % (error,))
            sys.stderr.flush()
        data = u"action=%s\n\n" % verdict
        if config.debug:
            sys.stderr.write(data)
            sys.stderr.flush()
        # return the result to the client
        self.socket_data_write[connection] = data.encode('UTF-8')

        # Wipe the read buffer (otherwise it'll be added up for eternity)
        self.socket_data_read[connection].clear()
        # Socket has been used, let's update its last_used time.
        self.last_used[connection] = time.time()