MySQL Spark SQL 和 MySQL- SaveMode.Overwrite 不会插入修改后的数据
在本文中,我们将介绍使用MySQL Spark SQL时,遇到的一个问题,即使用SaveMode.Overwrite模式时,MySQL不会插入修改后的数据。
阅读更多:MySQL 教程
问题描述
在使用Spark SQL的过程中,我们可能会遇到需要将数据存储到MySQL数据库中的情况。在使用SaveMode.Overwrite模式时,我们期望原有的数据会被修改后的数据覆盖。然而,有时我们会发现MySQL没有将修改后的数据插入到数据库中,而是保留了原有的数据。
问题复现
为了复现这个问题,我们可以使用以下代码:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MySQL SaveMode.Overwrite") \
.config("spark.jars", "/path/to/mysql-connector-java.jar") \
.getOrCreate()
data = [("John", 25), ("Alice", 30), ("Bob", 35)]
df = spark.createDataFrame(data, ["name", "age"])
df.write \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/mydatabase") \
.option("driver", "com.mysql.jdbc.Driver") \
.option("dbtable", "mytable") \
.option("user", "myuser") \
.option("password", "mypassword") \
.mode("overwrite") \
.save()
假设我们已经创建了一个名为mydatabase的数据库,并在其中创建了一个名为mytable的表格。我们执行上述代码后,期望的结果是mytable中只包含数据[(“John”, 25), (“Alice”, 30), (“Bob”, 35)],但实际上会发现mytable中依然包含之前的数据。
问题分析
这个问题的根本原因在于MySQL的机制。对于SaveMode.Overwrite模式,Spark SQL会将数据以临时表的形式存储在MySQL中,然后使用DELETE和INSERT语句分别删除原有的数据和插入新的数据。然而,由于MySQL在执行DELETE语句时的默认隔离级别(REPEATABLE READ)下,对应的INSERT语句看到的是一个已经被删除的数据集。
举个例子来说明这个问题。假设我们的表格包含如下数据:
name | age |
---|---|
John | 20 |
Alice | 30 |
Bob | 35 |
当Spark SQL将数据以临时表的形式存储在MySQL中时,表格的状态如下:
name | age |
---|---|
John | 25 |
Alice | 30 |
Bob | 35 |
接着,Spark SQL执行DELETE语句删除原有的数据:
name | age |
---|---|
Alice | 30 |
Bob | 35 |
随后,INSERT语句执行插入新的数据:
name | age |
---|---|
Alice | 30 |
Bob | 35 |
John | 25 |
由于INSERT语句看到的是一个已经被删除的数据集,所以最终的结果并不是我们期望的。
解决方案
要解决这个问题,我们可以通过设置MySQL的隔离级别来避免。使用如下代码,我们可以将隔离级别设置为READ COMMITTED:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
data = [("John", 25), ("Alice", 30), ("Bob", 35)]
df = spark.createDataFrame(data, ["name", "age"])
df.write \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/mydatabase") \
.option("driver", "com.mysql.jdbc.Driver") \
.option("dbtable", "mytable") \
.option("user", "myuser") \
.option("password", "mypassword") \
.option("rewriteBatchedStatements", "true") \
.option("isolationLevel", "READ_COMMITTED") \
.mode("overwrite") \
.save()
通过设置isolationLevel
为READ_COMMITTED
,我们告诉MySQL在DELETE和INSERT语句的执行过程中使用读取提交的隔离级别。
总结
在本文中,我们介绍了在使用MySQL Spark SQL时,使用SaveMode.Overwrite模式时不会插入修改后的数据的问题。我们解释了这个问题的原因,即由于MySQL的隔离级别,默认的删除和插入操作会导致插入的数据与预期不符。为了解决这个问题,我们需要显式地设置MySQL的隔离级别为READ COMMITTED。通过这种方式,我们可以确保DELETE和INSERT操作的正确执行,从而将修改后的数据按预期插入到MySQL数据库中。
希望本文对于使用MySQL Spark SQL的读者们有所帮助,能够更好地理解和解决类似的问题。如果在实际应用中遇到类似的难题,记得参考本文所提供的解决方案。祝您编程愉快!