카테고리 없음

Spark scala Java UDF 등록하기

케키키케 2021. 5. 30. 23:38

참고 자료

아래 링크에 동영상도 나와있습니다.

https://www.learningjournal.guru/courses/spark/spark-foundation-training/create-spark-udf-in-scala-and-python/

 

 

 

1. 아래 코드를 빌드하여 jar파일을 생성합니다.

하나의 argument를 사용하기 때문에 UDF1을 implments합니다.

다른 유형의 UDF는 아래 링크에서 확인할 수 있습니다.

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/api/java/package-summary.html

 

org.apache.spark.sql.api.java (Spark 1.4.0 JavaDoc)

Interface Summary  Interface Description UDF1 A Spark SQL UDF that has 1 arguments. UDF10 A Spark SQL UDF that has 10 arguments. UDF11 A Spark SQL UDF that has 11 arguments. UDF12 A Spark SQL UDF that has 12 arguments. UDF13 A Spark SQL UDF that has 13 ar

spark.apache.org

샘플 Java UDF

import org.apache.spark.sql.api.java.UDF1; // UDF interface to implement

public class testUdf implements UDF1<Integer, Integer> { // Name of the Java UDF
private static final int serialVersionUID = 1;
@Override
public Integer call(Integer num) throws Exception { // Define logic of UDF
return (num + 5);
}
}

 

2. 생성한 jar파일을 테스트 가능한 환경으로 이동합니다.

저는 윈도우 환경에서 gradlew build를 통해 jar파일을 생성하였고, 이를 리눅스 환경으로 옮겼습니다.

윈도우->리눅스 환경으로 파일을 이동할 때, scp를 이용하여 옮깁니다.

 

3. Spark을 실행합니다.

spark 버전 2.4.7에서 테스트하였습니다.

SPARK_HOME/bin/spark-shell --master local --jar 생성한jar파일.jar

 

4. 다음과 같이 실행해봅니다.~ 따라오세요~

import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import testUdf._ //jar파일을 생성한 테스트 코드를 import합니다.

scala> val myudf = new testUdf().call _ //테스트 코드에 있는 call함수를 가져옵니다.

scala> spark.udf.register("myUdf",myudf) //테스크 코드의 myudf 함수를 myUdf로 등록합니다.
res4: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))

//테스트를 위한 데이터를 생성합니다.
scala> val data = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("name","age")
//data: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> data.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

scala> data.createTempView("testData")

scala> spark.sql("select myUdf(age) as plus_age, age from testData").show()
+--------+---+
|plus_age|age|
+--------+---+
|       6|  1|
|       7|  2|
|       8|  3|
+--------+---+