当要将dataframe进行序列化(df.show()或者df.collect())时,报这个错误。
原因是:maven的pom.xml中含有spark 和 kafka。
1.spark2.3用到了lz4-1.3.0.jar,kafka0.9.0.1用到了lz4-1.2.0.jar,而程序运行时使用的是lz4-1.3.0.jar。
2.lz4-1.3.0.jar包中net.jpountz.util.Utils 类中没有checkRange,该方法位于net.jpountz.util.SafeUtils和net.jpountz.util.UnsafeUtils
原文:https://blog.csdn.net/m0_37914799/article/details/84992275
可以通过:mvn dependency:tree|less查看冲突的包。
解决办法:
通过修改pom.xml
中·kafka-client·依赖,exclude掉lz4的依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> </exclusions> </dependency>
原因:这个错误是Hadoop版本的fasterxml版本较低,spark的较高,
解决方法:在maven的pom.xml文件中,记住一定要在</dependencyManagement> </dependencyManagement>中加入以下库。
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.17.Final</version> </dependency>
IDEA中JDK版本和Scala不匹配,我的JDK默认为IDEA自带的JDK12,所以我换成了JDK1.8,就可以了
还是版本问题:原来我用的是2.2.1,然后写Scalatest时会报这个错误
import com.google.common.io.Files import java.io.File import org.apache.commons.io.FileUtils import org.apache.spark.sql.Row import org.apache.spark.sql.test.SharedSparkSession import org.scalatest.FunSuite class UrlDmpDataProcessJobTest extends FunSuite with SharedSparkSession { val testDir = new File(".").getCanonicalPath + "/test/url" var output: File = _ override def beforeAll() { super.beforeAll() output = Files.createTempDir() } override def afterAll() { super.afterAll() FileUtils.deleteDirectory(output) } test("json parse") { import testImplicits._ val jsonFile = testDir + "/dmp-json-data.txt" val jsonData = spark.read.json(jsonFile).toDF() val jsonDf = jsonData.as[UrlLoadData].filter(_ != null) jsonDf.show() val outputDf = UrlDmpDataParse.getJsonParseData(spark, jsonDf) outputDf.show() val urls = outputDf.select("urls").collect() val pid = outputDf.select("pid").collect() val uid = outputDf.select("uid").collect() assert(pid === Array(Row("631"),Row("631"))) assert(uid === Array(Row("1bd55eb51c80733a994d853b98f46ce1"),Row("f34c9dc9f0b53cdd0972d50445759d45"))) } }
解决办法:
<dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> <version>3.0.8</version> <scope>test</scope> </dependency>
原文:https://www.cnblogs.com/Lee-yl/p/11191378.html