原创
Spark连接MySQL、Hive以及使用JSON充当数据源
温馨提示:
本文最后更新于 2018年04月04日,已超过 2,424 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。
一、Spark连接Hive
1.首先
噢对了,还有导入的数据:
1,小明,23,15082754112
2,李四,46,15182754132
3,老五,19,12382754166
4,小花,56,12082752166
2.导包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>root</artifactId>
<groupId>com.lzhpo.bigdata</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-04-hive</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
</project>
3.java代码
package com.lzhpo.spark;
import org.apache.spark.sql.SparkSession;
/**
* <p>Create By IntelliJ IDEA</p>
* <p>Author:lzhpo</p>
*/
public class Hive {
public static void main(String[] args) {
SparkSession session = SparkSession
.builder()
.appName("Hive")
.master("local")
.enableHiveSupport()//开启hive支持
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")//"/user/hive/warehouse"的路径
.getOrCreate();
/**
* 创建表
*/
session.sql("" +
"create table db_spark.tb_user" +
"(" +
"id int," +
"name string," +
"age int," +
"iphone string" +
")" +
"row format delimited fields terminated by ','" +
"lines terminated by '\\n'"
);
//查询是否创建成功
session.sql("select * from db_spark.tb_user").show();
/**
* 加载数据到创建的表中
* -load
*/
//load加载数据到数据表中
session.sql("load data inpath '/file/tb_user.txt' into table db_spark.tb_user");
session.sql("select * from db_spark.tb_user");
//再次查询数据
session.sql("select * from db_spark.tb_user").show();
/**
* 统计
*
*/
session.sql("select count(*) count_num from db_spark.tb_user").show();
/**
* 模糊查询
*/
session.sql("select * from db_spark.tb_user where name like '%小%'").show();
}
}
4.结果
看我SQL语句,猜也猜得出来吧?
二、Spark连接MySQL
1.导包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>root</artifactId>
<groupId>com.lzhpo.bigdata</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-03-mysql</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.3.2</version>
<scope>runtime</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
2.java代码
package com.lzhpo.spark;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
/**
* <p>Create By IntelliJ IDEA</p>
* <p>Author:lzhpo</p>
*/
public class MySQL {
public static void main(String[] args) {
SparkSession session = SparkSession
.builder()
.master("local")
.appName("MySQL")
.getOrCreate();
/**
* 数据库连接信息
*/
Dataset<Row> df = session
.read()
.format("jdbc")
.option("url", "jdbc:mysql://192.168.200.111:3306/bigdata?useUnicode=true&characterEncoding=utf8&useSSL=false")//url连接信息
.option("dbtable", "user")//使用的数据表
.option("user", "root")//数据库用户名
.option("password", "123456")//数据库密码
.option("driver", "com.mysql.jdbc.Driver")//自己加上的
.load();
/**
* 展示全部数据
*/
df.show();//默认仅仅展示前20条数据:only showing top 20 rows
// +---+----+---+------+
// | id|name|age|iphone|
// +---+----+---+------+
// | 1| 小明| 16| 111|
// | 2| 张三| 23| 222|
// | 3| 小花| 21| 333|
// | 4| 李思| 24| 444|
// | 5| 张思| 24| 555|
// | 6| 张思| 24| 555|
// +---+----+---+------+
/**
* 查看表的DataFrame
* 注意:它会自动打印表结构图
*/
df.printSchema();
// root
// |-- id: integer (nullable = true)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = true)
// |-- iphone: string (nullable = true)
/**
* 查询全部数据
*/
Dataset<Row> AllMessage = df.select(new Column("*"));
AllMessage.show();
// +---+----+---+------+
// | id|name|age|iphone|
// +---+----+---+------+
// | 1| 小明| 16| 111|
// | 2| 张三| 23| 222|
// | 3| 小花| 21| 333|
// | 4| 李思| 24| 444|
// | 5| 张思| 24| 555|
// | 6| 张思| 24| 555|
// +---+----+---+------+
/**
* 模糊查询:手机号最后面一个数字是5的
*/
Dataset<Row> like = df.where("iphone like '5%'");
like.show();
// +---+----+---+------+
// | id|name|age|iphone|
// +---+----+---+------+
// | 5| 张思| 24| 555|
// | 6| 张思| 24| 555|
// +---+----+---+------+
/**
* 去重
* 注意,这里是将name重复的字段去掉,然后再放进tb_write表中
*/
Dataset<Row> name = df.select(new Column("name"));
Dataset<Row> distinct = name.distinct();
distinct.show();
//去重之后的结果
// +----+
// |name|
// +----+
// | 小明|
// | 小花|
// | 李思|
// | 张三|
// | 张思|
// +----+
/**
* 将去重之后的结果写入其它表
*
* 注意:这里是将重复name的字段写入tb_write表中,如果没有tb_write表,会自动创建
*/
Properties prop = new Properties();
prop.put("user","root");
prop.put("password","123456");
prop.put("driver","com.mysql.jdbc.Driver");
distinct.write().jdbc("jdbc:mysql://192.168.200.111:3306/bigdata?useUnicode=true&characterEncoding=utf8&useSSL=false","tb_write",prop);
distinct.show();
//去重之后的结果
// +----+
// |name|
// +----+
// | 小明|
// | 小花|
// | 李思|
// | 张三|
// | 张思|
// +----+
/**
* 统计年龄
*/
Dataset<Row> countsByAge = df.groupBy("age").count();
countsByAge.show();
// +---+-----+
// |age|count|
// +---+-----+
// | 16| 1|
// | 23| 1|
// | 24| 3|
// | 21| 1|
// +---+-----+
/**
* 将countsByAge保存为JSON格式的S3
*/
countsByAge.write().format("json").save("file:///E:/Code/LearningBigData/spark-03-mysql/src/File/tojson");
}
}
3.运行结果
三、JSON充当数据源
1.JSON数据
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
2.导包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>root</artifactId>
<groupId>com.lzhpo.bigdata</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-02-json</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
</project>
3.java代码
package com.lzhpo.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
/**
* <p>Create By IntelliJ IDEA</p>
* <p>Author:lzhpo</p>
*/
public class ReadJson {
public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("ReadJson")
.master("local")
.getOrCreate();
/**
* 读取JSON文件目录
*/
//路径可以是单个文本文件,也可以是存储文本文件的目录。
Dataset<Row> people = spark.read().json("file:///E:/Code/LearningBigData/spark-02-json/src/File/people.json");
/**
* 可以使用printSchema()方法可视化推断的模式
*/
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
/**
* 创建视图
*/
people.createOrReplaceTempView("people");
/**
* 查询年龄在13~19的人并打印名字
*/
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
/**
* 可以为由每个字符串存储一个JSON对象的Dataset<String>表示的JSON数据集创建DataFrame。
*/
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
}
}
4.运行结果
看java代码!!!
- 本文标签: Spark
- 本文链接: http://www.lzhpo.com/article/50
- 版权声明: 本文由lzhpo原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权