Как работает функция Distinct () в Spark?

19

Я новичок в Apache Spark и изучал основные функции. Было небольшое сомнение. Предположим, у меня есть RDD кортежей (ключ, значение) и вы хотите получить из них некоторые уникальные. Я использую функцию distinct (). Мне интересно, на каком основании функция считает, что кортежи как разрозненные.? Он основан на ключах или значениях или обоих?

    
задан preetham madeti 21.06.2015 в 01:47
источник

5 ответов

22

.distinct (), безусловно, делает перетасовку между разделами. Чтобы узнать больше о том, что происходит, запустите .toDebugString на вашем RDD.

val hashPart = new HashPartitioner(<number of partitions>)

val myRDDPreStep = <load some RDD>

val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)

, который для примера RDD у меня (myRDDPreStep уже hash-partitioned ключом, сохраняемый StorageLevel.MEMORY_AND_DISK_SER и контрольным), возвращает:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    |    ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
        |    myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
        |        CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
        |    myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]

Обратите внимание, что могут быть более эффективные способы получения отдельных элементов, которые включают в себя меньшее количество тасований, ОСОБЕННО, если ваш RDD уже разбит на разделы интеллектуальным способом, а разделы не слишком перекошены.

См. Есть ли способ переписать Spark RDD, отличный от использования mapPartitions вместо отдельных? а также Apache Spark: какова эквивалентная реализация RDD.groupByKey () с использованием RDD.aggregateByKey ()?

    
ответ дан Glenn Strycker 30.06.2015 в 01:23
источник
8

Документы API для RDD.distinct () предоставляют только одно предложение:

  

"Возвращает новый RDD, содержащий отдельные элементы в этом RDD.

Из недавнего опыта я могу сказать вам, что в кортеже-RDD рассматривается кортеж в целом.

Если вам нужны разные ключи или разные значения, то в зависимости от того, что вы хотите выполнить, вы можете:

а. вызовите groupByKey() , чтобы преобразовать {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} в {(k1,[v11,v12]), (k2,[v21,v22])} ; или

В. вычеркните либо ключи, либо значения, вызвав keys() или values() , а затем distinct()

На момент написания этой статьи (июнь 2015 г.) UC Berkeley + EdX запускает бесплатный онлайн-курс Введение в Big Data и Apache Spark , которые обеспечивали бы практическую реализацию этих функций.

    
ответ дан Paul 21.06.2015 в 02:19
источник
  • Привет, Пол! Позвольте мне предположить, что мы имеем кортеж RDD следующим образом: (1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3 , 21), (3,22) и т. Д. Здесь вы могли заметить, что и ключи, и значения повторяются в разных кортежах. поэтому, если я применил различные () на вышеописанном RDD, каков был бы результат ..? Пожалуйста, уделите немного времени. Спасибо! И, да, я принимал этот курс онлайн! :) –  preetham madeti 21.06.2015 в 21:47
  • У меня нет времени прямо сейчас, но вы можете настроить свой собственный RDD с помощью myRDD = sc.parallelize ([(1,20), (1,21), (1,20), (2,20) , (2,22), (2,20), (3,21), (3,22)]); Это может даже работать в одном из предыдущих лабораторных ноутбуков из курса Spark. Затем запустите myRDD.distinct (). Collect (), чтобы проверить результат –  Paul 22.06.2015 в 02:28
6

Джастин Пихони прав. Для этого определения используется метод hashCode и equals. Его возвращают отдельные элементы (объект)

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))

Distinct

rdd.distinct.collect().foreach(println)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)

Если вы хотите применить отдельный ключ. В этом случае лучше уменьшить вариант

ReduceBy

 val reduceRDD= rdd.map(tup =>
    (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2)

reduceRDD.collect().foreach(println)

Вывод: -

(2,20)
(1,20)
(3,21)
    
ответ дан Amit Dubey 09.09.2016 в 11:19
источник
4

distinct использует методы hashCode и equals объектов для этого определения. Наборы построены с механизмами равенства, делегирующими вниз в равенство и положение каждого объекта. Таким образом, distinct будет работать против всего Tuple2 объекта. Как указал Павел, вы можете назвать keys или values , а затем distinct . Или вы можете написать свои собственные значения через aggregateByKey , что будет поддерживать спаривание ключей. Или, если вам нужны разные клавиши, вы можете использовать обычный aggregate

    
ответ дан Justin Pihony 21.06.2015 в 07:50
источник
  • Спасибо! Имеет смысл. –  preetham madeti 21.06.2015 в 21:44
2

Похоже, что distinct избавится от дубликатов (ключ, значение).

В приведенном ниже примере (1,20) и (2,20) повторяются дважды в myRDD , но после distinct() дубликаты удаляются.

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22

scala> myRDD.collect().foreach(println _)
(1,20)
(1,21)
(1,20)
(2,20)
(2,22)
(2,20)
(3,21)
(3,22)

scala> myRDD.distinct.collect().foreach(println _)
(2,22)
(1,20)
(3,22)
(2,20)
(1,21)
(3,21)
    
ответ дан user3654449 25.10.2015 в 01:28
источник