001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import static org.apache.commons.io.IOUtils.EOF;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PipedInputStream;
024import java.io.PipedOutputStream;
025import java.time.Duration;
026import java.util.Objects;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.LinkedBlockingQueue;
029import java.util.concurrent.TimeUnit;
030
031import org.apache.commons.io.build.AbstractStreamBuilder;
032import org.apache.commons.io.output.QueueOutputStream;
033
034/**
035 * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
036 * <p>
037 * To build an instance, see {@link Builder}.
038 * </p>
039 * <p>
040 * Example usage:
041 * </p>
042 *
043 * <pre>
044 * QueueInputStream inputStream = new QueueInputStream();
045 * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
046 *
047 * outputStream.write("hello world".getBytes(UTF_8));
048 * inputStream.read();
049 * </pre>
050 * <p>
051 * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a single thread or multiple threads.
052 * Also, unlike JDK classes, no special meaning is attached to initial or current thread. Instances can be used longer after initial threads exited.
053 * </p>
054 * <p>
055 * Closing a {@link QueueInputStream} has no effect. The methods in this class can be called after the stream has been closed without generating an
056 * {@link IOException}.
057 * </p>
058 *
059 * @see QueueOutputStream
060 * @since 2.9.0
061 */
062public class QueueInputStream extends InputStream {
063
064    /**
065     * Builds a new {@link QueueInputStream} instance.
066     * <p>
067     * For example:
068     * </p>
069     *
070     * <pre>{@code
071     * QueueInputStream s = QueueInputStream.builder()
072     *   .setBlockingQueue(new LinkedBlockingQueue<>())
073     *   .setTimeout(Duration.ZERO)
074     *   .get();}
075     * </pre>
076     *
077     * @since 2.12.0
078     */
079    public static class Builder extends AbstractStreamBuilder<QueueInputStream, Builder> {
080
081        private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
082        private Duration timeout = Duration.ZERO;
083
084        /**
085         * Constructs a new instance.
086         * <p>
087         * This builder use the aspects BlockingQueue and timeout.
088         * </p>
089         *
090         * @return a new instance.
091         */
092        @Override
093        public QueueInputStream get() {
094            return new QueueInputStream(blockingQueue, timeout);
095        }
096
097        /**
098         * Sets backing queue for the stream.
099         *
100         * @param blockingQueue backing queue for the stream.
101         * @return this
102         */
103        public Builder setBlockingQueue(final BlockingQueue<Integer> blockingQueue) {
104            this.blockingQueue = blockingQueue != null ? blockingQueue : new LinkedBlockingQueue<>();
105            return this;
106        }
107
108        /**
109         * Sets the polling timeout.
110         *
111         * @param timeout the polling timeout.
112         * @return this.
113         */
114        public Builder setTimeout(final Duration timeout) {
115            if (timeout != null && timeout.toNanos() < 0) {
116                throw new IllegalArgumentException("timeout must not be negative");
117            }
118            this.timeout = timeout != null ? timeout : Duration.ZERO;
119            return this;
120        }
121
122    }
123
124    /**
125     * Constructs a new {@link Builder}.
126     *
127     * @return a new {@link Builder}.
128     * @since 2.12.0
129     */
130    public static Builder builder() {
131        return new Builder();
132    }
133
134    private final BlockingQueue<Integer> blockingQueue;
135
136    private final long timeoutNanos;
137
138    /**
139     * Constructs a new instance with no limit to its internal queue size and zero timeout.
140     */
141    public QueueInputStream() {
142        this(new LinkedBlockingQueue<>());
143    }
144
145    /**
146     * Constructs a new instance with given queue and zero timeout.
147     *
148     * @param blockingQueue backing queue for the stream.
149     * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()}.
150     */
151    @Deprecated
152    public QueueInputStream(final BlockingQueue<Integer> blockingQueue) {
153        this(blockingQueue, Duration.ZERO);
154    }
155
156    /**
157     * Constructs a new instance with given queue and timeout.
158     *
159     * @param blockingQueue backing queue for the stream.
160     * @param timeout       how long to wait before giving up when polling the queue.
161     */
162    private QueueInputStream(final BlockingQueue<Integer> blockingQueue, final Duration timeout) {
163        this.blockingQueue = Objects.requireNonNull(blockingQueue, "blockingQueue");
164        this.timeoutNanos = Objects.requireNonNull(timeout, "timeout").toNanos();
165    }
166
167    /**
168     * Gets the blocking queue.
169     *
170     * @return the blocking queue.
171     */
172    BlockingQueue<Integer> getBlockingQueue() {
173        return blockingQueue;
174    }
175
176    /**
177     * Gets the timeout duration.
178     *
179     * @return the timeout duration.
180     */
181    Duration getTimeout() {
182        return Duration.ofNanos(timeoutNanos);
183    }
184
185    /**
186     * Constructs a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
187     *
188     * @return QueueOutputStream connected to this stream.
189     */
190    public QueueOutputStream newQueueOutputStream() {
191        return new QueueOutputStream(blockingQueue);
192    }
193
194    /**
195     * Reads and returns a single byte.
196     *
197     * @return the byte read, or {@code -1} if a timeout occurs before a queue element is available.
198     * @throws IllegalStateException if thread is interrupted while waiting.
199     */
200    @Override
201    public int read() {
202        try {
203            final Integer value = blockingQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
204            return value == null ? EOF : 0xFF & value;
205        } catch (final InterruptedException e) {
206            Thread.currentThread().interrupt();
207            // throw runtime unchecked exception to maintain signature backward-compatibility of
208            // this read method, which does not declare IOException
209            throw new IllegalStateException(e);
210        }
211    }
212
213}