Coverage for policyd_rate_limit/policyd.py: 83%

228 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-08-27 09:19 +0000

1# This program is distributed in the hope that it will be useful, but WITHOUT 

2# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

3# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for 

4# more details. 

5# 

6# You should have received a copy of the GNU General Public License version 3 

7# along with this program; if not, write to the Free Software Foundation, Inc., 51 

8# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

9# 

10# (c) 2015-2016 Valentin Samir 

11import os 

12import sys 

13import socket 

14import time 

15import select 

16import traceback 

17import ast 

18 

19from policyd_rate_limit import utils 

20from policyd_rate_limit.utils import config 

21 

22 

23class Pass(Exception): 

24 pass 

25 

26 

27class PolicydError(Exception): 

28 pass 

29 

30 

31class PolicydConnectionClosed(PolicydError): 

32 pass 

33 

34 

35class Policyd(object): 

36 """The policy server class""" 

37 socket_data_read = {} 

38 socket_data_write = {} 

39 last_used = {} 

40 last_deprecation_warning = 0 

41 

42 def socket(self): 

43 """initialize the socket from the config parameters""" 

44 # if socket is a string assume it is the path to an unix socket 

45 if isinstance(config.SOCKET, str): 

46 try: 

47 os.remove(config.SOCKET) 

48 except OSError: 

49 if os.path.exists(config.SOCKET): # pragma: no cover (should not happen) 

50 raise 

51 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

52 # else asume its a tuple (bind_ip, bind_port) 

53 elif ':' in config.SOCKET[0]: # assume ipv6 bind addresse 

54 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 

55 elif '.' in config.SOCKET[0]: # assume ipv4 bind addresse 

56 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

57 else: 

58 raise ValueError("bad socket %s" % (config.SOCKET,)) 

59 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

60 self.sock = sock 

61 

62 def close_socket(self): 

63 """close the socket depending of the config parameters""" 

64 self.sock.close() 

65 # if socket was an unix socket, delete it after closing. 

66 if isinstance(config.SOCKET, str): 

67 try: 

68 os.remove(config.SOCKET) 

69 except OSError as error: # pragma: no cover (should not happen) 

70 sys.stderr.write("%s\n" % error) 

71 sys.stderr.flush() 

72 

73 def close_connection(self, connection): 

74 """close a connection and clean read/write dict""" 

75 # Clean up the connection 

76 try: 

77 del self.socket_data_read[connection] 

78 except KeyError: 

79 pass 

80 try: 

81 del self.socket_data_write[connection] 

82 except KeyError: 

83 pass 

84 connection.close() 

85 

86 def close_write_conn(self, connection): 

87 """Removes a socket from the write dict""" 

88 try: 

89 del self.socket_data_write[connection] 

90 except KeyError: 

91 if config.debug: 

92 sys.stderr.write( 

93 ( 

94 "Hmmm, a socket actually used to write a little " 

95 "time ago wasn\'t in socket_data_write. Weird.\n" 

96 ) 

97 ) 

98 

99 def run(self): 

100 """The main server loop""" 

101 try: 

102 sock = self.sock 

103 sock.bind(config.SOCKET) 

104 if isinstance(config.SOCKET, str): 

105 os.chmod(config.SOCKET, config.socket_permission) 

106 sock.listen(5) 

107 self.socket_data_read[sock] = [] 

108 if config.debug: 

109 sys.stderr.write('waiting for connections\n') 

110 sys.stderr.flush() 

111 while True: 

112 # wait for a socket to read to or to write to 

113 (rlist, wlist, _) = select.select( 

114 self.socket_data_read.keys(), self.socket_data_write.keys(), [] 

115 ) 

116 for socket in rlist: 

117 # if the socket is the main socket, there is a new connection to accept 

118 if socket == sock: 

119 connection, client_address = sock.accept() 

120 if config.debug: 

121 sys.stderr.write('connection from %s\n' % (client_address,)) 

122 sys.stderr.flush() 

123 self.socket_data_read[connection] = [] 

124 

125 # Updates the last_sed time for the socket. 

126 self.last_used[connection] = time.time() 

127 # else there is data to read on a client socket 

128 else: 

129 self.read(socket) 

130 for socket in wlist: 

131 try: 

132 data = self.socket_data_write[socket] 

133 sent = socket.send(data) 

134 data_not_sent = data[sent:] 

135 if data_not_sent: 135 ↛ 136line 135 didn't jump to line 136, because the condition on line 135 was never true

136 self.socket_data_write[socket] = data_not_sent 

137 else: 

138 self.close_write_conn(socket) 

139 

140 # Socket has been used, let's update its last_used time. 

141 self.last_used[socket] = time.time() 

142 # the socket has been closed during read 

143 except KeyError: 

144 pass 

145 # Closes unused socket for a long time. 

146 __to_rm = [] 

147 for (socket, last_used) in self.last_used.items(): 

148 if socket == sock: 148 ↛ 149line 148 didn't jump to line 149, because the condition on line 148 was never true

149 continue 

150 if time.time() - last_used > config.delay_to_close: 150 ↛ 151line 150 didn't jump to line 151, because the condition on line 150 was never true

151 self.close_connection(socket) 

152 __to_rm.append(socket) 

153 for socket in __to_rm: 153 ↛ 154line 153 didn't jump to line 154, because the loop on line 153 never started

154 self.last_used.pop(socket) 

155 

156 except (KeyboardInterrupt, utils.Exit): 

157 for socket in list(self.socket_data_read.keys()): 

158 if socket != self.sock: 

159 self.close_connection(sock) 

160 raise 

161 

162 def read(self, connection): 

163 """Called then a connection is ready for reads""" 

164 try: 

165 # get the current buffer of the connection 

166 buffer = self.socket_data_read[connection] 

167 # read data 

168 data = connection.recv(1024).decode('UTF-8') 

169 if not data: 

170 raise PolicydConnectionClosed() 

171 if config.debug: 

172 sys.stderr.write(data) 

173 sys.stderr.flush() 

174 # accumulate it in buffer 

175 buffer.append(data) 

176 # if data len too short to determine if we are on an empty line, we 

177 # concatene datas in buffer 

178 if len(data) < 2: 178 ↛ 179line 178 didn't jump to line 179, because the condition on line 178 was never true

179 data = u"".join(buffer) 

180 buffer = [data] 

181 # We reach on empty line so the client has finish to send and wait for a response 

182 if data[-2:] == "\n\n": 

183 data = u"".join(buffer) 

184 request = {} 

185 # read data are like one key=value per line 

186 for line in data.split("\n"): 

187 line = line.strip() 

188 try: 

189 key, value = line.split(u"=", 1) 

190 if value: 

191 request[key] = value 

192 # if value is empty, ignore it 

193 except ValueError: 

194 pass 

195 # process the collected data in the action method 

196 self.action(connection, request) 

197 else: 

198 self.socket_data_read[connection] = buffer 

199 # Socket has been used, let's update its last_used time. 

200 self.last_used[connection] = time.time() 

201 except (KeyboardInterrupt, utils.Exit): 201 ↛ 202line 201 didn't jump to line 202, because the exception caught by line 201 didn't happen

202 self.close_connection(connection) 

203 raise 

204 except PolicydConnectionClosed: 204 ↛ 209line 204 didn't jump to line 209

205 if config.debug: 

206 sys.stderr.write("Connection closed\n") 

207 sys.stderr.flush() 

208 self.close_connection(connection) 

209 except Exception: 

210 traceback.print_exc() 

211 sys.stderr.flush() 

212 self.close_connection(connection) 

213 

214 def action(self, connection, request): 

215 """Called then the client has sent an empty line""" 

216 id = None 

217 # By default, we do not block emails 

218 action = config.success_action 

219 try: 

220 if not config.database_is_initialized: 

221 utils.database_init() 

222 with utils.cursor() as cur: 

223 try: 

224 # only care if the protocol states is RCTP or DATA. 

225 # If the policy delegation in postfix 

226 # configuration is in smtpd_recipient_restrictions as said in the doc, 

227 # possible states are RCPT and VRFY. 

228 # If in smtpd_data_restrictions only DATA is possible. 

229 if 'protocol_state' not in request: 229 ↛ 230line 229 didn't jump to line 230, because the condition on line 229 was never true

230 sys.stderr.write("Attribute 'protocol_state' not defined\n") 

231 sys.stderr.flush() 

232 raise Pass() 

233 if config.count_mode not in {0, 1, 2}: 233 ↛ 234line 233 didn't jump to line 234, because the condition on line 233 was never true

234 sys.stderr.write("Settings 'count_mode' bad value %r\n" % ( 

235 config.count_mode, 

236 )) 

237 sys.stderr.flush() 

238 raise Pass() 

239 if config.count_mode == 0 and request['protocol_state'].upper() != "RCPT": 

240 if config.debug: 

241 sys.stderr.write( 

242 "Ignoring 'protocol_state' %r\n" % ( 

243 request['protocol_state'].upper(), 

244 ) 

245 ) 

246 sys.stderr.flush() 

247 raise Pass() 

248 if config.count_mode in {1, 2} and request['protocol_state'].upper() != "DATA": 248 ↛ 249line 248 didn't jump to line 249, because the condition on line 248 was never true

249 if config.debug: 

250 sys.stderr.write( 

251 "Ignoring 'protocol_state' %r\n" % ( 

252 request['protocol_state'].upper(), 

253 ) 

254 ) 

255 sys.stderr.flush() 

256 raise Pass() 

257 if config.count_mode == 0: 

258 if config.debug or time.time() - self.last_deprecation_warning > 60: 

259 sys.stderr.write( 

260 "WARNING: the 'count_mode' parameter is set to 0. " 

261 "This is DEPRECATED. 'count_mode' should be set to 1 and postfix" 

262 " config edited as stated in the README or " 

263 "policyd-rate-limit.yaml(5)\n" 

264 ) 

265 sys.stderr.flush() 

266 self.last_deprecation_warning = time.time() 

267 

268 # if user is authenticated, we filter by sasl username 

269 if config.limit_by_sasl and u'sasl_username' in request: 

270 id = request[u'sasl_username'] 

271 # else, if activated, we filter by sender 

272 elif config.limit_by_sender and u'sender' in request: 272 ↛ 273line 272 didn't jump to line 273, because the condition on line 272 was never true

273 id = request[u'sender'] 

274 # else, if activated, we filter by ip source addresse 

275 elif ( 

276 config.limit_by_ip and 

277 u'client_address' in request and 

278 utils.is_ip_limited(request[u'client_address']) 

279 ): 

280 id = request[u'client_address'] 

281 # if the client neither send us client ip adresse nor sasl username, jump 

282 # to the next section 

283 else: 

284 raise Pass() 

285 

286 if request['protocol_state'].upper() == "RCPT": 

287 recipient_count = 1 

288 elif request['protocol_state'].upper() == "DATA": 288 ↛ 295line 288 didn't jump to line 295, because the condition on line 288 was never false

289 if config.count_mode == 1: 

290 recipient_count = max(int(request["recipient_count"]), 1) 

291 else: 

292 recipient_count = 1 

293 

294 # Custom limits per ID via SQL 

295 custom_limits = config.limits_by_id 

296 if config.sql_limits_by_id != "": 296 ↛ 297line 296 didn't jump to line 297, because the condition on line 296 was never true

297 try: 

298 cur.execute(config.sql_limits_by_id, [id]) 

299 custom_limits[id] = ast.literal_eval(cur.fetchone()[0]) 

300 except TypeError: 

301 custom_limits = config.limits_by_id 

302 if config.debug: 

303 sys.stderr.write(u"There is no limit rate in SQL for: %s\n" % (id)) 

304 sys.stderr.flush() 

305 if config.debug: 

306 sys.stderr.write(u"Custom limit(s): %s\n" % custom_limits) 

307 sys.stderr.flush() 

308 

309 # Here we are limiting against sasl username, sender or source ip addresses. 

310 # for each limit periods, we count the number of mails already send. 

311 # if the a limit is reach, we change action to fail (deny the mail). 

312 for mail_nb, delta in custom_limits.get(id, config.limits): 

313 cur.execute( 

314 ( 

315 "SELECT SUM(recipient_count) FROM mail_count " 

316 "WHERE id = %s AND date >= %s" 

317 ) % ((config.format_str,)*2), 

318 (id, int(time.time() - delta)) 

319 ) 

320 nb = cur.fetchone()[0] or 0 

321 if config.debug: 

322 sys.stderr.write( 

323 "%03d/%03d hit since %ss\n" % ( 

324 nb + recipient_count, mail_nb, delta 

325 ) 

326 ) 

327 sys.stderr.flush() 

328 if nb + recipient_count > mail_nb: 

329 action = config.fail_action 

330 if config.report and delta in config.report_limits: 

331 utils.hit(cur, delta, id) 

332 raise Pass() 

333 except Pass: 

334 pass 

335 # If action is a success, record in the database that a new mail has just been sent 

336 if action == config.success_action and id is not None: 

337 if config.debug: 

338 sys.stderr.write(u"insert id %s\n" % id) 

339 sys.stderr.flush() 

340 cur.execute( 

341 "INSERT INTO mail_count VALUES (%s, %s, %s, %s, %s)" % ( 

342 (config.format_str,)*5 

343 ), 

344 ( 

345 id, int(time.time()), recipient_count, 

346 request.get("instance", "empty"), request['protocol_state'] 

347 ) 

348 ) 

349 # If action is a failure and using legacy mode, remove previous 

350 # recorded event for this mail in the 

351 # database. The mail has not been sent, we should not count any recipient 

352 if ( 

353 config.count_mode == 0 and 

354 action == config.fail_action and 

355 request['protocol_state'].upper() == "RCPT" and 

356 request.get("instance") 

357 ): 

358 cur.execute( 

359 "DELETE FROM mail_count WHERE instance = %s AND protocol_state = %s" % ( 

360 (config.format_str,)*2 

361 ), 

362 (request["instance"], request['protocol_state']) 

363 ) 

364 except utils.cursor.backend_module.Error as error: 

365 utils.cursor.del_db() 

366 action = config.db_error_action 

367 sys.stderr.write("Database error: %r\n" % error) 

368 data = u"action=%s\n\n" % action 

369 if config.debug: 

370 sys.stderr.write(data) 

371 sys.stderr.flush() 

372 # return the result to the client 

373 self.socket_data_write[connection] = data.encode('UTF-8') 

374 

375 # Wipe the read buffer (otherwise it'll be added up for eternity) 

376 self.socket_data_read[connection].clear() 

377 # Socket has been used, let's update its last_used time. 

378 self.last_used[connection] = time.time()