Запрос на удаление Scala с помощью Akka и Postgres

Я пытаюсь написать метод, который просто удаляет строку из базы данных на основе идентификатора.

class PolicyHolderDAO(database: DatabaseDef) extends CRUDActor[PolicyHolder] {

  private val policyHolders: TableQuery[PolicyHolderTable] = TableQuery[PolicyHolderTable]
  implicit val system: ActorSystem = ActorSystem("Bitcoin-Insurance")
  import system.dispatcher
  implicit val timeout: Timeout = Timeout(5.seconds)
  private implicit var session = database.createSession

  override def receive = {
    case PolicyHolderDAO.Read(id) => sender ! read(id)
    case PolicyHolderDAO.Create(policyHolder) => sender ! create(policyHolder)
    case PolicyHolderDAO.Delete(policyHolder) => sender ! delete(policyHolder)
  }

  /**
   * @param policyHolder the policy holder to inserted into the database
   * @return id
   */
  override def create(policyHolder: PolicyHolder): Future[PolicyHolder] = {
    future {
      (policyHolders returning policyHolders.map(_.id) into
        ((policyHolder, id) => policyHolder.copy(id = Some(id)))) += policyHolder

    }
  }
  /**
   * @param id the id that corresponds to a policy holder
   * @return a future value of a policy holder if the policy holder exists in the database, else it returns none
   */
  override def read(id: Future[Long]): Future[Option[PolicyHolder]] = {
    id.map(i => policyHolders.filter(p => p.id === i).firstOption)
  }

  /**
   * @param policyHolder the policyHolder to be updated
   * @return policyHolder the policyHolders information now saved in the database
   */
  override def update(policyHolder: Future[PolicyHolder]): Future[Option[PolicyHolder]] = {
    /*  //val policyHolderFromDb = policyHolder.map(p => policyHolders.filter(_.id === p.id.getOrElse(-1)))
    val policyHolderFromDb = for (p <- policyHolder;  q = policyHolders.filter(_.id === p.id) ) yield q.update

    val updatedPolicyHolder: Future[Option[PolicyHolder]] = for (p <- policyHolderFromDb; result = create(p)) yield result
    updatedPolicyHolder*/
    Future(None)
  }
  /**
   * @param policyHolder the policy holder to be deleted from our database
   * @return affectedRows the number of rows effected by this query
   */
  override def delete(policyHolder: Future[PolicyHolder]): Future[Int] = {
    val policyHolderId: Future[Long] = policyHolder.map(p => p.id.getOrElse(-1))
    val affectedRows = for (id <- policyHolderId; q = policyHolders.filter(_.id === id)) yield q
    affectedRows.map(q => q.delete)

  }
}

Я пытаюсь проверить это с помощью этого тестового примера, написанного для теста Scala.

  "A PolicyHolderDAO Actor" must {
    "be able to delete an existing policy holder from our database" in {
      val policyHolder = PolicyHolder(None, "Chris", "Stewart")
      val createdPolicyHolderAny: Future[Any] = policyHolderDAOActor ? PolicyHolderDAO.Create(policyHolder)
      val createdPolicyHolder: Future[PolicyHolder] = createdPolicyHolderAny.mapTo[Future[PolicyHolder]].flatMap(p => p)

      policyHolderDAOActor ! PolicyHolderDAO.Delete(createdPolicyHolder)

      val deletedPolicyHolderAny: Future[Any] = policyHolderDAOActor ? PolicyHolderDAO.Get(createdPolicyHolder.map(_.id))
      val deletedPolicyHolder: Future[Option[PolicyHolder]] = deletedPolicyHolderAny.mapTo[Future[Option[PolicyHolder]]].flatMap(p => p)
      whenReady(deletedPolicyHolder, timeout(10 seconds), interval(5 millis)) { p =>
        val policyHolderExists = p match {
          case Some(a) =>
            println(a)
            true
          case None => false
        }
        policyHolderExists must be(false)
      }
    }
  }

Однако я проваливаю этот тест. Причина в том, что строка НЕ ​​удаляется из нашей базы данных. Я не уверен, почему этот тест не проходит. У меня есть соответствующий модульный тест для этого метода, который возвращает 1 затронутую строку в результате удаления, что имеет смысл. Может ли быть что-то, чего я не понимаю в Akka/Futures?

Спасибо!


person Chris Stewart    schedule 22.12.2014    source источник


Ответы (3)


Скорее всего, в вашем тесте есть состояние гонки:

...
policyHolderDAOActor ! PolicyHolderDAO.Delete(createdPolicyHolder)

val deletedPolicyHolderAny: Future[Any] = policyHolderDAOActor ? PolicyHolderDAO.Get(createdPolicyHolder.map(_.id))
...

Очень сложно сказать, так как вы не раскрыли Actor, который это делает, но на основании того факта, что ваш метод delete выше возвращает Future[Int], похоже, что происходит:

  • policyHolderDAOActor передается сообщение об удалении в сообщении.
  • Затем policyHolderDAOActor немедленно запрашивается получение той же записи по идентификатору.
  • policyHolderDAOActor вызывает метод delete, который возвращает Future[Int], поэтому результат, скорее всего, отбрасывается, а receive возвращается немедленно.
  • Затем policyHolderDAOActor может обработать следующее сообщение, чтобы получить запись по идентификатору, но удаление еще не завершило обработку, поэтому запись все еще существует.

Введение ожидания между двумя вышеприведенными строками вероятно сделает тест пройденным, но если это не так, то, возможно, обмен частью кода Actor будет более красноречивым.

Если другие сообщения работают аналогично, возможно, запись еще даже не была создана, когда вы пытаетесь ее удалить. Вышеупомянутые пули все равно будут правильными, просто с другими методами. Чтобы быть в безопасности, вам, вероятно, следует блокировать каждую асинхронную операцию в вашем тесте, чтобы убедиться, что все выполняется последовательно.

person Michael Zajac    schedule 24.12.2014
comment
Примечание: вы также можете сократить mapTo[Future[Option[PolicyHolder]]].flatMap(p => p) до mapTo[Option[PolicyHolder]]. - person Michael Zajac; 26.12.2014
comment
Я добавил всего актера в ОП. - person Chris Stewart; 26.12.2014
comment
Хм, да. Это то, что я ожидал, поэтому мой ответ остается в силе. - person Michael Zajac; 26.12.2014
comment
Можно ли это проверить с помощью простого Thread.sleep между этими двумя строками? - person Chris Stewart; 26.12.2014
comment
Стоит попробовать. Я бы сделал это между каждым будущим, чтобы быть уверенным. - person Michael Zajac; 26.12.2014
comment
Похоже, ваше решение сработало. Спасибо. Также ваше предложение сократить с помощью mapTo не работает, я получаю исключение приведения класса. Тип возвращаемого значения метода — Future[PolicyHolder], а не просто PolicyHolder. - person Chris Stewart; 26.12.2014
comment
Странно насчет mapTo, но рад, что вы нашли решение. - person Michael Zajac; 26.12.2014

Я думаю, что ваша проблема заключается в передаче сеанса Slick в нужное место:

  1. delete не имеет второго списка параметров, содержащего (implicit val session: Session)
  2. когда актер готов, сессия, вероятно, закрыта. Следовательно, вам нужно явно получить новый.

Я не уверен насчет 1), потому что я привык использовать Slick с Play, у которого может быть немного другой API, но у меня есть некоторый опыт работы со 2):

https://github.com/payola/payola-viz/blob/b5668bcbdefc3bce7e6095d211fa04c2aa697685/src/app/model/services/LDVMServiceImpl.scala#L44

person Jirka Helmich    schedule 22.12.2014
comment
Это не сработало, я переместил создание сеанса внутрь тела метода безрезультатно. - person Chris Stewart; 23.12.2014

Просто замечание, почему бы не сделать delete проще?

def delete(policyHolder: Future[PolicyHolder]): Future[Int] = {
   for (p <- policyHolder) yield policyHolders.delete(_.id === p.id)
}
person Ashalynd    schedule 25.12.2014