diff --git a/src/main/java/de/ozgcloud/processor/file/OzgCloudFileResouce.java b/src/main/java/de/ozgcloud/processor/file/OzgCloudFileResouce.java new file mode 100644 index 0000000000000000000000000000000000000000..7264d3eab3ac87bec8ec9d7cecc8e9876289e0a2 --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/OzgCloudFileResouce.java @@ -0,0 +1,106 @@ +package de.ozgcloud.processor.file; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URL; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import org.springframework.core.io.Resource; + +import de.ozgcloud.apilib.file.OzgCloudFileId; +import de.ozgcloud.processor.vorgang.FileMetaData; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@RequiredArgsConstructor +public class OzgCloudFileResouce implements Resource { + + private final FileMetaData file; + private final BiConsumer<OzgCloudFileId, OutputStream> fileLoadWriter; + + private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024); + private final OutputStream out = new QueueBasedOutputStream(queue); + private final InputStream in = new QueueBasedInputStream(queue); + + private AtomicBoolean initialized = new AtomicBoolean(false); + + @Override + public InputStream getInputStream() throws IOException { + if (!initialized.get()) { + initialize(); + } + + return in; + } + + synchronized void initialize() { + if (!initialized.get()) { + new Thread(this::loadAndWriteFile); + } + + initialized.set(true); + } + + private void loadAndWriteFile() { + LOG.debug("starting file loading"); + fileLoadWriter.accept(OzgCloudFileId.from(file.getId().toString()), out); + } + + @Override + public boolean exists() { + return true; + } + + @Override + public URL getURL() throws IOException { + throw new IOException("Resource is not available as a descriptor"); + } + + @Override + public URI getURI() throws IOException { + throw new IOException("Resource is not available as a descriptor"); + } + + @Override + public File getFile() throws IOException { + throw new IOException("Resource is not available as a File"); + } + + @Override + public long contentLength() throws IOException { + return file.getSize(); + } + + @Override + public long lastModified() throws IOException { + return 0; + } + + @Override + public Resource createRelative(String relativePath) throws IOException { + throw new IOException("There is no relative Resource."); + } + + @Override + public String getFilename() { + return file.getName(); + } + + @Override + public String getDescription() { + return "OzgCloud File Representation of '%s'.".formatted(file.getName()); + } + + @Override + public boolean isOpen() { + return true; + } + +} diff --git a/src/main/java/de/ozgcloud/processor/file/ProcessorFileService.java b/src/main/java/de/ozgcloud/processor/file/ProcessorFileService.java new file mode 100644 index 0000000000000000000000000000000000000000..87fcf6097d1dff117c8bf5b94ec6f96459a3796d --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/ProcessorFileService.java @@ -0,0 +1,20 @@ +package de.ozgcloud.processor.file; + +import org.springframework.core.io.Resource; +import org.springframework.stereotype.Service; + +import de.ozgcloud.apilib.file.OzgCloudFileService; +import de.ozgcloud.processor.vorgang.FileMetaData; +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class ProcessorFileService { + + private final OzgCloudFileService ozgCloudFileService = null; + + public Resource getFileData(FileMetaData file) { + return new OzgCloudFileResouce(file, ozgCloudFileService::writeFileDataToStream); + } + +} diff --git a/src/main/java/de/ozgcloud/processor/file/QueueBasedInputStream.java b/src/main/java/de/ozgcloud/processor/file/QueueBasedInputStream.java new file mode 100644 index 0000000000000000000000000000000000000000..b6a1441a889af87ed79c928b95c3642135910348 --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/QueueBasedInputStream.java @@ -0,0 +1,25 @@ +package de.ozgcloud.processor.file; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class QueueBasedInputStream extends InputStream { + + private final BlockingQueue<Integer> queue; + + @Override + public int read() throws IOException { + return queue.poll(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + return super.read(b, off, len); + } +} diff --git a/src/main/java/de/ozgcloud/processor/file/QueueBasedOutputStream.java b/src/main/java/de/ozgcloud/processor/file/QueueBasedOutputStream.java new file mode 100644 index 0000000000000000000000000000000000000000..36a090b1233ae3e622cc11e6bbf5f77387049036 --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/QueueBasedOutputStream.java @@ -0,0 +1,32 @@ +package de.ozgcloud.processor.file; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class QueueBasedOutputStream extends OutputStream { + + private final BlockingQueue<Integer> queue; + + @Override + public void write(int b) throws IOException { + queue.add(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + super.write(b, off, len); + } + + @Override + public void close() throws IOException { + super.close(); + queue.add(-1); + } + +} diff --git a/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java b/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java index ff4d2230dfaf2883bf85450f2c0ea065ad20e4a6..e6ec35079d59a2fe9526a1e2d9255f454cb77ba2 100644 --- a/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java +++ b/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java @@ -25,6 +25,7 @@ package de.ozgcloud.processor.processor; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -43,9 +44,11 @@ import org.springframework.web.reactive.function.client.WebClient; import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.processor.ProcessorManagerConfiguration; +import de.ozgcloud.processor.file.ProcessorFileService; import de.ozgcloud.processor.processor.ProcessorProperties.Form; import de.ozgcloud.processor.processor.ProcessorProperties.Processor; import de.ozgcloud.processor.result.ProcessorTechnicalException; +import de.ozgcloud.processor.vorgang.FileGroup; import de.ozgcloud.processor.vorgang.Vorgang; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -61,7 +64,7 @@ public class ProcessorService { @Qualifier(ProcessorManagerConfiguration.PROCESSOR_PROPERTIES_NAME) // NOSONAR private final ProcessorProperties properties; - + private final ProcessorFileService fileService; private final WebClient webClient; public Stream<Mono<ProcessorResult>> processVorgang(Vorgang vorgang) { @@ -97,7 +100,7 @@ public class ProcessorService { LOG.info("Sending Vorgang {} to processors.", vorgang.getId()); return webClient.post() .uri(processor.getAddress()) - .bodyValue(createRequestBody(vorgang)) + .bodyValue(createRequestBody(processor, vorgang)) .retrieve() .onStatus(HttpStatusCode::is3xxRedirection, this::buildRedirectError) .bodyToMono(ProcessorResult.class) @@ -106,12 +109,22 @@ public class ProcessorService { .doOnSuccess(processorResult -> processorResult.toBuilder().processorName(processor.getName()).build()); } - MultiValueMap<String, HttpEntity<?>> createRequestBody(Vorgang vorgang) { // NOSONAR + MultiValueMap<String, HttpEntity<?>> createRequestBody(Processor processor, Vorgang vorgang) { // NOSONAR var multipartBodyBuilder = new MultipartBodyBuilder(); multipartBodyBuilder.part(KEY_VORGANG, vorgang, MediaType.APPLICATION_JSON); + if (processor.isTransferAttachments()) { + addAttachments(multipartBodyBuilder, vorgang.getEingang().getAttachments()); + } return multipartBodyBuilder.build(); } + private void addAttachments(MultipartBodyBuilder builder, List<FileGroup> attachments) { + attachments.stream().flatMap(group -> group.getFiles().stream()) + .map(file -> fileService.getFileData(file)) + .forEach(resource -> builder.part("test", resource)); + + } + Mono<Throwable> buildRedirectError(ClientResponse clientResponse) { return Mono.error(new TechnicalException("Resource was moved (%s) ".formatted(clientResponse.statusCode()))); } diff --git a/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java b/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java index c05616259f4ad99759753c1ad796e7f358946027..6bb5f06e65e66a0265d319138bd690cebd89b27b 100644 --- a/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java +++ b/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java @@ -40,9 +40,9 @@ import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.test.context.bean.override.mockito.MockitoSpyBean; import org.springframework.test.util.ReflectionTestUtils; import de.ozgcloud.common.test.ITCase; @@ -62,7 +62,7 @@ class ProcessorServiceITCase { @Autowired private ProcessorService service; - @SpyBean + @MockitoSpyBean private ResultService resultService; private Processor processor;