Zeppelin: How to create a DataFrame from within a custom interpreter?


I am developing a custom interpreter for a domain-specific language. Based on the example given in the Apache Zeppelin documentation (https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html), the interpreter works well. Now I want to store some results in a new DataFrame.

I have found code for creating DataFrames (http://spark.apache.org/docs/latest/sql-programming-guide.html), but I cannot use it in my interpreter, because I basically see no way to access a valid runtime SparkContext (often called "sc") from within my custom interpreter.

I tried the (static) SparkContext.getOrCreate(), but that even led to a ClassNotFoundException. Then I added the whole zeppelin-spark-dependencies ... jar to my interpreter folder, which solved the class-loading problem, but now I get a SparkException ("A master URL must be set ...").
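For reference, SparkContext.getOrCreate() without arguments only succeeds if a context already exists in the current JVM or a master URL has been configured. A minimal sketch (class name made up) of how a master URL would normally be supplied; this avoids the exception, but it creates a new, separate context instead of reusing the notebook's, which is not what I want:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class MasterUrlSketch {
    public static void main(String[] args) {
        // Supply the "master URL" that the SparkException asks for.
        SparkConf conf = new SparkConf()
            .setMaster("local[4]")
            .setAppName("Opl Interpreter");
        SparkContext sc = SparkContext.getOrCreate(conf);
        System.out.println("master = " + sc.master());
        sc.stop();
    }
}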

Any idea how I can access my notebook's SparkContext from within a custom interpreter? Thanks a lot!

Update

Thanks to Kangrok Lee's comment below, my code now looks as shown below. It runs and seems to create a DataFrame (at least it no longer throws any exceptions). But I cannot use the created DataFrame in a subsequent SQL paragraph (the first paragraph uses my "%opl" interpreter, as can be seen below, which is supposed to create the "result" DataFrame):

%opl
1 2 3
> 1
> 2
> 3

%sql
select * from result
> Table not found: result; line 1 pos 14

So there is probably still something wrong with the way I am handling the SparkContext. Any ideas? Thanks a lot!

package opl;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OplInterpreter2 extends Interpreter {

    static {
        // Register the interpreter along with its default Spark settings.
        Interpreter.register("opl", "opl", OplInterpreter2.class.getName(),
            new InterpreterPropertyBuilder()
                .add("spark.master", "local[4]", "spark.master")
                .add("spark.app.name", "Opl Interpreter", "spark.app.name")
                .add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer")
                .build());
    }

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

    private void log(Object o) {
        if (logger != null)
            logger.warn("OplInterpreter2 " + o);
    }

    public OplInterpreter2(Properties properties) {
        super(properties);
        log("CONSTRUCTOR");
    }

    @Override
    public void open() {
        log("open()");
    }

    @Override
    public void cancel(InterpreterContext arg0) {
        log("cancel()");
    }

    @Override
    public void close() {
        log("close()");
    }

    @Override
    public List<String> completion(String arg0, int arg1) {
        log("completion()");
        return new ArrayList<String>();
    }

    @Override
    public FormType getFormType() {
        log("getFormType()");
        return FormType.SIMPLE;
    }

    @Override
    public int getProgress(InterpreterContext arg0) {
        log("getProgress()");
        return 100;
    }

    @Override
    public InterpreterResult interpret(String string, InterpreterContext context) {
        log("interpret() " + string);
        // Redirect System.out so that everything the paragraph prints
        // becomes the interpreter result.
        PrintStream oldSys = System.out;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            System.setOut(ps);
            execute(string);
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                    InterpreterResult.Code.SUCCESS,
                    InterpreterResult.Type.TEXT,
                    baos.toString());
        } catch (Exception ex) {
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                    InterpreterResult.Code.ERROR,
                    InterpreterResult.Type.TEXT,
                    ex.toString());
        }
    }

    private void execute(String code) throws Exception {
        // Note: this creates (or reuses) a SparkContext inside this
        // interpreter's own process, not the notebook's SparkContext.
        SparkContext sc = SparkContext.getOrCreate();
        SQLContext sqlc = SQLContext.getOrCreate(sc);
        StructType structType = new StructType().add("value", DataTypes.IntegerType);
        ArrayList<Row> list = new ArrayList<Row>();
        for (String s : code.trim().split("\\s+")) {
            int value = Integer.parseInt(s);
            System.out.println(value);
            list.add(RowFactory.create(value));
        }
        DataFrame df = sqlc.createDataFrame(list, structType);
        df.registerTempTable("result");
    }
}

The solution is as follows:

Finally I found a solution, although I don't think it is a very good one. In the code below, I am using the function getSparkInterpreter() that I found in org.apache.zeppelin.spark.PySparkInterpreter.java. The key point is to reuse the SQLContext of the notebook's Spark interpreter instead of creating a SparkContext of my own, because the temp table has to be registered in the very SQLContext that the %sql interpreter queries.

This requires that I put my packaged code (jar) into the Spark interpreter's folder rather than into an interpreter folder of its own, which, I believe, is not the preferred way (according to https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html). Also, my interpreter does not show up as an interpreter of its own on Zeppelin's interpreter configuration page. But it can nevertheless be used in a Zeppelin paragraph.

And: in the code I can create a DataFrame that is also usable outside of my paragraph, which is what I wanted to achieve.
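For completeness, with the jar deployed into the Spark interpreter folder, the paragraph sequence from the question works as intended:

%opl
1 2 3
> 1
> 2
> 3

%sql
select * from result
> now returns the three rows instead of "Table not found: result"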

package opl;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OplInterpreter2 extends Interpreter {

    static {
        // Register into the existing "spark" interpreter group (instead of a
        // group of its own), so that this interpreter runs in the same session
        // as the SparkInterpreter looked up below.
        Interpreter.register(
                "opl",
                "spark", // "opl",
                OplInterpreter2.class.getName(),
                new InterpreterPropertyBuilder()
                    .add("sth", "defaultSth", "some thing")
                    .build());
    }

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

    private void log(Object o) {
        if (logger != null)
            logger.warn("OplInterpreter2 "+o);
    }

    public OplInterpreter2(Properties properties) {
        super(properties);
        log("CONSTRUCTOR");
    }

    @Override
    public void open() {
        log("open()");
    }

    @Override
    public void cancel(InterpreterContext arg0) {
        log("cancel()");
    }

    @Override
    public void close() {
        log("close()");
    }

    @Override
    public List<String> completion(String arg0, int arg1) {
        log("completion()");
        return new ArrayList<String>();
    }

    @Override
    public FormType getFormType() {
        log("getFormType()");
        return FormType.SIMPLE;
    }

    @Override
    public int getProgress(InterpreterContext arg0) {
        log("getProgress()");
        return 100;
    }

    @Override
    public InterpreterResult interpret(String string, InterpreterContext context) {
        log("interpret() "+string);
        PrintStream oldSys = System.out;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            System.setOut(ps);
            execute(string);
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                    InterpreterResult.Code.SUCCESS,
                    InterpreterResult.Type.TEXT,
                    baos.toString());
        } catch (Exception ex) {
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                    InterpreterResult.Code.ERROR,
                    InterpreterResult.Type.TEXT,
                    ex.toString());
        }
    }

    private void execute(String code) throws Exception {
        // Reuse the SQLContext of the notebook's Spark interpreter, so that the
        // temp table registered below is visible to subsequent %sql paragraphs.
        SparkInterpreter sintp = getSparkInterpreter();
        SQLContext sqlc = sintp.getSQLContext();
        StructType structType = new StructType().add("value", DataTypes.IntegerType);
        ArrayList<Row> list = new ArrayList<Row>();
        for (String s : code.trim().split("\\s+")) {
            int value = Integer.parseInt(s);
            System.out.println(value);
            list.add(RowFactory.create(value));
        }
        DataFrame df = sqlc.createDataFrame(list, structType);
        df.registerTempTable("result");
    }

    private SparkInterpreter getSparkInterpreter() {
        LazyOpenInterpreter lazy = null;
        SparkInterpreter spark = null;
        // Look up the SparkInterpreter that lives in the same session as this interpreter.
        Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
        // Zeppelin hands out wrapped interpreters (e.g. LazyOpenInterpreter),
        // so unwrap until the actual SparkInterpreter is reached.
        while (p instanceof WrappedInterpreter) {
            if (p instanceof LazyOpenInterpreter) {
                lazy = (LazyOpenInterpreter) p;
            }
            p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        spark = (SparkInterpreter) p;
        // Ensure the lazily opened Spark interpreter is actually open,
        // otherwise its SQLContext may not exist yet.
        if (lazy != null) {
            lazy.open();
        }
        return spark;
    }
}
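
A note on how this works, as far as I can tell: Zeppelin hands out interpreters in wrapped form (typically a LazyOpenInterpreter), which is why getSparkInterpreter() unwraps until it reaches the actual SparkInterpreter and explicitly opens the lazy wrapper, since otherwise the SQLContext might not exist yet. And because getInterpreterInTheSameSessionByClassName() only finds interpreters in the same session/interpreter group, the jar has to live in the Spark interpreter's folder, which is the deployment restriction described above.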