From a37013e99688d0b10b2e0167509b2728cbdf35ba Mon Sep 17 00:00:00 2001 From: Tobias Bruns <kontakt@tobias-bruns.de> Date: Mon, 10 Feb 2025 09:58:26 +0100 Subject: [PATCH] OZG-7679 add transfer of files --- .../processor/file/OzgCloudFileResouce.java | 106 ++++++++++++++++++ .../processor/file/ProcessorFileService.java | 20 ++++ .../processor/file/QueueBasedInputStream.java | 25 +++++ .../file/QueueBasedOutputStream.java | 32 ++++++ .../processor/processor/ProcessorService.java | 19 +++- .../processor/ProcessorServiceITCase.java | 4 +- 6 files changed, 201 insertions(+), 5 deletions(-) create mode 100644 src/main/java/de/ozgcloud/processor/file/OzgCloudFileResouce.java create mode 100644 src/main/java/de/ozgcloud/processor/file/ProcessorFileService.java create mode 100644 src/main/java/de/ozgcloud/processor/file/QueueBasedInputStream.java create mode 100644 src/main/java/de/ozgcloud/processor/file/QueueBasedOutputStream.java diff --git a/src/main/java/de/ozgcloud/processor/file/OzgCloudFileResouce.java b/src/main/java/de/ozgcloud/processor/file/OzgCloudFileResouce.java new file mode 100644 index 0000000..7264d3e --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/OzgCloudFileResouce.java @@ -0,0 +1,106 @@ +package de.ozgcloud.processor.file; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URL; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import org.springframework.core.io.Resource; + +import de.ozgcloud.apilib.file.OzgCloudFileId; +import de.ozgcloud.processor.vorgang.FileMetaData; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@RequiredArgsConstructor +public class OzgCloudFileResouce implements Resource { + + private final FileMetaData file; + private final BiConsumer<OzgCloudFileId, OutputStream> fileLoadWriter; + + private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024); + private final OutputStream out = new QueueBasedOutputStream(queue); + private final InputStream in = new QueueBasedInputStream(queue); + + private AtomicBoolean initialized = new AtomicBoolean(false); + + @Override + public InputStream getInputStream() throws IOException { + if (!initialized.get()) { + initialize(); + } + + return in; + } + + synchronized void initialize() { + if (!initialized.get()) { + new Thread(this::loadAndWriteFile); + } + + initialized.set(true); + } + + private void loadAndWriteFile() { + LOG.debug("starting file loading"); + fileLoadWriter.accept(OzgCloudFileId.from(file.getId().toString()), out); + } + + @Override + public boolean exists() { + return true; + } + + @Override + public URL getURL() throws IOException { + throw new IOException("Resource is not available as a descriptor"); + } + + @Override + public URI getURI() throws IOException { + throw new IOException("Resource is not available as a descriptor"); + } + + @Override + public File getFile() throws IOException { + throw new IOException("Resource is not available as a File"); + } + + @Override + public long contentLength() throws IOException { + return file.getSize(); + } + + @Override + public long lastModified() throws IOException { + return 0; + } + + @Override + public Resource createRelative(String relativePath) throws IOException { + throw new IOException("There is no relative Resource."); + } + + @Override + public String getFilename() { + return file.getName(); + } + + @Override + public String getDescription() { + return "OzgCloud File Representation of '%s'.".formatted(file.getName()); + } + + @Override + public boolean isOpen() { + return true; + } + +} diff --git a/src/main/java/de/ozgcloud/processor/file/ProcessorFileService.java b/src/main/java/de/ozgcloud/processor/file/ProcessorFileService.java new file mode 100644 index 0000000..87fcf60 --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/ProcessorFileService.java @@ -0,0 +1,20 @@ +package de.ozgcloud.processor.file; + +import org.springframework.core.io.Resource; +import org.springframework.stereotype.Service; + +import de.ozgcloud.apilib.file.OzgCloudFileService; +import de.ozgcloud.processor.vorgang.FileMetaData; +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class ProcessorFileService { + + private final OzgCloudFileService ozgCloudFileService = null; + + public Resource getFileData(FileMetaData file) { + return new OzgCloudFileResouce(file, ozgCloudFileService::writeFileDataToStream); + } + +} diff --git a/src/main/java/de/ozgcloud/processor/file/QueueBasedInputStream.java b/src/main/java/de/ozgcloud/processor/file/QueueBasedInputStream.java new file mode 100644 index 0000000..b6a1441 --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/QueueBasedInputStream.java @@ -0,0 +1,25 @@ +package de.ozgcloud.processor.file; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class QueueBasedInputStream extends InputStream { + + private final BlockingQueue<Integer> queue; + + @Override + public int read() throws IOException { + return queue.poll(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + return super.read(b, off, len); + } +} diff --git a/src/main/java/de/ozgcloud/processor/file/QueueBasedOutputStream.java b/src/main/java/de/ozgcloud/processor/file/QueueBasedOutputStream.java new file mode 100644 index 0000000..36a090b --- /dev/null +++ b/src/main/java/de/ozgcloud/processor/file/QueueBasedOutputStream.java @@ -0,0 +1,32 @@ +package de.ozgcloud.processor.file; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; + +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class QueueBasedOutputStream extends OutputStream { + + private final BlockingQueue<Integer> queue; + + @Override + public void write(int b) throws IOException { + queue.add(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + super.write(b, off, len); + } + + @Override + public void close() throws IOException { + super.close(); + queue.add(-1); + } + +} diff --git a/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java b/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java index ff4d223..e6ec350 100644 --- a/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java +++ b/src/main/java/de/ozgcloud/processor/processor/ProcessorService.java @@ -25,6 +25,7 @@ package de.ozgcloud.processor.processor; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -43,9 +44,11 @@ import org.springframework.web.reactive.function.client.WebClient; import de.ozgcloud.common.errorhandling.TechnicalException; import de.ozgcloud.processor.ProcessorManagerConfiguration; +import de.ozgcloud.processor.file.ProcessorFileService; import de.ozgcloud.processor.processor.ProcessorProperties.Form; import de.ozgcloud.processor.processor.ProcessorProperties.Processor; import de.ozgcloud.processor.result.ProcessorTechnicalException; +import de.ozgcloud.processor.vorgang.FileGroup; import de.ozgcloud.processor.vorgang.Vorgang; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -61,7 +64,7 @@ public class ProcessorService { @Qualifier(ProcessorManagerConfiguration.PROCESSOR_PROPERTIES_NAME) // NOSONAR private final ProcessorProperties properties; - + private final ProcessorFileService fileService; private final WebClient webClient; public Stream<Mono<ProcessorResult>> processVorgang(Vorgang vorgang) { @@ -97,7 +100,7 @@ public class ProcessorService { LOG.info("Sending Vorgang {} to processors.", vorgang.getId()); return webClient.post() .uri(processor.getAddress()) - .bodyValue(createRequestBody(vorgang)) + .bodyValue(createRequestBody(processor, vorgang)) .retrieve() .onStatus(HttpStatusCode::is3xxRedirection, this::buildRedirectError) .bodyToMono(ProcessorResult.class) @@ -106,12 +109,22 @@ public class ProcessorService { .doOnSuccess(processorResult -> processorResult.toBuilder().processorName(processor.getName()).build()); } - MultiValueMap<String, HttpEntity<?>> createRequestBody(Vorgang vorgang) { // NOSONAR + MultiValueMap<String, HttpEntity<?>> createRequestBody(Processor processor, Vorgang vorgang) { // NOSONAR var multipartBodyBuilder = new MultipartBodyBuilder(); multipartBodyBuilder.part(KEY_VORGANG, vorgang, MediaType.APPLICATION_JSON); + if (processor.isTransferAttachments()) { + addAttachments(multipartBodyBuilder, vorgang.getEingang().getAttachments()); + } return multipartBodyBuilder.build(); } + private void addAttachments(MultipartBodyBuilder builder, List<FileGroup> attachments) { + attachments.stream().flatMap(group -> group.getFiles().stream()) + .map(file -> fileService.getFileData(file)) + .forEach(resource -> builder.part("test", resource)); + + } + Mono<Throwable> buildRedirectError(ClientResponse clientResponse) { return Mono.error(new TechnicalException("Resource was moved (%s) ".formatted(clientResponse.statusCode()))); } diff --git a/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java b/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java index c056162..6bb5f06 100644 --- a/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java +++ b/src/test/java/de/ozgcloud/processor/processor/ProcessorServiceITCase.java @@ -40,9 +40,9 @@ import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.test.context.bean.override.mockito.MockitoSpyBean; import org.springframework.test.util.ReflectionTestUtils; import de.ozgcloud.common.test.ITCase; @@ -62,7 +62,7 @@ class ProcessorServiceITCase { @Autowired private ProcessorService service; - @SpyBean + @MockitoSpyBean private ResultService resultService; private Processor processor; -- GitLab