Guest User

Untitled

a guest
Dec 9th, 2018
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.59 KB | None | 0 0
  1. @ConfigurationEmbedded(CassandraConfiguration.class)
  2. public class CassandraDataSource implements IDataSource, IConfigurationListener<CassandraConfiguration> {
  3. private static final Logger log = LoggerFactory.getLogger(CassandraDataSource.class);
  4.  
  5. private CassandraConfiguration cassandraConfiguration;
  6.  
  7. /**
  8. * Cassandra cluster.
  9. */
  10. protected Cluster mCluster = null;
  11. protected Keyspace mKeyspace = null;
  12.  
  13. @Override
  14. @Inject
  15. public void setConfiguration(final CassandraConfiguration configuration) {
  16. this.cassandraConfiguration = configuration;
  17.  
  18. // configure cassandra
  19. final CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator();
  20. cassandraHostConfigurator.setHosts(this.cassandraConfiguration.hosts);
  21. cassandraHostConfigurator.setAutoDiscoverHosts(this.cassandraConfiguration.autoDiscoveryHosts);
  22. cassandraHostConfigurator.setCassandraThriftSocketTimeout(this.cassandraConfiguration.socketTimeout);
  23. cassandraHostConfigurator.setMaxActive(this.cassandraConfiguration.maxClientsActive);
  24. cassandraHostConfigurator.setMaxIdle(this.cassandraConfiguration.maxIdle);
  25. cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(this.cassandraConfiguration.maxWaitTimeWhenExhausted);
  26. if (this.cassandraConfiguration.retryDownedHostDelayInSeconds > 0) {
  27. cassandraHostConfigurator.setRetryDownedHosts(true);
  28. cassandraHostConfigurator.setRetryDownedHostsDelayInSeconds(this.cassandraConfiguration.retryDownedHostDelayInSeconds);
  29. }
  30. this.mCluster = HFactory.getOrCreateCluster(this.cassandraConfiguration.clusterName, cassandraHostConfigurator);
  31. // get consistency policy from configuration
  32. ConsistencyLevelPolicy consistencyLevelPolicy = null;
  33. if (this.cassandraConfiguration.consistency.equals(Consistency.CONSISTENT)) {
  34. consistencyLevelPolicy = new ConsistentConsistencyLevel();
  35. } else { // default all one
  36. consistencyLevelPolicy = new AllOneConsistencyLevelPolicy();
  37. }
  38. this.mKeyspace = HFactory.createKeyspace(this.cassandraConfiguration.keySpace, this.mCluster,
  39. consistencyLevelPolicy);
  40. }
  41.  
  42. @Override
  43. @Traced(type = TracingType.DAO, description = "Cassandra get")
  44. public <K extends Serializable, C, T extends IBaseType<K, C>> T get(final K aKey, final Class<T> clazz,
  45. final C... columns) {
  46. checkNotNull(aKey, "Key can't be null");
  47. checkNotNull(clazz, "Type clazz can't be null");
  48. checkArgument(clazz.isAssignableFrom(AbstractBaseType.class),
  49. "Can't persist this class in cassandra implementation");
  50.  
  51. final T baseType = newTypeInstance(clazz);
  52. checkNotNull(baseType, "Can't get an instance of type class");
  53. final AbstractBaseType<K, C> aBaseType = (AbstractBaseType<K, C>) baseType; // safe cast, check preconditions
  54.  
  55. final String columnName = aBaseType.getColumnFamilyName();
  56.  
  57. // build slice query
  58. final Serializer<K> keySerializer = getSerializer(aKey);
  59. final Serializer<C> columnSerializer = aBaseType.getColumnSerializer();
  60. final SliceQuery<K, C, String> q = createSliceQuery(this.mKeyspace, keySerializer, columnSerializer,
  61. StringSerializer.get());
  62. q.setColumnFamily(columnName).setKey(aKey);
  63. if (columns != null && columns.length > 0) {
  64. q.setColumnNames(columns);
  65. } else {
  66. q.setRange(null, null, false, this.cassandraConfiguration.maxColumns);
  67. }
  68. final QueryResult<ColumnSlice<C, String>> queryResult = q.execute();
  69.  
  70. // check & log errors
  71. if (queryResult == null) {
  72. log.warn("No query result from cassandra for key: {}", aKey);
  73. return null;
  74. }
  75. // build & return object from result
  76. final ColumnSlice<C, String> result = queryResult.get();
  77. if (result == null) {
  78. log.warn("No object from query result cassandra for key: {}", aKey);
  79. return null;
  80. }
  81. final T type = createInstance(clazz, result.getColumns(), aKey);
  82. if (type == null) {
  83. log.debug("Result was empty for {}", aKey.toString());
  84. }
  85. return type;
  86. }
  87.  
  88. /**
  89. * Creates a new instance based on a cassandra result. Can be overriden.
  90. * @param columns
  91. * @return
  92. */
  93. protected <K extends Serializable, C, T extends IBaseType<K, C>> T createInstance(final Class<T> clazz,
  94. final List<HColumn<C, String>> columns, final K aKey) {
  95. T type = null;
  96. if (columns != null && columns.size() > 0) {
  97. type = newTypeInstance(clazz);
  98. if (aKey != null) {
  99. type.setId(aKey);
  100. }
  101. for (final HColumn<C, String> column : columns) {
  102. type.setFieldValue(column.getName(), column.getValue());
  103. }
  104. }
  105. return type;
  106. }
  107.  
  108. private <T> T newTypeInstance(final Class<T> clazz) {
  109. try {
  110. return clazz.newInstance();
  111. } catch (final Exception e) {
  112. log.error("Can't instantiate {} class type for this DAO", clazz.getName(), e);
  113. }
  114. return null;
  115. }
  116.  
  117. @SuppressWarnings({ "unchecked", "rawtypes" })
  118. private <T> Serializer<T> getSerializer(final T aType) {
  119. if (aType instanceof AComposite) {
  120. return new CompositeSerializer(aType.getClass());
  121. }
  122. return SerializerTypeInferer.getSerializer(aType);
  123. }
  124.  
  125. }
Add Comment
Please, Sign In to add comment