Группировка с условием when в Pyspark

Мой фрейм данных выглядит так

    id      |reg_date  |      txn_date|
+----------+----------+--------------------+
|1          |2019-01-06| 2019-02-15 12:51:15|
|1          |2019-01-06| 2019-03-29 13:15:27|
|1          |2019-01-06| 2019-06-01 01:42:57|
|1          |2019-01-06| 2019-01-06 17:01:...|
|5          |2019-06-16| 2019-07-19 11:50:34|
|5          |2019-06-16| 2019-07-13 19:49:39|
|5          |2019-06-16| 2019-08-27 17:37:22|
|2          |2018-07-30| 2019-01-01 07:03:...|
|2          |2018-07-30| 2019-07-30 01:27:57|
|2          |2018-07-30| 2019-02-01 00:08:35

Я хочу забрать первый txn_date после reg_date, то есть первый txn_date из reg_date >= txn_date.

Ожидаемый результат

    id      |reg_date  |      txn_date|
+----------+----------+--------------------+
|1          |2019-01-06| 2019-01-06 17:01:...|
|5          |2019-06-16| 2019-07-13 19:49:39|
|2          |2018-07-30| 2019-07-30 01:27:57|

Я сделал до сих пор,

df = df.withColumn('txn_date',to_date(unix_timestamp(F.col('txn_date'),'yyyy-MM-dd HH:mm:ss').cast("timestamp")))

df = df.withColumn('reg_date',to_date(unix_timestamp(F.col('reg_date'),'yyyy-MM-dd').cast("timestamp")))

gg = df.groupBy('id','reg_date').agg(min(F.col('txn_date')))

Но получаю неверные результаты.


person John Davis    schedule 16.10.2019    source источник
comment
Было бы неплохо, если бы вы подготовили данные для создания фрейма данных. Какой результат вы получаете. ?   -  person PIG    schedule 16.10.2019
comment
@PIG - есть 2 условия ... одно - reg_date ›= txn_date, другое основано на этом фильтре с использованием операции groupby find min.txn_date после reg_date. В моем случае я получил некоторые результаты, которые удовлетворяют reg_date ‹txn_date   -  person John Davis    schedule 16.10.2019
comment
@John Davis Где столбец "мобильный" в вашем df?   -  person QuantStats    schedule 16.10.2019
comment
@QuantStats - это не мобильный, это id   -  person John Davis    schedule 16.10.2019


Ответы (2)


Условие reg_date >= txn_date может быть неоднозначным.

2019-01-06>=2019-01-06 17:01:30 означает 2019-01-06 00:00:00>=2019-01-06 17:01:30 или 2019-01-06 23:59:59>=2019-01-06 17:01:30?

В вашем примере 2019-01-06>=2019-01-06 17:01:30 оценивается как истинное, поэтому я предполагаю, что это последний случай, то есть случай с 23:59:59.

Исходя из предположения выше, вот как я это закодировал.

import pyspark.sql.functions as F

#create a sample data frame
data = [('2019-01-06','2019-02-15 12:51:15'),('2019-01-06','2019-03-29 13:15:27'),('2019-01-06','2019-01-06 17:01:30'),\
('2019-07-30','2019-07-30 07:03:01'),('2019-07-30','2019-07-30 01:27:57'),('2019-07-30','2019-07-30 00:08:35')]

cols = ('reg_date', 'txn_date')

df = spark.DataFrame(data,cols)

#add 23:59:59 to reg_date as a dummy_date for a timestamp comparison later
df = df.withColumn('dummy_date', F.concat(F.col('reg_date'), F.lit(' 23:59:59')))

#convert columns to the appropriate time data types
df = df.select([F.to_date(F.col('reg_date'),'yyyy-MM-dd').alias('reg_date'),\
F.to_timestamp(F.col('txn_date'),'yyyy-MM-dd HH:mm:ss').alias('txn_date'),\
F.to_timestamp(F.col('dummy_date'),'yyyy-MM-dd HH:mm:ss').alias('dummy_date')])

#implementation part
(df.orderBy('reg_date')
   .filter(F.col('dummy_date')>=F.col('txn_date'))
   .groupBy('reg_date')
   .agg(F.first('txn_date').alias('txn_date'))
   .show()) 

#+----------+----------------------+
#|  reg_date|              txn_date|
#+----------+----------------------+
#|2019-01-06|   2019-01-06 17:01:30|
#|2019-07-30|   2019-07-30 07:03:01|
#+----------+----------------------+

person QuantStats    schedule 16.10.2019

Заказывать не нужно. Вы можете отбросить все меньшие значения с помощью фильтра, затем агрегировать по идентификатору и получить меньшую временную метку, потому что первая временная метка будет минимальной. Что-то вроде:

df.filter(df.reg_date >= df.txn_date) \
  .groupBy(df.reg_date) \
  .agg(F.min(df.txn_date)) \
  .show()
person Daniel Argüelles    schedule 16.10.2019
comment
Из OP: я хочу забрать 1-й txn_date после reg_date. Порядок имеет значение, если я правильно его понимаю, потому что он хочет первое свидание, которое проходит фильтр, даже если оно не может быть минимальным. - person QuantStats; 16.10.2019
comment
Если вы заказываете что-то, вы теряете исходный порядок, поэтому нет смысла брать первое после чего-то ... Более того, если вы думаете о больших данных или искре, вы не можете предполагать порядок в данных. С другой стороны, первая дата, прошедшая фильтр, является самой ранней датой после этого фильтра, поэтому это минимальная дата. - person Daniel Argüelles; 16.10.2019