Checkpoint 1

In this project, you'll implement an unoptimized SQL runtime. It is worth 10 points

Requirements

Grading Rubric

To be posted shortly

Catalyzer

This project has you working with a hollowed-out version of Apache Spark called Catalyzer. For details on this project: To import Catalyzer and its dependencies into your project, add the following lines to your build.sbt:
resolvers += "MimirDB" at "https://maven.mimirdb.info/"
libraryDependencies += "edu.buffalo.cse.odin" %% "catalyzer" % "3.0"
Note that unlike Spark itself, all code in Catalyzer is fair game for this class, and reviewing it will not count as a violation of Academic integrity.

Expected API

Once your code is running, you should immediately print out $>\n. The runtime will wait for this to happen. You should then read in exactly one line from System.in, parse it, process it as described below, and then print out $>\n. This prompt is how the grading script knows that your code is ready or done with an instruction (e.g., it has printed all outputs). If your code does not print this, the grading script will time out, and you will receive a 0.

Parsing SQL

You may use Spark's SQL parser:
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

...
  def parseSql(sql: String): LogicalPlan = 
    new SparkSqlParser().parsePlan(sql)
...
Note that parsePlan will parse both queries (SELECT) and DDL/DML (e.g., CREATE TABLE).

CREATE TABLE

  case class CreateTableStatement(
    tableName: Seq[String],
    tableSchema: StructType,
    partitioning: Seq[Transform],
    bucketSpec: Option[Object],
    properties: Map[String, String],
    provider: Option[String],
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    serde: Option[SerdeInfo],
    external: Boolean,
    ifNotExists: Boolean) extends ParsedStatement

This is a lot to take in, but for our purposes here, the only three parts of the CreateTableStatement class that we care about are the tableName, tableSchema, and location. The table name may seem a bit weird, as it's a sequence and not a string. This is to manage multi-part table names used to reference tables in a specific schema (e.g., dataSource.tableName). For our purposes, you can always expect this sequence to contain exactly one element.

The second part is the tableSchema. Recall that tables are collections of tuples. Spark refers to tuple types as StructTypes. A StructType's elements are StructFields, which include the field's name, its data type, and some other metadata like whether the field is allowed to be null.

Finally, we have the location. Unlike a CREATE TABLE command in a normal relational database, here we're defining what's called an "external table". We're defining a schema over a remote file so that the database can access its contents. The location field will point at a (relative) path to a data file with the contents of the table. For example:

  CREATE TABLE R(id int, fruit string) USING csv OPTIONS(path 'data/R.csv', delimiter '|')
The corresponding file located at data/R.data might contain:
1|apple
2|banana
3|clementine
4|duran

You'll note that the provider field will be set to Some("csv") and the options field will be Map("delimiter" -> "|"). Feel free to implement these as you see fit, but they will not change for this project. As in the example above, the data file itself will be CSV-like, with one record per line (\n-delimited), and fields in human-readable strings (|-delimited).

SELECT

Any other LogicalPlan returned by parsePlan will be a query. You'll need to evaluate this query and return results in a format identical to the data files above. If you need some help debugging, your output should be identical to (modulo row order) the output produced by Sqlite3 for the same query on the same data.

Although you will not be graded on the specific implementation strategy you pick, the following is one relatively straightforward approach to getting an A that closely mirrors how Spark itself is implemented. First though, let's talk a little about how Spark executes queries. Broadly, Spark queries go through five phases:

  1. Analysis: Placeholders left after parsing are replaced and variables are "wired up"
  2. Optimization: The logical plan is rewritten into a more efficient plan.
  3. Phyisical Planning: The logical plan is transformed into a physical plan, as specific algorithms are chosen. Some optimization happens here as well.
  4. Code Generation: Spark produces native code for its plans.
  5. Execution: The query is distributed to executor nodes (if needed) and run in parallel.

For this class, we'll be taking a slightly simpler approach to the latter three steps, as they are intended for a large-scale distributed system. For checkpoint 1, you can also still get an A without completing the optimization step (we'll come back to that in checkpoint 2). That leaves us with basically three steps:

  1. Analysis
  2. Iterator Construction
  3. Execution
Let's take each of these in turn.

Analysis

Shortly after parsing, Spark applies a two-part process called analysis:

Spark's validation logic is mostly intact, but you'll need to implement the Resolution step yourself. In particular, Spark's SQL Parser leaves behind "placeholder" nodes in both the Expression and LogicalPlan ASTs, whenever the user references something by a string. Normally, the analysis step replaces these placeholders with something that can actually be used. Placeholders that you can expect to encounter are listed below.

Before we discuss placeholders, we need to take a brief digression to explain the exprId field in many Expression AST nodes. For example:

case class AttributeReference(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    override val metadata: Metadata = Metadata.empty)(
    val exprId: ExprId = NamedExpression.newExprId,
    val qualifier: Seq[String] = Seq.empty[String])
  extends Attribute with Unevaluable {
  

In most node types, the exprId field is allocated automatically with a fresh identifier (i.e., using NamedExpression.newExprId) when the node is created. Spark uses exprIds internally to keep track of which expressions line up with which other expressions. Two Attributes are the same if and only if they have the same exprId (whether their names are the same or different does not matter).

Both LogicalPlan and Expression provide a transform (and transformUp and transformDown) method to aid with rewriting ASTs. These methods make it very easy to replace parts of the tree. For example, to compute a new tree with UnresolvedRelation nodes replaced, you might write

  plan.transform {
    case UnresolvedRelation(Seq(tableName)) => /* write your replacement here */
  }
  

The other thing to discuss before we move on is resolution. Expression provides a resolved method that checks to see whether the expression has been fully resolved. The dataType method will not work until resolved returns true. resolved, in particular checks for three things:

  1. All descendents of the node must be resolved
  2. The node itself must not be an Unresolved___
  3. The node's children must have a dataType compatible with the node itself.

The last condition is especially tricky, but you can call e.checkInputDataTypes() on each node of the tree to check for errors (see below).

That all being said, here are unresolved nodes you can expect to encounter:

LogicalPlan

case class UnresolvedRelation(
    nameElements: Seq[String], 
    options: CaseInsensitiveStringMap, 
    isStreaming: Boolean)

This class is used when a relation is referenced in a LogicalPlan in SQL (typically the FROM clause of a SELECT). The nameElements field encodes the '.'-separated elements of the table name (e.g., foo.bar would be encoded as Seq("foo", "bar")). Under typical use, this sequence will always have only one element. Name elements are case-insensitive.

Occurrences of this class will need to be replaced during analysis with an AST node that knows what attributes the corresponding table has. Spark has several built-in LogicalPlan nodes that can be used to encode tables, but you might find it easier to just create your own subclass of LeafNode to represent a table node. A subclass of LeafNode only needs to implement one field:

case class ____(____) 
  extends LeafNode
{
  val output: AttributeSet = ???
}
Note that AttributeSet is a subclass of Seq[Attribute]. In general, the output field should be given as a sequence of AttributeReferences (see above).

Expression

case class UnresolvedStar(target: Option[Seq[String]])

Any asterisk * appearing in a SQL is translated into this class. Generally, this happens in three places:

The first two cases (the only two we care about in this checkpoint) are special, as they both represent multiple fields. You'll need to expand these out during the analysis phase. Note that like UnresolvedRelation, table names are Sequences of .-separated strings.


case class UnresolvedAttribute(nameParts: Seq[String])

An attribute name that hasn't been "wired up" with an exprId. In general, there are two cases that need to be handled during Analysis:

For resolving attributes, keep in mind that each operator (that has been resolved already) knows its output schema (typically computing it from the input schema):

val attributes: AttributeSet = source.output
As above, note that AttributeSet is a subclass of Seq[Attribute] In general, you can expect the contents of this sequence to consist of: Assuming that you've done your analysis job right, you should only see AttributeReferences.

One additional thing that may be helpful in resolving UnresolvedAttributes is that AttributeReference has a qualifiers field Spark uses to store the table name. This field is automatically managed in nested subqueries, but keep in mind that if you're using a custom table class (as suggested above), you will need to set this field yourself when declaring the table's output there.

Iterator Construction

Although Expression provides an eval method, LogicalPlan does not. To evaluate LogicalPlans, you need to compile them. The most straightforward way to implement a Relational Algebra plan is a so called "pull"-based model that you might already be familiar with: Iterators. This is the starting point for Spark, and many other relational database engine's runtimes. Implementing an iterator typically involves two methods:
class MyIterator extends Iterator[MyFoo]
{
    def hasNext: Boolean = /* return true if there are more elements */
    def next: MyFoo = /* assemble and return the MyFoo instance */
}
For example, here's a simple one that iterates over a range of values.
class Range(low: Int, high: Int) extends Iterator[Int]
{
    var current = low
    def hasNext = current < high
    def next: Int = { val ret = current; current += 1; return ret }
} 
In addition to the base table class you created above, you'll be expected to support the following `LogicalPlan` node types:

For each operator, look at the operator in isolation. Imagine that you have an iterator over its input(s). Forget about the full stack of plan nodes and focus on implementing each relational operator in terms of its inputs. Also keep in mind that some operators are just there as decorations (e.g., SubqueryAlias).

Expression Evaluation

It will be helpful (particularly for Project and Filter) to have a way to evaluate primitive-valued expressions. Fortunately, Expression already has a method for this: eval

def eval(input: InternalRow): Any = ???  
For example:
val row = InternalRow.fromSeq( Seq(1, 2, "bob") )
val literal = expression.eval(row)

Specifically eval takes an org.apache.spark.sql.catalyst.InternalRow as input and produces an Any as output (analogous to Java's Object). InternalRow is a very thin wrapper around an Array[Any]. The InternalRow argument will be passed down through the tree and evaluated.

Observe that some node types are marked as Unevaluable. These will need to be replaced before you call eval. In particular, AttributeReference is unevaluable because it references the attribute symbolically (by expreId), while InternalRow doesn't let you look up attributes this way. An easy way to fix this is to create your own subclass of Expression that instead references the attribute by its position in the InternalRow.

Since InternalRow is already used by Spark, you may wish to use it as the content type for your Iterators as well.

Evaluation

Once you have an iterator implemented, you just need to print out the contents, one per line, with fields separated by a | character.

Example Queries

You can find a collection of example queries here. Your code should support all of the queries in the TABLEXX.SQL files, at a minimum for this checkpoint.

This page last updated 2022-11-30 17:36:09 -0500