Securing XPN’s SSL usage

XPN is a newsreader written in Python. It claims SSL support, but unfortunately it relies on the SSL support in the Python standard library, which has two problems. First, Python may not have been built with SSL support. Second, the built-in SSL support is not secure, because it does not perform all of the certificate checks that a safe SSL connection requires (for example, verifying the server's certificate against a set of trusted CA certificates).

I was able to fix these issues in XPN by using M2Crypto; the code changes are nearly trivial. I offered the patch to the XPN maintainer, but, at least for the moment, he does not want to integrate it because he wants to keep XPN's dependencies to a minimum. It would be possible to rework the change so that M2Crypto is used when it is present and XPN otherwise falls back to some other mechanism, so hopefully he will change his mind.

--- nntplib_ssl.py	2007-01-07 05:49:35.000000000 -0800
+++ m2_nntplib_ssl.py	2007-07-14 18:19:01.000000000 -0700
@@ -1,6 +1,6 @@
 import nntplib
 import re
-import socket
+from M2Crypto import SSL

 NNTP_SSL_PORT = 563
 CRLF = '\r\n'
@@ -33,24 +33,23 @@
         self.certfile = certfile
         self.buffer = ""
         msg = "getaddrinfo returns an empty list"
-        self.sock = None
-        for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
-            af, socktype, proto, canonname, sa = res
-            try:
-                self.sock = socket.socket(af, socktype, proto)
-                self.sock.connect(sa)
-            except socket.error, msg:
-                if self.sock:
-                    self.sock.close()
-                self.sock = None
-                continue
-            break
-        if not self.sock:
-            raise socket.error, msg
+
+        self.ctx = SSL.Context()
+
+        # Need to load CA certificates for things to be secure
+        # This works on Ubuntu Dapper Drake
+        if self.ctx.load_verify_locations(capath='/etc/ssl/certs') != 1:
+            raise Exception('Could not load certificates')
+
+        # And then make sure we make the necessary checks
+        self.ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+
+        # The rest of the security checks will happen automatically
+        self.sock = SSL.Connection(self.ctx)
+        self.sock.connect((self.host, self.port))
+
         self.file = self.sock.makefile('rb')
-        try: socket.ssl
-        except AttributeError: self.sslobj=None
-        else: self.sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
+
         self.debugging = 0
         self.welcome = self.getresp()

@@ -63,10 +62,10 @@
         if readermode:
             try:
                 self.welcome = self.shortcmd('mode reader')
-            except NNTPPermanentError:
+            except nntplib.NNTPPermanentError:
                 # error 500, probably 'not implemented'
                 pass
-            except NNTPTemporaryError, e:
+            except nntplib.NNTPTemporaryError, e:
                 if user and e.response[:3] == '480':
                     # Need authorization before 'mode reader'
                     readermode_afterauth = 1
@@ -89,21 +88,21 @@
             resp = self.shortcmd('authinfo user '+user)
             if resp[:3] == '381':
                 if not password:
-                    raise NNTPReplyError(resp)
+                    raise nntplib.NNTPReplyError(resp)
                 else:
                     resp = self.shortcmd(
                             'authinfo pass '+password)
                     if resp[:3] != '281':
-                        raise NNTPPermanentError(resp)
+                        raise nntplib.NNTPPermanentError(resp)
             if readermode_afterauth:
                 try:
                     self.welcome = self.shortcmd('mode reader')
-                except NNTPPermanentError:
+                except nntplib.NNTPPermanentError:
                     # error 500, probably 'not implemented'
                     pass

     def _fillBuffer(self):
-        localbuf = self.sslobj.read()
+        localbuf = self.sock.read()
         if len(localbuf) == 0:
             raise error_proto('-ERR EOF')
         self.buffer += localbuf
@@ -129,7 +128,7 @@
         line += CRLF
         bytes = len(line)
         while bytes > 0:
-            sent = self.sslobj.write(line)
+            sent = self.sock.write(line)
             if sent == bytes:
                 break    # avoid copy
             line = line[sent:]
@@ -143,5 +142,5 @@
         except error_proto, val:
             resp = val
         self.sock.close()
-        del self.sslobj, self.sock
+        del self.sock
         return resp

Similar Posts:

Leave a comment

Powered by WP Hashcash