1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.openjpa.slice.jdbc;
20
21 import java.sql.Connection;
22 import java.sql.SQLException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Properties;
28 import java.util.concurrent.ExecutorService;
29
30 import javax.sql.DataSource;
31 import javax.sql.XADataSource;
32
33 import org.apache.openjpa.conf.OpenJPAConfiguration;
34 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
35 import org.apache.openjpa.jdbc.conf.JDBCConfigurationImpl;
36 import org.apache.openjpa.jdbc.schema.DataSourceFactory;
37 import org.apache.openjpa.lib.conf.BooleanValue;
38 import org.apache.openjpa.lib.conf.ConfigurationProvider;
39 import org.apache.openjpa.lib.conf.PluginValue;
40 import org.apache.openjpa.lib.conf.StringListValue;
41 import org.apache.openjpa.lib.conf.StringValue;
42 import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
43 import org.apache.openjpa.lib.jdbc.DelegatingDataSource;
44 import org.apache.openjpa.lib.log.Log;
45 import org.apache.openjpa.lib.log.LogFactory;
46 import org.apache.openjpa.lib.log.LogFactoryImpl;
47 import org.apache.openjpa.lib.util.Localizer;
48 import org.apache.openjpa.slice.DistributedBrokerImpl;
49 import org.apache.openjpa.slice.DistributionPolicy;
50 import org.apache.openjpa.slice.ExecutorServiceValue;
51 import org.apache.openjpa.slice.ProductDerivation;
52 import org.apache.openjpa.slice.Slice;
53 import org.apache.openjpa.util.UserException;
54
55 /**
56 * Implements a distributed configuration of JDBCStoreManagers.
57 * The original configuration properties are analyzed to create a set of
58 * Slice specific properties with defaulting rules.
59 *
60 * @author Pinaki Poddar
61 *
62 */
63 public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
64 implements DistributedJDBCConfiguration {
65
66 private final List<Slice> _slices = new ArrayList<Slice>();
67 private List<String> _activeSliceNames = new ArrayList<String>();
68 private Slice _master;
69
70 private DecoratingDataSource virtualDataSource;
71
72 protected BooleanValue lenientPlugin;
73 protected StringValue masterPlugin;
74 protected StringListValue namesPlugin;
75 protected ExecutorServiceValue executorServicePlugin;
76 protected PluginValue distributionPolicyPlugin;
77
78 public static final String DOT = ".";
79 public static final String REGEX_DOT = "\\.";
80 public static final String PREFIX_SLICE = ProductDerivation.PREFIX_SLICE + DOT;
81 public static final String PREFIX_OPENJPA = "openjpa.";
82 private static Localizer _loc =
83 Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
84
85 /**
86 * Configure itself as well as underlying slices.
87 *
88 */
89 public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
90 super(true, false);
91 Map p = cp.getProperties();
92 String pUnit = getPersistenceUnitName(p);
93 setDiagnosticContext(pUnit);
94
95 brokerPlugin.setString(DistributedBrokerImpl.class.getName());
96
97 distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
98 distributionPolicyPlugin.setDynamic(true);
99
100 lenientPlugin = addBoolean("Lenient");
101
102 masterPlugin = addString("Master");
103
104 namesPlugin = addStringList("Names");
105
106 executorServicePlugin = new ExecutorServiceValue();
107 addValue(executorServicePlugin);
108
109 setSlices(p);
110 }
111
112 private String getPersistenceUnitName(Map p) {
113 Object unit = p.get(PREFIX_OPENJPA+id.getProperty());
114 return (unit == null) ? "?" : unit.toString();
115 }
116
117 private void setDiagnosticContext(String unit) {
118 LogFactory logFactory = getLogFactory();
119 if (logFactory instanceof LogFactoryImpl) {
120 ((LogFactoryImpl)logFactory).setDiagnosticContext(unit);
121 }
122 }
123
124 /**
125 * Gets the name of the active slices.
126 */
127 public List<String> getActiveSliceNames() {
128 if (_activeSliceNames.isEmpty()) {
129 for (Slice slice:_slices)
130 if (slice.isActive())
131 _activeSliceNames.add(slice.getName());
132 }
133 return _activeSliceNames;
134 }
135
136 /**
137 * Gets the name of the available slices.
138 */
139 public List<String> getAvailableSliceNames() {
140 List<String> result = new ArrayList<String>();
141 for (Slice slice:_slices)
142 result.add(slice.getName());
143 return result;
144 }
145
146 /**
147 * Gets the slices of given status. Null returns all irrespective of status.
148 */
149 public List<Slice> getSlices(Slice.Status...statuses) {
150 if (statuses == null)
151 return Collections.unmodifiableList(_slices);
152 List<Slice> result = new ArrayList<Slice>();
153 for (Slice slice:_slices) {
154 for (Slice.Status status:statuses)
155 if (slice.getStatus().equals(status))
156 result.add(slice);
157 }
158 return result;
159 }
160
161 /**
162 * Gets the master slice.
163 */
164 public Slice getMaster() {
165 return _master;
166 }
167
168 /**
169 * Get the configuration for given slice.
170 */
171 public Slice getSlice(String name) {
172 for (Slice slice:_slices)
173 if (slice.getName().equals(name))
174 return slice;
175 throw new UserException(_loc.get("slice-not-found", name,
176 getActiveSliceNames()));
177 }
178
179 public DistributionPolicy getDistributionPolicyInstance() {
180 if (distributionPolicyPlugin.get() == null) {
181 distributionPolicyPlugin.instantiate(DistributionPolicy.class,
182 this, true);
183 }
184 return (DistributionPolicy) distributionPolicyPlugin.get();
185 }
186
187 public void setDistributionPolicyInstance(String val) {
188 distributionPolicyPlugin.set(val);
189 }
190
191 public Object getConnectionFactory() {
192 if (virtualDataSource == null) {
193 DistributedDataSource ds = createDistributedDataStore();
194 virtualDataSource =
195 DataSourceFactory.installDBDictionary(
196 getDBDictionaryInstance(), ds, this, false);
197 }
198 return virtualDataSource;
199 }
200
201 /**
202 * Create a virtual DistributedDataSource as a composite of individual
203 * slices as per configuration, optionally ignoring slices that can not be
204 * connected.
205 */
206 private DistributedDataSource createDistributedDataStore() {
207 List<DataSource> dataSources = new ArrayList<DataSource>();
208 boolean isLenient = lenientPlugin.get();
209 boolean isXA = true;
210 for (Slice slice : _slices) {
211 JDBCConfiguration conf = (JDBCConfiguration)slice.getConfiguration();
212 Log log = conf.getConfigurationLog();
213 String url = getConnectionInfo(conf);
214 if (log.isInfoEnabled())
215 log.info(_loc.get("slice-connect", slice, url));
216 try {
217 DataSource ds = DataSourceFactory.newDataSource(conf, false);
218 DecoratingDataSource dds = new DecoratingDataSource(ds);
219 ds = DataSourceFactory.installDBDictionary(
220 conf.getDBDictionaryInstance(), dds, conf, false);
221 if (verifyDataSource(slice, ds)) {
222 dataSources.add(ds);
223 isXA &= isXACompliant(ds);
224 }
225 } catch (Throwable ex) {
226 handleBadConnection(isLenient, slice, ex);
227 }
228 }
229 if (dataSources.isEmpty())
230 throw new UserException(_loc.get("no-slice"));
231 DistributedDataSource result = new DistributedDataSource(dataSources);
232 return result;
233 }
234
235 String getConnectionInfo(OpenJPAConfiguration conf) {
236 String result = conf.getConnectionURL();
237 if (result == null) {
238 result = conf.getConnectionDriverName();
239 String props = conf.getConnectionProperties();
240 if (props != null)
241 result += "(" + props + ")";
242 }
243 return result;
244 }
245
246 boolean isXACompliant(DataSource ds) {
247 if (ds instanceof DelegatingDataSource)
248 return ((DelegatingDataSource) ds).getInnermostDelegate()
249 instanceof XADataSource;
250 return ds instanceof XADataSource;
251 }
252
253 /**
254 * Verify that a connection can be established to the given slice. If
255 * connection can not be established then slice is set to INACTIVE state.
256 */
257 private boolean verifyDataSource(Slice slice, DataSource ds) {
258 Connection con = null;
259 try {
260 con = ds.getConnection();
261 slice.setStatus(Slice.Status.ACTIVE);
262 if (con == null) {
263 slice.setStatus(Slice.Status.INACTIVE);
264 return false;
265 }
266 return true;
267 } catch (SQLException ex) {
268 slice.setStatus(Slice.Status.INACTIVE);
269 return false;
270 } finally {
271 if (con != null)
272 try {
273 con.close();
274 } catch (SQLException ex) {
275 // ignore
276 }
277 }
278 }
279
280 /**
281 * Either throw a user exception or add the configuration to the given list,
282 * based on <code>isLenient</code>.
283 */
284 private void handleBadConnection(boolean isLenient, Slice slice,
285 Throwable ex) {
286 OpenJPAConfiguration conf = slice.getConfiguration();
287 String url = conf.getConnectionURL();
288 Log log = getLog(LOG_RUNTIME);
289 if (isLenient) {
290 if (ex != null) {
291 log.warn(_loc.get("slice-connect-known-warn", slice, url, ex
292 .getCause()));
293 } else {
294 log.warn(_loc.get("slice-connect-warn", slice, url));
295 }
296 } else if (ex != null) {
297 throw new UserException(_loc.get("slice-connect-known-error",
298 slice, url, ex), ex.getCause());
299 } else {
300 throw new UserException(_loc.get("slice-connect-error", slice, url));
301 }
302 }
303
304 /**
305 * Create individual slices with configurations from the given properties.
306 */
307 void setSlices(Map original) {
308 List<String> sliceNames = findSlices(original);
309 Log log = getConfigurationLog();
310 if (sliceNames.isEmpty()) {
311 throw new UserException(_loc.get("slice-none-configured"));
312 }
313 String unit = getPersistenceUnitName(original);
314 for (String key : sliceNames) {
315 JDBCConfiguration child = new JDBCConfigurationImpl();
316 child.fromProperties(createSliceProperties(original, key));
317 child.setId(unit+DOT+key);
318 Slice slice = new Slice(key, child);
319 _slices.add(slice);
320 if (log.isTraceEnabled())
321 log.trace(_loc.get("slice-configuration", key, child
322 .toProperties(false)));
323 }
324 setMaster(original);
325 }
326
327 /**
328 * Finds the slices. If <code>openjpa.slice.Names</code> property is
329 * specified then the slices are ordered in the way they are listed.
330 * Otherwise scans all available slices by looking for property of the form
331 * <code>openjpa.slice.XYZ.abc</code> where <code>XYZ</code> is the slice
332 * identifier and <code>abc</code> is any openjpa property name. The slices
333 * are then ordered alphabetically by their identifier.
334 */
335 private List<String> findSlices(Map p) {
336 List<String> sliceNames = new ArrayList<String>();
337
338 Log log = getConfigurationLog();
339 String key = PREFIX_SLICE + namesPlugin.getProperty();
340 boolean explicit = p.containsKey(key);
341 if (explicit) {
342 String[] values = p.get(key).toString().split("\\,");
343 for (String name:values)
344 if (!sliceNames.contains(name.trim()))
345 sliceNames.add(name.trim());
346 } else {
347 if (log.isWarnEnabled())
348 log.warn(_loc.get("no-slice-names", key));
349 sliceNames = scanForSliceNames(p);
350 Collections.sort(sliceNames);
351 }
352 if (log.isInfoEnabled()) {
353 log.info(_loc.get("slice-available", sliceNames));
354 }
355 return sliceNames;
356 }
357
358 /**
359 * Scan the given map for slice-specific property of the form
360 * <code>openjpa.slice.XYZ.abc</code> (while ignoring
361 * <code>openjpa.slice.XYZ</code> as they refer to slice-wide property)
362 * to determine the names of all available slices.
363 */
364 private List<String> scanForSliceNames(Map p) {
365 List<String> sliceNames = new ArrayList<String>();
366 for (Object o : p.keySet()) {
367 String key = o.toString();
368 if (key.startsWith(PREFIX_SLICE) && getPartCount(key) > 3) {
369 String sliceName =
370 chopTail(chopHead(o.toString(), PREFIX_SLICE), DOT);
371 if (!sliceNames.contains(sliceName))
372 sliceNames.add(sliceName);
373 }
374 }
375 return sliceNames;
376 }
377
378 private static int getPartCount(String s) {
379 return (s == null) ? 0 : s.split(REGEX_DOT).length;
380 }
381
382 private static String chopHead(String s, String head) {
383 if (s.startsWith(head))
384 return s.substring(head.length());
385 return s;
386 }
387
388 private static String chopTail(String s, String tail) {
389 int i = s.lastIndexOf(tail);
390 if (i == -1)
391 return s;
392 return s.substring(0, i);
393 }
394
395 /**
396 * Creates given <code>slice</code> specific configuration properties from
397 * given <code>original</code> key-value map. The rules are
398 * <LI> if key begins with <code>"slice.XXX."</code> where
399 * <code>XXX</code> is the given slice name, then replace
400 * <code>"slice.XXX.</code> with <code>openjpa.</code>.
401 * <LI>if key begins with <code>"slice."</code> but not with
402 * <code>"slice.XXX."</code>, the ignore i.e. any property of other
403 * slices or global slice property e.g.
404 * <code>slice.DistributionPolicy</code>
405 * <code>if key starts with <code>"openjpa."</code> and a corresponding
406 * <code>"slice.XXX."</code> property does not exist, then use this as
407 * default property
408 * <code>property with any other prefix is simply copied
409 *
410 */
411 Map createSliceProperties(Map original, String slice) {
412 Map result = new Properties();
413 String prefix = PREFIX_SLICE + slice + DOT;
414 for (Object o : original.keySet()) {
415 String key = o.toString();
416 if (key.startsWith(prefix)) {
417 String newKey = PREFIX_OPENJPA + key.substring(prefix.length());
418 result.put(newKey, original.get(o));
419 } else if (key.startsWith(PREFIX_SLICE)) {
420 // ignore keys that are in 'slice.' namespace but not this slice
421 } else if (key.startsWith(PREFIX_OPENJPA)) {
422 String newKey = prefix + key.substring(PREFIX_OPENJPA.length());
423 if (!original.containsKey(newKey))
424 result.put(key, original.get(o));
425 } else { // keys that are neither "openjpa" nor "slice" namespace
426 result.put(key, original.get(o));
427 }
428 }
429 return result;
430 }
431
432 /**
433 * Determine the master slice.
434 */
435 private void setMaster(Map original) {
436 String key = PREFIX_SLICE + masterPlugin.getProperty();
437 Object masterSlice = original.get(key);
438 Log log = getConfigurationLog();
439 List<Slice> activeSlices = getSlices(null);
440 if (masterSlice == null) {
441 _master = activeSlices.get(0);
442 if (log.isWarnEnabled())
443 log.warn(_loc.get("no-master-slice", key, _master));
444 return;
445 }
446 for (Slice slice:activeSlices)
447 if (slice.getName().equals(masterSlice))
448 _master = slice;
449 if (_master == null) {
450 _master = activeSlices.get(0);
451 }
452 }
453
454 public String getExecutorService() {
455 return executorServicePlugin.getString();
456 }
457
458 public void setExecutorService(ExecutorService txnManager) {
459 executorServicePlugin.set(txnManager);
460 }
461
462 public ExecutorService getExecutorServiceInstance() {
463 if (executorServicePlugin.get() == null) {
464 executorServicePlugin.instantiate(ExecutorService.class, this);
465 }
466 return (ExecutorService) executorServicePlugin.get();
467 }
468 }