001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.commons.io.input; 019 020import java.io.BufferedInputStream; 021import java.io.IOException; 022import java.io.InputStream; 023 024import org.apache.commons.io.IOUtils; 025import org.apache.commons.io.build.AbstractStreamBuilder; 026 027/** 028 * An unsynchronized version of {@link BufferedInputStream}, not thread-safe. 029 * <p> 030 * Wraps an existing {@link InputStream} and <em>buffers</em> the input. Expensive interaction with the underlying input stream is minimized, since most 031 * (smaller) requests can be satisfied by accessing the buffer alone. The drawback is that some extra space is required to hold the buffer and that copying 032 * takes place when filling that buffer, but this is usually outweighed by the performance benefits. 033 * </p> 034 * <p> 035 * To build an instance, see {@link Builder}. 036 * </p> 037 * <p> 038 * A typical application pattern for the class looks like this: 039 * </p> 040 * 041 * <pre> 042 * UnsynchronizedBufferedInputStream s = new UnsynchronizedBufferedInputStream.Builder(). 043 * .setInputStream(new FileInputStream("file.java")) 044 * .setBufferSize(8192) 045 * .get(); 046 * </pre> 047 * <p> 048 * Provenance: Apache Harmony and modified. 049 * </p> 050 * 051 * @see BufferedInputStream 052 * @since 2.12.0 053 */ 054//@NotThreadSafe 055public final class UnsynchronizedBufferedInputStream extends UnsynchronizedFilterInputStream { 056 057 /** 058 * Builds a new {@link UnsynchronizedBufferedInputStream} instance. 059 * <p> 060 * Using File IO: 061 * </p> 062 * <pre>{@code 063 * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder() 064 * .setFile(file) 065 * .setBufferSize(8192) 066 * .get();} 067 * </pre> 068 * <p> 069 * Using NIO Path: 070 * </p> 071 * <pre>{@code 072 * UnsynchronizedBufferedInputStream s = UnsynchronizedBufferedInputStream.builder() 073 * .setPath(path) 074 * .setBufferSize(8192) 075 * .get();} 076 * </pre> 077 */ 078 public static class Builder extends AbstractStreamBuilder<UnsynchronizedBufferedInputStream, Builder> { 079 080 /** 081 * Constructs a new instance. 082 * <p> 083 * This builder use the aspects InputStream, OpenOption[] and buffer size. 084 * </p> 085 * <p> 086 * You must provide an origin that can be converted to an InputStream by this builder, otherwise, this call will throw an 087 * {@link UnsupportedOperationException}. 088 * </p> 089 * 090 * @return a new instance. 091 * @throws UnsupportedOperationException if the origin cannot provide an InputStream. 092 * @see #getInputStream() 093 */ 094 @SuppressWarnings("resource") // Caller closes. 095 @Override 096 public UnsynchronizedBufferedInputStream get() throws IOException { 097 return new UnsynchronizedBufferedInputStream(getInputStream(), getBufferSize()); 098 } 099 100 } 101 102 /** 103 * The buffer containing the current bytes read from the target InputStream. 104 */ 105 protected volatile byte[] buffer; 106 107 /** 108 * The total number of bytes inside the byte array {@code buffer}. 109 */ 110 protected int count; 111 112 /** 113 * The current limit, which when passed, invalidates the current mark. 114 */ 115 protected int markLimit; 116 117 /** 118 * The currently marked position. -1 indicates no mark has been set or the mark has been invalidated. 119 */ 120 protected int markPos = IOUtils.EOF; 121 122 /** 123 * The current position within the byte array {@code buffer}. 124 */ 125 protected int pos; 126 127 /** 128 * Constructs a new {@code BufferedInputStream} on the {@link InputStream} {@code in}. The buffer size is specified by the parameter {@code size} and all 129 * reads are now filtered through this stream. 130 * 131 * @param in the input stream the buffer reads from. 132 * @param size the size of buffer to allocate. 133 * @throws IllegalArgumentException if {@code size < 0}. 134 */ 135 private UnsynchronizedBufferedInputStream(final InputStream in, final int size) { 136 super(in); 137 if (size <= 0) { 138 throw new IllegalArgumentException("Size must be > 0"); 139 } 140 buffer = new byte[size]; 141 } 142 143 /** 144 * Returns the number of bytes that are available before this stream will block. This method returns the number of bytes available in the buffer plus those 145 * available in the source stream. 146 * 147 * @return the number of bytes available before blocking. 148 * @throws IOException if this stream is closed. 149 */ 150 @Override 151 public int available() throws IOException { 152 final InputStream localIn = inputStream; // 'in' could be invalidated by close() 153 if (buffer == null || localIn == null) { 154 throw new IOException("Stream is closed"); 155 } 156 return count - pos + localIn.available(); 157 } 158 159 /** 160 * Closes this stream. The source stream is closed and any resources associated with it are released. 161 * 162 * @throws IOException if an error occurs while closing this stream. 163 */ 164 @Override 165 public void close() throws IOException { 166 buffer = null; 167 final InputStream localIn = inputStream; 168 inputStream = null; 169 if (localIn != null) { 170 localIn.close(); 171 } 172 } 173 174 private int fillBuffer(final InputStream localIn, byte[] localBuf) throws IOException { 175 if (markPos == IOUtils.EOF || pos - markPos >= markLimit) { 176 /* Mark position not set or exceeded readLimit */ 177 final int result = localIn.read(localBuf); 178 if (result > 0) { 179 markPos = IOUtils.EOF; 180 pos = 0; 181 count = result; 182 } 183 return result; 184 } 185 if (markPos == 0 && markLimit > localBuf.length) { 186 /* Increase buffer size to accommodate the readLimit */ 187 int newLength = localBuf.length * 2; 188 if (newLength > markLimit) { 189 newLength = markLimit; 190 } 191 final byte[] newbuf = new byte[newLength]; 192 System.arraycopy(localBuf, 0, newbuf, 0, localBuf.length); 193 // Reassign buffer, which will invalidate any local references 194 // FIXME: what if buffer was null? 195 localBuf = buffer = newbuf; 196 } else if (markPos > 0) { 197 System.arraycopy(localBuf, markPos, localBuf, 0, localBuf.length - markPos); 198 } 199 // Set the new position and mark position 200 pos -= markPos; 201 count = markPos = 0; 202 final int bytesread = localIn.read(localBuf, pos, localBuf.length - pos); 203 count = bytesread <= 0 ? pos : pos + bytesread; 204 return bytesread; 205 } 206 207 byte[] getBuffer() { 208 return buffer; 209 } 210 211 /** 212 * Sets a mark position in this stream. The parameter {@code readLimit} indicates how many bytes can be read before a mark is invalidated. Calling 213 * {@code reset()} will reposition the stream back to the marked position if {@code readLimit} has not been surpassed. The underlying buffer may be 214 * increased in size to allow {@code readLimit} number of bytes to be supported. 215 * 216 * @param readLimit the number of bytes that can be read before the mark is invalidated. 217 * @see #reset() 218 */ 219 @Override 220 public void mark(final int readLimit) { 221 markLimit = readLimit; 222 markPos = pos; 223 } 224 225 /** 226 * Indicates whether {@code BufferedInputStream} supports the {@code mark()} and {@code reset()} methods. 227 * 228 * @return {@code true} for BufferedInputStreams. 229 * @see #mark(int) 230 * @see #reset() 231 */ 232 @Override 233 public boolean markSupported() { 234 return true; 235 } 236 237 /** 238 * Reads a single byte from this stream and returns it as an integer in the range from 0 to 255. Returns -1 if the end of the source string has been 239 * reached. If the internal buffer does not contain any available bytes then it is filled from the source stream and the first byte is returned. 240 * 241 * @return the byte read or -1 if the end of the source stream has been reached. 242 * @throws IOException if this stream is closed or another IOException occurs. 243 */ 244 @Override 245 public int read() throws IOException { 246 // Use local refs since buf and in may be invalidated by an 247 // unsynchronized close() 248 byte[] localBuf = buffer; 249 final InputStream localIn = inputStream; 250 if (localBuf == null || localIn == null) { 251 throw new IOException("Stream is closed"); 252 } 253 254 /* Are there buffered bytes available? */ 255 if (pos >= count && fillBuffer(localIn, localBuf) == IOUtils.EOF) { 256 return IOUtils.EOF; /* no, fill buffer */ 257 } 258 // localBuf may have been invalidated by fillbuf 259 if (localBuf != buffer) { 260 localBuf = buffer; 261 if (localBuf == null) { 262 throw new IOException("Stream is closed"); 263 } 264 } 265 266 /* Did filling the buffer fail with -1 (EOF)? */ 267 if (count - pos > 0) { 268 return localBuf[pos++] & 0xFF; 269 } 270 return IOUtils.EOF; 271 } 272 273 /** 274 * Reads at most {@code length} bytes from this stream and stores them in byte array {@code buffer} starting at offset {@code offset}. Returns the number of 275 * bytes actually read or -1 if no bytes were read and the end of the stream was encountered. If all the buffered bytes have been used, a mark has not been 276 * set and the requested number of bytes is larger than the receiver's buffer size, this implementation bypasses the buffer and simply places the results 277 * directly into {@code buffer}. 278 * 279 * @param dest the byte array in which to store the bytes read. 280 * @param offset the initial position in {@code buffer} to store the bytes read from this stream. 281 * @param length the maximum number of bytes to store in {@code buffer}. 282 * @return the number of bytes actually read or -1 if end of stream. 283 * @throws IndexOutOfBoundsException if {@code offset < 0} or {@code length < 0}, or if {@code offset + length} is greater than the size of {@code buffer}. 284 * @throws IOException if the stream is already closed or another IOException occurs. 285 */ 286 @Override 287 public int read(final byte[] dest, int offset, final int length) throws IOException { 288 // Use local ref since buf may be invalidated by an unsynchronized 289 // close() 290 byte[] localBuf = buffer; 291 if (localBuf == null) { 292 throw new IOException("Stream is closed"); 293 } 294 // avoid int overflow 295 if (offset > dest.length - length || offset < 0 || length < 0) { 296 throw new IndexOutOfBoundsException(); 297 } 298 if (length == 0) { 299 return 0; 300 } 301 final InputStream localIn = inputStream; 302 if (localIn == null) { 303 throw new IOException("Stream is closed"); 304 } 305 306 int required; 307 if (pos < count) { 308 /* There are bytes available in the buffer. */ 309 final int copylength = count - pos >= length ? length : count - pos; 310 System.arraycopy(localBuf, pos, dest, offset, copylength); 311 pos += copylength; 312 if (copylength == length || localIn.available() == 0) { 313 return copylength; 314 } 315 offset += copylength; 316 required = length - copylength; 317 } else { 318 required = length; 319 } 320 321 while (true) { 322 final int read; 323 /* 324 * If we're not marked and the required size is greater than the buffer, simply read the bytes directly bypassing the buffer. 325 */ 326 if (markPos == IOUtils.EOF && required >= localBuf.length) { 327 read = localIn.read(dest, offset, required); 328 if (read == IOUtils.EOF) { 329 return required == length ? IOUtils.EOF : length - required; 330 } 331 } else { 332 if (fillBuffer(localIn, localBuf) == IOUtils.EOF) { 333 return required == length ? IOUtils.EOF : length - required; 334 } 335 // localBuf may have been invalidated by fillBuffer() 336 if (localBuf != buffer) { 337 localBuf = buffer; 338 if (localBuf == null) { 339 throw new IOException("Stream is closed"); 340 } 341 } 342 343 read = count - pos >= required ? required : count - pos; 344 System.arraycopy(localBuf, pos, dest, offset, read); 345 pos += read; 346 } 347 required -= read; 348 if (required == 0) { 349 return length; 350 } 351 if (localIn.available() == 0) { 352 return length - required; 353 } 354 offset += read; 355 } 356 } 357 358 /** 359 * Resets this stream to the last marked location. 360 * 361 * @throws IOException if this stream is closed, no mark has been set or the mark is no longer valid because more than {@code readLimit} bytes have been 362 * read since setting the mark. 363 * @see #mark(int) 364 */ 365 @Override 366 public void reset() throws IOException { 367 if (buffer == null) { 368 throw new IOException("Stream is closed"); 369 } 370 if (IOUtils.EOF == markPos) { 371 throw new IOException("Mark has been invalidated"); 372 } 373 pos = markPos; 374 } 375 376 /** 377 * Skips {@code amount} number of bytes in this stream. Subsequent {@code read()}'s will not return these bytes unless {@code reset()} is used. 378 * 379 * @param amount the number of bytes to skip. {@code skip} does nothing and returns 0 if {@code amount} is less than zero. 380 * @return the number of bytes actually skipped. 381 * @throws IOException if this stream is closed or another IOException occurs. 382 */ 383 @Override 384 public long skip(final long amount) throws IOException { 385 // Use local refs since buf and in may be invalidated by an 386 // unsynchronized close() 387 final byte[] localBuf = buffer; 388 final InputStream localIn = inputStream; 389 if (localBuf == null) { 390 throw new IOException("Stream is closed"); 391 } 392 if (amount < 1) { 393 return 0; 394 } 395 if (localIn == null) { 396 throw new IOException("Stream is closed"); 397 } 398 399 if (count - pos >= amount) { 400 // (int count - int pos) here is always an int so amount is also in the int range if the above test is true. 401 // We can safely cast to int and avoid static analysis warnings. 402 pos += (int) amount; 403 return amount; 404 } 405 int read = count - pos; 406 pos = count; 407 408 if (markPos != IOUtils.EOF && amount <= markLimit) { 409 if (fillBuffer(localIn, localBuf) == IOUtils.EOF) { 410 return read; 411 } 412 if (count - pos >= amount - read) { 413 // (int count - int pos) here is always an int so (amount - read) is also in the int range if the above test is true. 414 // We can safely cast to int and avoid static analysis warnings. 415 pos += (int) amount - read; 416 return amount; 417 } 418 // Couldn't get all the bytes, skip what we read 419 read += count - pos; 420 pos = count; 421 return read; 422 } 423 return read + localIn.skip(amount - read); 424 } 425}