init research

This commit is contained in:
2026-02-08 11:20:43 -10:00
commit bdf064f54d
3041 changed files with 1592200 additions and 0 deletions
@@ -0,0 +1,77 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
plugins {
application
kotlin("jvm")
// uses the 'old' Gradle plugin instead of the compiler plugin for now
id("org.jetbrains.kotlinx.dataframe")
// only mandatory if `kotlin.dataframe.add.ksp=false` in gradle.properties
id("com.google.devtools.ksp")
}
repositories {
mavenLocal() // in case of local dataframe development
mavenCentral()
}
dependencies {
// implementation("org.jetbrains.kotlinx:dataframe:X.Y.Z")
implementation(project(":"))
// (kotlin) spark support
implementation(libs.kotlin.spark)
compileOnly(libs.spark)
implementation(libs.log4j.core)
implementation(libs.log4j.api)
}
/**
* Runs the kotlinSpark/typedDataset example with java 11.
*/
val runKotlinSparkTypedDataset by tasks.registering(JavaExec::class) {
classpath = sourceSets["main"].runtimeClasspath
javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
mainClass = "org.jetbrains.kotlinx.dataframe.examples.kotlinSpark.TypedDatasetKt"
}
/**
* Runs the kotlinSpark/untypedDataset example with java 11.
*/
val runKotlinSparkUntypedDataset by tasks.registering(JavaExec::class) {
classpath = sourceSets["main"].runtimeClasspath
javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
mainClass = "org.jetbrains.kotlinx.dataframe.examples.kotlinSpark.UntypedDatasetKt"
}
/**
* Runs the spark/typedDataset example with java 11.
*/
val runSparkTypedDataset by tasks.registering(JavaExec::class) {
classpath = sourceSets["main"].runtimeClasspath
javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
mainClass = "org.jetbrains.kotlinx.dataframe.examples.spark.TypedDatasetKt"
}
/**
* Runs the spark/untypedDataset example with java 11.
*/
val runSparkUntypedDataset by tasks.registering(JavaExec::class) {
classpath = sourceSets["main"].runtimeClasspath
javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
mainClass = "org.jetbrains.kotlinx.dataframe.examples.spark.UntypedDatasetKt"
}
kotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_11
freeCompilerArgs.add("-Xjdk-release=11")
}
}
tasks.withType<JavaCompile> {
sourceCompatibility = JavaVersion.VERSION_11.toString()
targetCompatibility = JavaVersion.VERSION_11.toString()
options.release.set(11)
}
@@ -0,0 +1,8 @@
@file:Suppress("ktlint:standard:no-empty-file")
package org.jetbrains.kotlinx.dataframe.examples.kotlinSpark
/*
* See ../spark/compatibilityLayer.kt for the implementation.
* It's the same with- and without the Kotlin Spark API.
*/
@@ -0,0 +1,78 @@
@file:Suppress("ktlint:standard:function-signature")
package org.jetbrains.kotlinx.dataframe.examples.kotlinSpark
import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.aggregate
import org.jetbrains.kotlinx.dataframe.api.groupBy
import org.jetbrains.kotlinx.dataframe.api.max
import org.jetbrains.kotlinx.dataframe.api.mean
import org.jetbrains.kotlinx.dataframe.api.min
import org.jetbrains.kotlinx.dataframe.api.print
import org.jetbrains.kotlinx.dataframe.api.schema
import org.jetbrains.kotlinx.dataframe.api.std
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.api.toList
import org.jetbrains.kotlinx.spark.api.withSpark
/**
* With the Kotlin Spark API, normal Kotlin data classes are supported,
* meaning we can reuse the same class for Spark and DataFrame!
*
* Also, since we use an actual class to define the schema, we need no type conversion!
*
* See [Person] and [Name] for an example.
*
* NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
* Use the `runKotlinSparkTypedDataset` Gradle task to do so.
*/
fun main() = withSpark {
// Creating a Spark Dataset. Usually, this is loaded from some server or database.
val rawDataset: Dataset<Person> = listOf(
Person(Name("Alice", "Cooper"), 15, "London", 54, true),
Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
Person(Name("Alice", "Wolf"), 20, null, 55, false),
Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDS()
// we can perform large operations in Spark.
// DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
val dataset = rawDataset.filter { it.age > 17 }
// and convert it to DataFrame via a typed List
val dataframe = dataset.collectAsList().toDataFrame()
dataframe.schema().print()
dataframe.print(columnTypes = true, borders = true)
// now we can use DataFrame-specific functions
val ageStats = dataframe
.groupBy { city }.aggregate {
mean { age } into "meanAge"
std { age } into "stdAge"
min { age } into "minAge"
max { age } into "maxAge"
}
ageStats.print(columnTypes = true, borders = true)
// and when we want to convert a DataFrame back to Spark, we can do the same trick via a typed List
val sparkDatasetAgain = dataframe.toList().toDS()
sparkDatasetAgain.printSchema()
sparkDatasetAgain.show()
}
@DataSchema
data class Name(val firstName: String, val lastName: String)
@DataSchema
data class Person(
val name: Name,
val age: Int,
val city: String?,
val weight: Int?,
val isHappy: Boolean,
)
@@ -0,0 +1,74 @@
@file:Suppress("ktlint:standard:function-signature")
package org.jetbrains.kotlinx.dataframe.examples.kotlinSpark
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.jetbrains.kotlinx.dataframe.api.aggregate
import org.jetbrains.kotlinx.dataframe.api.groupBy
import org.jetbrains.kotlinx.dataframe.api.max
import org.jetbrains.kotlinx.dataframe.api.mean
import org.jetbrains.kotlinx.dataframe.api.min
import org.jetbrains.kotlinx.dataframe.api.print
import org.jetbrains.kotlinx.dataframe.api.schema
import org.jetbrains.kotlinx.dataframe.api.std
import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrame
import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrameByInference
import org.jetbrains.kotlinx.dataframe.examples.spark.convertToSpark
import org.jetbrains.kotlinx.spark.api.col
import org.jetbrains.kotlinx.spark.api.gt
import org.jetbrains.kotlinx.spark.api.withSpark
/**
* Since we don't know the schema at compile time this time, we need to do
* some schema mapping in between Spark and DataFrame.
*
* We will use spark/compatibilityLayer.kt to do this.
* Take a look at that file for the implementation details!
*
* NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
* Use the `runKotlinSparkUntypedDataset` Gradle task to do so.
*/
fun main() = withSpark {
// Creating a Spark Dataframe (untyped Dataset). Usually, this is loaded from some server or database.
val rawDataset: Dataset<Row> = listOf(
Person(Name("Alice", "Cooper"), 15, "London", 54, true),
Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
Person(Name("Alice", "Wolf"), 20, null, 55, false),
Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDF()
// we can perform large operations in Spark.
// DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
val dataset = rawDataset.filter(col("age") gt 17)
// Using inference
val df1 = dataset.convertToDataFrameByInference()
df1.schema().print()
df1.print(columnTypes = true, borders = true)
// Using full schema mapping
val df2 = dataset.convertToDataFrame()
df2.schema().print()
df2.print(columnTypes = true, borders = true)
// now we can use DataFrame-specific functions
val ageStats = df1
.groupBy("city").aggregate {
mean("age") into "meanAge"
std("age") into "stdAge"
min("age") into "minAge"
max("age") into "maxAge"
}
ageStats.print(columnTypes = true, borders = true)
// and when we want to convert a DataFrame back to Spark, we will use the `convertToSpark()` extension function
// This performs the necessary schema mapping under the hood.
val sparkDataset = df2.convertToSpark(spark, sc)
sparkDataset.printSchema()
sparkDataset.show()
}
@@ -0,0 +1,330 @@
package org.jetbrains.kotlinx.dataframe.examples.spark
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.Decimal
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.types.MapType
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.CalendarInterval
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.DataColumn
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.DataRow
import org.jetbrains.kotlinx.dataframe.api.rows
import org.jetbrains.kotlinx.dataframe.api.schema
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
import org.jetbrains.kotlinx.dataframe.columns.TypeSuggestion
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
import org.jetbrains.kotlinx.dataframe.schema.DataFrameSchema
import java.math.BigDecimal
import java.math.BigInteger
import java.sql.Date
import java.sql.Timestamp
import java.time.Instant
import java.time.LocalDate
import kotlin.reflect.KType
import kotlin.reflect.KTypeProjection
import kotlin.reflect.full.createType
import kotlin.reflect.full.isSubtypeOf
import kotlin.reflect.full.withNullability
import kotlin.reflect.typeOf
// region Spark to DataFrame
/**
* Converts an untyped Spark [Dataset] (Dataframe) to a Kotlin [DataFrame].
* [StructTypes][StructType] are converted to [ColumnGroups][ColumnGroup].
*
* DataFrame supports type inference to do the conversion automatically.
* This is usually fine for smaller data sets, but when working with larger datasets, a type map might be a good idea.
* See [convertToDataFrame] for more information.
*/
fun Dataset<Row>.convertToDataFrameByInference(
schema: StructType = schema(),
prefix: List<String> = emptyList(),
): AnyFrame {
val columns = schema.fields().map { field ->
val name = field.name()
when (val dataType = field.dataType()) {
is StructType ->
// a column group can be easily created from a dataframe and a name
DataColumn.createColumnGroup(
name = name,
df = this.convertToDataFrameByInference(dataType, prefix + name),
)
else ->
// we can use DataFrame type inference to create a column with the correct type
// from Spark we use `select()` to select a single column
// and `collectAsList()` to get all the values in a list of single-celled rows
DataColumn.createByInference(
name = name,
values = this.select((prefix + name).joinToString("."))
.collectAsList()
.map { it[0] },
suggestedType = TypeSuggestion.Infer,
// Spark provides nullability :) you can leave this out if you want this to be inferred too
nullable = field.nullable(),
)
}
}
return columns.toDataFrame()
}
/**
* Converts an untyped Spark [Dataset] (Dataframe) to a Kotlin [DataFrame].
* [StructTypes][StructType] are converted to [ColumnGroups][ColumnGroup].
*
* This version uses a [type-map][DataType.convertToDataFrame] to convert the schemas with a fallback to inference.
* For smaller data sets, inference is usually fine too.
* See [convertToDataFrameByInference] for more information.
*/
fun Dataset<Row>.convertToDataFrame(schema: StructType = schema(), prefix: List<String> = emptyList()): AnyFrame {
val columns = schema.fields().map { field ->
val name = field.name()
when (val dataType = field.dataType()) {
is StructType ->
// a column group can be easily created from a dataframe and a name
DataColumn.createColumnGroup(
name = name,
df = convertToDataFrame(dataType, prefix + name),
)
else ->
// we create a column with the correct type using our type-map with fallback to inference
// from Spark we use `select()` to select a single column
// and `collectAsList()` to get all the values in a list of single-celled rows
DataColumn.createByInference(
name = name,
values = select((prefix + name).joinToString("."))
.collectAsList()
.map { it[0] },
suggestedType =
dataType.convertToDataFrame()
?.let(TypeSuggestion::Use)
?: TypeSuggestion.Infer, // fallback to inference if needed
nullable = field.nullable(),
)
}
}
return columns.toDataFrame()
}
/**
* Returns the corresponding [Kotlin type][KType] for a given Spark [DataType].
*
* This list may be incomplete, but it can at least give you a good start.
*
* @return The [KType] that corresponds to the Spark [DataType], or null if no matching [KType] is found.
*/
fun DataType.convertToDataFrame(): KType? =
when {
this == DataTypes.ByteType -> typeOf<Byte>()
this == DataTypes.ShortType -> typeOf<Short>()
this == DataTypes.IntegerType -> typeOf<Int>()
this == DataTypes.LongType -> typeOf<Long>()
this == DataTypes.BooleanType -> typeOf<Boolean>()
this == DataTypes.FloatType -> typeOf<Float>()
this == DataTypes.DoubleType -> typeOf<Double>()
this == DataTypes.StringType -> typeOf<String>()
this == DataTypes.DateType -> typeOf<Date>()
this == DataTypes.TimestampType -> typeOf<Timestamp>()
this is DecimalType -> typeOf<Decimal>()
this == DataTypes.CalendarIntervalType -> typeOf<CalendarInterval>()
this == DataTypes.NullType -> nullableNothingType
this == DataTypes.BinaryType -> typeOf<ByteArray>()
this is ArrayType -> {
when (elementType()) {
DataTypes.ShortType -> typeOf<ShortArray>()
DataTypes.IntegerType -> typeOf<IntArray>()
DataTypes.LongType -> typeOf<LongArray>()
DataTypes.FloatType -> typeOf<FloatArray>()
DataTypes.DoubleType -> typeOf<DoubleArray>()
DataTypes.BooleanType -> typeOf<BooleanArray>()
else -> null
}
}
this is MapType -> {
val key = keyType().convertToDataFrame() ?: return null
val value = valueType().convertToDataFrame() ?: return null
Map::class.createType(
listOf(
KTypeProjection.invariant(key),
KTypeProjection.invariant(value.withNullability(valueContainsNull())),
),
)
}
else -> null
}
// endregion
// region DataFrame to Spark
/**
* Converts the [DataFrame] to a Spark [Dataset] of [Rows][Row] using the provided [SparkSession] and [JavaSparkContext].
*
* Spark needs both the data and the schema to be converted to create a correct [Dataset],
* so we need to map our types somehow.
*
* @param spark The [SparkSession] object to use for creating the [DataFrame].
* @param sc The [JavaSparkContext] object to use for converting the [DataFrame] to [RDD][JavaRDD].
* @return A [Dataset] of [Rows][Row] representing the converted DataFrame.
*/
fun DataFrame<*>.convertToSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
// Convert each row to spark rows
val rows = sc.parallelize(this.rows().map { it.convertToSpark() })
// convert the data schema to a spark StructType
val schema = this.schema().convertToSpark()
return spark.createDataFrame(rows, schema)
}
/**
* Converts a [DataRow] to a Spark [Row] object.
*
* @return The converted Spark [Row].
*/
fun DataRow<*>.convertToSpark(): Row =
RowFactory.create(
*values().map {
when (it) {
// a row can be nested inside another row if it's a column group
is DataRow<*> -> it.convertToSpark()
is DataFrame<*> -> error("nested dataframes are not supported")
else -> it
}
}.toTypedArray(),
)
/**
* Converts a [DataFrameSchema] to a Spark [StructType].
*
* @return The converted Spark [StructType].
*/
fun DataFrameSchema.convertToSpark(): StructType =
DataTypes.createStructType(
this.columns.map { (name, schema) ->
DataTypes.createStructField(name, schema.convertToSpark(), schema.nullable)
},
)
/**
* Converts a [ColumnSchema] object to Spark [DataType].
*
* @return The Spark [DataType] corresponding to the given [ColumnSchema] object.
* @throws IllegalArgumentException if the column type or kind is unknown.
*/
fun ColumnSchema.convertToSpark(): DataType =
when (this) {
is ColumnSchema.Value -> type.convertToSpark() ?: error("unknown data type: $type")
is ColumnSchema.Group -> schema.convertToSpark()
is ColumnSchema.Frame -> error("nested dataframes are not supported")
else -> error("unknown column kind: $this")
}
/**
* Returns the corresponding Spark [DataType] for a given [Kotlin type][KType].
*
* This list may be incomplete, but it can at least give you a good start.
*
* @return The Spark [DataType] that corresponds to the [Kotlin type][KType], or null if no matching [DataType] is found.
*/
fun KType.convertToSpark(): DataType? =
when {
isSubtypeOf(typeOf<Byte?>()) -> DataTypes.ByteType
isSubtypeOf(typeOf<Short?>()) -> DataTypes.ShortType
isSubtypeOf(typeOf<Int?>()) -> DataTypes.IntegerType
isSubtypeOf(typeOf<Long?>()) -> DataTypes.LongType
isSubtypeOf(typeOf<Boolean?>()) -> DataTypes.BooleanType
isSubtypeOf(typeOf<Float?>()) -> DataTypes.FloatType
isSubtypeOf(typeOf<Double?>()) -> DataTypes.DoubleType
isSubtypeOf(typeOf<String?>()) -> DataTypes.StringType
isSubtypeOf(typeOf<LocalDate?>()) -> DataTypes.DateType
isSubtypeOf(typeOf<Date?>()) -> DataTypes.DateType
isSubtypeOf(typeOf<Timestamp?>()) -> DataTypes.TimestampType
isSubtypeOf(typeOf<Instant?>()) -> DataTypes.TimestampType
isSubtypeOf(typeOf<Decimal?>()) -> DecimalType.SYSTEM_DEFAULT()
isSubtypeOf(typeOf<BigDecimal?>()) -> DecimalType.SYSTEM_DEFAULT()
isSubtypeOf(typeOf<BigInteger?>()) -> DecimalType.SYSTEM_DEFAULT()
isSubtypeOf(typeOf<CalendarInterval?>()) -> DataTypes.CalendarIntervalType
isSubtypeOf(nullableNothingType) -> DataTypes.NullType
isSubtypeOf(typeOf<ByteArray?>()) -> DataTypes.BinaryType
isSubtypeOf(typeOf<ShortArray?>()) -> DataTypes.createArrayType(DataTypes.ShortType, false)
isSubtypeOf(typeOf<IntArray?>()) -> DataTypes.createArrayType(DataTypes.IntegerType, false)
isSubtypeOf(typeOf<LongArray?>()) -> DataTypes.createArrayType(DataTypes.LongType, false)
isSubtypeOf(typeOf<FloatArray?>()) -> DataTypes.createArrayType(DataTypes.FloatType, false)
isSubtypeOf(typeOf<DoubleArray?>()) -> DataTypes.createArrayType(DataTypes.DoubleType, false)
isSubtypeOf(typeOf<BooleanArray?>()) -> DataTypes.createArrayType(DataTypes.BooleanType, false)
isSubtypeOf(typeOf<Array<*>>()) ->
error("non-primitive arrays are not supported for now, you can add it yourself")
isSubtypeOf(typeOf<List<*>>()) -> error("lists are not supported for now, you can add it yourself")
isSubtypeOf(typeOf<Set<*>>()) -> error("sets are not supported for now, you can add it yourself")
classifier == Map::class -> {
val (key, value) = arguments
DataTypes.createMapType(
key.type?.convertToSpark(),
value.type?.convertToSpark(),
value.type?.isMarkedNullable ?: true,
)
}
else -> null
}
private val nullableNothingType: KType = typeOf<List<Nothing?>>().arguments.first().type!!
// endregion
@@ -0,0 +1,105 @@
@file:Suppress("ktlint:standard:function-signature")
package org.jetbrains.kotlinx.dataframe.examples.spark
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.aggregate
import org.jetbrains.kotlinx.dataframe.api.groupBy
import org.jetbrains.kotlinx.dataframe.api.max
import org.jetbrains.kotlinx.dataframe.api.mean
import org.jetbrains.kotlinx.dataframe.api.min
import org.jetbrains.kotlinx.dataframe.api.print
import org.jetbrains.kotlinx.dataframe.api.schema
import org.jetbrains.kotlinx.dataframe.api.std
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.api.toList
import java.io.Serializable
/**
* For Spark, Kotlin data classes are supported if we:
* - Add [@JvmOverloads][JvmOverloads] to the constructor
* - Make all parameter arguments mutable and with defaults
* - Make them [Serializable]
*
* But by adding [@DataSchema][DataSchema] we can reuse the same class for Spark and DataFrame!
*
* See [Person] and [Name] for an example.
*
* Also, since we use an actual class to define the schema, we need no type conversion!
*
* NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
* Use the `runSparkTypedDataset` Gradle task to do so.
*/
fun main() {
val spark = SparkSession.builder()
.master(SparkConf().get("spark.master", "local[*]"))
.appName("Kotlin Spark Sample")
.getOrCreate()
val sc = JavaSparkContext(spark.sparkContext())
// Creating a Spark Dataset. Usually, this is loaded from some server or database.
val rawDataset: Dataset<Person> = spark.createDataset(
listOf(
Person(Name("Alice", "Cooper"), 15, "London", 54, true),
Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
Person(Name("Alice", "Wolf"), 20, null, 55, false),
Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
),
beanEncoderOf(),
)
// we can perform large operations in Spark.
// DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
val dataset = rawDataset.filter { it.age > 17 }
// and convert it to DataFrame via a typed List
val dataframe = dataset.collectAsList().toDataFrame()
dataframe.schema().print()
dataframe.print(columnTypes = true, borders = true)
// now we can use DataFrame-specific functions
val ageStats = dataframe
.groupBy { city }.aggregate {
mean { age } into "meanAge"
std { age } into "stdAge"
min { age } into "minAge"
max { age } into "maxAge"
}
ageStats.print(columnTypes = true, borders = true)
// and when we want to convert a DataFrame back to Spark, we can do the same trick via a typed List
val sparkDatasetAgain = spark.createDataset(dataframe.toList(), beanEncoderOf())
sparkDatasetAgain.printSchema()
sparkDatasetAgain.show()
spark.stop()
}
/** Creates a [bean encoder][Encoders.bean] for the given [T] instance. */
inline fun <reified T : Serializable> beanEncoderOf(): Encoder<T> = Encoders.bean(T::class.java)
@DataSchema
data class Name
@JvmOverloads
constructor(var firstName: String = "", var lastName: String = "") : Serializable
@DataSchema
data class Person
@JvmOverloads
constructor(
var name: Name = Name(),
var age: Int = -1,
var city: String? = null,
var weight: Int? = null,
var isHappy: Boolean = false,
) : Serializable
@@ -0,0 +1,87 @@
@file:Suppress("ktlint:standard:function-signature")
package org.jetbrains.kotlinx.dataframe.examples.spark
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.dataframe.api.aggregate
import org.jetbrains.kotlinx.dataframe.api.groupBy
import org.jetbrains.kotlinx.dataframe.api.max
import org.jetbrains.kotlinx.dataframe.api.mean
import org.jetbrains.kotlinx.dataframe.api.min
import org.jetbrains.kotlinx.dataframe.api.print
import org.jetbrains.kotlinx.dataframe.api.schema
import org.jetbrains.kotlinx.dataframe.api.std
import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrame
import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrameByInference
import org.jetbrains.kotlinx.dataframe.examples.spark.convertToSpark
import org.jetbrains.kotlinx.spark.api.col
import org.jetbrains.kotlinx.spark.api.gt
/**
* Since we don't know the schema at compile time this time, we need to do
* some schema mapping in between Spark and DataFrame.
*
* We will use spark/compatibilityLayer.kt to do this.
* Take a look at that file for the implementation details!
*
* NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
* Use the `runSparkUntypedDataset` Gradle task to do so.
*/
fun main() {
val spark = SparkSession.builder()
.master(SparkConf().get("spark.master", "local[*]"))
.appName("Kotlin Spark Sample")
.getOrCreate()
val sc = JavaSparkContext(spark.sparkContext())
// Creating a Spark Dataframe (untyped Dataset). Usually, this is loaded from some server or database.
val rawDataset: Dataset<Row> = spark.createDataset(
listOf(
Person(Name("Alice", "Cooper"), 15, "London", 54, true),
Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
Person(Name("Alice", "Wolf"), 20, null, 55, false),
Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
),
beanEncoderOf<Person>(),
).toDF()
// we can perform large operations in Spark.
// DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
val dataset = rawDataset.filter(col("age") gt 17)
// Using inference
val df1 = dataset.convertToDataFrameByInference()
df1.schema().print()
df1.print(columnTypes = true, borders = true)
// Using full schema mapping
val df2 = dataset.convertToDataFrame()
df2.schema().print()
df2.print(columnTypes = true, borders = true)
// now we can use DataFrame-specific functions
val ageStats = df1
.groupBy("city").aggregate {
mean("age") into "meanAge"
std("age") into "stdAge"
min("age") into "minAge"
max("age") into "maxAge"
}
ageStats.print(columnTypes = true, borders = true)
// and when we want to convert a DataFrame back to Spark, we will use the `convertToSpark()` extension function
// This performs the necessary schema mapping under the hood.
val sparkDataset = df2.convertToSpark(spark, sc)
sparkDataset.printSchema()
sparkDataset.show()
spark.stop()
}