Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
ckanext-odsh
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Open-Data
ckanext-odsh
Commits
e023df73
Commit
e023df73
authored
1 year ago
by
Thorge Petersen
Browse files
Options
Downloads
Patches
Plain Diff
Removed duplicate validation.py
parent
a6010f04
No related branches found
No related tags found
2 merge requests
!41
Version 2.0.0
,
!39
Resolve "Remove dead code"
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
validation.py
+0
-239
0 additions, 239 deletions
validation.py
with
0 additions
and
239 deletions
validation.py
deleted
100644 → 0
+
0
−
239
View file @
a6010f04
# This Python file uses the following encoding: utf-8
import
logging
import
csv
import
re
import
urllib.request
,
urllib
.
error
,
urllib
.
parse
import
json
from
itertools
import
count
from
dateutil.parser
import
parse
import
ckan.plugins.toolkit
as
toolkit
import
ckan.model
as
model
from
ckan.lib.navl.dictization_functions
import
Missing
import
pdb
# Gettext translation helper re-exported from CKAN's plugins toolkit.
_ = toolkit._

# Module-level logger named after this module.
log = logging.getLogger(__name__)
def
_extract_value
(
data
,
field
):
key
=
None
for
k
in
list
(
data
.
keys
()):
if
data
[
k
]
==
field
:
key
=
k
break
if
key
is
None
:
return
None
return
data
[(
key
[
0
],
key
[
1
],
'
value
'
)]
def validate_extra_groups(data, requireAtLeastOne, errors):
    """Validate and normalise the 'groups' extra in a flattened data dict.

    If the extra key 'groups' is present (dataset edited manually via the
    web-frontend), its comma-separated value is split into group ids which
    are written back as ('groups', <n>, 'id') entries.  If it is absent
    (e.g. dataset from a harvest process), at least one pre-existing
    ('groups', 0, 'id') or ('groups', 0, 'name') entry is required.

    Problems are reported into *errors* under the key 'groups'.
    """
    value = _extract_value(data, 'groups')
    if value is not None:
        # The extra key 'groups' was found, so the dataset came from
        # manual editing via the web-frontend.
        if not value:
            if requireAtLeastOne:
                errors['groups'] = 'at least one group needed'
            data[('groups', 0, 'id')] = ''
            return

        # BUG FIX: filter on each individual item (g.strip()), not on the
        # whole string (value.strip()) as the original did — otherwise
        # empty items from inputs like 'a,,b' survive the comprehension.
        groups = [g.strip() for g in value.split(',') if g.strip()]

        # Blank out any pre-existing flattened group entries before
        # re-writing them below.
        for k in list(data.keys()):
            if len(k) == 3 and k[0] == 'groups':
                data[k] = ''

        if len(groups) == 0:
            if requireAtLeastOne:
                errors['groups'] = 'at least one group needed'
            return

        for num, group in enumerate(groups):
            data[('groups', num, 'id')] = group
    else:
        # No extra-field 'groups': dataset might come from a harvest
        # process, so accept already-flattened group entries.
        if not data.get(('groups', 0, 'id'), False) and \
           not data.get(('groups', 0, 'name'), False):
            errors['groups'] = 'at least one group needed'
def validate_extras(key, data, errors, context):
    """Run all extras validations and raise a single toolkit.Invalid
    carrying the collected per-field errors if any check failed.

    Datasets whose id starts with 'StaNord' get relaxed (optional)
    'issued' / 'temporal_start' date checks.
    """
    collected = {}
    is_stanord = ('id',) in data and data[('id',)][:7] == 'StaNord'

    validate_extra_groups(data, True, collected)
    for field, optional in (('issued', is_stanord),
                            ('temporal_start', is_stanord),
                            ('temporal_end', True)):
        validate_extra_date_new(key, field, data, optional, collected)

    if collected:
        raise toolkit.Invalid(collected)
def
_set_value
(
data
,
field
,
value
):
key
=
None
for
k
in
list
(
data
.
keys
()):
if
data
[
k
]
==
field
:
key
=
k
break
if
key
is
None
:
return
None
data
[(
key
[
0
],
key
[
1
],
'
value
'
)]
=
value
def validate_extra_date_new(key, field, data, optional, errors):
    """Validate the extras date *field* and normalise it to ISO format.

    An empty value is recorded as 'empty' in *errors* unless *optional*.
    A non-empty value must start with a YYYY-MM-DD prefix and be parseable
    by dateutil; on success the parsed date is written back in ISO format,
    otherwise 'not a valid date' is recorded.
    """
    value = _extract_value(data, field)
    if not value:
        if not optional:
            errors[field] = 'empty'
        return

    if re.match(r'\d\d\d\d-\d\d-\d\d', value):
        try:
            parsed = parse(value)
        except ValueError:
            pass
        else:
            _set_value(data, field, parsed.isoformat())
            return
    errors[field] = 'not a valid date'
def validate_licenseAttributionByText(key, data, errors, context):
    """Cross-validate the 'licenseAttributionByText' extra against the license.

    Attribution text is mandatory for attribution ('Namensnennung')
    licenses and forbidden for all others.  When no attribution was
    supplied, an empty extras slot is (re-)created so the field still
    round-trips through the form.

    Raises toolkit.Invalid on violation.
    """
    register = model.Package.get_license_register()

    # Does the selected license require attribution ("Namensnennung")?
    isByLicense = False
    for k in data:
        if len(k) > 0 and k[0] == 'license_id' and data[k] and \
                not isinstance(data[k], Missing) and \
                'Namensnennung' in register[data[k]].title:
            isByLicense = True
            break

    hasAttribution = False
    for k in data:
        if data[k] == 'licenseAttributionByText':
            value_key = (k[0], k[1], 'value')
            # BUG FIX: check membership BEFORE indexing.  The original
            # evaluated isinstance(data[value_key], ...) first, raising
            # KeyError in exactly the "value entry missing" case its own
            # `not in data` clause was meant to catch; the unconditional
            # `del data[value_key]` had the same flaw (hence .pop below).
            if value_key not in data or isinstance(data[value_key], Missing):
                data.pop(value_key, None)
                del data[(k[0], k[1], 'key')]
            else:
                hasAttribution = data[value_key] != ''
            break

    if not hasAttribution:
        # Re-create an empty extras slot at the next free index.
        current_indexes = [k[1] for k in list(data.keys())
                           if len(k) > 1 and k[0] == 'extras']
        new_index = max(current_indexes) + 1 if current_indexes else 0
        data[('extras', new_index, 'key')] = 'licenseAttributionByText'
        data[('extras', new_index, 'value')] = ''

    if isByLicense and not hasAttribution:
        raise toolkit.Invalid('licenseAttributionByText: empty not allowed')
    if not isByLicense and hasAttribution:
        raise toolkit.Invalid(
            'licenseAttributionByText: text not allowed for this license')
def known_spatial_uri(key, data, errors, context):
    """Validate the 'spatial_uri' extra and derive 'spatial_text'/'spatial'.

    When no URI is given, fall back to a directly imported polygon (some
    harvesters provide one, or the stored package may already have it) or
    fail.  When a URI is given it must appear in the configured
    tab-separated mapping file, from which the human-readable name and the
    GeoJSON geometry are copied into the extras.

    Raises toolkit.Invalid for an empty or unknown URI.
    """
    value = _extract_value(data, 'spatial_uri')

    if not value:
        # some harvesters might import a polygon directly...
        poly = _extract_value(data, 'spatial')

        has_old_uri = False
        pkg = context.get('package', None)
        if pkg:
            old_uri = pkg.extras.get('spatial_uri', None)
            has_old_uri = old_uri is not None and len(old_uri) > 0
            if not poly:
                poly = pkg.extras.get('spatial', None)

        if not poly or has_old_uri:
            raise toolkit.Invalid('spatial_uri: empty not allowed')
        if poly:
            new_index = next_extra_index(data)
            data[('extras', new_index + 1, 'key')] = 'spatial'
            data[('extras', new_index + 1, 'value')] = poly
        return

    # BUG FIX: the original referenced the undefined name 'tk'; this module
    # imports ckan.plugins.toolkit as 'toolkit'.
    mapping_file = toolkit.config.get('ckanext.odsh.spatial.mapping')
    try:
        mapping_file = urllib.request.urlopen(mapping_file)
    except Exception:
        raise Exception("Could not load spatial mapping file!")

    not_found = True
    spatial_text = str()
    spatial = str()
    # urlopen yields bytes lines in Python 3: decode them so csv.reader
    # receives text.
    # BUG FIX: the original compared row[0].encode('UTF-8') (bytes) with
    # the str *value*, which is always False in Python 3 — every URI was
    # reported as unknown.  Compare str to str instead.
    decoded_lines = (line.decode('utf-8') for line in mapping_file)
    cr = csv.reader(decoded_lines, delimiter="\t")
    for row in cr:
        if row[0] == value:
            not_found = False
            spatial_text = row[1]
            loaded = json.loads(row[2])
            spatial = json.dumps(loaded['geometry'])
            break
    if not_found:
        raise toolkit.Invalid('spatial_uri: uri unknown')

    new_index = next_extra_index(data)
    data[('extras', new_index, 'key')] = 'spatial_text'
    data[('extras', new_index, 'value')] = spatial_text
    data[('extras', new_index + 1, 'key')] = 'spatial'
    data[('extras', new_index + 1, 'value')] = spatial
def next_extra_index(data):
    """Return the next free numeric index for ('extras', i, ...) entries."""
    used = [k[1] for k in data if len(k) > 1 and k[0] == 'extras']
    if not used:
        return 0
    return max(used) + 1
def tag_name_validator(value, context):
    """Validate a tag name against the allowed character set.

    Permits word characters, spaces and the symbols - . : ( ) ´ `.
    Returns *value* unchanged on success; raises toolkit.Invalid otherwise.
    """
    # FIX: use a raw string for the pattern — the original non-raw literal
    # contained invalid escape sequences (\: \( \) ...), which emit a
    # DeprecationWarning today and become a SyntaxError in future Python.
    tagname_match = re.compile(r'[\w \-.\:\(\)\´\`]*$', re.UNICODE)
    if not tagname_match.match(value):
        raise toolkit.Invalid(_('Tag "%s" must be alphanumeric '
                                'characters or symbols: -_.:()') % (value))
    return value
def tag_string_convert(key, data, errors, context):
    '''
    Takes a list of tags that is a comma-separated string (in data[key])
    and parses tag names. These are added to the data dict, enumerated. They
    are also validated.
    '''
    raw = data[key]
    if isinstance(raw, str):
        tags = [t.strip() for t in raw.split(',') if t.strip()]
    else:
        tags = raw

    # Continue numbering after any already-flattened tag entries.
    existing = [int(k[1]) for k in data if len(k) == 3 and k[0] == 'tags']
    next_index = max(existing + [-1]) + 1

    for offset, tag in enumerate(tags):
        data[('tags', next_index + offset, 'name')] = tag

    for tag in tags:
        toolkit.get_validator('tag_length_validator')(tag, context)
        tag_name_validator(tag, context)
def get_validators():
    """Return the validator functions this extension registers with CKAN,
    keyed by the names used in schema definitions."""
    return {
        'known_spatial_uri': known_spatial_uri,
        'odsh_tag_name_validator': tag_name_validator,
        'odsh_validate_extras': validate_extras,
        'validate_licenseAttributionByText': validate_licenseAttributionByText
    }
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment