diff --git a/ckanext/odsh/harvesters/kielharvester.py b/ckanext/odsh/harvesters/kielharvester.py index 907a7bea0b6a63ea139d8d276f5de715c162b9e8..af40d74ea3058e757e0577a2f8cf107401b32adf 100755 --- a/ckanext/odsh/harvesters/kielharvester.py +++ b/ckanext/odsh/harvesters/kielharvester.py @@ -39,23 +39,26 @@ class KielHarvester(ODSHBaseHarvester): try: used_identifiers = [] ids = [] - package_ids_in_db = list(map(lambda x: x[0], model.Session.query(HarvestObject.guid)\ - .filter(HarvestObject.current == True)\ + package_ids_in_db = list(map(lambda x: x[0], model.Session.query(HarvestObject.guid) + .filter(HarvestObject.current == True) .filter(HarvestObject.harvest_source_id == harvest_job.source.id).all())) log.info("Package IDs in DB: %s" % str(package_ids_in_db)) for dataset in datasets: - guid = str(uuid.uuid3(uuid.NAMESPACE_URL, dataset.get("url").encode('ascii', 'ignore'))) + guid = str(uuid.uuid3(uuid.NAMESPACE_URL, + dataset.get("url").encode('ascii', 'ignore'))) if guid not in package_ids_in_db: obj = HarvestObject(job=harvest_job, guid=guid) obj.content = json.dumps(dataset) obj.save() - log.info("harvest_object_id: %s, GUID: %s successfully gathered " % (str(obj.id), str(obj.guid))) + log.info("harvest_object_id: %s, GUID: %s successfully gathered " % ( + str(obj.id), str(obj.guid))) used_identifiers.append(guid) ids.append(obj.id) except Exception as e: self._save_gather_error( - 'Statistik-Nord-Harvester: Error gathering the identifiers from the source server [%s]' % str(e), + 'Kiel-Harvester: Error gathering the identifiers from the source server [%s]' % str( + e), harvest_job) log.error(e) return None @@ -68,7 +71,8 @@ class KielHarvester(ODSHBaseHarvester): return ids else: log.error("No records received") - self._save_gather_error("Couldn't find any metadata files", harvest_job) + self._save_gather_error( + "Couldn't find any metadata files", harvest_job) return None @staticmethod @@ -79,6 +83,7 @@ class KielHarvester(ODSHBaseHarvester): return False def import_stage(self, harvest_object): + log.debug('IMPORT') context = { 'model': model, 'session': model.Session, @@ -89,12 +94,14 @@ class KielHarvester(ODSHBaseHarvester): return False if harvest_object.content is None: - self._save_object_error('Empty content for object %s' % harvest_object.id, harvest_object, 'Import') + self._save_object_error( + 'Empty content for object %s' % harvest_object.id, harvest_object, 'Import') return False else: package_dict = json.loads(harvest_object.content) - source_dataset = get_action('package_show')(context.copy(), {'id': harvest_object.source.id}) + source_dataset = get_action('package_show')( + context.copy(), {'id': harvest_object.source.id}) package_dict['owner_org'] = source_dataset.get('owner_org') if package_dict['type'] == 'datensatz': @@ -115,25 +122,39 @@ class KielHarvester(ODSHBaseHarvester): package_dict['groups'] = mapped_groups extras = package_dict['extras'] - package_dict['extras'] = list() + new_extras = list() for extra in extras: - if extra['key'] in ['temporal_start', 'temporal_end', 'issued']: - package_dict[extra['key']] = extra['value'] + # WARNING: When this code was written, all datasets had '-zero-' licences, i.e. + # there was no key 'licenseAttributionByText' which we would expect for '-by-' licences. + # The setting is just anticipated, matching for datasets with a corresponding licence. 
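+            # Only the whitelisted keys handled below are copied into new_extras;
+            # any other extras coming from the source are dropped.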
+ if extra['key'] == 'licenseAttributionByText': + new_extras.append(extra) + elif extra['key'] in ['temporal_start', 'temporal_end', 'issued']: + new_extras.append(extra) + + new_extras.append( + {'key': 'spatial_uri', + 'value': 'http://dcat-ap.de/def/politicalGeocoding/districtKey/01002'}) - package_dict['spatial_uri'] = 'http://dcat-ap.de/def/politicalGeocoding/districtKey/01002' + package_dict['extras'] = new_extras license_id = self._get_license_id(package_dict['license_id']) if license_id: package_dict['license_id'] = license_id else: - log.error('invalid license_id: %s' % package_dict['license_id']) - self._save_object_error('Invalid license_id: %s' % package_dict['license_id'], harvest_object, 'Import') + log.error('invalid license_id: %s' % + package_dict['license_id']) + self._save_object_error( + 'Invalid license_id: %s' % package_dict['license_id'], harvest_object, 'Import') return False try: - context = {'user': self._get_user_name(), 'return_id_only': True, 'ignore_auth': True} - package_plugin = lib_plugins.lookup_package_plugin(package_dict.get('type', None)) + context = {'user': self._get_user_name( + ), 'return_id_only': True, 'ignore_auth': True} + package_plugin = lib_plugins.lookup_package_plugin( + package_dict.get('type', None)) package_schema = package_plugin.create_package_schema() context['schema'] = package_schema + log.debug(package_schema) self._handle_current_harvest_object(harvest_object, harvest_object.guid) result = toolkit.get_action('package_create')(context, package_dict) return result diff --git a/ckanext/odsh/harvesters/statistikamtnordharvester.py b/ckanext/odsh/harvesters/statistikamtnordharvester.py index f6d06a341bffdc06b7581bf5131a0e53d914268f..aea0a1ead5c5b63e0afcee95c8dc4a79868e4bd6 100755 --- a/ckanext/odsh/harvesters/statistikamtnordharvester.py +++ b/ckanext/odsh/harvesters/statistikamtnordharvester.py @@ -125,24 +125,11 @@ class StatistikamtNordHarvester(ODSHBaseHarvester): self.map_fields(context, harvest_object) return True - @staticmethod - def _update_schema(schema): - schema.update({'temporal_start': [ - toolkit.get_validator('ignore_empty'), - toolkit.get_converter('convert_to_extras')]}) - schema.update({'temporal_end': [ - toolkit.get_validator('ignore_empty'), - toolkit.get_converter('convert_to_extras')]}) - schema.update({'issued': [ - toolkit.get_validator('ignore_missing'), - toolkit.get_validator('ignore_empty'), - toolkit.get_converter('convert_to_extras')]}) - def map_fields(self, context, harvest_object): values = json.loads(harvest_object.content) package_dict = dict() - package_dict.update({'resources': [], 'tags': [], 'groups': []}) + package_dict.update({'resources': [], 'tags': [], 'groups': [], 'extras': []}) title = values['Titel'] package_dict.update({'title': title}) package_dict.update({'name': self._gen_new_name(title)}) @@ -161,17 +148,10 @@ class StatistikamtNordHarvester(ODSHBaseHarvester): package_dict['url'] = "" package_dict.update({'type': 'dataset'}) - package_dict.update({'licenseAttributionByText': 'Statistisches Amt für Hamburg und Schleswig-Holstein -' - ' Anstalt des öffentlichen Rechts - (Statistikamt Nord)'}) - package_dict.update({'temporal_start': datetime.datetime.strptime(values['ZeitraumVon'], '%Y-%m-%d').isoformat()}) - package_dict.update({'temporal_end': datetime.datetime.strptime(values['ZeitraumBis'], '%Y-%m-%d').isoformat()}) - package_dict.update({'spatial_uri': 'http://dcat-ap.de/def/politicalGeocoding/stateKey/01'}) - # issued sollte noch geliefert werden! 
-        package_dict.update({'issued': datetime.datetime.utcnow().isoformat()})
 
         self.add_ressources(package_dict, values)
         self.add_tags(package_dict, values)
-
+        self.add_extras(package_dict, values)
         self.map_to_group(package_dict, values)
 
         source_dataset = get_action('package_show')(context.copy(), {'id': harvest_object.source.id})
@@ -183,9 +163,7 @@ class StatistikamtNordHarvester(ODSHBaseHarvester):
         try:
             context = {'user': self._get_user_name(), 'return_id_only': True, 'ignore_auth': True}
             package_plugin = lib_plugins.lookup_package_plugin(package_dict.get('type', None))
-            package_schema = package_plugin.create_package_schema()
-            self._update_schema(package_schema)
-            context['schema'] = package_schema
+            context['schema'] = package_plugin.create_package_schema()
             self._handle_current_harvest_object(harvest_object, harvest_object.guid)
             result = toolkit.get_action('package_create')(context, package_dict)
             return result
@@ -193,6 +171,27 @@ class StatistikamtNordHarvester(ODSHBaseHarvester):
             self._save_object_error('Validation Error: %s' % str(e.error_summary), harvest_object, 'Import')
             return False
 
+    @staticmethod
+    def add_extras(package_dict, values):
+        # 'issued' should eventually be delivered by the source!
+        package_dict['extras'].append({
+            'key': 'issued', 'value': datetime.datetime.now().isoformat()})
+        try:
+            if values['ZeitraumVon'] != "":
+                package_dict['extras'].append({
+                    'key': 'temporal_start', 'value': values['ZeitraumVon']})
+            if values['ZeitraumBis'] != "":
+                package_dict['extras'].append({
+                    'key': 'temporal_end', 'value': values['ZeitraumBis']})
+        except KeyError as kerr:
+            log.debug("Date not available: " + str(kerr))
+        package_dict['extras'].append({
+            'key': 'spatial_uri', 'value': 'http://dcat-ap.de/def/politicalGeocoding/stateKey/01'})
+        package_dict['extras'].append({
+            'key': 'licenseAttributionByText',
+            'value': 'Statistisches Amt für Hamburg und Schleswig-Holstein - '
+                     'Anstalt des öffentlichen Rechts - (Statistikamt Nord)'})
+
     @staticmethod
     def add_tags(package_dict, values):
         tags = values['Schlagwoerter']['Schlagwort']
diff --git a/ckanext/odsh/plugin.py b/ckanext/odsh/plugin.py
index 53e2a145d070f9ad21bd3ec15dbcd30f4e2029b2..0edcd9ebc06c055f7791672b4dc84276c1593b90 100644
--- a/ckanext/odsh/plugin.py
+++ b/ckanext/odsh/plugin.py
@@ -129,12 +129,17 @@ def odsh_validate_extra_date(key, field, data, errors, context):
     value = _extract_value(key, data, field)
 
     if not value:
-        raise toolkit.Invalid(field+':odsh_'+field+'_error_label')
-
-    try:
-        datetime.datetime.strptime(value, '%Y-%m-%d')
-    except ValueError:
-        raise toolkit.Invalid(field+':odsh_'+field+'_not_date_error_label')
+        # Statistikamt Nord does not always provide temporal_start/end,
+        # but their datasets have to be accepted as they are.
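+        # Dataset ids created by the StatistikamtNord harvester are assumed
+        # to start with the prefix 'StaNord'; only those may omit the date.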
+ if data[('id',)][:7] != 'StaNord': + raise toolkit.Invalid(field+':odsh_'+field+'_error_label') + else: + try: + # date.split('T')[0] will yield "2012-01-01" + # no matter if the date is like "2012-01-01" or "2012-01-01T00:00:00" + datetime.datetime.strptime(value.split('T')[0],'%Y-%m-%d').isoformat() + except ValueError: + raise toolkit.Invalid(field+':odsh_'+field+'_not_date_error_label') def odsh_validate_extra_date_factory(field): diff --git a/ckanext/odsh/tests/test_search.py b/ckanext/odsh/tests/test_search.py index d4fafeb2f506e73ede7e03753349bb0dd85a0942..a0eb81915003fe0f962205f1ccd2b27c535a34ed 100644 --- a/ckanext/odsh/tests/test_search.py +++ b/ckanext/odsh/tests/test_search.py @@ -21,7 +21,6 @@ class TestSearch(helpers.FunctionalTestBase): def teardown(self): model.repo.rebuild_db() - @odsh_test() def test_dataset_is_in_search_result(self): # arrange @@ -58,9 +57,9 @@ class TestSearch(helpers.FunctionalTestBase): @odsh_test() def test_query_with_start_date_finds_one_dataset(self): # arrange - datasetA = self._create_dataset('dataseta', '01-01-1960', '31-12-1960') - datasetB = self._create_dataset('datasetb', '01-01-1980', '30-06-1990') - datasetC = self._create_dataset('datasetc', '01-03-2001', '30-04-2001') + datasetA = self._create_dataset('dataseta', '1960-01-01', '1960-12-31') + datasetB = self._create_dataset('datasetb', '1980-01-01', '1990-06-30') + datasetC = self._create_dataset('datasetc', '2001-03-01', '2001-04-30') # act response1 = self._perform_date_search(None, '1990-01-01') @@ -95,15 +94,19 @@ class TestSearch(helpers.FunctionalTestBase): def _assert_no_results(self, response): assert "No datasets found" in response - def _create_dataset(self, name='my-own-dataset', temporal_start='27-01-2000', temporal_end='27-01-2000'): + def _create_dataset(self, name='my-own-dataset', temporal_start='2000-01-27', temporal_end='2000-01-27'): user = factories.User() + extras = [ + {'key': 'temporal_start', 'value': temporal_start}, + {'key': 'temporal_end', 'value': temporal_end}, + {'key': 'issued', 'value': '2000-01-27'}, + {'key': 'spatial_uri', 'value': 'http://dcat-ap.de/def/politicalGeocoding/districtKey/01001'} + ] return factories.Dataset(user=user, name=name, title='My very own dataset', issued='27-01-2000', - spatial_uri='http://dcat-ap.de/def/politicalGeocoding/districtKey/01001', - temporal_start=temporal_start, - temporal_end=temporal_end) + extras=extras) def _perform_search(self, query=None): search_form = self._perform_search_for_form('dataset-search-box-form') diff --git a/ckanext/odsh/tests/test_statistikNordHarvester.py b/ckanext/odsh/tests/test_statistikNordHarvester.py new file mode 100644 index 0000000000000000000000000000000000000000..a662c84d5340604f80814b519648f73f085026ee --- /dev/null +++ b/ckanext/odsh/tests/test_statistikNordHarvester.py @@ -0,0 +1,628 @@ +# -*- coding: utf-8 -*- + +from collections import defaultdict + +import nose +import httpretty +from mock import patch + +from six.moves import xrange + +import ckan.plugins as p +import ckantoolkit.tests.helpers as h + +import ckanext.harvest.model as harvest_model +from ckanext.harvest import queue + +from ckanext.odsh.harvesters import StatistikamtNordHarvester +from ckanext.dcat.interfaces import IDCATRDFHarvester +import ckanext.dcat.harvesters.rdf + + +eq_ = nose.tools.eq_ + + +# This horrible monkey patch is needed because httpretty does not play well +# with redis, so we need to disable it straight after the mocked call is used. 
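+# DCATRDFHarvester and DCATJSONHarvester are patched below; they are assumed to
+# be importable from ckanext.dcat.harvesters, as in ckanext-dcat's own tests.
+from ckanext.dcat.harvesters import DCATRDFHarvester, DCATJSONHarvester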
+# See https://github.com/gabrielfalcao/HTTPretty/issues/113 + +# Start monkey-patch + +original_rdf_get_content_and_type = DCATRDFHarvester._get_content_and_type + +def _patched_rdf_get_content_and_type(self, url, harvest_job, page=1, content_type=None): + + httpretty.enable() + + value1, value2 = original_rdf_get_content_and_type(self, url, harvest_job, page, content_type) + + httpretty.disable() + + return value1, value2 + +DCATRDFHarvester._get_content_and_type = _patched_rdf_get_content_and_type + +original_json_get_content_and_type = DCATJSONHarvester._get_content_and_type + +def _patched_json_get_content_and_type(self, url, harvest_job, page=1, content_type=None): + + httpretty.enable() + + value1, value2 = original_json_get_content_and_type(self, url, harvest_job, page, content_type) + + httpretty.disable() + + return value1, value2 + +DCATJSONHarvester._get_content_and_type = _patched_json_get_content_and_type + +# End monkey-patch + + +class TestRDFHarvester(p.SingletonPlugin): + + p.implements(IDCATRDFHarvester) + + calls = defaultdict(int) + + def before_download(self, url, harvest_job): + + self.calls['before_download'] += 1 + + if url == 'http://return.none': + return None, [] + elif url == 'http://return.errors': + return None, ['Error 1', 'Error 2'] + else: + return url, [] + + def update_session(self, session): + self.calls['update_session'] += 1 + session.headers.update({'x-test': 'true'}) + return session + + def after_download(self, content, harvest_job): + + self.calls['after_download'] += 1 + + if content == 'return.empty.content': + return None, [] + elif content == 'return.errors': + return None, ['Error 1', 'Error 2'] + else: + return content, [] + + def before_update(self, harvest_object, dataset_dict, temp_dict): + self.calls['before_update'] += 1 + + def after_update(self, harvest_object, dataset_dict, temp_dict): + self.calls['after_update'] += 1 + return None + + def before_create(self, harvest_object, dataset_dict, temp_dict): + self.calls['before_create'] += 1 + + def after_create(self, harvest_object, dataset_dict, temp_dict): + self.calls['after_create'] += 1 + return None + + + +class FunctionalHarvestTest(object): + + @classmethod + def setup_class(cls): + + h.reset_db() + + cls.gather_consumer = queue.get_gather_consumer() + cls.fetch_consumer = queue.get_fetch_consumer() + + # Minimal remote RDF file + cls.rdf_mock_url = 'http://some.dcat.file.rdf' + cls.rdf_content_type = 'application/rdf+xml' + cls.rdf_content = '''<?xml version="1.0" encoding="utf-8" ?> + <rdf:RDF + xmlns:dct="http://purl.org/dc/terms/" + xmlns:dcat="http://www.w3.org/ns/dcat#" + xmlns:xsd="http://www.w3.org/2001/XMLSchema#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"> + <dcat:Catalog rdf:about="https://data.some.org/catalog"> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/1"> + <dct:title>Example dataset 1</dct:title> + </dcat:Dataset> + </dcat:dataset> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/2"> + <dct:title>Example dataset 2</dct:title> + </dcat:Dataset> + </dcat:dataset> + </dcat:Catalog> + </rdf:RDF> + ''' + + # Minimal remote RDF file with pagination (1) + # Use slashes for paginated URLs because HTTPretty won't distinguish + # query strings + cls.rdf_mock_url_pagination_1 = 'http://some.dcat.file.pagination.rdf' + cls.rdf_content_pagination_1 = '''<?xml version="1.0" encoding="utf-8" ?> + <rdf:RDF + xmlns:dct="http://purl.org/dc/terms/" + xmlns:dcat="http://www.w3.org/ns/dcat#" + 
xmlns:xsd="http://www.w3.org/2001/XMLSchema#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:hydra="http://www.w3.org/ns/hydra/core#"> + <dcat:Catalog rdf:about="https://data.some.org/catalog"> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/1"> + <dct:title>Example dataset 1</dct:title> + </dcat:Dataset> + </dcat:dataset> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/2"> + <dct:title>Example dataset 2</dct:title> + </dcat:Dataset> + </dcat:dataset> + </dcat:Catalog> + <hydra:PagedCollection rdf:about="http://some.dcat.file.pagination.rdf/page/1"> + <hydra:totalItems rdf:datatype="http://www.w3.org/2001/XMLSchema#integer">4</hydra:totalItems> + <hydra:lastPage>http://some.dcat.file.pagination.rdf/page/2</hydra:lastPage> + <hydra:itemsPerPage rdf:datatype="http://www.w3.org/2001/XMLSchema#integer">2</hydra:itemsPerPage> + <hydra:nextPage>http://some.dcat.file.pagination.rdf/page/2</hydra:nextPage> + <hydra:firstPage>http://some.dcat.file.pagination.rdf/page/1</hydra:firstPage> + </hydra:PagedCollection> + </rdf:RDF> + ''' + + # Minimal remote RDF file with pagination (2) + cls.rdf_mock_url_pagination_2 = 'http://some.dcat.file.pagination.rdf/page/2' + cls.rdf_content_pagination_2 = '''<?xml version="1.0" encoding="utf-8" ?> + <rdf:RDF + xmlns:dct="http://purl.org/dc/terms/" + xmlns:dcat="http://www.w3.org/ns/dcat#" + xmlns:xsd="http://www.w3.org/2001/XMLSchema#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:hydra="http://www.w3.org/ns/hydra/core#"> + <dcat:Catalog rdf:about="https://data.some.org/catalog"> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/3"> + <dct:title>Example dataset 3</dct:title> + </dcat:Dataset> + </dcat:dataset> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/4"> + <dct:title>Example dataset 4</dct:title> + </dcat:Dataset> + </dcat:dataset> + </dcat:Catalog> + <hydra:PagedCollection rdf:about="http://some.dcat.file.pagination.rdf/page/1"> + <hydra:totalItems rdf:datatype="http://www.w3.org/2001/XMLSchema#integer">4</hydra:totalItems> + <hydra:lastPage>http://some.dcat.file.pagination.rdf/page/2</hydra:lastPage> + <hydra:itemsPerPage rdf:datatype="http://www.w3.org/2001/XMLSchema#integer">2</hydra:itemsPerPage> + <hydra:previousPage>http://some.dcat.file.pagination.rdf/page/1</hydra:previousPage> + <hydra:firstPage>http://some.dcat.file.pagination.rdf/page/1</hydra:firstPage> + </hydra:PagedCollection> + </rdf:RDF> + ''' + + # Minimal remote RDF file + cls.rdf_mock_url = 'http://some.dcat.file.rdf' + cls.rdf_content_type = 'application/rdf+xml' + cls.rdf_content = '''<?xml version="1.0" encoding="utf-8" ?> + <rdf:RDF + xmlns:dct="http://purl.org/dc/terms/" + xmlns:dcat="http://www.w3.org/ns/dcat#" + xmlns:xsd="http://www.w3.org/2001/XMLSchema#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"> + <dcat:Catalog rdf:about="https://data.some.org/catalog"> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/1"> + <dct:title>Example dataset 1</dct:title> + </dcat:Dataset> + </dcat:dataset> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/2"> + <dct:title>Example dataset 2</dct:title> + </dcat:Dataset> + </dcat:dataset> + </dcat:Catalog> + </rdf:RDF> + ''' + + cls.rdf_remote_file_small = '''<?xml version="1.0" encoding="utf-8" ?> + <rdf:RDF + xmlns:dct="http://purl.org/dc/terms/" + xmlns:dcat="http://www.w3.org/ns/dcat#" + 
xmlns:xsd="http://www.w3.org/2001/XMLSchema#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"> + <dcat:Catalog rdf:about="https://data.some.org/catalog"> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/1"> + <dct:title>Example dataset 1</dct:title> + </dcat:Dataset> + </dcat:dataset> + </dcat:Catalog> + </rdf:RDF> + ''' + + # RDF with minimal distribution + cls.rdf_content_with_distribution_uri = '''<?xml version="1.0" encoding="utf-8" ?> + <rdf:RDF + xmlns:dct="http://purl.org/dc/terms/" + xmlns:dcat="http://www.w3.org/ns/dcat#" + xmlns:xsd="http://www.w3.org/2001/XMLSchema#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"> + <dcat:Catalog rdf:about="https://data.some.org/catalog"> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/1"> + <dct:title>Example dataset 1</dct:title> + <dcat:distribution> + <dcat:Distribution rdf:about="https://data.some.org/catalog/datasets/1/resource/1"> + <dct:title>Example resource 1</dct:title> + <dcat:accessURL>http://data.some.org/download.zip</dcat:accessURL> + </dcat:Distribution> + </dcat:distribution> + </dcat:Dataset> + </dcat:dataset> + </dcat:Catalog> + </rdf:RDF> + ''' + cls.rdf_content_with_distribution = '''<?xml version="1.0" encoding="utf-8" ?> + <rdf:RDF + xmlns:dct="http://purl.org/dc/terms/" + xmlns:dcat="http://www.w3.org/ns/dcat#" + xmlns:xsd="http://www.w3.org/2001/XMLSchema#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"> + <dcat:Catalog rdf:about="https://data.some.org/catalog"> + <dcat:dataset> + <dcat:Dataset rdf:about="https://data.some.org/catalog/datasets/1"> + <dct:title>Example dataset 1</dct:title> + <dcat:distribution> + <dcat:Distribution> + <dct:title>Example resource 1</dct:title> + <dcat:accessURL>http://data.some.org/download.zip</dcat:accessURL> + </dcat:Distribution> + </dcat:distribution> + </dcat:Dataset> + </dcat:dataset> + </dcat:Catalog> + </rdf:RDF> + ''' + + def setup(self): + + harvest_model.setup() + + queue.purge_queues() + + def teardown(cls): + h.reset_db() + + def _create_harvest_source(self, mock_url, **kwargs): + + source_dict = { + 'title': 'Test RDF DCAT Source', + 'name': 'test-rdf-dcat-source', + 'url': mock_url, + 'source_type': 'dcat_rdf', + } + + source_dict.update(**kwargs) + + harvest_source = h.call_action('harvest_source_create', + {}, **source_dict) + + return harvest_source + + def _create_harvest_job(self, harvest_source_id): + + harvest_job = h.call_action('harvest_job_create', + {}, source_id=harvest_source_id) + + return harvest_job + + def _run_jobs(self, harvest_source_id=None): + try: + h.call_action('harvest_jobs_run', + {}, source_id=harvest_source_id) + except Exception, e: + if (str(e) == 'There are no new harvesting jobs'): + pass + + def _gather_queue(self, num_jobs=1): + + for job in xrange(num_jobs): + # Pop one item off the queue (the job id) and run the callback + reply = self.gather_consumer.basic_get( + queue='ckan.harvest.gather.test') + + # Make sure something was sent to the gather queue + assert reply[2], 'Empty gather queue' + + # Send the item to the gather callback, which will call the + # harvester gather_stage + queue.gather_callback(self.gather_consumer, *reply) + + def _fetch_queue(self, num_objects=1): + + for _object in xrange(num_objects): + # Pop item from the fetch queues (object ids) and run the callback, + # one for each object created + reply = self.fetch_consumer.basic_get( + queue='ckan.harvest.fetch.test') + + # Make sure something was sent to the fetch 
queue + assert reply[2], 'Empty fetch queue, the gather stage failed' + + # Send the item to the fetch callback, which will call the + # harvester fetch_stage and import_stage + queue.fetch_callback(self.fetch_consumer, *reply) + + def _run_full_job(self, harvest_source_id, num_jobs=1, num_objects=1): + + # Create new job for the source + self._create_harvest_job(harvest_source_id) + + # Run the job + self._run_jobs(harvest_source_id) + + # Handle the gather queue + self._gather_queue(num_jobs) + + # Handle the fetch queue + self._fetch_queue(num_objects) + + +class TestDCATHarvestFunctional(FunctionalHarvestTest): + + def test_harvest_create_rdf(self): + + self._test_harvest_create(self.rdf_mock_url, + self.rdf_content, + self.rdf_content_type) + + def _test_harvest_create(self, url, content, content_type, **kwargs): + + # Mock the GET request to get the file + httpretty.register_uri(httpretty.GET, url, + body=content, content_type=content_type) + + # The harvester will try to do a HEAD request first so we need to mock + # this as well + httpretty.register_uri(httpretty.HEAD, url, + status=405, content_type=content_type) + + harvest_source = self._create_harvest_source(url, **kwargs) + + self._run_full_job(harvest_source['id'], num_objects=2) + + # Check that two datasets were created + fq = "+type:dataset harvest_source_id:{0}".format(harvest_source['id']) + results = h.call_action('package_search', {}, fq=fq) + + eq_(results['count'], 2) + for result in results['results']: + assert result['title'] in ('Example dataset 1', + 'Example dataset 2') + + def test_harvest_create_rdf_pagination(self): + + # Mock the GET requests needed to get the file + httpretty.register_uri(httpretty.GET, self.rdf_mock_url_pagination_1, + body=self.rdf_content_pagination_1, + content_type=self.rdf_content_type) + + httpretty.register_uri(httpretty.GET, self.rdf_mock_url_pagination_2, + body=self.rdf_content_pagination_2, + content_type=self.rdf_content_type) + + # The harvester will try to do a HEAD request first so we need to mock + # them as well + httpretty.register_uri(httpretty.HEAD, self.rdf_mock_url_pagination_1, + status=405, + content_type=self.rdf_content_type) + + httpretty.register_uri(httpretty.HEAD, self.rdf_mock_url_pagination_2, + status=405, + content_type=self.rdf_content_type) + + harvest_source = self._create_harvest_source( + self.rdf_mock_url_pagination_1) + + self._run_full_job(harvest_source['id'], num_objects=4) + + # Check that four datasets were created + fq = "+type:dataset harvest_source_id:{0}".format(harvest_source['id']) + results = h.call_action('package_search', {}, fq=fq) + + eq_(results['count'], 4) + eq_(sorted([d['title'] for d in results['results']]), + ['Example dataset 1', 'Example dataset 2', + 'Example dataset 3', 'Example dataset 4']) + + def test_harvest_create_rdf_pagination_same_content(self): + + # Mock the GET requests needed to get the file. 
Two different URLs but + # same content to mock a misconfigured server + httpretty.register_uri(httpretty.GET, self.rdf_mock_url_pagination_1, + body=self.rdf_content_pagination_1, + content_type=self.rdf_content_type) + + httpretty.register_uri(httpretty.GET, self.rdf_mock_url_pagination_2, + body=self.rdf_content_pagination_1, + content_type=self.rdf_content_type) + + # The harvester will try to do a HEAD request first so we need to mock + # them as well + httpretty.register_uri(httpretty.HEAD, self.rdf_mock_url_pagination_1, + status=405, + content_type=self.rdf_content_type) + + httpretty.register_uri(httpretty.HEAD, self.rdf_mock_url_pagination_2, + status=405, + content_type=self.rdf_content_type) + + harvest_source = self._create_harvest_source( + self.rdf_mock_url_pagination_1) + + self._run_full_job(harvest_source['id'], num_objects=2) + + # Check that two datasets were created + fq = "+type:dataset harvest_source_id:{0}".format(harvest_source['id']) + results = h.call_action('package_search', {}, fq=fq) + + eq_(results['count'], 2) + eq_(sorted([d['title'] for d in results['results']]), + ['Example dataset 1', 'Example dataset 2']) + + def test_harvest_update_unicode_keywords(self): + + self._test_harvest_create(self.ttl_mock_url, + self.ttl_unicode_in_keywords, + self.ttl_content_type) + + def test_harvest_update_commas_keywords(self): + + self._test_harvest_update(self.ttl_mock_url, + self.ttl_commas_in_keywords, + self.ttl_content_type) + + def _test_harvest_update(self, url, content, content_type): + # Mock the GET request to get the file + httpretty.register_uri(httpretty.GET, url, + body=content, content_type=content_type) + + # The harvester will try to do a HEAD request first so we need to mock + # this as well + httpretty.register_uri(httpretty.HEAD, url, + status=405, content_type=content_type) + + harvest_source = self._create_harvest_source(url) + + # First run, will create two datasets as previously tested + self._run_full_job(harvest_source['id'], num_objects=2) + + # Run the jobs to mark the previous one as Finished + self._run_jobs() + + # Mock an update in the remote file + new_file = content.replace('Example dataset 1', + 'Example dataset 1 (updated)') + httpretty.register_uri(httpretty.GET, url, + body=new_file, content_type=content_type) + + # Run a second job + self._run_full_job(harvest_source['id'], num_objects=2) + + # Check that we still have two datasets + fq = "+type:dataset harvest_source_id:{0}".format(harvest_source['id']) + results = h.call_action('package_search', {}, fq=fq) + + eq_(results['count'], 2) + + # Check that the dataset was updated + for result in results['results']: + assert result['title'] in ('Example dataset 1 (updated)', + 'Example dataset 2') + + def test_harvest_update_existing_resources(self): + + existing, new = self._test_harvest_update_resources(self.rdf_mock_url, + self.rdf_content_with_distribution_uri, + self.rdf_content_type) + eq_(new['uri'], 'https://data.some.org/catalog/datasets/1/resource/1') + eq_(new['uri'], existing['uri']) + eq_(new['id'], existing['id']) + + def test_harvest_update_new_resources(self): + + existing, new = self._test_harvest_update_resources(self.rdf_mock_url, + self.rdf_content_with_distribution, + self.rdf_content_type) + eq_(existing['uri'], '') + eq_(new['uri'], '') + nose.tools.assert_is_not(new['id'], existing['id']) + + def _test_harvest_update_resources(self, url, content, content_type): + # Mock the GET request to get the file + httpretty.register_uri(httpretty.GET, url, + body=content, 
content_type=content_type) + + # The harvester will try to do a HEAD request first so we need to mock + # this as well + httpretty.register_uri(httpretty.HEAD, url, + status=405, content_type=content_type) + + harvest_source = self._create_harvest_source(url) + + # First run, create the dataset with the resource + self._run_full_job(harvest_source['id'], num_objects=1) + + # Run the jobs to mark the previous one as Finished + self._run_jobs() + + # get the created dataset + fq = "+type:dataset harvest_source_id:{0}".format(harvest_source['id']) + results = h.call_action('package_search', {}, fq=fq) + eq_(results['count'], 1) + + existing_dataset = results['results'][0] + existing_resource = existing_dataset.get('resources')[0] + + # Mock an update in the remote file + new_file = content.replace('Example resource 1', + 'Example resource 1 (updated)') + httpretty.register_uri(httpretty.GET, url, + body=new_file, content_type=content_type) + + # Run a second job + self._run_full_job(harvest_source['id']) + + # get the updated dataset + new_results = h.call_action('package_search', {}, fq=fq) + eq_(new_results['count'], 1) + + new_dataset = new_results['results'][0] + new_resource = new_dataset.get('resources')[0] + + eq_(existing_resource['name'], 'Example resource 1') + eq_(len(new_dataset.get('resources')), 1) + eq_(new_resource['name'], 'Example resource 1 (updated)') + return (existing_resource, new_resource) + + def test_harvest_bad_format_rdf(self): + + self._test_harvest_bad_format(self.rdf_mock_url, + self.rdf_remote_file_invalid, + self.rdf_content_type) + + def _test_harvest_bad_format(self, url, bad_content, content_type): + + # Mock the GET request to get the file + httpretty.register_uri(httpretty.GET, url, + body=bad_content, content_type=content_type) + + # The harvester will try to do a HEAD request first so we need to mock + # this as well + httpretty.register_uri(httpretty.HEAD, url, + status=405, content_type=content_type) + + harvest_source = self._create_harvest_source(url) + self._create_harvest_job(harvest_source['id']) + self._run_jobs(harvest_source['id']) + self._gather_queue(1) + + # Run the jobs to mark the previous one as Finished + self._run_jobs() + + # Get the harvest source with the udpated status + harvest_source = h.call_action('harvest_source_show', + id=harvest_source['id']) + + last_job_status = harvest_source['status']['last_job'] + + eq_(last_job_status['status'], 'Finished') + assert ('Error parsing the RDF file' + in last_job_status['gather_error_summary'][0][0]) +