Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @ConfigurationEmbedded(CassandraConfiguration.class)
- public class CassandraDataSource implements IDataSource, IConfigurationListener<CassandraConfiguration> {
- private static final Logger log = LoggerFactory.getLogger(CassandraDataSource.class);
- private CassandraConfiguration cassandraConfiguration;
- /**
- * Cassandra cluster.
- */
- protected Cluster mCluster = null;
- protected Keyspace mKeyspace = null;
- @Override
- @Inject
- public void setConfiguration(final CassandraConfiguration configuration) {
- this.cassandraConfiguration = configuration;
- // configure cassandra
- final CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator();
- cassandraHostConfigurator.setHosts(this.cassandraConfiguration.hosts);
- cassandraHostConfigurator.setAutoDiscoverHosts(this.cassandraConfiguration.autoDiscoveryHosts);
- cassandraHostConfigurator.setCassandraThriftSocketTimeout(this.cassandraConfiguration.socketTimeout);
- cassandraHostConfigurator.setMaxActive(this.cassandraConfiguration.maxClientsActive);
- cassandraHostConfigurator.setMaxIdle(this.cassandraConfiguration.maxIdle);
- cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(this.cassandraConfiguration.maxWaitTimeWhenExhausted);
- if (this.cassandraConfiguration.retryDownedHostDelayInSeconds > 0) {
- cassandraHostConfigurator.setRetryDownedHosts(true);
- cassandraHostConfigurator.setRetryDownedHostsDelayInSeconds(this.cassandraConfiguration.retryDownedHostDelayInSeconds);
- }
- this.mCluster = HFactory.getOrCreateCluster(this.cassandraConfiguration.clusterName, cassandraHostConfigurator);
- // get consistency policy from configuration
- ConsistencyLevelPolicy consistencyLevelPolicy = null;
- if (this.cassandraConfiguration.consistency.equals(Consistency.CONSISTENT)) {
- consistencyLevelPolicy = new ConsistentConsistencyLevel();
- } else { // default all one
- consistencyLevelPolicy = new AllOneConsistencyLevelPolicy();
- }
- this.mKeyspace = HFactory.createKeyspace(this.cassandraConfiguration.keySpace, this.mCluster,
- consistencyLevelPolicy);
- }
- @Override
- @Traced(type = TracingType.DAO, description = "Cassandra get")
- public <K extends Serializable, C, T extends IBaseType<K, C>> T get(final K aKey, final Class<T> clazz,
- final C... columns) {
- checkNotNull(aKey, "Key can't be null");
- checkNotNull(clazz, "Type clazz can't be null");
- checkArgument(clazz.isAssignableFrom(AbstractBaseType.class),
- "Can't persist this class in cassandra implementation");
- final T baseType = newTypeInstance(clazz);
- checkNotNull(baseType, "Can't get an instance of type class");
- final AbstractBaseType<K, C> aBaseType = (AbstractBaseType<K, C>) baseType; // safe cast, check preconditions
- final String columnName = aBaseType.getColumnFamilyName();
- // build slice query
- final Serializer<K> keySerializer = getSerializer(aKey);
- final Serializer<C> columnSerializer = aBaseType.getColumnSerializer();
- final SliceQuery<K, C, String> q = createSliceQuery(this.mKeyspace, keySerializer, columnSerializer,
- StringSerializer.get());
- q.setColumnFamily(columnName).setKey(aKey);
- if (columns != null && columns.length > 0) {
- q.setColumnNames(columns);
- } else {
- q.setRange(null, null, false, this.cassandraConfiguration.maxColumns);
- }
- final QueryResult<ColumnSlice<C, String>> queryResult = q.execute();
- // check & log errors
- if (queryResult == null) {
- log.warn("No query result from cassandra for key: {}", aKey);
- return null;
- }
- // build & return object from result
- final ColumnSlice<C, String> result = queryResult.get();
- if (result == null) {
- log.warn("No object from query result cassandra for key: {}", aKey);
- return null;
- }
- final T type = createInstance(clazz, result.getColumns(), aKey);
- if (type == null) {
- log.debug("Result was empty for {}", aKey.toString());
- }
- return type;
- }
- /**
- * Creates a new instance based on a cassandra result. Can be overriden.
- * @param columns
- * @return
- */
- protected <K extends Serializable, C, T extends IBaseType<K, C>> T createInstance(final Class<T> clazz,
- final List<HColumn<C, String>> columns, final K aKey) {
- T type = null;
- if (columns != null && columns.size() > 0) {
- type = newTypeInstance(clazz);
- if (aKey != null) {
- type.setId(aKey);
- }
- for (final HColumn<C, String> column : columns) {
- type.setFieldValue(column.getName(), column.getValue());
- }
- }
- return type;
- }
- private <T> T newTypeInstance(final Class<T> clazz) {
- try {
- return clazz.newInstance();
- } catch (final Exception e) {
- log.error("Can't instantiate {} class type for this DAO", clazz.getName(), e);
- }
- return null;
- }
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private <T> Serializer<T> getSerializer(final T aType) {
- if (aType instanceof AComposite) {
- return new CompositeSerializer(aType.getClass());
- }
- return SerializerTypeInferer.getSerializer(aType);
- }
- }
Add Comment
Please, Sign In to add comment