BUG-并行流与数组下标越界-思考与总结
今天线上环境报异常,发现了一个之前没注意过的问题,记录一下。
1. 异常信息
异常信息如下:
···
Caused by: java.lang.ArrayIndexOutOfBoundsException
at java.lang.String.getChars(String.java:826)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:449)
at java.lang.StringBuilder.append(StringBuilder.java:136)
···
产生bug的代码改写后如下:
public static void main(String[] args) {
List lists = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
lists.add(i);
}
for (int i = 0; i < 100; i++) {
StringBuilder sb = new StringBuilder();
// StringBuffer sb = new StringBuffer();
lists.parallelStream().forEach(p -> {
sb.append(p);
// 可以明显看到,拼接的字符串长度越大,异常越容易发生
sb.append("----------------------------------------");
// stringBuilder.append("-");
});
System.out.println(i + ": " + sb.toString());
}
}
2. 异常追踪分析
从上面的信息可以看出,是StringBuilder.append使用时,产生了数组下标越界异常。下面是代码追踪:
@Override
public StringBuilder append(String str) {
// 1
super.append(str);
return this;
}
public AbstractStringBuilder append(String str) {
if (str == null)
return appendNull();
int len = str.length();
// 2.1
// 检查现有字符串加上要拼接的字符串以后,长度是否超出内部数组的最大长度,如果超出,则会分配一个新的内部数组,确保数组能装的下拼接后的字符串
ensureCapacityInternal(count + len);
// 2.2
str.getChars(0, len, value, count);
count += len;
return this;
}
public void getChars(int srcBegin, int srcEnd, char dst[], int dstBegin) {
if (srcBegin < 0) {
throw new StringIndexOutOfBoundsException(srcBegin);
}
if (srcEnd > value.length) {
throw new StringIndexOutOfBoundsException(srcEnd);
}
if (srcBegin > srcEnd) {
throw new StringIndexOutOfBoundsException(srcEnd - srcBegin);
}
// 3
System.arraycopy(value, srcBegin, dst, dstBegin, srcEnd - srcBegin);
}
public static native void arraycopy(Object src, int srcPos,
Object dest, int destPos,
int length);
一路追踪代码,可以无论是2.1还是2.2,最终都调用了本地方法arraycopy(),这里抛出的异常。
本地方法定义如下:
/* Only register the performance-critical methods */
static JNINativeMethod methods[] = {
{"currentTimeMillis", "()J", (void *)&JVM_CurrentTimeMillis},
{"nanoTime", "()J", (void *)&JVM_NanoTime},
{"arraycopy", "(" OBJ "I" OBJ "II)V", (void *)&JVM_ArrayCopy},
};
JNIEXPORT void JNICALL
JVM_ArrayCopy(JNIEnv *env, jclass ignored, jobject src, jint src_pos,
jobject dst, jint dst_pos, jint length);
/*
java.lang.System中的arraycopy方法
*/
JVM_ENTRY(void, JVM_ArrayCopy(JNIEnv *env, jclass ignored, jobject src, jint src_pos,
jobject dst, jint dst_pos, jint length))
JVMWrapper("JVM_ArrayCopy");
// Check if we have null pointers
if (src == NULL || dst == NULL) {
THROW(vmSymbols::java_lang_NullPointerException());
}
arrayOop s = arrayOop(JNIHandles::resolve_non_null(src));
arrayOop d = arrayOop(JNIHandles::resolve_non_null(dst));
assert(oopDesc::is_oop(s), "JVM_ArrayCopy: src not an oop");
assert(oopDesc::is_oop(d), "JVM_ArrayCopy: dst not an oop");
// Do copy
s->klass()->copy_array(s, src_pos, d, dst_pos, length, thread);
JVM_END
这里发现JVM_ArrayCopy()只是简单的检测源数组和目的数组不为空,排除一些异常情况,并没有复制数组,而是调用了s->klass()->copy_array()方法来实现。源码如下:
/**
* java.lang.System中的arraycopy方法具体实现
*
* @param s 源数组
* @param src_pos 源数组开始复制的下标
* @param d 目标数组
* @param dst_pos 目标数组开始覆盖的下标
* @param length 要复制的数组元素数量
* @param TRAPS 线程信息
*/
void ObjArrayKlass::copy_array(arrayOop s, int src_pos, arrayOop d,
int dst_pos, int length, TRAPS) {
···
// Check is all offsets and lengths are non negative
// 检查所有的偏移量和长度是否非负
if (src_pos < 0 || dst_pos < 0 || length < 0) {
···
THROW_MSG(vmSymbols::java_lang_ArrayIndexOutOfBoundsException(), ss.as_string());
}
// Check if the ranges are valid
// 检查数组边界是否合法,如果
// 1.要复制的数组元素数量 + 源数组开始复制的下标 > 源数组长度
// 2.要复制的数组元素数量 + 目标数组开始覆盖的下标 > 目标数组长度
// 两种情况中有一种,就抛出数组下标越界异常
if ((((unsigned int) length + (unsigned int) src_pos) > (unsigned int) s->length()) ||
(((unsigned int) length + (unsigned int) dst_pos) > (unsigned int) d->length())) {
···
THROW_MSG(vmSymbols::java_lang_ArrayIndexOutOfBoundsException(), ss.as_string());
}
···
}
阅读源码及注释可以知道,上面两种情况下,都会抛出ArrayIndexOutOfBoundsException。
到这里我们可以猜测出异常抛出的原因了:因为append()方法是在多线程(parallelStream并行流)中调用的,所以可能有两个或者多个线程通过了ensureCapacityInternal()方法的空间校验,而实际空间不足而导致了数组下标越界。
例如有A、B两个线程,都需要拼接一个长度为40的字符串,而当前剩余空间为50。
当A通过ensureCapacityInternal()检验且为执行getChars()方法时被挂起,这时B线程通过ensureCapacityInternal()对空间进行校验是可以通过的,因为40<50。
接下来当A、B线程进行数组复制时,后复制的那个线程将出现数组下标越界异常,因为第一个线程复制完成后,剩下空间只有10。10<40而导致空间不足,下标越界。
3. 其他问题
3.1在测试代码中,我们可以很容易观察到,拼接的字符串长度越大,异常越容易发生。
我们分析下面的源码:
private void ensureCapacityInternal(int minimumCapacity) {
// overflow-conscious code
if (minimumCapacity - value.length > 0) {
value = Arrays.copyOf(value,
newCapacity(minimumCapacity));
}
}
private int newCapacity(int minCapacity) {
// 默认新数组容量为原数组的两倍+2
int newCapacity = (value.length << 1) + 2;
// 如果原数组的两倍+2还是小于需要的最小所需容量,则取最小所需容量为新数组容量
if (newCapacity - minCapacity < 0) {
newCapacity = minCapacity;
}
// 如果获取容量数值溢出,或者大于最大数组容量,则特殊处理(小于int最大值,则正常返回,否则抛出异常)
return (newCapacity <= 0 || MAX_ARRAY_SIZE - newCapacity < 0)
? hugeCapacity(minCapacity)
: newCapacity;
}
public static char[] copyOf(char[] original, int newLength) {
char[] copy = new char[newLength];
System.arraycopy(original, 0, copy, 0,
Math.min(original.length, newLength));
return copy;
}
可以看到ensureCapacityInternal()方法的入参minimumCapacity是源内部数组已存放的字符串长度+要拼接的字符串长度,只有源内部数组的总长度小于minimumCapacity,才会调用newCapacity()方法获取新内部数组的长度,然后调用copyOf()方法将源数组的元素复制到新内部数组。
分析可以得出原因:
字符串的内部数组,默认的长度是16,如果循环拼接最终的字符串长度小于16,则这个异常不会发生。
因为内部数组每次扩容,都是原数组长度x2+2,所以拼接的字符串长度越长,循环前几次,遇到长度不够报异常的可能性越大,触发异常所需要的的循环次数越少。
3.2 并行流parallelStream()使用问题
parallelStream提供了流的并行处理,它是Stream的另一重要特性,其底层使用Fork/Join框架实现。
Fork/Join 框架的核心是采用分治法的思想,将一个大任务拆分为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。
同时,为了最大限度地提高并行处理能力,采用了工作窃取算法来运行任务,也就是说当某个线程处理完自己工作队列中的任务后,尝试当其他线程的工作队列中窃取一个任务来执行,直到所有任务处理完毕。所以为了减少线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
Fork/Join 的运行流程图
用下面的示例演示一下parallelStream的使用。
public static void main(String[] args) {
List lists = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
lists.add(i);
}
lists.parallelStream().forEach(System.out::println);
lists.parallelStream().map(p->++p).forEach(System.out::println);
}
输出:
6 5 8 9 7 1 0 2 4 3
7 9 3 10 2 6 1 8 5 4
我们发现,使用parallelStream后,结果并不按照集合原有顺序输出。为了进一步证明该操作是并行的,我们打印出线程信息。
public static void main(String[] args) {
List lists = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
lists.add(i);
}
lists.parallelStream().forEach(num -> System.out.println(num + "--" +
Thread.currentThread().getName()));
}
输出:
6--main
8--ForkJoinPool.commonPool-worker-2
1--ForkJoinPool.commonPool-worker-3
2--ForkJoinPool.commonPool-worker-1
9--ForkJoinPool.commonPool-worker-2
5--main
3--ForkJoinPool.commonPool-worker-2
4--ForkJoinPool.commonPool-worker-1
7--ForkJoinPool.commonPool-worker-4
0--ForkJoinPool.commonPool-worker-3
如上,可以确信parallelStream是利用多线程进行的,这可以很大程度简化我们在需要的时候,进行并行操作。例如第一个例子中,对所有集合元素进行自增操作。尤其是当数据量非常庞大的时候,并行流的数据处理将具有无与伦比的优势。
但同时,并行流也是一把双刃剑,使用不当,也会引发不好后果,比如我这次碰到的线上bug,还有就是bug代码中,使用并行流进行字符串拼接,我认为也是一种非常不好的用法,因为字符串拼接是,我们往往是追求有序地拼接,这样文本语意才会符合我们的预期,但是使用并行流很明显不能满足这一点。
由于并行流使用多线程,则一切线程安全问题都应该是需要考虑的问题,如:资源竞争、死锁、事务、可见性等等。
4. 总结
4.1 bug修复方法
使用串行流stream();
使用线程安全的StringBuffer。
结合前面讨论与思考,适合第一种方式,毕竟数据量也不大。
4.2 并行流 or 串行流
parallelStream是一把同时有巨大隐患和好处的双刃剑,那么,使用如何选择,我们可以考虑以下几个问题:
是否需要并行?
任务之间是否是独立的?是否会引起任何竞态条件?
结果是否取决于任务的调用顺序?
对于问题1,在回答这个问题之前,你需要弄清楚你要解决的问题是什么,数据量有多大,计算的特点是什么?并不是所有的问题都适合使用并发程序来求解,比如当数据量不大时,顺序执行往往比并行执行更快。毕竟,准备线程池和其它相关资源也是需要时间的。但是,当任务涉及到I/O操作并且任务之间不互相依赖时,那么并行化就是一个不错的选择。通常而言,将这类程序并行化之后,执行速度会提升好几个等级。
对于问题2,如果任务之间是独立的,并且代码中不涉及到对同一个对象的某个状态或者某个变量的更新操作,那么就表明代码是可以被并行化的。
对于问题3,由于在并行环境中任务的执行顺序是不确定的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果。