= Adding the Gopher Protocol to QtWebKit =
{{{
#!/usr/bin/env python
import os, sys
from PyQt4.QtCore import QObject, QTimer, QUrl, QVariant, SIGNAL
from PyQt4.QtGui import *
from PyQt4.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply, QTcpSocket
from PyQt4.QtWebKit import QWebView
class Downloader(QObject):
def __init__(self, parent = None):
QObject.__init__(self, parent)
self.path = u""
def saveFile(self, reply):
fileName = unicode(reply.url().path()).split(u"/")[-1]
if self.path:
fileName = os.path.join(self.path, fileName)
path = unicode(QFileDialog.getSaveFileName(self.parent(),
self.tr("Save File"), fileName))
if path:
try:
open(unicode(path), "w").write(str(reply.readAll()))
self.path = os.path.split(path)[0]
except IOError:
QMessageBox.warning(self.parent(), self.tr("Download Failed"),
self.tr("Failed to save the file."))
class GopherReply(QNetworkReply):
def __init__(self, url):
QNetworkReply.__init__(self)
self.gopher = QTcpSocket()
self.gopher.bytesWritten.connect(self.writeGopherData)
self.gopher.readyRead.connect(self.readGopherData)
self.gopher.connected.connect(self.getResource)
self.gopher.disconnected.connect(self.setContent)
self.input = ""
self.output = ""
self.content = ""
self.offset = 0
self.setUrl(url)
self.gopher.connectToHost(url.host(), 70)
def getResource(self):
path = self.url().path()
if path.isEmpty() or path == u"/":
self.output = "\r\n"
else:
self.output = str(path) + "\r\n"
self.writeGopherData()
def readGopherData(self):
self.input += str(self.gopher.readAll())
def writeGopherData(self, written = 0):
self.output = self.output[written:]
if self.output:
self.gopher.write(self.output)
def html(self, text):
return unicode(text).replace(u"&", u"&").replace(u"<", u"<").replace(u">", u">")
def setContent(self):
if self.url().hasQueryItem(u"type"):
self.setContentData()
else:
self.setContentList()
def setContentData(self):
self.open(self.ReadOnly | self.Unbuffered)
if self.url().queryItemValue(u"type") == u"text":
self.setHeader(QNetworkRequest.ContentTypeHeader,
QVariant("text/plain"))
self.content = self.input
self.setHeader(QNetworkRequest.ContentLengthHeader,
QVariant(len(self.content)))
self.readyRead.emit()
self.finished.emit()
def setContentList(self):
url = QUrl(self.url())
if not url.path().endsWith(u"/"):
url.setPath(url.path() + u"/")
base_url = self.url().toString()
base_path = unicode(url.path())
self.open(self.ReadOnly | self.Unbuffered)
content = (
u"\n"
u"
\n"
u" " + self.html(base_url) + u"\n"
u' \n"
u"\n\n"
u"\n"
u"Listing for " + base_path + u"
\n\n"
)
lines = self.input.splitlines()
for line in lines:
pieces = line.split("\t")
if pieces == ["."]:
break
try:
type, path, host, port = pieces[:4]
except ValueError:
# This isn't a listing. Let's try returning data instead.
self.setContentData()
return
if type[0] == "i":
content += u"" + self.html(type[1:]) + u"
"
elif type[0] == "h" and path.startswith(u"URL:"):
content += u""+self.html(type[1:])+u"
"
elif type[0] == "0":
content += u""+self.html(type[1:])+u"
"
elif type[0] == "1":
content += u""+self.html(type[1:])+u"
"
elif type[0] == "4":
content += u""+self.html(type[1:])+u"
"
elif type[0] == "5":
content += u""+self.html(type[1:])+u"
"
elif type[0] == "6":
content += u""+self.html(type[1:])+u"
"
elif type[0] == "9":
content += u""+self.html(type[1:])+u"
"
elif type[0] == "g":
content += u"
"+self.html(type[1:])+u""
elif type[0] == "I":
content += u"
"+self.html(type[1:])+u""
content += (
u"\n"
u"\n"
)
self.content = content.encode("utf-8")
self.setHeader(QNetworkRequest.ContentTypeHeader, QVariant("text/html; charset=UTF-8"))
self.setHeader(QNetworkRequest.ContentLengthHeader, QVariant(len(self.content)))
self.readyRead.emit()
self.finished.emit()
# QIODevice methods
def abort(self):
pass
def bytesAvailable(self):
return len(self.content) - self.offset
def isSequential(self):
return True
def readData(self, maxSize):
if self.offset < len(self.content):
end = min(self.offset + maxSize, len(self.content))
data = self.content[self.offset:end]
self.offset = end
return data
class NetworkAccessManager(QNetworkAccessManager):
def __init__(self, old_manager):
QNetworkAccessManager.__init__(self)
self.setCache(old_manager.cache())
self.setCookieJar(old_manager.cookieJar())
self.setProxy(old_manager.proxy())
self.setProxyFactory(old_manager.proxyFactory())
def createRequest(self, operation, request, device):
if request.url().scheme() != "gopher":
return QNetworkAccessManager.createRequest(self, operation, request, device)
if operation == self.GetOperation:
# Handle gopher:// URLs separately by creating custom QNetworkReply
# objects.
reply = GopherReply(request.url())
return reply
else:
return QNetworkAccessManager.createRequest(self, operation, request, device)
class Window(QWidget):
def __init__(self, parent = None):
QWidget.__init__(self, parent)
self.addressEdit = QLineEdit()
# gopher://mirror.lug.udel.edu:70
self.addressEdit.setText("gopher://hal3000.cx:70")
self.view = QWebView()
old_manager = self.view.page().networkAccessManager()
self.new_manager = NetworkAccessManager(old_manager)
self.view.page().setNetworkAccessManager(self.new_manager)
self.view.page().setForwardUnsupportedContent(True)
self.downloader = Downloader(self)
self.view.page().unsupportedContent.connect(self.downloader.saveFile)
self.view.setUrl(QUrl(self.addressEdit.text()))
self.addressEdit.returnPressed.connect(self.setUrl)
self.view.urlChanged.connect(self.updateAddress)
layout = QVBoxLayout(self)
layout.addWidget(self.addressEdit)
layout.addWidget(self.view)
def setUrl(self):
self.view.setUrl(QUrl(self.addressEdit.text()))
def updateAddress(self, url):
self.addressEdit.setText(url.toString())
if __name__ == "__main__":
app = QApplication(sys.argv)
window = Window()
window.show()
sys.exit(app.exec_())
}}}