
[Spark 105] Spark SQL Dynamic Code Generation (1)



2015-09-02 14:46:27,681-[TS] DEBUG Executor task launch worker-0 org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection - code for input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]:
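This DEBUG message comes from the code-generation logger, so it only shows up when that logger is enabled. A minimal way to switch it on (a sketch using the log4j 1.x API bundled with Spark 1.5; adjust to your own logging setup):

import org.apache.log4j.{Level, Logger}

// Raise the codegen package's logger to DEBUG so that GenerateUnsafeProjection
// prints the generated Java source, as in the log line above.
Logger.getLogger("org.apache.spark.sql.catalyst.expressions.codegen").setLevel(Level.DEBUG)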


How is the following information in the log produced? Are these the columns and their types?

input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]
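These do describe the input columns and their types: the projection is generated for a list of bound input expressions, one per column, and each of them prints itself as input[ordinal, dataType]. A rough sketch of how that header string is assembled (the renderInputs helper is hypothetical; in Spark the text comes from the toString of the bound column expressions):

import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical helper: one "input[ordinal, dataType]" entry per bound column.
def renderInputs(fieldTypes: Seq[DataType]): String =
  fieldTypes.zipWithIndex
    .map { case (dt, ordinal) => s"input[$ordinal, $dt]" }
    .mkString(",")

// Five string columns reproduce the text seen in the log:
// input[0, StringType],input[1, StringType],...,input[4, StringType]
renderInputs(Seq.fill(5)(StringType))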


Code:


1. The UnsafeProjection abstract class

/**
 * A projection that returns UnsafeRow.
 */
abstract class UnsafeProjection extends Projection {
  // Converts an InternalRow into an UnsafeRow
  override def apply(row: InternalRow): UnsafeRow
}
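As a rough usage sketch, a caller obtains a concrete (generated) UnsafeProjection through UnsafeProjection.create and then applies it to an InternalRow; the column names and values below are made up for illustration:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// A schema with five string columns, matching the log output above.
val schema = StructType((0 until 5).map(i => StructField(s"c$i", StringType)))

// create() generates, compiles and instantiates a SpecificUnsafeProjection
// similar to the one listed further below.
val projection = UnsafeProjection.create(schema)

// Applying it to a generic InternalRow yields an UnsafeRow backed by a byte array.
val row = InternalRow.fromSeq((0 until 5).map(i => UTF8String.fromString(s"value$i")))
val unsafeRow = projection.apply(row)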

2. The pointTo method of UnsafeRow
  /**
   * Update this UnsafeRow to point to different backing data.
   *
   * @param baseObject the base object
   * @param baseOffset the offset within the base object
   * @param numFields the number of fields in this row
   * @param sizeInBytes the size of this row's backing data, in bytes
   */
  public void pointTo(Object baseObject, long baseOffset, int numFields, int sizeInBytes) {
    assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
    this.baseObject = baseObject;
    this.baseOffset = baseOffset;
    this.numFields = numFields;
    this.sizeInBytes = sizeInBytes;
  }
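The 48-byte buffer that the generated constructor below allocates follows directly from the UnsafeRow layout: a null-tracking bit set rounded up to whole 8-byte words, followed by one 8-byte slot per field (for a string column that slot later holds the offset and length of its variable-length bytes). A back-of-the-envelope sketch for the 5-column row in this example:

// Fixed-length part of an UnsafeRow with numFields = 5
// (mirrors calculateBitSetWidthInBytes: one bit per field, rounded up to 8-byte words).
val numFields = 5
val bitSetWidthInBytes = ((numFields + 63) / 64) * 8       // = 8
val fixedLengthBytes = bitSetWidthInBytes + numFields * 8  // = 8 + 40 = 48

// 48 is exactly the initial size of buffer11 in the generated code below;
// the variable-length string bytes are appended after this fixed region,
// starting at cursor12 = 48.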

3. The dynamically generated SpecificUnsafeProjection

package org.apache.spark.sql.test;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;

// There is no concrete subclass of UnsafeProjection in the Spark codebase; its subclasses are generated dynamically.
// UnsafeProjection is an abstract class, so the generated subclass has to implement the apply method.
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {

  // The expressions this projection evaluates
  private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;

  // The result returned by apply()
  private UnsafeRow convertedStruct10;
  private byte[] buffer11;
  private int cursor12;

  public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
    this.expressions = expressions;
    this.convertedStruct10 = new UnsafeRow();
    // buffer11 is a byte array of length 48
    this.buffer11 = new byte[48];
    this.cursor12 = 0;
  }

  // Scala.Function1 need this
  // public Object apply(Object row) {
  //   return apply((InternalRow) row);
  // }

  public UnsafeRow apply(InternalRow i) {
    // cursor12 is first set to 48, the same as the length of buffer11
    cursor12 = 48;
    // Re-point convertedStruct10 (an UnsafeRow) at the buffer:
    // the second argument is the baseOffset (Platform.BYTE_ARRAY_OFFSET),
    // the third argument (5) is the number of columns in the row,
    // the fourth argument (cursor12) is this row's sizeInBytes (the size of this row's backing data, in bytes)
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, cursor12);

    /* input[0, StringType] */
    // Is the first column null?
    boolean isNull0 = i.isNullAt(0);
    // If not null, fetch its data as a UTF8String via getUTF8String
    UTF8String primitive1 = isNull0 ? null : (i.getUTF8String(0));
    // Add the byte size of the first column (0 if null) to cursor12
    int numBytes14 = cursor12 + (isNull0 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive1));
    // If the buffer is too small for the new data, grow it
    if (buffer11.length < numBytes14) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes14 * 2];
      // Copy buffer11 into tmpBuffer13, then assign tmpBuffer13 back to buffer11; buffer11 is now enlarged
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    // Re-point the row at the (possibly new) buffer with the updated size
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes14);

    if (isNull0) {
      convertedStruct10.setNullAt(0);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 0, cursor12, primitive1);
    }

    /* input[1, StringType] */
    boolean isNull2 = i.isNullAt(1);
    UTF8String primitive3 = isNull2 ? null : (i.getUTF8String(1));
    int numBytes15 = cursor12 + (isNull2 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive3));
    if (buffer11.length < numBytes15) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes15 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes15);

    if (isNull2) {
      convertedStruct10.setNullAt(1);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 1, cursor12, primitive3);
    }

    /* input[2, StringType] */
    boolean isNull4 = i.isNullAt(2);
    UTF8String primitive5 = isNull4 ? null : (i.getUTF8String(2));
    int numBytes16 = cursor12 + (isNull4 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive5));
    if (buffer11.length < numBytes16) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes16 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes16);

    if (isNull4) {
      convertedStruct10.setNullAt(2);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 2, cursor12, primitive5);
    }

    /* input[3, StringType] */
    boolean isNull6 = i.isNullAt(3);
    UTF8String primitive7 = isNull6 ? null : (i.getUTF8String(3));
    int numBytes17 = cursor12 + (isNull6 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive7));
    if (buffer11.length < numBytes17) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes17 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes17);

    if (isNull6) {
      convertedStruct10.setNullAt(3);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 3, cursor12, primitive7);
    }

    /* input[4, StringType] */
    boolean isNull8 = i.isNullAt(4);
    UTF8String primitive9 = isNull8 ? null : (i.getUTF8String(4));
    int numBytes18 = cursor12 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive9));
    if (buffer11.length < numBytes18) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer13 = new byte[numBytes18 * 2];
      Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
      buffer11 = tmpBuffer13;
    }
    convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes18);

    if (isNull8) {
      convertedStruct10.setNullAt(4);
    } else {
      cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 4, cursor12, primitive9);
    }

    return convertedStruct10;
  }
}
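To make the cursor arithmetic concrete: UTF8StringWriter.getSize appears to round the string's byte length up to a whole 8-byte word, and write copies the bytes at the current cursor while recording their offset and length in the field's fixed 8-byte slot. A rough worked example under that assumption (the sample strings are made up):

// How cursor12 advances while five short strings are written,
// assuming each string's byte length is rounded up to an 8-byte word.
def roundedSize(numBytes: Int): Int = ((numBytes + 7) / 8) * 8

val fixedRegion = 48 // null bit set + five 8-byte field slots
val strings = Seq("a", "bb", "ccc", "dddd", "eeeee")
val totalSize = fixedRegion + strings.map(s => roundedSize(s.getBytes("UTF-8").length)).sum
// Every 1-5 byte string still consumes one full 8-byte word, so totalSize = 48 + 5 * 8 = 88.
// Since 88 > 48, buffer11 (initially 48 bytes) is grown inside apply() on the first non-null string write.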

