001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017
018package org.apache.commons.io.input;
019
020import java.io.BufferedInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023
024import org.apache.commons.io.IOUtils;
025import org.apache.commons.io.build.AbstractStreamBuilder;
026
027/**
028 * An unsynchronized version of {@link BufferedInputStream}, not thread-safe.
029 * <p>
030 * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most
031 * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying
032 * takes place when filling that buffer, but this is usually outweighed by the performance benefits.
033 * </p>
034 * <p>
035 * To build an instance, see {@link Builder}.
036 * </p>
037 * <p>
038 * A typical application pattern for the class looks like this:
039 * </p>
040 *
041 * <pre>
042 * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder().
043 *   .setInputStream(new FileInputStream(&quot;file.java&quot;))
044 *   .setBufferSize(8192)
045 *   .get();
046 * </pre>
047 * <p>
048 * Provenance: Apache Harmony and modified.
049 * </p>
050 *
051 * @see BufferedInputStream
052 * @since 2.12.0
053 */
054//@NotThreadSafe
055public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream {
056
057    /**
058     * Builds a new {@link UnsynchronizedBufferedInputStream} instance.
059     * <p>
060     * Using File IO:
061     * </p>
062     * <pre>{@code
063     * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
064     *   .setFile(file)
065     *   .setBufferSize(8192)
066     *   .get();}
067     * </pre>
068     * <p>
069     * Using NIO Path:
070     * </p>
071     * <pre>{@code
072     * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder()
073     *   .setPath(path)
074     *   .setBufferSize(8192)
075     *   .get();}
076     * </pre>
077     */
078    public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> {
079
080        /**
081         * Constructs a new instance.
082         * <p>
083         * This builder use the aspects InputStream, OpenOption[] and buffer size.
084         * </p>
085         * <p>
086         * You must provide an origin that can be converted to an InputStream by this builder, otherwise, this call will throw an
087         * {@link UnsupportedOperationException}.
088         * </p>
089         *
090         * @return a new instance.
091         * @throws UnsupportedOperationException if the origin cannot provide an InputStream.
092         * @see #getInputStream()
093         */
094        @SuppressWarnings("resource") // Caller closes.
095        @Override
096        public UnsynchronizedBufferedInputStream get() throws IOException {
097            return new UnsynchronizedBufferedInputStream(getInputStream(), getBufferSize());
098        }
099
100    }
101
102    /**
103     * The buffer containing the current bytes read from the target InputStream.
104     */
105    protected volatile byte[] buffer;
106
107    /**
108     * The total number of bytes inside the byte array {@code buffer}.
109     */
110    protected int count;
111
112    /**
113     * The current limit, which when passed, invalidates the current mark.
114     */
115    protected int markLimit;
116
117    /**
118     * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated.
119     */
120    protected int markPos = IOUtils.EOF;
121
122    /**
123     * The current position within the byte array {@code buffer}.
124     */
125    protected int pos;
126
127    /**
128     * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all
129     * reads are now filtered through this stream.
130     *
131     * @param in   the input stream the buffer reads from.
132     * @param size the size of buffer to allocate.
133     * @throws IllegalArgumentException if {@code size < 0}.
134     */
135    private UnsynchronizedBufferedInputStream(final InputStream in, final int size) {
136        super(in);
137        if (size <= 0) {
138            throw new IllegalArgumentException("Size must be > 0");
139        }
140        buffer = new byte[size];
141    }
142
143    /**
144     * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those
145     * available in the source stream.
146     *
147     * @return the number of bytes available before blocking.
148     * @throws IOException if this stream is closed.
149     */
150    @Override
151    public int available() throws IOException {
152        final InputStream localIn = inputStream; // 'in' could be invalidated by close()
153        if (buffer == null || localIn == null) {
154            throw new IOException("Stream is closed");
155        }
156        return count - pos + localIn.available();
157    }
158
159    /**
160     * Closes this stream. The source stream is closed and any resources associated with it are released.
161     *
162     * @throws IOException if an error occurs while closing this stream.
163     */
164    @Override
165    public void close() throws IOException {
166        buffer = null;
167        final InputStream localIn = inputStream;
168        inputStream = null;
169        if (localIn != null) {
170            localIn.close();
171        }
172    }
173
174    private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException {
175        if (markPos == IOUtils.EOF || pos - markPos >= markLimit) {
176            /* Mark position not set or exceeded readLimit */
177            final int result = localIn.read(localBuf);
178            if (result > 0) {
179                markPos = IOUtils.EOF;
180                pos = 0;
181                count = result;
182            }
183            return result;
184        }
185        if (markPos == 0 && markLimit > localBuf.length) {
186            /* Increase buffer size to accommodate the readLimit */
187            int newLength = localBuf.length * 2;
188            if (newLength > markLimit) {
189                newLength = markLimit;
190            }
191            final byte[] newbuf = new byte[newLength];
192            System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length);
193            // Reassign buffer, which will invalidate any local references
194            // FIXME: what if buffer was null?
195            localBuf = buffer = newbuf;
196        } else if (markPos > 0) {
197            System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos);
198        }
199        // Set the new position and mark position
200        pos -= markPos;
201        count = markPos = 0;
202        final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos);
203        count = bytesread <= 0 ? pos : pos + bytesread;
204        return bytesread;
205    }
206
207    byte[] getBuffer() {
208        return buffer;
209    }
210
211    /**
212     * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling
213     * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be
214     * increased in size to allow {@code readLimit} number of bytes to be supported.
215     *
216     * @param readLimit the number of bytes that can be read before the mark is invalidated.
217     * @see #reset()
218     */
219    @Override
220    public void mark(final int readLimit) {
221        markLimit = readLimit;
222        markPos = pos;
223    }
224
225    /**
226     * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods.
227     *
228     * @return {@code true} for BufferedInputStreams.
229     * @see #mark(int)
230     * @see #reset()
231     */
232    @Override
233    public boolean markSupported() {
234        return true;
235    }
236
237    /**
238     * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been
239     * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned.
240     *
241     * @return the byte read or -1 if the end of the source stream has been reached.
242     * @throws IOException if this stream is closed or another IOException occurs.
243     */
244    @Override
245    public int read() throws IOException {
246        // Use local refs since buf and in may be invalidated by an
247        // unsynchronized close()
248        byte[] localBuf = buffer;
249        final InputStream localIn = inputStream;
250        if (localBuf == null || localIn == null) {
251            throw new IOException("Stream is closed");
252        }
253
254        /* Are there buffered bytes available? */
255        if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) {
256            return IOUtils.EOF; /* no, fill buffer */
257        }
258        // localBuf may have been invalidated by fillbuf
259        if (localBuf != buffer) {
260            localBuf = buffer;
261            if (localBuf == null) {
262                throw new IOException("Stream is closed");
263            }
264        }
265
266        /* Did filling the buffer fail with -1 (EOF)? */
267        if (count - pos > 0) {
268            return localBuf[pos++] & 0xFF;
269        }
270        return IOUtils.EOF;
271    }
272
273    /**
274     * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of
275     * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been
276     * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results
277     * directly into {@code buffer}.
278     *
279     * @param dest the byte array in which to store the bytes read.
280     * @param offset the initial position in {@code buffer} to store the bytes read from this stream.
281     * @param length the maximum number of bytes to store in {@code buffer}.
282     * @return the number of bytes actually read or -1 if end of stream.
283     * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}.
284     * @throws IOException               if the stream is already closed or another IOException occurs.
285     */
286    @Override
287    public int read(final byte[] dest, int offset, final int length) throws IOException {
288        // Use local ref since buf may be invalidated by an unsynchronized
289        // close()
290        byte[] localBuf = buffer;
291        if (localBuf == null) {
292            throw new IOException("Stream is closed");
293        }
294        // avoid int overflow
295        if (offset > dest.length - length || offset < 0 || length < 0) {
296            throw new IndexOutOfBoundsException();
297        }
298        if (length == 0) {
299            return 0;
300        }
301        final InputStream localIn = inputStream;
302        if (localIn == null) {
303            throw new IOException("Stream is closed");
304        }
305
306        int required;
307        if (pos < count) {
308            /* There are bytes available in the buffer. */
309            final int copylength = count - pos >= length ? length : count - pos;
310            System.arraycopy(localBuf, pos, dest, offset, copylength);
311            pos += copylength;
312            if (copylength == length || localIn.available() == 0) {
313                return copylength;
314            }
315            offset += copylength;
316            required = length - copylength;
317        } else {
318            required = length;
319        }
320
321        while (true) {
322            final int read;
323            /*
324             * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer.
325             */
326            if (markPos == IOUtils.EOF && required >= localBuf.length) {
327                read = localIn.read(dest, offset, required);
328                if (read == IOUtils.EOF) {
329                    return required == length ? IOUtils.EOF : length - required;
330                }
331            } else {
332                if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
333                    return required == length ? IOUtils.EOF : length - required;
334                }
335                // localBuf may have been invalidated by fillBuffer()
336                if (localBuf != buffer) {
337                    localBuf = buffer;
338                    if (localBuf == null) {
339                        throw new IOException("Stream is closed");
340                    }
341                }
342
343                read = count - pos >= required ? required : count - pos;
344                System.arraycopy(localBuf, pos, dest, offset, read);
345                pos += read;
346            }
347            required -= read;
348            if (required == 0) {
349                return length;
350            }
351            if (localIn.available() == 0) {
352                return length - required;
353            }
354            offset += read;
355        }
356    }
357
358    /**
359     * Resets this stream to the last marked location.
360     *
361     * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been
362     *                     read since setting the mark.
363     * @see #mark(int)
364     */
365    @Override
366    public void reset() throws IOException {
367        if (buffer == null) {
368            throw new IOException("Stream is closed");
369        }
370        if (IOUtils.EOF == markPos) {
371            throw new IOException("Mark has been invalidated");
372        }
373        pos = markPos;
374    }
375
376    /**
377     * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used.
378     *
379     * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero.
380     * @return the number of bytes actually skipped.
381     * @throws IOException if this stream is closed or another IOException occurs.
382     */
383    @Override
384    public long skip(final long amount) throws IOException {
385        // Use local refs since buf and in may be invalidated by an
386        // unsynchronized close()
387        final byte[] localBuf = buffer;
388        final InputStream localIn = inputStream;
389        if (localBuf == null) {
390            throw new IOException("Stream is closed");
391        }
392        if (amount < 1) {
393            return 0;
394        }
395        if (localIn == null) {
396            throw new IOException("Stream is closed");
397        }
398
399        if (count - pos >= amount) {
400            // (int count - int pos) here is always an int so amount is also in the int range if the above test is true.
401            // We can safely cast to int and avoid static analysis warnings.
402            pos += (int) amount;
403            return amount;
404        }
405        int read = count - pos;
406        pos = count;
407
408        if (markPos != IOUtils.EOF && amount <= markLimit) {
409            if (fillBuffer(localIn, localBuf) == IOUtils.EOF) {
410                return read;
411            }
412            if (count - pos >= amount - read) {
413                // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true.
414                // We can safely cast to int and avoid static analysis warnings.
415                pos += (int) amount - read;
416                return amount;
417            }
418            // Couldn't get all the bytes, skip what we read
419            read += count - pos;
420            pos = count;
421            return read;
422        }
423        return read + localIn.skip(amount - read);
424    }
425}