
简单来说:JDBC是一种开放标准的、跨编程语言、跨数据库类型的编程API。各数据库产品厂商都会按这个标准提供针对自身产品的JDBC驱动程序。更重要的是,这是一套成熟的工具,在编程人员中使用很普及。既然我们的目标是开发一套标准的大数据系统集成API,那么采用JDBC这种数据接口就可以沿用业内丰富的技术资源、覆盖更多类型用户的编程需要,并降低使用门槛。对scala编程语言来讲,ScalikeJDBC是最适合满足我们下面开发要求的工具库,因为它以最简单易用的方式实现了JDBC的基本功能。

JDBC的基本组件和工作原理比较简单:核心组件就是connection、statement(preparedstatement)两个对象。connection提供与数据库的连接以及数据处理的运行环境;statement是connection的一个属性,包含了可运行的SQL语句及对它们的各种运算方法。下面我们就用ScalikeJDBC官网上的例子来示范说明ScalikeJDBC的应用。以下是本次示范项目的build.sbt:

name := "learn-scalikeJDBC"

version := "0.1"

scalaVersion := "2.12.4"

// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"        % "3.1.0",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config" % "3.1.0",
  "com.h2database"  %  "h2"                 % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"    % "1.2.3"
)


首先,登记JDBC驱动程序及准备连接池connection-pool。注意,ScalikeJDBC提供了固定名为'default的默认连接池,所以ConnectionPool.singleton(...)等同于ConnectionPool.add('default, ...);在'default连接池上的操作都无需提供名称,如:DB == NamedDB('default)、AutoSession == NamedAutoSession('default)等。下面的代码示范了手工方式的连接池设置。这种方式下,如果应用需要增加、替换数据库或调整数据库参数,必须通过修改代码来实现;实用的程序应该使用配置文件的方式进行数据库设置,以便轻松调整数据库参数,我们会在下一章介绍配置文件的使用方法。
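示范片段如下(摘自文末的完整示范源代码):

  //standard java procedure to register driver
  Class.forName("org.h2.Driver")
  //固定名称的默认连接池:ConnectionPool.singleton("jdbc:h2:mem:hello", "user", "pass")
  //== ConnectionPool.add('default, "jdbc:h2:mem:hello", "user", "pass")
  //这里登记一个名为'h2mem的连接池
  ConnectionPool.add('h2mem, "jdbc:h2:mem:hello", "user", "pass")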

跟着是一条SQL语句的构建和运算:

  //construct SQL object
  val createSQL: SQL[Nothing,NoExtractor] =SQL("""
    create table members (
      id bigint primary key auto_increment,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null
    )""")

  //implicit val session = AutoSession   //在'default连接池上运算时适用
  //run this SQL
  createSQL.execute().apply()(NamedAutoSession('h2mem))   //autoCommit

通过SQL("...")方法构建SQL[A,E]类型的实例,用execute()等方法设定不同的运算方式,然后用apply()(implicit s: DBSession)来实际运算SQL。DBSession实际上就是对connection的封装。相关SQL[A,E]的类型定义如下:

/**
 * SQL abstraction.
 *
 * @param statement SQL template
 * @param rawParameters parameters
 * @param f  extractor function
 * @tparam A return type
 */
abstract class SQL[A, E <: WithExtractor](
  val statement: String,
  private[scalikejdbc] val rawParameters: Seq[Any]
)(f: WrappedResultSet => A)
    extends Extractor[A] {...}

/**
 * Extractor
 */
private[scalikejdbc] trait Extractor[A] {
  def extractor: (WrappedResultSet) => A
}

SQL("...")调用了apply构建方法:

object SQL {
 ...
  def apply[A](sql: String): SQL[A, NoExtractor] = new SQLToTraversableImpl[A, NoExtractor](sql, Seq.empty)(noExtractor[A](
    ErrorMessage.THIS_IS_A_BUG
  ))
}

/**
 * SQL which execute java.sql.Statement#executeQuery() and returns the result as scala.collection.Traversable value.
 *
 * @param statement SQL template
 * @param rawParameters parameters
 * @param extractor  extractor function
 * @tparam A return type
 */
class SQLToTraversableImpl[A, E <: WithExtractor](
  override val statement: String, override val rawParameters: Seq[Any]
)(
  override val extractor: WrappedResultSet => A
)
    extends SQL[A, E](statement, rawParameters)(extractor)
    with SQLToTraversable[A, E] {

  override protected def withParameters(params: Seq[Any]): SQLToResult[A, E, Traversable] = {
    new SQLToTraversableImpl[A, E](statement, params)(extractor)
  }

  override protected def withStatementAndParameters(state: String, params: Seq[Any]): SQLToResult[A, E, Traversable] = {
    new SQLToTraversableImpl[A, E](state, params)(extractor)
  }

  override protected def withExtractor[B](f: WrappedResultSet => B): SQLToResult[B, HasExtractor, Traversable] = {
    new SQLToTraversableImpl[B, HasExtractor](statement, rawParameters)(f)
  }

}

SQLToTraversableImpl类型提供了SQL语句参数组合方法并构建SQLToResult类型:

trait SQLToResult[A, E <: WithExtractor, C[_]] extends SQL[A, E] with Extractor[A] {
  import GeneralizedTypeConstraintsForWithExtractor._

  def result[AA](f: WrappedResultSet => AA, session: DBSession): C[AA]
  val statement: String
  private[scalikejdbc] val rawParameters: Seq[Any]
  def apply()(
    implicit
    session: DBSession,
    context: ConnectionPoolContext = NoConnectionPoolContext,
    hasExtractor: ThisSQL =:= SQLWithExtractor
  ): C[A] = {
    val attributesSwitcher = createDBSessionAttributesSwitcher()
    val f: DBSession => C[A] = s => result[A](extractor, DBSessionWrapper(s, attributesSwitcher))
    // format: OFF
    session match {
      case AutoSession | ReadOnlyAutoSession => DB.readOnly(f)
      case NamedAutoSession(name, _)         => NamedDB(name, session.settings).readOnly(f)
      case ReadOnlyNamedAutoSession(name, _) => NamedDB(name, session.settings).readOnly(f)
      case _                                 => f(session)
    }
    // format: ON
  }
}

这个SQLToResult的apply方法就是SQL-readonly-query的运算方法。

SQL[A,E]中类型参数A是运算SQL的返回值类型;E <: WithExtractor则是个编译时的类型标记,用来区分是否已经提供了extractor,即把返回结果转换成A类型值的函数(WrappedResultSet => A)。SQL类型还提供了bind、bindByName方法来绑定SQL参数。SQL是个泛型类型,除默认转换成Query型的SQLToResult类型外,还可以转换成execute、update、batch等等其它SQL类型:
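下面是一个bind与bindByName用法的最小示例(假设members表已经建立,作用域内有implicit DBSession,并已import org.joda.time._):

  //bind:按位置绑定参数
  SQL("insert into members (name, created_at) values (?, ?)")
    .bind("Alice", DateTime.now).update.apply()
  //bindByName:按名称绑定参数,SQL模版中使用{name}形式的占位符
  SQL("insert into members (name, created_at) values ({name}, {createdAt})")
    .bindByName('name -> "Bob", 'createdAt -> DateTime.now).update.apply()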

/**
   * Binds parameters for batch
   *
   * @param parameters parameters
   * @return SQL for batch
   */
  def batch(parameters: Seq[Any]*): SQLBatch = {
    new SQLBatch(statement, parameters, tags)
  }
   /**
   * Binds parameters for largeBatch
   *
   * @param parameters parameters
   * @return SQL for batch
   */
  def largeBatch(parameters: Seq[Any]*): SQLLargeBatch =
    new SQLLargeBatch(statement, parameters, tags)
  /**
   * Binds parameters for batch
   *
   * @param parameters parameters
   * @return SQL for batch
   */
  def batchAndReturnGeneratedKey(parameters: Seq[Any]*): SQLBatchWithGeneratedKey = {
    new SQLBatchWithGeneratedKey(statement, parameters, tags)(None)
  }
  /**
   * Binds parameters for batch
   *
   * @param generatedKeyName generated key name
   * @param parameters parameters
   * @return SQL for batch
   */
  def batchAndReturnGeneratedKey(generatedKeyName: String, parameters: Seq[Any]*): SQLBatchWithGeneratedKey = {
    new SQLBatchWithGeneratedKey(statement, parameters, tags)(Some(generatedKeyName))
  }
  /**
   * Binds parameters for batch
   *
   * @param parameters parameters
   * @return SQL for batch
   */
  def batchByName(parameters: Seq[(Symbol, Any)]*): SQLBatch = {
    val names = SQLTemplateParser.extractAllParameters(statement)
    val sqlWithPlaceHolders = SQLTemplateParser.convertToSQLWithPlaceHolders(statement)
    val _sql = validateAndConvertToNormalStatement(statement, sqlWithPlaceHolders, names, _settings, parameters.headOption.getOrElse(Seq.empty))._1
    val _parameters: Seq[Seq[Any]] = parameters.map { p =>
      validateAndConvertToNormalStatement(statement, sqlWithPlaceHolders, names, _settings, p)._2
    }
    new SQLBatch(_sql, _parameters, tags)
  }
 /**
   * Set execution type as execute
   *
   * @return SQL instance
   */
  def execute(): SQLExecution = {
    new SQLExecution(statement, rawParameters, tags)((stmt: PreparedStatement) => {})((stmt: PreparedStatement) => {})
  }
  /**
   * Set execution type as execute with filters
   *
   * @param before before filter
   * @param after after filter
   * @return SQL instance
   */
  def executeWithFilters(before: (PreparedStatement) => Unit, after: (PreparedStatement) => Unit): SQLExecution = {
    new SQLExecution(statement, rawParameters, tags)(before)(after)
  }
  /**
   * Set execution type as executeUpdate
   *
   * @return SQL instance
   */
  def executeUpdate(): SQLUpdate = update()
  /**
   * Set execution type as executeUpdate with filters
   *
   * @param before before filter
   * @param after after filter
   * @return SQL instance
   */
  def executeUpdateWithFilters(before: (PreparedStatement) => Unit, after: (PreparedStatement) => Unit): SQLUpdate = {
    updateWithFilters(before, after)
  }
  /**
   * Set execution type as executeUpdate
   *
   * @return SQL instance
   */
  def update(): SQLUpdate = {
    new SQLUpdate(statement, rawParameters, tags)((stmt: PreparedStatement) => {})((stmt: PreparedStatement) => {})
  }
  /**
   * Set execution type as executeUpdate with filters
   *
   * @param before before filter
   * @param after after filter
   * @return SQL instance
   */
  def updateWithFilters(before: (PreparedStatement) => Unit, after: (PreparedStatement) => Unit): SQLUpdate = {
    new SQLUpdate(statement, rawParameters, tags)(before)(after)
  }
  /**
   * Set execution type as `executeLargeUpdate`
   *
   * @return SQL instance
   */
  def largeUpdate(): SQLLargeUpdate =
    new SQLLargeUpdate(statement, rawParameters, tags)(_ => {})(_ => {})
  /**
   * Set execution type as `executeLargeUpdate` with filters
   *
   * @param before before filter
   * @param after after filter
   * @return SQL instance
   */
  def largeUpdateWithFilters(before: PreparedStatement => Unit, after: PreparedStatement => Unit): SQLLargeUpdate =
    new SQLLargeUpdate(statement, rawParameters, tags)(before)(after)
  /**
   * Set execution type as updateAndReturnGeneratedKey
   *
   * @return SQL instance
   */
  def updateAndReturnGeneratedKey(): SQLUpdateWithGeneratedKey = {
    updateAndReturnGeneratedKey(1)
  }
  def updateAndReturnGeneratedKey(name: String): SQLUpdateWithGeneratedKey = {
    new SQLUpdateWithGeneratedKey(statement, rawParameters, this.tags)(name)
  }
  def updateAndReturnGeneratedKey(index: Int): SQLUpdateWithGeneratedKey = {
    new SQLUpdateWithGeneratedKey(statement, rawParameters, this.tags)(index)
  }

以上每种SQL类型都代表一种特定的SQL运算方式,即它们都有自己独特的apply()函数,如:

/**
 * SQL which execute java.sql.Statement#execute().
 *
 * @param statement SQL template
 * @param parameters parameters
 * @param before before filter
 * @param after after filter
 */
class SQLExecution(val statement: String, val parameters: Seq[Any], val tags: Seq[String] = Nil)(
    val before: (PreparedStatement) => Unit
)(
    val after: (PreparedStatement) => Unit
) {

  def apply()(implicit session: DBSession): Boolean = {
    val attributesSwitcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    val f: DBSession => Boolean = DBSessionWrapper(_, attributesSwitcher).executeWithFilters(before, after, statement, parameters: _*)
    // format: OFF
    session match {
      case AutoSession                       => DB.autoCommit(f)
      case NamedAutoSession(name, _)         => NamedDB(name, session.settings).autoCommit(f)
      case ReadOnlyAutoSession               => DB.readOnly(f)
      case ReadOnlyNamedAutoSession(name, _) => NamedDB(name, session.settings).readOnly(f)
      case _                                 => f(session)
    }
    // format: ON
  }

}

/**
 * SQL which execute java.sql.Statement#executeUpdate().
 *
 * @param statement SQL template
 * @param parameters parameters
 * @param before before filter
 * @param after after filter
 */
class SQLUpdate(val statement: String, val parameters: Seq[Any], val tags: Seq[String] = Nil)(
    val before: (PreparedStatement) => Unit
)(
    val after: (PreparedStatement) => Unit
) {

  def apply()(implicit session: DBSession): Int = {
    val attributesSwitcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    session match {
      case AutoSession =>
        DB.autoCommit(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case NamedAutoSession(name, _) =>
        NamedDB(name, session.settings).autoCommit(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case ReadOnlyAutoSession =>
        DB.readOnly(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case ReadOnlyNamedAutoSession(name, _) =>
        NamedDB(name, session.settings).readOnly(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case _ =>
        DBSessionWrapper(session, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*)
    }
  }

}

我们也可以把多个SQL语句共同放在一个session-block里:

 import org.joda.time._
  // DB autoCommit  ==  NamedDB('default) autoCommit
  NamedDB('h2mem) autoCommit  { implicit session =>
    val insertSql: SQL[Nothing,NoExtractor] =
      SQL("insert into members (name, birthday, created_at) values (?, ?, ?)")
    val createdAt: DateTime = DateTime.now

    insertSql.bind("Alice", Some(new LocalDate("1980-01-01")), createdAt).update.apply()  //commit
    insertSql.bind("Bob", None, createdAt).update.apply()   //commit
  }

这里复用了同一个PreparedStatement模版,然后绑定不同的参数逐条运算;注意在autoCommit块中每条语句都是独立提交的。上面的代码是一种loan-pattern编程模式。如果需要真正的JDBC批次运算,可以改用前面提到的batch方法,见下面的示例。
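一个用batch实现批次插入的最小示例(假设仍在上面的NamedDB('h2mem)块内,沿用同一条语句模版):

  //batch:一次apply()提交整批参数组
  SQL("insert into members (name, birthday, created_at) values (?, ?, ?)")
    .batch(
      Seq("Alice", Some(new LocalDate("1980-01-01")), DateTime.now),
      Seq("Bob", None, DateTime.now)
    ).apply()

除autoCommit这种session-block之外,还有readOnly、localTx等: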

  /**
   * Provides auto-commit session block.
   * @param execution block
   * @tparam A  return type
   * @return result value
   */
  def autoCommit[A](execution: DBSession => A): A = {
    if (autoCloseEnabled) using(conn)(_ => execution(autoCommitSession()))
    else execution(autoCommitSession())
  }

  /**
   * Provides read-only session block.
   * @param execution block
   * @tparam A  return type
   * @return result value
   */
  def readOnly[A](execution: DBSession => A): A = {
    if (autoCloseEnabled) using(conn)(_ => execution(readOnlySession()))
    else execution(readOnlySession())
  }

 /**
   * Provides local-tx session block.
   * @param execution block
   * @tparam A  return type
   * @return result value
   */
  def localTx[A](execution: DBSession => A)(implicit boundary: TxBoundary[A] = defaultTxBoundary[A]): A = {
    val doClose = if (autoCloseEnabled) () => conn.close() else () => ()
    val tx = newTx
    begin(tx)
    val txResult = try {
      rollbackIfThrowable[A] {
        val session = DBSession(
          conn = conn,
          tx = Option(tx),
          connectionAttributes = connectionAttributes,
          settings = this.settingsProvider
        )
        val result: A = execution(session)
        boundary.finishTx(result, tx)
      }
    } catch {
      case e: Throwable => doClose(); throw e
    }
    boundary.closeConnection(txResult, doClose)
  }

在autoCommit域中每一次SQL运算都自动提交事务,不支持交易回滚rollback。在下面的localTx示范里,任何运算出现异常都会导致整体事务的回滚并抛出异常:

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  def create(name: String, birthday: Option[LocalDate], remarks: Option[String])(implicit session: DBSession): Member = {
    val insertSQL: SQL[Nothing,NoExtractor]  =
      sql"""insert into members (name, birthday, description, created_at)
           values (${name}, ${birthday}, ${remarks}, ${DateTime.now})"""
    val id: Long = insertSQL.updateAndReturnGeneratedKey.apply()
    Member(id, name, remarks, birthday,DateTime.now)
  }

  val users = List(
    ("John",new LocalDate("2008-03-01"),"youngest user"),
    ("Susan",new LocalDate("2000-11-03"),"middle aged user"),
    ("Peter",new LocalDate("1983-01-21"),"oldest user"),
  )
  import scala.util._   //Try
  import scalikejdbc.TxBoundary.Try._
  val result: Try[List[Member]] =
    NamedDB('h2mem) localTx { implicit session =>
      Try {
        val members: List[Member] = users.map { person =>
          create(person._1, Some(person._2), Some(person._3))
        }
        throw new RuntimeException("fail test. boooommmm!!!!!")
        members
      }
    }

  result match {
    case Success(mlist) => println(s"batch added members: $mlist")
    case Failure(err) => println(s"${err.getMessage}")
  }

在上面这段代码中,localTx block中的所有运算都包嵌在Try{}里,并通过import scalikejdbc.TxBoundary.Try._把Try设定为交易界限transaction-boundary,任何异常都被框定在这个界限里:示范中抛出的RuntimeException会令整体事务回滚,但不会冲出程序,而是以Failure的形式包含在返回结果里,返回结果因此包括了运算的所有状态。
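作为对比,下面是一个不用TxBoundary的最小示意(假设沿用上面的create函数):异常同样导致回滚,但会直接从localTx抛出,需要自行捕获:

  try {
    NamedDB('h2mem) localTx { implicit session =>
      create("Mia", None, None)
      throw new RuntimeException("boom!")   //事务回滚,异常直接抛出
    }
  } catch {
    case e: Throwable => println(s"rolled back: ${e.getMessage}")
  }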

我们也可以用case class或其它类型来模拟数据行类型以实现数据行的强类型操作:

 //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val selectSQL: SQL[Member,HasExtractor] = sql"""select * from members""".map(toMember)
  val members: List[Member] = NamedDB('h2mem) readOnly { implicit session =>
    selectSQL.list.apply()
  }

  println(s"all members: $members")
  NamedDB('h2mem).close()

上面的代码中使用了sql"""...""",这是ScalikeJDBC在标准scala-string-interpolation基础上提供的SQL interpolation,是构建SQL[A,E]的另一种方法:内嵌的${...}表达式会自动转换成绑定参数,而不是拼接字符串,因此不会引入SQL-injection风险。
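一个带查询参数的最小示例(沿用上面的toMember转换函数):

  val name = "Alice"
  val someMember: Option[Member] = NamedDB('h2mem) readOnly { implicit session =>
    //${name}自动成为绑定参数
    sql"select * from members where name = ${name}".map(toMember).single.apply()
  }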

下面是本次讨论的示范源代码:

import scalikejdbc._
object JDBCIntro extends App {
  //standard java procedure to register driver
  Class.forName("org.h2.Driver")
  //must have a db named 'default
  //ConnectionPool.singleton("jdbc:h2:mem:hello", "user", "pass")
  //==ConnectionPool.add('default,"jdbc:h2:mem:hello", "user", "pass")
  //db name is 'h2mem
  ConnectionPool.add('h2mem,"jdbc:h2:mem:hello", "user", "pass")
  //construct SQL object
  val createSQL: SQL[Nothing,NoExtractor] =SQL("""
    create table members (
      id bigint primary key auto_increment,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null
    )""")

  //implicit val session = AutoSession   //在'default连接池上运算时适用
  //run this SQL
  createSQL.execute().apply()(NamedAutoSession('h2mem))   //autoCommit

  import org.joda.time._
  // DB autoCommit  ==  NamedDB('default) autoCommit
  NamedDB('h2mem) autoCommit  { implicit session =>
    val insertSql: SQL[Nothing,NoExtractor] =
      SQL("insert into members (name, birthday, created_at) values (?, ?, ?)")
    val createdAt: DateTime = DateTime.now
    insertSql.bind("Alice", Some(new LocalDate("1980-01-01")), createdAt).update.apply()  //commit
    insertSql.bind("Bob", None, createdAt).update.apply()   //commit
  }

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  def create(name: String, birthday: Option[LocalDate], remarks: Option[String])(implicit session: DBSession): Member = {
    val insertSQL: SQL[Nothing,NoExtractor]  =
      sql"""insert into members (name, birthday, description, created_at)
           values (${name}, ${birthday}, ${remarks}, ${DateTime.now})"""
    val id: Long = insertSQL.updateAndReturnGeneratedKey.apply()
    Member(id, name, remarks, birthday,DateTime.now)
  }

  val users = List(
    ("John",new LocalDate("2008-03-01"),"youngest user"),
    ("Susan",new LocalDate("2000-11-03"),"middle aged user"),
    ("Peter",new LocalDate("1983-01-21"),"oldest user"),
  )
  import scala.util._   //Try
  import scalikejdbc.TxBoundary.Try._
  val result: Try[List[Member]] =
    NamedDB('h2mem) localTx { implicit session =>
      Try {
        val members: List[Member] = users.map { person =>
          create(person._1, Some(person._2), Some(person._3))
        }
        throw new RuntimeException("fail test. boooommmm!!!!!")
        members
      }
    }

  result match {
    case Success(mlist) => println(s"batch added members: $mlist")
    case Failure(err) => println(s"${err.getMessage}")
  }

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val selectSQL: SQL[Member,HasExtractor] = sql"""select * from members""".map(toMember)
  val members: List[Member] = NamedDB('h2mem) readOnly { implicit session =>
    selectSQL.list.apply()
  }

  println(s"all members: $members")
  NamedDB('h2mem).close()

}

ScalikeJDBC在覆盖JDBC基本功能上是比较完整的,而且实现这些功能的方式比较简洁,运算效率方面自然会稍高一筹。理论上用ScalikeJDBC作为一种JDBC-Engine是比较理想的:让它处于各种JDBC工具库和数据库实例之间,接收JDBC运算指令,然后连接目标数据库进行相关运算后返回结果。一般来说,各种JDBC工具库如ORM、FRM软件通过各自的DSL在复杂的数据库表关系环境内进行数据管理编程,最终产生相关的SQL语句即statement+parameters,传递给指定类型数据库的JDBC驱动程序去运算并产生结果。按这样描述,JDBC-Engine主要的功能就是支持一个jdbcRunSQL(ctx: JDBCContext)这样的函数。

这个函数的用户提供一个JDBCContext类型值,然后由jdbcRunSQL进行接下来的运算并返回结果。从这个角度分析,JDBCContext最起码需要提供下面的属性:

1、数据库名:dbName
2、SQL语句:statements
3、Query参数:parameters
4、运算控制参数:fetchSize、queryTimeout、queryTags
5、Query类型:select/execute/update、单条/成批、前置/后置query、generatedKey

下面就是JDBCContext类型定义:


import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

object JDBCContext {
  type SQLTYPE = Int
  val SQL_SELECT: Int = 0
  val SQL_EXECUTE = 1
  val SQL_UPDATE = 2

  def returnColumnByIndex(idx: Int) = Some(idx)
  def returnColumnByName(col: String) = Some(col)
}

case class JDBCContext(
  dbName: Symbol,
  statements: Seq[String],
  parameters: Seq[Seq[Any]] = Nil,
  fetchSize: Int = 100,
  queryTimeout: Option[Int] = None,
  queryTags: Seq[String] = Nil,
  sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
  batch: Boolean = false,
  returnGeneratedKey: Option[Any] = None,
  // no return: None, return by index: Some(idx), by name: Some(name)
  preAction: Option[PreparedStatement => Unit] = None,
  postAction: Option[PreparedStatement => Unit] = None)

重新考虑了一下,觉着把jdbc读写分开两个函数来实现更容易使用,因为这样比较符合编程模式和习性。所以最好把sqlType=SQL_SELECT类型的SQL独立出一个函数来运算:

object JDBCEngine {
  import JDBCContext._

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
      ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    ctx.sqlType match {
      case SQL_SELECT => {
        val params: Seq[Any] = ctx.parameters match {
          case Nil => Nil
          case p @ _ => p.head
        }
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("boom!"))
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
        ctx.queryTags.foreach(rawSql.tags(_))
        rawSql.fetchSize(ctx.fetchSize)
        implicit val session = NamedAutoSession(ctx.dbName)
        val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
        sql.collection.apply[C]()
      }
      case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
    }
  }
}

其中的noExtractor函数是为了符合SQLToCollectionImpl类型的参数款式要求而提供的。

我们来测试用一下jdbcQueryResult(JDBCQueryDemo.scala 功能测试代码):

import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._

object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = 'h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
    id: Long,
    name: String,
    description: Option[String] = None,
    birthday: Option[LocalDate] = None,
    createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector, Member](ctx, toMember)
  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = 'mysql)
  val names: List[String] = jdbcQueryResult[List, String](ctx1, { rs: WrappedResultSet => rs.string("name") })
  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = 'postgres)
  val idname: List[(Long, String)] = jdbcQueryResult[List, (Long, String)](ctx2, { rs: WrappedResultSet => (rs.long("id"), rs.string("name")) })
  println(s"selected id+name: $idname")
}

那么,如果把这个JDBC-Engine放到gRPC数据流服务里,从客户端发出一串的JDBCQuery又如何呢?这也是所谓的BiDi-Streaming模式,在jdbc.proto的服务描述如下:

jdbc.proto

message JDBCDate {
  int32 yyyy = 1;
  int32 mm   = 2;
  int32 dd   = 3;
}

message JDBCTime {
  int32 hh   = 1;
  int32 mm   = 2;
  int32 ss   = 3;
  int32 nnn  = 4;
}

message JDBCDateTime {
  JDBCDate date = 1;
  JDBCTime time = 2;
}

message MemberRow {
  string name = 1;
  JDBCDate birthday = 2;
  string description = 3;
  JDBCDateTime created_at = 4;
  bytes picture = 5;
}

service JDBCServices {
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
  rpc insertRows(stream MemberRow) returns(JDBCResult) {}
}

以上数据类型JDBCDataRow和JDBCQuery分别对应JDBC-Streaming工具的流元素结构和JDBCQueryContext。

用scalaPB编译后自动产生服务端和客户端的框架代码(boilerplate-code),我们需要实现具体的JDBC服务(JDBCService.scala):

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
  val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
    logger.info("**** runQuery called on service side ***")
    Flow[JDBCQuery]
      .flatMapConcat { q =>
        //unpack JDBCQuery and construct the context
        val params: Seq[Any] = unmarshal[Seq[Any]](q.parameters)
        logger.info(s"**** query parameters: ${params} ****")
        val ctx = JDBCQueryContext[JDBCDataRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toRow)
      }
  }
}

runQuery和batQuery的函数款式是一样的,这就说明服务端提供的服务模式是一样的:在我们这个例子里它们都是对每个收到的JDBCQuery发还相关的数据流。实际上这两项服务的区别在客户方。

下面就是本次示范的源代码:

用scalaPB编译后自动产生服务端和客户端框架代码(boilerplate-code)。我们需要实现具体的JDBC服务:

HikariConfig.scala HikariCP连接池实现

  def updateBatches: Source[JDBCDataRow,NotUsed] = {    logger.info(s"running updateBatches ...")      Source        .fromIterator => List(query,query2,query3).toIterator)        .via(stub.batQuery)        .via(stub.updateBat)  }
import java.sql.PreparedStatementimport scala.collection.generic.CanBuildFromimport scalikejdbc._  object JDBCContext {    type SQLTYPE = Int    val SQL_SELECT: Int = 0    val SQL_EXECUTE = 1    val SQL_UPDATE = 2    def returnColumnByIndex = Some    def returnColumnByName(col: String) = Some  }  case class JDBCContext(                          dbName: Symbol,                          statements: Seq[String],                          parameters: Seq[Seq[Any]] = Nil,                          fetchSize: Int = 100,                          queryTimeout: Option[Int] = None,                          queryTags: Seq[String] = Nil,                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,                          batch: Boolean = false,                          returnGeneratedKey: Option[Any] = None,                          // no return: None, return by index: Some, by name: Some                          preAction: Option[PreparedStatement => Unit] = None,                          postAction: Option[PreparedStatement => Unit] = None)

重新考虑了一下,觉着把jdbc读写分开两个函数来实现更容易使用,因为这样比较符合编程模式和习性。所以最好把sqlType=SQL_SELECT类型SQL独立一个函数出来运算:

   def jdbcQueryResult[C[_] <: TraversableOnce[_], A](         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {      ctx.sqlType match {        case SQL_SELECT => {          val params: Seq[Any] = ctx.parameters match {            case Nil => Nil            case p@_ => p.head          }          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor(""))          ctx.queryTimeout.foreach(rawSql.queryTimeout          ctx.queryTags.foreach(rawSql.tags          rawSql.fetchSize(ctx.fetchSize)          implicit val session = NamedAutoSession(ctx.dbName)          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)          sql.collection.apply[C]()        }        case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")      }    }
package demo.grpc.jdbc.serverimport java.util.logging.Loggerimport akka.actor.ActorSystemimport akka.stream.ActorMaterializerimport io.grpc.Serverimport demo.grpc.jdbc.services._import io.grpc.ServerBuilderimport grpc.jdbc.services._class gRPCServer(server: Server) {  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)  def start(): Unit = {    server.start()    logger.info(s"Server started, listening on ${server.getPort}")    sys.addShutdownHook {      // Use stderr here since the logger may has been reset by its JVM shutdown hook.      System.err.println("*** shutting down gRPC server since JVM is shutting down")      stop()      System.err.println("*** server shut down")    }    ()  }  def stop(): Unit = {    server.shutdown()  }  /**    * Await termination on the main thread since the grpc library uses daemon threads.    */  def blockUntilShutdown(): Unit = {    server.awaitTermination()  }}object JDBCServer extends App {  import sdp.jdbc.config._  implicit val system = ActorSystem("JDBCServer")  implicit val mat = ActorMaterializer.create  implicit val ec = system.dispatcher  ConfigDBsWithEnv("dev").setup('h2)  ConfigDBsWithEnv("dev").loadGlobalSettings()  val server = new gRPCServer(    ServerBuilder      .forPort(50051)      .addService(        JdbcGrpcAkkaStream.bindService(          new JDBCStreamingServices        )      ).build  server.start()  //  server.blockUntilShutdown()  scala.io.StdIn.readLine()  ConfigDBsWithEnv("dev").close('h2)  mat.shutdown()  system.terminate()}

客户端的示范代码,包括JDBCStreamClient和各个功能测试App:

package demo.grpc.jdbc.client

import grpc.jdbc.services._
import java.util.logging.Logger
import protobuf.bytes.Converter._
import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import io.grpc._
import sdp.jdbc.engine._

class JDBCStreamClient(host: String, port: Int) {
  val logger: Logger = Logger.getLogger(classOf[JDBCStreamClient].getName)

  val channel = ManagedChannelBuilder
    .forAddress(host, port)
    .usePlaintext(true)
    .build()

  val stub = JdbcGrpcAkkaStream.stub(channel)

  val query = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arizona", 2))
  )
  val query2 = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Colorado", 3))
  )
  val query3 = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arkansas", 8))
  )

  def queryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running queryRows ...")
    Source
      .single(query)
      .via(stub.runQuery)
  }

  def batQueryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running batQueryRows ...")
    Source
      .fromIterator(() => List(query, query2, query3).toIterator)
      .via(stub.batQuery)
  }

  val dropSQL: String = """
      drop table members
    """
  val createSQL: String = """
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""

  val ctx = JDBCUpdate(
    dbName = "h2",
    sqlType = JDBCContext.SQL_EXEDDL,
    statements = Seq(dropSQL, createSQL)
  )

  def createTbl: Source[JDBCResult, NotUsed] = {
    logger.info(s"running createTbl ...")
    Source
      .single(ctx)
      .via(stub.runDDL)
  }

  val p1 = MemberRow("Peter Chan", Some(JDBCDate(1967, 5, 17)), "new member1", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p2 = MemberRow("Alanda Wong", Some(JDBCDate(1980, 11, 10)), "new member2", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p3 = MemberRow("Kate Zhang", Some(JDBCDate(1969, 8, 13)), "new member3", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p4 = MemberRow("Tiger Chan", Some(JDBCDate(1962, 5, 1)), "new member4", None, _root_.com.google.protobuf.ByteString.EMPTY)

  def insertRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running insertRows ...")
    Source
      .fromIterator(() => List(p1, p2, p3, p4).toIterator)
      .via(stub.insertRows)
  }

  val queryMember = JDBCQuery(
    dbName = "h2",
    statement = "select * from members"
  )

  def updateRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running updateRows ...")
    Source
      .single(queryMember)
      .via(stub.getMembers)
      .via(stub.updateRows)
  }

  def updateBatches: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running updateBatches ...")
    Source
      .fromIterator(() => List(query, query2, query3).toIterator)
      .via(stub.batQuery)
      .via(stub.updateBat)
  }
}

object TestConversion extends App {
  val orgval: Seq[Option[Any]] = Seq(Some(1), Some("id"), None, Some(2))
  println(s"original value: ${orgval}")
  val marval = marshal(orgval)
  println(s"marshal value: ${marval}")
  val unmval = unmarshal[Seq[Option[Any]]](marval)
  println(s"unmarshal value: ${unmval}")

  val m1 = MemberRow(name = "Peter")
  val m2 = m1.update(
    _.birthday.yyyy := 1989,
    _.birthday.mm := 10,
    _.birthday.dd := 3,
    _.description := "a new member"
  )
}

object QueryRows extends App {
  implicit val system = ActorSystem("QueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)
  client.queryRows.runForeach { r => println(r) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatQueryRows extends App {
  implicit val system = ActorSystem("BatQueryRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)
  client.batQueryRows.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object RunDDL extends App {
  implicit val system = ActorSystem("RunDDL")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)
  client.createTbl.runForeach { r => println(unmarshal[Seq[Any]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object InsertRows extends App {
  implicit val system = ActorSystem("InsertRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)
  client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object UpdateRows extends App {
  implicit val system = ActorSystem("UpdateRows")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)
  client.updateRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatUpdates extends App {
  implicit val system = ActorSystem("BatUpdates")
  implicit val mat = ActorMaterializer.create(system)

  val client = new JDBCStreamClient("localhost", 50051)
  client.updateBatches.runForeach(println)

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

HikariConfig.scala HikariCP连接池实现

package configdbs

import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if (c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if (c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if (c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if (c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if (c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if (c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if (c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (key, cv) =>
        val v =
          if (cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if (cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if (v ne null) props.put(key, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if (c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if (c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  def hikariCPConfig(dbName: Symbol): HikariConfig = {
    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf
  }
}

import scalikejdbc._

trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }
}

object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}

resources/application.conf 包括H2,MySQL,PostgreSQL的连接配置。
