
Chapter 4. Spark SQL and DataFrames: Introduction to Built-in Data Sources

In the previous chapter, we explained the evolution of and justification for structure in Spark. In particular, we discussed how the Spark SQL engine provides a unified foundation for the high-level DataFrame and Dataset APIs. Now, we'll continue our discussion of the DataFrame and explore its interoperability with Spark SQL.

This chapter and the next also explore how Spark SQL interfaces with some of the external components shown in Figure 4-1.

In particular, Spark SQL:

  • Provides the engine upon which the high-level Structured APIs we explored in Chapter 3 are built.

  • Can read and write data in a variety of structured formats (e.g., JSON, Hive tables, Parquet, Avro, ORC, CSV).

  • Lets you query data using JDBC/ODBC connectors from external business intelligence (BI) data sources such as Tableau, Power BI, and Talend, or from RDBMSs such as MySQL and PostgreSQL.

  • Provides a programmatic interface to interact with structured data stored as tables or views in a database from a Spark application.

  • Offers an interactive shell to issue SQL queries on your structured data.

  • Supports ANSI SQL:2003-compliant commands and HiveQL.

Figure 4-1. Spark SQL connectors and data sources

Let's begin with how you can use Spark SQL in a Spark application.

Using Spark SQL in Spark Applications

The SparkSession, introduced in Spark 2.0, provides a unified entry point for programming Spark with the Structured APIs. You can use a SparkSession to access Spark functionality: just import the class and create an instance in your code.

To issue any SQL query, use the sql() method on the SparkSession instance, spark, such as spark.sql("SELECT * FROM myTableName"). All spark.sql queries executed in this manner return a DataFrame on which you may perform further Spark operations if you desire, the kind we explored in Chapter 3 and the ones you will learn about in this chapter and the next.
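For example, a minimal sketch (the table name is illustrative, as above):

// In Scala
// spark.sql() returns a DataFrame, so further DataFrame operations
// can be chained on the result
val resultDF = spark.sql("SELECT * FROM myTableName")
resultDF.printSchema()
resultDF.show(10)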

Basic Query Examples

In this section we'll walk through a few examples of queries on the Airline On-Time Performance and Causes of Flight Delays data set, which contains data on US flights including date, delay, distance, origin, and destination. It's available as a CSV file with over a million records. Using a schema, we'll read the data into a DataFrame and register the DataFrame as a temporary view (more on temporary views shortly) so we can query it with SQL.

Query examples are provided in code snippets, and Python and Scala notebooks containing all of the code presented here are available in the book's GitHub repo. These examples will give you a taste of how to use SQL in your Spark applications via the spark.sql programmatic interface. Similar to the DataFrame API in its declarative flavor, this interface allows you to query structured data in your Spark applications.

Normally, in a standalone Spark application, you will create a SparkSession instance manually, as shown in the following example. However, in a Spark shell (or Databricks notebook), the SparkSession is created for you and is accessible via the appropriately named variable spark.

Let's get started by reading the data set into a temporary view:

// In Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate()

// Path to data set
val csvFile = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

// Read and create a temporary view
// Infer schema (note that for larger files you may want to specify the schema)
val df = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csvFile)

// Create a temporary view
df.createOrReplaceTempView("us_delay_flights_tbl")

# In Python
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())

# Path to data set
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

# Read and create a temporary view
# Infer schema (note that for larger files you
# may want to specify the schema)
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")
Note

If you want to specify a schema, you can use a DDL-formatted string. For example:

// In Scala
val schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"

# In Python
schema = "`date` STRING, `delay` INT, `distance` INT, `origin` STRING, `destination` STRING"

Now that we have a temporary view, we can issue SQL queries using Spark SQL. These queries are no different from those you might issue against a SQL table in, say, a MySQL or PostgreSQL database. The point here is to show that Spark SQL offers an ANSI:2003-compliant SQL interface, and to demonstrate the interoperability between SQL and DataFrames.

The US flight delays data set has five columns:

  • The date column contains a string like 02190925. When converted, this maps to 02-19 09:25 am.

  • The delay column gives the delay in minutes between the scheduled and actual departure times. Early departures show negative numbers.

  • The distance column gives the distance in miles from the origin airport to the destination airport.

  • The origin column contains the origin IATA airport code.

  • The destination column contains the destination IATA airport code.

With that in mind, let's try some example queries against this data set.

First, we'll find all flights whose distance is greater than 1,000 miles:

spark.sql("""SELECT altitude, origin, destination  FROM us_delay_flights_tbl WHERE distance > k  ORDER Past distance DESC""").show(10)  +--------+------+-----------+ |distance|origin|destination| +--------+------+-----------+ |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | |4330    |HNL   |JFK        | +--------+------+-----------+ only showing top ten rows

As the results show, all of the longest flights were between Honolulu (HNL) and New York (JFK). Next, we'll find all flights between San Francisco (SFO) and Chicago (ORD) with at least a two-hour delay:

spark.sql("""SELECT date, delay, origin, destination  FROM us_delay_flights_tbl  WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'  Club by delay DESC""").show(ten)  +--------+-----+------+-----------+ |date    |filibuster|origin|destination| +--------+-----+------+-----------+ |02190925|1638 |SFO   |ORD        | |01031755|396  |SFO   |ORD        | |01022330|326  |SFO   |ORD        | |01051205|320  |SFO   |ORD        | |01190925|297  |SFO   |ORD        | |02171115|296  |SFO   |ORD        | |01071040|279  |SFO   |ORD        | |01051550|274  |SFO   |ORD        | |03120730|266  |SFO   |ORD        | |01261104|258  |SFO   |ORD        | +--------+-----+------+-----------+ only showing top 10 rows

It seems there were many significantly delayed flights between these two cities, on different dates. (As an exercise, convert the date column into a readable format and find the days or months when these delays were most common. Were the delays related to winter months or holidays?)
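One possible sketch for that exercise, assuming the DDL schema shown earlier, where date is read as a STRING of the form MMddHHmm:

// In Scala
import org.apache.spark.sql.functions.{col, substring, desc}

// Extract the month from the MMddHHmm string and count delayed flights per month
spark.table("us_delay_flights_tbl")
  .withColumn("month", substring(col("date"), 1, 2))
  .where(col("delay") > 120)
  .groupBy("month")
  .count()
  .orderBy(desc("count"))
  .show()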

Let's try a more complicated query where we use the CASE clause in SQL. In the following example, we want to label all US flights, regardless of origin and destination, with an indication of the delays they experienced: Very Long Delays (> 6 hours), Long Delays (2–6 hours), etc. We'll add these human-readable labels in a new column called Flight_Delays:

spark.sql("""SELECT filibuster, origin, destination,                Case                   WHEN filibuster > 360 And so 'Very Long Delays'                   WHEN delay >= 120 AND delay <= 360 So 'Long Delays'                   WHEN delay >= 60 AND delay < 120 THEN 'Brusque Delays'                   WHEN delay > 0 and filibuster < 60 Then 'Tolerable Delays'                   WHEN delay = 0 And so 'No Delays'                   ELSE 'Early on'                END As Flight_Delays                FROM us_delay_flights_tbl                Order By origin, delay DESC""").show(10)  +-----+------+-----------+-------------+ |delay|origin|destination|Flight_Delays| +-----+------+-----------+-------------+ |333  |ABE   |ATL        |Long Delays  | |305  |ABE   |ATL        |Long Delays  | |275  |ABE   |ATL        |Long Delays  | |257  |ABE   |ATL        |Long Delays  | |247  |ABE   |DTW        |Long Delays  | |247  |ABE   |ATL        |Long Delays  | |219  |ABE   |ORD        |Long Delays  | |211  |ABE   |ATL        |Long Delays  | |197  |ABE   |DTW        |Long Delays  | |192  |ABE   |ORD        |Long Delays  | +-----+------+-----------+-------------+ merely showing top 10 rows

As with the DataFrame and Dataset APIs, with the spark.sql interface you can conduct common data analysis operations like those we explored in the previous chapter. The computations undergo an identical journey in the Spark SQL engine (see "The Catalyst Optimizer" in Chapter 3 for details), giving you the same results.

All three of the preceding SQL queries can be expressed with an equivalent DataFrame API query. For example, the first query can be expressed in the Python DataFrame API as:

# In Python
from pyspark.sql.functions import col, desc

(df.select("distance", "origin", "destination")
  .where(col("distance") > 1000)
  .orderBy(desc("distance"))).show(10)

# Or
(df.select("distance", "origin", "destination")
  .where("distance > 1000")
  .orderBy("distance", ascending=False)
  .show(10))

This produces the same results as the SQL query:

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
|4330    |HNL   |JFK        |
+--------+------+-----------+
only showing top 10 rows

As an exercise, try converting the other two SQL queries to use the DataFrame API.
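For reference, here is one possible Scala rendering of the second query (SFO to ORD flights delayed by more than two hours); treat it as a sketch rather than the only solution, and try the third query yourself:

// In Scala
import org.apache.spark.sql.functions.{col, desc}

(df.select("date", "delay", "origin", "destination")
  .where(col("delay") > 120 && col("origin") === "SFO" && col("destination") === "ORD")
  .orderBy(desc("delay"))
  .show(10, truncate = false))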

As these examples show, using the Spark SQL interface to query data is similar to writing a regular SQL query against a relational database table. Although the queries are in SQL, you can feel the similarity in readability and semantics to DataFrame API operations, which you encountered in Chapter 3 and will explore further in the next chapter.

To enable you to query structured data as shown in the preceding examples, Spark manages all the complexities of creating and managing views and tables, both in memory and on disk. That leads us to our next topic: how tables and views are created and managed.

SQL Tables and Views

Tables hold data. Associated with each table in Spark is its relevant metadata, which is information about the table and its data: the schema, description, table name, database name, column names, partitions, physical location where the actual data resides, etc. All of this is stored in a central metastore.

Instead of having a separate metastore for Spark tables, Spark by default uses the Apache Hive metastore, located at /user/hive/warehouse, to persist all the metadata about your tables. However, you may change the default location by setting the Spark config variable spark.sql.warehouse.dir to another location, which can be set to local or external distributed storage.
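For example, a minimal sketch of pointing the warehouse at a different directory when building the SparkSession (the path shown is illustrative):

// In Scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")  // illustrative location
  .getOrCreate()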

Managed Versus Unmanaged Tables

Spark allows you to create two types of tables: managed and unmanaged. For a managed table, Spark manages both the metadata and the data in the file store. This could be a local filesystem, HDFS, or an object store such as Amazon S3 or Azure Blob storage. For an unmanaged table, Spark only manages the metadata, while you manage the data yourself in an external data source such as Cassandra.

With a managed table, because Spark manages everything, a SQL command such as DROP TABLE table_name deletes both the metadata and the data. With an unmanaged table, the same command will delete only the metadata, not the actual data. We will look at some examples of how to create managed and unmanaged tables in the next section.
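A minimal illustration in Scala (the table name is a placeholder):

// In Scala
// For a managed table this removes both the metastore entry and the data files;
// for an unmanaged table only the metastore entry is removed
spark.sql("DROP TABLE IF EXISTS table_name")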

Creating SQL Databases and Tables

Tables reside within a database. By default, Spark creates tables under the default database. To create your own database name, you can issue a SQL command from your Spark application or notebook. Using the US flight delays data set, let's create both a managed and an unmanaged table. To begin, we'll create a database called learn_spark_db and tell Spark we want to use that database:

// In Scala/Python
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")

From this point, any commands we issue in our application to create tables will result in the tables being created in this database and residing under the database name learn_spark_db.

Creating a managed table

To create a managed table within the database learn_spark_db, you can issue a SQL query like the following:

// In Scala/Python
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT,
  distance INT, origin STRING, destination STRING)")

You can do the same thing using the DataFrame API like this:

# In Python
# Path to our US flight delays CSV file
csv_file = "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
# Schema as defined in the preceding example
schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

Both of these statements will create the managed table managed_us_delay_flights_tbl in the learn_spark_db database.

Creating an unmanaged table

By contrast, you can create unmanaged tables from your own data sources, such as Parquet, CSV, or JSON files stored in a file store accessible to your Spark application.

To create an unmanaged table from a data source such as a CSV file, in SQL use:

spark.sql("""CREATE Tabular array us_delay_flights_tbl(engagement STRING, delay INT,    altitude INT, origin STRING, destination Cord)    USING csv OPTIONS (PATH    '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')""")

And within the DataFrame API use:

(flights_df
  .write
  .option("path", "/tmp/data/us_flights_delay")
  .saveAsTable("us_delay_flights_tbl"))
Note

To enable you to explore these examples, we have created Python and Scala example notebooks that you can find in the book's GitHub repo.

Creating Views

In addition to creating tables, Spark can create views on top of existing tables. Views can be global (visible across all SparkSessions on a given cluster) or session-scoped (visible only to a single SparkSession), and they are temporary: they disappear after your Spark application terminates.

Creating views has a similar syntax to creating tables within a database. Once you create a view, you can query it as you would a table. The difference between a view and a table is that views don't actually hold the data; tables persist after your Spark application terminates, but views disappear.

You can create a view from an existing table using SQL. For example, if you wish to work on only the subset of the US flight delays data set with origin airports of New York (JFK) and San Francisco (SFO), the following queries will create global temporary and temporary views consisting of just that slice of the table:

-- In SQL
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view AS
  SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE
  origin = 'SFO';

CREATE OR REPLACE TEMP VIEW us_origin_airport_JFK_tmp_view AS
  SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE
  origin = 'JFK'

You can accomplish the same thing with the DataFrame API as follows:

# In Python
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'JFK'")

# Create a temporary and global temporary view
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

Once you've created these views, you can issue queries against them just as you would against a table. Keep in mind that when accessing a global temporary view you must use the prefix global_temp.<view_name>, because Spark creates global temporary views in a global temporary database called global_temp. For example:

-- In SQL
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view

By contrast, you can access the normal temporary view without the global_temp prefix:

-- In SQL
SELECT * FROM us_origin_airport_JFK_tmp_view

// In Scala/Python
spark.read.table("us_origin_airport_JFK_tmp_view")
// Or
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")

You can also drop a view just like you would a table:

-- In SQL
DROP VIEW IF EXISTS us_origin_airport_SFO_global_tmp_view;
DROP VIEW IF EXISTS us_origin_airport_JFK_tmp_view

// In Scala/Python
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")

Temporary views versus global temporary views

The difference between temporary and global temporary views being subtle, it can be a source of mild confusion among developers new to Spark. A temporary view is tied to a single SparkSession within a Spark application. In contrast, a global temporary view is visible across multiple SparkSessions within a Spark application. Yes, you can create multiple SparkSessions within a single Spark application; this can be handy, for example, in cases where you want to access (and combine) data from two different SparkSessions that don't share the same Hive metastore configurations.
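A minimal sketch of this distinction, assuming the views created earlier in this section:

// In Scala
// Create a second SparkSession in the same application
val spark2 = spark.newSession()

// A global temporary view is visible from the new session via the global_temp database...
spark2.sql("SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view").show()

// ...whereas a plain temporary view registered in the first session is not:
// spark2.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")  // fails: table or view not found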

Caching SQL Tables

Although we will discuss table caching strategies in the next chapter, it's worth mentioning here that, like DataFrames, you can cache and uncache SQL tables and views. In Spark 3.0, in addition to other options, you can specify a table as LAZY, meaning that it should only be cached when it is first used instead of immediately:

-- In SQL
CACHE [LAZY] TABLE <table-name>
UNCACHE TABLE <table-name>
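From a Spark application you can issue the same statements through spark.sql(); a minimal sketch using the table from this chapter:

// In Scala
spark.sql("CACHE LAZY TABLE us_delay_flights_tbl")
spark.sql("UNCACHE TABLE us_delay_flights_tbl")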

Reading Tables into DataFrames

Oftentimes, data engineers build data pipelines as part of their regular data ingestion and ETL processes. They populate Spark SQL databases and tables with cleansed data for consumption by applications downstream.

Let's assume you have an existing database, learn_spark_db, and table, us_delay_flights_tbl, ready for use. Instead of reading from an external JSON file, you can simply use SQL to query the table and assign the returned result to a DataFrame:

// In Scala
val usFlightsDF = spark.sql("SELECT * FROM us_delay_flights_tbl")
val usFlightsDF2 = spark.table("us_delay_flights_tbl")

# In Python
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")

Now you have a cleansed DataFrame read from an existing Spark SQL table. You can also read data in other formats using Spark's built-in data sources, giving you the flexibility to interact with various common file formats.

Data Sources for DataFrames and SQL Tables

As shown in Figure 4-1, Spark SQL provides an interface to a variety of data sources. It also provides a set of common methods for reading and writing data to and from these data sources using the Data Sources API.

In this section we will cover some of the built-in data sources, available file formats, and ways to load and write data, along with specific options pertaining to these data sources. But first, let's take a closer look at two high-level Data Source API constructs that dictate the manner in which you interact with different data sources: DataFrameReader and DataFrameWriter.

DataFrameReader

DataFrameReader is the core construct for reading data from a data source into a DataFrame. It has a defined format and a recommended pattern for usage:

DataFrameReader.format(args).option("key", "value").schema(args).load()

This pattern of stringing methods together is common in Spark, and easy to read. We saw it in Chapter 3 when exploring common data analysis patterns.

Note that you can only access a DataFrameReader through a SparkSession instance. That is, you cannot create an instance of DataFrameReader directly. To get an instance handle to it, use:

SparkSession.read
// or
SparkSession.readStream

While read returns a handle to a DataFrameReader to read into a DataFrame from a static data source, readStream returns an instance to read from a streaming source. (We will cover Structured Streaming later in the book.)
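A minimal Scala sketch of the two handles (the types in the comments are for orientation only):

// In Scala
val batchReader  = spark.read        // org.apache.spark.sql.DataFrameReader, for static sources
val streamReader = spark.readStream  // org.apache.spark.sql.streaming.DataStreamReader, for streaming sources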

Arguments to each of the public methods to DataFrameReader take different values. Table 4-1 enumerates these, with a subset of the supported arguments.

Table 4-1. DataFrameReader methods, arguments, and options

format()
  Arguments: "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc.
  Description: If you don't specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default.

option()
  Arguments: ("mode", {PERMISSIVE | FAILFAST | DROPMALFORMED}); ("inferSchema", {true | false}); ("path", "path_file_data_source")
  Description: A series of key/value pairs and options. The Spark documentation shows some examples and explains the different modes and their actions. The default mode is PERMISSIVE. The "inferSchema" and "mode" options are specific to the JSON and CSV file formats.

schema()
  Arguments: DDL String or StructType, e.g., 'A INT, B STRING' or StructType(...)
  Description: For JSON or CSV format, you can specify to infer the schema in the option() method. Generally, providing a schema for any format makes loading faster and ensures your data conforms to the expected schema.

load()
  Arguments: "/path/to/data/source"
  Description: The path to the data source. This can be empty if specified in option("path", "...").

While we won't comprehensively enumerate all the different combinations of arguments and options, the documentation for Python, Scala, R, and Java offers suggestions and guidance. It's worthwhile to show a couple of examples, though:

// In Scala
// Use Parquet
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet"""
val df = spark.read.format("parquet").load(file)

// Use Parquet; you can omit format("parquet") if you wish as it's the default
val df2 = spark.read.load(file)

// Use CSV
val df3 = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*")

// Use JSON
val df4 = spark.read.format("json")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/json/*")
Note

In general, no schema is needed when reading from a static Parquet data source; the Parquet metadata usually contains the schema, so it's inferred. However, for streaming data sources you will have to provide a schema. (We will cover reading from streaming data sources in Chapter 8.)

Parquet is the default and preferred data source for Spark because it's efficient, uses columnar storage, and employs a fast compression algorithm. You will see additional benefits later (such as columnar pushdown), when we cover the Catalyst optimizer in greater depth.

DataFrameWriter

DataFrameWriter does the reverse of its counterpart: it saves or writes data to a specified built-in data source. Unlike with DataFrameReader, you access its instance not from a SparkSession but from the DataFrame you wish to save. It has a few recommended usage patterns:

DataFrameWriter.format(args)
  .option(args)
  .bucketBy(args)
  .partitionBy(args)
  .save(path)

DataFrameWriter.format(args)
  .option(args)
  .sortBy(args)
  .saveAsTable(table)

To get an instance handle, use:

DataFrame.write
// or
DataFrame.writeStream

Arguments to each of the methods to DataFrameWriter also take different values. We list these in Table 4-2, with a subset of the supported arguments.

Table 4-2. DataFrameWriter methods, arguments, and options

format()
  Arguments: "parquet", "csv", "txt", "json", "jdbc", "orc", "avro", etc.
  Description: If you don't specify this method, then the default is Parquet or whatever is set in spark.sql.sources.default.

option()
  Arguments: ("mode", {append | overwrite | ignore | error or errorifexists}); ("mode", {SaveMode.Overwrite | SaveMode.Append | SaveMode.Ignore | SaveMode.ErrorIfExists}); ("path", "path_to_write_to")
  Description: A series of key/value pairs and options. The Spark documentation shows some examples. This is an overloaded method. The default mode options are error or errorifexists and SaveMode.ErrorIfExists; they throw an exception at runtime if the data already exists.

bucketBy()
  Arguments: (numBuckets, col, col..., coln)
  Description: The number of buckets and names of columns to bucket by. Uses Hive's bucketing scheme on a filesystem.

save()
  Arguments: "/path/to/data/source"
  Description: The path to save to. This can be empty if specified in option("path", "...").

saveAsTable()
  Arguments: "table_name"
  Description: The table to save to.

Here's a short example snippet to illustrate the use of methods and arguments:

// In Scala
// Use JSON
val location = ...
df.write.format("json").mode("overwrite").save(location)

Parquet

We'll start our exploration of data sources with Parquet, because it's the default data source in Spark. Supported and widely used by many big data processing frameworks and platforms, Parquet is an open source columnar file format that offers many I/O optimizations (such as compression, which saves storage space and allows for quick access to data columns).

Because of its efficiency and these optimizations, we recommend that after you have transformed and cleansed your data, you save your DataFrames in the Parquet format for downstream consumption. (Parquet is also the default table open format for Delta Lake, which we will cover in Chapter 9.)

Reading Parquet files into a DataFrame

Parquet files are stored in a directory structure that contains the data files, metadata, a number of compressed files, and some status files. Metadata in the footer contains the version of the file format, the schema, and column information such as the path, etc.

For example, a directory in a Parquet file might contain a set of files like this:

_SUCCESS
_committed_1799640464332036264
_started_1799640464332036264
part-00000-tid-1799640464332036264-91273258-d7ef-4dc7-<...>-c000.snappy.parquet

There may be a number of part-XXXX compressed files in a directory (the names shown here have been shortened to fit on the page).

To read Parquet files into a DataFrame, you simply specify the format and path:

// In Scala
val file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
val df = spark.read.format("parquet").load(file)

# In Python
file = """/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"""
df = spark.read.format("parquet").load(file)

Unless you are reading from a streaming data source, there's no need to supply the schema, because Parquet saves it as part of its metadata.

Reading Parquet files into a Spark SQL table

As well as reading Parquet files into a Spark DataFrame, you can also create a Spark SQL unmanaged table or view directly using SQL:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING parquet
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet/"
    )

Once you've created the table or view, you can read data into a DataFrame using SQL, as we saw in some earlier examples:

// In Scala
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

# In Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

Both of these operations return the same results:

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to Parquet files

Writing or saving a DataFrame as a table or file is a common operation in Spark. To write a DataFrame you simply use the methods and arguments of the DataFrameWriter outlined earlier in this chapter, supplying the location to save the Parquet files to. For example:

// In Scala
df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet")

# In Python
(df.write.format("parquet")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/parquet/df_parquet"))
Note

Recall that Parquet is the default file format. If you don't include the format() method, the DataFrame will still be saved as a Parquet file.

This will create a set of compact and compressed Parquet files at the specified path. Since we used snappy as our compression option here, we'll have snappy compressed files. For brevity, this example generated only one file; normally, there may be a dozen or so files created:

-rw-r--r--  1 jules  wheel    0 May 19 10:58 _SUCCESS
-rw-r--r--  1 jules  wheel  966 May 19 10:58 part-00000-<...>-c000.snappy.parquet
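As the preceding note points out, format() can be omitted when writing Parquet, since it is the default; a minimal sketch (the output path is illustrative):

// In Scala
df.write.mode("overwrite").save("/tmp/data/parquet/df_parquet_default")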

Writing DataFrames to Spark SQL tables

Writing a DataFrame to a SQL table is as easy as writing to a file: simply use saveAsTable() instead of save(). This will create a managed table called us_delay_flights_tbl:

// In Scala
df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl")

# In Python
(df.write
  .mode("overwrite")
  .saveAsTable("us_delay_flights_tbl"))

To sum up, Parquet is the preferred and default built-in data source file format in Spark, and it has been adopted by many other frameworks. We recommend that you use this format in your ETL and data ingestion processes.

JSON

JavaScript Object Notation (JSON) is also a popular data format. It came to prominence as an easy-to-read and easy-to-parse format compared to XML. It has two representational formats: single-line mode and multiline mode. Both modes are supported in Spark.

In single-line mode each line denotes a single JSON object, whereas in multiline mode the entire multiline object constitutes a single JSON object. To read in this mode, set multiLine to true in the option() method.
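A minimal sketch of reading a multiline JSON file (the path is hypothetical):

// In Scala
val multilineDF = spark.read
  .format("json")
  .option("multiLine", "true")
  .load("/tmp/data/json/multiline_example.json")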

Reading a JSON file into a DataFrame

You can read a JSON file into a DataFrame the same way you did with Parquet: just specify "json" in the format() method:

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
val df = spark.read.format("json").load(file)

# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
df = spark.read.format("json").load(file)

Reading a JSON file into a Spark SQL table

You can also create a SQL table from a JSON file just like you did with Parquet:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING json
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/json/*"
    )

Once the table is created, you can read data into a DataFrame using SQL:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
|United States    |Singapore          |1    |
|United States    |Grenada            |62   |
|Costa Rica       |United States      |588  |
|Senegal          |United States      |40   |
|Moldova          |United States      |1    |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to JSON files

Saving a DataFrame as a JSON file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the JSON files to:

// In Scala
df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json")

# In Python
(df.write.format("json")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/json/df_json"))

This creates a directory at the specified path populated with a set of compact JSON files:

-rw-r--r--  1 jules  wheel   0 May 16 14:44 _SUCCESS
-rw-r--r--  1 jules  wheel  71 May 16 14:44 part-00000-<...>-c000.json

JSON data source options

Table 4-3 describes common JSON options for DataFrameReader and DataFrameWriter. For a comprehensive list, we refer you to the documentation.

Table 4-3. JSON options for DataFrameReader and DataFrameWriter

compression
  Values: none, uncompressed, bzip2, deflate, gzip, lz4, or snappy
  Meaning: Use this compression codec for writing. Note that read will only detect the compression or codec from the file extension.
  Scope: Write

dateFormat
  Values: yyyy-MM-dd or DateTimeFormatter
  Meaning: Use this format or any format from Java's DateTimeFormatter.
  Scope: Read/write

multiLine
  Values: true, false
  Meaning: Use multiline mode. Default is false (single-line mode).
  Scope: Read

allowUnquotedFieldNames
  Values: true, false
  Meaning: Allow unquoted JSON field names. Default is false.
  Scope: Read

CSV

As widely used as plain text files, this common text file format captures each datum or field delimited by a comma; each line with comma-separated fields represents a record. Even though a comma is the default separator, you may use other delimiters to separate fields in cases where commas are part of your data. Popular spreadsheets can generate CSV files, so it's a popular format among data and business analysts.
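For instance, a minimal sketch of reading a pipe-delimited file by overriding the default separator (the path is hypothetical):

// In Scala
val pipeDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("sep", "|")
  .load("/tmp/data/csv/pipe_delimited.csv")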

Reading a CSV file into a DataFrame

As with the other built-in data sources, you can use the DataFrameReader methods and arguments to read a CSV file into a DataFrame:

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
val schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
val df = spark.read.format("csv")
  .schema(schema)
  .option("header", "true")
  .option("mode", "FAILFAST")  // Exit if any errors
  .option("nullValue", "")     // Replace any null data field with quotes
  .load(file)

# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*"
schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"
df = (spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .option("mode", "FAILFAST")  # Exit if any errors
  .option("nullValue", "")     # Replace any null data field with quotes
  .load(file))

Reading a CSV file into a Spark SQL table

Creating a SQL table from a CSV data source is no different from using Parquet or JSON:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING csv
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/csv/*",
      header "true",
      inferSchema "true",
      mode "FAILFAST"
    )

Once you've created the table, you can read data into a DataFrame using SQL as before:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
|United States    |Singapore          |25   |
|United States    |Grenada            |54   |
|Costa Rica       |United States      |477  |
|Senegal          |United States      |29   |
|United States    |Marshall Islands   |44   |
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to CSV files

Saving a DataFrame as a CSV file is simple. Specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the CSV files to:

// In Scala
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")

# In Python
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")

This generates a folder at the specified location, populated with a bunch of compressed and compact files:

-rw-r--r--  1 jules  wheel   0 May 16 12:17 _SUCCESS
-rw-r--r--  1 jules  wheel  36 May 16 12:17 part-00000-251690eb-<...>-c000.csv
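
If you want to sanity-check the write, you can point the CSV reader back at the output folder. A quick sketch, assuming the same session and path; because no header was written above, Spark assigns generated column names unless you supply a schema:

// In Scala
// Read the freshly written CSV folder back in (no header was written above)
val checkDF = spark.read
  .format("csv")
  .load("/tmp/data/csv/df_csv")
checkDF.show(5)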

CSV data source options

Table 4-4 describes some of the common CSV options for DataFrameReader and DataFrameWriter. Because CSV files can be complex, many options are available; for a comprehensive listing we refer you to the documentation.

Table 4-4. CSV options for DataFrameReader and DataFrameWriter
Property name | Values | Meaning | Scope
compression | none, bzip2, deflate, gzip, lz4, or snappy | Use this compression codec for writing. | Write
dateFormat | yyyy-MM-dd or DateTimeFormatter | Use this format or any format from Java's DateTimeFormatter. | Read/write
multiLine | true, false | Use multiline mode. Default is false (single-line mode). | Read
inferSchema | true, false | If true, Spark will determine the column data types. Default is false. | Read
sep | Any character | Use this character to separate column values in a row. Default delimiter is a comma (,). | Read/write
escape | Any character | Use this character to escape quotes. Default is \. | Read/write
header | true, false | Indicates whether the first line is a header denoting each column name. Default is false. | Read/write
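
To see how several of these options combine on a single read and write, here is a hedged sketch; the pipe-delimited input path is hypothetical:

// In Scala
// Hypothetical pipe-delimited file read with a few of the options from Table 4-4
val optsDF = spark.read
  .format("csv")
  .option("header", "true")
  .option("sep", "|")
  .option("inferSchema", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .option("mode", "FAILFAST")        // fail fast on malformed rows
  .load("/tmp/data/pipe/some_file.csv")

// Write it back out gzip-compressed
optsDF.write
  .format("csv")
  .option("header", "true")
  .option("compression", "gzip")
  .mode("overwrite")
  .save("/tmp/data/csv/gzipped")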

Avro

Introduced in Spark 2.4 as a built-in data source, the Avro format is used, for example, by Apache Kafka for message serializing and deserializing. It offers many benefits, including direct mapping to JSON, speed and efficiency, and bindings available for many programming languages.

Reading an Avro file into a DataFrame

Reading an Avro file into a DataFrame using DataFrameReader is consistent in usage with the other data sources we have discussed in this section:

// In Scala
val df = spark.read.format("avro")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*")
df.show(false)

# In Python
df = (spark.read.format("avro")
  .load("/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"))
df.show(truncate=False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows

Reading an Avro file into a Spark SQL table

Again, creating SQL tables using an Avro data source is no different from using Parquet, JSON, or CSV:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW episode_tbl
    USING avro
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/avro/*"
    )

Once you've created a table, you can read data into a DataFrame using SQL:

// In Scala
spark.sql("SELECT * FROM episode_tbl").show(false)

# In Python
spark.sql("SELECT * FROM episode_tbl").show(truncate=False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to Avro files

Writing a DataFrame as an Avro file is simple. As usual, specify the appropriate DataFrameWriter methods and arguments, and supply the location to save the Avro files to:

// In Scala
df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro")

# In Python
(df.write
  .format("avro")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro"))

This generates a folder at the specified location, populated with a bunch of compressed and compact files:

-rw-r--r--  1 jules  wheel    0 May 17 11:54 _SUCCESS
-rw-r--r--  1 jules  wheel  526 May 17 11:54 part-00000-ffdf70f4-<...>-c000.avro
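
Since Avro files embed their schema, reading the output back needs no extra options. A quick round-trip check, assuming the same session and the path used above:

// In Scala
// Round-trip check: Avro carries its schema, so no schema or header options are needed
val avroCheckDF = spark.read.format("avro").load("/tmp/data/avro/df_avro")
avroCheckDF.printSchema()
avroCheckDF.show(5, false)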

Avro data source options

Table 4-5 describes common options for DataFrameReader and DataFrameWriter. A comprehensive list of options is in the documentation.

Table 4-5. Avro options for DataFrameReader and DataFrameWriter
Property name | Default value | Meaning | Scope
avroSchema | None | Optional Avro schema provided by a user in JSON format. The data type and naming of record fields should match the input Avro data or Catalyst data (Spark internal data type), otherwise the read/write action will fail. | Read/write
recordName | topLevelRecord | Top-level record name in write result, which is required in the Avro spec. | Write
recordNamespace | "" | Record namespace in write result. | Write
ignoreExtension | true | If this option is enabled, all files (with and without the .avro extension) are loaded. Otherwise, files without the .avro extension are ignored. | Read
compression | snappy | Allows you to specify the compression codec to use in writing. Currently supported codecs are uncompressed, snappy, deflate, bzip2, and xz. If this option is not set, the value in spark.sql.avro.compression.codec is taken into account. | Write
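
As a hedged illustration of a couple of these options, the sketch below writes with the deflate codec instead of the snappy default and reads the result back with ignoreExtension enabled; it reuses the Avro DataFrame df read earlier, and the output path is hypothetical:

// In Scala
// Write with an explicit Avro codec instead of the snappy default
df.write
  .format("avro")
  .option("compression", "deflate")
  .mode("overwrite")
  .save("/tmp/data/avro/df_avro_deflate")

// Read back, also picking up any files that lack the .avro extension
val deflateDF = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("/tmp/data/avro/df_avro_deflate")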

ORC

As an additional optimized columnar file format, Spark 2.x supports a vectorized ORC reader. Two Spark configurations dictate which ORC implementation to use. When spark.sql.orc.impl is set to native and spark.sql.orc.enableVectorizedReader is set to true, Spark uses the vectorized ORC reader. A vectorized reader reads blocks of rows (often 1,024 per block) instead of one row at a time, streamlining operations and reducing CPU usage for intensive operations like scans, filters, aggregations, and joins.

For Hive ORC SerDe (serialization and deserialization) tables created with the SQL command USING HIVE OPTIONS (fileFormat 'ORC'), the vectorized reader is used when the Spark configuration parameter spark.sql.hive.convertMetastoreOrc is set to true.
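
To make those configuration parameters concrete, here is a minimal sketch of setting them on an existing SparkSession; depending on your deployment, you may prefer to pass them with --conf at submit time instead:

// In Scala
// Select the native, vectorized ORC implementation
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")

// Convert Hive ORC SerDe tables so the vectorized reader can be used for them too
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")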

Reading an ORC file into a DataFrame

To read in a DataFrame using the ORC vectorized reader, you can simply use the normal DataFrameReader methods and options:

// In Scala
val file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
val df = spark.read.format("orc").load(file)
df.show(10, false)

# In Python
file = "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
df = spark.read.format("orc").option("path", file).load()
df.show(10, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows

Reading an ORC file into a Spark SQL table

There is no difference from Parquet, JSON, CSV, or Avro when creating a SQL view using an ORC data source:

-- In SQL
CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl
    USING orc
    OPTIONS (
      path "/databricks-datasets/learning-spark-v2/flights/summary-data/orc/*"
    )

Once a table is created, you can read data into a DataFrame using SQL as usual:

// In Scala/Python
spark.sql("SELECT * FROM us_delay_flights_tbl").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows

Writing DataFrames to ORC files

Writing back a transformed DataFrame after reading is equally simple using the DataFrameWriter methods:

// In Scala
df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/df_orc")

# In Python
(df.write.format("orc")
  .mode("overwrite")
  .option("compression", "snappy")
  .save("/tmp/data/orc/flights_orc"))

The result will be a folder at the specified location containing some compressed ORC files:

-rw-r--r--  1 jules  wheel    0 May 16 17:23 _SUCCESS
-rw-r--r--  1 jules  wheel  547 May 16 17:23 part-00000-<...>-c000.snappy.orc
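
As with Avro, you can verify the write by reading the folder straight back; ORC files are self-describing, so a plain read suffices. A short sketch using the Scala output path from above:

// In Scala
// ORC files carry their own schema, so a plain read is enough to verify the write
val orcCheckDF = spark.read.format("orc").load("/tmp/data/orc/df_orc")
orcCheckDF.show(5, false)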

Images

In Spark 2.4 the community introduced a new data source, image files, to support deep learning and machine learning frameworks such as TensorFlow and PyTorch. For computer vision–based machine learning applications, loading and processing image data sets is important.

Reading an image file into a DataFrame

As with all of the previous file formats, you can use the DataFrameReader methods and options to read in an image file as shown here:

// In Scala
import org.apache.spark.ml.source.image

val imageDir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val imagesDF = spark.read.format("image").load(imageDir)

imagesDF.printSchema

imagesDF.select("image.height", "image.width", "image.nChannels", "image.mode",
  "label").show(5, false)
# In Python
from pyspark.ml import image

image_dir = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
images_df = spark.read.format("image").load(image_dir)
images_df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: integer (nullable = true)

images_df.select("image.height", "image.width", "image.nChannels", "image.mode",
  "label").show(5, truncate=False)

+------+-----+---------+----+-----+
|height|width|nChannels|mode|label|
+------+-----+---------+----+-----+
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |1    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
|288   |384  |3        |16  |0    |
+------+-----+---------+----+-----+
only showing top 5 rows
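
Because each row carries the image metadata plus a label, ordinary DataFrame operations apply. For example, a quick sketch (not from the book) counting training images per label, using the imagesDF loaded above:

// In Scala
// Count images per label using the DataFrame API
imagesDF
  .groupBy("label")
  .count()
  .orderBy("label")
  .show()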

Binary Files

Spark 3.0 adds support for binary files as a data source. The DataFrameReader converts each binary file into a single DataFrame row (record) that contains the raw content and metadata of the file. The binary file data source produces a DataFrame with the following columns:

  • path: StringType

  • modificationTime: TimestampType

  • length: LongType

  • content: BinaryType

Reading a binary file into a DataFrame

To read binary files, specify the data source format as binaryFile. You can load files with paths matching a given glob pattern while preserving the behavior of partition discovery with the data source option pathGlobFilter. For example, the following code reads all JPG files from the input directory with any partitioned directories:

// In Scala
val path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path)
binaryFilesDF.show(5)

# In Python
path = "/databricks-datasets/learning-spark-v2/cctvVideos/train_images/"
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .load(path))
binary_files_df.show(5)

+--------------------+-------------------+------+--------------------+-----+
|                path|   modificationTime|length|             content|label|
+--------------------+-------------------+------+--------------------+-----+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|    1|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|    0|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|    0|
+--------------------+-------------------+------+--------------------+-----+
only showing top 5 rows
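
Since the raw bytes live in the content column, it is often useful to look at just the metadata first. A small sketch against the DataFrame loaded above:

// In Scala
// Inspect only the file metadata, leaving the (potentially large) content column out
binaryFilesDF
  .select("path", "modificationTime", "length")
  .orderBy("length")
  .show(5, false)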

To ignore partition data discovery in a directory, you can set recursiveFileLookup to "true":

// In Scala
val binaryFilesDF = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path)
binaryFilesDF.show(5)

# In Python
binary_files_df = (spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.jpg")
  .option("recursiveFileLookup", "true")
  .load(path))
binary_files_df.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/Users/jules...|2020-02-12 12:04:24| 55037|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54634|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54624|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54505|[FF D8 FF E0 00 1...|
|file:/Users/jules...|2020-02-12 12:04:24| 54475|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

Note that the label column is absent when the recursiveFileLookup option is set to "true".

Currently, the binary file data source does not support writing a DataFrame back to the original file format.

In this section, you got a tour of how to read data into a DataFrame from a range of supported file formats. We also showed you how to create temporary views and tables from the existing built-in data sources. Whether you're using the DataFrame API or SQL, the queries produce identical outcomes. You can examine some of these queries in the notebook available in the GitHub repo for this book.

Summary

To recap, this chapter explored the interoperability between the DataFrame API and Spark SQL. In particular, you got a flavor of how to use Spark SQL to:

  • Create managed and unmanaged tables using Spark SQL and the DataFrame API.

  • Read from and write to diverse built-in data sources and file formats.

  • Employ the spark.sql programmatic interface to issue SQL queries on structured data stored as Spark SQL tables or views.

  • Peruse the Spark Catalog to inspect metadata associated with tables and views.

  • Use the DataFrameWriter and DataFrameReader APIs.

Through the code snippets in the chapter and the notebooks available in the book's GitHub repo, you got a feel for how to use DataFrames and Spark SQL. Continuing in this vein, the next chapter further explores how Spark interacts with the external data sources shown in Figure 4-1. You'll see some more in-depth examples of transformations and the interoperability between the DataFrame API and Spark SQL.


Source: https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/ch04.html
