"""
rssweblog.py
Exposes an interface for filtering RSS files, based on XMLFilter, with
high-level operations such as adding, listing, editing, and removing posts.
Can treat blogBrowser date-based archives as one big file (with optimizations
to find posts quickly based on date).
Revision History:
1.5.2 2003-07-16 Andrew Shearer
Prepend filename (minus dir path) to UnicodeError messages when parsing a
weblogArchive; xmllib sometimes throws UTF-8 errors and otherwise there's no
way to tell which file they came from.
1.5.1 2003-07-15 Andrew Shearer
Accept W3CDate, not just xmlrpclib.DateTime, in the MetaWeblog API's
dateCreated member. This allows the caller (the XML-RPC newPost handler)
to pass the current date with timezone and DST intact.
1.5 2003-07-07 Andrew Shearer
Read-only support for RSS 1.0, as well as RSS 2.0 in a namespace.
flNotOnHomePage support. Editing a post to remove all categories now
works. Renamed ISO8601Date to W3CDate; moved W3CDate and XMLFilter to
their own modules.
"""
import os
import os.path
import re
import string
import time
import xmlrpclib
import W3CDate
import XMLFilter
__author__ = "Andrew Shearer"

# ---- Namespace URIs that show up in RSS feeds ----
kContentURI = "http://purl.org/rss/1.0/modules/content/"
kDCURI = "http://purl.org/dc/elements/1.1/"
kDCTermsURI = "http://purl.org/dc/terms/"
kRadioWeblogPostURI = "http://backend.userland.com/radioWeblogPostModule"
kRSS1URI = "http://purl.org/rss/1.0/"
kRSS2URI = "http://backend.userland.com/RSS2"
kRDFURI = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"

# Maps a namespace URI to the prefix used when an element in that namespace
# has to be written out under a qualified name (for instance because the
# original file never declared the namespace).  Keyed by URI so that the
# writer can go straight from an element's namespace to its prefix.
kNamespacePrefix = {
    kContentURI: 'content',
    kDCURI: 'dc',
    kDCTermsURI: 'dcterms',
    kRadioWeblogPostURI: 'radioWeblogPost',
    kRDFURI: 'rdf',
    "http://purl.org/rss/1.0/modules/syndication/": 'sy',
    "http://webns.net/mvcb/": 'admin',
}

# ---- (namespace URI, local name) pairs for the core RSS 2.0 elements ----
# Core elements carry no namespace, hence the leading None.
kChannel = (None, 'channel')
kItem = (None, 'item')
kAuthor = (None, 'author')
kCategory = (None, 'category')
kDescription = (None, 'description')
kGuid = (None, 'guid')
kLink = (None, 'link')
kPubDate = (None, 'pubDate')
kTitle = (None, 'title')

# Namespaces under which a core element may legitimately appear:
# none at all, RSS 1.0, or RSS 2.0 declared in a namespace.
kCoreNamespaceAlternatives = (None, kRSS1URI, kRSS2URI)

# ---- (namespace URI, local name) pairs for common RSS extensions ----
kContentEncoded = (kContentURI, 'encoded')
kDCDate = (kDCURI, 'date')
kDCTermsModified = (kDCTermsURI, 'modified')
kRadioWeblogPostID = (kRadioWeblogPostURI, 'id')
kRadioWeblogPostFlNotOnHomePage = (kRadioWeblogPostURI, 'flNotOnHomePage')
kRDFAbout = (kRDFURI, 'about')
class SafeFileReplace:
    """A safe transaction-style API for updating a file.

    The new content is written to a temporary file created in the same
    directory as the original.  commit() swaps the temporary file into the
    original's place; abort() throws the temporary file away.  Code using
    this class should be of the form:

        replacer = SafeFileReplace(mypath)
        tempfile = replacer.openTempFile()
        try:
            # write stuff to tempfile
        except:
            replacer.abort()
            raise
        else:
            replacer.commit()

    To do: lock the original file somehow during the update, to prevent race
    conditions with multiple update processes or threads.  Use a
    random-number algorithm to choose temp file names.
    """

    def __init__(self, origpath):
        """Remember origpath, the path of the file to be replaced."""
        self.origpath = origpath
        self.tempfilepath = None    # path of the temp file, while one exists
        self.tempfileptr = None     # open file object for the temp file

    def openTempFile(self, binary = 0):
        """Create the temp file on first use and return its open file object.

        Repeated calls return the same file object.  A true value for
        binary opens the file in 'wb' mode instead of 'w'.
        """
        if self.tempfileptr is None:
            temppath = self.getTempPath(os.path.dirname(self.origpath))
            if binary:
                mode = 'wb'
            else:
                mode = 'w'
            # Record the path only after open() succeeded, so a failed open
            # leaves no stale state behind for abort() to trip over.
            self.tempfileptr = open(temppath, mode)
            self.tempfilepath = temppath
        return self.tempfileptr

    def commit(self):
        """Replace the original file with the temp file.

        The original is first moved aside, the temp file is renamed to the
        original's path, and the moved-aside copy is then removed.  If the
        temp file cannot be moved into place, the original is moved back
        before the error is re-raised, so the original path never ends up
        missing.  openTempFile() must have been called first.
        """
        # would be nice to use FSpExchangeFiles on Mac OS
        self.tempfileptr.close()
        self.tempfileptr = None
        backuppath = self.getTempPath(os.path.dirname(self.origpath))
        os.rename(self.origpath, backuppath)
        try:
            os.rename(self.tempfilepath, self.origpath)
        except:
            # roll back: put the original where it was, then report the error
            os.rename(backuppath, self.origpath)
            raise
        self.tempfilepath = None
        os.remove(backuppath)

    def abort(self):
        """Get rid of the temp file.

        Does nothing when no temp file exists, so it is safe to call from
        an error handler regardless of how far the update got.
        """
        if self.tempfileptr is not None:
            self.tempfileptr.close()
            self.tempfileptr = None
        if self.tempfilepath is not None:
            temppath = self.tempfilepath
            self.tempfilepath = None
            if os.path.exists(temppath):
                os.remove(temppath)

    def getTempPath(self, dir):
        """Return a path of the form '~tempN.tmp' inside dir that does not
        exist yet, trying N = 1, 2, 3, ... in turn."""
        # stupid temp algorithm
        # we don't use Python's tmpnam function because there's no
        # guarantee that it will choose the same volume or even a writable
        # directory.
        index = 1
        while 1:
            path = os.path.join(dir, '~temp%d.tmp' % index)
            if not os.path.exists(path):
                return path
            index = index + 1
# --- weblog representation classes ---
class RSSException(Exception):
    """Common base class for the errors raised by this module."""


class NoPostIDException(RSSException):
    """Raised when no post ID can be derived for an item."""


class NoChannelForNewItemException(RSSException):
    """Raised when an RSS file offers no place to write new items."""


class MissingRSSFileException(RSSException):
    """Raised when the weblog's file or folder does not exist."""


class NoSampleRSSFileException(RSSException):
    """Raised when the template needed to create a new archive file is
    unspecified or missing."""
class Weblog:
    """A weblog stored as a single RSS file (or readable from a stream).

    webloginfo is a dictionary describing the weblog.  Keys read by this
    class: 'path' (RSS file path), 'stream' (optional already-open input
    used for reading instead of 'path'), 'categories' (optional list of
    dictionaries each having a 'name'), 'permaLinkFormat' and 'guidFormat'.
    A category's ID is its index in the 'categories' list.
    """

    def __init__(self, webloginfo):
        self.webloginfo = webloginfo
        # cache inverted categories list, for fast case-insensitive name lookup
        self.categoriesByName = {}
        categories = webloginfo.get('categories', [])
        for categoryid in range(len(categories)):
            name = categories[categoryid]['name']
            self.categoriesByName[name.lower()] = categoryid

    def getCategories(self):
        """Return the configured category list ([] when none is configured)."""
        return self.webloginfo.get('categories', [])

    def getCategoryNameByID(self, categoryid):
        """Return the name of the category at index categoryid, or None when
        the index is out of range (negative indexes included)."""
        categories = self.getCategories()
        if categoryid >= 0 and categoryid < len(categories):
            return categories[categoryid]['name']
        return None

    def getCategoryIDByName(self, category):
        """Return the ID of the named category (case-insensitive match), or
        None when there is no such category."""
        return self.categoriesByName.get(category.lower())

    def getPermaLinkFormat(self):
        """Return the 'permaLinkFormat' setting, or None when unset."""
        return self.webloginfo.get('permaLinkFormat')

    def getGuidFormat(self):
        """Return the 'guidFormat' setting, or None when unset."""
        return self.webloginfo.get('guidFormat')

    def modifyRSS(self, transformerClass, transformerParams = (), postid = None, postdate = None):
        """Run a transformer over the weblog's RSS file, rewriting it.

        Return a true value if the transformerClass instance says it
        finished.  postid and postdate are not used by this single-file
        implementation.
        """
        return self._modifyRSSFile(self.webloginfo['path'], transformerClass, transformerParams)

    def _modifyRSSFile(self, rsspath, transformerClass, transformerParams = ()):
        """Filter the file at rsspath through a transformerClass instance.

        The transformer is constructed with an XMLFilter.XMLGenerator
        writing to a temp file, followed by transformerParams.  The temp
        file replaces the original only if parsing completes without an
        exception; otherwise it is discarded and the exception propagates.
        Return a true value if the transformerClass instance says it
        finished.
        """
        replacer = SafeFileReplace(rsspath)
        infile = open(rsspath, 'r')
        try:
            # Opened inside the try so that the input file is still closed
            # when the temp file cannot be created.
            outfile = replacer.openTempFile()
            try:
                handlerObj = transformerClass(XMLFilter.XMLGenerator(outfile), *transformerParams)
                XMLFilter.parseStream(infile, handlerObj, handlerObj.getRecommendedFeatures())
            except:
                # cleanup only; the exception is always re-raised
                replacer.abort()
                raise
        finally:
            infile.close()
        # the input is closed by now, so the original can be renamed safely
        replacer.commit()
        return handlerObj.isDone()

    def readRSS(self, readerClass, readerParams, postid = None, postdate = None):
        """Parse the weblog with a readerClass instance and return a result
        furnished by that instance.

        The reader is constructed from readerParams.  Input comes from
        webloginfo['stream'] when that key is present, otherwise from the
        file at webloginfo['path'].  postid and postdate are not used by
        this single-file implementation.
        """
        reader = readerClass(*readerParams)
        if 'stream' in self.webloginfo:
            XMLFilter.parseStream(self.webloginfo['stream'], reader, reader.getRecommendedFeatures())
        else:
            self._readRSSFile(reader, self.webloginfo['path'])
        return reader.getResult()

    def _readRSSFile(self, reader, rsspath):
        """Parse the file at rsspath, feeding its events to reader."""
        XMLFilter.parseFilePath(rsspath, reader, reader.getRecommendedFeatures())
def WeblogFactory(webloginfo):
    """Build the weblog object that matches how the weblog is stored.

    A Weblog is returned when webloginfo supplies a 'stream' or when its
    'path' names an existing file; a WeblogArchive is returned when 'path'
    names a directory.  Anything else raises MissingRSSFileException.
    """
    if 'stream' in webloginfo or os.path.isfile(webloginfo['path']):
        return Weblog(webloginfo)
    if not os.path.isdir(webloginfo['path']):
        raise MissingRSSFileException("the file or folder specified for the weblog is missing")
    return WeblogArchive(webloginfo)
class WeblogArchive(Weblog):
    """Format a weblog as a structured folder hierarchy, ordered by date.

    One folder per year, one RSS file per month.
    Example: {defined root folder} / 2003 / 01.xml

    Besides the keys used by Weblog, webloginfo may contain 'sample-file'
    (template used to create a new month file), 'recent-file' (file that
    receives the most recent posts after every modification) and
    'recent-max' (how many posts go there; default 15).
    """

    yearRE = re.compile("^[1-9][0-9][0-9][0-9]$")

    def __init__(self, webloginfo):
        Weblog.__init__(self, webloginfo)

    def modifyRSS(self, transformerClass, params = (), postid = None, postdate = None):
        """Run a transformer over the archive's RSS files, rewriting them.

        With a postdate only that month's file is processed (it is created
        from the sample file when missing).  Without one, every month file
        is processed, newest first, until the transformer reports it is
        done.  The recent-posts file is regenerated afterwards.  Return the
        transformer's last isDone() value (0 if no file was processed).
        postid is not used.
        """
        if postdate:
            rsspaths = [self.dateToRSSPath(postdate)]
        else:
            rsspaths = self.getOrderedRSSPaths()
        isDone = 0
        for rssfile in rsspaths:
            isDone = self._modifyRSSFile(rssfile, transformerClass, params)
            if isDone:
                break
        self.generateRecent()
        return isDone

    def generateRecent(self):
        """Rewrite the 'recent-file' with the newest 'recent-max' posts.

        Does nothing when no 'recent-file' is configured.  Raises
        NoChannelForNewItemException when the recent file gave the
        replacer no place to put the items.
        """
        recentpath = self.webloginfo.get('recent-file')
        if not recentpath:
            return
        maxposts = int(self.webloginfo.get('recent-max', '15'))
        recentposts = self.readRSS(RSSLister, (maxposts,))
        isDone = self._modifyRSSFile(recentpath, RSSReplacer, (recentposts,))
        if not isDone:
            raise NoChannelForNewItemException("Could not write the recent items list, because the recent file did not have a place for them.")

    def readRSS(self, readerClass, readerParams, postid = None, postdate = None):
        """Parse archive files with a readerClass instance; return its result.

        With a postdate only that month's file is read; a month that has no
        file yet is treated as empty rather than being created, since
        reading should not modify the archive.  Without a postdate the
        month files are read newest first until the reader reports it is
        done.  A UnicodeError raised while parsing is re-raised with the
        file's name prepended to the message.  postid is not used.
        """
        if postdate:
            # autocreate off: a read must not create directories or files
            rsspath = self.dateToRSSPath(postdate, 0)
            if rsspath:
                rsspaths = [rsspath]
            else:
                rsspaths = []
        else:
            rsspaths = self.getOrderedRSSPaths()
        reader = readerClass(*readerParams)
        for rssfile in rsspaths:
            try:
                self._readRSSFile(reader, rssfile)
            except UnicodeError:
                # sys.exc_info() keeps this handler valid on every Python version
                import sys
                err = sys.exc_info()[1]
                raise UnicodeError('File %s: %s' % (os.path.basename(rssfile), str(err)))
            if reader.isDone():
                break
        return reader.getResult()

    def getOrderedRSSPaths(self):
        """Return the paths of all existing month files, newest first.

        Only sub-directories of the archive root whose names are four-digit
        years are searched, and within each only files named 01.xml through
        12.xml are considered.
        """
        parentdir = self.webloginfo['path']
        years = []  # will contain list of strings
        for yeardirname in os.listdir(parentdir):
            # the length test also rejects names with a trailing newline,
            # which '$' in the pattern would let through
            if len(yeardirname) == 4 and self.yearRE.match(yeardirname) and os.path.isdir(os.path.join(parentdir, yeardirname)):
                years.append(yeardirname)
        years.sort()
        years.reverse()
        result = []
        for year in years:
            for month in range(12, 0, -1):
                rsspath = os.path.join(parentdir, year, "%02d.xml" % month)
                if os.path.isfile(rsspath):
                    result.append(rsspath)
        return result

    def dateToRSSPath(self, postdate, autocreate = 1):
        """Return the path of the month file that covers postdate.

        postdate must offer getDateTuple(), whose first two entries are
        used as year and month.  When the year folder or the month file is
        missing: with autocreate false, return None; otherwise create the
        folder and build the month file from webloginfo['sample-file'] with
        all of the sample's items dropped.  Raises NoSampleRSSFileException
        when the sample file is unspecified or missing.  If building the
        month file fails, the partial file is removed again.
        """
        datetuple = postdate.getDateTuple()
        parentdir = self.webloginfo['path']
        year, month = datetuple[0:2]
        yearpath = os.path.join(parentdir, "%04d" % year)
        if not os.path.isdir(yearpath):
            if not autocreate:
                return None
            os.mkdir(yearpath)
        rsspath = os.path.join(yearpath, "%02d.xml" % month)
        if not os.path.isfile(rsspath):
            if not autocreate:
                return None
            samplefile = self.webloginfo.get('sample-file')
            if not samplefile:
                raise NoSampleRSSFileException("Could not create the month's RSS archive, because no 'sample-file' to use as a template was specified for the weblog.")
            if not os.path.isfile(samplefile):
                raise NoSampleRSSFileException("Could not create the month's RSS archive, because the 'sample-file' specified for use as a template was not found.")
            outfile = open(rsspath, 'w')
            try:
                try:
                    handlerObj = RSSReplacer(XMLFilter.XMLGenerator(outfile), [])
                    XMLFilter.parseFilePath(samplefile, handlerObj, handlerObj.getRecommendedFeatures())
                finally:
                    outfile.close()
            except:
                # Don't leave a truncated month file behind: later calls
                # would take it for a valid archive.
                if os.path.exists(rsspath):
                    os.remove(rsspath)
                raise
        return rsspath
class RSSItem:
    """One RSS item (weblog post), convertible to and from the Blogger API
    and MetaWeblog API struct formats and writable as RSS.

    Elements are identified by (namespace URI, local name) tuples; core RSS
    elements use None as the namespace URI.
    """

    # element is called 'category' in internal/RSS, 'categories' in MetaWeblog
    multiValuedElements = {kCategory: 'categories'}

    # Delimiters of the title and category that Blogger API clients carry at
    # the front of the post content: <title>..</title><category>..</category>
    _bloggerTitleTags = ('<title>', '</title>')
    _bloggerCategoryTags = ('<category>', '</category>')

    bloggerIDFromGuid1 = re.compile(".*#[^0-9]*([0-9]+)$") # URL with anchor & postid suffix
    bloggerIDFromGuid2 = re.compile("^([0-9]+).*") # postid prefix

    def __init__(self, weblog = None):
        """weblog object is only used to interpret elements such as category IDs.
        The client must handle actually adding the item to the weblog, normally through the
        item's pipeToRSS() method."""
        # saved order of elements, repeating elems such as category
        self.elementsList = []
        # RSS struct format, mapping (namespace URI, tagname) pairs to text
        # content or lists of text content (for multiple like-named nodes)
        self.elementsDict = {}
        # elements for which to favor the elementsDict version over
        # elementsList, while preserving order from elementsList
        self.changedElements = {}
        # attributes of each element, keyed like elementsDict
        self.elementAttrsDict = {}
        # attributes of the <item> element itself
        self.itemAttrs = {}
        self.weblog = weblog

    def setItemAttributes(self, attrs):
        """Store the attributes of the item element itself."""
        self.itemAttrs = attrs

    def addElement(self, tag, attrs, text):
        """Add an RSS file element.  Do this when initializing the RSSItem.

        Elements listed in multiValuedElements accumulate into a list;
        for any other element a later value replaces an earlier one.
        """
        if tag[0] == '':
            tag = (None, tag[1]) # normalize namespace URI of '' to None
        self.elementsList.append((tag, attrs, text))
        if attrs:
            self.elementAttrsDict[tag] = attrs
        if tag in self.multiValuedElements:
            if not isinstance(text, list):
                text = [text]
            if tag in self.elementsDict:
                self.elementsDict[tag].extend(text)
            else:
                self.elementsDict[tag] = text
        else:
            self.elementsDict[tag] = text

    def changeElement(self, tag, attrs, text):
        """Set an element's attributes and content, replacing any existing
        value, and mark the element as changed.  An element that is not
        present yet is appended to the element order."""
        if tag[0] == '':
            tag = (None, tag[1]) # normalize namespace URI of '' to None
        self.changedElements[tag] = 1
        if tag not in self.elementsDict:
            self.elementsList.append((tag, attrs, text))
        if tag in self.multiValuedElements and not isinstance(text, list):
            text = [text]
        self.elementsDict[tag] = text
        self.elementAttrsDict[tag] = attrs

    def deleteElement(self, tag):
        """Remove an element's value, if any, and mark the element as
        changed so that pipeToRSS() omits it."""
        if tag[0] == '':
            tag = (None, tag[1]) # normalize namespace URI of '' to None
        self.changedElements[tag] = 1
        if tag in self.elementsDict:
            del self.elementsDict[tag]

    def _unescapeXML(self, text):
        """Undo xmlrpclib.escape(): turn &lt; &gt; &amp; back into < > &."""
        text = text.replace('&lt;', '<')
        text = text.replace('&gt;', '>')
        # '&amp;' goes last so that '&amp;lt;' comes out as '&lt;', not '<'
        return text.replace('&amp;', '&')

    def setFromBloggerFormat(self, bloggerFormat):
        """Update the item from a Blogger API struct.

        The keys 'title', 'link' and 'content' are read.  A title and a
        numeric category ID that clients prepend to the content as
        <title>...</title> and <category>...</category> are split off and
        stored as the title and category elements; the title has its XML
        escaping removed, mirroring getBloggerFormat().  A missing or blank
        key deletes the corresponding element.
        """
        mapping = (('title', kTitle), ('link', kLink), ('content', kDescription))
        for key, rssElement in mapping:
            if not bloggerFormat.get(key):
                # delete missing or blank elements.
                # Can't just let blanks stand: RSS Validator won't accept
                # empty elements, for example.
                self.deleteElement(rssElement)
                continue
            if key != 'content':
                self.changeElement(rssElement, {}, bloggerFormat[key])
                continue

            element = bloggerFormat[key]
            # demunge title and category tags that some clients prepend to description
            titleprefix, titlesuffix = self._bloggerTitleTags
            catprefix, catsuffix = self._bloggerCategoryTags

            suffixindex = element.find(titlesuffix)
            if element.startswith(titleprefix) and suffixindex > 0:
                title = element[len(titleprefix):suffixindex].strip()
                element = element[suffixindex + len(titlesuffix):].lstrip()
                if title:
                    self.changeElement(kTitle, {}, self._unescapeXML(title))

            suffixindex = element.find(catsuffix)
            if element.startswith(catprefix) and suffixindex > 0:
                category = element[len(catprefix):suffixindex].strip()
                element = element[suffixindex + len(catsuffix):].lstrip()
                if category and self.weblog is not None:
                    try:
                        categoryName = self.weblog.getCategoryNameByID(int(category))
                    except ValueError:
                        # not a number: handled like any other unknown ID
                        categoryName = None
                    # +++ should throw exception for unknown categories
                    if categoryName:
                        self.changeElement(kCategory, {}, categoryName)

            # set description
            self.changeElement(kDescription, {}, element)
            # remove content:encoded element, if it exists; it would be out of sync
            self.deleteElement(kContentEncoded)

    def setFromMetaWeblogFormat(self, metaWeblogFormat):
        """Update the item from a MetaWeblog API struct.

        Plain members become un-namespaced elements of the same name;
        members whose name contains a colon are treated as sub-structs of
        elements in the namespace given by the name.  'dateCreated' is
        stored as pubDate in RFC 822 form, 'permaLink' as a guid with
        isPermaLink="true", 'categories' as category elements, and
        'postid' is ignored.
        """
        if 'description' in metaWeblogFormat:
            # remove content:encoded element, if it exists; it would be out of sync
            self.deleteElement(kContentEncoded)
        if 'dateCreated' in metaWeblogFormat:
            self.deleteElement(kDCDate)
        # always delete flNotOnHomePage first (make it false), because clients
        # don't bother sending it at all if the user set it to false, so it won't
        # have a chance to overwrite the old value.
        # Same deal for category.
        self.deleteElement(kRadioWeblogPostFlNotOnHomePage)
        self.deleteElement(kCategory)
        # (None, '') has an empty name, which means "do not store":
        # don't allow the postid to change
        elementMapping = {'categories': kCategory, 'postid': (None, ''),
            'dateCreated': kPubDate, 'flNotOnHomePage': kRadioWeblogPostFlNotOnHomePage}
        for key, value in metaWeblogFormat.items():
            element = elementMapping.get(key)
            if key == 'dateCreated':
                if value: # canonicalize date
                    if isinstance(value, W3CDate.W3CDate):
                        d = value
                    else:
                        if isinstance(value, xmlrpclib.DateTime):
                            value = value.value
                        else:
                            value = str(value)
                        d = W3CDate.W3CDate()
                        d.parse(value)
                    value = d.getRFC822()
            elif key == 'flNotOnHomePage':
                if not value: # test the xmlrpclib.Boolean value
                    value = None # deletes element for false values, since existence == truth
                else:
                    value = '' # for RSS, empty-string element is true
            elif element is None:
                if ':' in key:
                    # it's an encoded namespace substruct. The MetaWeblog API namespace
                    # encoding isn't reversible in the general case, so we use
                    # a heuristic: only namespace URIs will contain a colon.
                    # (If some future MetaWeblog extension defines a legitimate
                    # struct member with a colon, we couldn't write it to XML anyway,
                    # since XML uses the colon to delimit namespace qualifiers.)
                    for subkey, subvalue in value.items():
                        self._setFromMetaWeblogStructMember((key, subkey), subvalue)
                    element = (None, '') # members are stored; nothing left to do
                elif key == 'permaLink':
                    element = kGuid
                    value = {'isPermaLink': 'true', '_value': value}
                else:
                    element = (None, key)
            if element[1]:
                self._setFromMetaWeblogStructMember(element, value)

    def _setFromMetaWeblogStructMember(self, element, member):
        """Store one MetaWeblog struct member as the given element.

        A struct-like member (anything with items()) supplies the element's
        attributes, with its '_value' entry supplying the content.  Any
        other member is the content itself.  A None member without
        attributes deletes the element.
        """
        attrs = {}
        if member and hasattr(member, 'items'):
            # parse a struct of attribute values, including '_value' for the element content
            value = ''
            for attrname, attrvalue in member.items():
                if attrname == '_value':
                    value = attrvalue
                else:
                    attrs[(None, attrname)] = attrvalue
        else:
            # a plain value, no attributes
            value = member
        if value is not None or attrs:
            self.changeElement(element, attrs, value)
        else:
            self.deleteElement(element)

    def setFromItem(self, item):
        """Copy attributes from another instance of this class, so
        that the other instance overrides this one whenever attributes overlap.
        For some attributes, their absence conveys information, so we have
        to delete those first from our own copy so the other item has a chance
        at transferring that info.

        An element that the other item lists but has since deleted is
        deleted here as well, and a repeated element such as category is
        copied with all of its values.
        """
        self.deleteElement(kRadioWeblogPostFlNotOnHomePage)
        self.deleteElement(kCategory)
        didSetDescription = didSetContentEncoded = 0
        for tag, attrs, text in item.elementsList:
            if tag in item.changedElements:
                if tag not in item.elementsDict:
                    # listed earlier but deleted afterwards in the other item
                    self.deleteElement(tag)
                    continue
                attrs = item.elementAttrsDict.get(tag, {})
                text = item.elementsDict[tag]
            elif tag in self.multiValuedElements and tag in item.elementsDict:
                # Each occurrence in elementsList carries a single value;
                # take the whole list, or only the last value would survive.
                text = list(item.elementsDict[tag])
            self.changeElement(tag, attrs, text)
            if tag == kDescription:
                didSetDescription = 1
            elif tag == kContentEncoded:
                didSetContentEncoded = 1
        # if we set only one of 'description' or 'content:encoded', delete the other one
        # Otherwise, when the user edited one of them the other one would get out of sync.
        # An older content:encoded would even override a newly edited description, which
        # would be very confusing.
        if didSetContentEncoded and not didSetDescription:
            self.deleteElement(kDescription)
        elif didSetDescription and not didSetContentEncoded:
            self.deleteElement(kContentEncoded)

    def getBloggerFormat(self):
        """Return a struct in Blogger API format: content, postid, optional title & category, dateCreated.

        The title, and the ID of the first category known to the weblog,
        are prepended to the content as <title>...</title> and
        <category>...</category>.  Raises NoPostIDException when no post ID
        can be determined.
        """
        item = {}
        item['content'] = self.elementsDict.get(kContentEncoded) or self.elementsDict.get(kDescription, '')
        item['postid'] = self.getBloggerID()
        categories = self.elementsDict.get(kCategory)
        # an edit may leave an empty category list behind, hence the truth test
        if categories and self.weblog is not None:
            categoryID = self.weblog.getCategoryIDByName(categories[0])
            if categoryID is not None:
                catprefix, catsuffix = self._bloggerCategoryTags
                item['content'] = (catprefix + xmlrpclib.escape(str(categoryID))
                    + catsuffix + item['content'])
        if kTitle in self.elementsDict:
            titleprefix, titlesuffix = self._bloggerTitleTags
            item['content'] = (titleprefix + xmlrpclib.escape(self.elementsDict[kTitle])
                + titlesuffix + item['content'])
        date = self.getDate()
        if date is not None:
            item['dateCreated'] = xmlrpclib.DateTime(date.getXMLRPC())
        if kAuthor in self.elementsDict:
            item['author'] = self.elementsDict[kAuthor]
        return item

    def getMetaWeblogFormat(self):
        """Return a struct in MetaWeblog API format.

        Un-namespaced elements become members named after the element;
        namespaced elements go into a sub-struct keyed by namespace URI.
        An element with attributes becomes a struct of its attributes plus
        '_value' for its content.  Doesn't handle userid (numeric), source,
        link, enclosure.
        """
        result = {}
        # rename or delete these elements. The deleted elements ('') will be handled later.
        specialElements = {kDCDate: '', kPubDate: '', kCategory: 'categories',
            kRadioWeblogPostID: 'postid', kContentEncoded: '',
            kRadioWeblogPostFlNotOnHomePage: ''}
        if self.itemAttrs and kRDFAbout in self.itemAttrs:
            result['permaLink'] = self.itemAttrs[kRDFAbout]
        for tag, value in self.elementsDict.items():
            attrs = self.elementAttrsDict.get(tag)
            if attrs:
                valueStruct = {}
                for attrname, attrvalue in attrs.items():
                    if isinstance(attrname, tuple):
                        attrname = attrname[1] # drop the namespace part
                    valueStruct[str(attrname)] = attrvalue
                if value:
                    valueStruct['_value'] = value
                # +++ for source element, should use 'name' attr instead of '_value'
                value = valueStruct
            specialKey = specialElements.get(tag)
            if specialKey: # simple rename
                result[specialKey] = value
            elif specialKey == '': # delete element (it's a special case to handle later)
                pass
            elif tag[0]: # namespace, put in sub-struct by namespace URI
                result.setdefault(str(tag[0]), {})[str(tag[1])] = value
            else:
                result[str(tag[1])] = value # regular value, no namespace
        # handle special cases: semantically duplicate elements, etc.
        descriptionOverride = self.elementsDict.get(kContentEncoded)
        if descriptionOverride:
            # weblogging tools such as NetNewsWire ignore content:encoded,
            # so move it into the description instead, which it overrides
            result['description'] = descriptionOverride
        if not result.get('postid'):
            try:
                result['postid'] = self.getBloggerID()
            except (NoPostIDException, TypeError):
                # TypeError: the ID fell back on the item's date, and that
                # was invalid; see the date handling just below
                pass
        try:
            date = self.getDate()
        except TypeError:
            pass # don't torpedo the whole call if the item's date is invalid
        else:
            if date is not None:
                result['dateCreated'] = xmlrpclib.DateTime(date.getXMLRPC())
        permaLink = self.getPermaLink()
        if permaLink:
            result['permaLink'] = permaLink
        if kRadioWeblogPostFlNotOnHomePage in self.elementsDict:
            # existence is truth in RSS, but it's a non-namespaced boolean in XML-RPC
            result['flNotOnHomePage'] = xmlrpclib.Boolean(1)
        return result

    def pipeToRSS(self, out):
        """Recreate the item in RSS format on a SAX-compatible output.

        Elements are written in their original order.  A changed element is
        written once, with its current value(s), at the position of its
        first occurrence; a deleted element is left out.
        """
        out.startElementNS((None, 'item'), 'item', self.itemAttrs)
        out.ignorableWhitespace('\n')
        emittedChangedElements = {}
        # elementsList preserves the order of the original tags
        # changedElements keeps track of which ones to take from elementsDict
        for tag, attrs, text in self.elementsList:
            if tag in self.changedElements:
                if tag in emittedChangedElements:
                    # skip this one; we already saw the first of this repeated
                    # element and spit out all the changed elements together
                    continue
                if tag not in self.elementsDict:
                    # skip this one; the element has been deleted
                    continue
                text = self.elementsDict[tag]
                attrs = self.elementAttrsDict.get(tag, {})
                emittedChangedElements[tag] = 1
            qname = tag[1]
            if tag[0] and tag[0] in kNamespacePrefix:
                qname = kNamespacePrefix[tag[0]] + ':' + tag[1]
            if isinstance(text, list):
                values = text
            else:
                values = [text]
            for value in values:
                out.startElementNS(tag, qname, attrs)
                if value:
                    out.characters(value)
                out.endElementNS(tag, qname)
                out.ignorableWhitespace('\n')
        out.endElementNS((None, 'item'), 'item')

    def getDate(self):
        """Return the item's date as a W3CDate, or None when it has none.

        pubDate is preferred; dc:date is the fallback.
        """
        dateobj = W3CDate.W3CDate()
        rawdate = self.elementsDict.get(kPubDate) # try using pubDate, which is the stable creation date
        if rawdate:
            dateobj.parseRFC822(rawdate)
            return dateobj
        rawdate = self.elementsDict.get(kDCDate) # next try dc:date element, which we use as a mod date
        if rawdate:
            dateobj.parse(rawdate)
            return dateobj
        # +++ search for dcterms: dates
        return None

    def getPermaLink(self):
        """Return the guid's content when the guid carries
        isPermaLink="true" (any letter case), else None."""
        if kGuid not in self.elementsDict or kGuid not in self.elementAttrsDict:
            return None
        guidAttrs = self.elementAttrsDict[kGuid]
        # the attribute may be keyed by plain name or by (namespace, name)
        flag = guidAttrs.get('isPermaLink', '') or guidAttrs.get((None, 'isPermaLink'), '')
        if flag.lower() == 'true':
            return self.elementsDict[kGuid]
        return None

    def getBloggerID(self):
        """Return a blogger ID for the given post, which is used by
        Blogger API clients to edit or delete the post. Though no
        limits are specified in the Blogger API documentation, some clients
        limit the ID to be an integer. BlogApp can't handle IDs of more
        than about 10 digits without dropping the rightmost few in scientific
        notation. So we first try extracting an integer blogger ID from the existing
        guid. If it doesn't exist, we generate an ID based on the post date.

        A stored radioWeblogPost:id takes precedence over both.  Raises
        NoPostIDException when none of the sources yields an ID.
        """
        postid = self.elementsDict.get(kRadioWeblogPostID)
        if not postid and kGuid in self.elementsDict:
            testguid = self.elementsDict[kGuid]
            match = self.bloggerIDFromGuid1.match(testguid)
            if not match:
                match = self.bloggerIDFromGuid2.match(testguid)
            if match:
                postid = match.group(1)
        if not postid:
            # also reached for an empty stored ID, which is as good as none
            date = self.getDate()
            if date is not None:
                postid = time.strftime("%m%d%H%M%S", date.getDateTuple())
        if not postid:
            raise NoPostIDException("Could not generate a post id, because there was no post date to go by.")
        return str(postid) # str de-Unicodes if necessary

    def setCreationDate(self, dateobj = None):
        """Set pubDate from dateobj (a W3CDate), or to the current date
        when dateobj is omitted."""
        if dateobj is None:
            dateobj = W3CDate.W3CDate()
            dateobj.setCurrentDate()
        self.changeElement(kPubDate, {}, dateobj.getRFC822())

    def setModificationDate(self, dateobj = None):
        """Set dcterms:modified from dateobj (a W3CDate), or to the current
        date when dateobj is omitted."""
        if dateobj is None:
            dateobj = W3CDate.W3CDate()
            dateobj.setCurrentDate()
        self.changeElement(kDCTermsModified, {}, dateobj.getRFC8601())

    def setCategories(self, categoryList):
        """Replace the item's categories with categoryList."""
        self.changeElement(kCategory, {}, categoryList)

    def getCategories(self):
        """Return the list of category names, or None when there is none."""
        return self.elementsDict.get(kCategory)

    def setGUID(self, guid, isPermaLink):
        """Set the guid element; a true isPermaLink adds isPermaLink="true"."""
        attrs = {}
        if isPermaLink:
            attrs = {'isPermaLink': 'true'}
        self.changeElement(kGuid, attrs, guid)

    def setPostID(self, postid):
        """Store postid as the radioWeblogPost:id element."""
        self.changeElement(kRadioWeblogPostID, {}, postid)

    def autoSetPostID(self):
        """Store the ID computed by getBloggerID() as the post ID."""
        # post date must have already been set
        self.setPostID(self.getBloggerID())
class RSSFilter(XMLFilter.XMLFilter):
    """XMLFilter that (optionally) parses each item into an RSSItem instance
    instead of passing the xml code through. At the start of the item,
    self.shouldParseItem() returns a boolean, which if true causes
    all XML to be diverted to a new post object stored as self._currentitem.
    While self._currentitem is None, the XML is passed through as usual.
    When the item ends, the finished RSSItem is handed to
    self.itemFinished()."""

    def __init__(self, nextFilter):
        XMLFilter.XMLFilter.__init__(self, nextFilter)
        self._currentitem = None        # RSSItem being collected, if any
        self._currenttext = None        # text of the element being collected
        self._currentelemattrs = None   # attributes of that element
        self._suppressNextWhitespace = 0

    def getRecommendedFeatures(self):
        """The recommended features (which clients should pass along to the
        SAX parser) enable namespace parsing and disable external entities.
        (So there's no network traffic even if the document were to specify
        a DTD.)
        """
        return {XMLFilter.feature_namespaces: 1,
                XMLFilter.feature_external_ges: 0,
                XMLFilter.feature_external_pes: 0}

    def startElementNS(self, nameTuple, qname, attrs):
        """Start collecting an element of the current item, start a new
        item, or pass the event through, in that order of preference."""
        self._suppressNextWhitespace = 0
        if self._currentitem is not None:
            self._currenttext = ''
            self._currentelemattrs = attrs
        elif nameTuple[1] == 'item' and nameTuple[0] in kCoreNamespaceAlternatives \
                and self.shouldParseItem(): # +++ NS check
            self._currentitem = RSSItem()
            if attrs:
                self._currentitem.setItemAttributes(attrs)
        else:
            XMLFilter.XMLFilter.startElementNS(self, nameTuple, qname, attrs)

    def shouldParseItem(self):
        """overrideable: return a true value to have the item that is about
        to start parsed into an RSSItem"""
        return 1

    def itemFinished(self, item):
        """overrideable: receives each parsed RSSItem once its end tag has
        been seen"""
        pass

    def characters(self, data):
        """Append data to the element being collected, or pass it through
        when no item is being parsed."""
        if self._suppressNextWhitespace:
            if data.strip() == '':
                return
            self._suppressNextWhitespace = 0
        if self._currenttext is not None:
            self._currenttext = self._currenttext + data
        elif self._currentitem is None:
            XMLFilter.XMLFilter.characters(self, data)

    def endElementNS(self, nameTuple, qname):
        """Finish the current item or one of its elements, or pass the
        event through when no item is being parsed."""
        self._suppressNextWhitespace = 0
        item = self._currentitem
        if item is None:
            XMLFilter.XMLFilter.endElementNS(self, nameTuple, qname)
            return
        currenttext = self._currenttext
        currentelemattrs = self._currentelemattrs
        self._currentelemattrs = self._currenttext = None
        if nameTuple[1] == 'item' and nameTuple[0] in kCoreNamespaceAlternatives: # +++ NS check
            # clear the current item first, so that itemFinished() may send
            # events through this filter again
            self._currentitem = None
            self.itemFinished(item)
        else:
            # if the element is in an RSS namespace (1.0 or 2.0), normalize it
            # to having no namespace, so the RSSItem can recognize it easily.
            # The test is on the namespace URI; the (URI, name) pair as a
            # whole is never a member of kCoreNamespaceAlternatives.
            if nameTuple[0] is not None and nameTuple[0] in kCoreNamespaceAlternatives:
                nameTuple = (None, nameTuple[1])
            item.addElement(nameTuple, currentelemattrs, currenttext)

    def ignorableWhitespace(self, data):
        """Like characters(), except that whitespace is dropped entirely
        while suppression is switched on."""
        if self._suppressNextWhitespace:
            pass
        elif self._currenttext is not None:
            self._currenttext = self._currenttext + data
        elif self._currentitem is None:
            XMLFilter.XMLFilter.ignorableWhitespace(self, data)

    def suppressNextWhitespace(self):
        """Drop whitespace from now on, until an element starts or ends or
        non-blank character data arrives."""
        self._suppressNextWhitespace = 1

    def isDone(self):
        """Report whether the filter needs no further input; never, here."""
        return 0
class RSSAdder(XMLFilter.XMLFilter):
    """Insert one new post into an RSS XML stream, ahead of the existing
    items.  Plain XMLFilter is enough as a base class, since none of the
    existing items has to be parsed."""

    def __init__(self, out, newPost):
        XMLFilter.XMLFilter.__init__(self, out)
        # the post still waiting to be written; None once it has been
        self._newPost = newPost

    def getRecommendedFeatures(self):
        """Features that clients should hand to the SAX parser: namespace
        parsing on, external entities off (so that no network traffic
        occurs even if the document were to specify a DTD).
        """
        features = {}
        features[XMLFilter.feature_namespaces] = 1
        features[XMLFilter.feature_external_ges] = 0
        features[XMLFilter.feature_external_pes] = 0
        return features

    def startElementNS(self, nameTuple, qname, attrs):
        """Write the pending post just before the first item starts."""
        startsItem = nameTuple[1] == 'item' and nameTuple[0] in kCoreNamespaceAlternatives # +++ NS check
        if startsItem and self._newPost is not None:
            self._emitNewPost()
        XMLFilter.XMLFilter.startElementNS(self, nameTuple, qname, attrs)

    def _emitNewPost(self):
        """Write the pending post through this filter, followed by two
        newlines."""
        # Forget the post before writing it: writing sends an item start
        # event back through startElementNS(), which must not recurse.
        pending, self._newPost = self._newPost, None
        pending.pipeToRSS(self)
        for _ in (1, 2):
            self.ignorableWhitespace("\n")

    def endElementNS(self, nameTuple, qname):
        """Write the pending post at the end of the channel, which is
        where it goes when the channel held no item at all."""
        endsChannel = nameTuple[1] == 'channel' and nameTuple[0] in kCoreNamespaceAlternatives # +++ NS check
        if endsChannel and self._newPost is not None:
            self._emitNewPost()
        XMLFilter.XMLFilter.endElementNS(self, nameTuple, qname)

    def endDocument(self):
        """Raise NoChannelForNewItemException if the post was never written."""
        if self._newPost is not None:
            raise NoChannelForNewItemException("Could not save new post; RSS file did not contain a channel element")
        XMLFilter.XMLFilter.endDocument(self)

    def isDone(self):
        """True once the new post has been written."""
        return self._newPost is None
class RSSEditor(RSSFilter):
    """Filter an XML RSS stream, updating the one post whose ID equals the
    target postid.

    With a true newPost, the matching item takes over newPost's elements
    (RSSItem.setFromItem).  With a newPost that is false but not None, the
    matching item is dropped along with the whitespace that follows it.
    Once the target has been handled, later items pass through unparsed.
    """

    def __init__(self, out, postid, newPost):
        RSSFilter.__init__(self, out)
        # None means "nothing left to do": the edit has been applied
        self._newPost = newPost
        self._postid = str(postid)

    def shouldParseItem(self):
        """Parse items only for as long as the edit is outstanding."""
        return self._newPost is not None

    def itemFinished(self, item):
        """Apply the edit if item is the target, then write the item out
        (unless it was deleted)."""
        pending = self._newPost
        if str(item.getBloggerID()) == self._postid:
            if pending:
                item.setFromItem(pending)
            else: # deleting
                item = None
            pending = None # target handled; nothing to keep for later
        # Writing the item sends its events back through this filter.
        # Blank out _newPost meanwhile, so that shouldParseItem() says no
        # and the item is not filtered recursively.
        self._newPost = None
        if item is None:
            self.suppressNextWhitespace() # delete trailing whitespace as well as the item
        else:
            item.pipeToRSS(self)
        self._newPost = pending

    def endDocument(self):
        RSSFilter.endDocument(self)

    def isDone(self):
        """True once the target post has been found and handled."""
        return (self._newPost is None)
class RSSReplacer(XMLFilter.XMLFilter):
    """Filter an XML RSS stream, dropping all posts and replacing them with
    the given posts, if any.
    The channel info is preserved, making this useful for making a new empty file
    from a 'sample' RSS file.
    """

    def __init__(self, nextFilter, items = None):
        """items is the list of RSSItem objects to write in place of the
        existing items; omit it (or pass an empty list) to only drop the
        existing items."""
        XMLFilter.XMLFilter.__init__(self, nextFilter)
        self._ignoring = 0                  # true while inside a dropped item
        self._items = items or []           # None once they have been written
        self._suppressNextWhitespace = 0

    def getRecommendedFeatures(self):
        """The recommended features (which clients should pass along to the
        SAX parser) enable namespace parsing and disable external entities.
        (So there's no network traffic even if the document were to specify
        a DTD.)
        """
        return {XMLFilter.feature_namespaces: 1,
                XMLFilter.feature_external_ges: 0,
                XMLFilter.feature_external_pes: 0}

    def startElementNS(self, nameTuple, qname, attrs):
        """Begin dropping when an existing item starts; pass any other
        element through unless it lies inside a dropped item."""
        if self._ignoring:
            return
        if (nameTuple[1] == 'item' and nameTuple[0] in kCoreNamespaceAlternatives) and self._items is not None: # +++ NS check
            self._ignoring = 1
        else:
            self._suppressNextWhitespace = 0
            XMLFilter.XMLFilter.startElementNS(self, nameTuple, qname, attrs)

    def characters(self, data):
        """Pass character data through, except inside a dropped item and
        except blank data that directly follows one."""
        if self._ignoring:
            return
        if self._suppressNextWhitespace:
            if data.strip() == '':
                return
            self._suppressNextWhitespace = 0
        XMLFilter.XMLFilter.characters(self, data)

    def endElementNS(self, nameTuple, qname):
        """Write the replacement items just before the channel closes, and
        stop dropping when a dropped item ends."""
        if nameTuple[1] == 'channel' and nameTuple[0] in kCoreNamespaceAlternatives: # +++ NS check
            self._ignoring = 0
            items = self._items
            if items is not None:
                # Set _items to None before writing: the items' own start
                # events come back through startElementNS() and must not be
                # dropped.
                self._items = None
                for post in items:
                    post.pipeToRSS(self)
        if not self._ignoring:
            self._suppressNextWhitespace = 0
            XMLFilter.XMLFilter.endElementNS(self, nameTuple, qname)
        elif nameTuple[1] == 'item' and nameTuple[0] in kCoreNamespaceAlternatives: # +++ NS check
            self._ignoring = 0
            # swallow the whitespace that followed the dropped item
            self._suppressNextWhitespace = 1

    def ignorableWhitespace(self, data):
        """Pass whitespace through, except inside or directly after a
        dropped item."""
        if not self._ignoring and not self._suppressNextWhitespace:
            XMLFilter.XMLFilter.ignorableWhitespace(self, data)

    def isDone(self):
        """True once the replacement items have been written, which
        happens when the end of a channel element is reached."""
        return self._items is None
class RSSLister(RSSFilter):
    """Accumulate the parsed RSS items into a big Python list, up to an optional
    maximum number of items."""

    def __init__(self, maxposts = None):
        """Set up next item in XML filter chain to be an empty XMLSAXHandler
        (the bit bucket) because we don't need to save the XML anywhere.

        maxposts is the largest number of items to collect; None means no
        limit.
        """
        RSSFilter.__init__(self, XMLFilter.XMLSAXHandler())
        self._maxposts = maxposts
        self._numposts = 0   # number of items collected so far
        self._items = []

    def shouldParseItem(self):
        """Return true while more items may still be collected."""
        return self._maxposts is None or self._numposts < self._maxposts

    def itemFinished(self, item):
        """Record a parsed item and count it against the limit."""
        self._items.append(item)
        self._numposts = self._numposts + 1

    def getResult(self):
        """Return the list of accumulated posts."""
        return self._items

    def isDone(self):
        """Return true once the limit, if any, has been reached."""
        return self._maxposts is not None and self._numposts >= self._maxposts
class RSSFilteredLister(RSSFilter):
    """Accumulate into a Python list the parsed RSS items that pass the given
    filters: a range of item positions in the stream, a date range, and/or
    a category (compared case-insensitively)."""

    def __init__(self, minDate = None, maxDate = None, minNumber = None, maxNumber = None,
                 category = None):
        """Set up next item in XML filter chain to be an empty XMLSAXHandler
        (the bit bucket) because we don't need to save the XML anywhere.

        minDate/maxDate: optional date bounds (inclusive); objects providing
            getXMLRPC(), whose results are what actually gets compared.
        minNumber/maxNumber: optional bounds (inclusive) on the item's
            position in the stream, counting the first item as 1.
        category: optional category name an item must carry.
        Any filter left as None is not applied.
        """
        RSSFilter.__init__(self, XMLFilter.XMLSAXHandler())
        self._minNumber = minNumber
        self._maxNumber = maxNumber
        self._category = category
        self._numposts = 0   # number of items seen so far (not items kept)
        self._minDate = self._maxDate = None
        if minDate is not None:
            self._minDate = minDate.getXMLRPC()
        if maxDate is not None:
            self._maxDate = maxDate.getXMLRPC()
        if self._category:
            self._category = self._category.lower()
        self._items = []

    def shouldParseItem(self):
        """Count the item that is about to start, and return true if its
        position falls inside the requested number range."""
        self._numposts = self._numposts + 1
        return (self._minNumber is None or self._numposts >= self._minNumber) \
            and (self._maxNumber is None or self._numposts <= self._maxNumber)

    def itemFinished(self, item):
        """Keep the parsed item if it passes the category and date filters."""
        if self._category:
            for postCategory in (item.getCategories() or []):
                if postCategory.lower() == self._category:
                    break
            else:
                # no category matched
                return
        if self._minDate is not None or self._maxDate is not None:
            itemDate = item.getDate()
            if itemDate is None:
                # An undated item is treated as older than any date: it
                # fails a lower bound and passes an upper bound. This is the
                # outcome the former direct comparison with None produced
                # under Python 2's default ordering; it is spelled out
                # because date objects may refuse to compare with None
                # (NOTE(review): getXMLRPC() presumably returns
                # xmlrpclib.DateTime -- confirm).
                if self._minDate is not None:
                    return
            else:
                itemDate = itemDate.getXMLRPC()
                if self._minDate is not None and self._minDate > itemDate:
                    return
                if self._maxDate is not None and self._maxDate < itemDate:
                    return
        self._items.append(item)

    def getResult(self):
        """Return the list of accumulated posts."""
        return self._items

    def isDone(self):
        """Return true once the item at position maxNumber has been reached."""
        return self._maxNumber is not None and self._numposts >= self._maxNumber
class RSSGetPostID(RSSFilter):
    """Find the first post in an RSS stream with the given post ID or guid."""

    def __init__(self, postid = None, guid = None):
        """postid is a string that looks like an integer, for Blogger API clients,
        which may not handle anything more. guid is the actual value from the RSS
        file, which may happen to contain the postid. Only specify one."""
        RSSFilter.__init__(self, XMLFilter.XMLSAXHandler())
        # Normalize to a string, as the other post-ID filters do, so that an
        # integer ID and its string form are recognized as the same post.
        if postid is not None:
            postid = str(postid)
        self._postid = postid
        self._guid = guid
        self._post = None # will hold the post object, once found

    def shouldParseItem(self):
        """Return true until a matching post has been found."""
        # Identity test: never invoke comparison methods of the found item.
        return self._post is None

    def itemFinished(self, item):
        """Remember the item if it matches the requested post ID or guid."""
        if self._postid is not None and str(item.getBloggerID()) == self._postid:
            self._post = item
        elif self._guid is not None and item.getAttribute('guid') == self._guid:
            self._post = item

    def getResult(self):
        """Return the matching post, or None if none was found."""
        return self._post

    def isDone(self):
        """Return true once a matching post has been found."""
        return self._post is not None
class RSSPostIDChecker(RSSFilter):
    """Count how many posts in an RSS file carry the given postid."""

    def __init__(self, postid):
        """Remember the postid (as a string) and start the count at zero."""
        # A bare XMLSAXHandler acts as a null destination: the XML stream
        # itself is not wanted, only the count.
        nullDestination = XMLFilter.XMLSAXHandler()
        RSSFilter.__init__(self, nullDestination)
        self.postid = str(postid)
        self.postsMatchingID = 0

    def itemFinished(self, item):
        """Bump the count when the parsed item's ID equals the target."""
        if str(item.getBloggerID()) != self.postid:
            return
        self.postsMatchingID += 1

    def getResult(self):
        """Return the number of matching posts seen so far."""
        return self.postsMatchingID

    def isDone(self):
        """Never done early: later posts might share the same ID."""
        return 0