Skip to content
Snippets Groups Projects
Commit a37013e9 authored by Tobias Bruns's avatar Tobias Bruns
Browse files

OZG-7679 add transfer of files

parent 85342ece
No related branches found
No related tags found
No related merge requests found
package de.ozgcloud.processor.file;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.springframework.core.io.Resource;
import de.ozgcloud.apilib.file.OzgCloudFileId;
import de.ozgcloud.processor.vorgang.FileMetaData;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Log4j2
@RequiredArgsConstructor
public class OzgCloudFileResouce implements Resource {
private final FileMetaData file;
private final BiConsumer<OzgCloudFileId, OutputStream> fileLoadWriter;
private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024);
private final OutputStream out = new QueueBasedOutputStream(queue);
private final InputStream in = new QueueBasedInputStream(queue);
private AtomicBoolean initialized = new AtomicBoolean(false);
@Override
public InputStream getInputStream() throws IOException {
if (!initialized.get()) {
initialize();
}
return in;
}
synchronized void initialize() {
if (!initialized.get()) {
new Thread(this::loadAndWriteFile);
}
initialized.set(true);
}
private void loadAndWriteFile() {
LOG.debug("starting file loading");
fileLoadWriter.accept(OzgCloudFileId.from(file.getId().toString()), out);
}
@Override
public boolean exists() {
return true;
}
@Override
public URL getURL() throws IOException {
throw new IOException("Resource is not available as a descriptor");
}
@Override
public URI getURI() throws IOException {
throw new IOException("Resource is not available as a descriptor");
}
@Override
public File getFile() throws IOException {
throw new IOException("Resource is not available as a File");
}
@Override
public long contentLength() throws IOException {
return file.getSize();
}
@Override
public long lastModified() throws IOException {
return 0;
}
@Override
public Resource createRelative(String relativePath) throws IOException {
throw new IOException("There is no relative Resource.");
}
@Override
public String getFilename() {
return file.getName();
}
@Override
public String getDescription() {
return "OzgCloud File Representation of '%s'.".formatted(file.getName());
}
@Override
public boolean isOpen() {
return true;
}
}
package de.ozgcloud.processor.file;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service;
import de.ozgcloud.apilib.file.OzgCloudFileService;
import de.ozgcloud.processor.vorgang.FileMetaData;
import lombok.RequiredArgsConstructor;
@Service
@RequiredArgsConstructor
public class ProcessorFileService {
private final OzgCloudFileService ozgCloudFileService = null;
public Resource getFileData(FileMetaData file) {
return new OzgCloudFileResouce(file, ozgCloudFileService::writeFileDataToStream);
}
}
package de.ozgcloud.processor.file;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class QueueBasedInputStream extends InputStream {
private final BlockingQueue<Integer> queue;
@Override
public int read() throws IOException {
return queue.poll();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
return super.read(b, off, len);
}
}
package de.ozgcloud.processor.file;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class QueueBasedOutputStream extends OutputStream {
private final BlockingQueue<Integer> queue;
@Override
public void write(int b) throws IOException {
queue.add(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
super.write(b, off, len);
}
@Override
public void close() throws IOException {
super.close();
queue.add(-1);
}
}
......@@ -25,6 +25,7 @@ package de.ozgcloud.processor.processor;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
......@@ -43,9 +44,11 @@ import org.springframework.web.reactive.function.client.WebClient;
import de.ozgcloud.common.errorhandling.TechnicalException;
import de.ozgcloud.processor.ProcessorManagerConfiguration;
import de.ozgcloud.processor.file.ProcessorFileService;
import de.ozgcloud.processor.processor.ProcessorProperties.Form;
import de.ozgcloud.processor.processor.ProcessorProperties.Processor;
import de.ozgcloud.processor.result.ProcessorTechnicalException;
import de.ozgcloud.processor.vorgang.FileGroup;
import de.ozgcloud.processor.vorgang.Vorgang;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
......@@ -61,7 +64,7 @@ public class ProcessorService {
@Qualifier(ProcessorManagerConfiguration.PROCESSOR_PROPERTIES_NAME) // NOSONAR
private final ProcessorProperties properties;
private final ProcessorFileService fileService;
private final WebClient webClient;
public Stream<Mono<ProcessorResult>> processVorgang(Vorgang vorgang) {
......@@ -97,7 +100,7 @@ public class ProcessorService {
LOG.info("Sending Vorgang {} to processors.", vorgang.getId());
return webClient.post()
.uri(processor.getAddress())
.bodyValue(createRequestBody(vorgang))
.bodyValue(createRequestBody(processor, vorgang))
.retrieve()
.onStatus(HttpStatusCode::is3xxRedirection, this::buildRedirectError)
.bodyToMono(ProcessorResult.class)
......@@ -106,12 +109,22 @@ public class ProcessorService {
.doOnSuccess(processorResult -> processorResult.toBuilder().processorName(processor.getName()).build());
}
MultiValueMap<String, HttpEntity<?>> createRequestBody(Vorgang vorgang) { // NOSONAR
MultiValueMap<String, HttpEntity<?>> createRequestBody(Processor processor, Vorgang vorgang) { // NOSONAR
var multipartBodyBuilder = new MultipartBodyBuilder();
multipartBodyBuilder.part(KEY_VORGANG, vorgang, MediaType.APPLICATION_JSON);
if (processor.isTransferAttachments()) {
addAttachments(multipartBodyBuilder, vorgang.getEingang().getAttachments());
}
return multipartBodyBuilder.build();
}
private void addAttachments(MultipartBodyBuilder builder, List<FileGroup> attachments) {
attachments.stream().flatMap(group -> group.getFiles().stream())
.map(file -> fileService.getFileData(file))
.forEach(resource -> builder.part("test", resource));
}
Mono<Throwable> buildRedirectError(ClientResponse clientResponse) {
return Mono.error(new TechnicalException("Resource was moved (%s) ".formatted(clientResponse.statusCode())));
}
......
......@@ -40,9 +40,9 @@ import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.test.context.bean.override.mockito.MockitoSpyBean;
import org.springframework.test.util.ReflectionTestUtils;
import de.ozgcloud.common.test.ITCase;
......@@ -62,7 +62,7 @@ class ProcessorServiceITCase {
@Autowired
private ProcessorService service;
@SpyBean
@MockitoSpyBean
private ResultService resultService;
private Processor processor;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment