Skip to content
Snippets Groups Projects
Commit 251cf65d authored by anonymous's avatar anonymous
Browse files

add harvest tests

parent 4c5e8653
Branches
Tags
No related merge requests found
import socket
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import time
import requests
import pdb
import threading
import asyncore
hostName = "localhost"
hostPort = 5002
# data = "test"
# with open('catalog.rdf', 'r') as myfile:
# data=myfile.read()
# TODO: better was to set data on RequestHandler
data = ""
class RequestHandler(BaseHTTPRequestHandler):
# GET
def do_GET(self):
self.send_response(requests.codes.ok)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(data.encode("utf-8"))
# POST
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
self.send_response(200)
print(post_data)
self.wfile.write("".encode("utf-8")) # send back to client
def do_HEAD(self):
self.send_response(requests.codes.ok)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
class HarvestServerMock(threading.Thread):
def __init__(self):
# init thread
self._stop_event = threading.Event()
self.thread_name = self.__class__
self.server = HTTPServer((hostName, hostPort), RequestHandler)
threading.Thread.__init__(self, name=self.thread_name, target=self.server.serve_forever)
# self.setDaemon(True)
# def run(self):
# while not self._stop_event.isSet():
# asyncore.loop(timeout=0.01, count=1)
# pdb.set_trace()
# try:
# except KeyboardInterrupt:
# pass
def close(self):
self.server.server_close()
# print(time.asctime(), "Server Stops - %s:%s" % (hostName, hostPort))
This diff is collapsed.
from ckanext.odsh.tests.test_helpers import AppProxy
import ckan.tests.factories as factories
import uuid
import pdb
from ckanext.odsh.tests.harvest_sever_mock import HarvestServerMock
import ckanext.odsh.tests.harvest_sever_mock as harvest_sever_mock
import subprocess
class TestHarvest:
def _create_harvester(self):
guid = str(uuid.uuid4())
self.org = factories.Organization(
name="test_harvest_org_" + guid,
users=[{'name': 'ckanuser', 'capacity': 'admin'}]
)
self._get_app().login()
response = self.app.get('/harvest/new')
form = response.forms[0]
title = 'harvest_test_source_' + guid
form['title'] = title
form['url'] = "http://localhost:5002/" + guid
final_response = self.app.submit_form(form)
# submit_response = self.app.submit_form(form)
# assert 'missing value' in submit_response
assert 'There are no datasets associated to this harvest source.' in final_response
return title
def notest_create_harvester(self):
self._create_harvester()
def test_harvest_dcat(self):
# Arrange
harvester = self._create_harvester()
harvest_sever_mock.data = self._load_rdf_catalog()
server = HarvestServerMock()
server.start()
self.run_harvest(harvester)
# server.stop()
def run_harvest(self, harvester):
out = subprocess.check_output([
"paster", "--plugin=ckanext-harvest", "harvester", "run_test", harvester, "--config=/etc/ckan/default/development.ini"])
def _get_app(self):
if not hasattr(self, 'app'):
app = AppProxy()
self.app = app
return self.app
def _load_rdf_catalog(self):
with open('ckanext/odsh/tests/rdf_catalog.xml', 'r') as rdffile:
data = rdffile.read()
return data
......@@ -2,6 +2,8 @@ import contextlib
import errno
import functools
from ckan.common import config
import ckan.config.middleware
import ckan.tests.helpers as helpers
def odsh_test():
......@@ -28,3 +30,30 @@ def changed_config():
finally:
config.clear()
config.update(_original_config)
def _get_test_app():
app = ckan.config.middleware.make_app(config['global_conf'], **config)
app = helpers.CKANTestApp(app)
return app
class AppProxy:
def login(self):
app = _get_test_app()
response = app.get('/user/login')
login_form = response.forms[0]
login_form['login'] = 'ckanuser'
login_form['password'] = 'pass'
submit_response = login_form.submit('save')
final_response = helpers.webtest_maybe_follow(submit_response)
self.app = app
def get(self, path):
return self.app.get(path)
def submit_form(self, form):
submit_response = form.submit('save')
final_response = helpers.webtest_maybe_follow(submit_response)
# let's go to the last redirect in the chain
return final_response
\ No newline at end of file
......@@ -3,15 +3,12 @@ import rdflib
import ckan.tests.factories as factories
from ckan.common import config
import ckan.model as model
import pdb
import json
import ckanext.odsh.profiles as profiles
import urllib2
import ckan.tests.helpers as helpers
from ckan.common import config
import ckan.config.middleware
from routes import url_for
webtest_submit = helpers.webtest_submit
# run with nosetests --ckan --nologcapture --with-pylons=<config to test> ckanext/odsh/tests/test_routes.py
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment