原创

Spark连接MySQL、Hive以及使用JSON充当数据源

温馨提示:
本文最后更新于 2018年04月04日,已超过 2,424 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

一、Spark连接Hive

1.首先

噢对了,还有导入的数据(即后面代码中 load 的 /file/tb_user.txt 的内容,字段依次为 id、name、age、iphone,以逗号分隔):

1,小明,23,15082754112
2,李四,46,15182754132
3,老五,19,12382754166
4,小花,56,12082752166

2.导包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Child module of the com.lzhpo.bigdata:root parent POM (the parent itself is not shown here). -->
    <parent>
        <artifactId>root</artifactId>
        <groupId>com.lzhpo.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Module holding the Spark + Hive example. -->
    <artifactId>spark-04-hive</artifactId>

    <!-- All Spark artifacts are pinned to the same version (2.3.2) and the same _2.11 artifact suffix. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- NOTE(review): the Hive example class only imports SparkSession; spark-mllib looks unused here - confirm before removing. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- NOTE(review): presumably needed when the Hive metastore is backed by MySQL - confirm against the Hive configuration. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!-- Required for SparkSession.builder().enableHiveSupport(). -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- Provides SparkSession and the SQL API used by the example. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
    </dependencies>

</project>

3.java代码

package com.lzhpo.spark;

import org.apache.spark.sql.SparkSession;

/**
 * Spark SQL + Hive example: creates the Hive table {@code db_spark.tb_user},
 * loads a comma-delimited text file into it and runs a few queries against it.
 *
 * <p>Create By IntelliJ IDEA</p>
 * <p>Author:lzhpo</p>
 */
public class Hive {
    public static void main(String[] args) {
        SparkSession session = SparkSession
                .builder()
                .appName("Hive")
                .master("local")
                .enableHiveSupport() // enable Hive support
                .config("spark.sql.warehouse.dir", "/user/hive/warehouse") // Hive warehouse directory
                .getOrCreate();

        try {
            // Create the table.
            // Every fragment ends with a space so the concatenated statement keeps its
            // clauses separated (previously ")row format" and "','lines" ran together).
            // NOTE(review): assumes the database db_spark already exists - confirm.
            session.sql(
                    "create table db_spark.tb_user "
                            + "(id int, name string, age int, iphone string) "
                            + "row format delimited fields terminated by ',' "
                            + "lines terminated by '\\n'"
            );
            // Verify that the table was created (it is still empty at this point).
            session.sql("select * from db_spark.tb_user").show();

            // Load the data file into the newly created table.
            session.sql("load data inpath '/file/tb_user.txt' into table db_spark.tb_user");

            // Query again, now that the data has been loaded.
            session.sql("select * from db_spark.tb_user").show();

            // Count the rows.
            session.sql("select count(*) count_num from db_spark.tb_user").show();

            // Fuzzy match: names containing the character '小'.
            session.sql("select * from db_spark.tb_user where name like '%小%'").show();
        } finally {
            // Always release the Spark context, even if one of the statements fails.
            session.stop();
        }
    }
}

4.结果

看我SQL语句,猜也猜得出来吧?

二、Spark连接MySQL

1.导包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Child module of the com.lzhpo.bigdata:root parent POM (the parent itself is not shown here). -->
    <parent>
        <artifactId>root</artifactId>
        <groupId>com.lzhpo.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Module holding the Spark + MySQL (JDBC) example. -->
    <artifactId>spark-03-mysql</artifactId>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- Provides SparkSession, Dataset and the JDBC data source used by the example. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <!-- Supplies the com.mysql.jdbc.Driver class named in the example's "driver" option. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
        <!-- NOTE(review): the MySQL example class does not reference MLlib; this runtime-scoped dependency looks unused - confirm. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.3.2</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <!-- NOTE(review): the MySQL example class does not enable Hive support; this provided-scoped dependency looks unused - confirm. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

2.java代码

package com.lzhpo.spark;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
 * Spark SQL + MySQL example: reads the {@code user} table over JDBC, runs a few
 * DataFrame queries on it, writes the distinct names back to MySQL and saves an
 * age histogram as JSON on the local file system.
 *
 * <p>Create By IntelliJ IDEA</p>
 * <p>Author:lzhpo</p>
 */
public class MySQL {

    /** JDBC URL shared by the read and the write below. */
    private static final String JDBC_URL =
            "jdbc:mysql://192.168.200.111:3306/bigdata?useUnicode=true&characterEncoding=utf8&useSSL=false";

    /** JDBC driver class, provided by mysql-connector-java. */
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";

    // NOTE(review): demo credentials are hard-coded; move them to external configuration for real use.
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASSWORD = "123456";

    public static void main(String[] args) {
        SparkSession session = SparkSession
                .builder()
                .master("local")
                .appName("MySQL")
                .getOrCreate();

        try {
            // Describe the JDBC source; the table is exposed as a DataFrame.
            Dataset<Row> df = session
                .read()
                .format("jdbc")
                .option("url", JDBC_URL)           // connection URL
                .option("dbtable", "user")         // table to read
                .option("user", JDBC_USER)         // database user
                .option("password", JDBC_PASSWORD) // database password
                .option("driver", JDBC_DRIVER)     // JDBC driver class, set explicitly
                .load();

            // Show the data; show() prints at most the first 20 rows ("only showing top 20 rows").
            df.show();
//        +---+----+---+------+
//        | id|name|age|iphone|
//        +---+----+---+------+
//        |  1|  小明| 16|   111|
//        |  2|  张三| 23|   222|
//        |  3|  小花| 21|   333|
//        |  4|  李思| 24|   444|
//        |  5|  张思| 24|   555|
//        |  6|  张思| 24|   555|
//        +---+----+---+------+

            // Print the DataFrame schema in tree form.
            df.printSchema();
//        root
//            |-- id: integer (nullable = true)
//            |-- name: string (nullable = true)
//            |-- age: integer (nullable = true)
//            |-- iphone: string (nullable = true)

            // Select every column.
            Dataset<Row> allMessage = df.select(new Column("*"));
            allMessage.show();
//        +---+----+---+------+
//        | id|name|age|iphone|
//        +---+----+---+------+
//        |  1|  小明| 16|   111|
//        |  2|  张三| 23|   222|
//        |  3|  小花| 21|   333|
//        |  4|  李思| 24|   444|
//        |  5|  张思| 24|   555|
//        |  6|  张思| 24|   555|
//        +---+----+---+------+

            // Fuzzy match: phone numbers whose LAST digit is 5.
            // The wildcard must come first ('%5'); the previous pattern '5%' matched
            // numbers STARTING with 5, contradicting the stated intent.
            Dataset<Row> endsWithFive = df.where("iphone like '%5'");
            endsWithFive.show();
//        +---+----+---+------+
//        | id|name|age|iphone|
//        +---+----+---+------+
//        |  5|  张思| 24|   555|
//        |  6|  张思| 24|   555|
//        +---+----+---+------+

            // De-duplicate: keep each distinct name once.
            Dataset<Row> name = df.select(new Column("name"));
            Dataset<Row> distinct = name.distinct();
            distinct.show();
// Result after de-duplication
//        +----+
//        |name|
//        +----+
//        |  小明|
//        |  小花|
//        |  李思|
//        |  张三|
//        |  张思|
//        +----+

            // Write the distinct names to the tb_write table; the table is created
            // automatically when it does not exist.
            // No save mode is set, so per Spark's default (ErrorIfExists) the write
            // fails if tb_write already exists, e.g. on a second run.
            Properties prop = new Properties();
            prop.put("user", JDBC_USER);
            prop.put("password", JDBC_PASSWORD);
            prop.put("driver", JDBC_DRIVER);
            distinct.write().jdbc(JDBC_URL, "tb_write", prop);
            distinct.show();
// Result after de-duplication
//        +----+
//        |name|
//        +----+
//        |  小明|
//        |  小花|
//        |  李思|
//        |  张三|
//        |  张思|
//        +----+

            // Count the rows per age.
            Dataset<Row> countsByAge = df.groupBy("age").count();
            countsByAge.show();
//        +---+-----+
//        |age|count|
//        +---+-----+
//        | 16|    1|
//        | 23|    1|
//        | 24|    3|
//        | 21|    1|
//        +---+-----+

            // Save countsByAge as JSON files under a local directory (a file:// path, not S3).
            // As with the JDBC write, the default save mode fails if the target already exists.
            countsByAge.write().format("json").save("file:///E:/Code/LearningBigData/spark-03-mysql/src/File/tojson");
        } finally {
            // Always release the Spark context, even if a step above fails.
            session.stop();
        }
    }
}

3.运行结果

三、JSON充当数据源

1.JSON数据

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

2.导包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Child module of the com.lzhpo.bigdata:root parent POM (the parent itself is not shown here). -->
    <parent>
        <artifactId>root</artifactId>
        <groupId>com.lzhpo.bigdata</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Module holding the "JSON as a data source" example. -->
    <artifactId>spark-02-json</artifactId>
    <build>
        <plugins>
            <!-- Compile with Java 8 source and target levels. -->
            <!-- NOTE(review): no plugin version is declared here; presumably it is managed by the parent POM - confirm. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <!-- Provides SparkSession, Dataset, Encoders and the JSON reader used by the example. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
    </dependencies>

</project>

3.java代码

package com.lzhpo.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

/**
 * Spark SQL example that uses JSON as a data source: reads a JSON file into a
 * DataFrame, queries it through a temporary view, and builds a second DataFrame
 * from in-memory JSON strings.
 *
 * <p>Create By IntelliJ IDEA</p>
 * <p>Author:lzhpo</p>
 */
public class ReadJson {
    public static void main(String[] args){

        SparkSession spark = SparkSession
                .builder()
                .appName("ReadJson")
                .master("local")
                .getOrCreate();

        try {
            // Read the JSON input. The path may point to a single text file or to a
            // directory of text files.
            Dataset<Row> people = spark.read().json("file:///E:/Code/LearningBigData/spark-02-json/src/File/people.json");

            // Print the inferred schema.
            people.printSchema();
            // root
            //  |-- age: long (nullable = true)
            //  |-- name: string (nullable = true)

            // Register the DataFrame as a temporary view so it can be queried with SQL.
            people.createOrReplaceTempView("people");

            // Names of the people aged 13 to 19 (inclusive).
            Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
            namesDF.show();
            // +------+
            // |  name|
            // +------+
            // |Justin|
            // +------+

            // A DataFrame can also be created from a Dataset<String> in which every
            // string holds one JSON object.
            List<String> jsonData = Arrays.asList(
                    "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
            Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
            Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
            anotherPeople.show();
            // +---------------+----+
            // |        address|name|
            // +---------------+----+
            // |[Columbus,Ohio]| Yin|
            // +---------------+----+
        } finally {
            // Always release the Spark context, even if a step above fails.
            spark.stop();
        }
    }
}

4.运行结果

看java代码!!!

本文目录